Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions docs/available-components/schedule-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ scheduler = TaskiqScheduler(broker, sources=[redis_source])

For more information on how to use dynamic schedule sources read [Dynamic scheduling section](../guide/scheduling-tasks.md#dynamic-scheduling).


### LabelScheduleSource

This source parses labels of tasks, and if it finds a `schedule` label, it considers this task as scheduled.
Expand All @@ -37,9 +36,9 @@ The format of the schedule label is the following:
@broker.task(
schedule=[
{
"cron": "* * * * *", # type: str, either cron, interval or time should be specified.
"cron": "* * * * *", # type: str | None, either cron, interval or time should be specified.
"cron_offset": None, # type: str | timedelta | None, can be omitted.
"interval": None, # type: int | timedelta, either cron, interval or time should be specified.
"interval": None, # type: int | timedelta | None, either cron, interval or time should be specified.
"time": None, # type: datetime | None, either cron, interval or time should be specified.
"args": [], # type List[Any] | None, can be omitted.
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
Expand Down
118 changes: 107 additions & 11 deletions docs/guide/testing-taskiq.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Let's dive into examples.
## Preparations

### Environment setup

For testing you maybe don't want to use actual distributed broker. But still you want to validate your logic.
Since python is an interpreted language, you can easily replace you broker with another one if the expression is correct.

Expand All @@ -21,7 +22,6 @@ We can set an environment variable, that indicates that currently we're running

@tab linux|macos


```bash
export ENVIRONMENT="pytest"
pytest -vv
Expand All @@ -36,7 +36,6 @@ pytest -vv

:::


Or we can even tell pytest to set this environment for us, just before executing tests using [pytest-env](https://pypi.org/project/pytest-env/) plugin.

::: tabs
Expand Down Expand Up @@ -155,8 +154,8 @@ async def parse_int_later(val: str) -> int:
To test this function, we can do two things:

1. By setting the `await_inplace=True` parameter when creating the broker.
In that case all tasks will be automatically awaited as soon as they are called.
In such a way you don't need to manually call the `wait_result` in your code.
In that case all tasks will be automatically awaited as soon as they are called.
In such a way you don't need to manually call the `wait_result` in your code.

To set it up, define the broker as the following:

Expand All @@ -168,8 +167,8 @@ With this setup all `await function.kiq()` calls will behave similarly to `await
with dependency injection and all taskiq-related functionality.

2. Alternatively, you can manually await all tasks after invoking the
target function by using the `wait_all` method.
This gives you more control over when to wait for tasks to complete.
target function by using the `wait_all` method.
This gives you more control over when to wait for tasks to complete.

```python
from your_project.tkq import broker
Expand All @@ -185,11 +184,21 @@ async def test_add_one():

## Dependency injection

If you use dependencies in your tasks, you may think that this can become a problem. But it's not.
Here's what we came up with. We added a method called `add_dependency_context` to the broker.
It sets base dependencies for dependency resolution. You can use it for tests.
If you use dependencies in your tasks, you may think that this can become a problem for tests. But it's not.
Taskiq provides a way to set both first-level and local dependencies for dependency_resolution.

### First-level dependencies

If your dependency does not have a function -> this is what we can call first-level dependency.

In order to override first-level dependency, you use the same method as to add it.

Use a method called `add_dependency_context` on the broker.
It sets first-level dependencies for dependency resolution based on the type.

Let's add a task that depends on `Path`. I guess this example is not meant to be used in production code bases, but it's suitable for illustration purposes.
---

Let's add a task that depends on `Path`.

::: tabs

Expand Down Expand Up @@ -278,4 +287,91 @@ async def test_modify_path():

```

This should pass. And that's it for now.
This should pass.

### Dependencies

To override dependencies that depend on some function you can use `broker.dependency_overrides` attribute.

For example you have a task that depends on function `get_async_session` that yields `AsyncSession`

::: tabs

@tab Annotated 3.10+

```python
from collections.abc import AsyncGenerator
from typing import Annotated

from pathlib import Path
from taskiq import TaskiqDepends

from your_project.database import AsyncSession
from your_project.tkq import broker

async def get_async_session(context: Annotated[Context, TaskiqDepends()]) -> AsyncGenerator[AsyncSession]:
session = context.state.async_session

try:
yield session
except:
await session.rollback()
raise
else:
await session.commit()

@broker.task
async def do_some_work(session: Annotated[AsyncSession, TaskiqDepends(get_async_session)]) -> None:
result = await session.execute(select())
# ...
```

@tab default values

```python
from pathlib import Path
from taskiq import TaskiqDepends

from your_project.database import AsyncSession
from your_project.tkq import broker

async def get_async_session(context: Context = TaskiqDepends()]) -> AsyncGenerator[AsyncSession]:
session = context.state.async_session

try:
yield session
except:
await session.rollback()
raise
else:
await session.commit()


@broker.task
async def do_some_work(session: AsyncSession = TaskiqDepends(get_async_session)):
result = await session.execute(select())
# ...
```

:::

```python
import pytest

from your_project.database import AsyncSession
from your_project.tqk import broker

# We use autouse, so this fixture
# is called automatically before all tests.
@pytest.fixture(autouse=True)
def patch_broker_dependencies(
session: AsyncSession
):
# Here we override function get_async_session
broker.dependency_overrides[get_async_session] = lambda: session

yield

# Here we clear the override (for something like session it is good idea)
broker.dependency_overrides.pop(get_async_session)
```