Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions docs/examples/router/multiple_brokers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Route one task through several brokers with a shared router."""

import asyncio

from taskiq import Flow, InMemoryBroker, TaskiqRoute, TaskiqRouter

router = TaskiqRouter()

default_email_flow = Flow("emails.default")
priority_email_flow = Flow("emails.priority")
bulk_email_flow = Flow("emails.bulk")

default_broker = InMemoryBroker(
router=router,
broker_name="default",
default_flow=default_email_flow,
await_inplace=True,
)
priority_broker = InMemoryBroker(
router=router,
broker_name="priority",
default_flow=priority_email_flow,
await_inplace=True,
)


@default_broker.task(task_name="examples.send_email", domain="notifications")
async def send_email(user_id: int, template: str) -> str:
"""Pretend to render and send an email."""
return f"{template} email sent to user {user_id}"


priority_route = router.route_task(
send_email,
broker=priority_broker,
flow=priority_email_flow,
)
priority_subscription = router.subscribe(
priority_broker,
priority_email_flow,
send_email,
)


def _format_route(task_name: str, route: TaskiqRoute) -> str:
"""Return a readable route diagnostic for the example output."""
flow_name = route.flow.name if route.flow is not None else "<default>"
return f"{task_name} -> broker={route.broker_name}, flow={flow_name}"


def _format_listen_plan() -> str:
"""Return flows that the priority broker should subscribe to."""
flow_names = ", ".join(flow.name for flow in priority_broker.get_subscribed_flows())
return f"priority listens to: {flow_names}"


async def _main() -> None:
await default_broker.startup()
await priority_broker.startup()
try:
direct_result = await send_email(7, "welcome")

declared_route = router.resolve_route(send_email)
assert declared_route == priority_route

routed_task = (
await send_email.kicker()
.with_route(declared_route)
.kiq(
7,
"welcome",
)
)
routed_result = await routed_task.wait_result(timeout=2)

bulk_route = router.resolve_route(
send_email,
broker=default_broker,
flow=bulk_email_flow,
)
bulk_task = await send_email.kicker().with_route(bulk_route).kiq(8, "digest")
bulk_result = await bulk_task.wait_result(timeout=2)

print(f"Direct call: {direct_result}")
print(f"Router rule: {_format_route(send_email.task_name, priority_route)}")
print(f"Subscription tasks: {sorted(priority_subscription.task_names)}")
print(_format_listen_plan())
print(f"Resolved route: {_format_route(send_email.task_name, declared_route)}")
print(f"Routed call: {routed_result.return_value}")
print(f"Override route: {_format_route(send_email.task_name, bulk_route)}")
print(f"Route override: {bulk_result.return_value}")
finally:
await priority_broker.shutdown()
await default_broker.shutdown()


if __name__ == "__main__":
asyncio.run(_main())
100 changes: 100 additions & 0 deletions docs/examples/router/shared_task_package.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Declare shared task definitions and bind them in the final application."""

import asyncio
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any

from taskiq import (
AsyncTaskiqDecoratedTask,
Flow,
InMemoryBroker,
TaskiqRouter,
task_builder,
)


@dataclass(frozen=True, slots=True)
class BillingQueue:
"""Broker-specific flow that follows the shared flow protocol."""

name: str
priority: int

def broker_options(self) -> Mapping[str, object]:
"""Return options that a billing broker adapter can understand."""
return {
"priority": self.priority,
}


class BillingTask(AsyncTaskiqDecoratedTask[Any, Any]):
"""Custom task class shared by billing package tasks."""

def billing_name(self) -> str:
"""Return a billing-specific task name."""
return self.task_name


@task_builder("billing.calculate_total", base_cls=BillingTask, domain="billing")
async def calculate_total(price: int, quantity: int) -> int:
"""Package-level task definition that is not bound to any broker."""
return price * quantity


router = TaskiqRouter()
billing_flow = Flow("billing.tasks")
priority_billing_flow = BillingQueue(name="billing.priority", priority=10)

billing_broker = InMemoryBroker(
router=router,
broker_name="billing",
default_flow=billing_flow,
await_inplace=True,
)

registered_calculate_total = router.register_task(
calculate_total,
broker=billing_broker,
flow=billing_flow,
)
router.subscribe(
billing_broker,
billing_flow,
registered_calculate_total,
)


async def _main() -> None:
await billing_broker.startup()
try:
direct_result = await calculate_total.call(19, 3)

priority_route = router.resolve_route(
registered_calculate_total,
broker=billing_broker,
flow=priority_billing_flow,
)
prepared_task = (
registered_calculate_total.kicker()
.with_route(
priority_route,
)
.prepare(19, 3)
)

queued_task = await prepared_task.kiq()
queued_result = await queued_task.wait_result(timeout=2)

print(f"Shared task direct call: {direct_result}")
print(f"Registered task class: {registered_calculate_total.billing_name()}")
listen_flow = billing_broker.get_subscribed_flows()[0]
print(f"Registered listen flow: {listen_flow.name}")
print(f"Prepared message: {prepared_task.message.task_name}")
print(f"Registered queued call: {queued_result.return_value}")
finally:
await billing_broker.shutdown()


if __name__ == "__main__":
asyncio.run(_main())
22 changes: 22 additions & 0 deletions docs/guide/architecture-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,28 @@ asyncio.run(main())

```

## Router and flows

Taskiq can use a `TaskiqRouter` to keep routing rules outside of broker
implementations. Brokers remain transport adapters, while the router owns task
registration, route resolution and flow subscriptions.
This section describes the `experiment/separate_broker` branch contract. The
old `@broker.task(...)`, `.kiq()`, labels, scheduler and result backend behavior
remain compatible, while router/flow APIs are additive review material for the
branch.

`Flow` is a transport-neutral delivery address. Broker packages may provide
their own flow classes for queue, topic, subject or stream options, as long as
they implement the same flow protocol. The router deduplicates subscriptions by
flow name and rejects same-name flows with incompatible broker options.

Routing and subscribing are separate responsibilities. `route_task(...)`
chooses the outbound broker and flow for task invocations. `subscribe(...)`
adds flows to a broker listen plan for flow-aware broker adapters. Worker task
lookup still uses `task_name`; flow does not select the Python task.

Read more in the [Routing and flows](./routing-and-flows.md) section.

## Messages

Every message has labels. You can define labels
Expand Down
Loading
Loading