Spans¶
Note
This is an experimental feature and may rapidly change without a deprecation cycle.
Dask offers various diagnostics and Fine Performance Metrics about tasks, grouping them by their prefix (commonly, the name of the function being called). This can be suboptimal:
- your client code may be sophisticated enough that correlating lines in the client code with tasks being run on the cluster may prove difficult;
- the same function may be applied to different parts of your workflow, each with a different performance profile;
- you may be in a multitenancy setup, where part of the load on the cluster was not caused by your client code.
In these situations, it may be useful to attach meaningful tags to your workflow, or
segments of it.
To do so, you should use the span()
context manager inside the client code.
For example:
import dask.config
import dask.dataframe as dd
from distributed import Client, span

dask.config.set({"optimization.fuse.active": False})
client = Client()

with span("Alice's workflow"):
    with span("data load"):
        df = dd.read_parquet(...)
    with span("ML preprocessing"):
        df = preprocess(df)
    with span("Model training"):
        model = train(df)

model = model.compute()
Note how the span()
context manager can be nested.
The example will create the following spans on the scheduler:
("Alice's workflow", )
("Alice's workflow", "data load")
("Alice's workflow", "ML preprocessing")
("Alice's workflow", "Model training")
Each of the spans will be mapped to the tasks matching the segment of the graph that was defined inside its context manager. The parent span will be mapped to all tasks of its children.
Tags are arbitrary and nothing stops you from parameterizing them; for example
>>> with span(f"{user}'s workflow"):
... ...
which may give you
("Alice's workflow", "data load")
("Bob's workflow", "data load")
etc.
This is useful, for example, if you want to observe all the workload submitted by Alice while hiding Bob’s activity, or alternatively all the data loading activity regardless of who submitted it.
The possibilities are more or less endless: instead of or in addition to a username at the top level, you could store information on which dataset you’re processing, etc.
The default span¶
If you don’t use the span()
context manager, your tasks will be automatically
attributed to the ("default", )
span.
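For instance, here is a minimal sketch (assuming a LocalCluster, so that the scheduler object is reachable in-process; the exact set of spans you see depends on what else ran on the cluster):

>>> from distributed import Client
>>> client = Client()  # LocalCluster: the scheduler is reachable in-process
>>> client.submit(lambda: 1).result()  # submitted outside any span() context
1
>>> ext = client.cluster.scheduler.extensions["spans"]
>>> [s.name for s in ext.root_spans]
[('default',)]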
Viewing the spans¶
You can filter by span tags in the Fine Performance Metrics dashboard widget to narrow down your workload:
[screenshot: Fine Performance Metrics dashboard, filtered by span]
Additionally, spans can be queried using scheduler extensions or
run_on_scheduler()
; see Dask Developer API.
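For example, here is a hedged sketch that pulls aggregated metrics for every span containing a given tag from client code; get_span_metrics is a hypothetical helper, while merge_by_tags() and cumulative_worker_metrics are described in the Dask Developer API below:

>>> def get_span_metrics(dask_scheduler, tag):
...     # Aggregate all spans containing this tag into a synthetic Span
...     ext = dask_scheduler.extensions["spans"]
...     return dict(ext.merge_by_tags(tag).cumulative_worker_metrics)
>>> metrics = client.run_on_scheduler(get_span_metrics, tag="Alice's workflow")

The returned keys follow the ("execute", <task prefix>, <activity>, <unit>) layout described under cumulative_worker_metrics below.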
User API¶
Warning
Spans are based on annotations, and just like annotations they can be lost during optimization. To prevent this issue, you must set
>>> dask.config.set({"optimization.fuse.active": False})
or in dask.yaml:
optimization:
  fuse:
    active: false
- distributed.span(*tags: str) collections.abc.Iterator[str] [source]¶
Tag a group of tasks as part of a certain group, called a span.
This context manager can be nested, thus creating sub-spans. If you close and re-open a span context manager with the same tag, you’ll end up with two separate spans.
Every cluster defines a global “default” span when no span has been defined by the client; the default span is automatically closed and reopened when all tasks associated with it have been completed; in other words, the cluster is idle save for tasks that are explicitly annotated by a span. Note that, in some edge cases, you may end up with overlapping default spans, e.g. if a worker crashes and all unique tasks that were in memory on it need to be recomputed.
You may capture the ID of a span on the client in order to match it with the Span objects on the scheduler:

>>> client = Client()
>>> with span("my workflow") as span_id:
...     client.submit(lambda: "Hello world!").result()
>>> client.cluster.scheduler.extensions["spans"].spans[span_id]
Span<name=('my workflow',), id=5dc9b908-116b-49a5-b0d7-5a681f49a111>
Notes
You may retrieve the current span with
dask.get_annotations().get("span")
. You can do so in the client code as well as from inside a task.
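For instance (a minimal sketch; the exact layout of the annotation dictionary is an implementation detail and may change between versions, so treat the "name" key below as illustrative):

>>> import dask
>>> from distributed import span
>>> with span("Alice's workflow", "data load"):
...     annotation = dask.get_annotations().get("span")
>>> annotation["name"]
("Alice's workflow", 'data load')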
Dask Developer API¶
Intended audience
This section is only of interest to developers maintaining Dask or writing scheduler extensions, e.g. to create an alternative dashboard or to store metrics long-term.
Spans can be accessed on the scheduler through Scheduler.extensions["spans"]
, which
contains a singleton instance of SpansSchedulerExtension
. In turn, the
extension contains a mapping of all Span
objects, plus a variety of convenience
methods to access and aggregate them.
Note how Span objects offer a variety of methods that the dashboard currently doesn’t use, such as start/stop times, task counts, and size of the output.
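As a sketch of what this enables, a scheduler-side function (e.g. registered through a scheduler extension, or invoked via run_on_scheduler()) could walk the whole span tree and summarize it; every attribute used here is documented in the reference below:

def summarize_spans(dask_scheduler):
    """Walk every span known to the scheduler and collect basic statistics"""
    ext = dask_scheduler.extensions["spans"]
    summary = {}
    for root in ext.root_spans:
        for sp in root.traverse_spans():
            summary[sp.id] = {
                "name": sp.name,
                "done": sp.done,
                "duration": sp.duration,
                "nbytes_total": sp.nbytes_total,
                "states": dict(sp.states),
            }
    return summary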
- class distributed.spans.Span(name: tuple[str, ...], id_: str, parent: distributed.spans.Span | None, total_nthreads_history: list[tuple[float, int]])[source]¶
- property active_cpu_seconds: float¶
Return the number of CPU seconds that were made available on the cluster while this Span was running; in other words,
(Span.stop - Span.enqueued) * Scheduler.total_nthreads.
This accounts for workers joining and leaving the cluster while this Span was active. If this Span is the output of merge(), gaps between the input spans are not counted.
See also
enqueued
stop
nthreads_intervals
distributed.scheduler.SchedulerState.total_nthreads
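For instance, comparing this with duration gives a rough measure of how busy the cluster was with this span’s tasks (a sketch; sp is a Span instance obtained from the scheduler extension):

>>> # Fraction of the available thread-seconds actually spent computing this span's tasks
>>> utilization = sp.duration / sp.active_cpu_seconds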
- property all_durations: dict[str, float]¶
Cumulative duration of all completed actions in this span tree, by action
See also
duration
distributed.scheduler.TaskGroup.all_durations
- property annotation: dict[str, tuple[str, ...]] | None¶
Rebuild the dask graph annotation which contains the full id history
Note that this may not match the original annotation in case of TaskGroup collision.
- property code: list[tuple[SourceCode, ...]]¶
Code snippets, sent by the client on compute(), persist(), and submit().
Only populated if
distributed.diagnostics.computations.nframes
is non-zero.
- property cumulative_worker_metrics: dict[tuple[collections.abc.Hashable, ...], float]¶
Replica of Worker.digests_total and Scheduler.cumulative_worker_metrics, but only for the metrics that can be attributed to the current span tree. The span id has been removed from the key.
At the moment of writing, all keys are
("execute", <task prefix>, <activity>, <unit>)
but more may be added in the future with a different format; please test for k[0] == "execute".
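For example, a sketch that tallies the seconds spent per activity, relying on the key layout above (sp is a Span instance):

>>> from collections import defaultdict
>>> seconds_by_activity = defaultdict(float)
>>> for k, v in sp.cumulative_worker_metrics.items():
...     if k[0] == "execute" and k[-1] == "seconds":
...         seconds_by_activity[k[2]] += v  # k[2] is the activity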
- property done: bool¶
Return True if all tasks in this span tree are completed; False otherwise.
See also
distributed.scheduler.TaskGroup.done
Notes
This property may transition from True to False, e.g. when a new sub-span is added or when a worker that contained the only replica of a task in memory crashes and the task needs to be recomputed.
- property duration: float¶
The total amount of time spent on all tasks in this span tree
See also
all_durations
distributed.scheduler.TaskGroup.duration
- enqueued: float¶
Time when the span first appeared on the scheduler. The same property on parent spans is always less than or equal to this.
- groups: set[TaskGroup]¶
Notes
TaskGroups are forgotten by the Scheduler when the last task is forgotten, but remain referenced here indefinitely. If a user calls compute() twice on the same collection, you’ll have more than one group with the same tg.name in this set! For the same reason, while the same TaskGroup object is guaranteed to be attached to exactly one Span, you may have different TaskGroups with the same key attached to different Spans.
- id: str¶
Unique ID, generated by span() and taken from TaskState.annotations["span"]["id"][-1]. Matches distributed.scheduler.TaskState.group.span_id and distributed.worker_state_machine.TaskState.span_id.
- static merge(*items: distributed.spans.Span) distributed.spans.Span [source]¶
Merge multiple spans into a synthetic one. The input spans must not be related to each other.
- name: tuple[str, ...]¶
(<tag>, <tag>, …) Matches
TaskState.annotations["span"]["name"]
, both on the scheduler and the worker.
- property nbytes_total: int¶
The total number of bytes that this span tree has produced
See also
distributed.scheduler.TaskGroup.nbytes_total
- property nthreads_intervals: list[tuple[float, float, int]]¶
- Returns
- List of tuples:
- begin timestamp
- end timestamp
- Scheduler.total_nthreads during this interval
- When the Span is the output of merge(), the intervals may not be contiguous.
See also
enqueued
stop
active_cpu_seconds
distributed.scheduler.SchedulerState.total_nthreads
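As a cross-check, active_cpu_seconds can be recomputed from these intervals (a sketch; sp is a Span instance):

>>> sum(
...     (stop - start) * nthreads
...     for start, stop, nthreads in sp.nthreads_intervals
... )  # should match sp.active_cpu_seconds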
- property start: float¶
Earliest time when a task belonging to this span tree started computing; 0 if no task has finished computing yet.
Notes
This is not updated until at least one task has finished computing. It could move backwards as tasks complete.
- property states: dict[TaskStateState, int]¶
The number of tasks currently in each state in this span tree; e.g.
{"memory": 10, "processing": 3, "released": 4, ...}
.See also
distributed.scheduler.TaskGroup.states
- property stop: float¶
When this span tree finished computing, or current timestamp if it didn’t finish yet.
Notes
This differs from TaskGroup.stop when there aren’t unfinished tasks; it also will never be zero.
- traverse_groups() Iterator[TaskGroup] [source]¶
All TaskGroups belonging to this branch of the span tree
- traverse_spans() collections.abc.Iterator[distributed.spans.Span] [source]¶
Top-down recursion of all spans belonging to this branch of the span tree, including self
- class distributed.spans.SpansSchedulerExtension(scheduler: Scheduler)[source]¶
Scheduler extension for spans support
- find_by_tags(*tags: str) collections.abc.Iterator[distributed.spans.Span] [source]¶
Yield all spans that contain any of the given tags. When a tag is shared both by a span and its (grand)children, only return the parent.
- heartbeat(ws: scheduler_module.WorkerState, data: dict[tuple[Hashable, ...], float]) None [source]¶
Triggered by SpansWorkerExtension.heartbeat().
Populate Span.cumulative_worker_metrics() with data from the worker.
- merge_all() distributed.spans.Span [source]¶
Return a synthetic Span which is the sum of all spans
- merge_by_tags(*tags: str) distributed.spans.Span [source]¶
Return a synthetic Span which is the sum of all spans containing the given tags
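For example, a scheduler-side sketch (assuming scheduler is a Scheduler instance, e.g. inside an extension or a run_on_scheduler() callback) that aggregates everything tagged "data load" across all users:

>>> ext = scheduler.extensions["spans"]
>>> merged = ext.merge_by_tags("data load")
>>> merged.duration, merged.nbytes_total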
- observe_tasks(tss: Iterable[scheduler_module.TaskState], code: tuple[SourceCode, ...]) dict[str, dict] [source]¶
Acknowledge the existence of runnable tasks on the scheduler. These may either be new tasks, tasks that were previously unrunnable, or tasks that have already been fed into this method.
Attach newly observed tasks to either the desired span or to ("default", ). Update TaskGroup.span_id and wipe TaskState.annotations["span"].
- Returns
- Updated "span" annotations: {key: {"name": (…, …), "ids": (…, …)}}
- root_spans: list[Span]¶
Only the spans that don’t have any parents, sorted by creation time. This is a convenience helper structure to speed up searches.
- spans_search_by_name: defaultdict[tuple[str, ...], list[Span]]¶
All spans, keyed by their full name and sorted by creation time. This is a convenience helper structure to speed up searches.
- class distributed.spans.SpansWorkerExtension(worker: Worker)[source]¶
Worker extension for spans support
- collect_digests() None [source]¶
Make a local copy of Worker.digests_total_since_heartbeat. We can’t just parse it directly in heartbeat() as the event loop may be yielded between its call and self.worker.digests_total_since_heartbeat.clear(), causing the scheduler to become misaligned with the workers.
- heartbeat() dict[tuple[collections.abc.Hashable, ...], float] [source]¶
Apportion the metrics that do have a span to the Spans on the scheduler
- Returns
{(context, span_id, prefix, activity, unit): value}
See also
SpansSchedulerExtension.heartbeat
Span.cumulative_worker_metrics
distributed.worker.Worker.get_metrics