Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions modelq/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ def requeue_stuck_processing_tasks(self, threshold: float = 180.0):

# Push it back into ml_tasks
self.redis_client.rpush("ml_tasks", json.dumps(task_dict))

self.redis_client.zadd("queued_requests", {task_id: now})

# Remove from processing set
self.redis_client.srem("processing_tasks", task_id)

Expand Down Expand Up @@ -487,7 +488,8 @@ def enqueue_task(self, task_data: dict, payload: dict):

def delete_queue(self):
self.redis_client.ltrim("ml_tasks", 1, 0)

self.redis_client.delete("queued_requests")

def enqueue_delayed_task(self, task_dict: dict, delay_seconds: int):
"""
Enqueues a task into a Redis sorted set ('delayed_tasks') to be processed later.
Expand All @@ -508,6 +510,11 @@ def requeue_delayed_tasks(self):
for task_json in ready_tasks:
self.redis_client.zrem("delayed_tasks", task_json)
self.redis_client.lpush("ml_tasks", task_json)
try:
_td = json.loads(task_json)
self.redis_client.zadd("queued_requests", {_td["task_id"]: _td.get("queued_at", now)})
except Exception:
pass
time.sleep(1)

def requeue_inprogress_tasks(self):
Expand Down Expand Up @@ -682,6 +689,12 @@ def worker_loop(worker_id):
continue
task.status = "processing"

# The task has left the queue (claimed for processing). Keep the
# `queued_requests` index in sync with `ml_tasks`; otherwise it
# accumulates every completed/failed task forever and badly
# inflates queue_num / queue_time.
self.redis_client.zrem("queued_requests", task.task_id)

# Set started_at
task_dict["started_at"] = time.time()

Expand Down Expand Up @@ -734,6 +747,8 @@ def worker_loop(worker_id):
f"Worker {worker_id} cannot process task {task.task_name}, re-queueing..."
)
self.redis_client.rpush("ml_tasks", task_json)
self.redis_client.zadd("queued_requests", {task.task_id: task_dict.get("queued_at", time.time())})
self.redis_client.srem("processing_tasks", task.task_id)

except Exception as e:
logger.error(
Expand Down
Loading