Module: Raif::Concerns::Llms::OpenAi::BatchInference

Extended by:
ActiveSupport::Concern
Includes:
SupportsBatchInference
Included in:
Llms::OpenAiBase
Defined in:
app/models/raif/concerns/llms/open_ai/batch_inference.rb

Overview

OpenAI Batches API support for Raif::Llms::OpenAiBase. Implements the Raif::Concerns::Llms::SupportsBatchInference contract on top of /v1/batches and the JSONL input/output file flow.

The host LLM class is expected to provide #build_request_parameters, #update_model_completion, and #supports_temperature? -- these are reused verbatim from the synchronous path so request body and response parsing are identical between the sync and batch paths. Subclasses must also implement #batch_endpoint_path to declare which OpenAI endpoint to target.

Instance Method Summary collapse

Instance Method Details

#apply_batch_result(mc, raw_result) ⇒ Object

Applies one per-entry batch result envelope to a Raif::ModelCompletion. OpenAI's batch envelope is { id, custom_id, response: { status_code, request_id, body }, error }. The success path delegates to the host class's existing #update_model_completion (the same one used by the synchronous and streaming paths), so token counts, tool calls, and response shape match. The 50% batch discount is applied automatically by Raif::ModelCompletion#calculate_costs.



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 186

# Applies one per-entry batch result envelope to a Raif::ModelCompletion.
# OpenAI's envelope shape is { id, custom_id, response: { status_code,
# request_id, body }, error }. Success delegates to the host class's
# #update_model_completion so parsing matches the synchronous path.
def apply_batch_result(mc, raw_result)
  envelope = raw_result["response"]
  error_info = raw_result["error"]

  # Stamp started_at in-memory first so the save performed below (by
  # update_model_completion or by mc.failed!) persists it in one round-trip.
  mc.started_at ||= mc.raif_model_completion_batch&.started_at || Time.current

  # Missing status_code on a hash envelope is treated as success (200).
  http_status = (envelope["status_code"] || 200).to_i if envelope.is_a?(Hash)

  if http_status&.between?(200, 299)
    update_model_completion(mc, envelope["body"])
    mc.completed!
  else
    message =
      if error_info.is_a?(Hash)
        error_info["message"]
      elsif envelope.is_a?(Hash)
        envelope.dig("body", "error", "message")
      end

    mc.failure_error = "OpenAI batch entry failed (status: #{envelope&.dig("status_code") || "unknown"})"
    mc.failure_reason = (message.presence || "unknown OpenAI batch failure").to_s.truncate(255)
    mc.failed!
  end

  mc
end

#batch_class ⇒ Object



20
21
22
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 20

# The Raif batch record class paired with this provider implementation;
# the framework instantiates this class when creating OpenAI batches.
def batch_class
  Raif::ModelCompletionBatches::OpenAi
end

#batch_endpoint_path ⇒ String

Subclasses override to declare which OpenAI endpoint they target. The same value is used both as the JSONL line's url field (per-request endpoint) and as the OpenAI Batches API endpoint parameter (whole-batch endpoint).

Returns:

  • (String)

    e.g. "/v1/responses" or "/v1/chat/completions"

Raises:

  • (NotImplementedError)


28
29
30
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 28

# Abstract hook: including classes declare which OpenAI endpoint they target
# (e.g. "/v1/responses" or "/v1/chat/completions"). The value doubles as the
# per-line JSONL url field and the whole-batch endpoint parameter.
#
# @return [String]
# @raise [NotImplementedError] when the including class does not override it
def batch_endpoint_path
  message = "#{self.class.name} must implement #batch_endpoint_path"
  raise NotImplementedError, message
end

#cancel_batch!(batch) ⇒ Object

Sends a cancel request to OpenAI's Batches API. Cancellation is asynchronous on OpenAI's side: the provider transitions to status "cancelling" first, then to "cancelled" (mapped to "canceled") once in-flight entries finish. Returns the (possibly transitional) Raif status.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 117

