Task Swarms
Task swarms in MageFlow run multiple tasks in parallel with controlled concurrency. Unlike chains, where tasks run sequentially, a swarm manages a group of tasks that execute simultaneously. You control how many of them can run at once and when to trigger callbacks for the group as a whole.
What is a Swarm?
A swarm is a collection of tasks that execute in parallel, where:
- Multiple tasks run concurrently, subject to configurable limits
- Tasks can be added dynamically to the swarm queue
- Callbacks are triggered when all tasks complete or when failure conditions are met
- The swarm manages the lifecycle and concurrency of all its component tasks
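To make the contrast with chains concrete, here is a plain-asyncio sketch. It does not use MageFlow: `fake_task`, `run_as_chain`, and `run_as_swarm` are hypothetical stand-ins. A chain awaits each task before starting the next, while a swarm-style run starts every task before any of them finishes.

```python
import asyncio

# Illustrative only: plain asyncio coroutines stand in for MageFlow tasks.
async def fake_task(name: str, log: list[str]) -> str:
    log.append(f"start:{name}")
    await asyncio.sleep(0.01)  # simulated work
    log.append(f"end:{name}")
    return name

async def run_as_chain(names: list[str]) -> list[str]:
    log: list[str] = []
    for name in names:
        await fake_task(name, log)  # each task waits for the previous one to finish
    return log

async def run_as_swarm(names: list[str]) -> list[str]:
    log: list[str] = []
    # All tasks are scheduled together, so they overlap in time.
    await asyncio.gather(*(fake_task(n, log) for n in names))
    return log

chain_log = asyncio.run(run_as_chain(["a", "b", "c"]))
swarm_log = asyncio.run(run_as_swarm(["a", "b", "c"]))
print(chain_log)  # strictly alternating start/end entries
print(swarm_log)  # all three starts come before any end
```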
Creating a Swarm
Use mageflow.aswarm() to create a task swarm:
import mageflow
# Minimal swarm: three tasks with the default configuration
swarm_signature = await mageflow.aswarm(tasks=[task1, task2, task3])
# Swarm with success/error callbacks and at most two tasks running at once
swarm_signature = await mageflow.aswarm(
    tasks=[process_file1, process_file2, process_file3],
    success_callbacks=[completion_callback],
    error_callbacks=[error_handler],
    config=SwarmConfig(max_concurrency=2),
)
Alternative Client Usage
You can also create swarms through the mageflow client instead of the global mageflow module.
Parameters
- tasks: List of task signatures, task functions, or task names to run in parallel
- success_callbacks: Tasks to execute when all tasks complete successfully
- error_callbacks: Tasks to execute when failure conditions are met
- config: SwarmConfig object that controls swarm behavior
- is_swarm_closed: Whether the swarm is closed immediately on creation (defaults to False)
Managing Swarm Lifecycle
Starting a Swarm
Start a swarm like any other task, with the aio_run_no_wait method. Its queued tasks then begin running, up to the limit set by the max_concurrency parameter in SwarmConfig.
Adding Tasks
Use aio_run_in_swarm() to add and schedule a task in one step:
swarm = await mageflow.aswarm(tasks=[initial_task])
await swarm.aio_run_no_wait(SwarmMessage(swarm_data="shared"))
await swarm.aio_run_in_swarm(additional_task, TaskMessage(data="task-specific"))
Each added task receives its own message data merged with the swarm's shared parameters. Configure the task's message model to ignore extra fields, so the merged-in swarm fields are dropped when the message is validated:
from pydantic import BaseModel, ConfigDict
import mageflow
class NewTaskMessage(BaseModel):
    data: str
    # Drop fields merged in from the swarm's shared message
    model_config = ConfigDict(extra="ignore")
class SwarmMessage(BaseModel):
    swarm_data: str
# `hatchet` is your Hatchet client; `initial_task` is any existing task
@hatchet.task()
async def new_task(message: NewTaskMessage):
    print(message.data)
swarm = await mageflow.aswarm(tasks=[initial_task])
await swarm.aio_run_no_wait(SwarmMessage(swarm_data="swarm_data"))
# Add a single task with its own message
await swarm.aio_run_in_swarm(new_task, NewTaskMessage(data="hello"))
# Or send multiple tasks
multiple_tasks = await new_task.aduplicate_many(3)
await swarm.aio_run_in_swarm(multiple_tasks, NewTaskMessage(data="hello"))
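Here is a plain-Python sketch of the merge-then-validate step, showing what ignoring extra fields achieves. It uses neither MageFlow nor Pydantic. `merge_and_validate` is a hypothetical helper, and its precedence rule (task-specific values override swarm-level values on key collisions) is an assumption, not documented MageFlow behavior.

```python
from dataclasses import dataclass, fields
from typing import Any

@dataclass
class NewTaskMessage:
    data: str

def merge_and_validate(model: type, swarm_params: dict[str, Any], task_params: dict[str, Any]):
    # Assumption: task-specific values win when both sides define the same key.
    merged = {**swarm_params, **task_params}
    allowed = {f.name for f in fields(model)}
    # Mirror extra="ignore": silently drop keys the model does not declare.
    kept = {key: value for key, value in merged.items() if key in allowed}
    return model(**kept)

msg = merge_and_validate(NewTaskMessage, {"swarm_data": "swarm_data"}, {"data": "hello"})
print(msg)  # NewTaskMessage(data='hello')
```

Without the filtering step, `NewTaskMessage(**merged)` would raise a `TypeError` for the unexpected `swarm_data` key. That is the dataclass counterpart of a strict model rejecting extra fields.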
Closing a Swarm
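When you're done adding tasks to the swarm, close it. Closing tells the swarm that no more tasks are coming, so it can trigger its callbacks once the remaining tasks finish.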
await swarm.close_swarm()
# Or create a pre-closed swarm
swarm = await mageflow.aswarm(
tasks=task_list,
is_swarm_closed=True
)
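The following sketch shows why closing matters. A swarm that accepts new tasks cannot know it is finished until it is told no more are coming. `SwarmState` is a hypothetical model of that bookkeeping, not MageFlow's internal implementation. The rule that a closed swarm rejects new tasks is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class SwarmState:
    # Hypothetical bookkeeping; not MageFlow's internal model.
    outstanding: int = 0
    closed: bool = False
    finished: bool = False
    on_done: Callable[[], None] = lambda: None

    def add_task(self) -> None:
        # Assumption: a closed swarm rejects new tasks.
        if self.closed:
            raise RuntimeError("cannot add tasks to a closed swarm")
        self.outstanding += 1

    def task_finished(self) -> None:
        self.outstanding -= 1
        self._maybe_finish()

    def close(self) -> None:
        self.closed = True
        self._maybe_finish()

    def _maybe_finish(self) -> None:
        # Fire exactly once: only after close() AND when nothing is outstanding.
        if self.closed and self.outstanding == 0 and not self.finished:
            self.finished = True
            self.on_done()

calls: list[str] = []
state = SwarmState(on_done=lambda: calls.append("done"))
state.add_task()
state.add_task()
state.task_finished()
state.task_finished()
print(calls)  # [] (every task finished, but the swarm is still open)
state.close()
print(calls)  # ['done']
```

A pre-closed swarm (`is_swarm_closed=True`) corresponds to calling `close()` right after the initial tasks are added.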
Concurrency Control
Swarms automatically manage task concurrency:
file_tasks = [
await mageflow.asign("process-file", file_path=f"file_{i}.txt")
for i in range(20)
]
swarm = await mageflow.aswarm(
tasks=file_tasks,
config=SwarmConfig(max_concurrency=5),
is_swarm_closed=True
)
# Only 5 tasks run simultaneously
# As each completes, the next queued task starts
await swarm.aio_run_no_wait(ProcessMessage())
This is especially useful for absorbing a sudden spike in tasks without deploying additional workers to handle the load.
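Within a single process, the effect of max_concurrency resembles an `asyncio.Semaphore` guarding each job. The sketch below runs 20 dummy jobs with a limit of 5 and records the peak number running at once. It only illustrates the queuing behavior and is not how MageFlow implements the limit. `run_bounded` is a hypothetical name.

```python
import asyncio

async def run_bounded(n_tasks: int, max_concurrency: int) -> int:
    """Run n_tasks dummy jobs, at most max_concurrency at a time; return the observed peak."""
    sem = asyncio.Semaphore(max_concurrency)
    running = 0
    peak = 0

    async def job(i: int) -> None:
        nonlocal running, peak
        async with sem:  # queued jobs wait here until a slot frees up
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated work
            running -= 1

    await asyncio.gather(*(job(i) for i in range(n_tasks)))
    return peak

peak = asyncio.run(run_bounded(20, 5))
print(peak)  # 5
```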
Failure Handling
Control how swarms handle task failures:
# Stop after 3 failures
swarm = await mageflow.aswarm(
tasks=risky_tasks,
error_callbacks=[handle_swarm_failure],
config=SwarmConfig(stop_after_n_failures=3)
)
# Continue despite individual failures (no stop limit)
swarm = await mageflow.aswarm(
tasks=optional_tasks,
success_callbacks=[process_results],
config=SwarmConfig(stop_after_n_failures=None)
)
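The stop_after_n_failures rule can be sketched as a failure counter checked after each job. This is a hypothetical model, not MageFlow code. It runs jobs sequentially for determinism, whereas a real swarm runs them in parallel. It also assumes, based on the example above, that a run with no failure limit ends on the success path even when individual tasks fail.

```python
import asyncio
from typing import Optional

class JobFailed(Exception):
    """Raised by a simulated job that fails."""

async def job(ok: bool) -> None:
    await asyncio.sleep(0)  # simulated work
    if not ok:
        raise JobFailed()

async def run_with_failure_limit(
    outcomes: list[bool], stop_after_n_failures: Optional[int]
) -> tuple[int, int, bool]:
    """Return (jobs_started, failures, stopped_early)."""
    started = failures = 0
    for ok in outcomes:  # sequential here for determinism
        started += 1
        try:
            await job(ok)
        except JobFailed:
            failures += 1
            # None means "no limit": failures are counted but never stop the run.
            if stop_after_n_failures is not None and failures >= stop_after_n_failures:
                return started, failures, True  # analogous to firing error_callbacks
    return started, failures, False  # analogous to firing success_callbacks

outcomes = [True, False, False, True, False, True]
print(asyncio.run(run_with_failure_limit(outcomes, 3)))     # (5, 3, True)
print(asyncio.run(run_with_failure_limit(outcomes, None)))  # (6, 3, False)
```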
Swarm Callback
The swarm triggers callbacks when all tasks complete. The callback receives a list of all the task results (see ReturnValue Annotation docs).
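Here is a minimal asyncio sketch of a callback that receives every task's result as one list. `run_then_callback`, `keep`, and `summarize` are hypothetical. This page does not say whether MageFlow orders the list by task or by completion time. The sketch uses `asyncio.gather`, which returns results in input order even when later tasks finish first.

```python
import asyncio
from typing import Any, Awaitable, Callable

async def double(x: int) -> int:
    await asyncio.sleep(0.01 * (3 - x))  # later inputs finish sooner
    return x * 2

async def run_then_callback(
    items: list[int], callback: Callable[[list[int]], Awaitable[Any]]
) -> Any:
    # gather returns results in input order, regardless of completion order
    results = await asyncio.gather(*(double(i) for i in items))
    return await callback(list(results))

async def keep(results: list[int]) -> list[int]:
    return results

async def summarize(results: list[int]) -> int:
    return sum(results)

ordered = asyncio.run(run_then_callback([0, 1, 2], keep))
total = asyncio.run(run_then_callback([0, 1, 2], summarize))
print(ordered, total)  # [0, 2, 4] 6
```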
Example Use Cases
Parallel File Processing
file_paths = ["file1.csv", "file2.csv", "file3.csv"]
process_tasks = [
await mageflow.asign("process-csv-file", file_path=path)
for path in file_paths
]
consolidate_results = await mageflow.asign("consolidate-results")
handle_processing_errors = await mageflow.asign("handle-file-errors")
file_swarm = await mageflow.aswarm(
tasks=process_tasks,
success_callbacks=[consolidate_results],
error_callbacks=[handle_processing_errors],
config=SwarmConfig(max_concurrency=3),
is_swarm_closed=True
)
await file_swarm.aio_run_no_wait(ProcessingMessage())
Dynamic Task Queue
initial_tasks = [await mageflow.asign("initial-task")]
notification_task = await mageflow.asign("notify-completion")
swarm = await mageflow.aswarm(
tasks=initial_tasks,
success_callbacks=[notification_task],
config=SwarmConfig(max_concurrency=10)
)
await swarm.aio_run_no_wait(InitialMessage())
# dynamic_data_stream stands in for any source of items that arrive over time
for data_item in dynamic_data_stream:
    await swarm.aio_run_in_swarm("process-item", ProcessMessage(data=data_item))
await swarm.close_swarm()
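The dynamic-queue pattern resembles a classic producer/consumer pool. In this sketch, a fixed pool of max_concurrency workers drains an `asyncio.Queue` while items are still being added. Closing the swarm is modeled as one sentinel per worker, placed after all real items so that every queued item is processed first. `worker`, `dynamic_swarm`, and `_CLOSED` are hypothetical names, and this is not MageFlow's implementation.

```python
import asyncio

_CLOSED = object()  # sentinel meaning "no more tasks will be added"

async def worker(queue: asyncio.Queue, processed: list[str]) -> None:
    while True:
        item = await queue.get()
        if item is _CLOSED:
            return  # swarm closed and nothing left for this worker
        await asyncio.sleep(0)  # simulated work on one item
        processed.append(item)

async def dynamic_swarm(stream: list[str], max_concurrency: int) -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list[str] = []
    # Workers start before any items exist, like starting the swarm early.
    workers = [asyncio.create_task(worker(queue, processed)) for _ in range(max_concurrency)]
    for item in stream:  # items are added while workers are already running
        await queue.put(item)
    for _ in workers:  # "closing": one sentinel per worker, queued after every item
        await queue.put(_CLOSED)
    await asyncio.gather(*workers)  # returns only after every queued item is done
    return processed

done = asyncio.run(dynamic_swarm([f"item-{i}" for i in range(25)], max_concurrency=10))
print(len(done))  # 25
```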
Batch Processing with Error Tolerance
batch_tasks = [
await mageflow.asign("process-record", record_id=i)
for i in range(1000)
]
completion_report = await mageflow.asign("generate-completion-report")
failure_alert = await mageflow.asign("send-failure-alert")
batch_swarm = await mageflow.aswarm(
tasks=batch_tasks,
success_callbacks=[completion_report],
error_callbacks=[failure_alert],
config=SwarmConfig(
max_concurrency=20,
stop_after_n_failures=50
),
is_swarm_closed=True
)
await batch_swarm.aio_run_no_wait(BatchMessage())