Module: Raif::Concerns::Llms::Anthropic::BatchInference
- Extended by: ActiveSupport::Concern
- Includes: SupportsBatchInference
- Included in: Llms::Anthropic
- Defined in: app/models/raif/concerns/llms/anthropic/batch_inference.rb
Overview
Anthropic Messages Batches API support for Raif::Llms::Anthropic. Implements the Raif::Concerns::Llms::SupportsBatchInference contract on top of /v1/messages/batches and the JSONL results stream.
The host LLM class is expected to provide #build_request_parameters and #update_model_completion -- these are reused verbatim from the synchronous path so prompt caching, tool definitions, and response shape carry over.
Instance Method Summary
- #apply_batch_result(mc, raw_result) ⇒ Object
  Applies one per-entry batch result to a Raif::ModelCompletion.
- #batch_class ⇒ Object
- #cancel_batch!(batch) ⇒ Object
  Sends a cancel request to Anthropic's Messages Batches API.
- #fetch_batch_results!(batch) ⇒ Object
- #fetch_batch_status!(batch) ⇒ Object
- #submit_batch!(batch) ⇒ Object
  Submits all child Raif::ModelCompletion records of the batch as a single Anthropic Messages Batch.
Instance Method Details
#apply_batch_result(mc, raw_result) ⇒ Object
Applies one per-entry batch result to a Raif::ModelCompletion. The success
path feeds the embedded message payload through update_model_completion --
the same parser used by the synchronous and streaming paths -- so token
counts, tool calls, citations, and response shape are populated identically.
The 50% Anthropic batch discount is applied automatically by
Raif::ModelCompletion#calculate_costs (because raif_model_completion_batch_id is set).
    # File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 192

    def apply_batch_result(mc, raw_result)
      result = raw_result["result"] || {}

      # Set started_at in-memory before any save below, so update_model_completion's
      # save (or mc.failed!'s save) persists it in a single round-trip.
      mc.started_at ||= mc.raif_model_completion_batch&.started_at || Time.current

      case result["type"]
      when "succeeded"
        update_model_completion(mc, result["message"])
        mc.completed!
      else
        type = result["type"].to_s
        error = result["error"] || {}
        mc.failure_error = "Anthropic batch entry #{type.presence || "failed"}"
        mc.failure_reason = (error["message"].presence || type.presence || "unknown failure").to_s.truncate(255)
        mc.failed!
      end

      mc
    end
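The branching in #apply_batch_result can be illustrated without Rails. The following is a minimal sketch, not Raif's implementation: `summarize_entry` is a hypothetical helper, and the sample payloads in the usage note are invented to match only the keys the method reads ("result", "type", "message", "error"). It approximates ActiveSupport's `presence` with a blank check and `truncate(255)` with a hard slice.

```ruby
# Plain-Ruby sketch of the success/failure branch; no Rails required.
# `summarize_entry` is hypothetical and returns a Hash instead of mutating
# a Raif::ModelCompletion.
def summarize_entry(raw_result)
  result = raw_result["result"] || {}
  type = result["type"].to_s

  # Success: the embedded message would be handed to update_model_completion.
  return { status: :completed, message: result["message"] } if type == "succeeded"

  error = result["error"] || {}
  # First non-blank value wins: the error message, then the result type.
  reason = [error["message"], type].map(&:to_s).reject { |s| s.strip.empty? }.first || "unknown failure"

  {
    status: :failed,
    failure_error: "Anthropic batch entry #{type.strip.empty? ? "failed" : type}",
    # Hard slice to 255 characters (ActiveSupport's truncate also appends "...").
    failure_reason: reason[0, 255]
  }
end
```

For example, `summarize_entry({ "result" => { "type" => "expired" } })` yields a failed entry whose failure_reason falls back to the result type, since no error message is present.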
#batch_class ⇒ Object
    # File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 15

    def batch_class
      Raif::ModelCompletionBatches::Anthropic
    end
#cancel_batch!(batch) ⇒ Object
Sends a cancel request to Anthropic's Messages Batches API. Cancellation is asynchronous on Anthropic's side: the provider transitions to processing_status "canceling" first, then to "canceled" once in-flight entries finish. Returns the (possibly transitional) Raif status.
    # File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 103

    def cancel_batch!(batch)
      raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no provider_batch_id" if batch.provider_batch_id.blank?
      raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} is already terminal (status=#{batch.status})" if batch.terminal?

      response = batch_connection.post("messages/batches/#{batch.provider_batch_id}/cancel")
      body = response.body
      new_status = map_processing_status(body["processing_status"])

      batch.with_lock do
        return batch.status if batch.terminal?

        updates = {
          status: new_status,
          request_counts: body["request_counts"] || batch.request_counts,
          provider_response: (batch.provider_response || {}).merge(
            "results_url" => body["results_url"],
            "cancel_url" => body["cancel_url"]
          )
        }
        if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
          updates[:ended_at] = Time.current
        end
        batch.update!(updates)
      end

      new_status
    end
#fetch_batch_results!(batch) ⇒ Object
    # File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 132

    def fetch_batch_results!(batch)
      raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no results_url" if batch.results_url.blank?

      completions_by_id = batch.raif_model_completions.index_by(&:batch_custom_id)

      response = batch_results_connection.get(batch.results_url)
      body = response.body.to_s

      body.each_line do |line|
        line = line.strip
        next if line.blank?

        begin
          raw = JSON.parse(line)
        rescue JSON::ParserError => e
          # One bad line shouldn't poison the rest of the batch. Skip it; any
          # child completion that never gets matched falls through to the
          # missing-entry sweep below and is force-failed there.
          Raif.logger.error(
            "Anthropic batch ##{batch.id} results: skipping malformed JSONL line " \
            "(#{e.class}: #{e.message}): #{line.inspect}"
          )
          next
        end

        custom_id = raw["custom_id"]
        mc = completions_by_id[custom_id]
        if mc.nil?
          Raif.logger.warn(
            "Anthropic batch results: custom_id #{custom_id.inspect} did not match any child completion in batch ##{batch.id}"
          )
          next
        end

        apply_batch_result(mc, raw)
      end

      # Anything that was never reported in the results stream (rare; possible if
      # the batch expired mid-flight or was canceled) is force-failed so the
      # workflow can advance.
      completions_by_id.each_value do |mc|
        mc.reload
        next if mc.completed? || mc.failed?

        mc.started_at ||= batch.started_at
        mc.failure_error = "Anthropic batch entry missing"
        mc.failure_reason = "Result not present in results stream (batch ##{batch.id})"
        mc.failed!
      end

      batch.recalculate_costs!
      batch
    end
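The results pass above does three things: it skips malformed JSONL lines, ignores entries whose custom_id is unknown, and then sweeps for children that never appeared. A rough standalone sketch of that bookkeeping follows. `partition_results` is a hypothetical helper, not part of Raif, and it returns plain data instead of updating records. It also skips JSON values that are not objects, which is an addition of this sketch.

```ruby
require "json"

# Hypothetical helper: classify a JSONL results body against the custom_ids
# we expect, mirroring the skip / warn / sweep steps described above.
def partition_results(jsonl_body, expected_ids)
  matched = {}   # custom_id => parsed entry
  unknown = []   # custom_ids present in the stream but not expected
  skipped = 0    # lines that were not usable JSON objects

  jsonl_body.each_line do |line|
    line = line.strip
    next if line.empty?

    begin
      raw = JSON.parse(line)
    rescue JSON::ParserError
      skipped += 1 # one bad line does not abort the rest of the stream
      next
    end

    unless raw.is_a?(Hash) # assumption of this sketch: only objects are valid entries
      skipped += 1
      next
    end

    custom_id = raw["custom_id"]
    if expected_ids.include?(custom_id)
      matched[custom_id] = raw
    else
      unknown << custom_id
    end
  end

  # Anything expected but never reported is what the sweep would force-fail.
  { matched: matched, missing: expected_ids - matched.keys, unknown: unknown, skipped: skipped }
end
```

Keeping the sweep as a separate final step means a truncated or partially corrupt stream still leaves every child in a terminal state.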
#fetch_batch_status!(batch) ⇒ Object
    # File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 69

    def fetch_batch_status!(batch)
      response = batch_connection.get("messages/batches/#{batch.provider_batch_id}")
      body = response.body
      new_status = map_processing_status(body["processing_status"])

      # Re-acquire a row-level lock + reload so we don't overwrite a status another
      # process (e.g. ExpireStuckModelCompletionBatchesJob) just transitioned to
      # terminal. Without this guard, a stale instance can stomp a `failed`
      # decision back to whatever the provider currently reports.
      batch.with_lock do
        return batch.status if batch.terminal?

        updates = {
          status: new_status,
          request_counts: body["request_counts"] || {},
          provider_response: (batch.provider_response || {}).merge(
            "results_url" => body["results_url"],
            "cancel_url" => body["cancel_url"]
          )
        }
        if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
          updates[:ended_at] = Time.current
        end
        batch.update!(updates)
      end

      new_status
    end
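The guard inside with_lock is a lock-then-recheck pattern: the terminal check is repeated after the lock is held, so a polled status cannot overwrite a terminal decision made by another process. The sketch below imitates this in plain Ruby under stated assumptions: `FakeBatch` is a hypothetical stand-in for the batch record, a Mutex plays the role of the row-level lock, and the TERMINAL_STATUSES values are guesses rather than Raif's actual list.

```ruby
# Hypothetical stand-in for a batch row; not Raif code.
class FakeBatch
  TERMINAL_STATUSES = %w[completed failed canceled].freeze # assumed values

  attr_reader :status, :ended_at

  def initialize(status)
    @status = status
    @ended_at = nil
    @lock = Mutex.new # stands in for the database row lock
  end

  def terminal?
    TERMINAL_STATUSES.include?(@status)
  end

  # Re-check terminal state while holding the lock, then write the polled status.
  def apply_polled_status(new_status, now: Time.now)
    @lock.synchronize do
      return @status if terminal? # another writer already finished this batch

      @status = new_status
      @ended_at ||= now if terminal? # stamp ended_at only on first terminal transition
      @status
    end
  end
end
```

With this shape, `FakeBatch.new("failed").apply_polled_status("in_progress")` leaves the batch failed and returns the stored status rather than the polled one.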
#submit_batch!(batch) ⇒ Object
Submits all child Raif::ModelCompletion records of the batch as a single
Anthropic Messages Batch. Each entry's params body is identical to what
the synchronous /v1/messages endpoint would receive.
The batch and child writes happen in a single transaction, so a partial failure (e.g. the network call succeeds but the child started_at update raises) leaves no submitted-but-unstamped state behind.
    # File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 26

    def submit_batch!(batch)
      batch.assert_submittable!

      completions = batch.raif_model_completions.to_a
      raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no child completions" if completions.empty?

      requests = completions.map do |mc|
        if mc.batch_custom_id.blank?
          raise Raif::Errors::InvalidBatchError, "Raif::ModelCompletion ##{mc.id} has blank batch_custom_id"
        end

        {
          custom_id: mc.batch_custom_id,
          params: build_request_parameters(mc)
        }
      end

      response = batch_connection.post("messages/batches") do |req|
        req.body = { requests: requests }
      end

      body = response.body
      submitted_at = Time.current

      Raif::ModelCompletionBatch.transaction do
        batch.update!(
          provider_batch_id: body["id"],
          status: map_processing_status(body["processing_status"]) || "submitted",
          submitted_at: submitted_at,
          started_at: submitted_at,
          provider_response: (batch.provider_response || {}).merge(
            "results_url" => body["results_url"],
            "cancel_url" => body["cancel_url"]
          ),
          request_counts: body["request_counts"] || {}
        )

        # Single UPDATE for all children that don't already have a started_at,
        # filtered in SQL so we can't stomp a started_at that was set by another
        # process between when we loaded `completions` and now.
        batch.raif_model_completions.where(started_at: nil).update_all(started_at: submitted_at)
      end

      batch
    end
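The request body assembled by #submit_batch! is a list of { custom_id, params } pairs under a requests key. The sketch below shows that shape in isolation. `Completion` and `build_batch_body` are hypothetical names, the params hash stands in for whatever #build_request_parameters returns, and ArgumentError replaces Raif::Errors::InvalidBatchError so the example runs without Raif.

```ruby
require "json"

# Hypothetical stand-in for Raif::ModelCompletion; `params` plays the role of
# the hash returned by build_request_parameters.
Completion = Struct.new(:id, :batch_custom_id, :params)

# Build the { requests: [...] } body, rejecting empty batches and blank ids
# before anything is sent over the network.
def build_batch_body(completions)
  raise ArgumentError, "no child completions" if completions.empty?

  requests = completions.map do |mc|
    if mc.batch_custom_id.to_s.strip.empty?
      raise ArgumentError, "completion ##{mc.id} has blank batch_custom_id"
    end

    { custom_id: mc.batch_custom_id, params: mc.params }
  end

  { requests: requests }
end

# Usage with placeholder values (the model name is illustrative only).
body = build_batch_body([
  Completion.new(1, "mc-1", { model: "example-model", max_tokens: 64,
                              messages: [{ role: "user", content: "Hello" }] })
])
puts JSON.pretty_generate(body)
```

Validating every custom_id up front matters because the results stream is matched back to children by custom_id alone.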