diff --git a/modelq/app/base.py b/modelq/app/base.py index b7f8321..eca82c4 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -293,7 +293,8 @@ def requeue_stuck_processing_tasks(self, threshold: float = 180.0): # Push it back into ml_tasks self.redis_client.rpush("ml_tasks", json.dumps(task_dict)) - + self.redis_client.zadd("queued_requests", {task_id: now}) + # Remove from processing set self.redis_client.srem("processing_tasks", task_id) @@ -487,7 +488,8 @@ def enqueue_task(self, task_data: dict, payload: dict): def delete_queue(self): self.redis_client.ltrim("ml_tasks", 1, 0) - + self.redis_client.delete("queued_requests") + def enqueue_delayed_task(self, task_dict: dict, delay_seconds: int): """ Enqueues a task into a Redis sorted set ('delayed_tasks') to be processed later. @@ -508,6 +510,11 @@ def requeue_delayed_tasks(self): for task_json in ready_tasks: self.redis_client.zrem("delayed_tasks", task_json) self.redis_client.lpush("ml_tasks", task_json) + try: + _td = json.loads(task_json) + self.redis_client.zadd("queued_requests", {_td["task_id"]: _td.get("queued_at", now)}) + except Exception: + pass time.sleep(1) def requeue_inprogress_tasks(self): @@ -682,6 +689,12 @@ def worker_loop(worker_id): continue task.status = "processing" + # The task has left the queue (claimed for processing). Keep the + # `queued_requests` index in sync with `ml_tasks`; otherwise it + # accumulates every completed/failed task forever and badly + # inflates queue_num / queue_time. + self.redis_client.zrem("queued_requests", task.task_id) + # Set started_at task_dict["started_at"] = time.time() @@ -734,6 +747,8 @@ def worker_loop(worker_id): f"Worker {worker_id} cannot process task {task.task_name}, re-queueing..." ) self.redis_client.rpush("ml_tasks", task_json) + self.redis_client.zadd("queued_requests", {task.task_id: task_dict.get("queued_at", time.time())}) + self.redis_client.srem("processing_tasks", task.task_id) except Exception as e: logger.error(