Module: Raif::Concerns::Llms::Anthropic::BatchInference

Extended by:
ActiveSupport::Concern
Includes:
SupportsBatchInference
Included in:
Llms::Anthropic
Defined in:
app/models/raif/concerns/llms/anthropic/batch_inference.rb

Overview

Anthropic Messages Batches API support for Raif::Llms::Anthropic. Implements the Raif::Concerns::Llms::SupportsBatchInference contract on top of /v1/messages/batches and the JSONL results stream.

The host LLM class is expected to provide #build_request_parameters and #update_model_completion -- these are reused verbatim from the synchronous path so prompt caching, tool definitions, and response shape carry over.

Instance Method Summary

Instance Method Details

#apply_batch_result(mc, raw_result) ⇒ Object

Applies one per-entry batch result to a Raif::ModelCompletion. The success path feeds the embedded message payload through update_model_completion -- the same parser used by the synchronous and streaming paths -- so token counts, tool calls, citations, and response shape are populated identically. The 50% Anthropic batch discount is applied automatically by Raif::ModelCompletion#calculate_costs (because raif_model_completion_batch_id is set).



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 192

# Applies one per-entry batch result to a model completion record.
#
# A "succeeded" entry has its embedded message passed to
# update_model_completion and is then marked completed. Any other result
# type (or a missing result) marks the completion failed, recording the
# result type and the best available error message.
#
# @param mc [Raif::ModelCompletion] the child completion matched to this entry
# @param raw_result [Hash] one parsed line of the results stream; its
#   "result" key holds the per-entry outcome
# @return [Raif::ModelCompletion] the same +mc+ that was passed in
def apply_batch_result(mc, raw_result)
  result = raw_result["result"] || {}

  # Set started_at in-memory before any save below, so update_model_completion's
  # save (or mc.failed!'s save) persists it in a single round-trip.
  mc.started_at ||= mc.raif_model_completion_batch&.started_at || Time.current

  case result["type"]
  when "succeeded"
    update_model_completion(mc, result["message"])
    mc.completed!
  else
    type = result["type"].to_s
    error = result["error"]
    error = {} unless error.is_a?(Hash)

    # Anthropic's errored results wrap the provider error one level down
    # ({"type" => "error", "error" => {"type" => ..., "message" => ...}}), so
    # look there first and keep the flat "message" key as a fallback.
    nested_error = error["error"]
    message = nested_error["message"] if nested_error.is_a?(Hash)
    message = message.presence || error["message"].presence

    mc.failure_error = "Anthropic batch entry #{type.presence || "failed"}"
    mc.failure_reason = (message || type.presence || "unknown failure").to_s.truncate(255)
    mc.failed!
  end

  mc
end

#batch_class ⇒ Object



15
16
17
# File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 15

# Returns the Raif::ModelCompletionBatches::Anthropic constant.
#
# NOTE(review): presumably this is the batch model class that callers use for
# Anthropic batches -- confirm against the SupportsBatchInference contract.
#
# @return [Class] Raif::ModelCompletionBatches::Anthropic
def batch_class
  Raif::ModelCompletionBatches::Anthropic
end

#cancel_batch!(batch) ⇒ Object

Sends a cancel request to Anthropic's Messages Batches API. Cancellation is asynchronous on Anthropic's side: the provider transitions to processing_status "canceling" first, then to "canceled" once in-flight entries finish. Returns the (possibly transitional) Raif status.



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 103

