Class: Raif::PollModelCompletionBatchJob

Inherits:
ApplicationJob
  • Object
Defined in:
app/jobs/raif/poll_model_completion_batch_job.rb

Overview

Self-rescheduling poller for Raif::ModelCompletionBatch.

Each invocation:

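  1. Loads the batch. If the batch no longer exists, logs and returns. If it is already terminal, re-dispatches the completion handler (a no-op once the handler has run successfully) and returns, so repeated runs are idempotent.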
  2. Asks the LLM provider for the batch's current status.
  3. If terminal:
    • on ended: fetches per-entry results via the LLM, which marks each child Raif::ModelCompletion as completed or failed.
    • on failed / canceled / expired: marks every child completion that's still pending as failed with the batch's terminal status as the reason; no per-entry fetch happens.
     Then dispatches the batch's completion handler (if any).
  4. Otherwise re-enqueues itself with the next backoff delay from Raif.config.model_completion_batch_poll_schedule (the last entry repeats once the schedule is exhausted). Polling continues until Raif.config.model_completion_batch_max_age has elapsed since the batch was submitted; at that point the batch is expired (force-failed) and the handler is dispatched.
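
The backoff lookup and the max-age cutoff in step 4 can be sketched in plain Ruby. This is only an illustration: next_poll_delay and max_age_exceeded? below are hypothetical stand-ins (the real lookup and Raif::ModelCompletionBatch#max_age_exceeded? are not shown on this page), the schedule is assumed to be an array of delays in seconds indexed by a 1-based attempt number, and the example values are made up.

```ruby
# Hypothetical helper: picks the delay for a 1-based attempt number.
# Once the schedule is exhausted, the last entry repeats.
def next_poll_delay(schedule, attempt)
  raise ArgumentError, "schedule must not be empty" if schedule.empty?
  raise ArgumentError, "attempt must be >= 1" if attempt < 1

  # Array#fetch yields to the block when the index is out of range.
  schedule.fetch(attempt - 1) { schedule.last }
end

# Hypothetical helper: true once more than max_age seconds have passed
# since the batch was submitted.
def max_age_exceeded?(submitted_at, max_age, now: Time.now)
  now - submitted_at > max_age
end

schedule = [30, 60, 300] # assumed example values, in seconds
delays = (1..5).map { |attempt| next_poll_delay(schedule, attempt) }
# delays is [30, 60, 300, 300, 300]
```

Repeating the last entry keeps the polling interval bounded for long-running batches, while the max-age check guarantees the chain eventually stops.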

On a transient error from the provider (one of the exception classes listed in Raif.config.llm_request_retriable_exceptions), the job logs and notifies, then reschedules itself instead of re-raising, so the polling chain self-heals across the kinds of failures that ActiveJob retry policies typically cover. Non-transient errors are logged, notified, and re-raised so the host's job adapter surfaces them.
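
One plausible way to classify an error as transient is to test it against a list of exception classes. The sketch below is an assumption, not the job's actual private transient_error? method (which is not shown on this page): RETRIABLE_EXCEPTIONS is a made-up stand-in for Raif.config.llm_request_retriable_exceptions, and its contents are example values.

```ruby
require "timeout"

# Assumed stand-in for Raif.config.llm_request_retriable_exceptions.
RETRIABLE_EXCEPTIONS = [Timeout::Error, Errno::ECONNRESET].freeze

# Hypothetical classifier: an error is transient when it is an instance of
# any listed class (subclasses included, via is_a?).
def transient_error?(error, retriable = RETRIABLE_EXCEPTIONS)
  retriable.any? { |klass| error.is_a?(klass) }
end

transient_error?(Timeout::Error.new)  # transient: reschedule
transient_error?(ArgumentError.new)   # not transient: re-raise
```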

The self-reschedule uses perform_later(wait: ...), so the configured ActiveJob queue adapter must support scheduled jobs. If the host app's queue adapter exposes a job-grouping primitive that auto-enrolls jobs enqueued from within a running job, the polling chain will participate in that grouping transparently.

Instance Method Summary

Instance Method Details

#perform(batch_id, attempt: 1) ⇒ Object



# File 'app/jobs/raif/poll_model_completion_batch_job.rb', line 36

def perform(batch_id, attempt: 1)
  batch = Raif::ModelCompletionBatch.find_by(id: batch_id)
  if batch.nil?
    Raif.logger.info("Raif::PollModelCompletionBatchJob ##{batch_id}: batch not found; skipping.")
    return
  end

  Raif.logger.info(
    "Raif::PollModelCompletionBatchJob ##{batch.id}: polling " \
      "(attempt=#{attempt}, provider=#{batch.class.name}, llm_model_key=#{batch.llm_model_key.inspect}, " \
      "provider_batch_id=#{batch.provider_batch_id.inspect}, status=#{batch.status})"
  )

  # If the batch is already terminal at the top of perform, this run
  # exists either to retry a handler that raised on a previous attempt
  # (host's job adapter retried us, or the safety sweep beat us to the
  # terminal transition) or because the polling chain raced with another
  # finalize path. dispatch_completion_handler! is gated on
  # handler_dispatched_at, so it's a no-op when the handler already ran
  # successfully -- otherwise the early return here would silently
  # swallow handler errors and the consumer's on_batch_completion block
  # would never run.
  if batch.terminal?
    Raif.logger.info(
      "Raif::PollModelCompletionBatchJob ##{batch.id}: batch already terminal " \
        "(status=#{batch.status}, handler_dispatched_at=#{batch.handler_dispatched_at&.iso8601 || "nil"}); " \
        "dispatching completion handler and returning."
    )
    batch.dispatch_completion_handler!
    return
  end

  previous_status = batch.status
  new_status = batch.fetch_status!
  Raif.logger.info(
    "Raif::PollModelCompletionBatchJob ##{batch.id}: provider returned status=#{new_status.inspect} " \
      "(was #{previous_status.inspect}, request_counts=#{batch.request_counts.inspect})"
  )

  if batch.terminal?
    Raif.logger.info(
      "Raif::PollModelCompletionBatchJob ##{batch.id}: terminal status reached (#{batch.status}); " \
        "finalizing and dispatching completion handler."
    )
    batch.finalize!
    batch.dispatch_completion_handler!
    return
  end

  if batch.max_age_exceeded?
    Raif.logger.warn(
      "Raif::PollModelCompletionBatchJob ##{batch.id}: max_age exceeded " \
        "(submitted_at=#{batch.submitted_at&.iso8601}, max_age=#{Raif.config.model_completion_batch_max_age}); expiring."
    )
    batch.expire!(reason: "Batch exceeded Raif.config.model_completion_batch_max_age (#{Raif.config.model_completion_batch_max_age})")
    batch.dispatch_completion_handler!
    return
  end

  reschedule!(batch, attempt: attempt)
rescue StandardError => e
  log_and_notify_error(batch_id, e)

  if transient_error?(e)
    # Reload (the batch may have been transitioned by another process while
    # we were in flight). Four cases after reload:
    # 1. Batch missing -> nothing to reschedule against, end of chain.
    # 2. Batch terminal AND handler already dispatched -> nothing to do.
    # 3. Batch terminal but handler NOT dispatched -> reschedule so the
    #    next run can call dispatch_completion_handler! (the error may
    #    have come from the handler itself, e.g. a host downstream blip).
    # 4. Batch non-terminal -> reschedule to pick up polling.
    batch&.reload
    if batch.nil? || (batch.terminal? && batch.handler_dispatched_at.present?)
      Raif.logger.info(
        "Raif::PollModelCompletionBatchJob ##{batch_id}: not rescheduling after transient error " \
          "(#{e.class}); batch is missing or already finalized + handler dispatched."
      )
      return
    end

    Raif.logger.info(
      "Raif::PollModelCompletionBatchJob ##{batch_id}: rescheduling after transient error " \
        "(#{e.class}); the polling chain will pick up where it left off."
    )
    reschedule!(batch, attempt: attempt)
    return
  end

  raise
end
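
The decision made in the rescue branch after a transient error can be isolated as a small predicate. The sketch below is illustrative only: BatchState and reschedule_after_transient_error? are hypothetical names, and the struct merely mimics the terminal? and handler_dispatched_at readers used in the listing above.

```ruby
# Hypothetical stand-in for the two batch attributes the rescue branch reads.
BatchState = Struct.new(:terminal, :handler_dispatched_at, keyword_init: true)

# Mirrors the four cases listed in the rescue branch:
#   1. batch missing                          -> do not reschedule
#   2. terminal and handler already ran       -> do not reschedule
#   3. terminal but handler not yet dispatched -> reschedule
#   4. non-terminal                           -> reschedule
def reschedule_after_transient_error?(batch)
  return false if batch.nil?
  return false if batch.terminal && batch.handler_dispatched_at

  true
end

pending_handler = BatchState.new(terminal: true, handler_dispatched_at: nil)
reschedule_after_transient_error?(pending_handler) # case 3: reschedule
```

Case 3 is the one that keeps handler failures from being lost: the next run sees a terminal batch and goes straight to dispatching the handler.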