Class: Raif::ModelCompletionBatch

Inherits:
ApplicationRecord
  • Object
Defined in:
app/models/raif/model_completion_batch.rb

Constant Summary

STATUSES =
%w[pending submitted in_progress ended canceled expired failed].freeze
TERMINAL_STATUSES =
%w[ended canceled expired failed].freeze

Instance Method Details

#add_task(task, batch_custom_id: nil) ⇒ Raif::Task

Attaches an existing Raif::Task to this batch as a pending child completion. The task is saved first if it is a new record, then routed through Raif::Task#prepare_for_batch! to populate prompts and build the pending Raif::ModelCompletion.

Pair with Raif::Llm#create_batch when the producer constructs tasks outside of the batch (composing them in a loop, in a service object, etc.). For the one-call shortcut – build + save + attach in a single call – use Raif::Task.build_for_batch instead.

Parameters:

  • task (Raif::Task)
  • batch_custom_id (String, nil) (defaults to: nil)

    unique-within-batch identifier; defaults to "raif_task_<task.id>".

Returns:

  • (Raif::Task)

    the same task, now attached to this batch.



# File 'app/models/raif/model_completion_batch.rb', line 113

def add_task(task, batch_custom_id: nil)
  task.save! if task.new_record?
  task.prepare_for_batch!(batch: self, batch_custom_id: batch_custom_id)
  task
end
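The attach flow described above can be sketched with plain-Ruby stand-ins. SketchTask and SketchBatch are hypothetical classes, not Raif's models; where the "raif_task_<task.id>" default is actually applied is an assumption here (the sketch puts it in prepare_for_batch!).

```ruby
# Hypothetical stand-in for Raif::Task; it only mimics save + attach.
class SketchTask
  attr_reader :id, :batch, :batch_custom_id

  def self.next_id
    @next_id = (@next_id || 0) + 1
  end

  def new_record?
    @id.nil?
  end

  def save!
    @id = self.class.next_id
  end

  # Assumption: the documented default "raif_task_<task.id>" is applied here.
  def prepare_for_batch!(batch:, batch_custom_id: nil)
    @batch = batch
    @batch_custom_id = batch_custom_id || "raif_task_#{id}"
    batch.children << self
  end
end

# Hypothetical stand-in for the batch; add_task mirrors the listing above.
class SketchBatch
  attr_reader :children

  def initialize
    @children = []
  end

  def add_task(task, batch_custom_id: nil)
    task.save! if task.new_record?
    task.prepare_for_batch!(batch: self, batch_custom_id: batch_custom_id)
    task
  end
end

batch = SketchBatch.new
first = batch.add_task(SketchTask.new)                                # default id
second = batch.add_task(SketchTask.new, batch_custom_id: "invoice-42") # explicit id
```

Because add_task returns the task it was given, calls can be collected in a loop or chained by the producer.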

#assert_submittable! ⇒ Object

Idempotency guard for batch submission. Raises Raif::Errors::InvalidBatchError if the batch already has a provider_batch_id or has moved past pending, so a duplicate batch.submit! / llm.submit_batch!(batch) call cannot create a second provider-side batch and orphan the first one. Called by every provider's #submit_batch! at the top of the method.



# File 'app/models/raif/model_completion_batch.rb', line 191

def assert_submittable!
  return if status == "pending" && provider_batch_id.blank?

  raise Raif::Errors::InvalidBatchError,
    "Raif::ModelCompletionBatch ##{id} is not submittable: status=#{status}, " \
      "provider_batch_id=#{provider_batch_id.inspect}. submit! / submit_batch! " \
      "is single-shot; use #cancel! and create a new batch if you need to retry."
end
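A minimal sketch of how this guard makes submission single-shot. GuardedBatch, SketchInvalidBatchError and fake_submit! are hypothetical stand-ins for illustration; real providers talk to an external API rather than assigning a fixed id.

```ruby
# Hypothetical error class standing in for Raif::Errors::InvalidBatchError.
class SketchInvalidBatchError < StandardError; end

# Hypothetical batch stand-in with just the fields the guard inspects.
GuardedBatch = Struct.new(:id, :status, :provider_batch_id) do
  def assert_submittable!
    return if status == "pending" && provider_batch_id.to_s.empty?

    raise SketchInvalidBatchError,
      "batch ##{id} is not submittable: status=#{status}, " \
      "provider_batch_id=#{provider_batch_id.inspect}"
  end
end

# Stand-in for a provider's submit_batch!: the guard runs first, so a second
# call raises before any provider-side batch could be created.
def fake_submit!(batch)
  batch.assert_submittable!
  batch.provider_batch_id = "provider-batch-1"
  batch.status = "submitted"
  batch
end

batch = GuardedBatch.new(1, "pending", nil)
fake_submit!(batch)

second_attempt =
  begin
    fake_submit!(batch)
    :submitted_again
  rescue SketchInvalidBatchError
    :rejected
  end
```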

#cancel! ⇒ Object

Asks the provider to cancel the batch. Cancellation is asynchronous on both Anthropic and OpenAI: the provider acknowledges with a transitional status and the next poll picks up the final canceled state. The polling job then routes the canceled batch through the same finalize/dispatch path as any other terminal status, force-failing remaining children.



# File 'app/models/raif/model_completion_batch.rb', line 182

def cancel!
  llm.cancel_batch!(self)
end

#dispatch_completion_handler! ⇒ Object

Resolves and invokes the batch's completion handler, if one is configured. The handler class must implement .handle_batch_completion(batch).

Idempotent via handler_dispatched_at: once a successful run finishes, future callers (the polling job's terminal-batch path, the safety sweep, an ActiveJob retry of either) skip the dispatch. If the handler raises, handler_dispatched_at stays NULL so a future caller can re-dispatch -- without that retry path the polling job's return if batch.terminal? guard would silently swallow handler-raised errors and the consumer's on_batch_completion block would never run.

At-most-once across concurrent callers: the guard + handler invocation + timestamp write are wrapped in #with_lock so a normal poll job racing with the safety sweep (or a resume-stalled poll) cannot both pass the blank-handler_dispatched_at check and run the handler twice. Cheap callers still short-circuit before taking the lock.



# File 'app/models/raif/model_completion_batch.rb', line 216

def dispatch_completion_handler!
  return if handler_dispatched_at.present?
  return if completion_handler_class_name.blank?
  # Local-side resolution must be complete before the handler runs --
  # otherwise the handler would dispatch against child completions that
  # haven't been hydrated with their provider results yet, which is how
  # batches got silently stranded under the prior code path.
  return if results_fetched_at.blank?

  handler = completion_handler_class_name.safe_constantize
  if handler.blank?
    Raif.logger.error(
      "Raif::ModelCompletionBatch##{id} has completion_handler_class_name=#{completion_handler_class_name.inspect} " \
        "which could not be resolved to a class. Skipping handler dispatch."
    )
    return
  end

  with_lock do
    # Re-check under lock: another worker may have dispatched between our
    # early-return check above and lock acquisition.
    return if handler_dispatched_at.present?

    handler.handle_batch_completion(self)
    update_column(:handler_dispatched_at, Time.current)
  end
end
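The handler contract and the retry-after-raise behaviour can be sketched as follows. ReportBatchHandler and DispatchSketchBatch are hypothetical; a Mutex stands in for #with_lock, and the handler class is passed directly instead of being resolved from completion_handler_class_name.

```ruby
# Hypothetical handler: the only contract taken from the docs above is a
# class-level .handle_batch_completion(batch). It fails once to show the retry.
class ReportBatchHandler
  @runs = 0
  @fail_next = true

  class << self
    attr_reader :runs
    attr_accessor :fail_next

    def handle_batch_completion(_batch)
      if fail_next
        self.fail_next = false
        raise "transient failure"
      end
      @runs += 1
    end
  end
end

# Stand-in for the batch record; a Mutex plays the role of #with_lock.
class DispatchSketchBatch
  attr_reader :handler_dispatched_at

  def initialize(handler:, results_fetched_at:)
    @handler = handler
    @results_fetched_at = results_fetched_at
    @handler_dispatched_at = nil
    @lock = Mutex.new
  end

  def dispatch_completion_handler!
    return if @handler_dispatched_at
    return if @handler.nil? || @results_fetched_at.nil?

    @lock.synchronize do
      # Re-check under the lock, as in the listing above.
      return if @handler_dispatched_at

      @handler.handle_batch_completion(self)
      # Only reached when the handler did not raise.
      @handler_dispatched_at = Time.now
    end
  end
end

batch = DispatchSketchBatch.new(handler: ReportBatchHandler, results_fetched_at: Time.now)

first_error =
  begin
    batch.dispatch_completion_handler!
    nil
  rescue RuntimeError => e
    e.message
  end

stamped_after_failure = batch.handler_dispatched_at # still nil after the raise
batch.dispatch_completion_handler! # retry succeeds and stamps the timestamp
batch.dispatch_completion_handler! # no-op: already dispatched
```

The timestamp is written only after the handler returns, which is what lets a later caller retry a failed dispatch.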

#enqueue_first_poll! ⇒ Object

Stamps next_poll_at and enqueues the first Raif::PollModelCompletionBatchJob using the first entry of Raif.config.model_completion_batch_poll_schedule. Called by #submit! by default; can also be invoked manually if a host opted out of auto-enqueue (#submit!(enqueue_poll: false)) and wants to start polling later.

Guards against being called on a batch that hasn't been submitted yet (no provider_batch_id, so the polling job's fetch_batch_status! would 404 and burn the entire poll schedule until max_age expiry) or that's already terminal (nothing to poll for). Raises Raif::Errors::InvalidBatchError so a misordered call surfaces immediately instead of silently scheduling a doomed poll chain.



# File 'app/models/raif/model_completion_batch.rb', line 152

def enqueue_first_poll!
  if provider_batch_id.blank?
    raise Raif::Errors::InvalidBatchError,
      "Raif::ModelCompletionBatch ##{id}#enqueue_first_poll! requires provider_batch_id; " \
        "call submit! (or llm.submit_batch!(batch)) first."
  end
  if terminal?
    raise Raif::Errors::InvalidBatchError,
      "Raif::ModelCompletionBatch ##{id}#enqueue_first_poll! refusing to schedule a poll " \
        "for a terminal batch (status=#{status})."
  end

  delay = Array(Raif.config.model_completion_batch_poll_schedule).first || 1.minute
  update_column(:next_poll_at, delay.from_now)
  Raif::PollModelCompletionBatchJob.set(wait: delay).perform_later(id)
end
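The delay selection in the listing above relies on Array() to tolerate a nil, empty, or scalar schedule. A small plain-Ruby sketch, using integer seconds in place of ActiveSupport durations (first_poll_delay is a hypothetical helper, not part of Raif):

```ruby
# Hypothetical helper mirroring `Array(schedule).first || 1.minute`,
# with plain integer seconds instead of ActiveSupport::Duration.
def first_poll_delay(schedule, fallback: 60)
  Array(schedule).first || fallback
end

from_list   = first_poll_delay([30, 120, 600]) # first entry wins
from_nil    = first_poll_delay(nil)            # Array(nil) is [], so fallback
from_empty  = first_poll_delay([])             # empty schedule, so fallback
from_scalar = first_poll_delay(45)             # Array(45) is [45]
```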

#expire!(reason:) ⇒ Object

Used by the expiry paths (the polling job's max_age_exceeded? branch and Raif::ExpireStuckModelCompletionBatchesJob) when we decide to give up on a batch the provider hasn't finalized yet. Distinct from #force_fail!, which only flips local state -- this method also issues a best-effort provider-side cancel so we don't keep paying for batch results we've stopped reading and don't keep occupying the provider's per-org concurrent-batch quota.

Cancellation is best-effort: if it fails (5xx, network, auth, etc.) we log and continue, because the local force-fail still has to happen so any waiting workflow can advance. A still-running provider-side batch will eventually self-finalize via the provider's own timer; the worst case is paying for results we discard.

Skips the cancel call entirely when the batch is already terminal (the provider's done with it) or hasn't been submitted yet (no provider_batch_id to cancel).



# File 'app/models/raif/model_completion_batch.rb', line 308

def expire!(reason:)
  if !terminal? && provider_batch_id.present?
    begin
      llm.cancel_batch!(self)
    rescue StandardError => e
      Raif.logger.warn(
        "Raif::ModelCompletionBatch ##{id} best-effort provider-side cancel failed " \
          "while expiring: #{e.class}: #{e.message}. Continuing with local force-fail; " \
          "the provider-side batch may still complete and be billed."
      )
    end
  end

  force_fail!(reason: reason)
end
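A sketch of the best-effort pattern: the provider-side cancel may fail, but the local force-fail still runs. FailingProvider and ExpireSketchBatch are hypothetical stand-ins; the real method logs through Raif.logger rather than collecting warnings in an array.

```ruby
# Hypothetical provider whose cancel endpoint is down.
class FailingProvider
  def cancel_batch!(_batch)
    raise IOError, "provider returned 503"
  end
end

# Hypothetical batch stand-in with only the state this flow touches.
class ExpireSketchBatch
  TERMINAL = %w[ended canceled expired failed].freeze

  attr_reader :status, :failure_reason, :warnings

  def initialize(provider:, provider_batch_id:)
    @provider = provider
    @provider_batch_id = provider_batch_id
    @status = "in_progress"
    @warnings = []
  end

  def terminal?
    TERMINAL.include?(status)
  end

  def expire!(reason:)
    if !terminal? && @provider_batch_id
      begin
        @provider.cancel_batch!(self)
      rescue StandardError => e
        # Swallow and record: the local force-fail below must still happen.
        @warnings << "#{e.class}: #{e.message}"
      end
    end

    force_fail!(reason: reason)
  end

  # Simplified: the real force_fail! also fails child completions.
  def force_fail!(reason:)
    return if terminal?

    @status = "failed"
    @failure_reason = reason
  end
end

batch = ExpireSketchBatch.new(provider: FailingProvider.new, provider_batch_id: "pb_1")
batch.expire!(reason: "max age exceeded")
```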

#fetch_results! ⇒ Object



# File 'app/models/raif/model_completion_batch.rb', line 173

def fetch_results!
  llm.fetch_batch_results!(self)
end

#fetch_status! ⇒ Object



# File 'app/models/raif/model_completion_batch.rb', line 169

def fetch_status!
  llm.fetch_batch_status!(self)
end

#finalize! ⇒ Object

Called by the polling job once the batch reaches a terminal status. On ended (successful), fetches per-entry results from the provider. On the other terminal statuses (canceled / expired / failed), force-fails every still-pending child completion since there are no per-entry results to collect.

Idempotent via results_fetched_at: once a successful run finishes, future callers (the poll job's terminal-at-top path, the safety sweep, an ActiveJob retry of either) skip the provider round-trip. If fetch_results! raises mid-stream, results_fetched_at stays NULL so a subsequent finalize! call retries the fetch -- this is how the poll chain self-heals from transient provider errors on the results endpoint without permanently stranding the child completions.

At-most-once across concurrent callers: the guard + fetch + stamp are wrapped in #with_lock so a normal poll job racing with the safety sweep (or a resume-stalled poll) cannot both pass the blank-results_fetched_at check and double-call fetch_results!. Cheap callers still short-circuit before taking the lock.



# File 'app/models/raif/model_completion_batch.rb', line 272

def finalize!
  return if results_fetched_at.present?

  with_lock do
    # Re-check under lock: another worker may have hydrated results
    # between our early-return check above and lock acquisition.
    return if results_fetched_at.present?

    if successful?
      fetch_results!
      update_column(:results_fetched_at, Time.current)
    else
      # force_fail! sets results_fetched_at inside its transaction so the
      # success/failure paths converge on the same on-disk signal.
      force_fail!(reason: "Batch ended with status: #{status}")
    end
  end
end
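The check, lock, re-check shape above can be exercised with threads. In this sketch a Mutex stands in for the database row lock taken by #with_lock, and FinalizeSketchBatch is a hypothetical class that counts fetches instead of calling a provider.

```ruby
# Hypothetical stand-in: counts how often the "fetch" runs under contention.
class FinalizeSketchBatch
  attr_reader :fetch_count

  def initialize
    @lock = Mutex.new
    @results_fetched_at = nil
    @fetch_count = 0
  end

  def finalize!
    # Cheap early return, taken without the lock.
    return if @results_fetched_at

    @lock.synchronize do
      # Re-check under the lock: another caller may have finished meanwhile.
      return if @results_fetched_at

      sleep 0.01 # simulate the provider round-trip
      @fetch_count += 1
      @results_fetched_at = Time.now
    end
  end
end

batch = FinalizeSketchBatch.new
threads = Array.new(8) { Thread.new { batch.finalize! } }
threads.each(&:join)
```

Callers that lose the race block on the lock, see the timestamp on the re-check, and return without fetching.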

#force_fail!(reason:) ⇒ Object

Marks every non-terminal child completion as failed and sets the batch to failed (preserving an already-terminal status, e.g. canceled). Idempotent: children already completed or failed are skipped.

Wrapped in a transaction so a partial failure mid-iteration rolls back the batch-status update too. Without this, an exception while flipping children would leave the batch terminal and prevent the polling/expire jobs from re-entering this path on a future run.

Local-only: does not touch the provider. Use #expire! when expiring a batch the provider hasn't finalized yet, so the provider-side batch is canceled on a best-effort basis.



# File 'app/models/raif/model_completion_batch.rb', line 336

def force_fail!(reason:)
  reason_str = reason.to_s.truncate(255)

  transaction do
    unless terminal?
      update!(status: "failed", failed_at: Time.current, failure_reason: reason_str)
    end

    raif_model_completions.each do |mc|
      mc.reload
      next if mc.completed? || mc.failed?

      mc.failure_error = "Raif::ModelCompletionBatch ##{id} #{status}"
      mc.failure_reason = reason_str
      mc.update_columns(started_at: started_at) if mc.started_at.nil? && started_at.present?
      mc.failed!
    end

    update_column(:results_fetched_at, Time.current) if results_fetched_at.blank?
  end
end

#llm ⇒ Object



# File 'app/models/raif/model_completion_batch.rb', line 82

def llm
  Raif.llm(llm_model_key.to_sym)
end

#max_age_exceeded? ⇒ Boolean

True once submitted_at is older than Raif.config.model_completion_batch_max_age. Used by the polling and expire-stuck jobs to decide when to force-fail a batch that the provider hasn't finalized in time.

Returns:

  • (Boolean)


# File 'app/models/raif/model_completion_batch.rb', line 247

def max_age_exceeded?
  return false if submitted_at.blank?

  Time.current - submitted_at >= Raif.config.model_completion_batch_max_age
end
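The age check described above, sketched as a standalone function with plain Time values and a max age in seconds. max_age_exceeded_sketch? and its parameters are hypothetical; the real method reads submitted_at from the record and the limit from Raif.config.

```ruby
# Hypothetical standalone version of the check; `now` is injectable so the
# example is deterministic.
def max_age_exceeded_sketch?(submitted_at, max_age_seconds, now: Time.now)
  # A batch that was never submitted cannot have exceeded its max age.
  return false if submitted_at.nil?

  now - submitted_at >= max_age_seconds
end

now = Time.at(1_700_000_000)
one_day = 24 * 60 * 60

fresh         = max_age_exceeded_sketch?(now - 3600, one_day, now: now)    # one hour old
at_limit      = max_age_exceeded_sketch?(now - one_day, one_day, now: now) # exactly at the limit
not_submitted = max_age_exceeded_sketch?(nil, one_day, now: now)
```

The comparison is inclusive, so a batch counts as expired at exactly the configured age.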

#recalculate_costs! ⇒ Object

Aggregates total_cost / prompt_token_cost / output_token_cost from child completions after results have been applied. Should be called by the polling job once all children have been finalized.



# File 'app/models/raif/model_completion_batch.rb', line 361

def recalculate_costs!
  if raif_model_completions.empty?
    Raif.logger.warn(
      "Raif::ModelCompletionBatch ##{id}#recalculate_costs! skipped: no child raif_model_completions to aggregate"
    )
    return
  end

  # Three .sum calls instead of one Arel-flavored pick. The dataset is bounded
  # by batch size and these only run once per batch finalization, so the extra
  # round-trips are immaterial and the call is much easier to read.
  prompt_sum = raif_model_completions.sum(:prompt_token_cost)
  output_sum = raif_model_completions.sum(:output_token_cost)
  total_sum = raif_model_completions.sum(:total_cost)

  # ActiveRecord's .sum returns 0 (not nil) when every row's column is NULL.
  # Skip the write if everything is zero so we don't null-out manually-set
  # batch-level cost values from the host (rare, but cheap to guard).
  if prompt_sum.zero? && output_sum.zero? && total_sum.zero?
    Raif.logger.warn(
      "Raif::ModelCompletionBatch ##{id}#recalculate_costs! skipped: every child completion's " \
        "prompt_token_cost / output_token_cost / total_cost is NULL or zero"
    )
    return
  end

  update_columns(
    prompt_token_cost: prompt_sum,
    output_token_cost: output_sum,
    total_cost: total_sum,
    updated_at: Time.current
  )
end
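The aggregation rules in the listing above (skip when there are no children, treat NULL as zero, skip when every sum is zero) can be sketched on plain hashes. aggregate_costs is a hypothetical helper that uses Rational values in place of database decimals and returns nil where the real method logs and returns early.

```ruby
COST_COLUMNS = %i[prompt_token_cost output_token_cost total_cost].freeze

# Hypothetical helper: returns a hash of sums, or nil when the write
# would be skipped.
def aggregate_costs(children)
  return nil if children.empty?

  sums = COST_COLUMNS.to_h do |column|
    # nil plays the role of a NULL column and contributes zero.
    [column, children.sum { |child| child[column] || 0 }]
  end

  # All-zero sums are skipped so existing batch-level values are not overwritten.
  return nil if sums.values.all?(&:zero?)

  sums
end

children = [
  { prompt_token_cost: "0.0010".to_r, output_token_cost: "0.0020".to_r, total_cost: "0.0030".to_r },
  { prompt_token_cost: nil, output_token_cost: "0.0005".to_r, total_cost: "0.0005".to_r }
]

sums = aggregate_costs(children)
skipped_empty = aggregate_costs([])
skipped_null = aggregate_costs([{ prompt_token_cost: nil, output_token_cost: nil, total_cost: nil }])
```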

#submit!(enqueue_poll: true) ⇒ Raif::ModelCompletionBatch

Submits the batch to the provider and (by default) enqueues Raif::PollModelCompletionBatchJob so the polling chain starts on its own. Pass enqueue_poll: false if your host app is driving its own poll scheduler off the Raif::ModelCompletionBatch.due_for_poll scope.

Parameters:

  • enqueue_poll (Boolean) (defaults to: true)

    whether to auto-enqueue the polling job

Returns:

  • (Raif::ModelCompletionBatch)


# File 'app/models/raif/model_completion_batch.rb', line 134

def submit!(enqueue_poll: true)
  result = llm.submit_batch!(self)
  enqueue_first_poll! if enqueue_poll
  result
end

#successful? ⇒ Boolean

Returns:

  • (Boolean)


# File 'app/models/raif/model_completion_batch.rb', line 78

def successful?
  status == "ended"
end

#tasks ⇒ Object

Convenience accessor for batches whose children were produced via Raif::Task.build_for_batch (the typical case). Returns the Raif::Task records attached to this batch through their child Raif::ModelCompletions.

Heterogeneous batches that mix Raif::Task producers with raw Raif::Llm#build_pending_model_completion producers will see only the Raif::Task subset here; use raif_model_completions for full coverage.



# File 'app/models/raif/model_completion_batch.rb', line 93

def tasks
  Raif::Task.where(
    id: raif_model_completions.where(source_type: "Raif::Task").select(:source_id)
  )
end

#terminal? ⇒ Boolean

Returns:

  • (Boolean)


# File 'app/models/raif/model_completion_batch.rb', line 74

def terminal?
  TERMINAL_STATUSES.include?(status)
end
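How the two status predicates relate can be seen by applying them to every value in STATUSES. This sketch copies the two constants from the summary above and reimplements the predicates as hypothetical free-standing lambdas rather than instance methods.

```ruby
STATUSES = %w[pending submitted in_progress ended canceled expired failed].freeze
TERMINAL_STATUSES = %w[ended canceled expired failed].freeze

# Free-standing equivalents of #terminal? and #successful? (hypothetical form).
terminal = ->(status) { TERMINAL_STATUSES.include?(status) }
successful = ->(status) { status == "ended" }

in_flight = STATUSES.reject(&terminal)          # statuses the poll job keeps polling
failed_terminal = TERMINAL_STATUSES.reject(&successful) # terminal without per-entry results
```

Only "ended" is both terminal and successful; the other terminal statuses are the ones #finalize! routes to #force_fail!.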