--[[ Function to fetch the next job to process. Tries to get the next job to avoid an extra roundtrip if the queue is not closing and not rate limited. Input: waitKey - wait list key activeKey - active list key prioritizedKey - prioritized sorted set key eventStreamKey - event stream key rateLimiterKey - rate limiter key delayedKey - delayed sorted set key pausedKey - paused list key metaKey - meta hash key pcKey - priority counter key markerKey - marker key prefix - keys prefix timestamp - current timestamp opts - options table: token (required) - lock token used when locking jobs lockDuration (required) - lock duration for acquired jobs limiter (optional) - rate limiter options table (e.g. { max = number }) ]] -- Includes --- @include "getNextDelayedTimestamp" --- @include "getRateLimitTTL" --- @include "getTargetQueueList" --- @include "moveJobFromPrioritizedToActive" --- @include "prepareJobForProcessing" --- @include "promoteDelayedJobs" local function fetchNextJob(waitKey, activeKey, prioritizedKey, eventStreamKey, rateLimiterKey, delayedKey, pausedKey, metaKey, pcKey, markerKey, prefix, timestamp, opts) local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey) -- Check if there are delayed jobs that can be promoted promoteDelayedJobs(delayedKey, markerKey, target, prioritizedKey, eventStreamKey, prefix, timestamp, pcKey, isPausedOrMaxed) local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max'])) -- Check if we are rate limited first. local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey) if expireTime > 0 then return {0, 0, expireTime, 0} end -- paused or maxed queue if isPausedOrMaxed then return {0, 0, 0, 0} end local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration local jobId = rcall("RPOPLPUSH", waitKey, activeKey) if jobId then -- Markers in waitlist DEPRECATED in v5: Remove in v6. if string.sub(jobId, 1, 2) == "0:" then rcall("LREM", activeKey, 1, jobId) -- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process -- but if ID is 0:0, then there is at least 1 prioritized job to process if jobId == "0:0" then jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey) return prepareJobForProcessing(prefix, rateLimiterKey, eventStreamKey, jobId, timestamp, maxJobs, limiterDuration, markerKey, opts) end else return prepareJobForProcessing(prefix, rateLimiterKey, eventStreamKey, jobId, timestamp, maxJobs, limiterDuration, markerKey, opts) end else jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey) if jobId then return prepareJobForProcessing(prefix, rateLimiterKey, eventStreamKey, jobId, timestamp, maxJobs, limiterDuration, markerKey, opts) end end -- Return the timestamp for the next delayed job if any. local nextTimestamp = getNextDelayedTimestamp(delayedKey) if nextTimestamp ~= nil then -- The result is guaranteed to be positive, since the -- ZRANGEBYSCORE command would have return a job otherwise. return {0, 0, 0, nextTimestamp} end end