Task Scheduler
The task package provides a simple task scheduler that can be used to run tasks periodically.
Note
This is not a replacement for full task queues such as Celery, taskiq, or APScheduler. It is small, simple, and safe for running tasks in a distributed system.
The key features are:
- Fast and easy: simple decorators to define and schedule tasks with minimal boilerplate.
- Interval tasks: run tasks at fixed intervals, locally or across a cluster.
- Synchronization: control concurrency with distributed primitives (see Synchronization Primitives).
- Dependency injection: use FastDepends to inject dependencies into tasks.
- Error handling: errors are caught and logged, so a failing task does not stop the scheduler.
Tasks
The `Tasks` class is the main entry point for managing tasks. The recommended way to manage its lifecycle is to register it with a Grelmicro app:
```python
from grelmicro import Grelmicro
from grelmicro.task import Tasks

tasks = Tasks()
micro = Grelmicro(uses=[tasks])

@tasks.interval(seconds=5)
async def cleanup() -> None:
    ...

async with micro:
    ...
```
`Grelmicro.use(item)` (or the `uses=` constructor kwarg) accepts any async context manager and ties its lifecycle to the app's. The caller keeps the reference and uses the manager directly.
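To illustrate what that lifecycle management amounts to, here is a toy stand-in written with plain `asyncio` (the `App` and `Recorder` classes are invented for this sketch, not Grelmicro's code): the app enters each registered async context manager on startup and exits it, in reverse order, on shutdown.

```python
import asyncio
from contextlib import AsyncExitStack

class App:
    """Toy stand-in for Grelmicro: enters each registered async
    context manager on startup, exits them all on shutdown."""

    def __init__(self, uses=()):
        self._uses = list(uses)
        self._stack = AsyncExitStack()

    def use(self, item):
        self._uses.append(item)

    async def __aenter__(self):
        await self._stack.__aenter__()
        for item in self._uses:
            await self._stack.enter_async_context(item)
        return self

    async def __aexit__(self, *exc):
        return await self._stack.__aexit__(*exc)

class Recorder:
    """Records its own startup and shutdown into a shared log."""

    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        self.log.append("start")

    async def __aexit__(self, *exc):
        self.log.append("stop")

async def main():
    log = []
    app = App(uses=[Recorder(log)])
    async with app:
        log.append("running")
    return log

log = asyncio.run(main())
# log == ["start", "running", "stop"]
```

The caller still holds the `Recorder` reference, so it can use the managed object directly while the app owns only its start/stop.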
Alternatively, start it standalone from your application's lifespan. With FastAPI:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from grelmicro.task import Tasks

task = Tasks()

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with task:
        yield

app = FastAPI(lifespan=lifespan)
```
Or with FastStream:

```python
from contextlib import asynccontextmanager

from faststream import ContextRepo, FastStream
from faststream.redis import RedisBroker

from grelmicro.task import Tasks

task = Tasks()

@asynccontextmanager
async def lifespan(context: ContextRepo):
    async with task:
        yield

broker = RedisBroker()
app = FastStream(broker, lifespan=lifespan)
```
Interval Task
Use the `interval` decorator to run a task at a fixed interval:
Note
The interval is the waiting time between task executions, not the period between start times. Account for the task's own duration when execution deadlines matter.
```python
from grelmicro.task import Tasks

task = Tasks()

@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")
```
The same decorator is available on a `TaskRouter`:

```python
from grelmicro.task import TaskRouter

task = TaskRouter()

@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")
```
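As the note above says, the interval is the pause between executions, so the effective period is the task's duration plus the interval. A plain-Python sketch of that timing, assuming fixed-delay scheduling:

```python
interval = 5      # @task.interval(seconds=5): pause between executions
duration = 2      # assume the task itself takes 2 seconds to run

# Simulate three scheduling cycles with a logical clock.
start_times = []
clock = 0.0
for _ in range(3):
    start_times.append(clock)
    clock += duration   # the task runs...
    clock += interval   # ...then the scheduler waits the full interval

# Runs start every duration + interval = 7 seconds, not every 5.
```

With fixed-delay semantics, a slow task stretches the period, so size `seconds` accordingly when deadlines matter.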
Distributed Lock
Set `max_lock_seconds` to enable distributed locking: the task runs at most once per interval across all workers. A built-in `TaskLock` is used automatically.
```python
from grelmicro.task import Tasks

task = Tasks()

@task.interval(seconds=60, max_lock_seconds=300)
async def cleanup():
    print("Running cleanup...")
```
| Parameter | Description |
|---|---|
| `seconds` | Duration in seconds between each scheduling attempt. Each worker retries every N seconds, but only one executes per interval. |
| `max_lock_seconds` | Crash-protection TTL. Must be >= `seconds`. If a worker crashes, the lock expires after this duration. |
| `min_lock_seconds` | Minimum duration to hold the lock after task completion. Prevents re-execution on other nodes too soon. Defaults to `seconds`. |
Leader Gating
Restrict the task to the leader worker with a Leader Election, so that only one worker executes it. Setting `leader` also enables distributed locking, with `max_lock_seconds` defaulting to `seconds * 5`:
```python
from grelmicro.sync import LeaderElection
from grelmicro.task import Tasks

leader = LeaderElection("my-service")
task = Tasks()
task.add_task(leader)

@task.interval(seconds=60, leader=leader)
async def cleanup():
    print("Running cleanup...")
```
Custom Lock Timing
For long-running tasks, customize both `max_lock_seconds` and `min_lock_seconds`:
```python
from grelmicro.task import Tasks

task = Tasks()

@task.interval(seconds=60, max_lock_seconds=300, min_lock_seconds=30)
async def long_task():
    print("Running long task...")
```
Resource Lock
Combine distributed locking with a `Lock` to synchronize access to a shared resource during task execution. Pass the `Lock` via the `sync` parameter:
```python
from grelmicro.sync import Lock
from grelmicro.task import Tasks

task = Tasks()
resource_lock = Lock("shared-resource")

@task.interval(seconds=60, max_lock_seconds=300, sync=resource_lock)
async def cleanup():
    print("Running cleanup...")
```
How It Works
When the lock is already held, the execution is skipped (logged at DEBUG level) and the task retries on the next interval.
```
Node A: [acquire] → [execute] → [hold for seconds] → [TTL expires]
Node B: [skip] → ... → [skip] → ... → [acquire] → [execute]
```
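The skip-and-retry behavior can be modeled with a toy TTL lock. All names here are invented for the sketch (the real lock lives in a distributed backend); it assumes `min_lock_seconds` defaults to `seconds` as described above, and ignores the crash-safety TTL:

```python
class ToyTaskLock:
    """Toy model of the task-lock timing: after a successful run the
    lock is held for min_lock_seconds; max_lock_seconds would only
    matter as a crash-safety TTL, which this sketch ignores."""

    def __init__(self, seconds, min_lock_seconds=None):
        self.min_hold = min_lock_seconds or seconds
        self.locked_until = -1.0  # logical timestamp

    def try_run(self, now, run_time=0.0):
        if now < self.locked_until:
            return False  # lock held elsewhere: skip this interval
        self.locked_until = now + run_time + self.min_hold
        return True

lock = ToyTaskLock(seconds=60)
ran = []
for tick in (0, 60, 120):        # one scheduling attempt per interval
    for worker in ("A", "B"):    # both workers race at each tick
        if lock.try_run(tick):   # deterministic toy: A always wins
            ran.append((tick, worker))

# Exactly one worker executes per interval:
# ran == [(0, "A"), (60, "A"), (120, "A")]
```

In a real cluster the winner varies per interval, but the invariant is the same: whoever loses the race skips and retries on the next tick.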
When combining leader gating, distributed locking, and a resource lock, the synchronization primitives are acquired in this order:
| Order | Primitive | Purpose |
|---|---|---|
| 1 | `LeaderElection` | Rejects non-leader workers immediately without acquiring any lock, which avoids unnecessary contention. |
| 2 | `TaskLock` | Guarantees at-most-once execution per interval. It is acquired after leadership is confirmed so the TTL window stays short. |
| 3 | `Lock` | User-provided lock for shared-resource access. It is acquired last so the resource is held only during actual execution. |
Each primitive is only acquired if the previous one succeeded. For example, a non-leader worker is rejected at step 1 and never touches the task lock or resource lock.
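The short-circuit order can be sketched as a guard chain. This is a toy with invented names (`ToyLock`, `guarded_run`), not the library's internals:

```python
import asyncio

class ToyLock:
    """Non-blocking lock standing in for the distributed TaskLock."""

    def __init__(self):
        self.held = False

    def try_acquire(self):
        if self.held:
            return False
        self.held = True
        return True

    def release(self):
        self.held = False

async def guarded_run(is_leader, task_lock, resource_lock, task, log):
    """Acquire in the documented order, bailing out early at each step."""
    if not is_leader():                 # 1. LeaderElection: reject non-leaders
        log.append("not leader")
        return
    if not task_lock.try_acquire():     # 2. TaskLock: at-most-once per interval
        log.append("skip: task lock held")
        return
    try:
        async with resource_lock:       # 3. Lock: shared-resource access
            log.append("run")
            await task()
    finally:
        task_lock.release()

async def main():
    log = []
    task_lock, resource_lock = ToyLock(), asyncio.Lock()

    async def task():
        pass

    await guarded_run(lambda: False, task_lock, resource_lock, task, log)
    await guarded_run(lambda: True, task_lock, resource_lock, task, log)
    return log

log = asyncio.run(main())
# log == ["not leader", "run"]
```

The non-leader call returns at step 1 without touching either lock, while the leader proceeds through all three guards.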
Task Router
For bigger applications, use the `TaskRouter` class to organize tasks across modules, then include each `TaskRouter` into the `Tasks` instance (or into another router):

```python
from grelmicro.task import TaskRouter, Tasks

router = TaskRouter()  # typically defined in a separate module

task = Tasks()
task.include_router(router)
```
Tip
The `TaskRouter` follows the same philosophy as the `APIRouter` in FastAPI or the `Router` in FastStream.
See Synchronization Primitives for more details.