Workflow scheduling
Automatically run workflows at specified times without manual triggering.
How it works
Workers automatically register workflow schedules with Workflows during startup. The system handles:
- Schedule registration with Workflows
- Periodic refresh of schedule definitions (every 10 seconds)
- Execution of workflows according to the schedule
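The periodic refresh can be pictured as a loop that re-registers every schedule and then waits. The sketch below is a conceptual model only, built on the standard library: `refresh_schedules`, `register`, and the `registry` dict are hypothetical names, not part of the Workflows API, and the real worker may work differently.

```python
import asyncio
from typing import Callable, Optional


async def refresh_schedules(
    register: Callable[[str, str], None],
    schedules: dict[str, str],
    interval_seconds: float = 10.0,
    rounds: Optional[int] = None,
) -> None:
    """Re-register every schedule, sleep, and repeat (forever if rounds is None)."""
    completed = 0
    while rounds is None or completed < rounds:
        for schedule_id, cron in schedules.items():
            register(schedule_id, cron)
        completed += 1
        await asyncio.sleep(interval_seconds)


# Hypothetical in-memory stand-in for the scheduling backend.
registry: dict[str, str] = {}


def register(schedule_id: str, cron: str) -> None:
    # Writing the same key again is harmless, so repeated refreshes are safe.
    registry[schedule_id] = cron


# Short interval and a fixed number of rounds so the example finishes quickly.
asyncio.run(
    refresh_schedules(register, {"daily-report": "0 0 * * *"}, 0.01, rounds=3)
)
print(registry)
```

Because registration here simply overwrites by key, running the refresh many times leaves the registry in the same state as running it once.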
Defining Schedules
Add schedules to workflows using cron expressions:
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"],  # Daily at midnight UTC
)

@workflows.workflow.define(schedules=[schedule])
class ReportWorkflow:
    async def run(self, report_type: str = "daily"):
        # Generate report
        pass
Defining Schedule Policies
Customize schedule behavior with policies:
from datetime import timedelta

import mistralai.workflows as workflows
from mistralai.workflows.models import (
    ScheduleDefinition,
    ScheduleOverlapPolicy,
    SchedulePolicy,
)

# Override default schedule policy
schedule_policy = SchedulePolicy(
    catchup_window=timedelta(days=1),  # Allow 1 day of catchup
    overlap_policy=ScheduleOverlapPolicy.SKIP,  # Skip overlapping executions
)

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"],  # Daily at midnight UTC
    policy=schedule_policy,
)

@workflows.workflow.define(schedules=[schedule])
class ReportWorkflow:
    async def run(self, report_type: str = "daily"):
        # Generate report
        pass
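To make the two policy settings concrete, here is a small standard-library sketch of how they could be interpreted. It assumes that a catchup window means "only run missed executions that are no older than the window" and that the skip policy means "do not start a run while the previous one is still active". The helpers `within_catchup_window` and `should_start_run` are hypothetical and are not part of the Workflows API.

```python
from datetime import datetime, timedelta, timezone


def within_catchup_window(
    missed_runs: list[datetime], now: datetime, catchup_window: timedelta
) -> list[datetime]:
    """Keep only the missed fire times that are recent enough to run late."""
    cutoff = now - catchup_window
    return [run for run in missed_runs if run >= cutoff]


def should_start_run(previous_run_active: bool) -> bool:
    """Skip-style overlap handling: never start while the previous run is active."""
    return not previous_run_active


# Three daily midnight runs were missed while the workers were down.
missed = [datetime(2024, 1, day, tzinfo=timezone.utc) for day in (1, 2, 3)]
now = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)

recovered = within_catchup_window(missed, now, timedelta(days=1))
print([run.isoformat() for run in recovered])
# → ['2024-01-03T00:00:00+00:00']
```

Under these assumptions, a one-day window recovers only the most recent missed run, while a seven-day window would recover all three.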
Key Considerations
- Worker Configuration:
  - Ensure all workers have identical schedule configurations
  - Mismatched configurations can cause conflicts and unexpected behavior
- Schedule Definition:
  - Uses standard cron syntax
  - Includes input parameters for scheduled executions
  - Supports multiple cron expressions per workflow
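For readers less familiar with cron syntax, the following standard-library sketch shows how a five-field expression (minute, hour, day of month, month, day of week) can be checked against a point in time. It is a simplified illustration, not the parser Workflows uses: `expand_field` and `cron_matches` are hypothetical helpers, all five fields must match, and named months or weekdays and `7` for Sunday are not handled.

```python
from datetime import datetime, timezone

# Allowed value range for each of the five cron fields, in order.
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field: str, low: int, high: int) -> set[int]:
    """Expand one field such as '*', '5', '1-5', '*/15' or '1,15' into values."""
    values: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # 'N/step' runs from N to the top of the range; plain 'N' is just N.
            end = high if step_text else start
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if every field of the expression matches the given moment."""
    fields = expression.split()
    actual = [
        moment.minute,
        moment.hour,
        moment.day,
        moment.month,
        moment.isoweekday() % 7,  # cron counts Sunday as 0, Saturday as 6
    ]
    return all(
        value in expand_field(field, low, high)
        for value, field, (low, high) in zip(actual, fields, FIELD_RANGES)
    )


saturday_3am = datetime(2024, 1, 6, 3, 0, tzinfo=timezone.utc)
print(cron_matches("0 3 * * 6", saturday_3am))  # → True
print(cron_matches("0 0 * * *", saturday_3am))  # → False
```

A workflow with several cron expressions would fire whenever any one of them matches, which can be modeled as `any(cron_matches(e, moment) for e in expressions)`.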
Complete Example
from datetime import timedelta

import mistralai.workflows as workflows
from mistralai.workflows.models import (
    ScheduleDefinition,
    ScheduleOverlapPolicy,
    SchedulePolicy,
)

# Run every Saturday at 3 AM UTC
backup_schedule = ScheduleDefinition(
    input={"retention_days": 30},
    cron_expressions=["0 3 * * 6"],
    policy=SchedulePolicy(
        catchup_window=timedelta(days=7),
        overlap_policy=ScheduleOverlapPolicy.SKIP,
    ),
)

@workflows.workflow.define(schedules=[backup_schedule])
class DatabaseBackupWorkflow:
    async def run(self, retention_days: int = 30):
        print(f"Starting backup with {retention_days} day retention")
        # Backup implementation here

# Start worker with:
# asyncio.run(workflows.run_worker([DatabaseBackupWorkflow]))
Important Notes
- Schedules use the UTC time zone by default
- Each schedule can specify different input parameters
- Workers automatically maintain schedule registrations
- Ensure consistent schedule definitions across all workers
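One way to check that workers agree on their schedule definitions is to compare a fingerprint of the configuration, for example in a deployment check. The sketch below is an illustration under assumptions: plain dicts stand in for schedule definitions, and `schedule_fingerprint` is a hypothetical helper, not something the Workflows library provides.

```python
import hashlib
import json


def schedule_fingerprint(schedules: list[dict]) -> str:
    """Hash a list of schedule configs so two workers can compare them."""
    # Sort keys and entries so that ordering differences do not change the hash.
    canonical = sorted(json.dumps(s, sort_keys=True) for s in schedules)
    return hashlib.sha256("\n".join(canonical).encode("utf-8")).hexdigest()


# Same schedule, keys in a different order: should be treated as identical.
worker_a = [{"cron_expressions": ["0 3 * * 6"], "input": {"retention_days": 30}}]
worker_b = [{"input": {"retention_days": 30}, "cron_expressions": ["0 3 * * 6"]}]
# Different input parameters: a genuine mismatch.
worker_c = [{"cron_expressions": ["0 3 * * 6"], "input": {"retention_days": 7}}]

print(schedule_fingerprint(worker_a) == schedule_fingerprint(worker_b))  # → True
print(schedule_fingerprint(worker_a) == schedule_fingerprint(worker_c))  # → False
```

Comparing fingerprints rather than raw objects keeps the check cheap and makes a mismatch easy to log.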