Class: Raif::PollModelCompletionBatchJob
- Inherits: ApplicationJob
  - Object
  - ApplicationJob
  - ApplicationJob
  - Raif::PollModelCompletionBatchJob
- Defined in: app/jobs/raif/poll_model_completion_batch_job.rb
Overview
Self-rescheduling poller for Raif::ModelCompletionBatch.
Each invocation:
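- Loads the batch. If the batch is already terminal, dispatches the completion handler (a no-op when the handler has already run successfully) and returns, so repeated invocations are idempotent.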
- Asks the LLM provider for the batch's current status.
- If terminal:
  - on ended: fetches per-entry results via the LLM, which marks each child Raif::ModelCompletion as completed or failed.
  - on failed/canceled/expired: marks every child completion that's still pending as failed with the batch's terminal status as the reason; no per-entry fetch happens.
  In either case, it then dispatches the batch's completion handler (if any).
- Otherwise re-enqueues itself with the next backoff delay from Raif.config.model_completion_batch_poll_schedule (the last entry repeats once exhausted) until Raif.config.model_completion_batch_max_age has elapsed since the batch was submitted, after which the batch is force-failed and the handler is dispatched.
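The backoff lookup and the max-age cutoff described above can be sketched in plain Ruby. This is a minimal illustration, not Raif's implementation: the helper names next_poll_delay and max_age_exceeded?, the assumption that the schedule is an array of delays in seconds, the assumption that attempt 1 maps to the first entry, and the sample schedule values are all hypothetical.

```ruby
# Hypothetical helper (not part of Raif's API): picks the delay for a given
# attempt, repeating the last schedule entry once the schedule is exhausted.
# Assumes `schedule` is a non-empty array of delays in seconds and that
# attempt 1 corresponds to the first entry.
def next_poll_delay(schedule, attempt)
  raise ArgumentError, "schedule must not be empty" if schedule.empty?
  raise ArgumentError, "attempt must be >= 1" if attempt < 1

  index = [attempt - 1, schedule.length - 1].min
  schedule[index]
end

# Hypothetical helper: true once more than max_age_seconds have elapsed since
# submission. Treating a missing submitted_at as "not exceeded" is an
# assumption made for this sketch.
def max_age_exceeded?(submitted_at, max_age_seconds, now: Time.now)
  return false if submitted_at.nil?

  now - submitted_at > max_age_seconds
end

# Illustrative schedule values only; not Raif's defaults.
schedule = [15, 30, 60, 300]
delays = (1..6).map { |attempt| next_poll_delay(schedule, attempt) }
```

With the illustrative schedule, attempts 5 and 6 reuse the final 300-second entry, which is the "last entry repeats" behavior the overview describes.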
On a transient error from the provider (any of the exception classes listed in Raif.config.llm_request_retriable_exceptions), the job logs the error and sends a notification, then reschedules itself instead of re-raising, so the chain self-heals across the kinds of failures that ActiveJob retry policies typically cover. Non-transient errors are logged the same way and then re-raised so the host's job adapter surfaces them.
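One way to picture the transient/non-transient split is a class-membership check against the configured list. The sketch below is an assumption about how such a check could look, not the body of the job's private transient_error? method; RETRIABLE_EXCEPTIONS is a hypothetical stand-in for Raif.config.llm_request_retriable_exceptions, and handle_poll_error is a hypothetical name.

```ruby
require "timeout"

# Hypothetical stand-in for Raif.config.llm_request_retriable_exceptions.
# The real list is host-configurable; these two classes are only examples.
RETRIABLE_EXCEPTIONS = [Timeout::Error, Errno::ECONNRESET].freeze

# Treats an error as transient if it is an instance of (or inherits from)
# any class in the retriable list.
def transient_error?(error, retriable = RETRIABLE_EXCEPTIONS)
  retriable.any? { |klass| error.is_a?(klass) }
end

# Mirrors the documented policy: transient errors lead to a reschedule,
# everything else is re-raised to the job adapter.
def handle_poll_error(error)
  transient_error?(error) ? :reschedule : :reraise
end
```

Using is_a? rather than comparing classes directly means subclasses of a listed exception are also treated as transient, which is usually what a retry list intends.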
The self-reschedule uses perform_later(wait: ...), so the configured
ActiveJob queue adapter must support scheduled jobs. If the host app's
queue adapter exposes a job-grouping primitive that auto-enrolls jobs
enqueued from within a running job, the polling chain will participate
in that grouping transparently.
Instance Method Details
#perform(batch_id, attempt: 1) ⇒ Object
# File 'app/jobs/raif/poll_model_completion_batch_job.rb', line 36

def perform(batch_id, attempt: 1)
  batch = Raif::ModelCompletionBatch.find_by(id: batch_id)
  if batch.nil?
    Raif.logger.info("Raif::PollModelCompletionBatchJob ##{batch_id}: batch not found; skipping.")
    return
  end

  Raif.logger.info(
    "Raif::PollModelCompletionBatchJob ##{batch.id}: polling " \
      "(attempt=#{attempt}, provider=#{batch.class.name}, llm_model_key=#{batch.llm_model_key.inspect}, " \
      "provider_batch_id=#{batch.provider_batch_id.inspect}, status=#{batch.status})"
  )

  # If the batch is already terminal at the top of perform, this run
  # exists either to retry a handler that raised on a previous attempt
  # (host's job adapter retried us, or the safety sweep beat us to the
  # terminal transition) or because the polling chain raced with another
  # finalize path. dispatch_completion_handler! is gated on
  # handler_dispatched_at, so it's a no-op when the handler already ran
  # successfully -- otherwise the early return here would silently
  # swallow handler errors and the consumer's on_batch_completion block
  # would never run.
  if batch.terminal?
    Raif.logger.info(
      "Raif::PollModelCompletionBatchJob ##{batch.id}: batch already terminal " \
        "(status=#{batch.status}, handler_dispatched_at=#{batch.handler_dispatched_at&.iso8601 || "nil"}); " \
        "dispatching completion handler and returning."
    )
    batch.dispatch_completion_handler!
    return
  end

  previous_status = batch.status
  new_status = batch.fetch_status!
  Raif.logger.info(
    "Raif::PollModelCompletionBatchJob ##{batch.id}: provider returned status=#{new_status.inspect} " \
      "(was #{previous_status.inspect}, request_counts=#{batch.request_counts.inspect})"
  )

  if batch.terminal?
    Raif.logger.info(
      "Raif::PollModelCompletionBatchJob ##{batch.id}: terminal status reached (#{batch.status}); " \
        "finalizing and dispatching completion handler."
    )
    batch.finalize!
    batch.dispatch_completion_handler!
    return
  end

  if batch.max_age_exceeded?
    Raif.logger.warn(
      "Raif::PollModelCompletionBatchJob ##{batch.id}: max_age exceeded " \
        "(submitted_at=#{batch.submitted_at&.iso8601}, max_age=#{Raif.config.model_completion_batch_max_age}); expiring."
    )
    batch.expire!(reason: "Batch exceeded Raif.config.model_completion_batch_max_age (#{Raif.config.model_completion_batch_max_age})")
    batch.dispatch_completion_handler!
    return
  end

  reschedule!(batch, attempt: attempt)
rescue StandardError => e
  log_and_notify_error(batch_id, e)

  if transient_error?(e)
    # Reload (the batch may have been transitioned by another process while
    # we were in flight). Four cases after reload:
    #   1. Batch missing -> nothing to reschedule against, end of chain.
    #   2. Batch terminal AND handler already dispatched -> nothing to do.
    #   3. Batch terminal but handler NOT dispatched -> reschedule so the
    #      next run can call dispatch_completion_handler! (the error may
    #      have come from the handler itself, e.g. a host downstream blip).
    #   4. Batch non-terminal -> reschedule to pick up polling.
    batch&.reload
    if batch.nil? || (batch.terminal? && batch.handler_dispatched_at.present?)
      Raif.logger.info(
        "Raif::PollModelCompletionBatchJob ##{batch_id}: not rescheduling after transient error " \
          "(#{e.class}); batch is missing or already finalized + handler dispatched."
      )
      return
    end

    Raif.logger.info(
      "Raif::PollModelCompletionBatchJob ##{batch_id}: rescheduling after transient error " \
        "(#{e.class}); the polling chain will pick up where it left off."
    )
    reschedule!(batch, attempt: attempt)
    return
  end

  raise
end