Class: Raif::ModelCompletionBatch

Inherits:
ApplicationRecord
  • Object
Defined in:
app/models/raif/model_completion_batch.rb

Constant Summary

STATUSES =
%w[pending submitted in_progress ended canceled expired failed].freeze
TERMINAL_STATUSES =
%w[ended canceled expired failed].freeze

Instance Method Details

#add_task(task, batch_custom_id: nil) ⇒ Raif::Task

Attaches an existing Raif::Task to this batch as a pending child completion. The task is persisted if not already, then routed through Raif::Task#prepare_for_batch! to populate prompts and build the pending Raif::ModelCompletion.

Pair with Raif::Llm#create_batch when the producer constructs tasks outside of the batch (composing them in a loop, in a service object, etc.). For the one-call shortcut – build + save + attach in a single call – use Raif::Task.build_for_batch instead.

Parameters:

  • task (Raif::Task)
  • batch_custom_id (String, nil) (defaults to: nil)

    unique-within-batch identifier; defaults to "raif_task_<task.id>".

Returns:

  • (Raif::Task)

    the same task, now attached to this batch.



# File 'app/models/raif/model_completion_batch.rb', line 112

def add_task(task, batch_custom_id: nil)
  task.save! if task.new_record?
  task.prepare_for_batch!(batch: self, batch_custom_id: batch_custom_id)
  task
end

#assert_submittable! ⇒ Object

Idempotency guard for batch submission. Raises Raif::Errors::InvalidBatchError if the batch already has a provider_batch_id or has moved past pending, so a duplicate batch.submit! / llm.submit_batch!(batch) call cannot create a second provider-side batch and orphan the first one. Called by every provider's #submit_batch! at the top of the method.



# File 'app/models/raif/model_completion_batch.rb', line 190

def assert_submittable!
  return if status == "pending" && provider_batch_id.blank?

  raise Raif::Errors::InvalidBatchError,
    "Raif::ModelCompletionBatch ##{id} is not submittable: status=#{status}, " \
      "provider_batch_id=#{provider_batch_id.inspect}. submit! / submit_batch! " \
      "is single-shot; use #cancel! and create a new batch if you need to retry."
end
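
The single-shot behavior described above can be sketched in plain Ruby. This is a minimal illustration, not Raif's implementation: GuardedBatch, InvalidBatchError, and the "provider_batch_1" id are hypothetical stand-ins, and the provider call is faked.

```ruby
# Hypothetical stand-in for Raif::Errors::InvalidBatchError.
class InvalidBatchError < StandardError; end

# Hypothetical stand-in for the batch record; only the two fields the guard reads.
GuardedBatch = Struct.new(:status, :provider_batch_id) do
  # Passes only for a pending batch that has no provider-side id yet.
  def assert_submittable!
    return if status == "pending" && provider_batch_id.to_s.empty?

    raise InvalidBatchError, "not submittable: status=#{status}"
  end

  # Fake provider submission: the guard runs first, then the id is recorded.
  def submit!
    assert_submittable!
    self.provider_batch_id = "provider_batch_1"
    self.status = "submitted"
    self
  end
end

batch = GuardedBatch.new("pending", nil)
batch.submit!

# A duplicate submit is rejected instead of creating a second provider batch.
second_attempt =
  begin
    batch.submit!
    :submitted_again
  rescue InvalidBatchError
    :rejected
  end
```

Because the guard runs before any provider call, the rejected second attempt leaves the recorded provider id untouched.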

#cancel! ⇒ Object

Asks the provider to cancel the batch. Cancellation is asynchronous on both Anthropic and OpenAI: the provider acknowledges with a transitional status and the next poll picks up the final canceled state. The polling job then routes the canceled batch through the same finalize/dispatch path as any other terminal status, force-failing remaining children.



# File 'app/models/raif/model_completion_batch.rb', line 181

def cancel!
  llm.cancel_batch!(self)
end

#dispatch_completion_handler! ⇒ Object

Resolves and invokes the batch's completion handler, if one is configured. The handler class must implement .handle_batch_completion(batch).

Idempotent via handler_dispatched_at: once a run succeeds, later callers (the polling job's terminal-batch path, the safety sweep, or an ActiveJob retry of either) skip the dispatch. If the handler raises, handler_dispatched_at stays NULL so a later caller can re-dispatch. Without that retry path, the polling job's return if batch.terminal? guard would silently swallow handler-raised errors and the consumer's on_batch_completion block would never run.



# File 'app/models/raif/model_completion_batch.rb', line 209

def dispatch_completion_handler!
  return if handler_dispatched_at.present?
  return if completion_handler_class_name.blank?

  handler = completion_handler_class_name.safe_constantize
  if handler.blank?
    Raif.logger.error(
      "Raif::ModelCompletionBatch##{id} has completion_handler_class_name=#{completion_handler_class_name.inspect} " \
        "which could not be resolved to a class. Skipping handler dispatch."
    )
    return
  end

  handler.handle_batch_completion(self)
  update_column(:handler_dispatched_at, Time.current)
end
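
The handler contract and the retry-on-raise behavior can be illustrated with a small plain-Ruby sketch. ReportHandler and DispatchingBatch are hypothetical names; the only documented requirement is a class-level handle_batch_completion(batch) method, and the in-memory timestamp stands in for the handler_dispatched_at column.

```ruby
# Hypothetical handler: fails on its first call, succeeds afterwards.
class ReportHandler
  @calls = 0
  @fail_next = true

  class << self
    attr_accessor :calls, :fail_next

    def handle_batch_completion(batch)
      self.calls += 1
      if fail_next
        self.fail_next = false
        raise "transient failure"
      end
      batch
    end
  end
end

# Stand-in batch: the timestamp is written only after the handler returns.
class DispatchingBatch
  attr_reader :handler_dispatched_at

  def initialize(handler)
    @handler = handler
    @handler_dispatched_at = nil
  end

  def dispatch_completion_handler!
    return unless @handler_dispatched_at.nil?

    @handler.handle_batch_completion(self)
    @handler_dispatched_at = Time.now
  end
end

batch = DispatchingBatch.new(ReportHandler)

first_error =
  begin
    batch.dispatch_completion_handler!
    nil
  rescue RuntimeError => e
    e.message
  end

batch.dispatch_completion_handler! # retry: handler succeeds, timestamp is set
batch.dispatch_completion_handler! # no-op: already dispatched
```

The handler runs twice in total: once for the failed attempt and once for the successful retry. The third call returns early because the timestamp is already set.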

#enqueue_first_poll! ⇒ Object

Stamps next_poll_at and enqueues the first Raif::PollModelCompletionBatchJob using the first entry of Raif.config.model_completion_batch_poll_schedule. Called by #submit! by default; can also be invoked manually if a host opted out of auto-enqueue (#submit!(enqueue_poll: false)) and wants to start polling later.

Guards against being called on a batch that hasn't been submitted yet (no provider_batch_id, so the polling job's fetch_batch_status! would 404 and burn the entire poll schedule until max_age expiry) or that's already terminal (nothing to poll for). Raises Raif::Errors::InvalidBatchError so a misordered call surfaces immediately instead of silently scheduling a doomed poll chain.



# File 'app/models/raif/model_completion_batch.rb', line 151

def enqueue_first_poll!
  if provider_batch_id.blank?
    raise Raif::Errors::InvalidBatchError,
      "Raif::ModelCompletionBatch ##{id}#enqueue_first_poll! requires provider_batch_id; " \
        "call submit! (or llm.submit_batch!(batch)) first."
  end
  if terminal?
    raise Raif::Errors::InvalidBatchError,
      "Raif::ModelCompletionBatch ##{id}#enqueue_first_poll! refusing to schedule a poll " \
        "for a terminal batch (status=#{status})."
  end

  delay = Array(Raif.config.model_completion_batch_poll_schedule).first || 1.minute
  update_column(:next_poll_at, delay.from_now)
  Raif::PollModelCompletionBatchJob.set(wait: delay).perform_later(id)
end
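
The two guards and the delay selection can be sketched without Rails. This is an illustration only: first_poll_delay is a hypothetical helper, delays are plain Integer seconds instead of ActiveSupport durations, and no job is enqueued.

```ruby
# Hypothetical stand-in for Raif::Errors::InvalidBatchError.
class InvalidBatchError < StandardError; end

TERMINAL = %w[ended canceled expired failed].freeze
DEFAULT_DELAY = 60 # seconds; mirrors the `|| 1.minute` fallback

# Returns the delay before the first poll, or raises when polling makes no sense.
def first_poll_delay(status:, provider_batch_id:, schedule:)
  raise InvalidBatchError, "submit the batch first" if provider_batch_id.to_s.empty?
  raise InvalidBatchError, "batch is already #{status}" if TERMINAL.include?(status)

  # Array(nil) is [], so a missing schedule falls through to the default.
  Array(schedule).first || DEFAULT_DELAY
end

from_schedule = first_poll_delay(status: "submitted", provider_batch_id: "b_1", schedule: [30, 120, 600])
from_default  = first_poll_delay(status: "submitted", provider_batch_id: "b_1", schedule: nil)

unsubmitted =
  begin
    first_poll_delay(status: "pending", provider_batch_id: nil, schedule: [30])
  rescue InvalidBatchError => e
    e.message
  end
```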

#expire!(reason:) ⇒ Object

Used by the expiry paths (the polling job's max_age_exceeded? branch and Raif::ExpireStuckModelCompletionBatchesJob) when we decide to give up on a batch the provider hasn't finalized yet. Unlike #force_fail!, which only flips local state, this method also issues a best-effort provider-side cancel, so we stop paying for batch results we no longer read and stop occupying the provider's per-org concurrent-batch quota.

Cancellation is best-effort: if it fails (5xx, network, auth, etc.) we log and continue, because the local force-fail still has to happen so any waiting workflow can advance. A still-running provider-side batch will eventually self-finalize via the provider's own timer; the worst case is paying for results we discard.

Skips the cancel call entirely when the batch is already terminal (the provider's done with it) or hasn't been submitted yet (no provider_batch_id to cancel).



# File 'app/models/raif/model_completion_batch.rb', line 265

def expire!(reason:)
  if !terminal? && provider_batch_id.present?
    begin
      llm.cancel_batch!(self)
    rescue StandardError => e
      Raif.logger.warn(
        "Raif::ModelCompletionBatch ##{id} best-effort provider-side cancel failed " \
          "while expiring: #{e.class}: #{e.message}. Continuing with local force-fail; " \
          "the provider-side batch may still complete and be billed."
      )
    end
  end

  force_fail!(reason: reason)
end
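
The best-effort pattern (try the remote cancel, log on failure, always record the local failure) can be sketched as follows. FlakyProvider and ExpiringBatch are hypothetical stand-ins, the warnings array replaces Raif.logger, and the terminal-status check from the real method is omitted for brevity.

```ruby
# Hypothetical provider client whose cancel call always fails.
class FlakyProvider
  def cancel_batch!(_batch)
    raise IOError, "503 from provider"
  end
end

class ExpiringBatch
  attr_reader :status, :failure_reason, :warnings

  def initialize(provider, provider_batch_id:)
    @provider = provider
    @provider_batch_id = provider_batch_id
    @status = "in_progress"
    @warnings = []
  end

  def expire!(reason:)
    unless @provider_batch_id.nil?
      begin
        @provider.cancel_batch!(self)
      rescue StandardError => e
        # Record the failure and keep going: the local state must still change.
        @warnings << "cancel failed: #{e.class}: #{e.message}"
      end
    end

    @status = "failed"
    @failure_reason = reason
  end
end

batch = ExpiringBatch.new(FlakyProvider.new, provider_batch_id: "b_1")
batch.expire!(reason: "max age exceeded")
```

Even though the provider call raised, the batch ends up failed locally and the error is kept as a warning rather than propagated.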

#fetch_results! ⇒ Object



# File 'app/models/raif/model_completion_batch.rb', line 172

def fetch_results!
  llm.fetch_batch_results!(self)
end

#fetch_status! ⇒ Object



# File 'app/models/raif/model_completion_batch.rb', line 168

def fetch_status!
  llm.fetch_batch_status!(self)
end

#finalize! ⇒ Object

Called by the polling job once the batch reaches a terminal status. On ended (successful), fetches per-entry results from the provider. On the other terminal statuses (canceled / expired / failed), force-fails every still-pending child completion since there are no per-entry results to collect.



# File 'app/models/raif/model_completion_batch.rb', line 240

def finalize!
  if successful?
    fetch_results!
  else
    force_fail!(reason: "Batch ended with status: #{status}")
  end
end
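
As a quick illustration of the branching, the mapping from terminal status to finalize action can be written out in plain Ruby. finalize_action is a hypothetical helper that only names the action; it does not call a provider.

```ruby
TERMINAL_STATUSES = %w[ended canceled expired failed].freeze

# Names the action the finalize step takes for a given terminal status.
def finalize_action(status)
  status == "ended" ? :fetch_results : :force_fail
end

actions = TERMINAL_STATUSES.to_h { |status| [status, finalize_action(status)] }
```

Only ended leads to a results fetch; every other terminal status goes down the force-fail path.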

#force_fail!(reason:) ⇒ Object

Marks every non-terminal child completion as failed and sets the batch to failed (preserving an already-terminal status, e.g. canceled). Idempotent: children already completed or failed are skipped.

Wrapped in a transaction so a partial failure mid-iteration rolls back the batch-status update too. Without this, an exception while flipping children would leave the batch terminal and prevent the polling/expire jobs from re-entering this path on a future run.

Local-only: does not touch the provider. Use #expire! when expiring a batch the provider hasn't finalized yet, so the provider-side batch is canceled on a best-effort basis.



# File 'app/models/raif/model_completion_batch.rb', line 293

def force_fail!(reason:)
  reason_str = reason.to_s.truncate(255)

  transaction do
    unless terminal?
      update!(status: "failed", failed_at: Time.current, failure_reason: reason_str)
    end

    raif_model_completions.each do |mc|
      mc.reload
      next if mc.completed? || mc.failed?

      mc.failure_error = "Raif::ModelCompletionBatch ##{id} #{status}"
      mc.failure_reason = reason_str
      mc.update_columns(started_at: started_at) if mc.started_at.nil? && started_at.present?
      mc.failed!
    end
  end
end
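
The status-preserving and idempotent behavior can be shown with in-memory objects. This sketch assumes hypothetical Child and FailableBatch stand-ins and leaves out the database transaction and reason truncation that the real method performs.

```ruby
TERMINAL_STATUSES = %w[ended canceled expired failed].freeze

# Hypothetical stand-in for a child completion.
Child = Struct.new(:state, :failure_reason)

class FailableBatch
  attr_reader :status, :children

  def initialize(status, children)
    @status = status
    @children = children
  end

  def force_fail!(reason:)
    # Only a non-terminal batch flips to failed; canceled or expired is preserved.
    @status = "failed" unless TERMINAL_STATUSES.include?(@status)

    @children.each do |child|
      # Children that already finished, either way, are left alone.
      next if %w[completed failed].include?(child.state)

      child.state = "failed"
      child.failure_reason = reason
    end
  end
end

children = [Child.new("completed", nil), Child.new("pending", nil)]
batch = FailableBatch.new("canceled", children)
batch.force_fail!(reason: "Batch ended with status: canceled")
batch.force_fail!(reason: "second call changes nothing")
```

The second call is a no-op: the batch is already terminal and the formerly pending child is already failed, so its original failure reason is kept.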

#llm ⇒ Object



# File 'app/models/raif/model_completion_batch.rb', line 81

def llm
  Raif.llm(llm_model_key.to_sym)
end

#max_age_exceeded? ⇒ Boolean

True once submitted_at is older than Raif.config.model_completion_batch_max_age. Used by the polling and expire-stuck jobs to decide when to force-fail a batch that the provider hasn't finalized in time.

Returns:

  • (Boolean)


# File 'app/models/raif/model_completion_batch.rb', line 229

def max_age_exceeded?
  return false if submitted_at.blank?

  Time.current - submitted_at >= Raif.config.model_completion_batch_max_age
end
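
The age comparison described above can be tried in plain Ruby with fixed timestamps. The 24-hour limit is a hypothetical value chosen for the example; the real limit comes from Raif.config.model_completion_batch_max_age, and this standalone helper takes submitted_at as an argument instead of reading a column.

```ruby
MAX_AGE_SECONDS = 24 * 60 * 60 # hypothetical limit for this sketch

# True once submitted_at is at least max_age seconds in the past.
def max_age_exceeded?(submitted_at, now: Time.now, max_age: MAX_AGE_SECONDS)
  return false if submitted_at.nil?

  # Subtracting two Time values yields the elapsed seconds as a Float.
  now - submitted_at >= max_age
end

now = Time.utc(2025, 1, 2, 12, 0, 0)
fresh = max_age_exceeded?(Time.utc(2025, 1, 2, 0, 0, 0), now: now)   # 12 hours old
stale = max_age_exceeded?(Time.utc(2025, 1, 1, 12, 0, 0), now: now)  # exactly 24 hours old
never_submitted = max_age_exceeded?(nil, now: now)
```

Note the inclusive comparison: a batch that is exactly at the limit already counts as exceeded, while a batch that was never submitted never does.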

#recalculate_costs! ⇒ Object

Aggregates total_cost / prompt_token_cost / output_token_cost from child completions after results have been applied. Should be called by the polling job once all children have been finalized.



# File 'app/models/raif/model_completion_batch.rb', line 316

def recalculate_costs!
  if raif_model_completions.empty?
    Raif.logger.warn(
      "Raif::ModelCompletionBatch ##{id}#recalculate_costs! skipped: no child raif_model_completions to aggregate"
    )
    return
  end

  # Three .sum calls instead of one Arel-flavored pick. The dataset is bounded
  # by batch size and these only run once per batch finalization, so the extra
  # round-trips are immaterial and the call is much easier to read.
  prompt_sum = raif_model_completions.sum(:prompt_token_cost)
  output_sum = raif_model_completions.sum(:output_token_cost)
  total_sum = raif_model_completions.sum(:total_cost)

  # ActiveRecord's .sum returns 0 (not nil) when every row's column is NULL.
  # Skip the write if everything is zero so we don't null-out manually-set
  # batch-level cost values from the host (rare, but cheap to guard).
  if prompt_sum.zero? && output_sum.zero? && total_sum.zero?
    Raif.logger.warn(
      "Raif::ModelCompletionBatch ##{id}#recalculate_costs! skipped: every child completion's " \
        "prompt_token_cost / output_token_cost / total_cost is NULL or zero"
    )
    return
  end

  update_columns(
    prompt_token_cost: prompt_sum,
    output_token_cost: output_sum,
    total_cost: total_sum,
    updated_at: Time.current
  )
end
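
The aggregation and its two skip conditions can be sketched over in-memory rows. ChildCost, column_sum, and aggregate_costs are hypothetical names; Rationals stand in for the decimal cost columns, and nil stands in for NULL.

```ruby
# Hypothetical stand-in for a child completion's cost columns.
ChildCost = Struct.new(:prompt_token_cost, :output_token_cost, :total_cost)

COST_COLUMNS = %i[prompt_token_cost output_token_cost total_cost].freeze

# Sums one column, ignoring nil the way SQL SUM ignores NULL.
def column_sum(rows, column)
  rows.sum(0) { |row| row[column] || 0 }
end

# Returns a hash of sums, or nil when there is nothing meaningful to write.
def aggregate_costs(rows)
  return nil if rows.empty?

  sums = COST_COLUMNS.to_h { |column| [column, column_sum(rows, column)] }
  # All-zero sums mean every child was nil or zero: skip the write.
  return nil if sums.values.all?(&:zero?)

  sums
end

rows = [
  ChildCost.new(Rational(1, 100), Rational(2, 100), Rational(3, 100)),
  ChildCost.new(nil, nil, nil)
]
totals = aggregate_costs(rows)
skipped_all_nil = aggregate_costs([ChildCost.new(nil, nil, nil)])
skipped_empty = aggregate_costs([])
```

Returning nil for the all-zero case mirrors the guard in the method above, which avoids overwriting batch-level values a host may have set manually.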

#submit!(enqueue_poll: true) ⇒ Raif::ModelCompletionBatch

Submits the batch to the provider and (by default) enqueues Raif::PollModelCompletionBatchJob so the polling chain starts on its own. Pass enqueue_poll: false if your host app is driving its own poll scheduler off the Raif::ModelCompletionBatch.due_for_poll scope.

Parameters:

  • enqueue_poll (Boolean) (defaults to: true)

    whether to auto-enqueue the polling job

Returns:

  • (Raif::ModelCompletionBatch)



# File 'app/models/raif/model_completion_batch.rb', line 133

def submit!(enqueue_poll: true)
  result = llm.submit_batch!(self)
  enqueue_first_poll! if enqueue_poll
  result
end

#successful? ⇒ Boolean

Returns:

  • (Boolean)


# File 'app/models/raif/model_completion_batch.rb', line 77

def successful?
  status == "ended"
end

#tasks ⇒ Object

Convenience accessor for batches whose children were produced via Raif::Task.build_for_batch (the typical case). Returns the Raif::Task records attached to this batch through their child Raif::ModelCompletions.

Heterogeneous batches that mix Raif::Task producers with raw Raif::Llm#build_pending_model_completion producers will see only the Raif::Task subset here; use raif_model_completions for full coverage.



# File 'app/models/raif/model_completion_batch.rb', line 92

def tasks
  Raif::Task.where(
    id: raif_model_completions.where(source_type: "Raif::Task").select(:source_id)
  )
end

#terminal? ⇒ Boolean

Returns:

  • (Boolean)


# File 'app/models/raif/model_completion_batch.rb', line 73

def terminal?
  TERMINAL_STATUSES.include?(status)
end