From 5f3c35a3beaef4914527057f595c64ba59a1b20d Mon Sep 17 00:00:00 2001 From: apelsynca Date: Thu, 4 Jun 2026 07:42:54 +0300 Subject: [PATCH 1/3] docs: testing with using dependency_overrides --- docs/guide/testing-taskiq.md | 114 +++++++++++++++++++++++++++++++---- 1 file changed, 103 insertions(+), 11 deletions(-) diff --git a/docs/guide/testing-taskiq.md b/docs/guide/testing-taskiq.md index b132c926..42f11f35 100644 --- a/docs/guide/testing-taskiq.md +++ b/docs/guide/testing-taskiq.md @@ -12,6 +12,7 @@ Let's dive into examples. ## Preparations ### Environment setup + For testing you maybe don't want to use actual distributed broker. But still you want to validate your logic. Since python is an interpreted language, you can easily replace you broker with another one if the expression is correct. @@ -21,7 +22,6 @@ We can set an environment variable, that indicates that currently we're running @tab linux|macos - ```bash export ENVIRONMENT="pytest" pytest -vv @@ -36,7 +36,6 @@ pytest -vv ::: - Or we can even tell pytest to set this environment for us, just before executing tests using [pytest-env](https://pypi.org/project/pytest-env/) plugin. ::: tabs @@ -155,8 +154,8 @@ async def parse_int_later(val: str) -> int: To test this function, we can do two things: 1. By setting the `await_inplace=True` parameter when creating the broker. -In that case all tasks will be automatically awaited as soon as they are called. -In such a way you don't need to manually call the `wait_result` in your code. + In that case all tasks will be automatically awaited as soon as they are called. + In such a way you don't need to manually call the `wait_result` in your code. To set it up, define the broker as the following: @@ -168,8 +167,8 @@ With this setup all `await function.kiq()` calls will behave similarly to `await with dependency injection and all taskiq-related functionality. 2. Alternatively, you can manually await all tasks after invoking the -target function by using the `wait_all` method. -This gives you more control over when to wait for tasks to complete. + target function by using the `wait_all` method. + This gives you more control over when to wait for tasks to complete. ```python from your_project.tkq import broker @@ -185,11 +184,21 @@ async def test_add_one(): ## Dependency injection -If you use dependencies in your tasks, you may think that this can become a problem. But it's not. -Here's what we came up with. We added a method called `add_dependency_context` to the broker. -It sets base dependencies for dependency resolution. You can use it for tests. +If you use dependencies in your tasks, you may think that this can become a problem for tests. But it's not. +Taskiq provides a way to set both first-level and local dependencies for dependency_resolution. + +### First-level dependencies + +If your dependency does not have a function -> this is what we can call first-level dependency. + +In order to override first-level dependency, you use the same method as to add it. + +Use a method called `add_dependency_context` on the broker. +It sets first-level dependencies for dependency resolution based on the type. -Let's add a task that depends on `Path`. I guess this example is not meant to be used in production code bases, but it's suitable for illustration purposes. +--- + +Let's add a task that depends on `Path`. ::: tabs @@ -278,4 +287,87 @@ async def test_modify_path(): ``` -This should pass. And that's it for now. +This should pass. + +### Dependencies + +To override dependencies that depend on some function you can use `broker.dependency_overrides` attribute. + +For example you have a task that depends on function `get_async_session` that yields `AsyncSession` + +::: tabs + +@tab Annotated 3.10+ + +```python +from collections.abc import AsyncGenerator +from typing import Annotated + +from pathlib import Path +from taskiq import TaskiqDepends + +from your_project.database import AsyncSession +from your_project.tkq import broker + +async def get_async_session(context: Annotated[Context, TaskiqDepends()]) -> AsyncGenerator[AsyncSession]: + session = context.state.async_session + + try: + yield session + except: + await session.rollback() + raise + else: + await session.commit() + +@broker.task +async def do_some_work(session: Annotated[AsyncSession, TaskiqDepends(get_async_session)]) -> None: + result = await session.execute(select()) + # ... +``` + +@tab default values + +```python +from pathlib import Path +from taskiq import TaskiqDepends + +from your_project.database import AsyncSession +from your_project.tkq import broker + +async def get_async_session(context: Context = TaskiqDepends()]) -> AsyncGenerator[AsyncSession]: + session = context.state.async_session + + try: + yield session + except: + await session.rollback() + raise + else: + await session.commit() + + +@broker.task +async def do_some_work(session: AsyncSession = TaskiqDepends(get_async_session)): + result = await session.execute(select()) + # ... +``` + +::: + +```python +import pytest + +from your_project.database import AsyncSession +from your_project.tqk import broker + +@pytest.fixture(autouse=True) +def patch_broker_dependencies( + session: AsyncSession +): + broker.dependency_overrides[get_async_session] = lambda: session + + yield + + broker.dependency_overrides.pop(get_async_session) +``` From a1d000964eb43c86f91aca07a2858607ac3dc952 Mon Sep 17 00:00:00 2001 From: apelsynca Date: Thu, 4 Jun 2026 08:01:16 +0300 Subject: [PATCH 2/3] docs: added comments in the example --- docs/guide/testing-taskiq.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/guide/testing-taskiq.md b/docs/guide/testing-taskiq.md index 42f11f35..14ae47fe 100644 --- a/docs/guide/testing-taskiq.md +++ b/docs/guide/testing-taskiq.md @@ -361,13 +361,17 @@ import pytest from your_project.database import AsyncSession from your_project.tqk import broker +# We use autouse, so this fixture +# is called automatically before all tests. @pytest.fixture(autouse=True) def patch_broker_dependencies( session: AsyncSession ): + # Here we override function get_async_session broker.dependency_overrides[get_async_session] = lambda: session yield + # Here we clear the override (for something like session it is good idea) broker.dependency_overrides.pop(get_async_session) ``` From 6925f5d8888154c02f673af645aafa7359d5f13b Mon Sep 17 00:00:00 2001 From: apelsynca Date: Sat, 6 Jun 2026 07:55:52 +0300 Subject: [PATCH 3/3] docs: added None type to cron, interval in label schedule sources --- docs/available-components/schedule-sources.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/available-components/schedule-sources.md b/docs/available-components/schedule-sources.md index 04ef3346..b525573b 100644 --- a/docs/available-components/schedule-sources.md +++ b/docs/available-components/schedule-sources.md @@ -26,7 +26,6 @@ scheduler = TaskiqScheduler(broker, sources=[redis_source]) For more information on how to use dynamic schedule sources read [Dynamic scheduling section](../guide/scheduling-tasks.md#dynamic-scheduling). - ### LabelScheduleSource This source parses labels of tasks, and if it finds a `schedule` label, it considers this task as scheduled. @@ -37,9 +36,9 @@ The format of the schedule label is the following: @broker.task( schedule=[ { - "cron": "* * * * *", # type: str, either cron, interval or time should be specified. + "cron": "* * * * *", # type: str | None, either cron, interval or time should be specified. "cron_offset": None, # type: str | timedelta | None, can be omitted. - "interval": None, # type: int | timedelta, either cron, interval or time should be specified. + "interval": None, # type: int | timedelta | None, either cron, interval or time should be specified. "time": None, # type: datetime | None, either cron, interval or time should be specified. "args": [], # type List[Any] | None, can be omitted. "kwargs": {}, # type: Dict[str, Any] | None, can be omitted.