Configure task caching
Learn how to use caching to gain efficiency and pipeline idempotency.
Caching refers to the ability of a task run to enter a Completed
state and return a predetermined
value without actually running the code that defines the task.
Caching allows you to efficiently reuse results of tasks that may be expensive to compute
and ensure that your pipelines are idempotent when retrying them due to unexpected failure.
By default Prefect’s caching logic is based on the following attributes of a task invocation:
- the inputs provided to the task
- the code definition of the task
- the prevailing flow run ID, or if executed autonomously, the prevailing task run ID
These values are hashed to compute the task’s cache key. This implies that, by default, calling the same task with the same inputs more than once within a flow will result in cached behavior for all calls after the first. This behavior can be configured - see customizing the cache below.
Caching requires result persistence
Caching requires result persistence, which is off by default.
To turn on result persistence for all of your tasks use the PREFECT_RESULTS_PERSIST_BY_DEFAULT
setting:
See managing results for more details on managing your result configuration, and settings for more details on managing Prefect settings.
Cache keys
To determine whether a task run should retrieve a cached state, Prefect uses the concept of a “cache key”.
A cache key is a computed string value that determines where the task’s return value will be persisted within
its configured result storage.
When a task run begins, Prefect first computes its cache key and uses this key to lookup a record in the task’s result
storage.
If an unexpired record is found, this result is returned and the task does not run, but instead, enters a
Cached
state with the corresponding result value.
Cache keys can be shared by the same task across different flows, and even among different tasks, so long as they all share a common result storage location.
By default Prefect stores results locally in ~/.prefect/storage/
.
The filenames in this directory will correspond exactly to computed cache keys from your task runs.
Relationship with result persistence
Task caching and result persistence are intimately related. Because task caching relies on loading a known result, task caching will only work when your task can persist its output to a fixed and known location.
Therefore any configuration which explicitly avoids result persistence will result in your task never
using a cache, for example setting persist_result=False
.
Cache policies
Cache key computation can be configured through the use of cache policies. A cache policy is a recipe for computing cache keys for a given task.
Prefect comes prepackaged with a few common cache policies:
DEFAULT
: this cache policy uses the task’s inputs, its code definition, as well as the prevailing flow run ID to compute the task’s cache key.INPUTS
: this cache policy uses only the task’s inputs to compute the cache key.TASK_SOURCE
: this cache policy uses only the task’s code definition to compute the cache key.FLOW_PARAMETERS
: this cache policy uses only the parameter values provided to the parent flow run to compute the cache key.NONE
: this cache policy always returnsNone
and therefore avoids caching and result persistence altogether.
These policies can be set using the cache_policy
keyword on the task decorator:
No matter how many flows call it, this task will run once and only once until its underlying code is altered:
Customizing the cache
Prefect allows you to configure task caching behavior in numerous ways.
Cache expiration
All cache keys can optionally be given an expiration through the cache_expiration
keyword on
the task decorator.
This keyword accepts a datetime.timedelta
specifying a duration for which the cached value should be
considered valid.
Providing an expiration value results in Prefect persisting an expiration timestamp alongside the result record for the task. This expiration is then applied to all other tasks that may share this cache key.
Cache policies
Cache policies can be composed and altered using basic Python syntax to form more complex policies.
For example, all task policies except for NONE
can be added together to form new policies that combine
the individual policies’ logic into a larger cache key computation.
Combining policies in this way results in caches that are easier to invalidate.
For example:
This task will rerun anytime you provide new values for x
, or anytime you change the underlying code.
The INPUTS
policy is a special policy that allows you to subtract string values to ignore
certain task inputs:
Cache key functions
You can configure custom cache policy logic through the use of cache key functions. A cache key function is a function that accepts two positional arguments:
- The first argument corresponds to the
TaskRunContext
, which stores task run metadata. For example, this object has attributestask_run_id
,flow_run_id
, andtask
, all of which can be used in your custom logic. - The second argument corresponds to a dictionary of input values to the task. For example,
if your task has the signature
fn(x, y, z)
then the dictionary will have keys “x”, “y”, and “z” with corresponding values that can be used to compute your cache key.
This function can then be specified using the cache_key_fn
argument on
the task decorator.
For example:
Cache storage
By default, cache records are collocated with task results and files containing task results will include metadata used for caching.
Configuring a cache policy with a key_storage
argument allows cache records to be stored separately from task results.
When cache key storage is configured, persisted task results will only include the return value of your task and cache records can be deleted or modified without effecting your task results.
You can configure where cache records are stored by using the .configure
method with a key_storage
argument on a cache policy.
The key_storage
argument accepts either a path to a local directory or a storage block.
For example:
This task will store cache records in the specified directory.
To store cache records in a remote object store such as S3, pass a storage block instead:
Cache isolation
Cache isolation controls how concurrent task runs interact with cache records. Prefect supports two isolation levels: READ_COMMITTED
and SERIALIZABLE
.
By default, cache records operate with a READ_COMMITTED
isolation level. This guarantees that reading a cache record will see the latest committed cache value,
but allows multiple executions of the same task to occur simultaneously.
Consider the following example:
When running this script, both tasks will execute in parallel and perform work despite both tasks using the same cache key.
This is evidenced by seeing both my_task_version_1 running
and my_task_version_2 running
in the output:
For stricter isolation, you can use the SERIALIZABLE
isolation level. This ensures that only one execution of a task occurs at a time for a given cache
record via a locking mechanism.
To configure the isolation level, use the .configure
method with an isolation_level
argument on a cache policy. When using SERIALIZABLE
, you must
also provide a lock_manager
that implements locking logic for your system.
Here’s an updated version of the previous example that uses SERIALIZABLE
isolation:
In the updated script, only one of the tasks will run and the other will use the cached value.
This is evidenced by seeing only one of my_task_version_1 running
or my_task_version_2 running
in the output:
Locking in a distributed setting
To manage locks in a distributed setting, you will need to use a storage system for locks that is accessible by all of your execution infrastructure.
We recommend using the RedisLockManager
provided by prefect-redis
in conjunction with a shared Redis instance:
Multi-task caching
There are many situations in which multiple tasks need to always run together or not at all. This can be achieved in Prefect by configuring these tasks to always write to their caches within a single transaction.
When this flow is run with default parameter values it will fail on the process_data
task.
The load_data
task will succeed. However, because caches are only written to when a transaction
is committed, the load_data
task will not write a result to its cache key location until
the process_data
task succeeds as well.
This ensures that anytime you need to rerun this flow both load_data
and process_data
are executed
together.
After a successful execution both tasks will be cached until the cache key is updated.
Read more about transactions.
Caching example
In this example, until the cache_expiration
time is reached, as long as the input to hello_task()
remains
the same when it is called, the cached return value will be returned. The task is not rerun.
However, if the input argument value changes, hello_task()
runs using the new input.
A more realistic example might include the flow run id in the cache key, so only repeated calls in the same flow run are cached:
Force ignore the cache
A cache “refresh” instructs Prefect to ignore the data associated with a task’s cache key and rerun no matter what.
The refresh_cache
option enables this behavior for a specific task:
When this task runs, it always updates the cache key instead of using the cached value. This is particularly useful when you have a flow that is responsible for updating the cache.
To refresh the cache for all tasks, use the PREFECT_TASKS_REFRESH_CACHE
setting.
Setting PREFECT_TASKS_REFRESH_CACHE=true
changes the default behavior of all tasks to refresh.
This is particularly useful to rerun a flow without cached results.
See settings for more details on managing Prefect settings.
If you have tasks that should not refresh when this setting is enabled, you may explicitly set refresh_cache
to False
. These tasks will never refresh the cache. If a cache key exists it will be read, not updated.
If a cache key does not exist yet, these tasks can still write to the cache.
Was this page helpful?