This is an experimental feature and may rapidly change without a deprecation cycle.

Dask offers various diagnostics and Fine Performance Metrics about tasks, grouping them by their prefix (commonly, the name of the function being called). This can be suboptimal:

  • your client code may be sophisticated enough that correlating lines on the client and tasks being run on the cluster may prove difficult;

  • the same function may be applied to different parts of your workflow, with a different performance profile;

  • you may be in a multitenancy setup, where part of the load on the cluster was not caused by your client code.

In these situations, it may be useful to attach meaningful tags to your workflow, or segments of it. To do so, you should use the span() context manager inside the client code.

For example:

import dask.config
import dask.array as da
from distributed import Client, span

# Read important note below
dask.config.set({"": False})
client = Client()

with span("Alice's workflow"):
    with span("data load"):
        a = da.read_zarr(...)
    with span("ML preprocessing"):
        a = preprocess(a)
    with span("Model training"):
        model = train(a)
    model = model.compute()

Note how the span() context manager can be nested. The example will create the following spans on the scheduler:

  • ("Alice's workflow", )

  • ("Alice's workflow", "data load")

  • ("Alice's workflow", "ML preprocessing")

  • ("Alice's workflow", "Model training")

Each of the spans will be mapped to the tasks matching the segment of the graph that was defined inside its context manager. The parent span will be mapped to all tasks of its children.

Tags are arbitrary and nothing stops you from parameterizing them; for example:

>>> with span(f"{user}'s workflow"):
...     ...

This may give you:

  • ("Alice's workflow", "data load")

  • ("Bob's workflow", "data load")

  • etc.

This is useful for example if you want to observe either all the workload submitted by Alice, while hiding Bob’s activity, or alternatively all the data loading activity, regardless of who submitted it.

The possibilities are more or less endless: instead of, or in addition to, a username at the top, you could store information on what dataset you’re processing, etc.

The default span

If you don’t use the span() context manager, your tasks will be automatically attributed to the ("default", ) span.

Viewing the spans

You can filter your workload by span tags in the Fine Performance Metrics dashboard widget:

Span selection in the Fine Performance Metrics dashboard

Additionally, spans can be queried using scheduler extensions or run_on_scheduler(); see Dask Developer API.
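
A minimal sketch of the run_on_scheduler() route, assuming only the spans_search_by_name mapping described in the Dask Developer API section. The function name list_span_names is hypothetical, and a stub object stands in for the scheduler so that the snippet runs without a cluster.

```python
from types import SimpleNamespace


def list_span_names(dask_scheduler):
    """Return the full names of all spans known to the scheduler.

    Meant to be passed to Client.run_on_scheduler(), which fills in the
    ``dask_scheduler`` argument with the Scheduler instance.
    """
    ext = dask_scheduler.extensions["spans"]
    return sorted(ext.spans_search_by_name)


# On a real cluster: client.run_on_scheduler(list_span_names)
# Here a stub (an assumption, for illustration only) replaces the scheduler.
stub = SimpleNamespace(
    extensions={
        "spans": SimpleNamespace(
            spans_search_by_name={
                ("default",): [],
                ("Alice's workflow",): [],
                ("Alice's workflow", "data load"): [],
            }
        )
    }
)
names = list_span_names(stub)
print(names)
```

Returning plain tuples rather than Span objects keeps the result small and easy to send back to the client.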

User API


Dataframes have a minimum granularity of a single call to compute() or persist() and can’t break it down further into groups of operations. If the example above used dataframes, everything would have been uniformly tagged as “Alice’s workflow”, as it is the span that’s active during compute().

In other collections, such as arrays and delayed objects, spans that don’t wrap around a call to compute() or persist() can get lost during the optimization phase. To prevent this issue, you must set

>>> dask.config.set({"": False})

Or in dask.yaml:

    active: false

A possible workaround, which also works for dataframes, is to perform intermediate calls to persist(). Note, however, that this can significantly impact optimizations and reduce overall performance.

import dask.dataframe as dd

with span("Alice's workflow"):
    with span("data load"):
        a = dd.read_parquet(...).persist()
    with span("ML preprocessing"):
        b = preprocess(a).persist()
        del a  # Release distributed memory for a as soon as possible
    with span("Model training"):
        model = train(b).persist()
        del b  # Release distributed memory for b as soon as possible
    model = model.compute()

distributed.span(*tags: str) → Iterator[str]

Tag a group of tasks as part of a certain group, called a span.

This context manager can be nested, thus creating sub-spans. If you close and re-open a span context manager with the same tag, you’ll end up with two separate spans.

Every cluster defines a global “default” span when no span has been defined by the client. The default span is automatically closed and reopened when all tasks associated with it have been completed; in other words, when the cluster is idle save for tasks that are explicitly annotated by a span. Note that, in some edge cases, you may end up with overlapping default spans, e.g. if a worker crashes and all unique tasks that were in memory on it need to be recomputed.

You may capture the ID of a span on the client to match it with the Span objects on the scheduler:

>>> client = Client()
>>> with span("my workflow") as span_id:
...     client.submit(lambda: "Hello world!").result()
>>> client.cluster.scheduler.extensions["spans"].spans[span_id]
Span<name=('my workflow',), id=5dc9b908-116b-49a5-b0d7-5a681f49a111>


You may retrieve the current span with dask.get_annotations().get("span"). You can do so in the client code as well as from inside a task.

Dask Developer API

Intended audience

This section is only of interest to developers maintaining Dask or writing scheduler extensions, e.g. to create an alternative dashboard or to store metrics long-term.

Spans can be accessed on the scheduler through Scheduler.extensions["spans"], which contains a singleton instance of SpansSchedulerExtension. In turn, the extension contains a mapping of all Span objects, plus a variety of convenience methods to access and aggregate them.

Note how Span objects offer a variety of methods that the dashboard currently doesn’t use, such as start/stop times, task counts, and size of the output.

class distributed.spans.Span(name: tuple[str, ...], id_: str, parent: distributed.spans.Span | None, total_nthreads_history: list[tuple[float, int]])

property active_cpu_seconds: float

Return number of CPU seconds that were made available on the cluster while this Span was running; in other words (Span.stop - Span.enqueued) * Scheduler.total_nthreads.

This accounts for workers joining and leaving the cluster while this Span was active. If this Span is the output of merge(), do not count gaps between input spans.
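
As a rough illustration of this accounting, the sketch below sums thread-seconds over (begin, end, nthreads) intervals, the same layout as nthreads_intervals. The helper name cpu_seconds and the sample numbers are made up, and this is not necessarily how the property is implemented.

```python
def cpu_seconds(intervals):
    """Sum (end - begin) * nthreads over (begin, end, nthreads) tuples."""
    return sum((end - begin) * nthreads for begin, end, nthreads in intervals)


# Hypothetical history: the cluster grows, then shrinks, while a span is active
intervals = [
    (100.0, 110.0, 4),  # 10 s with 4 threads -> 40 CPU seconds
    (110.0, 125.0, 8),  # 15 s with 8 threads -> 120 CPU seconds
    (125.0, 130.0, 6),  # 5 s with 6 threads  -> 30 CPU seconds
]
total = cpu_seconds(intervals)
print(total)  # 190.0
```

With a constant thread count this reduces to the simple product of elapsed time and total_nthreads quoted above.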


add_metadata(metadata: distributed.spans.SpanMetadata) → None

Add metadata to the span, e.g. code snippets

property all_durations: dict[str, float]

Cumulative duration of all completed actions in this span tree, by action


property annotation: dict[str, tuple[str, ...]] | None

Rebuild the dask graph annotation which contains the full id history

Note that this may not match the original annotation in case of TaskGroup collision.

children: list[Span]

Direct children of this span, sorted by creation time

property code: list[tuple[SourceCode, ...]]

Code snippets, sent by the client on compute(), persist(), and submit().

Only populated if distributed.diagnostics.computations.nframes is non-zero.

property cumulative_worker_metrics: dict[tuple[Hashable, ...], float]

Replica of Worker.digests_total and Scheduler.cumulative_worker_metrics, but only for the metrics that can be attributed to the current span tree. The span id has been removed from the key.

At the moment of writing, all keys are ("execute", <task prefix>, <activity>, <unit>) or ("p2p", <where>, <activity>, <unit>) but more may be added in the future with a different format; please test e.g. for k[0] == "execute".
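
The advice to test k[0] before unpacking can be sketched as follows. The sample dict, the activity names and the helper execute_seconds_by_activity are hypothetical; only the key layout is taken from the description above.

```python
from collections import defaultdict

# Hypothetical sample following the documented key layout
metrics = {
    ("execute", "read_zarr", "disk-read", "seconds"): 12.5,
    ("execute", "read_zarr", "thread-cpu", "seconds"): 3.0,
    ("execute", "train", "thread-cpu", "seconds"): 40.0,
    ("execute", "train", "disk-read", "bytes"): 1024.0,
    ("p2p", "foreground", "transfer", "seconds"): 2.0,
}


def execute_seconds_by_activity(metrics):
    """Total seconds per activity, looking at "execute" keys only."""
    out = defaultdict(float)
    for key, value in metrics.items():
        # Check the context first: other contexts may have a different shape
        if key[0] != "execute":
            continue
        _, prefix, activity, unit = key
        if unit == "seconds":
            out[activity] += value
    return dict(out)


by_activity = execute_seconds_by_activity(metrics)
print(by_activity)
```

Unpacking only after the k[0] check keeps the code working if keys with a different length are added later.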

property done: bool

Return True if all tasks in this span tree are completed; False otherwise.




This property may transition from True to False, e.g. when a new sub-span is added or when a worker that contained the only replica of a task in memory crashes and the task needs to be recomputed.

property duration: float

The total amount of time spent on all tasks in this span tree


enqueued: float

Time when the span first appeared on the scheduler. The same property on parent spans is always less than or equal to this.


groups: set[TaskGroup]


TaskGroups are forgotten by the Scheduler when the last task is forgotten, but remain referenced here indefinitely. If a user calls compute() twice on the same collection, you’ll have more than one group with the same name in this set! For the same reason, while the same TaskGroup object is guaranteed to be attached to exactly one Span, you may have different TaskGroups with the same key attached to different Spans.

id: str

Unique ID, generated by span() and taken from TaskState.annotations["span"]["id"][-1]. Matches and distributed.worker_state_machine.TaskState.span_id.

static merge(*items: distributed.spans.Span) → distributed.spans.Span

Merge multiple spans into a synthetic one. The input spans must not be related to each other.

name: tuple[str, ...]

(<tag>, <tag>, …) Matches TaskState.annotations["span"]["name"], both on the scheduler and the worker.

property nbytes_total: int

The total number of bytes that this span tree has produced


property nthreads_intervals: list[tuple[float, float, int]]

List of tuples:

  • begin timestamp

  • end timestamp

  • Scheduler.total_nthreads during this interval

When the Span is the output of merge(), the intervals may not be contiguous.


property start: float

Earliest time when a task belonging to this span tree started computing; 0 if no task has finished computing yet.




This is not updated until at least one task has finished computing. It could move backwards as tasks complete.

property states: dict[TaskStateState, int]

The number of tasks currently in each state in this span tree; e.g. {"memory": 10, "processing": 3, "released": 4, ...}.


property stop: float

When this span tree finished computing, or current timestamp if it didn’t finish yet.




This differs from TaskGroup.stop when there are unfinished tasks; it also will never be zero.

traverse_groups() → Iterator[TaskGroup]

All TaskGroups belonging to this branch of the span tree


Top-down recursion of all spans belonging to this branch of the span tree, including self

class distributed.spans.SpansSchedulerExtension(scheduler: Scheduler)

Scheduler extension for spans support

find_by_tags(*tags: str) → Iterator[distributed.spans.Span]

Yield all spans that contain any of the given tags. When a tag is shared both by a span and its (grand)children, only return the parent.
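
The "only return the parent" rule can be illustrated with a toy model. ToySpan and toy_find_by_tags below are hypothetical and only mimic the documented behaviour; they are not the extension's implementation.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(eq=False)  # compare by identity: two spans may share a name
class ToySpan:
    name: tuple[str, ...]
    parent: ToySpan | None = None


def toy_find_by_tags(spans, *tags):
    """Yield spans whose name contains any tag, skipping matched descendants."""
    matching = [sp for sp in spans if any(tag in sp.name for tag in tags)]
    matched = set(matching)
    for sp in matching:
        # A child's name includes its parent's tags, so checking the direct
        # parent is enough to detect a matching ancestor.
        if sp.parent not in matched:
            yield sp


alice = ToySpan(("Alice's workflow",))
alice_load = ToySpan(("Alice's workflow", "data load"), alice)
bob = ToySpan(("Bob's workflow",))
bob_load = ToySpan(("Bob's workflow", "data load"), bob)
spans = [alice, alice_load, bob, bob_load]

print([sp.name for sp in toy_find_by_tags(spans, "Alice's workflow")])
print([sp.name for sp in toy_find_by_tags(spans, "data load")])
```

Searching for "Alice's workflow" yields only the top-level span, even though its child carries the same tag in its name; searching for "data load" yields both leaf spans.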

heartbeat(ws: scheduler_module.WorkerState, data: dict[tuple[Hashable, ...], float]) → None

Triggered by SpansWorkerExtension.heartbeat().

Populate Span.cumulative_worker_metrics with data from the worker.

merge_all() → distributed.spans.Span

Return a synthetic Span which is the sum of all spans

merge_by_tags(*tags: str) → distributed.spans.Span

Return a synthetic Span which is the sum of all spans containing the given tags

observe_tasks(tss: Iterable[scheduler_module.TaskState], code: tuple[SourceCode, ...], span_metadata: SpanMetadata) → dict[Key, dict]

Acknowledge the existence of runnable tasks on the scheduler. These may be new tasks, tasks that were previously unrunnable, or tasks that were already fed into this method.

Attach newly observed tasks to either the desired span or to ("default", ). Update TaskGroup.span_id and wipe TaskState.annotations["span"].

Returns the updated 'span' annotations: {key: {"name": (…, …), "ids": (…, …)}}

root_spans: list[Span]

Only the spans that don’t have any parents, sorted by creation time. This is a convenience helper structure to speed up searches.

spans: dict[str, Span]

All Span objects by id

spans_search_by_name: defaultdict[tuple[str, ...], list[Span]]

All spans, keyed by their full name and sorted by creation time. This is a convenience helper structure to speed up searches.

spans_search_by_tag: defaultdict[str, list[Span]]

All spans, keyed by the individual tags that make up their name and sorted by creation time. This is a convenience helper structure to speed up searches.

class distributed.spans.SpansWorkerExtension(worker: Worker)

Worker extension for spans support

collect_digests() → None

Make a local copy of Worker.digests_total_since_heartbeat. We can’t just parse it directly in heartbeat() as the event loop may be yielded between its call and self.worker.digests_total_since_heartbeat.clear(), causing the scheduler to become misaligned with the workers.

heartbeat() → dict[tuple[Hashable, ...], float]

Apportion the metrics that do have a span to the Spans on the scheduler

{(context, span_id, prefix, activity, unit): value}
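
To make the key layout concrete, here is a sketch of how such a payload could be split per span, dropping span_id so that the remaining key has the shape used by Span.cumulative_worker_metrics. The function split_by_span, the span IDs and the numbers are hypothetical; this is not the scheduler's implementation.

```python
from collections import defaultdict


def split_by_span(payload):
    """Group {(context, span_id, prefix, activity, unit): value} by span_id."""
    by_span = defaultdict(lambda: defaultdict(float))
    for (context, span_id, *rest), value in payload.items():
        # Drop span_id from the key; keep everything else in order
        by_span[span_id][(context, *rest)] += value
    return {span_id: dict(metrics) for span_id, metrics in by_span.items()}


# Hypothetical heartbeat payload from one worker
payload = {
    ("execute", "span-1", "load", "thread-cpu", "seconds"): 1.5,
    ("execute", "span-1", "load", "disk-read", "seconds"): 0.5,
    ("execute", "span-2", "train", "thread-cpu", "seconds"): 4.0,
}
per_span = split_by_span(payload)
print(per_span["span-1"])
```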
