From c20cc8f2d1aab2e37856b6575b9c4f9d1334bb6f Mon Sep 17 00:00:00 2001 From: Adhik Joshi Date: Fri, 19 Jun 2026 17:05:08 +0530 Subject: [PATCH] fix: keep queued_requests in sync with ml_tasks (stop unbounded growth) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `queued_requests` (the sorted-set index behind queue_num / queue_time) was added on enqueue but only removed on explicit cancel — never when a worker claimed or completed a task. So it accumulated every task ever enqueued (observed one deepfake queue at 934k, ~99% already-resolved), wildly inflating reported queue depth/ETA and bloating Redis memory. Now mirror `ml_tasks` at every boundary: - zrem queued_requests when a worker claims a task (blpop) - zadd queued_requests on every re-queue (stuck / delayed / task-not-allowed) - clear queued_requests in delete_queue() All 49 tests pass. --- modelq/app/base.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/modelq/app/base.py b/modelq/app/base.py index b7f8321..eca82c4 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -293,7 +293,8 @@ def requeue_stuck_processing_tasks(self, threshold: float = 180.0): # Push it back into ml_tasks self.redis_client.rpush("ml_tasks", json.dumps(task_dict)) - + self.redis_client.zadd("queued_requests", {task_id: now}) + # Remove from processing set self.redis_client.srem("processing_tasks", task_id) @@ -487,7 +488,8 @@ def enqueue_task(self, task_data: dict, payload: dict): def delete_queue(self): self.redis_client.ltrim("ml_tasks", 1, 0) - + self.redis_client.delete("queued_requests") + def enqueue_delayed_task(self, task_dict: dict, delay_seconds: int): """ Enqueues a task into a Redis sorted set ('delayed_tasks') to be processed later. @@ -508,6 +510,11 @@ def requeue_delayed_tasks(self): for task_json in ready_tasks: self.redis_client.zrem("delayed_tasks", task_json) self.redis_client.lpush("ml_tasks", task_json) + try: + _td = json.loads(task_json) + self.redis_client.zadd("queued_requests", {_td["task_id"]: _td.get("queued_at", now)}) + except Exception: + pass time.sleep(1) def requeue_inprogress_tasks(self): @@ -682,6 +689,12 @@ def worker_loop(worker_id): continue task.status = "processing" + # The task has left the queue (claimed for processing). Keep the + # `queued_requests` index in sync with `ml_tasks`; otherwise it + # accumulates every completed/failed task forever and badly + # inflates queue_num / queue_time. + self.redis_client.zrem("queued_requests", task.task_id) + # Set started_at task_dict["started_at"] = time.time() @@ -734,6 +747,8 @@ def worker_loop(worker_id): f"Worker {worker_id} cannot process task {task.task_name}, re-queueing..." ) self.redis_client.rpush("ml_tasks", task_json) + self.redis_client.zadd("queued_requests", {task.task_id: task_dict.get("queued_at", time.time())}) + self.redis_client.srem("processing_tasks", task.task_id) except Exception as e: logger.error(