The PrefectClient
contains many methods that make it simpler to perform actions, such as:
- reschedule late flow runs
- get the last
N
completed flow runs from a workspace
The PrefectClient
is an async context manager. Here’s an example usage:
from prefect import get_client
async with get_client() as client:
response = await client.hello()
print(response.json())
Examples
Reschedule late flow runs
To bulk reschedule flow runs that are late, delete the late flow runs and create new ones in a
Scheduled
state with a delay. This is useful if you accidentally scheduled many
flow runs of a deployment to an inactive work pool, for example.
The following example reschedules the last three late flow runs of a deployment named
healthcheck-storage-test
to run six hours later than their original expected start time.
It also deletes any remaining late flow runs of that deployment.
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional
from prefect import get_client
from prefect.client.schemas.filters import (
DeploymentFilter, FlowRunFilter
)
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort
from prefect.states import Scheduled
async def reschedule_late_flow_runs(
deployment_name: str,
delay: timedelta,
most_recent_n: int,
delete_remaining: bool = True,
states: Optional[list[str]] = None
) -> list[FlowRun]:
if not states:
states = ["Late"]
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=dict(name=dict(any_=states)),
expected_start_time=dict(
before_=datetime.now(timezone.utc)
),
),
deployment_filter=DeploymentFilter(
name={'like_': deployment_name}
),
sort=FlowRunSort.START_TIME_DESC,
limit=most_recent_n if not delete_remaining else None
)
if not flow_runs:
print(f"No flow runs found in states: {states!r}")
return []
rescheduled_flow_runs = []
for i, run in enumerate(flow_runs):
await client.delete_flow_run(flow_run_id=run.id)
if i < most_recent_n:
new_run = await client.create_flow_run_from_deployment(
deployment_id=run.deployment_id,
state=Scheduled(
scheduled_time=run.expected_start_time + delay
),
)
rescheduled_flow_runs.append(new_run)
return rescheduled_flow_runs
if __name__ == "__main__":
rescheduled_flow_runs = asyncio.run(
reschedule_late_flow_runs(
deployment_name="healthcheck-storage-test",
delay=timedelta(hours=6),
most_recent_n=3,
)
)
print(f"Rescheduled {len(rescheduled_flow_runs)} flow runs")
assert all(
run.state.is_scheduled() for run in rescheduled_flow_runs
)
assert all(
run.expected_start_time > datetime.now(timezone.utc)
for run in rescheduled_flow_runs
)
Get the last N
completed flow runs from your workspace
To get the last N
completed flow runs from your workspace, use read_flow_runs
and prefect.client.schemas
.
This example gets the last three completed flow runs from your workspace:
import asyncio
from typing import Optional
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort
async def get_most_recent_flow_runs(
n: int = 3,
states: Optional[list[str]] = None
) -> list[FlowRun]:
if not states:
states = ["COMPLETED"]
async with get_client() as client:
return await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state={'type': {'any_': states}}
),
sort=FlowRunSort.END_TIME_DESC,
limit=n,
)
if __name__ == "__main__":
last_3_flow_runs: list[FlowRun] = asyncio.run(
get_most_recent_flow_runs()
)
print(last_3_flow_runs)
assert all(
run.state.is_completed() for run in last_3_flow_runs
)
assert (
end_times := [run.end_time for run in last_3_flow_runs]
) == sorted(end_times, reverse=True)
Instead of the last three from the whole workspace, you can also use the DeploymentFilter
to get the last three completed flow runs of a specific deployment.
Transition all running flows to cancelled through the Client
Use get_client
to set multiple runs to a Cancelled
state.
The code below cancels all flow runs that are in Pending
, Running
, Scheduled
, or Late
states when the script is run.
import anyio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.client.schemas.objects import StateType
async def list_flow_runs_with_states(states: list[str]):
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=FlowRunFilterState(
name=FlowRunFilterStateName(any_=states)
)
)
)
return flow_runs
async def cancel_flow_runs(flow_runs):
async with get_client() as client:
for idx, flow_run in enumerate(flow_runs):
print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
state_updates = {}
state_updates.setdefault("name", "Cancelled")
state_updates.setdefault("type", StateType.CANCELLED)
state = flow_run.state.copy(update=state_updates)
await client.set_flow_run_state(flow_run.id, state, force=True)
async def bulk_cancel_flow_runs():
states = ["Pending", "Running", "Scheduled", "Late"]
flow_runs = await list_flow_runs_with_states(states)
while len(flow_runs) > 0:
print(f"Cancelling {len(flow_runs)} flow runs\n")
await cancel_flow_runs(flow_runs)
flow_runs = await list_flow_runs_with_states(states)
print("Done!")
if __name__ == "__main__":
anyio.run(bulk_cancel_flow_runs)