# Asks the provider to cancel a batch and records the status it reports back.
#
# The provider response is fetched before the row lock is taken; inside the
# lock the batch is re-checked and left untouched if it has already become
# terminal in the meantime.
#
# @param batch [Raif::ModelCompletionBatch] the batch to cancel
# @return [String] the mapped provider status, or the batch's current status
#   when it turned out to be terminal once the lock was held
# @raise [Raif::Errors::InvalidBatchError] if the batch has no
#   provider_batch_id or is already terminal
def cancel_batch!(batch)
  if batch.provider_batch_id.blank?
    raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no provider_batch_id"
  end

  if batch.terminal?
    raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} is already terminal (status=#{batch.status})"
  end

  payload = batch_connection.post("messages/batches/#{batch.provider_batch_id}/cancel").body
  mapped_status = map_processing_status(payload["processing_status"])

  batch.with_lock do
    # Another process may have finished the batch while the HTTP call was in flight.
    return batch.status if batch.terminal?

    url_fields = {
      "results_url" => payload["results_url"],
      "cancel_url" => payload["cancel_url"]
    }
    attrs = {
      status: mapped_status,
      # Keep the previously stored counts when the provider omits them.
      request_counts: payload["request_counts"] || batch.request_counts,
      provider_response: (batch.provider_response || {}).merge(url_fields)
    }

    reached_terminal = Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(mapped_status)
    attrs[:ended_at] = Time.current if reached_terminal && batch.ended_at.nil?

    batch.update!(attrs)
  end

  mapped_status
end

#fetch_batch_results!(batch) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 132

# Downloads the batch's JSONL results and applies each entry to its matching
# child completion, then fails any child that never received a result.
#
# Lines that cannot be parsed, that parse to something other than a JSON
# object, or whose custom_id matches no child are logged and skipped so a
# single bad line does not stop the remaining entries from being applied.
#
# @param batch [Raif::ModelCompletionBatch] a batch whose results_url is set
# @return [Raif::ModelCompletionBatch] the same +batch+ that was passed in
# @raise [Raif::Errors::InvalidBatchError] if the batch has no results_url
def fetch_batch_results!(batch)
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no results_url" if batch.results_url.blank?

  completions_by_id = batch.raif_model_completions.index_by(&:batch_custom_id)

  response = batch_results_connection.get(batch.results_url)
  body = response.body.to_s

  body.each_line do |line|
    line = line.strip
    next if line.blank?

    begin
      raw = JSON.parse(line)
    rescue JSON::ParserError => e
      # One bad line shouldn't poison the rest of the batch. Skip it; any
      # child completion that never gets matched falls through to the
      # missing-entry sweep below and is force-failed there.
      Raif.logger.error(
        "Anthropic batch ##{batch.id} results: skipping malformed JSONL line " \
          "(#{e.class}: #{e.message}): #{line.inspect}"
      )
      next
    end

    # JSON.parse also accepts bare scalars, null and arrays. Those cannot carry
    # a custom_id, and indexing them would raise and abort the whole pass, so
    # treat them like any other malformed line.
    unless raw.is_a?(Hash)
      Raif.logger.error(
        "Anthropic batch ##{batch.id} results: skipping non-object JSONL line: #{line.inspect}"
      )
      next
    end

    custom_id = raw["custom_id"]
    mc = completions_by_id[custom_id]
    if mc.nil?
      Raif.logger.warn(
        "Anthropic batch results: custom_id #{custom_id.inspect} did not match any child completion in batch ##{batch.id}"
      )
      next
    end

    apply_batch_result(mc, raw)
  end

  # Anything that was never reported in the results stream (rare; possible if
  # the batch expired mid-flight or was canceled) is force-failed so the
  # workflow can advance.
  completions_by_id.each_value do |mc|
    mc.reload
    next if mc.completed? || mc.failed?

    mc.started_at ||= batch.started_at
    mc.failure_error = "Anthropic batch entry missing"
    mc.failure_reason = "Result not present in results stream (batch ##{batch.id})"
    mc.failed!
  end

  batch.recalculate_costs!
  batch
end

#fetch_batch_status!(batch) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 69

# Polls the provider for the batch's current processing status and records it.
#
# The provider response is fetched before the row lock is taken; inside the
# lock the batch is left untouched if it is already terminal.
#
# @param batch [Raif::ModelCompletionBatch] a batch that has been submitted
# @return [String] the mapped provider status, or the batch's current status
#   when it was already terminal once the lock was held
# @raise [Raif::Errors::InvalidBatchError] if the batch has no provider_batch_id
def fetch_batch_status!(batch)
  # Same guard as cancel_batch!: without an id the request path would be
  # "messages/batches/", whose response describes no single batch, and the
  # update below would be written from it.
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no provider_batch_id" if batch.provider_batch_id.blank?

  response = batch_connection.get("messages/batches/#{batch.provider_batch_id}")
  body = response.body
  new_status = map_processing_status(body["processing_status"])

  # Re-acquire a row-level lock + reload so we don't overwrite a status another
  # process (e.g. ExpireStuckModelCompletionBatchesJob) just transitioned to
  # terminal. Without this guard, a stale instance can stomp a `failed`
  # decision back to whatever the provider currently reports.
  batch.with_lock do
    return batch.status if batch.terminal?

    updates = {
      status: new_status,
      request_counts: body["request_counts"] || {},
      provider_response: (batch.provider_response || {}).merge(
        "results_url" => body["results_url"],
        "cancel_url" => body["cancel_url"]
      )
    }
    # Stamp ended_at only on the first transition into a terminal status.
    if Raif::ModelCompletionBatch::TERMINAL_STATUSES.include?(new_status) && batch.ended_at.nil?
      updates[:ended_at] = Time.current
    end

    batch.update!(updates)
  end

  new_status
end

#submit_batch!(batch) ⇒ Object

Submits all child Raif::ModelCompletion records of the batch as a single Anthropic Messages Batch. Each entry's params body is identical to what the synchronous /v1/messages endpoint would receive.

The batch + child writes happen in a transaction so a partial failure (e.g. the network call succeeds but the child started_at update raises) leaves no submitted-but-unstamped state behind.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'app/models/raif/concerns/llms/anthropic/batch_inference.rb', line 26

# Submits every child completion of the batch to the provider in a single
# request and records the provider's response on the batch.
#
# @param batch [Raif::ModelCompletionBatch] the batch to submit
# @return [Raif::ModelCompletionBatch] the same +batch+ that was passed in
# @raise [Raif::Errors::InvalidBatchError] if the batch has no child
#   completions or a child has a blank batch_custom_id (raised directly here;
#   batch.assert_submittable! may raise as well)
def submit_batch!(batch)
  batch.assert_submittable!

  completions = batch.raif_model_completions.to_a
  raise Raif::Errors::InvalidBatchError, "Batch ##{batch.id} has no child completions" if completions.empty?

  # Validate every child before anything is sent, so a bad child aborts the
  # submission without a network call.
  requests = completions.map do |mc|
    if mc.batch_custom_id.blank?
      raise Raif::Errors::InvalidBatchError, "Raif::ModelCompletion ##{mc.id} has blank batch_custom_id"
    end

    { custom_id: mc.batch_custom_id, params: build_request_parameters(mc) }
  end

  response = batch_connection.post("messages/batches") do |req|
    req.body = { requests: requests }
  end

  body = response.body
  # One timestamp shared by the batch and all of its children.
  now = Time.current

  Raif::ModelCompletionBatch.transaction do
    batch.update!(
      provider_batch_id: body["id"],
      status: map_processing_status(body["processing_status"]) || "submitted",
      submitted_at: now,
      started_at: now,
      provider_response: (batch.provider_response || {}).merge(
        "results_url" => body["results_url"],
        "cancel_url" => body["cancel_url"]
      ),
      request_counts: body["request_counts"] || {}
    )

    # Single UPDATE for all children that don't already have a started_at,
    # filtered in SQL so we can't stomp a started_at that was set by another
    # process between when we loaded `completions` and now.
    batch.raif_model_completions.where(started_at: nil).update_all(started_at: now)
  end

  batch
end