Swarm API Reference¶
This page provides detailed API documentation for swarm functionality in MageFlow.
mageflow.swarm()¶
Create a new task swarm for parallel execution.
async def swarm(
    tasks: List[TaskSignatureConvertible],
    success_callbacks: Optional[List[TaskSignatureConvertible]] = None,
    error_callbacks: Optional[List[TaskSignatureConvertible]] = None,
    config: SwarmConfig = SwarmConfig(),
    task_name: Optional[str] = None,
    is_swarm_closed: bool = False,
    **kwargs
) -> SwarmTaskSignature
Parameters:
- tasks: List of tasks to run in parallel
- success_callbacks: Tasks executed when all tasks complete successfully
- error_callbacks: Tasks executed when failure conditions are met
- config: Configuration object controlling swarm behavior
- task_name: Optional name for the swarm
- is_swarm_closed: Whether to close the swarm immediately, which prevents new tasks from being added
- **kwargs: Additional parameters passed to task contexts
Returns: SwarmTaskSignature - The swarm task signature
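The roles of tasks, success_callbacks, and error_callbacks can be illustrated with a toy runner. This is a sketch built on plain asyncio, not MageFlow code: run_toy_swarm, ok, boom, and demo are hypothetical names, and the sketch assumes the simplest possible failure condition (any single task raising).

```python
import asyncio
from typing import Awaitable, Callable, List


# Hypothetical stand-in, NOT the MageFlow API: runs plain coroutines in parallel
# and dispatches to success or error callbacks, mirroring the parameter roles above.
async def run_toy_swarm(
    tasks: List[Callable[[], Awaitable[object]]],
    success_callbacks: List[Callable[[list], None]],
    error_callbacks: List[Callable[[list], None]],
) -> None:
    # return_exceptions=True keeps one failure from cancelling the other tasks
    results = await asyncio.gather(*(t() for t in tasks), return_exceptions=True)
    errors = [r for r in results if isinstance(r, Exception)]
    if errors:
        # Assumption: a single failure is the "failure condition"
        for cb in error_callbacks:
            cb(errors)
    else:
        for cb in success_callbacks:
            cb(list(results))


async def ok() -> str:
    return "done"


async def boom() -> str:
    raise ValueError("task failed")


def demo() -> dict:
    log = {"success": [], "error": []}
    # First run: every task succeeds, so only the success callback fires
    asyncio.run(run_toy_swarm([ok, ok], [log["success"].append], [log["error"].append]))
    # Second run: one task raises, so only the error callback fires
    asyncio.run(run_toy_swarm([ok, boom], [log["success"].append], [log["error"].append]))
    return log
```

In a real swarm the failure condition is presumably governed by the swarm configuration rather than fixed at one failure; the sketch hard-codes it only to keep the dispatch logic visible.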
SwarmConfig¶
Configuration class for controlling swarm behavior.
class SwarmConfig(BaseModel):
    max_concurrency: int = 30
    stop_after_n_failures: Optional[int] = None
    max_task_allowed: Optional[int] = None
Fields:
- max_concurrency: Maximum number of tasks running simultaneously (default: 30)
- stop_after_n_failures: Stop swarm after N task failures (default: None - no limit)
- max_task_allowed: Maximum total tasks allowed in swarm (default: None - no limit)
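To make the meaning of max_concurrency concrete, the following sketch caps parallel work with an asyncio.Semaphore and records the peak number of jobs running at once. It is an illustration under stated assumptions, not how MageFlow enforces the limit: ToySwarmConfig is a dataclass stand-in for the pydantic model above, run_with_limit is a hypothetical helper, and only max_concurrency is modeled.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


# Stand-in mirroring the documented fields; the real SwarmConfig is a pydantic BaseModel.
@dataclass
class ToySwarmConfig:
    max_concurrency: int = 30
    stop_after_n_failures: Optional[int] = None
    max_task_allowed: Optional[int] = None


async def run_with_limit(n_tasks: int, config: ToySwarmConfig) -> int:
    """Run n_tasks dummy jobs and return the peak number running at once."""
    gate = asyncio.Semaphore(config.max_concurrency)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with gate:  # at most max_concurrency jobs pass this point
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated work
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak


# Ten jobs, but never more than three in flight at the same time
peak = asyncio.run(run_with_limit(10, ToySwarmConfig(max_concurrency=3)))
```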
SwarmTaskSignature¶
The main swarm class that manages parallel task execution.
Properties¶
- tasks: List of all task IDs in the swarm
- tasks_left_to_run: Queue of tasks waiting to execute
- finished_tasks: List of successfully completed task IDs
- failed_tasks: List of failed task IDs
- current_running_tasks: Number of currently executing tasks
- is_swarm_closed: Whether the swarm is closed (no new tasks can be added)
- config: SwarmConfig instance
Methods¶
add_task()¶
Add a new task to the swarm.
async def add_task(
    self,
    task: TaskSignatureConvertible,
    close_on_max_task: bool = True
) -> BatchItemTaskSignature
Parameters:
- task: Task signature, function, or name to add
- close_on_max_task: If True and max_task_allowed is configured, automatically closes the swarm when the maximum task limit is reached (default: True)
Returns: BatchItemTaskSignature - Wrapper task for the swarm
Raises:
- TooManyTasksError: If the max_task_allowed limit is exceeded
- SwarmIsCanceledError: If swarm is canceled
Note: When close_on_max_task=True and the swarm reaches its max_task_allowed limit after adding this task, the swarm will be automatically closed, preventing any further tasks from being added and triggering completion callbacks once all tasks finish.
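The limit and auto-close rules described above can be modeled in a few lines. The sketch below is an in-memory toy, not the MageFlow class: ToySwarm is hypothetical, its add_task is synchronous for brevity (the documented method is async), and TooManyTasksError is redefined locally so the block runs on its own.

```python
from typing import List, Optional


class TooManyTasksError(Exception):
    """Local stand-in for the documented error of the same name."""


class ToySwarm:
    """In-memory model of the add_task limit rules; not the MageFlow class."""

    def __init__(self, max_task_allowed: Optional[int] = None) -> None:
        self.max_task_allowed = max_task_allowed
        self.tasks: List[str] = []
        self.is_swarm_closed = False

    def add_task(self, task: str, close_on_max_task: bool = True) -> str:
        limit = self.max_task_allowed
        # Reject the task if the swarm is already at its limit
        if limit is not None and len(self.tasks) >= limit:
            raise TooManyTasksError(f"limit of {limit} tasks reached")
        self.tasks.append(task)
        # Auto-close once this task fills the last free slot
        if close_on_max_task and limit is not None and len(self.tasks) == limit:
            self.is_swarm_closed = True
        return task


swarm = ToySwarm(max_task_allowed=2)
swarm.add_task("a")
swarm.add_task("b")  # second task reaches the limit and closes the swarm
closed_after_limit = swarm.is_swarm_closed

try:
    swarm.add_task("c")  # one task too many
    overflow_rejected = False
except TooManyTasksError:
    overflow_rejected = True

# With close_on_max_task=False the limit still applies, but the swarm stays open
manual = ToySwarm(max_task_allowed=1)
manual.add_task("x", close_on_max_task=False)
still_open = not manual.is_swarm_closed
```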
close_swarm()¶
Close the swarm to prevent new tasks and trigger completion callbacks.
Returns: The swarm instance
aio_run_no_wait()¶
Start the swarm execution.
Parameters:
- msg: Message object to pass to tasks
- **kwargs: Additional execution options
add_to_running_tasks()¶
Internal method to manage task concurrency.
Returns: True if the task can run immediately, False if it is queued
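The run-now-or-queue decision behind that return value can be sketched as follows. Everything here is an assumption made for illustration: ToyAdmission is hypothetical, the method is given a task_id argument that the reference above does not document, and the real method may track state differently.

```python
from collections import deque
from typing import Deque


class ToyAdmission:
    """Hypothetical model of the run-now-or-queue decision; not MageFlow code."""

    def __init__(self, max_concurrency: int) -> None:
        self.max_concurrency = max_concurrency
        self.current_running_tasks = 0
        self.tasks_left_to_run: Deque[str] = deque()

    def add_to_running_tasks(self, task_id: str) -> bool:
        if self.current_running_tasks < self.max_concurrency:
            self.current_running_tasks += 1  # claim a free slot
            return True
        self.tasks_left_to_run.append(task_id)  # no slot free, so queue the task
        return False


gate = ToyAdmission(max_concurrency=2)
# The first two tasks get slots; the third is queued
decisions = [gate.add_to_running_tasks(t) for t in ("t1", "t2", "t3")]
```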
is_swarm_done()¶
Check if swarm has completed all tasks.
Returns: True if the swarm is closed and all tasks have finished
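As a minimal sketch of this completion check, the toy state object below combines the closed flag with the task lists from the Properties section. ToySwarmState is hypothetical, and the sketch assumes that a task counts as finished once it has either succeeded or failed, which the reference does not spell out.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToySwarmState:
    """Subset of the documented properties, held in memory for illustration."""
    tasks: List[str] = field(default_factory=list)
    finished_tasks: List[str] = field(default_factory=list)
    failed_tasks: List[str] = field(default_factory=list)
    is_swarm_closed: bool = False

    def is_swarm_done(self) -> bool:
        # Assumption: a task counts as finished once it has succeeded or failed
        settled = len(self.finished_tasks) + len(self.failed_tasks)
        return self.is_swarm_closed and settled == len(self.tasks)


state = ToySwarmState(tasks=["a", "b"], finished_tasks=["a"], failed_tasks=["b"])
open_result = state.is_swarm_done()  # every task settled, but the swarm is still open
state.is_swarm_closed = True
closed_result = state.is_swarm_done()
```

Under this model an open swarm is never done, because more tasks could still arrive.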
BatchItemTaskSignature¶
Wrapper class for individual tasks within a swarm.
Properties¶
- swarm_id: ID of the parent swarm
- original_task_id: ID of the original task being wrapped
Methods¶
Tasks within swarms are automatically wrapped in BatchItemTaskSignature instances that:
- Manage concurrency within the swarm
- Forward execution to the original task
- Handle swarm lifecycle events
- Inherit all TaskSignature methods (suspend, resume, interrupt)
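The forwarding behavior in the list above follows a common wrapper pattern, sketched here with plain asyncio. ToyBatchItem, its run method, and the ORIGINAL_TASKS lookup table are hypothetical and only show the idea of a wrapper that knows its parent swarm and delegates to the wrapped task; concurrency management and lifecycle handling are left out.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict

# Hypothetical lookup table standing in for wherever original tasks are stored
ORIGINAL_TASKS: Dict[str, Callable[[str], Awaitable[str]]] = {}


@dataclass
class ToyBatchItem:
    """Illustrative wrapper; the real BatchItemTaskSignature does more."""
    swarm_id: str
    original_task_id: str

    async def run(self, msg: str) -> str:
        # Forward execution to the wrapped task
        original = ORIGINAL_TASKS[self.original_task_id]
        return await original(msg)


async def greet(msg: str) -> str:
    return f"hello {msg}"


ORIGINAL_TASKS["task-1"] = greet
item = ToyBatchItem(swarm_id="swarm-1", original_task_id="task-1")
result = asyncio.run(item.run("world"))
```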
Error Classes¶
TooManyTasksError¶
Raised when attempting to add tasks beyond the max_task_allowed limit.
SwarmIsCanceledError¶
Raised when attempting to add tasks to a canceled swarm.
MissingSwarmItemError¶
Raised when a swarm item task cannot be found during execution.