# Sends a cancel request to OpenAI's Batches API. Cancellation is
# asynchronous on OpenAI's side: status moves to "cancelling" first, then to
# "cancelled" (mapped to "canceled") once in-flight entries finish.
#
# @param batch [Raif::ModelCompletionBatch] a submitted, non-terminal batch
# @return [String] the (possibly transitional) Raif status
# @raise [Raif::Errors::InvalidBatchError] when the batch was never submitted
#   (no provider_batch_id) or is already in a terminal status
def cancel_batch!(batch)
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no provider_batch_id" if batch.provider_batch_id.blank?
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} is already terminal (status=#{batch.status})" if batch.terminal?

  response = batch_connection.post("batches/#{batch.provider_batch_id}/cancel")
  body = response.body
  new_status = map_batch_status(body["status"])

  # Row lock + reload: don't overwrite a terminal status another process set
  # between the API call above and this write.
  batch.with_lock do
    # `return` here exits the whole method (block return semantics), keeping
    # the already-terminal status instead of the provider-reported one.
    return batch.status if batch.terminal?

    provider_response_updates = (batch.provider_response || {}).merge(
      "output_file_id" => body["output_file_id"],
      "error_file_id" => body["error_file_id"]
    )

    updates = {
      status: new_status,
      request_counts: body["request_counts"] || batch.request_counts,
      provider_response: provider_response_updates
    }
    # Stamp ended_at only on the first transition into a terminal status.
    if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
      updates[:ended_at] = Time.current
    end

    batch.update!(updates)
  end

  new_status
end

#fetch_batch_results!(batch) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 148

# Downloads and applies the batch's result files to its child completions,
# fails any entry reported in neither file, then recalculates batch costs.
def fetch_batch_results!(batch)
  by_custom_id = batch.raif_model_completions.index_by(&:batch_custom_id)

  # The output file holds model responses; the error file holds entries that
  # errored before the model produced a response (validation failures, request
  # shape errors, etc.). Both share the { id, custom_id, response, error }
  # envelope, so they are processed identically — output file first.
  [batch.output_file_id, batch.error_file_id].each do |file_id|
    apply_batch_jsonl(batch, file_id, by_custom_id) if file_id.present?
  end

  # Anything never reported in either file (rare; possible if the batch
  # expired mid-flight or was canceled) is left as a failed completion so the
  # workflow can advance.
  by_custom_id.each_value do |mc|
    mc.reload
    next if mc.completed? || mc.failed?

    mc.started_at ||= batch.started_at
    mc.failure_error = "OpenAI batch entry missing"
    mc.failure_reason = "Result not present in output_file or error_file (batch ##{batch.id})"
    mc.failed!
  end

  batch.recalculate_costs!
  batch
end

#fetch_batch_status!(batch) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 81

# Polls OpenAI for the batch's current status and persists it, along with
# request counts and output/error file ids. Returns the mapped Raif status.
def fetch_batch_status!(batch)
  body = batch_connection.get("batches/#{batch.provider_batch_id}").body
  new_status = map_batch_status(body["status"])

  batch.with_lock do
    # A concurrent process (e.g. ExpireStuckModelCompletionBatchesJob) may have
    # already moved the batch to a terminal status; never stomp that decision
    # with whatever the provider currently reports. `return` exits the method.
    return batch.status if batch.terminal?

    merged_response = (batch.provider_response || {}).merge(
      "output_file_id" => body["output_file_id"],
      "error_file_id" => body["error_file_id"]
    )

    attrs = {
      status: new_status,
      request_counts: body["request_counts"] || {},
      provider_response: merged_response
    }
    # Stamp ended_at exactly once: on the first observation of a terminal status.
    if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
      attrs[:ended_at] = Time.current
    end

    batch.update!(attrs)
  end

  new_status
end

#submit_batch!(batch) ⇒ Object

Submits all child Raif::ModelCompletion records as a single OpenAI batch. Three-step flow: build the JSONL string, upload it as a file with purpose=batch, then create the batch referencing the file.

The batch + child writes happen in a transaction so a partial failure (e.g. the network call succeeds but the child started_at update raises) leaves no submitted-but-unstamped state behind.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'app/models/raif/concerns/llms/open_ai/batch_inference.rb', line 39

# Submits all child Raif::ModelCompletion records as a single OpenAI batch.
# Three-step flow: build the JSONL string, upload it as a file with
# purpose=batch, then create the batch referencing the file.
#
# NOTE(fix): the timestamp local's identifier had been lost (bare
# ` = Time.current` and empty `submitted_at:` / `started_at:` values), which
# is invalid Ruby; it is restored here as `now`.
#
# @param batch [Raif::ModelCompletionBatch] a submittable batch with children
# @return [Raif::ModelCompletionBatch] the updated batch
# @raise [Raif::Errors::InvalidBatchError] when the batch has no children
def submit_batch!(batch)
  batch.assert_submittable!

  completions = batch.raif_model_completions.to_a
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no child completions" if completions.empty?

  jsonl = build_batch_jsonl(completions)
  input_file_id = upload_batch_input_file!(jsonl)

  response = batch_connection.post("batches") do |req|
    req.body = {
      input_file_id: input_file_id,
      endpoint: batch_endpoint_path,
      completion_window: Raif.config.open_ai_batch_completion_window
    }
  end

  body = response.body
  # One shared timestamp so the batch and all children get identical
  # submitted_at/started_at values.
  now = Time.current

  # Batch + child writes happen in one transaction so a partial failure
  # (e.g. the child started_at update raises after the batch update) leaves
  # no submitted-but-unstamped state behind.
  Raif::ModelCompletionBatch.transaction do
    batch.update!(
      provider_batch_id: body["id"],
      status: map_batch_status(body["status"]) || "submitted",
      submitted_at: now,
      started_at: now,
      provider_response: (batch.provider_response || {}).merge(
        "input_file_id" => input_file_id,
        "endpoint" => batch_endpoint_path
      ),
      request_counts: body["request_counts"] || {}
    )

    # Single UPDATE for all children that don't already have a started_at,
    # filtered in SQL so we can't stomp a started_at that was set by another
    # process between when we loaded `completions` and now.
    batch.raif_model_completions.where(started_at: nil).update_all(started_at: now)
  end

  batch
end