Limit concurrent task runs with tags
Prevent too many tasks from running simultaneously using tags.
Task run concurrency limits help prevent too many tasks from running simultaneously. For example, if many tasks across multiple flows are designed to interact with a database that only allows 10 connections.
Task run concurrency limits use task tags. You can specify an optional concurrency limit as the maximum number of concurrent
task runs in a Running
state for tasks with a given tag.
Tag-based task concurrency is different from Global concurrency limits, though they can be used to achieve similar outcomes. Global concurrency limits are a more general way to control concurrency for any Python-based operation, whereas tag-based concurrency limits are specific to Prefect tasks.
If a task has multiple tags, it will run only if all tags have available concurrency.
Tags without specified concurrency limits are treated as unlimited. Setting a tag’s concurrency limit to 0 causes immediate abortion of any task runs with that tag, rather than delaying them.
Execution behavior
Task tag limits are checked whenever a task run attempts to enter a Running
state.
If there are no concurrency slots available for any one of your task’s tags, it delays the transition to a Running
state
and instructs the client to try entering a Running
state again in 30 seconds
(or the value specified by the PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS
setting).
Configure concurrency limits
Flow run concurrency limits are set at a work pool, work queue, or deployment level
While task run concurrency limits are configured through tags (as shown below), flow run concurrency limits are configured through work pools and/or work queues.
You can set concurrency limits on as few or as many tags as you wish. You can set limits through:
- Prefect CLI
- Prefect API by using
PrefectClient
Python client - Prefect server UI or Prefect Cloud
CLI
You can create, list, and remove concurrency limits with Prefect CLI concurrency-limit
commands.
Command | Description |
---|---|
create | Create a concurrency limit by specifying a tag and limit. |
delete | Delete the concurrency limit set on the specified tag. |
inspect | View details about a concurrency limit set on the specified tag. |
ls | View all defined concurrency limits. |
For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:
To delete the concurrency limit on the ‘small_instance’ tag:
To view details about the concurrency limit on the ‘small_instance’ tag:
Python client
To update your tag concurrency limits programmatically, use
PrefectClient.orchestration.create_concurrency_limit
.
create_concurrency_limit
takes two arguments:
tag
specifies the task tag on which you’re setting a limit.concurrency_limit
specifies the maximum number of concurrent task runs for that tag.
For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:
To remove all concurrency limits on a tag, use PrefectClient.delete_concurrency_limit_by_tag
, passing the tag:
If you wish to query for the current limit on a tag, use PrefectClient.read_concurrency_limit_by_tag
, passing the tag:
To see all of your limits across all of your tags, use PrefectClient.read_concurrency_limits
.
Was this page helpful?