Module: Raif::Concerns::Llms::XAi::BatchInference

Extended by:
ActiveSupport::Concern
Includes:
SupportsBatchInference
Included in:
Llms::XAi
Defined in:
app/models/raif/concerns/llms/x_ai/batch_inference.rb

Overview

xAI Batch API support for Raif::Llms::XAi. Implements the Raif::Concerns::Llms::SupportsBatchInference contract on top of /v1/files (multipart JSONL upload) and /v1/batches.

xAI's Batch API supports two submission flows. We use the JSONL file-upload flow because it lets us send each per-request body in /v1/chat/completions shape (with messages), matching the sync path verbatim. The alternate REST append flow (POST /v1/batches/{id}/requests) only accepts the responses wrapper, which targets xAI's Responses API and would force a body-shape conversion (messages -> input, response_format -> text.format, ...).

Submission flow:

POST /v1/files                        -> multipart upload of the JSONL,
                                       returns { id: "file-..." }
POST /v1/batches { input_file_id }    -> creates the batch, returns
                                       { batch_id, state: {...} }
GET  /v1/batches/{id}                 -> status (counts: pending/success/error/cancelled)
GET  /v1/batches/{id}/results         -> paginated per-entry results
POST /v1/batches/{id}:cancel          -> cancels pending entries

Result envelope per entry (same for both submission flows):

{
  "batch_request_id": "...",
  "batch_result": {
    "response": {
      "chat_get_completion": { "id": ..., "choices": [...], "usage": {...} }
    }
  }
}

The chat_get_completion payload matches the synchronous /v1/chat/completions response shape, so we feed it through the host class's #update_model_completion verbatim (the same parser used by the sync and streaming paths). Failures carry an "error" / "error_message" field instead of "response".
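As a minimal sketch of how one envelope splits into a success payload or a failure message, the following standalone Ruby mirrors the lookup order described above. The helper name `extract_entry` and the sample values are hypothetical and not part of Raif; only the key names come from the envelope shown above.

```ruby
require "json"

# Hypothetical helper (not part of Raif): classifies one per-entry result
# envelope as a success (chat payload present) or a failure (error message).
def extract_entry(raw)
  batch_result = raw["batch_result"] || {}
  response = batch_result["response"] || raw["response"]
  error = batch_result["error"] || raw["error"] || raw["error_message"]

  payload = response.is_a?(Hash) ? response["chat_get_completion"] : nil
  return { ok: true, payload: payload } if payload.is_a?(Hash)

  # Errors may be a Hash with a message field or a bare String.
  message = error.is_a?(Hash) ? (error["message"] || error["error_message"]) : error
  message = "unknown xAI batch failure" if message.nil? || message.to_s.empty?
  { ok: false, error: message.to_s }
end

# Sample envelopes with made-up values, for illustration only.
success = JSON.parse(
  '{"batch_request_id":"a","batch_result":{"response":' \
  '{"chat_get_completion":{"id":"c1","choices":[],"usage":{}}}}}'
)
failure = { "batch_request_id" => "b", "error_message" => "rate limited" }

p extract_entry(success)[:ok]
p extract_entry(failure)[:error]
```

In the module itself, the success payload is handed to #update_model_completion rather than returned, so this sketch only illustrates the branching.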

xAI has no batch-level state enum: a terminal status is derived locally from num_pending reaching zero (or from an explicit cancel acknowledgement, or from expires_at elapsing).
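The derivation rule can be sketched roughly as follows. This is an illustration under assumptions, not the module's private derive_batch_status: the function name, the flat `state` hash, the ISO 8601 format of expires_at, the status strings other than in_progress, and the precedence between the three conditions are all guesses.

```ruby
require "time"

# Hypothetical sketch: maps provider counts to a local status string.
# Assumes `state` is a flat Hash with "num_pending" (Integer) and an
# optional ISO 8601 "expires_at"; neither shape is confirmed by the source.
def derive_status(state, post_cancel: false, now: Time.now)
  pending = state["num_pending"]

  # No pending entries left: the batch is terminal.
  if pending == 0
    return post_cancel ? "cancelled" : "completed"
  end

  # Entries still pending (or count unknown), but the batch window elapsed.
  expires_at = state["expires_at"]
  return "expired" if expires_at && Time.iso8601(expires_at) <= now

  # Otherwise keep polling; this also covers a cancel that has not
  # propagated yet (num_pending > 0 right after the cancel call).
  "in_progress"
end

p derive_status({ "num_pending" => 3 })
p derive_status({ "num_pending" => 0 }, post_cancel: true)
```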

Instance Method Details

#apply_batch_result(mc, raw_result) ⇒ Object

Applies one per-entry xAI batch result envelope to a Raif::ModelCompletion. The success path feeds batch_result.response.chat_get_completion through the host class's #update_model_completion (same parser used by the sync path), so token counts, tool calls, and response shape are populated identically. The 50% batch discount is applied automatically by Raif::ModelCompletion#calculate_costs.



# File 'app/models/raif/concerns/llms/x_ai/batch_inference.rb', line 209

def apply_batch_result(mc, raw_result)
  batch_result = raw_result["batch_result"] || {}
  response_envelope = batch_result["response"] || raw_result["response"]
  error_envelope = batch_result["error"] || raw_result["error"] || raw_result["error_message"]

  mc.started_at ||= mc.raif_model_completion_batch&.started_at || Time.current

  chat_payload = response_envelope.is_a?(Hash) ? response_envelope["chat_get_completion"] : nil

  if chat_payload.is_a?(Hash)
    update_model_completion(mc, chat_payload)
    mc.completed!
  else
    err_message = if error_envelope.is_a?(Hash)
      error_envelope["message"] || error_envelope["error_message"]
    else
      error_envelope
    end

    mc.failure_error = "xAI batch entry failed"
    mc.failure_reason = (err_message.presence || "unknown xAI batch failure").to_s.truncate(255)
    mc.failed!
  end

  mc
end

#batch_class ⇒ Object



# File 'app/models/raif/concerns/llms/x_ai/batch_inference.rb', line 51

def batch_class
  Raif::ModelCompletionBatches::XAi
end

#cancel_batch!(batch) ⇒ Object

Cancellation is fire-and-forget for pending entries: xAI continues to serve already-processed results, but no further entries are processed. The provider response after a cancel may still report num_pending > 0 if the cancel hasn't propagated yet; that case is treated as a transitional in_progress status until the counts settle.



# File 'app/models/raif/concerns/llms/x_ai/batch_inference.rb', line 130

def cancel_batch!(batch)
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no provider_batch_id" if batch.provider_batch_id.blank?
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} is already terminal (status=#{batch.status})" if batch.terminal?

  response = batch_connection.post("batches/#{batch.provider_batch_id}:cancel")
  body = response.body
  new_status = derive_batch_status(body, batch, post_cancel: true)

  batch.with_lock do
    return batch.status if batch.terminal?

    provider_response_updates = (batch.provider_response || {}).merge(
      "expires_at" => body["expires_at"],
      "cost_breakdown" => body["cost_breakdown"]
    ).compact

    updates = {
      status: new_status,
      request_counts: derive_request_counts(body) || batch.request_counts,
      provider_response: provider_response_updates
    }
    if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
      updates[:ended_at] = Time.current
    end

    batch.update!(updates)
  end

  new_status
end

#fetch_batch_results!(batch) ⇒ Object



# File 'app/models/raif/concerns/llms/x_ai/batch_inference.rb', line 161

def fetch_batch_results!(batch)
  completions_by_id = batch.raif_model_completions.index_by(&:batch_custom_id)

  pagination_token = nil
  loop do
    params = {}
    params[:pagination_token] = pagination_token if pagination_token.present?

    response = batch_connection.get("batches/#{batch.provider_batch_id}/results", params)
    body = response.body || {}

    Array(body["results"]).each do |raw|
      custom_id = raw["batch_request_id"]
      mc = completions_by_id[custom_id]
      if mc.nil?
        Raif.logger.warn(
          "xAI batch results: batch_request_id #{custom_id.inspect} did not match any child completion in batch ##{batch.id}"
        )
        next
      end

      apply_batch_result(mc, raw)
    end

    pagination_token = body["pagination_token"].presence
    break if pagination_token.nil?
  end

  completions_by_id.each_value do |mc|
    mc.reload
    next if mc.completed? || mc.failed?

    mc.started_at ||= batch.started_at
    mc.failure_error = "xAI batch entry missing"
    mc.failure_reason = "Result not present in /results stream (batch ##{batch.id})"
    mc.failed!
  end

  batch.recalculate_costs!
  batch
end
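
The pagination loop in the method above can be isolated as a small standalone sketch. Here `collect_results` and the `fetch_page` callable are hypothetical stand-ins for the HTTP call; only the "results" and "pagination_token" keys are taken from the listing.

```ruby
# Hypothetical sketch of token-based pagination: keeps requesting pages
# until the provider stops returning a pagination token.
# `fetch_page` is any callable that takes the current token (nil for the
# first page) and returns a Hash-like page body.
def collect_results(fetch_page)
  results = []
  token = nil

  loop do
    body = fetch_page.call(token) || {}
    results.concat(Array(body["results"]))

    # A missing or empty token means this was the last page.
    token = body["pagination_token"]
    break if token.nil? || token.to_s.empty?
  end

  results
end

# Two fake pages keyed by the token that requests them (made-up data).
pages = {
  nil  => { "results" => [{ "batch_request_id" => "a" }], "pagination_token" => "p2" },
  "p2" => { "results" => [{ "batch_request_id" => "b" }], "pagination_token" => "" }
}

ids = collect_results(->(token) { pages[token] }).map { |r| r["batch_request_id"] }
p ids
```

Unlike this sketch, the method applies each entry as it arrives instead of accumulating them, which keeps memory flat for large batches.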

#fetch_batch_status!(batch) ⇒ Object



# File 'app/models/raif/concerns/llms/x_ai/batch_inference.rb', line 97

def fetch_batch_status!(batch)
  response = batch_connection.get("batches/#{batch.provider_batch_id}")
  body = response.body
  new_status = derive_batch_status(body, batch)

  batch.with_lock do
    return batch.status if batch.terminal?

    provider_response_updates = (batch.provider_response || {}).merge(
      "expires_at" => body["expires_at"],
      "cost_breakdown" => body["cost_breakdown"]
    ).compact

    updates = {
      status: new_status,
      request_counts: derive_request_counts(body) || batch.request_counts,
      provider_response: provider_response_updates
    }
    if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
      updates[:ended_at] = Time.current
    end

    batch.update!(updates)
  end

  new_status
end
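
One detail of the provider_response update above (the same pattern appears in #cancel_batch!) is the order of merge and compact. The plain-Ruby sketch below shows what Hash#merge followed by Hash#compact does when the provider body omits a field. The helper names and sample values are hypothetical, and whether dropping a previously stored value is intended cannot be determined from this page.

```ruby
# Mirrors the merge-then-compact pattern from the listing, in plain Ruby.
def merge_then_compact(stored, body)
  (stored || {}).merge(
    "expires_at" => body["expires_at"],
    "cost_breakdown" => body["cost_breakdown"]
  ).compact
end

# Hypothetical alternative: compact the fresh values first, so a field the
# provider omits does not overwrite a value stored earlier.
def compact_then_merge(stored, body)
  fresh = {
    "expires_at" => body["expires_at"],
    "cost_breakdown" => body["cost_breakdown"]
  }.compact
  (stored || {}).merge(fresh)
end

# Made-up sample data: the stored hash has expires_at, the new body does not.
stored = { "input_file_id" => "file-123", "expires_at" => "2030-01-01T00:00:00Z" }
body = { "cost_breakdown" => { "total" => 1 } }

p merge_then_compact(stored, body).key?("expires_at")
p compact_then_merge(stored, body).key?("expires_at")
```

With merge-then-compact, a nil from the body replaces the stored value and compact then removes the key entirely; compacting first keeps the earlier value.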

#submit_batch!(batch) ⇒ Object



# File 'app/models/raif/concerns/llms/x_ai/batch_inference.rb', line 55

def submit_batch!(batch)
  batch.assert_submittable!

  completions = batch.raif_model_completions.to_a
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no child completions" if completions.empty?

  jsonl = build_batch_jsonl(completions)
  input_file_id = upload_batch_input_file!(jsonl)

  response = batch_connection.post("batches") do |req|
    req.body = { name: "raif_batch_#{batch.id}", input_file_id: input_file_id }
  end
  body = response.body.is_a?(Hash) ? response.body : {}
  provider_batch_id = body["batch_id"] || body["id"]

  if provider_batch_id.blank?
    raise Raif::Errors::InvalidBatchError,
      "xAI batch create returned no batch id (body=#{response.body.inspect})"
  end

  # Capture one timestamp so the batch and its children share the same value.
  now = Time.current

  Raif::ModelCompletionBatch.transaction do
    batch.update!(
      provider_batch_id: provider_batch_id,
      status: "submitted",
      submitted_at: now,
      started_at: now,
      provider_response: (batch.provider_response || {}).merge(
        "input_file_id" => input_file_id,
        "expires_at" => body["expires_at"],
        "cost_breakdown" => body["cost_breakdown"]
      ).compact,
      request_counts: derive_request_counts(body) || batch.request_counts
    )

    batch.raif_model_completions.where(started_at: nil).update_all(started_at: now)
  end

  batch
end