Swarm API Reference¶
This page provides detailed API documentation for swarm functionality in MageFlow.
mageflow.aswarm(tasks, task_name, **options)¶
Create a new task swarm for parallel execution.
async def aswarm(
tasks: List[TaskSignatureConvertible],
task_name: Optional[str] = None,
success_callbacks: Optional[List[TaskSignatureConvertible]] = None,
error_callbacks: Optional[List[TaskSignatureConvertible]] = None,
config: SwarmConfig = SwarmConfig(),
is_swarm_closed: bool = False,
**kwargs
) -> SwarmTaskSignature
Parameters:
- tasks (list): List of tasks to run in parallel
- task_name (str): Optional name for the swarm
- success_callbacks (list): Tasks executed when all tasks complete successfully
- error_callbacks (list): Tasks executed when failure conditions are met
- config (SwarmConfig): Configuration object controlling swarm behavior
- is_swarm_closed (bool): Whether to close swarm immediately (prevents adding new tasks)
Example:
swarm = await mageflow.aswarm(
tasks=file_tasks,
task_name="file-processing",
config=SwarmConfig(max_concurrency=5),
is_swarm_closed=True
)
SwarmConfig¶
Configuration class for controlling swarm behavior.
class SwarmConfig(BaseModel):
max_concurrency: int = 30
stop_after_n_failures: Optional[int] = None
max_task_allowed: Optional[int] = None
Fields:
- max_concurrency (int): Maximum number of tasks running simultaneously (default: 30)
- stop_after_n_failures (int): Stop swarm after N task failures (default: None - no limit)
- max_task_allowed (int): Maximum total tasks allowed in swarm (default: None - no limit)
SwarmTaskSignature¶
The main swarm class that manages parallel task execution.
Properties¶
tasks: List of all task IDs in the swarmtasks_left_to_run: Queue of tasks waiting to executefinished_tasks: List of successfully completed task IDsfailed_tasks: List of failed task IDscurrent_running_tasks: Number of currently executing tasksis_swarm_closed: Whether new tasks can be addedconfig: SwarmConfig instance
Methods¶
aio_run_no_wait(msg)¶
Start the swarm execution.
Parameters:
- msg (BaseModel): Message object to pass to tasks
aio_run_in_swarm(task, msg, close_on_max_task)¶
Add one or more tasks to the swarm and immediately schedule them. All tasks receive the same message.
async def aio_run_in_swarm(
self,
task: TaskSignatureConvertible | list[TaskSignatureConvertible],
msg: BaseModel,
options: TriggerWorkflowOptions = None,
close_on_max_task: bool = True,
)
Parameters:
- task: A single task or a list of tasks to add and run
- msg (BaseModel): Message object shared across all tasks
- options (TriggerWorkflowOptions): Optional Hatchet trigger options
- close_on_max_task (bool): Automatically close the swarm when max_task_allowed is reached (default: True)
aio_run_tasks_in_swarm(tasks, msgs, options, close_on_max_task)¶
Add multiple tasks to the swarm where each task receives its own message.
async def aio_run_tasks_in_swarm(
self,
tasks: list[TaskSignatureConvertible],
msgs: list[BaseModel],
options: TriggerWorkflowOptions = None,
close_on_max_task: bool = True,
)
Parameters:
- tasks (list): List of tasks to add and run
- msgs (list[BaseModel]): List of messages, one per task — each task gets its own message
- options (TriggerWorkflowOptions): Optional Hatchet trigger options
- close_on_max_task (bool): Automatically close the swarm when max_task_allowed is reached (default: True)
close_swarm()¶
Close the swarm to prevent new tasks and trigger completion callbacks.
is_swarm_done()¶
Check if swarm has completed all tasks.
Error Classes¶
TooManyTasksError¶
Raised when attempting to add tasks beyond max_task_allowed limit.
SwarmIsCanceledError¶
Raised when attempting to add tasks to a canceled swarm.