diff --git a/modelq/app/base.py b/modelq/app/base.py index eca82c4..3f96ff1 100644 --- a/modelq/app/base.py +++ b/modelq/app/base.py @@ -131,6 +131,7 @@ def __init__( sentry_release: Optional[str] = None, sentry_send_default_pii: bool = False, sentry_debug: bool = False, + sentry_ignore_exceptions: Optional[Any] = None, silent: bool = False, **kwargs, ): @@ -164,6 +165,9 @@ def __init__( self.task_ttl = task_ttl or self.TASK_TTL self.inactive_if_worker_boot_fail = inactive_if_worker_boot_fail self.worker_healthy = True # Track worker health status + self.sentry_ignore_exceptions = self._normalize_sentry_ignore_exceptions( + sentry_ignore_exceptions + ) # Silent mode: suppress all modelq logging output if silent: @@ -182,6 +186,7 @@ def __init__( server_name=self.server_id, send_default_pii=sentry_send_default_pii, debug=sentry_debug, + ignore_errors=self.sentry_ignore_exceptions or None, ) if self.sentry_enabled: logger.info("Sentry integration enabled for ModelQ") @@ -189,6 +194,42 @@ def __init__( # Register this server in Redis (with an initial heartbeat) self.register_server() + @staticmethod + def _normalize_sentry_ignore_exceptions(ignore_exceptions: Optional[Any]) -> tuple: + """ + Normalize Sentry ignored exception configuration to a tuple of exception types. + + Accepts a single exception class or an iterable of exception classes. + """ + if ignore_exceptions is None: + return () + + if isinstance(ignore_exceptions, type): + ignore_exceptions = (ignore_exceptions,) + else: + ignore_exceptions = tuple(ignore_exceptions) + + invalid = [ + exc_type + for exc_type in ignore_exceptions + if not isinstance(exc_type, type) or not issubclass(exc_type, BaseException) + ] + if invalid: + raise TypeError( + "sentry_ignore_exceptions must contain exception classes, " + f"got invalid values: {invalid}" + ) + + return ignore_exceptions + + def _should_ignore_sentry_exception(self, exc: Exception) -> bool: + """Return True when an exception should fail normally but not report to Sentry.""" + return ( + exc is not None + and bool(self.sentry_ignore_exceptions) + and isinstance(exc, self.sentry_ignore_exceptions) + ) + def _connect_to_redis( self, host: str, @@ -723,9 +764,16 @@ def worker_loop(worker_id): ) except TaskProcessingError as e: - logger.error( - f"Worker {worker_id} encountered a TaskProcessingError: {e}" - ) + if self._should_ignore_sentry_exception(e.__cause__): + logger.warning( + "Worker %s encountered an ignored Sentry TaskProcessingError: %s", + worker_id, + e, + ) + else: + logger.error( + f"Worker {worker_id} encountered a TaskProcessingError: {e}" + ) if task.payload.get("retries", 0) > 0: new_task_dict = task.to_dict() new_task_dict["payload"] = task.original_payload @@ -926,27 +974,41 @@ def process_task(self, task: Task) -> None: # 2) Webhook (if configured) self.post_error_to_webhook(task, e) - # 3) Sentry (if enabled) - capture with full traceback + # 3) Sentry (if enabled) - capture with full traceback unless ignored if self.sentry_enabled: - import sys - event_id = capture_task_exception( - exc=e, - task_id=task.task_id, - task_name=task.task_name, - payload=task.payload, - worker_id=self.server_id, - additional_context={ - "created_at": task.created_at, - "started_at": task.started_at, - "additional_params": task.additional_params, - }, - exc_info=sys.exc_info(), # Pass full traceback with line numbers - ) - if event_id: - logger.info(f"Error reported to Sentry: {event_id}") + if self._should_ignore_sentry_exception(e): + logger.info( + "Skipping Sentry capture for ignored exception %s in task %s", + type(e).__name__, + task.task_name, + ) + else: + import sys + event_id = capture_task_exception( + exc=e, + task_id=task.task_id, + task_name=task.task_name, + payload=task.payload, + worker_id=self.server_id, + additional_context={ + "created_at": task.created_at, + "started_at": task.started_at, + "additional_params": task.additional_params, + }, + exc_info=sys.exc_info(), # Pass full traceback with line numbers + ) + if event_id: + logger.info(f"Error reported to Sentry: {event_id}") - logger.error(f"Task {task.task_name} failed with error: {e}") - raise TaskProcessingError(task.task_name, str(e)) + if self._should_ignore_sentry_exception(e): + logger.warning( + "Task %s failed with ignored Sentry exception: %s", + task.task_name, + e, + ) + else: + logger.error(f"Task {task.task_name} failed with error: {e}") + raise TaskProcessingError(task.task_name, str(e)) from e finally: self.redis_client.srem("processing_tasks", task.task_id) diff --git a/tests/test_base.py b/tests/test_base.py index a721146..fa07f0c 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -935,6 +935,89 @@ def simple_task(): assert mq.worker_healthy is True +def test_modelq_reports_unignored_task_exceptions_to_sentry(mock_redis, monkeypatch): + """Task errors are reported to Sentry by default when Sentry is enabled.""" + from modelq.app.tasks import Task + from modelq.exceptions import TaskProcessingError + + class ExpectedUserError(Exception): + pass + + captured = [] + + def fake_capture_task_exception(**kwargs): + captured.append(kwargs) + return "event-id" + + monkeypatch.setattr( + "modelq.app.base.capture_task_exception", + fake_capture_task_exception, + ) + + mq = ModelQ(redis_client=mock_redis) + mq.sentry_enabled = True + + @mq.task() + def failing_task(): + raise ExpectedUserError("face not detected") + + task = Task( + task_name="failing_task", + payload={"data": {"args": (), "kwargs": {}}, "timeout": None, "stream": False, "retries": 0}, + task_id="reported-error", + ) + + with pytest.raises(TaskProcessingError): + mq.process_task(task) + + assert len(captured) == 1 + assert isinstance(captured[0]["exc"], ExpectedUserError) + + +def test_modelq_raises_but_does_not_report_ignored_task_exceptions_to_sentry(mock_redis, monkeypatch): + """Ignored task exceptions still fail the task but are not captured by Sentry.""" + from modelq.app.tasks import Task + from modelq.exceptions import TaskProcessingError + + class ExpectedUserError(Exception): + pass + + captured = [] + + def fake_capture_task_exception(**kwargs): + captured.append(kwargs) + return "event-id" + + monkeypatch.setattr( + "modelq.app.base.capture_task_exception", + fake_capture_task_exception, + ) + + mq = ModelQ( + redis_client=mock_redis, + sentry_ignore_exceptions=[ExpectedUserError], + ) + mq.sentry_enabled = True + + @mq.task() + def ignored_failing_task(): + raise ExpectedUserError("face not detected") + + task = Task( + task_name="ignored_failing_task", + payload={"data": {"args": (), "kwargs": {}}, "timeout": None, "stream": False, "retries": 0}, + task_id="ignored-error", + ) + + with pytest.raises(TaskProcessingError): + mq.process_task(task) + + assert captured == [] + stored = _json_bytes_to_dict(mock_redis.get("task:ignored-error")) + assert stored["status"] == "failed" + assert stored["error"]["type"] == "ExpectedUserError" + + def test_worker_health_task_pickup_blocked(mock_redis): """Test that unhealthy workers don't pick up tasks.""" from modelq.app.middleware import Middleware