Module: Raif::Concerns::Llms::OpenAi::BatchInference
- Extended by: ActiveSupport::Concern
- Includes: SupportsBatchInference
- Included in: Llms::OpenAiBase
- Defined in: app/models/raif/concerns/llms/open_ai/batch_inference.rb
Overview
OpenAI Batches API support for Raif::Llms::OpenAiBase. Implements the Raif::Concerns::Llms::SupportsBatchInference contract on top of /v1/batches and the JSONL input/output file flow.
The host LLM class is expected to provide #build_request_parameters, #update_model_completion, and #supports_temperature?. These are reused verbatim from the synchronous path, so the request body and response parsing are identical between the sync and batch paths. Subclasses must also implement #batch_endpoint_path to declare which OpenAI endpoint to target.
Instance Method Summary
- #apply_batch_result(mc, raw_result) ⇒ Object
  Applies one per-entry batch result envelope to a Raif::ModelCompletion.
- #batch_class ⇒ Object
- #batch_endpoint_path ⇒ String
  Subclasses override to declare which OpenAI endpoint they target.
- #cancel_batch!(batch) ⇒ Object
  Sends a cancel request to OpenAI's Batches API.
- #fetch_batch_results!(batch) ⇒ Object
- #fetch_batch_status!(batch) ⇒ Object
- #submit_batch!(batch) ⇒ Object
  Submits all child Raif::ModelCompletion records as a single OpenAI batch.
Instance Method Details
#apply_batch_result(mc, raw_result) ⇒ Object
Applies one per-entry batch result envelope to a Raif::ModelCompletion. OpenAI's batch envelope is { id, custom_id, response: { status_code, request_id, body }, error }. The success path delegates to the host class's existing #update_model_completion (the same one used by the synchronous and streaming paths), so token counts, tool calls, and response shape match. The 50% batch discount is applied automatically by Raif::ModelCompletion#calculate_costs.
    # File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 194
    def apply_batch_result(mc, raw_result)
      response_envelope = raw_result["response"]
      error_envelope = raw_result["error"]

      # Set started_at in-memory before any save below, so update_model_completion's
      # save (or mc.failed!'s save) persists it in a single round-trip.
      mc.started_at ||= mc.raif_model_completion_batch&.started_at || Time.current

      if response_envelope.is_a?(Hash) && (response_envelope["status_code"] || 200).to_i.between?(200, 299)
        update_model_completion(mc, response_envelope["body"])
        mc.completed!
      else
        message = if error_envelope.is_a?(Hash)
          error_envelope["message"]
        elsif response_envelope.is_a?(Hash)
          response_envelope.dig("body", "error", "message")
        end

        mc.failure_error = "OpenAI batch entry failed (status: #{response_envelope&.dig("status_code") || "unknown"})"
        mc.failure_reason = (message.presence || "unknown OpenAI batch failure").to_s.truncate(255)
        mc.failed!
      end

      mc
    end
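The branching in #apply_batch_result depends only on the shape of the envelope, so it can be exercised with plain hashes. The following is a minimal sketch, assuming a hypothetical helper named classify_envelope that is not part of Raif. It mirrors the success test and the failure-message lookup without touching ActiveRecord, and it approximates ActiveSupport's presence and truncate with plain Ruby.

```ruby
# Hypothetical helper, not part of Raif: mirrors the success test and the
# failure-message lookup of apply_batch_result using plain hashes only.
def classify_envelope(raw_result)
  response = raw_result["response"]
  error = raw_result["error"]

  # A missing status_code is treated as 200, as in the method above.
  status = response.is_a?(Hash) ? (response["status_code"] || 200).to_i : nil
  return [:ok, response["body"]] if status && status.between?(200, 299)

  message =
    if error.is_a?(Hash)
      error["message"]
    elsif response.is_a?(Hash)
      response.dig("body", "error", "message")
    end

  # Plain-Ruby approximation of `.presence` and `.truncate(255)`.
  message = "unknown OpenAI batch failure" if message.nil? || message.to_s.strip.empty?
  [:failed, message.to_s[0, 255]]
end

# Illustrative envelopes in the { id, custom_id, response, error } shape.
success = {
  "custom_id" => "mc-1",
  "response" => { "status_code" => 200, "body" => { "id" => "resp_1" } },
  "error" => nil
}
failure = {
  "custom_id" => "mc-2",
  "response" => { "status_code" => 400, "body" => { "error" => { "message" => "Invalid model" } } },
  "error" => nil
}

p classify_envelope(success)
p classify_envelope(failure)
```

A top-level error hash takes precedence over an error nested in the response body, matching the order of the if/elsif in the method.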
#batch_class ⇒ Object
    # File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 20
    def batch_class
      Raif::ModelCompletionBatches::OpenAi
    end
#batch_endpoint_path ⇒ String
Subclasses override to declare which OpenAI endpoint they target. The same
value is used both as the JSONL line's url field (per-request endpoint)
and as the OpenAI Batches API endpoint parameter (whole-batch endpoint).
    # File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 28
    def batch_endpoint_path
      raise NotImplementedError, "#{self.class.name} must implement #batch_endpoint_path"
    end
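As an illustration of the override contract, here is a minimal sketch with stand-in names. BatchEndpointDefault plays the role of the concern's default, and the two Example classes are hypothetical hosts rather than real Raif classes. The path "/v1/chat/completions" is one of the endpoints OpenAI's Batch API accepts, and is used here only as an example value.

```ruby
# Stand-in for the concern's default implementation shown above.
module BatchEndpointDefault
  def batch_endpoint_path
    raise NotImplementedError, "#{self.class.name} must implement #batch_endpoint_path"
  end
end

# Hypothetical host class that declares its endpoint.
class ExampleCompletionsLlm
  include BatchEndpointDefault

  def batch_endpoint_path
    "/v1/chat/completions"
  end
end

# Hypothetical host class that forgot to override the method.
class ExampleUnconfiguredLlm
  include BatchEndpointDefault
end

puts ExampleCompletionsLlm.new.batch_endpoint_path

begin
  ExampleUnconfiguredLlm.new.batch_endpoint_path
rescue NotImplementedError => e
  # NotImplementedError is a ScriptError, so it must be rescued explicitly.
  puts e.message
end
```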
#cancel_batch!(batch) ⇒ Object
Sends a cancel request to OpenAI's Batches API. Cancellation is asynchronous on OpenAI's side: the provider transitions to status "cancelling" first, then to "cancelled" (mapped to "canceled") once in-flight entries finish. Returns the (possibly transitional) Raif status.
    # File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 123
    def cancel_batch!(batch)
      raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no provider_batch_id" if batch.provider_batch_id.blank?
      raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} is already terminal (status=#{batch.status})" if batch.terminal?

      response = with_batch_transient_retry(:cancel, batch_id: batch.id) do
        batch_connection.post("batches/#{batch.provider_batch_id}/cancel")
      end
      body = response.body
      new_status = map_batch_status(body["status"])

      batch.with_lock do
        return batch.status if batch.terminal?

        provider_response_updates = (batch.provider_response || {}).merge(
          "output_file_id" => body["output_file_id"],
          "error_file_id" => body["error_file_id"]
        )
        updates = {
          status: new_status,
          request_counts: body["request_counts"] || batch.request_counts,
          provider_response: provider_response_updates
        }
        if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
          updates[:ended_at] = Time.current
        end
        batch.update!(updates)
      end

      new_status
    end
#fetch_batch_results!(batch) ⇒ Object
    # File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 156
    def fetch_batch_results!(batch)
      completions_by_id = batch.raif_model_completions.index_by(&:batch_custom_id)

      if batch.output_file_id.present?
        apply_batch_jsonl(batch, batch.output_file_id, completions_by_id)
      end

      # The error_file_id contains entries that errored before the model produced
      # a response (validation failures, request shape errors, etc.). These have
      # the same `{ id, custom_id, response, error }` envelope as the output file.
      if batch.error_file_id.present?
        apply_batch_jsonl(batch, batch.error_file_id, completions_by_id)
      end

      # Anything that was never reported in either file (rare; possible if the
      # batch expired mid-flight or was canceled) is left as a failed completion
      # so the workflow can advance.
      completions_by_id.each_value do |mc|
        mc.reload
        next if mc.completed? || mc.failed?

        mc.started_at ||= batch.started_at
        mc.failure_error = "OpenAI batch entry missing"
        mc.failure_reason = "Result not present in output_file or error_file (batch ##{batch.id})"
        mc.failed!
      end

      batch.recalculate_costs!
      batch
    end
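The reconciliation step in #fetch_batch_results! can be sketched without Raif or a network call. The example below assumes a hypothetical helper, index_jsonl_by_custom_id, which is not Raif's apply_batch_jsonl (that method is not shown in this section). It parses the two JSONL payloads, merges them by custom_id, and lists the expected ids that neither file reported. The ids and payloads are made up for illustration.

```ruby
require "json"

# Hypothetical helper, not part of Raif: parses JSONL text into a hash keyed
# by each entry's custom_id. Blank lines are skipped.
def index_jsonl_by_custom_id(jsonl)
  jsonl.each_line.with_object({}) do |line, index|
    next if line.strip.empty?

    entry = JSON.parse(line)
    index[entry["custom_id"]] = entry
  end
end

# Illustrative file contents in the { id, custom_id, response, error } shape.
output_file = <<~JSONL
  {"id":"batch_req_1","custom_id":"mc-1","response":{"status_code":200,"body":{}},"error":null}
JSONL
error_file = <<~JSONL
  {"id":"batch_req_2","custom_id":"mc-2","response":{"status_code":400,"body":{"error":{"message":"bad request"}}},"error":null}
JSONL

# Ids of the completions that were submitted with the batch.
expected_ids = %w[mc-1 mc-2 mc-3]

reported = index_jsonl_by_custom_id(output_file).merge(index_jsonl_by_custom_id(error_file))
missing = expected_ids - reported.keys

p missing
```

Here mc-3 appears in neither file, so it is the entry that the method above would mark as failed with "OpenAI batch entry missing".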
#fetch_batch_status!(batch) ⇒ Object
    # File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 85
    def fetch_batch_status!(batch)
      response = with_batch_transient_retry(:fetch_status, batch_id: batch.id) do
        batch_connection.get("batches/#{batch.provider_batch_id}")
      end
      body = response.body
      new_status = map_batch_status(body["status"])

      # Re-acquire a row-level lock + reload so we don't overwrite a status another
      # process (e.g. ExpireStuckModelCompletionBatchesJob) just transitioned to
      # terminal. Without this guard, a stale instance can stomp a `failed`
      # decision back to whatever the provider currently reports.
      batch.with_lock do
        return batch.status if batch.terminal?

        provider_response_updates = (batch.provider_response || {}).merge(
          "output_file_id" => body["output_file_id"],
          "error_file_id" => body["error_file_id"]
        )
        updates = {
          status: new_status,
          request_counts: body["request_counts"] || {},
          provider_response: provider_response_updates
        }
        if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
          updates[:ended_at] = Time.current
        end
        batch.update!(updates)
      end

      new_status
    end
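The terminal-status guard inside with_lock can be illustrated in memory. This is a sketch under stated assumptions: FakeBatch is a hypothetical stand-in for the batch row, a Mutex plays the role of the row-level lock, and the TERMINAL list is an assumed set rather than the actual contents of Raif::ModelCompletionBatch::TERMINAL_STATUSES.

```ruby
# Hypothetical in-memory stand-in for a batch row. A Mutex plays the role of
# the row-level lock that with_lock takes in the real method.
class FakeBatch
  # Assumed terminal set, for illustration only.
  TERMINAL = %w[completed failed canceled expired].freeze

  attr_reader :status

  def initialize(status)
    @status = status
    @lock = Mutex.new
  end

  def terminal?
    TERMINAL.include?(@status)
  end

  # Stores a provider-reported status unless a terminal status is already
  # recorded, in which case the stored status wins and is returned.
  def apply_provider_status(new_status)
    @lock.synchronize do
      return @status if terminal?

      @status = new_status
    end
  end
end

batch = FakeBatch.new("in_progress")

# Another process decides the batch has failed.
puts batch.apply_provider_status("failed")

# A stale poller then reports the provider's older view; it is ignored.
puts batch.apply_provider_status("in_progress")
puts batch.status
```

The early return is what keeps a stale caller from moving a terminal batch back to a non-terminal status.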
#submit_batch!(batch) ⇒ Object
Submits all child Raif::ModelCompletion records as a single OpenAI batch. Three-step flow: build the JSONL string, upload it as a file with purpose=batch, then create the batch referencing the file.
The batch + child writes happen in a transaction so a partial failure (e.g. the network call succeeds but the child started_at update raises) leaves no submitted-but-unstamped state behind.
    # File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 39
    def submit_batch!(batch)
      batch.assert_submittable!

      completions = batch.raif_model_completions.to_a
      raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no child completions" if completions.empty?

      jsonl = build_batch_jsonl(completions)
      input_file_id = with_batch_transient_retry(:submit_upload_input, batch_id: batch.id) do
        upload_batch_input_file!(jsonl)
      end

      response = with_batch_transient_retry(:submit_create, batch_id: batch.id) do
        batch_connection.post("batches") do |req|
          req.body = {
            input_file_id: input_file_id,
            endpoint: batch_endpoint_path,
            completion_window: Raif.config.open_ai_batch_completion_window
          }
        end
      end
      body = response.body
      submitted_at = Time.current

      Raif::ModelCompletionBatch.transaction do
        batch.update!(
          provider_batch_id: body["id"],
          status: map_batch_status(body["status"]) || "submitted",
          submitted_at: submitted_at,
          started_at: submitted_at,
          provider_response: (batch.provider_response || {}).merge(
            "input_file_id" => input_file_id,
            "endpoint" => batch_endpoint_path
          ),
          request_counts: body["request_counts"] || {}
        )

        # Single UPDATE for all children that don't already have a started_at,
        # filtered in SQL so we can't stomp a started_at that was set by another
        # process between when we loaded `completions` and now.
        batch.raif_model_completions.where(started_at: nil).update_all(started_at: submitted_at)
      end

      batch
    end
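The first step of the flow, building the JSONL string, might look roughly like the sketch below. It assumes a hypothetical helper named build_jsonl, which is not Raif's build_batch_jsonl (that method is not shown in this section). Each line uses the request shape that OpenAI's Batch API documents for input files: custom_id, method, url, and body. The custom ids, model name, and messages are illustrative values only.

```ruby
require "json"

# Hypothetical helper, not Raif's build_batch_jsonl: emits one JSON object
# per line, with the same endpoint repeated as each line's url field.
def build_jsonl(requests, endpoint)
  lines = requests.map do |custom_id, body|
    line = { custom_id: custom_id, method: "POST", url: endpoint, body: body }
    JSON.generate(line)
  end
  lines.join("\n")
end

# Illustrative request bodies keyed by custom_id.
requests = {
  "mc-1" => { model: "gpt-4o-mini", messages: [{ role: "user", content: "Hello" }] },
  "mc-2" => { model: "gpt-4o-mini", messages: [{ role: "user", content: "Goodbye" }] }
}

jsonl = build_jsonl(requests, "/v1/chat/completions")
puts jsonl
```

The custom_id on each line is what lets the result files be matched back to individual completions once the batch finishes.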