Scheduling State

Overview

The life of a computation with Dask can be described in the following stages:

  1. The user authors a graph using some library, perhaps dask.delayed or dask.dataframe or the submit/map functions on the client. They submit these tasks to the scheduler.

  2. The scheduler assimilates these tasks into its graph of all tasks to track, and as their dependencies become available it asks workers to run each of these tasks in turn.

  3. The worker receives information about how to run the task, communicates with its peer workers to collect data dependencies, and then runs the relevant function on the appropriate data. It reports back to the scheduler that it has finished, keeping the result stored in the worker where it was computed.

  4. The scheduler reports back to the user that the task has completed. If the user desires, it then fetches the data from the worker through the scheduler.
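From the user's point of view, a minimal run through these four stages looks like the following sketch (assuming a local cluster and a trivial inc function defined for illustration):

from dask.distributed import Client

def inc(x):
    return x + 1

client = Client()               # starts a local cluster and connects to its scheduler
future = client.submit(inc, 1)  # stage 1: the task is submitted to the scheduler
result = future.result()        # stages 2-4: scheduled, run on a worker, result fetched back
assert result == 2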

Most relevant logic is in tracking tasks as they evolve from newly submitted, to waiting for dependencies, to actively running on some worker, to finished in memory, to garbage collected. Tracking this process, and tracking all effects that this task has on other tasks that might depend on it, is the majority of the complexity of the dynamic task scheduler. This section describes the system used to perform this tracking.

For more abstract information about the policies used by the scheduler, see Scheduling Policies.

The scheduler keeps internal state about several kinds of entities:

  • Individual tasks known to the scheduler

  • Workers connected to the scheduler

  • Clients connected to the scheduler

Note

Everything listed in this page is an internal detail of how Dask operates. It may change between versions and you should probably avoid relying on it in user code (including on any APIs explained here).

Task State

Internally, the scheduler moves tasks between a fixed set of states, notably released, waiting, no-worker, processing, memory, erred.

Tasks flow along the following states with the following allowed transitions:

(Figure: Dask scheduler task states)
  • Released: Known but not actively computing or in memory

  • Waiting: On track to be computed, waiting on dependencies to arrive in memory

  • No-worker: Ready to be computed, but no appropriate worker exists (for example because of resource restrictions, or because no worker is connected at all).

  • Processing: All dependencies are available and the task is assigned to a worker for compute (the scheduler doesn’t know whether it’s in a worker queue or actively being computed).

  • Memory: In memory on one or more workers

  • Erred: Task computation, or one of its dependencies, has encountered an error

  • Forgotten (not actually a state): Task is no longer needed by any client or dependent task
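These states can be inspected on a live cluster. The sketch below uses Client.run_on_scheduler, which passes the scheduler as the dask_scheduler keyword argument, to count tasks per state; it assumes a connected client as in the earlier example, and the printed numbers are purely illustrative:

from collections import Counter

def count_task_states(dask_scheduler):
    # dask_scheduler.tasks maps task keys to TaskState objects
    return Counter(ts.state for ts in dask_scheduler.tasks.values())

client.run_on_scheduler(count_task_states)
# e.g. Counter({'memory': 10, 'processing': 4, 'waiting': 2})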

In addition to the literal state, though, other information needs to be kept and updated about each task. Individual task state is stored in an object named TaskState and consists of the following information:

class distributed.scheduler.TaskState(key: str, run_spec: object)[source]

A simple object holding information about a task.

Not to be confused with distributed.worker_state_machine.TaskState, which holds similar information on the Worker side.

actor: bool

Whether or not this task is an Actor

add_dependency(other: distributed.scheduler.TaskState) None[source]

Add another task as a dependency of this task

annotations: dict[str, Any]

Task annotations

dependencies: set[TaskState]

The set of tasks this task depends on for proper execution. Only tasks still alive are listed in this set. If, for whatever reason, this task also depends on a forgotten task, the has_lost_dependencies flag is set.

A task can only be executed once all its dependencies have already been successfully executed and have their result stored on at least one worker. This is tracked by progressively draining the waiting_on set.

dependents: set[TaskState]

The set of tasks which depend on this task. Only tasks still alive are listed in this set. This is the reverse mapping of dependencies.

erred_on: set[str]

Worker addresses on which errors appeared, causing this task to be in an error state.

exception: object | None

If this task failed executing, the exception object is stored here.

exception_blame: TaskState | None

If this task or one of its dependencies failed executing, the failed task is stored here (possibly itself).

exception_text: str

string representation of exception

group: TaskGroup

The group of tasks to which this one belongs

group_key: str

Same as group.name

has_lost_dependencies: bool

Whether any of the dependencies of this task has been forgotten. For memory consumption reasons, forgotten tasks are not kept in memory even though they may have dependent tasks. When a task is forgotten, therefore, each of its dependents has their has_lost_dependencies attribute set to True.

If has_lost_dependencies is true, this task cannot go into the “processing” state anymore.

host_restrictions: set[str]

A set of hostnames where this task can be run (or None if empty). Usually this is empty unless the task has been specifically restricted to only run on certain hosts. A hostname may correspond to one or several connected workers.

key: str

The key is the unique identifier of a task, generally formed from the name of the function, followed by a hash of the function and arguments, like 'inc-ab31c010444977004d656610d2d421ec'.

loose_restrictions: bool

False: Each of host_restrictions, worker_restrictions and resource_restrictions is a hard constraint: if no worker is available satisfying those restrictions, the task cannot go into the “processing” state and will instead go into the “no-worker” state.

True: The above restrictions are mere preferences: if no worker is available satisfying those restrictions, the task can still go into the “processing” state and be sent for execution to another connected worker.
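These restrictions are normally populated from the client at submission time. A sketch, assuming a connected client, a hypothetical process function and data argument, and a worker named "alice":

# Hard restriction: only the worker named "alice" may run this task
future = client.submit(process, data, workers=["alice"])

# Loose restriction: prefer "alice", but fall back to any connected worker
future = client.submit(process, data, workers=["alice"], allow_other_workers=True)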

metadata: dict[str, Any]

Metadata related to task

nbytes: int

The number of bytes, as determined by sizeof, of the result of a finished task. This number is used for diagnostics and to help prioritize work. Set to -1 for unfinished tasks.
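The sizeof measurement referred to here is the one provided by dask.sizeof, for example:

from dask.sizeof import sizeof

nbytes = sizeof([1, 2, 3])  # approximate in-memory size in bytes; the exact value varies by platform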

prefix: TaskPrefix

The broad class of tasks to which this task belongs, like “inc” or “read_csv”

priority: tuple[int, ...]

The priority provides each task with a relative ranking which is used to break ties when many tasks are being considered for execution.

This ranking is generally a 2-item tuple. The first (and dominant) item corresponds to when it was submitted. Generally, earlier tasks take precedence. The second item is determined by the client, and is a way to prioritize tasks within a large graph that may be important, such as if they are on the critical path, or good to run in order to release many dependencies. This is explained further in Scheduling Policy.
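The user-controlled part of this ranking can be set explicitly at submission time. A sketch, with hypothetical train and cleanup functions; larger priority values are scheduled earlier relative to other submitted work:

important = client.submit(train, data, priority=10)
background = client.submit(cleanup, data, priority=-10)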

processing_on: WorkerState | None

If this task is in the “processing” state, which worker is currently processing it. This attribute is kept in sync with WorkerState.processing.

resource_restrictions: dict[str, float]

Resources required by this task, such as {'gpu': 1} or {'memory': 1e9}. These are user-defined names and are matched against the contents of each WorkerState.resources dictionary.

retries: int

The number of times this task can automatically be retried in case of failure. If a task fails executing (the worker returns with an error), its retries attribute is checked. If it is equal to 0, the task is marked “erred”. If it is greater than 0, the retries attribute is decremented and execution is attempted again.
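retries is normally set from the client at submission time. A sketch with a hypothetical flaky_download function:

# Allow up to 3 automatic re-executions before the task is marked "erred"
future = client.submit(flaky_download, url, retries=3)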

run_spec: object

A specification of how to run the task. The type and meaning of this value is opaque to the scheduler, as it is only interpreted by the worker to which the task is sent for execution.

As a special case, this attribute may also be None, in which case the task is “pure data” (such as, for example, a piece of data loaded in the scheduler using Client.scatter()). A “pure data” task cannot be computed again if its value is lost.

property state: str

This task’s current state. Valid states include released, waiting, no-worker, processing, memory, erred and forgotten. If it is forgotten, the task isn’t stored in the tasks dictionary anymore and will probably disappear soon from memory.

suspicious: int

The number of times this task has been involved in a worker death.

Some tasks may cause workers to die (such as calling os._exit(0)). When a worker dies, all of the tasks on that worker are reassigned to others. This combination of behaviors can cause a bad task to catastrophically destroy all workers on the cluster, one after another. Whenever a worker dies, we mark each task currently processing on that worker (as recorded by WorkerState.processing) as suspicious. If a task is involved in three deaths (or some other fixed constant) then we mark the task as erred.
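The fixed constant mentioned above corresponds to the scheduler's allowed_failures setting (see the allowed_failures argument in the Scheduler signature in the API section below). It is backed by the distributed.scheduler.allowed-failures configuration value and can be adjusted, for example:

import dask

# Tolerate more worker deaths involving the same task before marking it erred
dask.config.set({"distributed.scheduler.allowed-failures": 10})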

traceback: object | None

If this task failed executing, the traceback object is stored here.

traceback_text: str

string representation of traceback

type: str

The type of the object as a string. Only present for tasks that have been computed.

waiters: set[TaskState]

The set of tasks which need this task to remain alive. This is always a subset of dependents. Each time one of the dependents has finished processing, it is removed from the waiters set.

Once both waiters and who_wants become empty, this task can be released (if it has a non-empty run_spec) or forgotten (otherwise) by the scheduler, and by any workers in who_has.

Note

Counter-intuitively, waiting_on and waiters are not reverse mappings of each other.

waiting_on: set[TaskState]

The set of tasks this task is waiting on before it can be executed. This is always a subset of dependencies. Each time one of the dependencies has finished processing, it is removed from the waiting_on set.

Once waiting_on becomes empty, this task can move from the “waiting” state to the “processing” state (unless one of the dependencies errored out, in which case this task is instead marked “erred”).

who_has: set[WorkerState]

The set of workers who have this task’s result in memory. It is non-empty iff the task is in the “memory” state. There can be more than one worker in this set if, for example, Client.scatter() or Client.replicate() was used.

This is the reverse mapping of WorkerState.has_what.

who_wants: set[ClientState]

The set of clients who want the result of this task to remain alive. This is the reverse mapping of ClientState.wants_what.

When a client submits a graph to the scheduler it also specifies which output tasks it desires, such that their results are not released from memory.

Once a task has finished executing (i.e. moves into the “memory” or “erred” state), the clients in who_wants are notified.

Once both waiters and who_wants become empty, this task can be released (if it has a non-empty run_spec) or forgotten (otherwise) by the scheduler, and by any workers in who_has.

worker_restrictions: set[str]

A set of complete worker addresses where this can be run (or None if empty). Usually this is empty unless the task has been specifically restricted to only run on certain workers. Note this is tracking worker addresses, not worker states, since the specific workers may not be connected at this time.

The scheduler keeps track of all the TaskState objects (those not in the “forgotten” state) using several containers:

tasks: {str: TaskState}

A dictionary mapping task keys (usually strings) to TaskState objects. Task keys are how information about tasks is communicated between the scheduler and clients, or the scheduler and workers; this dictionary is then used to find the corresponding TaskState object.

unrunnable: {TaskState}

A set of TaskState objects in the “no-worker” state. These tasks already have all their dependencies satisfied (their waiting_on set is empty), and are waiting for an appropriate worker to join the network before computing.

Worker State

Each worker’s current state is stored in a WorkerState object. This information is involved in deciding which worker to run a task on.

class distributed.scheduler.WorkerState(*, address: str, status: Status, pid: int, name: object, nthreads: int = 0, memory_limit: int, local_directory: str, nanny: str, server_id: str, services: dict[str, int] | None = None, versions: dict[str, Any] | None = None, extra: dict[str, Any] | None = None)[source]

A simple object holding information about a worker.

Not to be confused with distributed.worker_state_machine.WorkerState.

actors: set[distributed.scheduler.TaskState]

A set of all TaskStates on this worker that are actors. This only includes those actors whose state actually lives on this worker, not actors to which this worker has a reference.

address: str

This worker’s unique key. This can be its connected address (such as "tcp://127.0.0.1:8891") or an alias (such as "alice").

clean() distributed.scheduler.WorkerState[source]

Return a version of this object that is appropriate for serialization

executing: dict[distributed.scheduler.TaskState, float]

A dictionary of tasks that are currently being run on this worker. Each task state is associated with the duration in seconds for which the task has been running.

extra: dict[str, Any]

Arbitrary additional metadata to be added to identity()

property has_what: collections.abc.Set[distributed.scheduler.TaskState]

An insertion-sorted set-like of tasks which currently reside on this worker. All the tasks here are in the “memory” state. This is the reverse mapping of TaskState.who_has.

This is a read-only public accessor. The data is implemented as a dict without values, because rebalance() relies on dicts being insertion-sorted.

last_seen: float

The last time we received a heartbeat from this worker, in local scheduler time.

long_running: set[distributed.scheduler.TaskState]

Running tasks that invoked distributed.secede()

memory_limit: int

Memory available to the worker, in bytes

nanny: str

Address of the associated Nanny, if present

nbytes: int

The total memory size, in bytes, used by the tasks this worker holds in memory (i.e. the tasks in this worker’s has_what).

nthreads: int

The number of CPU threads made available on this worker

processing: dict[distributed.scheduler.TaskState, float]

A dictionary of tasks that have been submitted to this worker. Each task state is associated with the expected cost in seconds of running that task, summing both the task’s expected computation time and the expected communication time of its result.

If a task is already executing on the worker and the execution time is twice the learned average TaskGroup duration, this will be set to twice the current executing time. If the task is unknown, the default task duration is used instead of the TaskGroup average.

Multiple tasks may be submitted to a worker in advance and the worker will run them eventually, depending on its execution resources (but see Work Stealing).

All the tasks here are in the “processing” state. This attribute is kept in sync with TaskState.processing_on.

resources: dict[str, float]

The available resources on this worker, e.g. {"GPU": 2}. These are abstract quantities that constrain certain tasks from running at the same time on this worker.
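Worker resources are declared when the worker starts and requested per task from the client. A sketch, where the resource name "GPU" and the train_model function are illustrative:

# Worker started with:  dask-worker scheduler-address:8786 --resources "GPU=2"
# This task is only eligible for workers advertising at least 1 unit of "GPU"
future = client.submit(train_model, data, resources={"GPU": 1})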

status: distributed.core.Status

Read-only worker status, synced one way from the remote Worker object

used_resources: dict[str, float]

The sum of each resource used by all tasks allocated to this worker. The numbers in this dictionary can only be less than or equal to those in this worker’s resources.

versions: dict[str, Any]

Output of distributed.versions.get_versions() on the worker

In addition to individual worker state, the scheduler maintains two containers to help with scheduling tasks:

Scheduler.saturated: {WorkerState}

A set of workers whose computing power (as measured by WorkerState.nthreads) is fully exploited by processing tasks, and whose current occupancy is a lot greater than the average.

Scheduler.idle: {WorkerState}

A set of workers whose computing power is not fully exploited. These workers are assumed to be able to start computing new tasks immediately.

These two sets are disjoint. Also, some workers may be neither “idle” nor “saturated”. “Idle” workers will be preferred when deciding a suitable worker to run a new task on. Conversely, “saturated” workers may see their workload lightened through Work Stealing.

Client State

Information about each individual client of the scheduler is kept in a ClientState object:

class distributed.scheduler.ClientState(client: str, *, versions: dict[str, Any] | None = None)[source]

A simple object holding information about a client.

client_key: str

A unique identifier for this client. This is generally an opaque string generated by the client itself.

last_seen: float

The last time we received a heartbeat from this client, in local scheduler time.

versions: dict[str, Any]

Output of distributed.versions.get_versions() on the client

wants_what: set[distributed.scheduler.TaskState]

A set of tasks this client wants to be kept in memory, so that it can download its result when desired. This is the reverse mapping of TaskState.who_wants. Tasks are typically removed from this set when the corresponding object in the client’s space (for example a Future or a Dask collection) gets garbage-collected.
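In practice, entries leave wants_what when the corresponding client-side Future is released. A sketch, reusing the hypothetical inc function and connected client from earlier:

future = client.submit(inc, 1)   # the key is added to this client's wants_what
future.result()
del future                       # once garbage-collected, the client releases the key; if no other
                                 # client or dependent task needs it, the task is eventually forgotten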

Understanding a Task’s Flow

As seen above, there are numerous pieces of information pertaining to task and worker state, and some of them can be computed, updated or removed during a task’s transitions.

The table below shows in which state variables a task may be found, depending on the task’s state. Cells with a check mark (✓) indicate the task must be present in the given state variable; cells with a question mark (?) indicate the task may be present in the given state variable.

=================================  ========  =======  =========  ==========  ======  =====
State variable                     Released  Waiting  No-worker  Processing  Memory  Erred
=================================  ========  =======  =========  ==========  ======  =====
TaskState.dependencies             ✓         ✓        ✓          ✓           ✓       ✓
TaskState.dependents               ✓         ✓        ✓          ✓           ✓       ✓
TaskState.host_restrictions        ?         ?        ?          ?           ?       ?
TaskState.worker_restrictions      ?         ?        ?          ?           ?       ?
TaskState.resource_restrictions    ?         ?        ?          ?           ?       ?
TaskState.loose_restrictions       ?         ?        ?          ?           ?       ?
TaskState.waiting_on                         ✓                   ✓
TaskState.waiters                            ✓                   ✓
TaskState.processing_on                                          ✓
WorkerState.processing                                           ✓
TaskState.who_has                                                            ✓
WorkerState.has_what                                                         ✓
TaskState.nbytes (1)               ?         ?        ?          ?           ✓       ?
TaskState.exception (2)                                                              ?
TaskState.traceback (2)                                                              ?
TaskState.exception_blame                                                            ✓
TaskState.retries                  ?         ?        ?          ?           ?       ?
TaskState.suspicious_tasks         ?         ?        ?          ?           ?       ?
=================================  ========  =======  =========  ==========  ======  =====

Notes:

  1. TaskState.nbytes: this attribute can be known as long as a task has already been computed, even if it has been later released.

  2. TaskState.exception and TaskState.traceback should be looked up on the TaskState.exception_blame task.

The table below shows which worker state variables are updated on each task state transition.

=====================  ==================================================
Transition             Affected worker state
=====================  ==================================================
released → waiting     occupancy, idle, saturated
waiting → processing   occupancy, idle, saturated, used_resources
waiting → memory       idle, saturated, nbytes
processing → memory    occupancy, idle, saturated, used_resources, nbytes
processing → erred     occupancy, idle, saturated, used_resources
processing → released  occupancy, idle, saturated, used_resources
memory → released      nbytes
memory → forgotten     nbytes
=====================  ==================================================

Note

Another way of understanding this table is to observe that entering or exiting a specific task state updates a well-defined set of worker state variables. For example, entering and exiting the “memory” state updates WorkerState.nbytes.

Implementation

Every transition between states is a separate method in the scheduler. These task transition functions are prefixed with transition and then have the names of the start and finish task states, like the following:

def transition_released_waiting(self, key):

def transition_processing_memory(self, key):

def transition_processing_erred(self, key):

These functions each have three effects.

  1. They perform the necessary transformations on the scheduler state (the 20 dicts/lists/sets) to move one key between states.

  2. They return a dictionary of recommended {key: state} transitions to enact directly afterwards on other keys. For example after we transition a key into memory we may find that many waiting keys are now ready to transition from waiting to a ready state.

  3. Optionally they include a set of validation checks that can be turned on for testing.

Rather than call these functions directly we call the central function transition:

def transition(self, key, final_state):
    """ Transition key to the suggested state """

This transition function finds the appropriate path from the current to the final state. It also serves as a central point for logging and diagnostics.

Often we want to enact several transitions at once or want to continually respond to new transitions recommended by initial transitions until we reach a steady state. For that we use the transitions function (note the plural s).

def transitions(self, recommendations):
    recommendations = recommendations.copy()
    while recommendations:
        key, finish = recommendations.popitem()
        new = self.transition(key, finish)
        recommendations.update(new)

This function runs transition, takes the recommendations and runs them as well, repeating until no further task-transitions are recommended.
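The overall pattern — a dispatch table of (start, finish) transition functions driven by a recommendations dict until it drains — can be sketched in isolation. This is a toy model for illustration, not the scheduler's actual implementation:

class ToyScheduler:
    def __init__(self):
        self.states = {}  # key -> current state
        self._transitions = {
            ("released", "waiting"): self.transition_released_waiting,
            ("waiting", "processing"): self.transition_waiting_processing,
        }

    def transition_released_waiting(self, key):
        self.states[key] = "waiting"
        # Immediately recommend the next hop (the real scheduler would first
        # check that all dependencies are in memory)
        return {key: "processing"}

    def transition_waiting_processing(self, key):
        self.states[key] = "processing"
        return {}

    def transition(self, key, finish):
        start = self.states.get(key, "released")
        return self._transitions[(start, finish)](key)

    def transitions(self, recommendations):
        recommendations = recommendations.copy()
        while recommendations:
            key, finish = recommendations.popitem()
            recommendations.update(self.transition(key, finish))

toy = ToyScheduler()
toy.transitions({"x": "waiting"})   # cascades: released -> waiting -> processing
assert toy.states["x"] == "processing"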

Stimuli

Transitions occur from stimuli, which are state-changing messages to the scheduler from workers or clients. The scheduler responds to the following stimuli:

  • Workers
    • Task finished: A task has completed on a worker and is now in memory

    • Task erred: A task ran and erred on a worker

    • Task missing data: A task tried to run but was unable to find necessary data on other workers

    • Worker added: A new worker was added to the network

    • Worker removed: An existing worker left the network

  • Clients
    • Update graph: The client sends more tasks to the scheduler

    • Release keys: The client no longer desires the result of certain keys

Stimuli functions are prefixed with the text stimulus, and take a variety of keyword arguments from the message, as in the following examples:

def stimulus_task_finished(self, key=None, worker=None, nbytes=None,
                           type=None, compute_start=None, compute_stop=None,
                           transfer_start=None, transfer_stop=None):

def stimulus_task_erred(self, key=None, worker=None,
                        exception=None, traceback=None):

These functions change some non-essential administrative state and then call transition functions.

Note that there are several other non-state-changing messages that we receive from the workers and clients, such as messages requesting information about the current state of the scheduler. These are not considered stimuli.

API

class distributed.scheduler.Scheduler(loop=None, delete_interval='500ms', synchronize_worker_interval='60s', services=None, service_kwargs=None, allowed_failures=None, extensions=None, validate=None, scheduler_file=None, security=None, worker_ttl=None, idle_timeout=None, interface=None, host=None, port=0, protocol=None, dashboard_address=None, dashboard=None, http_prefix='/', preload=None, preload_argv=(), plugins=(), contact_address=None, transition_counter_max=False, **kwargs)[source]

Dynamic distributed task scheduler

The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever growing dask graph.

All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.

The scheduler communicates with the outside world through Comm objects. It maintains a consistent and valid view of the world even when listening to several clients at once.

A Scheduler is typically started either with the dask-scheduler executable:

$ dask-scheduler
Scheduler started at 127.0.0.1:8786

Or within a LocalCluster a Client starts up without connection information:

>>> c = Client()  
>>> c.cluster.scheduler  
Scheduler(...)

Users typically do not interact with the scheduler directly but rather with the client object Client.

The contact_address parameter allows advertising a specific address to the workers for communication with the scheduler, different from the address the scheduler binds to. This is useful when the scheduler listens on a private address, which therefore cannot be used by the workers to contact it.

State

The scheduler contains the following state variables. Each variable is listed along with what it stores and a brief description.

  • tasks: {task key: TaskState}

    Tasks currently known to the scheduler

  • unrunnable: {TaskState}

    Tasks in the “no-worker” state

  • workers: {worker key: WorkerState}

    Workers currently connected to the scheduler

  • idle: {WorkerState}:

    Set of workers that are not fully utilized

  • saturated: {WorkerState}:

Set of workers that are over-utilized

  • host_info: {hostname: dict}:

    Information about each worker host

  • clients: {client key: ClientState}

    Clients currently connected to the scheduler

  • services: {str: port}:

    Other services running on this scheduler, like Bokeh

  • loop: IOLoop:

    The running Tornado IOLoop

  • client_comms: {client key: Comm}

    For each client, a Comm object used to receive task requests and report task status updates.

  • stream_comms: {worker key: Comm}

    For each worker, a Comm object from which we both accept stimuli and report results

  • task_duration: {key-prefix: time}

    Time we expect certain functions to take, e.g. {'sum': 0.25}

adaptive_target(target_duration=None)[source]

Desired number of workers based on the current workload

This looks at the current running tasks and memory use, and returns a number of desired workers. This is often used by adaptive scheduling.

Parameters
target_duration : str

A desired duration of time for computations to take. This affects how rapidly the scheduler will ask to scale.

async add_client(comm: distributed.comm.core.Comm, client: str, versions: dict[str, Any]) None[source]

Add client to network

We listen to all future messages from this Comm.

add_keys(worker=None, keys=(), stimulus_id=None)[source]

Learn that a worker has certain keys

This should not be used in practice and is mostly here for legacy reasons. However, it is sent by workers from time to time.

add_plugin(plugin: SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs)[source]

Add external plugin to scheduler.

See https://distributed.readthedocs.io/en/latest/plugins.html

Parameters
plugin : SchedulerPlugin

SchedulerPlugin instance to add

idempotent : bool

If true, the plugin is assumed to already exist and no action is taken.

name : str

A name for the plugin, if None, the name attribute is checked on the Plugin instance and generated if not discovered.

add_replica(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState)

Note that a worker holds a replica of a task with state=’memory’

async add_worker(comm=None, *, address: str, status: str, server_id: str, keys=(), nthreads=None, name=None, resolve_address=True, nbytes=None, types=None, now=None, resources=None, host_info=None, memory_limit=None, metrics=None, pid=0, services=None, local_directory=None, versions: dict[str, Any] | None = None, nanny=None, extra=None, stimulus_id=None)[source]

Add a new worker to the cluster

property address: str

The address this Server can be contacted on. If the server is not up yet, this raises a ValueError.

property address_safe: str

The address this Server can be contacted on. If the server is not up yet, this returns "not-running".

async benchmark_hardware() dict[str, dict[str, float]][source]

Run a benchmark on the workers for memory, disk, and network bandwidths

Returns
result: dict

A dictionary mapping the names “disk”, “memory”, and “network” to dictionaries mapping sizes to bandwidths. These bandwidths are averaged over many workers running computations across the cluster.

async broadcast(comm=None, *, msg: dict, workers: list[str] | None = None, hosts: list[str] | None = None, nanny: bool = False, serializers=None, on_error: "Literal['raise', 'return', 'return_pickle', 'ignore']" = 'raise') dict[source]

Broadcast message to workers, return all results

bulk_schedule_after_adding_worker(ws: distributed.scheduler.WorkerState)

Send tasks with ts.state==’no-worker’ in bulk to a worker that just joined. Return recommendations. As the worker will start executing the new tasks immediately, without waiting for the batch to end, we can’t rely on worker-side ordering, so the recommendations are sorted by priority order here.

check_idle_saturated(ws: distributed.scheduler.WorkerState, occ: float = -1.0)

Update the status of the idle and saturated state

The scheduler keeps track of workers that are:

  • Saturated: have enough work to stay busy

  • Idle: do not have enough work to stay busy

They are considered saturated if they both have enough tasks to occupy all of their threads, and if the expected runtime of those tasks is large enough.

This is useful for load balancing and adaptivity.

client_heartbeat(client=None)[source]

Handle heartbeats from Client

client_releases_keys(keys=None, client=None, stimulus_id=None)[source]

Remove keys from client desired list

client_send(client, msg)[source]

Send message to client

async close(fast=None, close_workers=None)[source]

Send cleanup signal to all coroutines then wait until finished

See also

Scheduler.cleanup
async close_worker(worker: str, stimulus_id: str, safe: bool = False)[source]

Remove a worker from the cluster

This both removes the worker from our local state and also sends a signal to the worker to shut down. This works regardless of whether or not the worker has a nanny process restarting it

coerce_address(addr, resolve=True)[source]

Coerce possible input addresses to canonical form. resolve can be disabled for testing with fake hostnames.

Handles strings, tuples, or aliases.

coerce_hostname(host)

Coerce the hostname of a worker.

decide_worker(ts: TaskState) WorkerState | None

Decide on a worker for task ts. Return a WorkerState.

If it’s a root or root-like task, we place it with its relatives to reduce future data transfer.

If it has dependencies or restrictions, we use decide_worker_from_deps_and_restrictions.

Otherwise, we pick the least occupied worker, or pick from all workers in a round-robin fashion.

async delete_worker_data(worker_address: str, keys: collections.abc.Collection[str], stimulus_id: str) None[source]

Delete data from a worker and update the corresponding worker/task states

Parameters
worker_address: str

Worker address to delete keys from

keys: list[str]

List of keys to delete on the specified worker

async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) None[source]

Write a cluster state dump to an fsspec-compatible URL.

async feed(comm, function=None, setup=None, teardown=None, interval='1s', **kwargs)[source]

Provides a data Comm to external requester

Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics.

async finished()

Wait until the server has finished

async gather(keys, serializers=None)[source]

Collect data from workers to the scheduler

async gather_on_worker(worker_address: str, who_has: dict[str, list[str]]) set[source]

Peer-to-peer copy of keys from multiple workers to a single worker

Parameters
worker_address: str

Recipient worker address to copy keys to

who_has: dict[Hashable, list[str]]

{key: [sender address, sender address, …], key: …}

Returns
set of keys that failed to be copied

async get_cluster_state(exclude: collections.abc.Collection[str]) dict[source]

Produce the state dict used in a cluster state dump

get_comm_cost(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) float

Get the estimated communication cost (in seconds) to compute the task on the given worker.

get_logs(start=0, n=None, timestamps=False)

Fetch log entries for this node

Parameters
start : float, optional

A time (in seconds) to begin filtering log entries from

n : int, optional

Maximum number of log entries to return from filtered results

timestamps : bool, default False

Do we want log entries to include the time they were generated?

Returns
List of tuples containing the log level, message, and (optional) timestamp for each filtered entry
get_task_duration(ts: distributed.scheduler.TaskState) float

Get the estimated computation cost of the given task (not including any communication cost).

If no data has been observed, the value of distributed.scheduler.default-task-durations is used. If none is set for this task, distributed.scheduler.unknown-task-duration is used instead.
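Both fallbacks are ordinary Dask configuration values and can be overridden; the prefix name below is illustrative:

import dask

dask.config.set({
    # Used before any runtime data has been observed for a given task prefix
    "distributed.scheduler.default-task-durations": {"my-slow-prefix": "10s"},
    # Used when nothing at all is known about a task
    "distributed.scheduler.unknown-task-duration": "1s",
})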

get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) tuple[str, int] | str | None[source]

Get the (host, port) address of the named service on the worker. Returns None if the service doesn’t exist.

Parameters
worker : address
service_name : str

Common services include ‘bokeh’ and ‘nanny’

protocol : boolean

Whether or not to include a full address with protocol (True) or just a (host, port) pair

handle_comm(comm)

Start a background task that dispatches new communications to coroutine-handlers

handle_long_running(key: str, worker: str, compute_duration: float, stimulus_id: str) None[source]

A task has seceded from the thread pool

We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped.

async handle_request_refresh_who_has(keys: collections.abc.Iterable[str], worker: str, stimulus_id: str) None[source]

Asynchronous request (through bulk comms) from a Worker to refresh the who_has for some keys. Not to be confused with scheduler.who_has, which is a synchronous RPC request from a Client.

async handle_worker(comm=None, worker=None, stimulus_id=None)[source]

Listen to responses from a single worker

This is the main loop for scheduler-worker interaction

See also

Scheduler.handle_client

Equivalent coroutine for clients

property host

The host this Server is running on.

This will raise ValueError if the Server is listening on a non-IP based protocol.

identity()[source]

Basic information about ourselves and our cluster

property listen_address

The address this Server is listening on. This may be a wildcard address such as tcp://0.0.0.0:1234.

new_task(key: str, spec: object, state: str, computation: Computation | None = None) TaskState

Create a new task, and associated states

property port

The port number this Server is listening on.

This will raise ValueError if the Server is listening on a non-IP based protocol.

async proxy(comm=None, msg=None, worker=None, serializers=None)[source]

Proxy a communication through the scheduler to some other worker

async rebalance(comm=None, keys: 'Iterable[Hashable]' | None = None, workers: 'Iterable[str]' | None = None, stimulus_id: str | None = None) dict[source]

Rebalance keys so that each worker ends up with roughly the same process memory (managed+unmanaged).

Warning

This operation is generally not well tested against normal operation of the scheduler. It is not recommended to use it while waiting on computations.

Algorithm

  1. Find the mean occupancy of the cluster, defined as data managed by dask + unmanaged process memory that has been there for at least 30 seconds (distributed.worker.memory.recent-to-old-time). This lets us ignore temporary spikes caused by task heap usage.

    Alternatively, you may change how memory is measured both for the individual workers as well as to calculate the mean through distributed.worker.memory.rebalance.measure. Namely, this can be useful to disregard inaccurate OS memory measurements.

  2. Discard workers whose occupancy is within 5% of the mean cluster occupancy (distributed.worker.memory.rebalance.sender-recipient-gap / 2). This helps avoid data from bouncing around the cluster repeatedly.

  3. Workers above the mean are senders; those below are recipients.

  4. Discard senders whose absolute occupancy is below 30% (distributed.worker.memory.rebalance.sender-min). In other words, no data is moved regardless of imbalance as long as all workers are below 30%.

  5. Discard recipients whose absolute occupancy is above 60% (distributed.worker.memory.rebalance.recipient-max). Note that this threshold by default is the same as distributed.worker.memory.target to prevent workers from accepting data and immediately spilling it out to disk.

  6. Iteratively pick the sender and recipient that are farthest from the mean and move the least recently inserted key between the two, until either all senders or all recipients fall within 5% of the mean.

    A recipient will be skipped if it already has a copy of the data. In other words, this method does not degrade replication. A key will be skipped if there are no recipients available with enough memory to accept the key and that don’t already hold a copy.

The least recently inserted (LRI) policy is a greedy choice with the advantage of being O(1), trivial to implement (it relies on python dict insertion-sorting) and hopefully good enough in most cases. Discarded alternative policies were:

  • Largest first. O(n*log(n)) save for non-trivial additional data structures and risks causing the largest chunks of data to repeatedly move around the cluster like pinballs.

  • Least recently used (LRU). This information is currently available on the workers only and not trivial to replicate on the scheduler; transmitting it over the network would be very expensive. Also, note that dask will go out of its way to minimise the amount of time intermediate keys are held in memory, so in such a case LRI is a close approximation of LRU.

Parameters
keys: optional

allowlist of dask keys that should be considered for moving. All other keys will be ignored. Note that this offers no guarantee that a key will actually be moved (e.g. because it is unnecessary or because there are no viable recipient workers for it).

workers: optional

allowlist of worker addresses to be considered as senders or recipients. All other workers will be ignored. The mean cluster occupancy will be calculated only using the allowed workers.
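From user code this is normally invoked through the client rather than by calling the scheduler method directly. A sketch, where x and y are hypothetical futures:

client.rebalance()                 # rebalance all keys across all workers

client.rebalance(futures=[x, y],   # or restrict to particular futures and workers
                 workers=["tcp://10.0.0.1:1234"])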

async reevaluate_occupancy(worker_index: int = 0)[source]

Periodically reassess task duration time

The expected duration of a task can change over time. Unfortunately we don’t have a good constant-time way to propagate the effects of these changes out to the summaries that they affect, like the total expected runtime of each of the workers, or what tasks are stealable.

In this coroutine we walk through all of the workers and re-align their estimates with the current state of tasks. We do this periodically rather than at every transition, and we only do it if the scheduler process isn’t under load (using psutil.Process.cpu_percent()). This lets us avoid this fringe optimization when we have better things to think about.

async register_nanny_plugin(comm, plugin, name=None)[source]

Registers a setup function, and calls it on every worker

async register_scheduler_plugin(plugin, name=None, idempotent=None)[source]

Register a plugin on the scheduler.

async register_worker_plugin(comm, plugin, name=None)[source]

Registers a worker plugin on all running and future workers

remove_all_replicas(ts: distributed.scheduler.TaskState)

Remove all replicas of a task from all workers

remove_client(client: str, stimulus_id: str | None = None) None[source]

Remove client from network

remove_plugin(name: str | None = None, plugin: SchedulerPlugin | None = None) None[source]

Remove external plugin from scheduler

Parameters
name : str

Name of the plugin to remove

remove_replica(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState)

Note that a worker no longer holds a replica of a task

async remove_worker(address: str, *, stimulus_id: str, safe: bool = False, close: bool = True) Literal['OK', 'already-removed'][source]

Remove worker from cluster

We do this when a worker reports that it plans to leave or when it appears to be unresponsive. This may send its tasks back to a released state.

async replicate(comm=None, keys=None, n=None, workers=None, branching_factor=2, delete=True, lock=True, stimulus_id=None)[source]

Replicate data throughout cluster

This performs a tree copy of the data throughout the network individually on each piece of data.

Parameters
keys: Iterable

list of keys to replicate

n: int

Number of replications we expect to see within the cluster

branching_factor: int, optional

The number of workers that can copy data in each generation. The larger the branching factor, the more data we copy in a single step, but the more a given worker risks being swamped by data requests.

report(msg: dict, ts: TaskState | None = None, client: str | None = None)[source]

Publish updates to all listening Queues and Comms

If the message contains a key then we only send the message to those comms that care about the key.

request_acquire_replicas(addr: str, keys: collections.abc.Iterable[str], *, stimulus_id: str) None[source]

Asynchronously ask a worker to acquire a replica of the listed keys from other workers. This is a fire-and-forget operation which offers no feedback for success or failure, and is intended for housekeeping and not for computation.

request_remove_replicas(addr: str, keys: list[str], *, stimulus_id: str) None[source]

Asynchronously ask a worker to discard its replica of the listed keys. This must never be used to destroy the last replica of a key. This is a fire-and-forget operation, intended for housekeeping and not for computation.

The replica disappears immediately from TaskState.who_has on the Scheduler side; if the worker refuses to delete, e.g. because the task is a dependency of another task running on it, it will (also asynchronously) inform the scheduler to re-add itself to who_has. If the worker agrees to discard the task, there is no feedback.

reschedule(key=None, worker=None)[source]

Reschedule a task

Things may have shifted and this task may now be better suited to run elsewhere

async restart(client=None, timeout=30)[source]

Restart all workers. Reset local state.

async retire_workers(comm=None, *, workers: list[str] | None = None, names: list | None = None, close_workers: bool = False, remove: bool = True, stimulus_id: str | None = None, **kwargs) dict[source]

Gracefully retire workers from cluster

Parameters
workers: list[str] (optional)

List of worker addresses to retire.

names: list (optional)

List of worker names to retire. Mutually exclusive with workers. If neither workers nor names are provided, we call workers_to_close which finds a good set.

close_workers: bool (defaults to False)

Whether or not to actually close the worker explicitly from here. Otherwise we expect some external job scheduler to finish off the worker.

remove: bool (defaults to True)

Whether or not to remove the worker metadata immediately or else wait for the worker to contact us

**kwargs: dict

Extra options to pass to workers_to_close to determine which workers we should drop

Returns
Dictionary mapping worker ID/address to dictionary of information about
that worker for each retired worker.
run_function(comm, function, args=(), kwargs=None, wait=True)[source]

Run a function within this process

See also

Client.run_on_scheduler
async scatter(comm=None, data=None, workers=None, client=None, broadcast=False, timeout=2)[source]

Send data out to workers

send_all(client_msgs: dict, worker_msgs: dict)[source]

Send messages to client and workers

send_task_to_worker(worker, ts: distributed.scheduler.TaskState, duration: float = -1)[source]

Send a single computational task to a worker

start_http_server(routes, dashboard_address, default_port=0, ssl_options=None)

This creates an HTTP Server running on this node

start_periodic_callbacks()

Start Periodic Callbacks consistently

This starts all PeriodicCallbacks stored in self.periodic_callbacks if they are not yet running. It does this safely by checking that it is using the correct event loop.

async start_unsafe()[source]

Clear out old state and restart all running coroutines

async stimulus_cancel(keys, client, force=False)[source]

Stop execution on a list of keys

stimulus_task_erred(key=None, worker=None, exception=None, stimulus_id=None, traceback=None, **kwargs)[source]

Mark that a task has erred on a particular worker

stimulus_task_finished(key=None, worker=None, stimulus_id=None, **kwargs)[source]

Mark that a task has finished execution on a particular worker

story(*keys_or_tasks_or_stimuli: str | TaskState) list[tuple][source]

Get all transitions that touch one of the input keys or stimulus_id’s

transition(key, finish: str, *args, stimulus_id: str, **kwargs)[source]

Transition a key from its current state to the finish state

Returns
Dictionary of recommendations for future transitions

See also

Scheduler.transitions

transitive version of this function

Examples

>>> self.transition('x', 'waiting')
{'x': 'processing'}
transition_story(*keys_or_tasks_or_stimuli: str | TaskState) list[tuple]

Get all transitions that touch one of the input keys or stimulus_id’s

transitions(recommendations: dict, stimulus_id: str)[source]

Process transitions until none are left

This includes feedback from previous transitions and continues until we reach a steady state

async unregister_nanny_plugin(comm, name)[source]

Unregisters a worker plugin

async unregister_worker_plugin(comm, name)[source]

Unregisters a worker plugin

update_data(*, who_has: dict, nbytes: dict, client=None)[source]

Learn that new data has entered the network from an external source

See also

Scheduler.mark_key_in_memory
update_graph(client=None, tasks=None, keys=None, dependencies=None, restrictions=None, priority=None, loose_restrictions=None, resources=None, submitting_task=None, retries=None, user_priority=0, actors=None, fifo_timeout=0, annotations=None, code=None, stimulus_id=None)[source]

Add new computations to the internal dask graph

This happens whenever the Client calls submit, map, get, or compute.

valid_workers(ts: TaskState) set[WorkerState] | None

Return set of currently valid workers for key

If all workers are valid then this returns None. This checks the following state:

  • worker_restrictions

  • host_restrictions

  • resource_restrictions

worker_objective(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) tuple

Objective function to determine which worker should get the task

Minimize expected start time. If there is a tie, break it using the amount of stored data.

worker_send(worker: str, msg: dict[str, Any]) None[source]

Send message to worker

This also handles connection failures by adding a callback to remove the worker on the next cycle.

workers_list(workers)[source]

List of qualifying workers

Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match

workers_to_close(comm=None, memory_ratio: int | float | None = None, n: int | None = None, key: Callable[[WorkerState], Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') list[str][source]

Find workers that we can close with low cost

This returns a list of workers that are good candidates to retire. These workers are not running anything and are storing relatively little data relative to their peers. If all workers are idle then we still maintain enough workers to have enough RAM to store our data, with a comfortable buffer.

This is for use with systems like distributed.deploy.adaptive.

Parameters
memory_ratio : Number

Amount of extra space we want to have for our stored data. Defaults to 2, or that we want to have twice as much memory as we currently have data.

n : int

Number of workers to close

minimum : int

Minimum number of workers to keep around

key : Callable(WorkerState)

An optional callable mapping a WorkerState object to a group affiliation. Groups will be closed together. This is useful when closing workers must be done collectively, such as by hostname.

target : int

Target number of workers to have after we close

attribute : str

The attribute of the WorkerState object to return, like “address” or “name”. Defaults to “address”.

Returns
to_close: list of worker addresses that are OK to close

Examples

>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

Group workers by hostname prior to closing

>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

Remove two workers

>>> scheduler.workers_to_close(n=2)

Keep enough workers to have twice as much memory as we need.

>>> scheduler.workers_to_close(memory_ratio=2)
distributed.scheduler.decide_worker(ts: TaskState, all_workers: Iterable[WorkerState], valid_workers: set[WorkerState] | None, objective: Callable[[WorkerState], Any]) WorkerState | None[source]

Decide which worker should take task ts.

We choose the worker that has the data on which ts depends.

If several workers have dependencies then we choose the less-busy worker.

Optionally provide valid_workers of where jobs are allowed to occur (if all workers are allowed to take the task, pass None instead).

If the task requires data communication because no eligible worker has all the dependencies already, then we choose to minimize the number of bytes sent between workers. This is determined by calling the objective function.

class distributed.scheduler.MemoryState(*, process: int, unmanaged_old: int, managed_in_memory: int, managed_spilled: int)[source]

Memory readings on a worker or on the whole cluster.

See Worker Memory Management.

Attributes / properties:

managed

Sum of the output of sizeof() for all dask keys held by the worker in memory, plus number of bytes spilled to disk

managed_in_memory

Sum of the output of sizeof() for the dask keys held in RAM. Note that this may be inaccurate, which may cause inaccurate unmanaged memory (see below).

managed_spilled

Number of bytes for the dask keys spilled to the hard drive. Note that this is the size on disk; size in memory may be different due to compression and inaccuracies in sizeof(). In other words, given the same keys, ‘managed’ will change depending on whether the keys are in memory or spilled.

process

Total RSS memory measured by the OS on the worker process. This is always exactly equal to managed_in_memory + unmanaged.

unmanaged

process - managed_in_memory. This is the sum of

  • Python interpreter and modules

  • global variables

  • memory temporarily allocated by the dask tasks that are currently running

  • memory fragmentation

  • memory leaks

  • memory not yet garbage collected

  • memory not yet free()’d by the Python memory manager to the OS

unmanaged_old

Minimum of the ‘unmanaged’ measures over the last distributed.worker.memory.recent-to-old-time seconds

unmanaged_recent

unmanaged - unmanaged_old; in other words process memory that has been recently allocated but is not accounted for by dask; hopefully it’s mostly a temporary spike.

optimistic

managed_in_memory + unmanaged_old; in other words the memory held long-term by the process under the hopeful assumption that all unmanaged_recent memory is a temporary spike
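These definitions can be tied together with a small worked example; the numbers are purely illustrative:

process           = 10_000_000_000  # RSS measured by the OS
managed_in_memory =  6_000_000_000  # sum of sizeof() over keys held in RAM
managed_spilled   =  1_000_000_000  # bytes of keys spilled to disk
unmanaged_old     =  3_000_000_000  # minimum unmanaged memory over the recent window

managed          = managed_in_memory + managed_spilled  # 7 GB
unmanaged        = process - managed_in_memory          # 4 GB
unmanaged_recent = unmanaged - unmanaged_old            # 1 GB, hopefully a temporary spike
optimistic       = managed_in_memory + unmanaged_old    # 9 GB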

Note

There is an intentional misalignment in terminology between this class (which is meant for internal / programmatic use) and the memory readings on the GUI (which is aimed at the general public):

=================  =====================
MemoryState        GUI
=================  =====================
managed            n/a
managed_in_memory  managed
managed_spilled    spilled
process            process (RSS); memory
unmanaged          n/a
unmanaged_old      unmanaged (old)
unmanaged_recent   unmanaged (recent)
optimistic         n/a
=================  =====================