Workflow scheduling

Automatically run workflows at specified times without manual triggering.

How it works

Workers automatically register workflow schedules with Workflows during startup. The system handles:

  1. Schedule registration with Workflows
  2. Periodic refresh of schedule definitions (every 10 seconds)
  3. Execution of workflows according to the schedule

Defining Schedules

Add schedules to workflows using cron expressions:

import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"]  # Daily at midnight UTC
)

@workflows.workflow.define(schedules=[schedule])
class ReportWorkflow:
    async def run(self, report_type: str = "daily"):
        # Generate report
        pass

Defining Schedule Policies

Customize schedule behavior with policies:

from datetime import timedelta
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy

# Override default schedule policy
schedule_policy = SchedulePolicy(
    catchup_window=timedelta(days=1),  # Allow 1 day of catchup
    overlap_policy=ScheduleOverlapPolicy.SKIP,  # Skip overlapping executions
)

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"],  # Daily at midnight UTC
    policy=schedule_policy
)

@workflows.workflow.define(schedules=[schedule])
class ReportWorkflow:
    async def run(self, report_type: str = "daily"):
        # Generate report
        pass
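
To make the two policy settings concrete, here is a minimal standalone sketch that simulates them with plain datetimes. It does not use the Workflows library. The semantics are an assumption drawn from the comments in the example above: a missed run is replayed only if its scheduled time falls within catchup_window, and with SKIP a run that would start while the previous one is still executing is dropped. The function names runs_to_catch_up and apply_skip_policy are hypothetical, and the real behavior may differ in its details.

```python
from datetime import datetime, timedelta, timezone


def runs_to_catch_up(
    missed: list[datetime], now: datetime, catchup_window: timedelta
) -> list[datetime]:
    """Keep only the missed scheduled times that fall inside the catchup window.

    Assumed semantics: anything scheduled earlier than now - catchup_window is dropped.
    """
    earliest = now - catchup_window
    return [t for t in missed if earliest <= t <= now]


def apply_skip_policy(
    scheduled: list[datetime], run_duration: timedelta
) -> list[datetime]:
    """Return the scheduled times that actually start under a SKIP-style policy.

    Assumed semantics: a scheduled time is skipped if the previous run has not finished.
    """
    started: list[datetime] = []
    busy_until = None
    for t in sorted(scheduled):
        if busy_until is not None and t < busy_until:
            continue  # previous run still in progress, so this execution is skipped
        started.append(t)
        busy_until = t + run_duration
    return started


if __name__ == "__main__":
    utc = timezone.utc
    # Workers were down over three daily midnight runs and come back at 06:00 on Jan 4.
    now = datetime(2024, 1, 4, 6, 0, tzinfo=utc)
    missed = [datetime(2024, 1, day, tzinfo=utc) for day in (2, 3, 4)]
    print(runs_to_catch_up(missed, now, timedelta(days=1)))

    # An hourly schedule whose runs each take 90 minutes.
    hourly = [datetime(2024, 1, 1, hour, tzinfo=utc) for hour in range(4)]
    print(apply_skip_policy(hourly, timedelta(minutes=90)))
```

Under these assumptions, a one-day window replays only the most recent midnight run, and the 90-minute job starts on every other hourly tick.
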

Key Considerations

  1. Worker Configuration:

    • Ensure all workers have identical schedule configurations
    • Mismatched configurations can cause conflicts and unexpected behavior
  2. Schedule Definition:

    • Uses standard cron syntax
    • Includes input parameters for scheduled executions
    • Supports multiple cron expressions per workflow
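
As a rough illustration of how standard five-field cron expressions select times, the sketch below checks whether a UTC datetime matches one or more expressions. It is a simplified stdlib-only matcher written for this page, not the parser used by Workflows. It handles "*", numbers, comma lists, ranges, and "/" steps, and the helper names field_matches, cron_matches, and any_matches are hypothetical.

```python
from datetime import datetime, timezone


def field_matches(field: str, value: int, low: int, high: int) -> bool:
    """Return True if a single cron field matches value; low/high bound the field."""
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start, end = (int(x) for x in part.split("-"))
        else:
            start = end = int(part)
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Check a timezone-aware datetime against 'minute hour day month weekday' in UTC."""
    minute, hour, dom, month, dow = expression.split()
    moment = moment.astimezone(timezone.utc)
    cron_weekday = moment.isoweekday() % 7  # cron convention: 0 = Sunday, 6 = Saturday
    time_ok = (
        field_matches(minute, moment.minute, 0, 59)
        and field_matches(hour, moment.hour, 0, 23)
        and field_matches(month, moment.month, 1, 12)
    )
    dom_ok = field_matches(dom, moment.day, 1, 31)
    dow_ok = field_matches(dow, cron_weekday, 0, 6)
    # Classic cron rule: if both day fields are restricted, either one may match.
    day_ok = (dom_ok or dow_ok) if (dom != "*" and dow != "*") else (dom_ok and dow_ok)
    return time_ok and day_ok


def any_matches(expressions: list[str], moment: datetime) -> bool:
    """A schedule with several cron expressions fires when any one of them matches."""
    return any(cron_matches(expr, moment) for expr in expressions)


if __name__ == "__main__":
    saturday_3am = datetime(2024, 1, 6, 3, 0, tzinfo=timezone.utc)
    print(cron_matches("0 3 * * 6", saturday_3am))
    print(any_matches(["0 0 * * *", "0 3 * * 6"], saturday_3am))
```

A quick check like this can help confirm that an expression means what you intend before attaching it to a workflow.
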

Complete Example

from datetime import timedelta
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy

# Run every Saturday at 3 AM UTC
backup_schedule = ScheduleDefinition(
    input={"retention_days": 30},
    cron_expressions=["0 3 * * 6"],
    policy=SchedulePolicy(
        catchup_window=timedelta(days=7),
        overlap_policy=ScheduleOverlapPolicy.SKIP,
    )
)

@workflows.workflow.define(schedules=[backup_schedule])
class DatabaseBackupWorkflow:
    async def run(self, retention_days: int = 30):
        print(f"Starting backup with {retention_days} day retention")
        # Backup implementation here

# Start the worker with (requires `import asyncio`):
# asyncio.run(workflows.run_worker([DatabaseBackupWorkflow]))

Important Notes

  • Schedules use the UTC time zone by default
  • Each schedule can specify different input parameters
  • Workers automatically maintain schedule registrations
  • Ensure consistent schedule definitions across all workers
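
One way to check that workers agree on their schedule definitions is to compare a fingerprint of the configuration at deploy time. The sketch below is only an illustration of that idea and is not part of the Workflows library. It assumes each schedule is first described as a plain dict, and both the helper name schedule_fingerprint and the dict keys are hypothetical.

```python
import hashlib
import json


def schedule_fingerprint(schedules: list[dict]) -> str:
    """Hash schedule descriptions so that key order and list order do not matter.

    Each schedule is serialized with sorted keys, then the serialized schedules
    are sorted before hashing. The order inside cron_expressions still matters.
    """
    canonical = sorted(json.dumps(s, sort_keys=True) for s in schedules)
    return hashlib.sha256("\n".join(canonical).encode("utf-8")).hexdigest()


if __name__ == "__main__":
    worker_a = [
        {
            "input": {"retention_days": 30},
            "cron_expressions": ["0 3 * * 6"],
            "catchup_window_seconds": 7 * 24 * 3600,
            "overlap_policy": "SKIP",
        }
    ]
    # Same settings written in a different key order, as another worker might have them.
    worker_b = [
        {
            "overlap_policy": "SKIP",
            "catchup_window_seconds": 7 * 24 * 3600,
            "cron_expressions": ["0 3 * * 6"],
            "input": {"retention_days": 30},
        }
    ]
    print(schedule_fingerprint(worker_a) == schedule_fingerprint(worker_b))
```

Logging this value when each worker starts makes a mismatched deployment easy to spot.
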