Class: Raif::ModelCompletionBatch
- Inherits: ApplicationRecord
  - Object
    - ApplicationRecord
      - Raif::ModelCompletionBatch
- Defined in: app/models/raif/model_completion_batch.rb
Direct Known Subclasses
Raif::ModelCompletionBatches::Anthropic, Raif::ModelCompletionBatches::Google, Raif::ModelCompletionBatches::OpenAi
Constant Summary
- STATUSES = %w[pending submitted in_progress ended canceled expired failed].freeze
- TERMINAL_STATUSES = %w[ended canceled expired failed].freeze
Instance Method Summary
- #add_task(task, batch_custom_id: nil) ⇒ Raif::Task
  Attaches an existing Raif::Task to this batch as a pending child completion.
- #assert_submittable! ⇒ Object
  Idempotency guard for batch submission.
- #cancel! ⇒ Object
  Asks the provider to cancel the batch.
- #dispatch_completion_handler! ⇒ Object
  Resolves and invokes the batch's completion handler, if one is configured.
- #enqueue_first_poll! ⇒ Object
  Stamps next_poll_at and enqueues the first Raif::PollModelCompletionBatchJob using the first entry of Raif.config.model_completion_batch_poll_schedule.
- #expire!(reason:) ⇒ Object
  Used by the expiry paths (the polling job's max_age_exceeded? branch and Raif::ExpireStuckModelCompletionBatchesJob) when we decide to give up on a batch the provider hasn't finalized yet.
- #fetch_results! ⇒ Object
- #fetch_status! ⇒ Object
- #finalize! ⇒ Object
  Called by the polling job once the batch reaches a terminal status.
- #force_fail!(reason:) ⇒ Object
  Marks every non-terminal child completion as failed and sets the batch to failed (preserving an already-terminal status, e.g. canceled).
- #llm ⇒ Object
- #max_age_exceeded? ⇒ Boolean
  True once submitted_at is older than Raif.config.model_completion_batch_max_age.
- #recalculate_costs! ⇒ Object
  Aggregates total_cost / prompt_token_cost / output_token_cost from child completions after results have been applied.
- #submit!(enqueue_poll: true) ⇒ Raif::ModelCompletionBatch
  Submits the batch to the provider and (by default) enqueues Raif::PollModelCompletionBatchJob so the polling chain starts on its own.
- #successful? ⇒ Boolean
- #tasks ⇒ Object
  Convenience accessor for batches whose children were produced via Raif::Task.build_for_batch (the typical case).
- #terminal? ⇒ Boolean
Instance Method Details
#add_task(task, batch_custom_id: nil) ⇒ Raif::Task
Attaches an existing Raif::Task to this batch as a pending child completion. The task is saved first if it hasn't been persisted yet, then routed through Raif::Task#prepare_for_batch!, which populates its prompts and builds the pending Raif::ModelCompletion.
Pair with Raif::Llm#create_batch when the producer constructs tasks outside of the batch (composing them in a loop, in a service object, etc.). For the one-call shortcut (build, save, and attach in a single call), use Raif::Task.build_for_batch instead.
    # File 'app/models/raif/model_completion_batch.rb', line 112
    def add_task(task, batch_custom_id: nil)
      task.save! if task.new_record?
      task.prepare_for_batch!(batch: self, batch_custom_id: batch_custom_id)
      task
    end
#assert_submittable! ⇒ Object
Idempotency guard for batch submission. Raises Raif::Errors::InvalidBatchError if the batch already has a provider_batch_id or has moved past pending, so a duplicate batch.submit! / llm.submit_batch!(batch) call cannot create a second provider-side batch and orphan the first one. Called by every provider's #submit_batch! at the top of the method.
    # File 'app/models/raif/model_completion_batch.rb', line 190
    def assert_submittable!
      return if status == "pending" && provider_batch_id.blank?

      raise Raif::Errors::InvalidBatchError,
        "Raif::ModelCompletionBatch ##{id} is not submittable: status=#{status}, " \
        "provider_batch_id=#{provider_batch_id.inspect}. submit! / submit_batch! " \
        "is single-shot; use #cancel! and create a new batch if you need to retry."
    end
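The single-shot guarantee comes from running the guard before any provider call. The plain-Ruby sketch below shows the same ordering. SketchBatch, NotSubmittableError, and the fake provider id are hypothetical stand-ins, not Raif classes, and `to_s.empty?` approximates ActiveSupport's `blank?` for nil or empty ids.

```ruby
# Hypothetical error class standing in for Raif::Errors::InvalidBatchError.
class NotSubmittableError < StandardError; end

# Hypothetical stand-in that models only the submission guard.
class SketchBatch
  attr_reader :status, :provider_batch_id

  def initialize
    @status = "pending"
    @provider_batch_id = nil
  end

  # Same shape as assert_submittable!: only a pending batch with no
  # provider-side id may be submitted.
  def assert_submittable!
    return if status == "pending" && provider_batch_id.to_s.empty?

    raise NotSubmittableError,
          "not submittable: status=#{status}, provider_batch_id=#{provider_batch_id.inspect}"
  end

  # Simulated provider submission: the guard runs first, so a repeated call
  # raises before a second provider-side batch could be created.
  def submit!
    assert_submittable!
    @provider_batch_id = "fake_batch_#{object_id}"
    @status = "submitted"
    self
  end
end

batch = SketchBatch.new
batch.submit!
begin
  batch.submit! # second call is rejected by the guard
rescue NotSubmittableError => e
  warn e.message
end
```

Because the provider id is only assigned after the guard passes, a retry of the same object can never reach the provider twice.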
#cancel! ⇒ Object
Asks the provider to cancel the batch. Cancellation is asynchronous on both Anthropic and OpenAI: the provider acknowledges with a transitional status and the next poll picks up the final canceled state. The polling job then routes the canceled batch through the same finalize/dispatch path as any other terminal status, force-failing remaining children.
    # File 'app/models/raif/model_completion_batch.rb', line 181
    def cancel!
      llm.cancel_batch!(self)
    end
#dispatch_completion_handler! ⇒ Object
Resolves and invokes the batch's completion handler, if one is configured. The handler class must implement .handle_batch_completion(batch). If completion_handler_class_name cannot be resolved to a class, the error is logged and dispatch is skipped.
Idempotent via handler_dispatched_at: once a run finishes successfully, later callers (the polling job's terminal-batch path, the safety sweep, or an ActiveJob retry of either) skip the dispatch. If the handler raises, handler_dispatched_at stays NULL so a future caller can re-dispatch. Without that retry path, the polling job's early-return guard (return if batch.terminal?) would silently swallow handler-raised errors, and the consumer's on_batch_completion block would never run.
    # File 'app/models/raif/model_completion_batch.rb', line 209
    def dispatch_completion_handler!
      return if handler_dispatched_at.present?
      return if completion_handler_class_name.blank?

      handler = completion_handler_class_name.safe_constantize
      if handler.blank?
        Raif.logger.error(
          "Raif::ModelCompletionBatch##{id} has completion_handler_class_name=#{completion_handler_class_name.inspect} " \
          "which could not be resolved to a class. Skipping handler dispatch."
        )
        return
      end

      handler.handle_batch_completion(self)
      update_column(:handler_dispatched_at, Time.current)
    end
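The retry behaviour hinges on stamping the timestamp only after the handler returns. This minimal sketch reproduces that ordering in plain Ruby. SketchDispatchBatch and FlakyHandler are hypothetical; the handler object is passed in directly instead of being resolved with safe_constantize, and an instance variable plays the role of the handler_dispatched_at column.

```ruby
# Hypothetical stand-in: dispatched_at plays the role of handler_dispatched_at.
class SketchDispatchBatch
  attr_reader :dispatched_at

  def initialize(handler)
    @handler = handler
    @dispatched_at = nil
  end

  def dispatch_completion_handler!
    return if dispatched_at   # already ran successfully once
    return if @handler.nil?   # no handler configured

    @handler.handle_batch_completion(self)
    # Reached only if the handler did not raise, so a failed run leaves
    # dispatched_at nil and a later caller can try again.
    @dispatched_at = Time.now
  end
end

# Hypothetical handler that fails on its first call and succeeds afterwards.
class FlakyHandler
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def handle_batch_completion(_batch)
    @calls += 1
    raise "transient failure" if @calls == 1
  end
end

handler = FlakyHandler.new
batch = SketchDispatchBatch.new(handler)

begin
  batch.dispatch_completion_handler! # first attempt raises
rescue RuntimeError
  # dispatched_at is still nil here
end
batch.dispatch_completion_handler! # retry runs the handler and stamps
batch.dispatch_completion_handler! # no-op: already dispatched
```

After these three calls the handler has run twice (one failure, one success) and the third call returns early.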
#enqueue_first_poll! ⇒ Object
Stamps next_poll_at and enqueues the first Raif::PollModelCompletionBatchJob using the first entry of Raif.config.model_completion_batch_poll_schedule. Called by #submit! by default; can also be invoked manually if a host opted out of auto-enqueue (#submit!(enqueue_poll: false)) and wants to start polling later.
Guards against being called on a batch that hasn't been submitted yet (no provider_batch_id, so the polling job's fetch_batch_status! would 404 and burn the entire poll schedule until max_age expiry) or that's already terminal (nothing to poll for). Raises Raif::Errors::InvalidBatchError so a misordered call surfaces immediately instead of silently scheduling a doomed poll chain.
    # File 'app/models/raif/model_completion_batch.rb', line 151
    def enqueue_first_poll!
      if provider_batch_id.blank?
        raise Raif::Errors::InvalidBatchError,
          "Raif::ModelCompletionBatch ##{id}#enqueue_first_poll! requires provider_batch_id; " \
          "call submit! (or llm.submit_batch!(batch)) first."
      end

      if terminal?
        raise Raif::Errors::InvalidBatchError,
          "Raif::ModelCompletionBatch ##{id}#enqueue_first_poll! refusing to schedule a poll " \
          "for a terminal batch (status=#{status})."
      end

      delay = Array(Raif.config.model_completion_batch_poll_schedule).first || 1.minute
      update_column(:next_poll_at, delay.from_now)
      Raif::PollModelCompletionBatchJob.set(wait: delay).perform_later(id)
    end
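The delay selection and the two guards can be checked in isolation. This sketch uses plain seconds instead of ActiveSupport durations; first_poll_delay, next_poll_at, check_pollable!, and InvalidBatchError are hypothetical names, and 60 stands in for the source's 1.minute fallback.

```ruby
# Hypothetical stand-in for Raif::Errors::InvalidBatchError.
class InvalidBatchError < StandardError; end

DEFAULT_FIRST_POLL_DELAY = 60 # seconds; mirrors the 1.minute fallback

# Kernel#Array turns nil into [], wraps a scalar, and leaves arrays alone,
# so a missing, scalar, or empty schedule still yields a usable delay.
def first_poll_delay(schedule)
  Array(schedule).first || DEFAULT_FIRST_POLL_DELAY
end

def next_poll_at(schedule, now: Time.now)
  now + first_poll_delay(schedule)
end

# The two preconditions: the batch must have been submitted and must not
# already be terminal.
def check_pollable!(provider_batch_id:, terminal:)
  if provider_batch_id.to_s.empty?
    raise InvalidBatchError, "requires provider_batch_id; submit first"
  end
  raise InvalidBatchError, "refusing to poll a terminal batch" if terminal
end
```

For example, a schedule of [30, 120] gives a first delay of 30 seconds, while nil or [] falls back to 60.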
#expire!(reason:) ⇒ Object
Used by the expiry paths (the polling job's max_age_exceeded? branch and Raif::ExpireStuckModelCompletionBatchesJob) when we decide to give up on a batch the provider hasn't finalized yet. Unlike #force_fail!, which only changes local state, this method also issues a best-effort provider-side cancel, so we don't keep paying for batch results we've stopped reading or keep occupying the provider's per-org concurrent-batch quota.
Cancellation is best-effort: if it fails (5xx, network, auth, etc.) we log and continue, because the local force-fail still has to happen so any waiting workflow can advance. A still-running provider-side batch will eventually self-finalize via the provider's own timer; the worst case is paying for results we discard.
Skips the cancel call entirely when the batch is already terminal (the provider's done with it) or hasn't been submitted yet (no provider_batch_id to cancel).
    # File 'app/models/raif/model_completion_batch.rb', line 265
    def expire!(reason:)
      if !terminal? && provider_batch_id.present?
        begin
          llm.cancel_batch!(self)
        rescue StandardError => e
          Raif.logger.warn(
            "Raif::ModelCompletionBatch ##{id} best-effort provider-side cancel failed " \
            "while expiring: #{e.class}: #{e.message}. Continuing with local force-fail; " \
            "the provider-side batch may still complete and be billed."
          )
        end
      end

      force_fail!(reason: reason)
    end
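The important property is that a cancel failure never blocks the local force-fail. The sketch below models that ordering with a hypothetical provider whose cancel_batch! always raises; SketchExpiringBatch and UnreachableProvider are illustrative names, and an in-memory warnings array stands in for Raif.logger.warn.

```ruby
# Hypothetical stand-in modelling only expire!'s control flow.
class SketchExpiringBatch
  TERMINAL = %w[ended canceled expired failed].freeze
  attr_reader :status, :failure_reason, :warnings

  def initialize(provider:, provider_batch_id:, status: "in_progress")
    @provider = provider
    @provider_batch_id = provider_batch_id
    @status = status
    @warnings = [] # stands in for Raif.logger.warn
  end

  def terminal?
    TERMINAL.include?(status)
  end

  def expire!(reason:)
    # Only ask the provider to cancel if it may still be working on the batch.
    if !terminal? && !@provider_batch_id.to_s.empty?
      begin
        @provider.cancel_batch!(@provider_batch_id)
      rescue StandardError => e
        # Logged, never re-raised: the local force-fail below must still run.
        @warnings << "best-effort cancel failed: #{e.class}: #{e.message}"
      end
    end
    force_fail!(reason: reason)
  end

  def force_fail!(reason:)
    @status = "failed" unless terminal? # keep an existing terminal status
    @failure_reason = reason
  end
end

# Hypothetical provider whose cancel endpoint is unreachable.
class UnreachableProvider
  def cancel_batch!(_provider_batch_id)
    raise IOError, "provider unavailable"
  end
end
```

Expiring a batch against UnreachableProvider still leaves it failed locally, with one warning recorded.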
#fetch_results! ⇒ Object
    # File 'app/models/raif/model_completion_batch.rb', line 172
    def fetch_results!
      llm.fetch_batch_results!(self)
    end
#fetch_status! ⇒ Object
    # File 'app/models/raif/model_completion_batch.rb', line 168
    def fetch_status!
      llm.fetch_batch_status!(self)
    end
#finalize! ⇒ Object
Called by the polling job once the batch reaches a terminal status. On ended (successful), fetches per-entry results from the provider. On the other terminal statuses (canceled / expired / failed), force-fails every still-pending child completion, since there are no per-entry results to collect.
    # File 'app/models/raif/model_completion_batch.rb', line 240
    def finalize!
      if successful?
        fetch_results!
      else
        force_fail!(reason: "Batch ended with status: #{status}")
      end
    end
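The branch in finalize! can be traced without a provider. In this sketch, SketchFinalizingBatch is hypothetical and records which action it would take instead of calling fetch_results! or force_fail!.

```ruby
# Hypothetical stand-in that records the action finalize! would take.
class SketchFinalizingBatch
  attr_reader :status, :actions

  def initialize(status)
    @status = status
    @actions = []
  end

  def successful?
    status == "ended"
  end

  def finalize!
    if successful?
      @actions << :fetch_results # per-entry results exist only for "ended"
    else
      @actions << [:force_fail, "Batch ended with status: #{status}"]
    end
  end
end

ended = SketchFinalizingBatch.new("ended").tap(&:finalize!)
canceled = SketchFinalizingBatch.new("canceled").tap(&:finalize!)
```

Only "ended" leads to a results fetch; every other terminal status is routed to the force-fail path with the status in the reason.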
#force_fail!(reason:) ⇒ Object
Marks every non-terminal child completion as failed and sets the batch to failed (preserving an already-terminal status, e.g. canceled). Idempotent: children already completed or failed are skipped.
Wrapped in a transaction so a partial failure mid-iteration rolls back the batch-status update too. Without this, an exception while flipping children would leave the batch terminal and prevent the polling/expire jobs from re-entering this path on a future run.
Local-only: does not touch the provider. Use #expire! when expiring a batch the provider hasn't finalized yet, so the provider-side batch is canceled on a best-effort basis.
    # File 'app/models/raif/model_completion_batch.rb', line 293
    def force_fail!(reason:)
      reason_str = reason.to_s.truncate(255)

      transaction do
        unless terminal?
          update!(status: "failed", failed_at: Time.current, failure_reason: reason_str)
        end

        raif_model_completions.each do |mc|
          mc.reload
          next if mc.completed? || mc.failed?

          mc.failure_error = "Raif::ModelCompletionBatch ##{id} #{status}"
          mc.failure_reason = reason_str
          mc.update_columns(started_at: started_at) if mc.started_at.nil? && started_at.present?
          mc.failed!
        end
      end
    end
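The per-child rules (skip finished children, keep an existing terminal batch status) are what make repeated calls safe. This plain-Ruby sketch models just those rules and leaves out the database transaction. SketchCompletion and SketchFailingBatch are hypothetical, and the reason is cut with a simple slice, whereas the source's ActiveSupport truncate(255) also appends an ellipsis.

```ruby
# Hypothetical in-memory child completion.
SketchCompletion = Struct.new(:state, :failure_reason) do
  def completed?
    state == :completed
  end

  def failed?
    state == :failed
  end
end

# Hypothetical batch modelling force_fail!'s rules, without a transaction.
class SketchFailingBatch
  TERMINAL = %w[ended canceled expired failed].freeze
  attr_reader :status, :children

  def initialize(status, children)
    @status = status
    @children = children
  end

  def terminal?
    TERMINAL.include?(status)
  end

  def force_fail!(reason:)
    reason_str = reason.to_s[0, 255] # simple cut; the source uses truncate(255)
    @status = "failed" unless terminal? # keep e.g. "canceled" as-is
    children.each do |child|
      next if child.completed? || child.failed? # finished children are untouched
      child.state = :failed
      child.failure_reason = reason_str
    end
  end
end
```

Calling force_fail! twice on a canceled batch leaves its status as "canceled" and does not overwrite the reason recorded on the first call.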
#llm ⇒ Object
    # File 'app/models/raif/model_completion_batch.rb', line 81
    def llm
      Raif.llm(llm_model_key.to_sym)
    end
#max_age_exceeded? ⇒ Boolean
True once at least Raif.config.model_completion_batch_max_age has elapsed since submitted_at; always false for a batch with no submitted_at. Used by the polling and expire-stuck jobs to decide when to force-fail a batch that the provider hasn't finalized in time.
    # File 'app/models/raif/model_completion_batch.rb', line 229
    def max_age_exceeded?
      return false if submitted_at.blank?

      Time.current - submitted_at >= Raif.config.model_completion_batch_max_age
    end
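The comparison is inclusive, so a batch counts as too old exactly at the configured age. The sketch below restates it with plain Time values and a max age in seconds; the method signature and the max_age keyword are hypothetical stand-ins for the model attribute and Raif.config.model_completion_batch_max_age.

```ruby
# Hypothetical free-function form of max_age_exceeded?.
def max_age_exceeded?(submitted_at, max_age:, now: Time.now)
  return false if submitted_at.nil? # never submitted, so never "too old"

  now - submitted_at >= max_age # Time - Time gives elapsed seconds
end
```

With max_age: 10 and a submission at time t, the method returns false at t + 9 and true from t + 10 onwards.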
#recalculate_costs! ⇒ Object
Aggregates total_cost / prompt_token_cost / output_token_cost from child completions after results have been applied. Should be called by the polling job once all children have been finalized.
    # File 'app/models/raif/model_completion_batch.rb', line 316
    def recalculate_costs!
      if raif_model_completions.empty?
        Raif.logger.warn(
          "Raif::ModelCompletionBatch ##{id}#recalculate_costs! skipped: no child raif_model_completions to aggregate"
        )
        return
      end

      # Three .sum calls instead of one Arel-flavored pick. The dataset is bounded
      # by batch size and these only run once per batch finalization, so the extra
      # round-trips are immaterial and the call is much easier to read.
      prompt_sum = raif_model_completions.sum(:prompt_token_cost)
      output_sum = raif_model_completions.sum(:output_token_cost)
      total_sum = raif_model_completions.sum(:total_cost)

      # ActiveRecord's .sum returns 0 (not nil) when every row's column is NULL.
      # Skip the write if everything is zero so we don't null-out manually-set
      # batch-level cost values from the host (rare, but cheap to guard).
      if prompt_sum.zero? && output_sum.zero? && total_sum.zero?
        Raif.logger.warn(
          "Raif::ModelCompletionBatch ##{id}#recalculate_costs! skipped: every child completion's " \
          "prompt_token_cost / output_token_cost / total_cost is NULL or zero"
        )
        return
      end

      update_columns(
        prompt_token_cost: prompt_sum,
        output_token_cost: output_sum,
        total_cost: total_sum,
        updated_at: Time.current
      )
    end
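The aggregation rule (NULLs contribute nothing, and an all-zero result means "no data, don't write") can be reproduced over in-memory rows. In this sketch, aggregate_costs and the hash-per-row layout are hypothetical, nil stands for SQL NULL, and Rational values stand in for the database's decimal columns so the arithmetic stays exact without extra gems.

```ruby
COST_COLUMNS = %i[prompt_token_cost output_token_cost total_cost].freeze

# Returns a hash of summed costs, or nil when the write would be skipped.
def aggregate_costs(rows)
  return nil if rows.empty? # no children to aggregate

  sums = COST_COLUMNS.to_h do |column|
    # nil (NULL) counts as 0, matching how SQL SUM ignores NULLs.
    [column, rows.sum(0) { |row| row[column] || 0 }]
  end
  # All-zero totals mean "no cost data": leave batch-level values alone.
  return nil if sums.values.all?(&:zero?)

  sums
end

rows = [
  { prompt_token_cost: Rational("0.01"), output_token_cost: Rational("0.02"), total_cost: Rational("0.03") },
  { prompt_token_cost: nil, output_token_cost: Rational("0.05"), total_cost: Rational("0.05") }
]
totals = aggregate_costs(rows)
```

Here totals holds 0.01 prompt, 0.07 output, and 0.08 total cost, while a list whose rows are all nil returns nil and would leave the batch untouched.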
#submit!(enqueue_poll: true) ⇒ Raif::ModelCompletionBatch
Submits the batch to the provider and (by default) enqueues Raif::PollModelCompletionBatchJob so the polling chain starts on its own. Pass enqueue_poll: false if your host app is driving its own poll scheduler off the Raif::ModelCompletionBatch.due_for_poll scope.
    # File 'app/models/raif/model_completion_batch.rb', line 133
    def submit!(enqueue_poll: true)
      result = llm.submit_batch!(self)
      enqueue_first_poll! if enqueue_poll
      result
    end
#successful? ⇒ Boolean
    # File 'app/models/raif/model_completion_batch.rb', line 77
    def successful?
      status == "ended"
    end
#tasks ⇒ Object
Convenience accessor for batches whose children were produced via Raif::Task.build_for_batch (the typical case). Returns the Raif::Task records attached to this batch through their child Raif::ModelCompletions.
Heterogeneous batches that mix Raif::Task producers with raw Raif::Llm#build_pending_model_completion producers will see only the Raif::Task subset here; use raif_model_completions for full coverage.
    # File 'app/models/raif/model_completion_batch.rb', line 92
    def tasks
      Raif::Task.where(
        id: raif_model_completions.where(source_type: "Raif::Task").select(:source_id)
      )
    end
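To see why heterogeneous batches only expose their Raif::Task subset, here is an in-memory sketch of the same filter over the polymorphic source_type / source_id pair. SketchChildCompletion, tasks_for, and the "SomeOtherSource" type are hypothetical; a hash stands in for the tasks table.

```ruby
# Hypothetical child completion carrying a polymorphic source reference.
SketchChildCompletion = Struct.new(:source_type, :source_id)

# Keep only children whose source is a Raif::Task, then look up those ids.
# uniq mirrors how an IN (subquery) filter returns each task at most once.
def tasks_for(completions, tasks_by_id)
  task_ids = completions
             .select { |c| c.source_type == "Raif::Task" }
             .map(&:source_id)
  tasks_by_id.values_at(*task_ids.uniq).compact
end

completions = [
  SketchChildCompletion.new("Raif::Task", 1),
  SketchChildCompletion.new("SomeOtherSource", 7), # raw producer, not a task
  SketchChildCompletion.new("Raif::Task", 2)
]
tasks_by_id = { 1 => "task-1", 2 => "task-2", 7 => "not-a-task" }
found = tasks_for(completions, tasks_by_id)
```

The child with source_type "SomeOtherSource" is ignored even though a record with id 7 exists, which is why raif_model_completions is the accessor to use for full coverage.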
#terminal? ⇒ Boolean
    # File 'app/models/raif/model_completion_batch.rb', line 73
    def terminal?
      TERMINAL_STATUSES.include?(status)
    end
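Together with successful?, terminal? splits STATUSES into three groups: still in flight, finished successfully, and finished unsuccessfully. The sketch restates both predicates as free functions over the documented constants; the function forms and the NON_TERMINAL_STATUSES name are hypothetical.

```ruby
STATUSES = %w[pending submitted in_progress ended canceled expired failed].freeze
TERMINAL_STATUSES = %w[ended canceled expired failed].freeze

def terminal?(status)
  TERMINAL_STATUSES.include?(status)
end

def successful?(status)
  status == "ended"
end

# Statuses for which the batch has not yet finished.
NON_TERMINAL_STATUSES = STATUSES - TERMINAL_STATUSES
```

Every successful status is also terminal, and the terminal statuses that are not successful (canceled, expired, failed) are exactly the ones finalize! routes to force_fail!.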