Skip to content

Task Scheduler

The task package provides a simple task scheduler that can be used to run tasks periodically.

Note

This is not a replacement for full task queues such as Celery, taskiq, or APScheduler. It is small, simple, and safe for running tasks in a distributed system.

The key features are:

  • Fast and easy: simple decorators to define and schedule tasks with minimal boilerplate.
  • Interval tasks: run tasks at fixed intervals, locally or across a cluster.
  • Synchronization: control concurrency with distributed primitives (see Synchronization Primitives).
  • Dependency injection: use FastDepends to inject dependencies into tasks.
  • Error handling: errors are caught and logged, so a failing task does not stop the scheduler.

Tasks

The Tasks class is the main entry point to manage tasks. The recommended way to lifecycle it is to register it with a Grelmicro app:

from grelmicro import Grelmicro
from grelmicro.task import Tasks

tasks = Tasks()
micro = Grelmicro(uses=[tasks])

@tasks.interval(seconds=5)
async def cleanup() -> None:
    ...

async with micro:
    ...

Grelmicro.use(item) (or the uses= constructor kwarg) accepts any async context manager and lifecycles it with the app. The caller keeps the reference and uses the manager directly.

Start it standalone using the application lifespan:

from contextlib import asynccontextmanager

from fastapi import FastAPI

from grelmicro.task import Tasks

task = Tasks()


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with task:
        yield


app = FastAPI(lifespan=lifespan)
from contextlib import asynccontextmanager

from faststream import ContextRepo, FastStream
from faststream.redis import RedisBroker

from grelmicro.task import Tasks

task = Tasks()


@asynccontextmanager
async def lifespan(context: ContextRepo):
    async with task:
        yield


broker = RedisBroker()
app = FastStream(broker, lifespan=lifespan)

Interval Task

Use the interval decorator to run a task at a fixed interval:

Note

The interval specifies the waiting time between task executions. Ensure that the task execution duration is considered to meet deadlines effectively.

from grelmicro.task import Tasks

task = Tasks()


@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")
from grelmicro.task import TaskRouter

task = TaskRouter()


@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")

Distributed Lock

Set max_lock_seconds to enable distributed locking: the task runs at most once per interval across all workers. This uses a built-in TaskLock automatically.

from grelmicro.task import Tasks

task = Tasks()


@task.interval(seconds=60, max_lock_seconds=300)
async def cleanup():
    print("Running cleanup...")
Parameter Description
seconds Duration in seconds between each scheduling attempt. Each worker retries every N seconds, but only one executes per interval.
max_lock_seconds Crash protection TTL. Must be >= seconds. If a worker crashes, the lock expires after this duration.
min_lock_seconds Minimum duration to hold the lock after task completion. Prevents re-execution on other nodes too soon. Defaults to seconds.

Leader Gating

Restrict the task to the leader worker with a Leader Election, so only one worker executes it. Setting leader also enables distributed locking, with max_lock_seconds defaulting to seconds * 5:

from grelmicro.sync import LeaderElection
from grelmicro.task import Tasks

leader = LeaderElection("my-service")
task = Tasks()
task.add_task(leader)


@task.interval(seconds=60, leader=leader)
async def cleanup():
    print("Running cleanup...")

Custom Lock Timing

For long-running tasks, customize both max_lock_seconds and min_lock_seconds:

from grelmicro.task import Tasks

task = Tasks()


@task.interval(seconds=60, max_lock_seconds=300, min_lock_seconds=30)
async def long_task():
    print("Running long task...")

Resource Lock

Combine distributed locking with a Lock to synchronize access to a shared resource during task execution. Pass the Lock via the sync parameter:

from grelmicro.sync import Lock
from grelmicro.task import Tasks

task = Tasks()
resource_lock = Lock("shared-resource")


@task.interval(seconds=60, max_lock_seconds=300, sync=resource_lock)
async def cleanup():
    print("Running cleanup...")

How It Works

When the lock is already held, the task skips the execution (logged at DEBUG level) and retries on the next interval.

Node A:  [acquire] → [execute] → [hold for seconds] → [TTL expires]
Node B:  [skip] → ... → [skip] → ... → [acquire] → [execute]

When combining leader gating, distributed locking, and a resource lock, the synchronization primitives are acquired in this order:

Order Primitive Purpose
1 LeaderElection Rejects non-leader workers immediately without acquiring any lock, which avoids unnecessary contention.
2 TaskLock Guarantees at-most-once execution per interval. It is acquired after leadership is confirmed so the TTL window stays short.
3 Lock User-provided lock for shared-resource access. It is acquired last so the resource is held only during actual execution.

Each primitive is only acquired if the previous one succeeded. For example, a non-leader worker is rejected at step 1 and never touches the task lock or resource lock.

Task Router

For bigger applications, use the TaskRouter class to organize tasks across modules:


Then include the TaskRouter into the Tasks or other routers:

from grelmicro.task import Tasks

task = Tasks()
task.include_router(router)

Tip

The TaskRouter follows the same philosophy as the APIRouter in FastAPI or the Router in FastStream.

See Synchronization Primitives for more details.