Module: Raif::Concerns::Llms::OpenAi::BatchInference
- Extended by: ActiveSupport::Concern
- Includes: SupportsBatchInference
- Included in: Llms::OpenAiBase
- Defined in: app/models/raif/concerns/llms/open_ai/batch_inference.rb
Overview
OpenAI Batches API support for Raif::Llms::OpenAiBase. Implements the Raif::Concerns::Llms::SupportsBatchInference contract on top of /v1/batches and the JSONL input/output file flow.
The host LLM class is expected to provide #build_request_parameters, #update_model_completion, and #supports_temperature?; these are reused verbatim from the synchronous path, so the request body and response parsing are identical between the sync and batch paths. Subclasses must also implement #batch_endpoint_path to declare which OpenAI endpoint to target.
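An end-to-end usage sketch (a hedged outline, not Raif-prescribed code: `llm` is an instance of an OpenAI-backed Raif LLM, `batch` is a persisted batch whose child Raif::ModelCompletion records are already built, and the polling cadence is illustrative):

```ruby
llm.submit_batch!(batch)  # builds the JSONL, uploads it, and creates the /v1/batches job

# In real code this polling would live in a background job rather than a sleep loop.
sleep 30 until llm.fetch_batch_status!(batch).in?(Raif::ModelCompletionBatch::TERMINAL_STATUSES)

llm.fetch_batch_results!(batch)  # applies the output/error files to the child completions
```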
Instance Method Summary

- #apply_batch_result(mc, raw_result) ⇒ Object
  Applies one per-entry batch result envelope to a Raif::ModelCompletion.
- #batch_class ⇒ Object
- #batch_endpoint_path ⇒ String
  Subclasses override to declare which OpenAI endpoint they target.
- #cancel_batch!(batch) ⇒ Object
  Sends a cancel request to OpenAI's Batches API.
- #fetch_batch_results!(batch) ⇒ Object
- #fetch_batch_status!(batch) ⇒ Object
- #submit_batch!(batch) ⇒ Object
  Submits all child Raif::ModelCompletion records as a single OpenAI batch.
Instance Method Details
#apply_batch_result(mc, raw_result) ⇒ Object
Applies one per-entry batch result envelope to a Raif::ModelCompletion. OpenAI's batch envelope is { id, custom_id, response: { status_code, request_id, body }, error }. The success path delegates to the host class's existing #update_model_completion (the same one used by the synchronous and streaming paths), so token counts, tool calls, and response shape match. The 50% batch discount is applied automatically by Raif::ModelCompletion#calculate_costs.
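For reference, hedged sketches of the two envelope shapes this method distinguishes (IDs and bodies are illustrative, and the custom_id scheme shown is an assumption, not taken from Raif):

```ruby
# Success: a 2xx status_code; "body" is a normal endpoint response and is
# handed to the host's #update_model_completion.
success_envelope = {
  "id" => "batch_req_abc123",
  "custom_id" => "raif-mc-42",
  "response" => { "status_code" => 200, "request_id" => "req_1", "body" => { "choices" => [] } },
  "error" => nil
}

# Failure: a non-2xx status_code (or a top-level "error" hash); the message is
# pulled from error["message"] or response["body"]["error"]["message"].
failure_envelope = {
  "id" => "batch_req_def456",
  "custom_id" => "raif-mc-43",
  "response" => { "status_code" => 400, "request_id" => "req_2", "body" => { "error" => { "message" => "Invalid request" } } },
  "error" => nil
}
```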
```ruby
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 186

def apply_batch_result(mc, raw_result)
  response_envelope = raw_result["response"]
  error_envelope = raw_result["error"]

  # Set started_at in-memory before any save below, so update_model_completion's
  # save (or mc.failed!'s save) persists it in a single round-trip.
  mc.started_at ||= mc.raif_model_completion_batch&.started_at || Time.current

  if response_envelope.is_a?(Hash) && (response_envelope["status_code"] || 200).to_i.between?(200, 299)
    update_model_completion(mc, response_envelope["body"])
    mc.completed!
  else
    error_message = if error_envelope.is_a?(Hash)
      error_envelope["message"]
    elsif response_envelope.is_a?(Hash)
      response_envelope.dig("body", "error", "message")
    end

    mc.failure_error = "OpenAI batch entry failed (status: #{response_envelope&.dig("status_code") || "unknown"})"
    mc.failure_reason = (error_message.presence || "unknown OpenAI batch failure").to_s.truncate(255)
    mc.failed!
  end

  mc
end
```
#batch_class ⇒ Object
```ruby
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 20

def batch_class
  Raif::ModelCompletionBatches::OpenAi
end
```
#batch_endpoint_path ⇒ String
Subclasses override to declare which OpenAI endpoint they target. The same
value is used both as the JSONL line's url field (per-request endpoint)
and as the OpenAI Batches API endpoint parameter (whole-batch endpoint).
```ruby
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 28

def batch_endpoint_path
  raise NotImplementedError, "#{self.class.name} must implement #batch_endpoint_path"
end
```
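For example, a concrete LLM class might target the Chat Completions endpoint. A sketch (the override below is hypothetical, though "/v1/chat/completions" is a real Batches API endpoint):

```ruby
def batch_endpoint_path
  "/v1/chat/completions" # used as both the per-line url and the batch-wide endpoint
end
```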
#cancel_batch!(batch) ⇒ Object
Sends a cancel request to OpenAI's Batches API. Cancellation is asynchronous on OpenAI's side: the provider transitions to status "cancelling" first, then to "cancelled" (mapped to "canceled") once in-flight entries finish. Returns the (possibly transitional) Raif status.
```ruby
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 117

def cancel_batch!(batch)
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no provider_batch_id" if batch.provider_batch_id.blank?
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} is already terminal (status=#{batch.status})" if batch.terminal?

  response = batch_connection.post("batches/#{batch.provider_batch_id}/cancel")
  body = response.body
  new_status = map_batch_status(body["status"])

  batch.with_lock do
    return batch.status if batch.terminal?

    provider_response_updates = (batch.provider_response || {}).merge(
      "output_file_id" => body["output_file_id"],
      "error_file_id" => body["error_file_id"]
    )

    updates = {
      status: new_status,
      request_counts: body["request_counts"] || batch.request_counts,
      provider_response: provider_response_updates
    }

    if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
      updates[:ended_at] = Time.current
    end

    batch.update!(updates)
  end

  new_status
end
```
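A usage sketch reflecting the asynchronous semantics (the polling cadence is illustrative; in practice this would run in a background job):

```ruby
status = llm.cancel_batch!(batch) # may return a transitional, non-terminal status
until Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(status)
  sleep 15
  status = llm.fetch_batch_status!(batch)
end
# Entries that finished before the cancel still appear in the output file.
llm.fetch_batch_results!(batch)
```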
#fetch_batch_results!(batch) ⇒ Object
```ruby
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 148

def fetch_batch_results!(batch)
  completions_by_id = batch.raif_model_completions.index_by(&:batch_custom_id)

  if batch.output_file_id.present?
    apply_batch_jsonl(batch, batch.output_file_id, completions_by_id)
  end

  # The error_file_id contains entries that errored before the model produced
  # a response (validation failures, request shape errors, etc.). These have
  # the same `{ id, custom_id, response, error }` envelope as the output file.
  if batch.error_file_id.present?
    apply_batch_jsonl(batch, batch.error_file_id, completions_by_id)
  end

  # Anything that was never reported in either file (rare; possible if the
  # batch expired mid-flight or was canceled) is left as a failed completion
  # so the workflow can advance.
  completions_by_id.each_value do |mc|
    mc.reload
    next if mc.completed? || mc.failed?

    mc.started_at ||= batch.started_at
    mc.failure_error = "OpenAI batch entry missing"
    mc.failure_reason = "Result not present in output_file or error_file (batch ##{batch.id})"
    mc.failed!
  end

  batch.recalculate_costs!
  batch
end
```
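After the call, every child completion is terminal; a hedged sketch of inspecting the per-entry outcomes (the logging is illustrative):

```ruby
llm.fetch_batch_results!(batch)

batch.raif_model_completions.find_each do |mc|
  # Completed entries were populated via the host's #update_model_completion,
  # so they look identical to synchronously-run completions.
  Rails.logger.warn("Entry #{mc.batch_custom_id} failed: #{mc.failure_reason}") if mc.failed?
end
```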
#fetch_batch_status!(batch) ⇒ Object
```ruby
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 81

def fetch_batch_status!(batch)
  response = batch_connection.get("batches/#{batch.provider_batch_id}")
  body = response.body
  new_status = map_batch_status(body["status"])

  # Re-acquire a row-level lock + reload so we don't overwrite a status another
  # process (e.g. ExpireStuckModelCompletionBatchesJob) just transitioned to
  # terminal. Without this guard, a stale instance can stomp a `failed`
  # decision back to whatever the provider currently reports.
  batch.with_lock do
    return batch.status if batch.terminal?

    provider_response_updates = (batch.provider_response || {}).merge(
      "output_file_id" => body["output_file_id"],
      "error_file_id" => body["error_file_id"]
    )

    updates = {
      status: new_status,
      request_counts: body["request_counts"] || {},
      provider_response: provider_response_updates
    }

    if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
      updates[:ended_at] = Time.current
    end

    batch.update!(updates)
  end

  new_status
end
```
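A hypothetical recurring-job body built on this method (the job and its cadence are illustrative, not part of this concern):

```ruby
status = llm.fetch_batch_status!(batch)
if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(status)
  llm.fetch_batch_results!(batch)
else
  self.class.set(wait: 1.minute).perform_later(batch.id) # re-enqueue until terminal
end
```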
#submit_batch!(batch) ⇒ Object
Submits all child Raif::ModelCompletion records as a single OpenAI batch. Three-step flow: build the JSONL string, upload it as a file with purpose=batch, then create the batch referencing the file.
The batch + child writes happen in a transaction so a partial failure (e.g. the network call succeeds but the child started_at update raises) leaves no submitted-but-unstamped state behind.
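For reference, each generated JSONL line follows OpenAI's batch input format; a hedged sketch of one line (the custom_id scheme and request body are illustrative, not Raif's exact output):

```ruby
{
  custom_id: "raif-mc-42",      # matched back to a child via batch_custom_id
  method: "POST",
  url: "/v1/chat/completions",  # the value of #batch_endpoint_path
  body: { model: "gpt-4o", messages: [{ role: "user", content: "Hello" }] }
}.to_json
```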
```ruby
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 39

def submit_batch!(batch)
  batch.assert_submittable!

  completions = batch.raif_model_completions.to_a
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no child completions" if completions.empty?

  jsonl = build_batch_jsonl(completions)
  input_file_id = upload_batch_input_file!(jsonl)

  response = batch_connection.post("batches") do |req|
    req.body = {
      input_file_id: input_file_id,
      endpoint: batch_endpoint_path,
      completion_window: Raif.config.open_ai_batch_completion_window
    }
  end

  body = response.body
  submitted_at = Time.current

  Raif::ModelCompletionBatch.transaction do
    batch.update!(
      provider_batch_id: body["id"],
      status: map_batch_status(body["status"]) || "submitted",
      submitted_at: submitted_at,
      started_at: submitted_at,
      provider_response: (batch.provider_response || {}).merge(
        "input_file_id" => input_file_id,
        "endpoint" => batch_endpoint_path
      ),
      request_counts: body["request_counts"] || {}
    )

    # Single UPDATE for all children that don't already have a started_at,
    # filtered in SQL so we can't stomp a started_at that was set by another
    # process between when we loaded `completions` and now.
    batch.raif_model_completions.where(started_at: nil).update_all(started_at: submitted_at)
  end

  batch
end
```