Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 84 additions & 22 deletions modelq/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def __init__(
sentry_release: Optional[str] = None,
sentry_send_default_pii: bool = False,
sentry_debug: bool = False,
sentry_ignore_exceptions: Optional[Any] = None,
silent: bool = False,
**kwargs,
):
Expand Down Expand Up @@ -164,6 +165,9 @@ def __init__(
self.task_ttl = task_ttl or self.TASK_TTL
self.inactive_if_worker_boot_fail = inactive_if_worker_boot_fail
self.worker_healthy = True # Track worker health status
self.sentry_ignore_exceptions = self._normalize_sentry_ignore_exceptions(
sentry_ignore_exceptions
)

# Silent mode: suppress all modelq logging output
if silent:
Expand All @@ -182,13 +186,50 @@ def __init__(
server_name=self.server_id,
send_default_pii=sentry_send_default_pii,
debug=sentry_debug,
ignore_errors=self.sentry_ignore_exceptions or None,
)
if self.sentry_enabled:
logger.info("Sentry integration enabled for ModelQ")

# Register this server in Redis (with an initial heartbeat)
self.register_server()

@staticmethod
def _normalize_sentry_ignore_exceptions(ignore_exceptions: Optional[Any]) -> tuple:
"""
Normalize Sentry ignored exception configuration to a tuple of exception types.

Accepts a single exception class or an iterable of exception classes.
"""
if ignore_exceptions is None:
return ()

if isinstance(ignore_exceptions, type):
ignore_exceptions = (ignore_exceptions,)
else:
ignore_exceptions = tuple(ignore_exceptions)

invalid = [
exc_type
for exc_type in ignore_exceptions
if not isinstance(exc_type, type) or not issubclass(exc_type, BaseException)
]
if invalid:
raise TypeError(
"sentry_ignore_exceptions must contain exception classes, "
f"got invalid values: {invalid}"
)

return ignore_exceptions

def _should_ignore_sentry_exception(self, exc: Exception) -> bool:
"""Return True when an exception should fail normally but not report to Sentry."""
return (
exc is not None
and bool(self.sentry_ignore_exceptions)
and isinstance(exc, self.sentry_ignore_exceptions)
)

def _connect_to_redis(
self,
host: str,
Expand Down Expand Up @@ -723,9 +764,16 @@ def worker_loop(worker_id):
)

except TaskProcessingError as e:
logger.error(
f"Worker {worker_id} encountered a TaskProcessingError: {e}"
)
if self._should_ignore_sentry_exception(e.__cause__):
logger.warning(
"Worker %s encountered an ignored Sentry TaskProcessingError: %s",
worker_id,
e,
)
else:
logger.error(
f"Worker {worker_id} encountered a TaskProcessingError: {e}"
)
if task.payload.get("retries", 0) > 0:
new_task_dict = task.to_dict()
new_task_dict["payload"] = task.original_payload
Expand Down Expand Up @@ -926,27 +974,41 @@ def process_task(self, task: Task) -> None:
# 2) Webhook (if configured)
self.post_error_to_webhook(task, e)

# 3) Sentry (if enabled) - capture with full traceback
# 3) Sentry (if enabled) - capture with full traceback unless ignored
if self.sentry_enabled:
import sys
event_id = capture_task_exception(
exc=e,
task_id=task.task_id,
task_name=task.task_name,
payload=task.payload,
worker_id=self.server_id,
additional_context={
"created_at": task.created_at,
"started_at": task.started_at,
"additional_params": task.additional_params,
},
exc_info=sys.exc_info(), # Pass full traceback with line numbers
)
if event_id:
logger.info(f"Error reported to Sentry: {event_id}")
if self._should_ignore_sentry_exception(e):
logger.info(
"Skipping Sentry capture for ignored exception %s in task %s",
type(e).__name__,
task.task_name,
)
else:
import sys
event_id = capture_task_exception(
exc=e,
task_id=task.task_id,
task_name=task.task_name,
payload=task.payload,
worker_id=self.server_id,
additional_context={
"created_at": task.created_at,
"started_at": task.started_at,
"additional_params": task.additional_params,
},
exc_info=sys.exc_info(), # Pass full traceback with line numbers
)
if event_id:
logger.info(f"Error reported to Sentry: {event_id}")

logger.error(f"Task {task.task_name} failed with error: {e}")
raise TaskProcessingError(task.task_name, str(e))
if self._should_ignore_sentry_exception(e):
logger.warning(
"Task %s failed with ignored Sentry exception: %s",
task.task_name,
e,
)
else:
logger.error(f"Task {task.task_name} failed with error: {e}")
raise TaskProcessingError(task.task_name, str(e)) from e

finally:
self.redis_client.srem("processing_tasks", task.task_id)
Expand Down
83 changes: 83 additions & 0 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,89 @@ def simple_task():
assert mq.worker_healthy is True


def test_modelq_reports_unignored_task_exceptions_to_sentry(mock_redis, monkeypatch):
"""Task errors are reported to Sentry by default when Sentry is enabled."""
from modelq.app.tasks import Task
from modelq.exceptions import TaskProcessingError

class ExpectedUserError(Exception):
pass

captured = []

def fake_capture_task_exception(**kwargs):
captured.append(kwargs)
return "event-id"

monkeypatch.setattr(
"modelq.app.base.capture_task_exception",
fake_capture_task_exception,
)

mq = ModelQ(redis_client=mock_redis)
mq.sentry_enabled = True

@mq.task()
def failing_task():
raise ExpectedUserError("face not detected")

task = Task(
task_name="failing_task",
payload={"data": {"args": (), "kwargs": {}}, "timeout": None, "stream": False, "retries": 0},
task_id="reported-error",
)

with pytest.raises(TaskProcessingError):
mq.process_task(task)

assert len(captured) == 1
assert isinstance(captured[0]["exc"], ExpectedUserError)


def test_modelq_raises_but_does_not_report_ignored_task_exceptions_to_sentry(mock_redis, monkeypatch):
"""Ignored task exceptions still fail the task but are not captured by Sentry."""
from modelq.app.tasks import Task
from modelq.exceptions import TaskProcessingError

class ExpectedUserError(Exception):
pass

captured = []

def fake_capture_task_exception(**kwargs):
captured.append(kwargs)
return "event-id"

monkeypatch.setattr(
"modelq.app.base.capture_task_exception",
fake_capture_task_exception,
)

mq = ModelQ(
redis_client=mock_redis,
sentry_ignore_exceptions=[ExpectedUserError],
)
mq.sentry_enabled = True

@mq.task()
def ignored_failing_task():
raise ExpectedUserError("face not detected")

task = Task(
task_name="ignored_failing_task",
payload={"data": {"args": (), "kwargs": {}}, "timeout": None, "stream": False, "retries": 0},
task_id="ignored-error",
)

with pytest.raises(TaskProcessingError):
mq.process_task(task)

assert captured == []
stored = _json_bytes_to_dict(mock_redis.get("task:ignored-error"))
assert stored["status"] == "failed"
assert stored["error"]["type"] == "ExpectedUserError"


def test_worker_health_task_pickup_blocked(mock_redis):
"""Test that unhealthy workers don't pick up tasks."""
from modelq.app.middleware import Middleware
Expand Down
Loading