Skip to content

Task Scheduler

The task package provides a simple task scheduler that can be used to run tasks periodically.

Note

This is not a replacement for bigger tools like Celery, taskiq, or APScheduler. It is just lightweight, easy to use, and safe for running tasks in a distributed system with synchronization.

The key features are:

  • Fast & Easy: Simple decorators to define and schedule tasks effortlessly.
  • Interval Task: Run tasks at specified intervals, locally or distributed.
  • Synchronization: Control concurrency using distributed primitives (see Synchronization Primitives).
  • Dependency Injection: Use FastDepends to inject dependencies into tasks.
  • Error Handling: Catches and logs errors so that task failures do not stop the scheduler.

Task Manager

The TaskManager class is the main entry point to manage tasks. Start it using the application lifespan:

from contextlib import asynccontextmanager

from fastapi import FastAPI

from grelmicro.task import TaskManager

task = TaskManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with task:
        yield


app = FastAPI(lifespan=lifespan)
from contextlib import asynccontextmanager

from faststream import ContextRepo, FastStream
from faststream.redis import RedisBroker

from grelmicro.task import TaskManager

task = TaskManager()


@asynccontextmanager
async def lifespan(context: ContextRepo):
    async with task:
        yield


broker = RedisBroker()
app = FastStream(broker, lifespan=lifespan)

Interval Task

Use the interval decorator to run a task at a fixed interval:

Note

The interval specifies the waiting time between task executions. Ensure that the task execution duration is considered to meet deadlines effectively.

from grelmicro.task import TaskManager

task = TaskManager()


@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")
from grelmicro.task import TaskRouter

task = TaskRouter()


@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")

Distributed Lock

Set max_lock_seconds to enable distributed locking: the task runs at most once per interval across all workers. This uses a built-in TaskLock automatically.

from grelmicro.task import TaskManager

task = TaskManager()


@task.interval(seconds=60, max_lock_seconds=300)
async def cleanup():
    print("Running cleanup...")
Parameter Description
seconds Duration in seconds between each scheduling attempt. Each worker retries every N seconds, but only one executes per interval.
max_lock_seconds Crash protection TTL. Must be >= seconds. If a worker crashes, the lock expires after this duration.
min_lock_seconds Minimum duration to hold the lock after task completion. Prevents re-execution on other nodes too soon. Defaults to seconds.

Leader Gating

Gate the task behind a Leader Election so only the leader worker executes it. Setting leader implies distributed locking (with max_lock_seconds defaulting to seconds * 5):

from grelmicro.sync import LeaderElection
from grelmicro.task import TaskManager

leader = LeaderElection("my-service")
task = TaskManager()
task.add_task(leader)


@task.interval(seconds=60, leader=leader)
async def cleanup():
    print("Running cleanup...")

Custom Lock Timing

For long-running tasks, customize both max_lock_seconds and min_lock_seconds:

from grelmicro.task import TaskManager

task = TaskManager()


@task.interval(seconds=60, max_lock_seconds=300, min_lock_seconds=30)
async def long_task():
    print("Running long task...")

Resource Lock

Combine distributed locking with a Lock to synchronize access to a shared resource during task execution. Pass the Lock via the sync parameter:

from grelmicro.sync import Lock
from grelmicro.task import TaskManager

task = TaskManager()
resource_lock = Lock("shared-resource")


@task.interval(seconds=60, max_lock_seconds=300, sync=resource_lock)
async def cleanup():
    print("Running cleanup...")

How It Works

When the lock is already held, the task skips the execution (logged at DEBUG level) and retries on the next interval.

Node A:  [acquire] → [execute] → [hold for seconds] → [TTL expires]
Node B:  [skip] → ... → [skip] → ... → [acquire] → [execute]

When combining leader gating, distributed locking, and a resource lock, the synchronization primitives are acquired in this order:

Order Primitive Purpose
1 LeaderElection Instantly rejects non-leader workers without touching any lock, avoiding unnecessary contention.
2 TaskLock Guarantees at-most-once execution per interval. Acquired after leadership is confirmed to keep the TTL window tight.
3 Lock User-provided lock for shared-resource access. Acquired last so the resource is held only during actual execution.

Each primitive is only acquired if the previous one succeeded. For example, a non-leader worker is rejected at step 1 and never touches the task lock or resource lock.

Task Router

For bigger applications, use the TaskRouter class to organize tasks across modules:


Then include the TaskRouter into the TaskManager or other routers:

from grelmicro.task.manager import TaskManager

task = TaskManager()
task.include_router(router)

Tip

The TaskRouter follows the same philosophy as the APIRouter in FastAPI or the Router in FastStream.

Deprecated APIs

Scheduled Task

Deprecated

The scheduled() decorator is deprecated. Use interval() with max_lock_seconds or leader instead.

The scheduled() decorator still works but emits a DeprecationWarning. It is equivalent to interval(seconds=N, max_lock_seconds=N*5, min_lock_seconds=N).

sync with TaskLock or LeaderElection

Deprecated

Using sync with TaskLock or LeaderElection is deprecated. Use max_lock_seconds and leader parameters instead.

See Synchronization Primitives for more details.