Automations enable you to configure actions that execute automatically based on trigger conditions.

Potential triggers include the occurrence of events from changes in a flow run’s state—or the absence of such events. You can define your own custom trigger to fire based on a custom event defined in Python code. With Prefect Cloud you can even create webhooks that can receive data for use in actions.

Actions include starting flow runs, pausing schedules, and sending custom notifications.

Create an automation

On the Automations page, select the + icon to create a new automation. You’ll be prompted to configure:

  • A trigger condition that causes the automation to execute.
  • One or more actions carried out by the automation.
  • Details about the automation, such as a name and description.

Manage automations

The Automations page provides an overview of all configured automations for your workspace.

Select the toggle next to an automation to pause execution of the automation.

The button next to the toggle provides commands to copy the automation ID, edit the automation, or delete the automation.

Select the name of an automation to view Details about it and relevant Events.

Triggers

Triggers specify the conditions under which your action should be performed. The Prefect UI includes templates for many common conditions, such as:

  • Flow run state change (Flow Run Tags are only evaluated with OR criteria)
  • Work pool status
  • Work queue status
  • Deployment status
  • Metric thresholds, such as average duration, lateness, or completion percentage
  • Incident declarations (available on Pro and Enterprise plans)
  • Custom event triggers

Automations API

The automations API enables further programmatic customization of trigger and action policies based on arbitrary events.

Importantly, you can configure the triggers not only in reaction to events, but also proactively: in the absence of an expected event.

For example, in the case of flow run state change triggers, you might expect production flows to finish in no longer than thirty minutes. But transient infrastructure or network issues could cause your flow to get “stuck” in a running state. A trigger could kick off an action if the flow stays in a running state for more than 30 minutes.

This action could be taken on the flow itself, such as cancelling or restarting it. Or the action could take the form of a notification for someone to take manual remediation steps. Or you could set both actions to take place when the trigger occurs.
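
As a rough illustration of the "stuck in Running" case above, the sketch below builds a proactive trigger payload as a plain dictionary. The field names mirror the trigger examples elsewhere in this guide. The helper name stuck_run_trigger and the choice of terminal states to expect are assumptions made for the example, so check them against the schema your API version accepts.

```python
import json
from datetime import timedelta


def stuck_run_trigger(max_runtime: timedelta) -> dict:
    """Build a proactive trigger for flow runs that stay in Running too long.

    Hypothetical helper: it only assembles a dictionary and calls no Prefect API.
    """
    return {
        "match": {"prefect.resource.id": "prefect.flow-run.*"},
        # Start watching once a flow run enters Running ...
        "after": ["prefect.flow-run.Running"],
        # ... and expect it to reach a terminal state (assumed list) ...
        "expect": [
            "prefect.flow-run.Completed",
            "prefect.flow-run.Failed",
            "prefect.flow-run.Cancelled",
            "prefect.flow-run.Crashed",
        ],
        # ... evaluated separately for each flow run.
        "for_each": ["prefect.resource.id"],
        # Proactive: fire when the expected event is absent.
        "posture": "Proactive",
        "threshold": 1,
        "within": int(max_runtime.total_seconds()),
    }


trigger = stuck_run_trigger(timedelta(minutes=30))
print(json.dumps(trigger, indent=2))
```

A payload of this shape would be paired with one or more actions, such as cancelling the run, sending a notification, or both.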

Actions

Actions specify what your automation does when its trigger criteria are met. Current action types include:

  • Cancel a flow run
  • Pause or resume a schedule
  • Run a deployment
  • Pause or resume a deployment schedule
  • Pause or resume a work pool
  • Pause or resume a work queue
  • Pause or resume an automation
  • Send a notification
  • Call a webhook
  • Suspend a flow run
  • Declare an incident (available on Pro and Enterprise plans)
  • Change the state of a flow run

Create automations in Python code

You can create and access any automation with the Python SDK’s Automation class and its methods.

from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger
from prefect.events.actions import CancelFlowRun

# creating an automation
automation = Automation(
    name="woodchonk",
    trigger=EventTrigger(
        expect={"animal.walked"},
        match={
            "genus": "Marmota",
            "species": "monax",
        },
        posture="Reactive",
        threshold=3,
    ),
    actions=[CancelFlowRun()],
).create()
print(automation)
# name='woodchonk' description='' enabled=True trigger=EventTrigger(type='event', match=ResourceSpecification(__root__={'genus': 'Marmota', 'species': 'monax'}), match_related=ResourceSpecification(__root__={}), after=set(), expect={'animal.walked'}, for_each=set(), posture=Posture.Reactive, threshold=3, within=datetime.timedelta(seconds=10)) actions=[CancelFlowRun(type='cancel-flow-run')] actions_on_trigger=[] actions_on_resolve=[] owner_resource=None id=UUID('d641c552-775c-4dc6-a31e-541cb11137a6')

# reading the automation

automation = Automation.read(id=automation.id)
# or
automation = Automation.read(name="woodchonk")

print(automation)
# name='woodchonk' description='' enabled=True trigger=EventTrigger(type='event', match=ResourceSpecification(__root__={'genus': 'Marmota', 'species': 'monax'}), match_related=ResourceSpecification(__root__={}), after=set(), expect={'animal.walked'}, for_each=set(), posture=Posture.Reactive, threshold=3, within=datetime.timedelta(seconds=10)) actions=[CancelFlowRun(type='cancel-flow-run')] actions_on_trigger=[] actions_on_resolve=[] owner_resource=None id=UUID('d641c552-775c-4dc6-a31e-541cb11137a6')

Selected and inferred action targets

Some actions require you to either select the target of the action, or specify that the target of the action should be inferred. Selected targets are simple and useful when you know exactly which object your action should act on. For example, you might select a specific cleanup flow to run or a specific notification to send.

Inferred targets are deduced from the trigger itself.

For example, if a trigger fires on a flow run that is stuck in a running state and the action is to cancel an inferred flow run, the inferred flow run is the one that caused the trigger to fire.

Similarly, if a trigger fires on a work queue event and the corresponding action is to pause an inferred work queue, the inferred work queue is the one that emitted the event.

Prefect infers the relevant event whenever possible, but sometimes one does not exist.
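
The difference between the two kinds of target can be sketched with plain action payloads. The run-deployment shape with "source": "selected" and a deployment_id matches the REST example later in this guide. The "inferred" source value and the helper name run_deployment_action are assumptions for illustration.

```python
from typing import Optional


def run_deployment_action(deployment_id: Optional[str] = None) -> dict:
    """Return a run-deployment action with a selected or inferred target.

    Hypothetical helper: it only builds a dictionary.
    """
    if deployment_id is None:
        # Inferred: act on the deployment related to the triggering event.
        # The "inferred" value is an assumption; verify it for your version.
        return {"type": "run-deployment", "source": "inferred"}
    # Selected: always act on this specific deployment.
    return {
        "type": "run-deployment",
        "source": "selected",
        "deployment_id": deployment_id,
    }


print(run_deployment_action())
print(run_deployment_action("YOUR-DEPLOYMENT-ID"))
```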

Specify a name and, optionally, a description for the automation.

Create an automation with deployment triggers

To enable the simple configuration of event-driven deployments, Prefect provides deployment triggers—a shorthand for creating automations that are linked to specific deployments to run them based on the presence or absence of events.

Trigger definitions for deployments are supported in prefect.yaml, .serve, and .deploy. At deployment time, specified trigger definitions create linked automations triggered by events matching your chosen grammar. Each trigger definition may include a Jinja template to render the triggering event as the parameters of your deployment’s flow run.

Define triggers in prefect.yaml

You can include a list of triggers on any deployment in a prefect.yaml file:

deployments:
  - name: my-deployment
    entrypoint: path/to/flow.py:decorated_fn
    work_pool:
      name: my-work-pool
    triggers:
      - type: event
        enabled: true
        match:
          prefect.resource.id: my.external.resource
        expect:
          - external.resource.pinged
        parameters:
          param_1: "{{ event }}"

This deployment creates a flow run when an external.resource.pinged event and an external.resource.replied event have been seen from my.external.resource:

deployments:
  - name: my-deployment
    entrypoint: path/to/flow.py:decorated_fn
    work_pool:
      name: my-work-pool
    triggers:
      - type: compound
        require: all
        parameters:
          param_1: "{{ event }}"
        triggers:
          - type: event
            match:
              prefect.resource.id: my.external.resource
            expect:
              - external.resource.pinged
          - type: event
            match:
              prefect.resource.id: my.external.resource
            expect:
              - external.resource.replied

Define triggers in .serve and .deploy

To create deployments with triggers in Python, the trigger types DeploymentEventTrigger, DeploymentMetricTrigger, DeploymentCompoundTrigger, and DeploymentSequenceTrigger can be imported from prefect.events:

from prefect import flow
from prefect.events import DeploymentEventTrigger


@flow(log_prints=True)
def decorated_fn(param_1: str):
    print(param_1)


if __name__ == "__main__":
    decorated_fn.serve(
        name="my-deployment",
        triggers=[
            DeploymentEventTrigger(
                enabled=True,
                match={"prefect.resource.id": "my.external.resource"},
                expect=["external.resource.pinged"],
                parameters={
                    "param_1": "{{ event }}",
                },
            )
        ],
    )

As with prior examples, you must supply composite triggers with a list of underlying triggers:

from prefect import flow
from prefect.events import DeploymentCompoundTrigger


@flow(log_prints=True)
def decorated_fn(param_1: str):
    print(param_1)


if __name__ == "__main__":
    decorated_fn.deploy(
        name="my-deployment",
        image="my-image-registry/my-image:my-tag",
        triggers=[
            DeploymentCompoundTrigger(
                enabled=True,
                name="my-compound-trigger",
                require="all",
                triggers=[
                    {
                      "type": "event",
                      "match": {"prefect.resource.id": "my.external.resource"},
                      "expect": ["external.resource.pinged"],
                    },
                    {
                      "type": "event",
                      "match": {"prefect.resource.id": "my.external.resource"},
                      "expect": ["external.resource.replied"],
                    },
                ],
                parameters={
                    "param_1": "{{ event }}",
                },
            )
        ],
        work_pool_name="my-work-pool",
    )

Pass triggers to prefect deploy

You can pass one or more --trigger arguments to prefect deploy as either a JSON string or a path to a .yaml or .json file.

# Pass a trigger as a JSON string
prefect deploy -n test-deployment \
  --trigger '{
    "enabled": true,
    "match": {
      "prefect.resource.id": "prefect.flow-run.*"
    },
    "expect": ["prefect.flow-run.Completed"]
  }'

# Pass a trigger using a JSON/YAML file
prefect deploy -n test-deployment --trigger triggers.yaml
prefect deploy -n test-deployment --trigger my_stuff/triggers.json
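
Hand-writing the JSON string invites quoting mistakes. One way to avoid them, sketched here with only the standard library, is to build the trigger as a Python dictionary and let json.dumps and shlex.quote produce the argument. The trigger content is copied from the example above; nothing here calls Prefect itself.

```python
import json
import shlex

trigger = {
    "enabled": True,
    "match": {"prefect.resource.id": "prefect.flow-run.*"},
    "expect": ["prefect.flow-run.Completed"],
}

# json.dumps emits valid JSON (true rather than True);
# shlex.quote wraps it so a POSIX shell passes it through as one argument.
trigger_json = json.dumps(trigger)
command = f"prefect deploy -n test-deployment --trigger {shlex.quote(trigger_json)}"
print(command)
```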

For example, a triggers.yaml file could have many triggers defined:

triggers:
  - enabled: true
    match:
      prefect.resource.id: my.external.resource
    expect:
      - external.resource.pinged
    parameters:
      param_1: "{{ event }}"
  - enabled: true
    match:
      prefect.resource.id: my.other.external.resource
    expect:
      - some.other.event
    parameters:
      param_1: "{{ event }}"

Both of the above triggers would be attached to test-deployment after running prefect deploy.

Triggers passed to prefect deploy will override any triggers defined in prefect.yaml

While you can define triggers in prefect.yaml for a given deployment, triggers passed to prefect deploy take precedence over those defined in prefect.yaml.
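
The precedence rule can be modeled in a few lines. This sketch assumes that "take precedence" means the command-line triggers replace the prefect.yaml triggers outright rather than being merged with them. The function name effective_triggers is hypothetical and is not part of Prefect.

```python
from typing import List, Optional


def effective_triggers(
    cli_triggers: Optional[List[dict]],
    yaml_triggers: Optional[List[dict]],
) -> List[dict]:
    """Model the rule: any CLI triggers replace the prefect.yaml triggers."""
    if cli_triggers:
        return cli_triggers
    return yaml_triggers or []


from_yaml = [{"expect": ["external.resource.pinged"]}]
from_cli = [{"expect": ["prefect.flow-run.Completed"]}]

print(effective_triggers(from_cli, from_yaml))  # only the CLI trigger remains
print(effective_triggers(None, from_yaml))      # falls back to prefect.yaml
```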

Note that deployment triggers contribute to the total number of automations in your workspace.

Sending notifications with automations

Automations support sending notifications through any predefined block that is capable of and configured to send a message, including:

  • Slack message to a channel
  • Microsoft Teams message to a channel
  • Email to an email address

Templating with Jinja

You can access templated variables with automation actions through Jinja syntax. Templated variables enable you to dynamically include details from an automation trigger, such as a flow or pool name.

Jinja templated variable syntax wraps the variable name in double curly brackets, like this: {{ variable }}.

You can access properties of the objects underlying the trigger, including flow_run, flow, deployment, and work_pool.

In addition to its native properties, each object includes an id along with created and updated timestamps.

The flow_run|ui_url token returns the URL to view the flow run in the UI.

Here’s an example relevant to a flow run state-based notification:

Flow run {{ flow_run.name }} entered state {{ flow_run.state.name }}.

    Timestamp: {{ flow_run.state.timestamp }}
    Flow ID: {{ flow_run.flow_id }}
    Flow Run ID: {{ flow_run.id }}
    State message: {{ flow_run.state.message }}

You could include flow and deployment properties:

Flow run {{ flow_run.name }} for flow {{ flow.name }}
entered state {{ flow_run.state.name }}
with message {{ flow_run.state.message }}

Flow run tags: {{ flow_run.tags }}
Deployment name: {{ deployment.name }}
Deployment version: {{ deployment.version }}
Deployment parameters: {{ deployment.parameters }}

An automation that reports on work pool status might include notifications using work_pool properties:

Work pool status alert!

Name: {{ work_pool.name }}
Last polled: {{ work_pool.last_polled }}

In addition to those shortcuts for flows, deployments, and work pools, you have access to the automation and the event that triggered the automation. See the Automations API for additional details.

Automation: {{ automation.name }}
Description: {{ automation.description }}

Event: {{ event.id }}
Resource:
{% for label, value in event.resource %}
{{ label }}: {{ value }}
{% endfor %}
Related Resources:
{% for related in event.related %}
    Role: {{ related.role }}
    {% for label, value in related %}
    {{ label }}: {{ value }}
    {% endfor %}
{% endfor %}

Note that this example also illustrates the ability to use Jinja features such as iterator and for loop control structures when templating notifications.
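
To preview how a template like this renders before attaching it to an automation, you can run it through the jinja2 package locally. This is a minimal sketch with stand-in data: the automation and event objects are hypothetical substitutes for what Prefect supplies. The loop calls .items() because the stand-in resource is a plain dictionary, which may differ from how Prefect's own event objects iterate.

```python
from types import SimpleNamespace

from jinja2 import Template

TEMPLATE = """\
Automation: {{ automation.name }}
Event: {{ event.id }}
Resource:
{% for label, value in event.resource.items() %}
{{ label }}: {{ value }}
{% endfor %}
"""

# Stand-in objects; Prefect provides the real ones when an automation fires.
automation = SimpleNamespace(name="woodchonk")
event = SimpleNamespace(
    id="hypothetical-event-id",
    resource={"prefect.resource.id": "my.external.resource"},
)

# trim_blocks drops the newline after each {% ... %} tag to avoid blank lines.
rendered = Template(TEMPLATE, trim_blocks=True).render(
    automation=automation, event=event
)
print(rendered)
```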

API example

This example grabs data from an API and sends a notification based on the end state.

Create the example script

Start by pulling hypothetical user data from an endpoint and then performing data cleaning and transformations.

First create a simple extract method that pulls the data from a random user data generator endpoint:

import requests
from prefect import flow, task, get_run_logger


@task
def fetch(url: str) -> dict:
    """Request one random user record and return the parsed JSON."""
    logger = get_run_logger()
    response = requests.get(url)
    raw_data = response.json()
    logger.info(f"Raw response: {raw_data}")
    return raw_data


@task
def clean(raw_data: dict) -> dict:
    """Keep only the name fields of the first result."""
    logger = get_run_logger()
    results = raw_data["results"][0]
    logger.info(f"Cleaned results: {results}")
    return results["name"]


@flow
def build_names(num: int = 10) -> list:
    """Fetch and clean `num` random user names."""
    logger = get_run_logger()
    url = "https://randomuser.me/api/"
    names = []
    for _ in range(num):
        raw_data = fetch(url)
        names.append(clean(raw_data))
    logger.info(f"Built {num} names: {names}")
    return names


if __name__ == "__main__":
    list_of_names = build_names()

The data cleaning workflow has visibility into each step, and sends a list of names to the next step of the pipeline.

Create a notification block in the UI

Next, send a notification based on a completed state outcome. Configure a notification that tells you when to look into your workflow logic.

  1. Prior to creating the automation, confirm the notification location. Create a notification block to help define where the notification is sent.

  2. Navigate to the blocks page in the UI, and create an email notification block.

  3. Go to the automations page to create your first automation.

  4. Next, find the trigger type. In this case, use a flow completion.

  5. Create the actions for when the trigger is hit. In this case, create a notification to showcase the completion.

  6. Now the automation is ready to be triggered from a flow run completion. Run the file locally and see that the notification is sent to your inbox after the completion. It may take a few minutes for the notification to arrive.

No deployment created

You do not need to create a deployment to trigger your automation. In the case above, the flow run state trigger fired in response to a flow run that was executed locally.

Now that you’ve seen how to create an email notification from a flow run completion, see how to kick off a deployment run in response to an event.

Event-based deployment automation

Create an automation to kick off a deployment instead of a notification. Explore how to programmatically create this automation with Prefect’s REST API.

See the REST API documentation as a reference for interacting with the automation endpoints.

Create a deployment to kick off some work based on how long a flow is running. For example, if the build_names flow takes too long to execute, you can kick off a deployment with the same build_names flow, but replace the num value with a lower number to speed up completion. Create a deployment with a prefect.yaml file or a Python file that uses flow.deploy.

Create a prefect.yaml file like this one for our flow build_names:

  # Welcome to your prefect.yaml file! You can use this file for storing and managing
  # configuration for deploying your flows. We recommend committing this file to source
  # control along with your flow code.

  # Generic metadata about this project
  name: automations-guide
  prefect-version: 3.0.0

  # build section allows you to manage and build docker images
  build: null

  # push section allows you to manage if and how this project is uploaded to remote locations
  push: null

  # pull section allows you to provide instructions for cloning this project in remote locations
  pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /Users/src/prefect/Playground/automations-guide

  # the deployments section allows you to provide configuration for deploying flows
  deployments:
  - name: deploy-build-names
    version: null
    tags: []
    description: null
    entrypoint: test-automations.py:build_names
    parameters: {}
    work_pool:
      name: tutorial-process-pool
      work_queue_name: null
      job_variables: {}
    schedule: null

Grab your deployment_id from this deployment with the CLI and embed it in your automation.

Find deployment_id from the CLI

Run prefect deployment ls in an authenticated command prompt.

prefect deployment ls
                                          Deployments
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Name                                                  ┃ ID                                   ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Extract islands/island-schedule                       │ d9d7289c-7a41-436d-8313-80a044e61532 │
│ build-names/deploy-build-names                        │ 8b10a65e-89ef-4c19-9065-eec5c50242f4 │
│ ride-duration-prediction-backfill/backfill-deployment │ 76dc6581-1773-45c5-a291-7f864d064c57 │
└───────────────────────────────────────────────────────┴──────────────────────────────────────┘

Create the automation programmatically with a POST request. Ensure you have your api_key, account_id, and workspace_id.

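import os

import requests

# Placeholders: supply your own credentials and IDs.
PREFECT_API_KEY = os.environ["PREFECT_API_KEY"]
account_id = os.environ["PREFECT_ACCOUNT_ID"]      # hypothetical variable name
workspace_id = os.environ["PREFECT_WORKSPACE_ID"]  # hypothetical variable name


def create_event_driven_automation():
    api_url = f"https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/automations/"
    data = {
        "name": "Event Driven Redeploy",
        "description": "Programmatically created an automation to redeploy a flow based on an event",
        "enabled": True,
        "trigger": {
            # Start the clock when a flow run enters Running ...
            "after": ["prefect.flow-run.Running"],
            # ... and expect it to complete ...
            "expect": ["prefect.flow-run.Completed"],
            "for_each": ["prefect.resource.id"],
            # ... firing if it has not done so within 30 seconds.
            "posture": "Proactive",
            "threshold": 1,
            "within": 30,
        },
        "actions": [
            {
                "type": "run-deployment",
                "source": "selected",
                "deployment_id": "YOUR-DEPLOYMENT-ID",
                # Flow parameters are a mapping of name to value;
                # 5 is lower than the flow's default of 10.
                "parameters": {"num": 5},
            }
        ],
    }

    headers = {"Authorization": f"Bearer {PREFECT_API_KEY}"}
    response = requests.post(api_url, headers=headers, json=data)

    print(response.json())
    return response.json()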

After running this function, the new automation appears in the UI. Keep in mind that the UI labels its trigger as “custom”.

Run the underlying flow and see the deployment kick off after 30 seconds. This results in a new flow run of build_names. You can see this new deployment get initiated with the custom parameters outlined above.

In a few quick changes, you can programmatically create an automation that deploys workflows with custom parameters.

Use an underlying .yaml file

You can take this a step further by declaring the automation in its own .yaml file and then registering that file with the API. This keeps the automation’s requirements in one place.

Start by creating the .yaml file that holds the automation requirements:

automation.yaml
name: Cancel long running flows
description: Cancel any flow run after an hour of execution
trigger:
  match:
    "prefect.resource.id": "prefect.flow-run.*"
  match_related: {}
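  after:
    - "prefect.flow-run.Running"
  expect:
    - "prefect.flow-run.Completed"
    - "prefect.flow-run.Failed"
    - "prefect.flow-run.Cancelled"
    - "prefect.flow-run.Crashed"
  for_each:
    - "prefect.resource.id"
  posture: "Proactive"
  threshold: 1
  within: 3600  # seconds (one hour)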
actions:
  - type: "cancel-flow-run"

Write a helper function that applies this YAML file through the REST API.

import yaml

from myproject.utils import post, put

def create_or_update_automation(path: str = "automation.yaml"):
    """Create or update an automation from a local YAML file"""
    # Load the definition
    with open(path, "r") as fh:
        payload = yaml.safe_load(fh)

    # Find existing automations by name
    automations = post("/automations/filter")
    existing_automation = [a["id"] for a in automations if a["name"] == payload["name"]]
    automation_exists = len(existing_automation) > 0

    # Create or update the automation
    if automation_exists:
        print(f"Automation '{payload['name']}' already exists and will be updated")
        put(f"/automations/{existing_automation[0]}", payload=payload)
    else:
        print(f"Creating automation '{payload['name']}'")
        post("/automations/", payload=payload)

if __name__ == "__main__":
    create_or_update_automation()
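
The post and put helpers imported from myproject.utils are not shown in this guide. One possible shape for them, sketched with only the standard library, is below. It assumes the PREFECT_API_URL and PREFECT_API_KEY environment variables are set and that PREFECT_API_URL already includes the account and workspace path. The names build_request and _send are hypothetical.

```python
import json
import os
import urllib.request
from typing import Any, Optional


def build_request(
    method: str, path: str, payload: Optional[dict] = None
) -> urllib.request.Request:
    """Assemble an authenticated JSON request against the Prefect API."""
    base_url = os.environ["PREFECT_API_URL"].rstrip("/")
    headers = {"Content-Type": "application/json"}
    api_key = os.environ.get("PREFECT_API_KEY")
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    body = json.dumps(payload or {}).encode("utf-8")
    return urllib.request.Request(
        base_url + path, data=body, headers=headers, method=method
    )


def _send(method: str, path: str, payload: Optional[dict] = None) -> Any:
    """Send the request and decode the JSON body, if the response has one."""
    with urllib.request.urlopen(build_request(method, path, payload)) as response:
        raw = response.read()
    return json.loads(raw) if raw else None


def post(path: str, payload: Optional[dict] = None) -> Any:
    return _send("POST", path, payload)


def put(path: str, payload: Optional[dict] = None) -> Any:
    return _send("PUT", path, payload)
```

Keeping request construction in its own function makes the URL and headers easy to inspect without touching the network.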

Find a complete set of these API examples in this GitHub repository.

In this example, you created the automation by registering the .yaml file with a helper function.

Kick off an automation with a custom webhook

Use webhooks to expose the events API. This allows you to extend the functionality of deployments and respond to changes in your workflow.

By exposing a webhook endpoint, you can kick off workflows that trigger deployments, all from an event created from an HTTP request.

Create this webhook in the UI to create these dynamic events.

{
    "event": "model-update",
    "resource": {
        "prefect.resource.id": "product.models.{{ body.model_id }}",
        "prefect.resource.name": "{{ body.friendly_name }}",
        "run_count": "{{ body.run_count }}"
    }
}

From this input, you can create an exposed webhook endpoint.

Each webhook corresponds to a custom event created where you can react to it downstream with a separate deployment or automation.

For example, you can create a curl request that sends the endpoint information such as a run count for your deployment:

curl -X POST https://api.prefect.cloud/hooks/34iV2SFke3mVa6y5Y-YUoA -d "model_id=adhoc" -d "run_count=10" -d "friendly_name=test-user-input"
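
The same request can be prepared from Python. This sketch uses only the standard library and mirrors the curl flags above: each -d pair becomes a form field. The helper name build_webhook_request is hypothetical, and the example builds the request without sending it.

```python
import urllib.parse
import urllib.request


def build_webhook_request(url: str, **fields: str) -> urllib.request.Request:
    """Build (but do not send) a form-encoded POST, like curl's -d flags."""
    body = urllib.parse.urlencode(fields).encode("utf-8")
    return urllib.request.Request(url, data=body, method="POST")


request = build_webhook_request(
    "https://api.prefect.cloud/hooks/34iV2SFke3mVa6y5Y-YUoA",
    model_id="adhoc",
    run_count="10",
    friendly_name="test-user-input",
)
print(request.data.decode())

# To actually deliver the event, pass the request to urllib.request.urlopen:
# urllib.request.urlopen(request)
```

The webhook template then reads these fields as body.model_id, body.run_count, and body.friendly_name.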

From here, you can build an automation that pulls parameters from the event created by the curl command and kicks off a deployment that uses them.

To do so, open the event feed and create the automation directly from this event.

This allows you to create automations that respond to these webhook events. From a few clicks in the UI, you can associate an external process with the Prefect events API that can trigger downstream deployments.

Examples

Trigger a downstream deployment with an event

This example shows how to use a trigger to schedule a downstream deployment when an upstream deployment completes.

event_driven_deployments.py
from prefect import flow, serve
from prefect.events import DeploymentEventTrigger


@flow(log_prints=True)
def upstream_flow():
    print("upstream flow")


@flow(log_prints=True)
def downstream_flow():
    print("downstream flow")


if __name__ == "__main__":
    upstream_deployment = upstream_flow.to_deployment(name="upstream_deployment")
    downstream_deployment = downstream_flow.to_deployment(
        name="downstream_deployment",
        triggers=[
            DeploymentEventTrigger(
                expect={"prefect.flow-run.Completed"},
                match_related={"prefect.resource.name": "upstream_deployment"},
            )
        ],
    )

    serve(upstream_deployment, downstream_deployment)

First, start the serve process to listen for scheduled deployment runs:

python event_driven_deployments.py

Now, run the upstream deployment and see the downstream deployment kick off after it completes:

prefect deployment run upstream-flow/upstream_deployment

Check the event feed

You can inspect raw events in the event feed in the UI to see what related resources are available to match against.

For example, the following prefect.flow-run.Completed event’s related resources include:

{
   "related": [
    {
      "prefect.resource.id": "prefect.flow.10e099ec-8358-4146-b188-be68027ee58f",
      "prefect.resource.role": "flow",
      "prefect.resource.name": "upstream-flow"
    },
    {
      "prefect.resource.id": "prefect.deployment.be777bbd-4b15-49f3-bc1f-4d109374cee2",
      "prefect.resource.role": "deployment",
      "prefect.resource.name": "upstream_deployment"
    },
    {
      "prefect.resource.id": "prefect-cloud.user.80546602-9f31-4396-ab4b-e873a5377feb",
      "prefect.resource.role": "creator",
      "prefect.resource.name": "stoat"
    }
  ]
}
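
To reason about whether a match_related filter would select an event like this one, a simplified matcher can help. The sketch below is a mental model, not Prefect's implementation: it assumes a filter matches when a single related resource carries every expected label, with a trailing * treated as a prefix wildcard. The function names are hypothetical.

```python
from typing import Dict, List


def label_matches(expected: str, actual: str) -> bool:
    """Exact match, or prefix match when the expected value ends with '*'."""
    if expected.endswith("*"):
        return actual.startswith(expected[:-1])
    return expected == actual


def matches_related(
    match_related: Dict[str, str], related: List[Dict[str, str]]
) -> bool:
    """True if any one related resource satisfies every expected label."""
    return any(
        all(
            label in resource and label_matches(value, resource[label])
            for label, value in match_related.items()
        )
        for resource in related
    )


related = [
    {
        "prefect.resource.id": "prefect.flow.10e099ec-8358-4146-b188-be68027ee58f",
        "prefect.resource.role": "flow",
        "prefect.resource.name": "upstream-flow",
    },
    {
        "prefect.resource.id": "prefect.deployment.be777bbd-4b15-49f3-bc1f-4d109374cee2",
        "prefect.resource.role": "deployment",
        "prefect.resource.name": "upstream_deployment",
    },
]

print(matches_related({"prefect.resource.name": "upstream_deployment"}, related))
print(matches_related({"prefect.resource.id": "prefect.deployment.*"}, related))
print(matches_related({"prefect.resource.name": "some_other_deployment"}, related))
```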

Trigger a deployment when a customer completes an order

Imagine you are running an e-commerce platform and you want to trigger a deployment when a customer completes an order.

There might be a number of events that occur during an order on your platform, for example:

  • order.created
  • order.item.added
  • order.payment-method.confirmed
  • order.shipping-method.added
  • order.complete

Event grammar

The above choices of event names are arbitrary. With Prefect events, you’re free to select any event grammar that best represents your use case.

In this case, we want to trigger a deployment when a user completes an order, so our trigger should:

  • expect an order.complete event
  • after an order.created event
  • evaluate these conditions for_each user id

Finally, it should pass the user_id as a parameter to the deployment. Here’s how this looks in code:

post_order_deployment.py
from prefect import flow
from prefect.events.schemas.deployment_triggers import DeploymentEventTrigger

order_complete = DeploymentEventTrigger(
    expect={"order.complete"},
    after={"order.created"},
    for_each={"prefect.resource.id"},
    parameters={"user_id": "{{ event.resource.id }}"},
)


@flow(log_prints=True)
def post_order_complete(user_id: str):
    print(f"User {user_id} has completed an order -- doing stuff now")


if __name__ == "__main__":
    post_order_complete.serve(triggers=[order_complete])

Specify multiple events or resources

The expect and after fields accept a set of event names, so you can specify multiple events for each condition.

Similarly, the for_each field accepts a set of resource labels, such as prefect.resource.id.

To simulate users causing order status events, run the following in a Python shell or script:

simulate_events.py
import time
from prefect.events import emit_event

user_id_1, user_id_2 = "123", "456"
for event_name, user_id in [
    ("order.created", user_id_1),
    ("order.created", user_id_2), # other user
    ("order.complete", user_id_1),
]:
    event = emit_event(
        event=event_name,
        resource={"prefect.resource.id": user_id},
    )
    time.sleep(1)
    print(f"{user_id} emitted {event_name}")

In the above example:

  • user_id_1 creates and then completes an order, triggering a run of our deployment.
  • user_id_2 creates an order, but no completed event is emitted so no deployment is triggered.
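
The outcome above can be reproduced with a small, self-contained model of the after, expect, and for_each conditions. This is a simplified sketch for intuition only. It ignores time windows and thresholds, and the function name fired_resources is hypothetical.

```python
from typing import List, Set, Tuple


def fired_resources(
    events: List[Tuple[str, str]], after: Set[str], expect: Set[str]
) -> List[str]:
    """Return resource ids whose expected event arrived after an 'after' event."""
    armed: Set[str] = set()  # resources that have seen an 'after' event
    fired: List[str] = []
    # for_each: state is tracked separately per resource id.
    for event_name, resource_id in events:
        if event_name in after:
            armed.add(resource_id)
        elif event_name in expect and resource_id in armed:
            fired.append(resource_id)
            armed.discard(resource_id)
    return fired


events = [
    ("order.created", "123"),
    ("order.created", "456"),  # other user, never completes
    ("order.complete", "123"),
]
print(fired_resources(events, after={"order.created"}, expect={"order.complete"}))
# prints ['123']
```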

See also

  • To learn more about Prefect events, which can trigger automations, see the events docs.
  • See the webhooks guide to learn how to create webhooks and receive external events.