Class: Raif::ModelCompletionBatch
- Inherits: ApplicationRecord
  - Object
    - ApplicationRecord
      - Raif::ModelCompletionBatch
- Defined in: app/models/raif/model_completion_batch.rb
Direct Known Subclasses
Raif::ModelCompletionBatches::Anthropic, Raif::ModelCompletionBatches::Google, Raif::ModelCompletionBatches::OpenAi, Raif::ModelCompletionBatches::XAi
Constant Summary
- STATUSES = %w[pending submitted in_progress ended canceled expired failed].freeze
- TERMINAL_STATUSES = %w[ended canceled expired failed].freeze
Instance Method Summary
- #add_task(task, batch_custom_id: nil) ⇒ Raif::Task
  Attaches an existing Raif::Task to this batch as a pending child completion.
- #assert_submittable! ⇒ Object
  Idempotency guard for batch submission.
- #cancel! ⇒ Object
  Asks the provider to cancel the batch.
- #dispatch_completion_handler! ⇒ Object
  Resolves and invokes the batch's completion handler, if one is configured.
- #enqueue_first_poll! ⇒ Object
  Stamps next_poll_at and enqueues the first Raif::PollModelCompletionBatchJob using the first entry of Raif.config.model_completion_batch_poll_schedule.
- #expire!(reason:) ⇒ Object
  Gives up on a batch the provider hasn't finalized yet. Used by the expiry paths (the polling job's max_age_exceeded? branch and Raif::ExpireStuckModelCompletionBatchesJob).
- #fetch_results! ⇒ Object
  Delegates to llm.fetch_batch_results!(self).
- #fetch_status! ⇒ Object
  Delegates to llm.fetch_batch_status!(self).
- #finalize! ⇒ Object
  Called by the polling job once the batch reaches a terminal status.
- #force_fail!(reason:) ⇒ Object
  Marks every non-terminal child completion as failed and sets the batch to failed (preserving an already-terminal status, e.g. canceled).
- #llm ⇒ Object
  Resolves the batch's LLM via Raif.llm(llm_model_key.to_sym).
- #max_age_exceeded? ⇒ Boolean
  True once at least Raif.config.model_completion_batch_max_age has elapsed since submitted_at.
- #recalculate_costs! ⇒ Object
  Aggregates total_cost / prompt_token_cost / output_token_cost from child completions after results have been applied.
- #submit!(enqueue_poll: true) ⇒ Raif::ModelCompletionBatch
  Submits the batch to the provider and (by default) enqueues Raif::PollModelCompletionBatchJob so the polling chain starts on its own.
- #successful? ⇒ Boolean
  True when status is "ended".
- #tasks ⇒ Object
  Convenience accessor for batches whose children were produced via Raif::Task.build_for_batch (the typical case).
- #terminal? ⇒ Boolean
  True when status is one of TERMINAL_STATUSES.
Instance Method Details
#add_task(task, batch_custom_id: nil) ⇒ Raif::Task
Attaches an existing Raif::Task to this batch as a pending child completion. The task is persisted if not already, then routed through Raif::Task#prepare_for_batch! to populate prompts and build the pending Raif::ModelCompletion.
Pair with Raif::Llm#create_batch when the producer constructs tasks outside of the batch (composing them in a loop, in a service object, etc.). For the one-call shortcut (build, save, and attach in a single call), use Raif::Task.build_for_batch instead.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 113
def add_task(task, batch_custom_id: nil)
  task.save! if task.new_record?
  task.prepare_for_batch!(batch: self, batch_custom_id: batch_custom_id)
  task
end
```
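A minimal plain-Ruby sketch of the add_task control flow, mainly to show that an already-persisted task is not saved a second time. FakeTask and FakeBatch are hypothetical in-memory stand-ins, not Raif classes; FakeTask#prepare_for_batch! only records its arguments, whereas the real Raif::Task#prepare_for_batch! populates prompts and builds the pending Raif::ModelCompletion.

```ruby
# Hypothetical stand-in for a Raif::Task: tracks persistence and records
# what prepare_for_batch! was called with.
class FakeTask
  attr_reader :save_count, :prepared_with

  def initialize(persisted: false)
    @persisted = persisted
    @save_count = 0
  end

  def new_record?
    !@persisted
  end

  def save!
    @save_count += 1
    @persisted = true
  end

  # Records arguments only; the real method builds the pending completion.
  def prepare_for_batch!(batch:, batch_custom_id:)
    @prepared_with = { batch: batch, batch_custom_id: batch_custom_id }
  end
end

# Hypothetical stand-in for the batch, using the same three steps as add_task.
class FakeBatch
  def add_task(task, batch_custom_id: nil)
    task.save! if task.new_record? # persist only when needed
    task.prepare_for_batch!(batch: self, batch_custom_id: batch_custom_id)
    task                           # returned so callers can chain
  end
end
```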
#assert_submittable! ⇒ Object
Idempotency guard for batch submission. Raises Raif::Errors::InvalidBatchError
if the batch already has a provider_batch_id or has moved past pending,
so a duplicate batch.submit! / llm.submit_batch!(batch) call cannot create
a second provider-side batch and orphan the first one. Called by every
provider's #submit_batch! at the top of the method.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 191
def assert_submittable!
  return if status == "pending" && provider_batch_id.blank?

  raise Raif::Errors::InvalidBatchError,
        "Raif::ModelCompletionBatch ##{id} is not submittable: status=#{status}, " \
        "provider_batch_id=#{provider_batch_id.inspect}. submit! / submit_batch! " \
        "is single-shot; use #cancel! and create a new batch if you need to retry."
end
```
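The effect of the guard can be sketched without Rails: once the first submission records a provider id and leaves pending, a second submit raises before the provider is contacted, so only one provider-side batch ever exists. GuardedBatch, FakeProvider, FakeProvider#create_batch, and the local InvalidBatchError are hypothetical stand-ins; to_s.strip.empty? approximates ActiveSupport's blank? for nil and String values.

```ruby
# Hypothetical stand-in for Raif::Errors::InvalidBatchError.
class InvalidBatchError < StandardError; end

# Hypothetical provider that counts how many remote batches it created.
class FakeProvider
  attr_reader :created

  def initialize
    @created = 0
  end

  def create_batch
    @created += 1
    "prov_#{@created}"
  end
end

class GuardedBatch
  attr_reader :id, :status, :provider_batch_id

  def initialize(id)
    @id = id
    @status = "pending"
    @provider_batch_id = nil
  end

  # Same condition as assert_submittable!: only a pending, never-submitted batch passes.
  def assert_submittable!
    return if status == "pending" && provider_batch_id.to_s.strip.empty?

    raise InvalidBatchError,
          "batch ##{id} is not submittable: status=#{status}, " \
          "provider_batch_id=#{provider_batch_id.inspect}"
  end

  # Stand-in for a provider's #submit_batch!: the guard runs first.
  def submit!(provider)
    assert_submittable!
    @provider_batch_id = provider.create_batch
    @status = "submitted"
    self
  end
end
```

Because the guard runs before any remote call, a duplicated submit! (for example from a retried job) fails loudly instead of orphaning the first provider-side batch.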
#cancel! ⇒ Object
Asks the provider to cancel the batch. Cancellation is asynchronous on both Anthropic and OpenAI: the provider acknowledges with a transitional status and the next poll picks up the final canceled state. The polling job then routes the canceled batch through the same finalize/dispatch path as any other terminal status, force-failing remaining children.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 182
def cancel!
  llm.cancel_batch!(self)
end
```
#dispatch_completion_handler! ⇒ Object
Resolves and invokes the batch's completion handler, if one is configured.
The handler class must implement .handle_batch_completion(batch).
Idempotent via handler_dispatched_at: once a successful run finishes, future
callers (the polling job's terminal-batch path, the safety sweep, an
ActiveJob retry of either) skip the dispatch. If the handler raises,
handler_dispatched_at stays NULL so a future caller can re-dispatch.
Without that retry path, the polling job's early "return if batch.terminal?"
guard would silently swallow handler-raised errors, and the consumer's
on_batch_completion block would never run.
At-most-once across concurrent callers: the guard, handler invocation, and timestamp write are wrapped in #with_lock, so a normal poll job racing with the safety sweep (or a resume-stalled poll) cannot both pass the blank-handler_dispatched_at check and run the handler twice. Callers that hit one of the early returns still short-circuit before taking the lock.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 216
def dispatch_completion_handler!
  return if handler_dispatched_at.present?
  return if completion_handler_class_name.blank?

  # Local-side resolution must be complete before the handler runs --
  # otherwise the handler would dispatch against child completions that
  # haven't been hydrated with their provider results yet, which is how
  # batches got silently stranded under the prior code path.
  return if results_fetched_at.blank?

  handler = completion_handler_class_name.safe_constantize
  if handler.blank?
    Raif.logger.error(
      "Raif::ModelCompletionBatch##{id} has completion_handler_class_name=#{completion_handler_class_name.inspect} " \
      "which could not be resolved to a class. Skipping handler dispatch."
    )
    return
  end

  with_lock do
    # Re-check under lock: another worker may have dispatched between our
    # early-return check above and lock acquisition.
    return if handler_dispatched_at.present?

    handler.handle_batch_completion(self)
    update_column(:handler_dispatched_at, Time.current)
  end
end
```
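The check / lock / re-check / stamp sequence can be sketched in plain Ruby. This swaps in a different locking primitive: a process-local Mutex stands in for ActiveRecord's row-level #with_lock (which, unlike a Mutex, also coordinates separate worker processes), and a lambda stands in for the handler class's .handle_batch_completion. DispatchSketch is a hypothetical name.

```ruby
# Hypothetical sketch of dispatch_completion_handler!'s concurrency pattern.
class DispatchSketch
  attr_reader :handler_dispatched_at

  def initialize(handler)
    @handler = handler            # plays handler.handle_batch_completion
    @handler_dispatched_at = nil
    @lock = Mutex.new             # stands in for the row lock
  end

  def dispatch_completion_handler!
    return if handler_dispatched_at # cheap early return, no lock taken

    @lock.synchronize do
      # Re-check under the lock: another thread may have dispatched meanwhile.
      return if handler_dispatched_at

      @handler.call(self)
      # Reached only if the handler returned normally; if it raised, the
      # stamp stays nil and a later caller can retry.
      @handler_dispatched_at = Time.now
    end
  end
end
```

Running the handler while holding the lock is what makes the dispatch at-most-once; stamping only after it returns is what keeps a failed run retryable.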
#enqueue_first_poll! ⇒ Object
Stamps next_poll_at and enqueues the first Raif::PollModelCompletionBatchJob using the first entry of Raif.config.model_completion_batch_poll_schedule. Called by #submit! by default; can also be invoked manually if a host opted out of auto-enqueue (#submit!(enqueue_poll: false)) and wants to start polling later.
Guards against being called on a batch that hasn't been submitted yet (no provider_batch_id, so the polling job's fetch_batch_status! would 404 and burn the entire poll schedule until max_age expiry) or that's already terminal (nothing to poll for). Raises Raif::Errors::InvalidBatchError so a misordered call surfaces immediately instead of silently scheduling a doomed poll chain.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 152
def enqueue_first_poll!
  if provider_batch_id.blank?
    raise Raif::Errors::InvalidBatchError,
          "Raif::ModelCompletionBatch ##{id}#enqueue_first_poll! requires provider_batch_id; " \
          "call submit! (or llm.submit_batch!(batch)) first."
  end

  if terminal?
    raise Raif::Errors::InvalidBatchError,
          "Raif::ModelCompletionBatch ##{id}#enqueue_first_poll! refusing to schedule a poll " \
          "for a terminal batch (status=#{status})."
  end

  delay = Array(Raif.config.model_completion_batch_poll_schedule).first || 1.minute
  update_column(:next_poll_at, delay.from_now)
  Raif::PollModelCompletionBatchJob.set(wait: delay).perform_later(id)
end
```
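A sketch of how the first delay is chosen and which states are refused, assuming the poll schedule is either nil, a single duration, or an array of durations. Plain Integer seconds replace ActiveSupport durations (60 mirrors the 1.minute fallback); first_poll_delay, check_pollable!, and the local InvalidBatchError are hypothetical helpers, not Raif API.

```ruby
# Hypothetical stand-in for Raif::Errors::InvalidBatchError.
class InvalidBatchError < StandardError; end

DEFAULT_FIRST_POLL_DELAY = 60 # seconds; mirrors the `|| 1.minute` fallback

# Array(nil) => [], Array(30) => [30], Array([30, 120]) => [30, 120],
# so .first yields the first scheduled delay or nil.
def first_poll_delay(schedule)
  Array(schedule).first || DEFAULT_FIRST_POLL_DELAY
end

# The two guards from enqueue_first_poll!, in the same order.
def check_pollable!(provider_batch_id:, terminal:)
  if provider_batch_id.to_s.strip.empty?
    raise InvalidBatchError, "requires provider_batch_id; call submit! first"
  end
  raise InvalidBatchError, "refusing to poll a terminal batch" if terminal
end
```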
#expire!(reason:) ⇒ Object
Used by the expiry paths (the polling job's max_age_exceeded? branch and Raif::ExpireStuckModelCompletionBatchesJob) when we decide to give up on a batch the provider hasn't finalized yet. Distinct from #force_fail!, which only flips local state; this method also issues a best-effort provider-side cancel so we don't keep paying for batch results we've stopped reading and don't keep occupying the provider's per-org concurrent-batch quota.
Cancellation is best-effort: if it fails (5xx, network, auth, etc.) we log and continue, because the local force-fail still has to happen so any waiting workflow can advance. A still-running provider-side batch will eventually self-finalize via the provider's own timer; the worst case is paying for results we discard.
Skips the cancel call entirely when the batch is already terminal (the provider's done with it) or hasn't been submitted yet (no provider_batch_id to cancel).
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 308
def expire!(reason:)
  if !terminal? && provider_batch_id.present?
    begin
      llm.cancel_batch!(self)
    rescue StandardError => e
      Raif.logger.warn(
        "Raif::ModelCompletionBatch ##{id} best-effort provider-side cancel failed " \
        "while expiring: #{e.class}: #{e.message}. Continuing with local force-fail; " \
        "the provider-side batch may still complete and be billed."
      )
    end
  end

  force_fail!(reason: reason)
end
```
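The ordering (best-effort remote cancel, then an unconditional local force-fail) can be sketched as below. ExpiringBatch is hypothetical: the cancel lambda plays llm.cancel_batch!(self), an array of warning strings replaces Raif.logger.warn, and the simplified force_fail! ignores child completions.

```ruby
# Hypothetical stand-in mirroring expire!'s control flow.
class ExpiringBatch
  TERMINAL_STATUSES = %w[ended canceled expired failed].freeze

  attr_reader :status, :failure_reason, :cancel_attempts, :warnings

  def initialize(status:, provider_batch_id:, cancel:)
    @status = status
    @provider_batch_id = provider_batch_id
    @cancel = cancel     # plays the role of llm.cancel_batch!(self)
    @cancel_attempts = 0
    @warnings = []       # replaces Raif.logger.warn
  end

  def terminal?
    TERMINAL_STATUSES.include?(@status)
  end

  def expire!(reason:)
    # Skip the remote call when the provider is done or never saw the batch.
    if !terminal? && @provider_batch_id
      begin
        @cancel_attempts += 1
        @cancel.call(self)
      rescue StandardError => e
        # Record and continue: the local force-fail must still happen.
        @warnings << "best-effort cancel failed: #{e.class}: #{e.message}"
      end
    end
    force_fail!(reason: reason)
  end

  private

  # Local-only, like #force_fail!: an existing terminal status is preserved.
  def force_fail!(reason:)
    return if terminal?

    @status = "failed"
    @failure_reason = reason
  end
end
```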
#fetch_results! ⇒ Object
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 173
def fetch_results!
  llm.fetch_batch_results!(self)
end
```
#fetch_status! ⇒ Object
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 169
def fetch_status!
  llm.fetch_batch_status!(self)
end
```
#finalize! ⇒ Object
Called by the polling job once the batch reaches a terminal status. On
ended (successful), fetches per-entry results from the provider. On the
other terminal statuses (canceled / expired / failed), force-fails every
still-pending child completion since there are no per-entry results to
collect.
Idempotent via results_fetched_at: once a successful run finishes, future callers (the poll job's terminal-at-top path, the safety sweep, an ActiveJob retry of either) skip the provider round-trip. If fetch_results! raises mid-stream, results_fetched_at stays NULL so a subsequent finalize! call retries the fetch. This is how the poll chain self-heals from transient provider errors on the results endpoint without permanently stranding the child completions.
At-most-once across concurrent callers: the guard, the fetch (or force-fail), and the stamp are wrapped in #with_lock, so a normal poll job racing with the safety sweep (or a resume-stalled poll) cannot both pass the blank-results_fetched_at check and double-call fetch_results!. Callers that find results_fetched_at already set still short-circuit before taking the lock.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 272
def finalize!
  return if results_fetched_at.present?

  with_lock do
    # Re-check under lock: another worker may have hydrated results
    # between our early-return check above and lock acquisition.
    return if results_fetched_at.present?

    if successful?
      fetch_results!
      update_column(:results_fetched_at, Time.current)
    else
      # force_fail! sets results_fetched_at inside its transaction so the
      # success/failure paths converge on the same on-disk signal.
      force_fail!(reason: "Batch ended with status: #{status}")
    end
  end
end
```
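A sketch of the stamp-after-success idempotency and the self-heal it enables: a fetch that raises leaves results_fetched_at nil, so the next finalize! retries, and every call after a successful one returns early. FinalizeSketch is hypothetical; a Mutex again stands in for the row lock, and the fetcher lambda plays llm.fetch_batch_results!(self).

```ruby
# Hypothetical sketch of finalize!'s success and failure paths.
class FinalizeSketch
  attr_reader :status, :results_fetched_at, :force_failed_reason

  def initialize(status:, fetcher:)
    @status = status
    @fetcher = fetcher       # plays llm.fetch_batch_results!(self)
    @results_fetched_at = nil
    @lock = Mutex.new        # stands in for #with_lock
  end

  def successful?
    @status == "ended"
  end

  def finalize!
    return if @results_fetched_at

    @lock.synchronize do
      return if @results_fetched_at

      if successful?
        @fetcher.call(self)            # may raise; the stamp is then skipped
        @results_fetched_at = Time.now
      else
        force_fail!(reason: "Batch ended with status: #{@status}")
      end
    end
  end

  private

  # Both paths converge on the same signal: results_fetched_at is set.
  def force_fail!(reason:)
    @force_failed_reason = reason
    @results_fetched_at = Time.now
  end
end
```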
#force_fail!(reason:) ⇒ Object
Marks every non-terminal child completion as failed and sets the batch to
failed (preserving an already-terminal status, e.g. canceled). Idempotent:
children already completed or failed are skipped.
Wrapped in a transaction so a partial failure mid-iteration rolls back the batch-status update too. Without this, an exception while flipping children would leave the batch terminal and prevent the polling/expire jobs from re-entering this path on a future run.
Local-only: does not touch the provider. Use #expire! when expiring a batch the provider hasn't finalized yet, so the provider-side batch is canceled on a best-effort basis.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 336
def force_fail!(reason:)
  reason_str = reason.to_s.truncate(255)

  transaction do
    unless terminal?
      update!(status: "failed", failed_at: Time.current, failure_reason: reason_str)
    end

    raif_model_completions.each do |mc|
      mc.reload
      next if mc.completed? || mc.failed?

      mc.failure_error = "Raif::ModelCompletionBatch ##{id} #{status}"
      mc.failure_reason = reason_str
      mc.update_columns(started_at: started_at) if mc.started_at.nil? && started_at.present?
      mc.failed!
    end

    update_column(:results_fetched_at, Time.current) if results_fetched_at.blank?
  end
end
```
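The per-child rules can be sketched over in-memory rows: already-completed or already-failed children are skipped, an existing terminal batch status is kept, and the reason is capped at 255 characters. Child, ForceFailSketch, and truncate_reason are hypothetical; truncate_reason approximates ActiveSupport's String#truncate(255) with its default "..." omission, and the database transaction and started_at backfill are omitted.

```ruby
# Hypothetical child completion row.
Child = Struct.new(:state, :failure_reason, keyword_init: true) do
  def finished?
    %w[completed failed].include?(state)
  end
end

# Approximates ActiveSupport's String#truncate(limit) with "..." omission.
def truncate_reason(text, limit = 255, omission = "...")
  text.length > limit ? text[0, limit - omission.length] + omission : text
end

class ForceFailSketch
  TERMINAL_STATUSES = %w[ended canceled expired failed].freeze

  attr_reader :status, :children

  def initialize(status:, children:)
    @status = status
    @children = children
  end

  def force_fail!(reason:)
    reason_str = truncate_reason(reason.to_s)
    # Preserve an already-terminal status such as "canceled".
    @status = "failed" unless TERMINAL_STATUSES.include?(@status)
    children.each do |child|
      next if child.finished? # idempotent: finished children are untouched
      child.failure_reason = reason_str
      child.state = "failed"
    end
  end
end
```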
#llm ⇒ Object
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 82
def llm
  Raif.llm(llm_model_key.to_sym)
end
```
#max_age_exceeded? ⇒ Boolean
True once at least Raif.config.model_completion_batch_max_age has elapsed since submitted_at; always false for a batch that has not been submitted. Used by the polling and expire-stuck jobs to decide when to force-fail a batch that the provider hasn't finalized in time.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 247
def max_age_exceeded?
  return false if submitted_at.blank?

  Time.current - submitted_at >= Raif.config.model_completion_batch_max_age
end
```
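The check reduces to Time subtraction compared with an inclusive bound. This sketch takes max_age in seconds and an injected now: so it is deterministic; both are assumptions for illustration, since the original reads Time.current and Raif.config.model_completion_batch_max_age directly.

```ruby
# Hypothetical plain-Ruby version of the max_age_exceeded? comparison.
def max_age_exceeded?(submitted_at, max_age, now: Time.now)
  return false if submitted_at.nil? # never submitted: nothing to age out

  now - submitted_at >= max_age     # Time - Time yields Float seconds
end
```

Note the >=: a batch exactly max_age old already counts as exceeded.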
#recalculate_costs! ⇒ Object
Aggregates total_cost / prompt_token_cost / output_token_cost from child completions after results have been applied. Should be called by the polling job once all children have been finalized.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 361
def recalculate_costs!
  if raif_model_completions.empty?
    Raif.logger.warn(
      "Raif::ModelCompletionBatch ##{id}#recalculate_costs! skipped: no child raif_model_completions to aggregate"
    )
    return
  end

  # Three .sum calls instead of one Arel-flavored pick. The dataset is bounded
  # by batch size and these only run once per batch finalization, so the extra
  # round-trips are immaterial and the call is much easier to read.
  prompt_sum = raif_model_completions.sum(:prompt_token_cost)
  output_sum = raif_model_completions.sum(:output_token_cost)
  total_sum = raif_model_completions.sum(:total_cost)

  # ActiveRecord's .sum returns 0 (not nil) when every row's column is NULL.
  # Skip the write if everything is zero so we don't null-out manually-set
  # batch-level cost values from the host (rare, but cheap to guard).
  if prompt_sum.zero? && output_sum.zero? && total_sum.zero?
    Raif.logger.warn(
      "Raif::ModelCompletionBatch ##{id}#recalculate_costs! skipped: every child completion's " \
      "prompt_token_cost / output_token_cost / total_cost is NULL or zero"
    )
    return
  end

  update_columns(
    prompt_token_cost: prompt_sum,
    output_token_cost: output_sum,
    total_cost: total_sum,
    updated_at: Time.current
  )
end
```
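The aggregation rules can be sketched over hash rows: compact models SQL SUM skipping NULLs (so an all-NULL column sums to 0), and both the empty-batch and all-zero cases return nil to mean "skip the write". aggregate_costs, COST_COLUMNS, and the nil-means-skip convention are hypothetical; Rational values are used in the usage below only to avoid floating-point rounding.

```ruby
COST_COLUMNS = %i[prompt_token_cost output_token_cost total_cost].freeze

# Returns a hash of per-column sums, or nil when the write should be skipped.
def aggregate_costs(children)
  return nil if children.empty? # no child completions to aggregate

  # compact drops nils, like SQL SUM ignoring NULL; [].sum is 0.
  sums = COST_COLUMNS.to_h { |col| [col, children.map { |c| c[col] }.compact.sum] }
  # All zero: don't overwrite host-set batch-level costs.
  return nil if sums.values.all?(&:zero?)

  sums
end
```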
#submit!(enqueue_poll: true) ⇒ Raif::ModelCompletionBatch
Submits the batch to the provider and (by default) enqueues Raif::PollModelCompletionBatchJob so the polling chain starts on its own. Pass enqueue_poll: false if your host app is driving its own poll scheduler off the Raif::ModelCompletionBatch.due_for_poll scope.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 134
def submit!(enqueue_poll: true)
  result = llm.submit_batch!(self)
  enqueue_first_poll! if enqueue_poll
  result
end
```
#successful? ⇒ Boolean
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 78
def successful?
  status == "ended"
end
```
#tasks ⇒ Object
Convenience accessor for batches whose children were produced via Raif::Task.build_for_batch (the typical case). Returns the Raif::Task records attached to this batch through their child Raif::ModelCompletions.
Heterogeneous batches that mix Raif::Task producers with raw Raif::Llm#build_pending_model_completion producers will see only the Raif::Task subset here; use raif_model_completions for full coverage.
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 93
def tasks
  Raif::Task.where(
    id: raif_model_completions.where(source_type: "Raif::Task").select(:source_id)
  )
end
```
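An in-memory sketch of which child rows #tasks picks up in a heterogeneous batch. Hash rows stand in for child completions, and a nil source_type stands in for a raw Raif::Llm#build_pending_model_completion producer; that representation is an assumption, since the source_type of such rows isn't documented here. task_ids_for is a hypothetical helper, and uniq mirrors how the real WHERE id IN (subselect) query returns each task once.

```ruby
# Hypothetical helper: ids of the Raif::Task records behind a batch's children.
def task_ids_for(completions)
  completions
    .select { |mc| mc[:source_type] == "Raif::Task" } # drop non-Task producers
    .map { |mc| mc[:source_id] }
    .uniq                                             # IN (...) matches each id once
end
```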
#terminal? ⇒ Boolean
```ruby
# File 'app/models/raif/model_completion_batch.rb', line 74
def terminal?
  TERMINAL_STATUSES.include?(status)
end
```
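The two status predicates can be sketched as a plain class. SketchBatch is hypothetical; it copies TERMINAL_STATUSES from the Constant Summary and shows that every terminal status other than "ended" has no per-entry results, which is why #finalize! routes those through #force_fail!.

```ruby
# Hypothetical plain-Ruby stand-in for the status predicates.
class SketchBatch
  TERMINAL_STATUSES = %w[ended canceled expired failed].freeze

  # Terminal statuses that carry no per-entry results to fetch.
  def self.failure_statuses
    TERMINAL_STATUSES - ["ended"]
  end

  attr_reader :status

  def initialize(status)
    @status = status
  end

  # Mirrors #terminal?: the batch has reached one of its end states.
  def terminal?
    TERMINAL_STATUSES.include?(status)
  end

  # Mirrors #successful?: only "ended" means results can be fetched.
  def successful?
    status == "ended"
  end
end

SketchBatch.failure_statuses # => ["canceled", "expired", "failed"]
```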