diff --git a/src/contracts/adapter.ts b/src/contracts/adapter.ts index 8bd8e12..c7a7fdd 100644 --- a/src/contracts/adapter.ts +++ b/src/contracts/adapter.ts @@ -205,6 +205,16 @@ export interface Adapter { */ destroy(): Promise + /** + * Run adapter-specific migrations needed after a major version upgrade. + * + * This method is idempotent — it is always safe to call multiple times. + * Adapters that have no pending migrations return immediately. + * + * Call this once during your deployment process before starting workers. + */ + migrate(): Promise + /** * Create or update a schedule. * diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index 3671cc4..1b98825 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -385,6 +385,10 @@ export class FakeAdapter implements Adapter { return Promise.resolve() } + migrate(): Promise { + return Promise.resolve() + } + async upsertSchedule(config: ScheduleConfig): Promise { const id = config.id ?? randomUUID() const existing = this.#schedules.get(id) diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index adeb610..7c43b03 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -88,6 +88,8 @@ export class KnexAdapter implements Adapter { } } + async migrate(): Promise {} + async pop(): Promise { return this.popFrom('default') } diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index ad3626e..178bb2c 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -30,6 +30,7 @@ import { const redisKey = 'jobs' const schedulesKey = 'schedules' const schedulesIndexKey = 'schedules::index' +const schedulesDueKey = 'schedules::due' type RedisConfig = Redis | RedisOptions /** @@ -65,7 +66,6 @@ export class RedisAdapter implements Adapter { readonly #connection: Redis readonly #ownsConnection: boolean #workerId: string = '' - constructor(connection: Redis, ownsConnection: boolean = false) { this.#connection = connection this.#ownsConnection = ownsConnection @@ -408,10 +408,11 @@ export class RedisAdapter implements Adapter { const id = config.id ?? randomUUID() const now = Date.now() const scheduleKey = `${schedulesKey}::${id}` - const [existingRunCount, existingCreatedAt] = await this.#connection.hmget( + const [existingRunCount, existingCreatedAt, existingNextRunAt] = await this.#connection.hmget( scheduleKey, 'run_count', - 'created_at' + 'created_at', + 'next_run_at' ) const scheduleData: Record = { @@ -430,13 +431,17 @@ export class RedisAdapter implements Adapter { if (config.to !== undefined) scheduleData.to_date = config.to.getTime().toString() if (config.limit !== undefined) scheduleData.run_limit = config.limit.toString() - // Upsert schedule and clear stale optional fields from previous config. - await this.#connection + const multi = this.#connection .multi() .hdel(scheduleKey, 'cron_expression', 'every_ms', 'from_date', 'to_date', 'run_limit') .hset(scheduleKey, scheduleData) .sadd(schedulesIndexKey, id) - .exec() + + if (existingNextRunAt) { + multi.zadd(schedulesDueKey, Number.parseInt(existingNextRunAt, 10), id) + } + + await multi.exec() return id } @@ -512,14 +517,38 @@ export class RedisAdapter implements Adapter { } if (updates.runCount !== undefined) data.run_count = updates.runCount.toString() - if (Object.keys(data).length > 0) { - await this.#connection.hset(scheduleKey, data) + if (Object.keys(data).length === 0) return + + const multi = this.#connection.multi().hset(scheduleKey, data) + + if (updates.nextRunAt) { + multi.zadd(schedulesDueKey, updates.nextRunAt.getTime(), id) + } else if (updates.nextRunAt === null || updates.status === 'paused') { + multi.zrem(schedulesDueKey, id) } + + if (updates.status === 'active' && updates.nextRunAt === undefined) { + const existing = await this.#connection.hget(scheduleKey, 'next_run_at') + if (existing) { + multi.zadd(schedulesDueKey, Number.parseInt(existing, 10), id) + } + } + + await multi.exec() } async deleteSchedule(id: string): Promise { const scheduleKey = `${schedulesKey}::${id}` - await this.#connection.multi().del(scheduleKey).srem(schedulesIndexKey, id).exec() + await this.#connection + .multi() + .del(scheduleKey) + .srem(schedulesIndexKey, id) + .zrem(schedulesDueKey, id) + .exec() + } + + async migrate(): Promise { + await this.backfillDueIndex() } async claimDueSchedule(): Promise { @@ -527,7 +556,7 @@ export class RedisAdapter implements Adapter { const result = await this.#connection.eval( CLAIM_SCHEDULE_SCRIPT, 2, - schedulesIndexKey, + schedulesDueKey, `${schedulesKey}::`, now.toString() ) @@ -549,7 +578,6 @@ export class RedisAdapter implements Adapter { }) const nextRun = cron.next().toDate().getTime() - // Check limits before updating const runCount = Number.parseInt(data.run_count || '0', 10) + 1 const runLimit = data.run_limit ? Number.parseInt(data.run_limit, 10) : null const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null @@ -562,16 +590,51 @@ export class RedisAdapter implements Adapter { newNextRunAt = '' } - await this.#connection.hset( - `${schedulesKey}::${data.id}`, - 'next_run_at', - newNextRunAt.toString() - ) + const scheduleKey = `${schedulesKey}::${data.id}` + const multi = this.#connection + .multi() + .hset(scheduleKey, 'next_run_at', newNextRunAt.toString()) + + if (typeof newNextRunAt === 'number') { + multi.zadd(schedulesDueKey, newNextRunAt, data.id) + } else { + multi.zrem(schedulesDueKey, data.id) + } + + await multi.exec() } return this.#hashToScheduleData(data) } + async backfillDueIndex(): Promise { + const ids = await this.#connection.smembers(schedulesIndexKey) + if (ids.length === 0) return 0 + + const pipeline = this.#connection.pipeline() + for (const id of ids) { + pipeline.hmget(`${schedulesKey}::${id}`, 'next_run_at', 'status') + } + const results = await pipeline.exec() + if (!results) return 0 + + const addPipeline = this.#connection.pipeline() + let count = 0 + + for (let i = 0; i < ids.length; i++) { + const [err, values] = results[i] + if (err || !values) continue + const [nextRunAt, status] = values as [string | null, string | null] + if (nextRunAt && status === 'active') { + addPipeline.zadd(schedulesDueKey, Number.parseInt(nextRunAt, 10), ids[i]) + count++ + } + } + + if (count > 0) await addPipeline.exec() + return count + } + #hashToScheduleData(data: Record): ScheduleData { return { id: data.id, diff --git a/src/drivers/redis_scripts.ts b/src/drivers/redis_scripts.ts index e0ff6dd..82e00cb 100644 --- a/src/drivers/redis_scripts.ts +++ b/src/drivers/redis_scripts.ts @@ -461,74 +461,105 @@ ${REDIS_JOB_STORAGE_LUA} ` /** - * Lua script for atomically claiming a due schedule. - * Iterates the schedule index server-side and claims the first due schedule. - * Returns the schedule data if claimed, nil otherwise. + * Lua script for atomically claiming a due schedule using a sorted set index. + * + * Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N) + * lookup instead of scanning all schedule hashes via SMEMBERS. + * + * Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on + * sight so subsequent calls skip them. + * + * KEYS[1] = schedules::due (the ZSET) + * KEYS[2] = schedule key prefix (e.g. "schedules::") + * ARGV[1] = now (epoch milliseconds) */ export const CLAIM_SCHEDULE_SCRIPT = ` - local schedules_index_key = KEYS[1] - local schedule_key_prefix = KEYS[2] + local due_key = KEYS[1] + local prefix = KEYS[2] local now = tonumber(ARGV[1]) - local ids = redis.call('SMEMBERS', schedules_index_key) + while true do + local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1) + + if #candidates == 0 then + return nil + end - for i = 1, #ids do - local schedule_key = schedule_key_prefix .. ids[i] + local id = candidates[1] + local schedule_key = prefix .. id -- Get schedule data local data = redis.call('HGETALL', schedule_key) - if #data > 0 then + + -- Deleted schedule still in ZSET + if #data == 0 then + redis.call('ZREM', due_key, id) + else -- Convert HGETALL result to table local schedule = {} for j = 1, #data, 2 do schedule[data[j]] = data[j + 1] end - -- Check if schedule is due - if schedule.status == 'active' then - local next_run_at = tonumber(schedule.next_run_at) - - if next_run_at and next_run_at <= now then - local run_count = tonumber(schedule.run_count or '0') - local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil - local to_date = schedule.to_date and tonumber(schedule.to_date) or nil - - -- Check limits - if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then - -- This schedule is claimable - atomically update it - local new_run_count = run_count + 1 - - -- Calculate new next_run_at (simple interval-based for now) - -- Complex cron calculation happens in the caller - local new_next_run_at = '' - local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil - if every_ms then - new_next_run_at = tostring(now + every_ms) - end + -- Check if schedule is active + if schedule.status ~= 'active' then + redis.call('ZREM', due_key, id) + else + -- Hash is the source of truth for next_run_at. + -- If the ZSET score is stale, repair it and skip this candidate. + local hash_nra = schedule.next_run_at + if not hash_nra or hash_nra == '' then + redis.call('ZREM', due_key, id) + elseif tonumber(hash_nra) > now then + redis.call('ZADD', due_key, tonumber(hash_nra), id) + else + local run_count = tonumber(schedule.run_count or '0') + local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil + local to_date = schedule.to_date and tonumber(schedule.to_date) or nil - -- Check if we've hit the limit after this run - if run_limit and new_run_count >= run_limit then - new_next_run_at = '' - end + -- Check limits + if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then + redis.call('ZREM', due_key, id) + else + -- This schedule is claimable - atomically update it + local new_run_count = run_count + 1 + + -- Calculate new next_run_at (simple interval-based for now) + -- Complex cron calculation happens in the caller + local new_next_run_at = '' + local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil + if every_ms then + new_next_run_at = tostring(now + every_ms) + end - -- Check if past end date - if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then - new_next_run_at = '' - end + -- Check if we've hit the limit after this run + if run_limit and new_run_count >= run_limit then + new_next_run_at = '' + end - -- Update the schedule atomically - redis.call('HSET', schedule_key, - 'next_run_at', new_next_run_at, - 'last_run_at', tostring(now), - 'run_count', tostring(new_run_count)) + -- Check if past end date + if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then + new_next_run_at = '' + end - -- Return the schedule data (before update) as JSON - return cjson.encode(schedule) + -- Update the schedule atomically + redis.call('HSET', schedule_key, + 'next_run_at', new_next_run_at, + 'last_run_at', tostring(now), + 'run_count', tostring(new_run_count)) + + -- Update or remove from ZSET + if new_next_run_at ~= '' then + redis.call('ZADD', due_key, tonumber(new_next_run_at), id) + else + redis.call('ZREM', due_key, id) end + + -- Return the schedule data (before update) as JSON + return cjson.encode(schedule) + end end end end end - - return nil ` diff --git a/src/drivers/sync_adapter.ts b/src/drivers/sync_adapter.ts index d97fa55..e94aa7b 100644 --- a/src/drivers/sync_adapter.ts +++ b/src/drivers/sync_adapter.ts @@ -118,6 +118,10 @@ export class SyncAdapter implements Adapter { return Promise.resolve() } + migrate(): Promise { + return Promise.resolve() + } + upsertSchedule(_config: ScheduleConfig): Promise { // No-op: schedules don't make sense for sync adapter // Return a fake ID so code doesn't break in dev diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index fa64e31..40f40e5 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -293,6 +293,10 @@ export class MemoryAdapter implements Adapter { return Promise.resolve() } + migrate(): Promise { + return Promise.resolve() + } + async upsertSchedule(config: ScheduleConfig): Promise { const id = config.id ?? randomUUID() const existing = this.#schedules.get(id) diff --git a/tests/adapter.spec.ts b/tests/adapter.spec.ts index 7285c3d..5e3209a 100644 --- a/tests/adapter.spec.ts +++ b/tests/adapter.spec.ts @@ -604,6 +604,92 @@ test.group('Adapter | Redis', (group) => { assert.isNull(await connection.hget(metadataKey, 'metadata-stalled-uuid-1')) assert.isNull(await adapter.getJob('metadata-stalled-uuid-1', queue)) }) + + test('backfillDueIndex populates ZSET for pre-existing schedules', async ({ assert }) => { + const adapter = new RedisAdapter(connection) + + // Simulate pre-upgrade schedule data: write hash + index directly, skip ZSET + const id = 'pre-existing-schedule' + const pastRunAt = (Date.now() - 5_000).toString() + await connection + .multi() + .hset(`schedules::${id}`, { + id, + name: 'LegacyJob', + payload: '{}', + status: 'active', + every_ms: '60000', + timezone: 'UTC', + next_run_at: pastRunAt, + last_run_at: '', + run_count: '0', + created_at: Date.now().toString(), + }) + .sadd('schedules::index', id) + .exec() + + // Without backfill, ZSET has no entry so claim returns null + const beforeBackfill = await adapter.claimDueSchedule() + assert.isNull(beforeBackfill) + + await adapter.backfillDueIndex() + + const afterBackfill = await adapter.claimDueSchedule() + assert.isNotNull(afterBackfill) + assert.equal(afterBackfill!.id, id) + }) + + test('backfillDueIndex is idempotent', async ({ assert }) => { + const adapter = new RedisAdapter(connection) + + await adapter.upsertSchedule({ + id: 'idempotent-schedule', + name: 'TestJob', + payload: {}, + everyMs: 60_000, + timezone: 'UTC', + }) + await adapter.updateSchedule('idempotent-schedule', { + nextRunAt: new Date(Date.now() + 30_000), + }) + + // Clear the ZSET so backfill has work to do + await connection.del('schedules::due') + + const first = await adapter.backfillDueIndex() + const second = await adapter.backfillDueIndex() + + assert.isAbove(first, 0) + assert.equal(second, first) + + const score = await connection.zscore('schedules::due', 'idempotent-schedule') + assert.isNotNull(score) + }) + + test('stale ZSET score is self-healed during claim', async ({ assert }) => { + const adapter = new RedisAdapter(connection) + const id = 'stale-score-schedule' + const futureRunAt = Date.now() + 60_000 + + await adapter.upsertSchedule({ + id, + name: 'StaleJob', + payload: {}, + everyMs: 60_000, + timezone: 'UTC', + }) + await adapter.updateSchedule(id, { nextRunAt: new Date(futureRunAt) }) + + // Corrupt the ZSET score to a past value while hash still says future + await connection.zadd('schedules::due', Date.now() - 10_000, id) + + const claimed = await adapter.claimDueSchedule() + assert.isNull(claimed, 'should not claim when hash says schedule is not due yet') + + // ZSET score should have been repaired to match the hash + const repairedScore = await connection.zscore('schedules::due', id) + assert.equal(Number(repairedScore), futureRunAt) + }) }) test.group('Adapter | Knex (SQLite)', (group) => {