Scheduler State Machine¶
Overview¶
The life of a computation with Dask can be described in the following stages:
1. The user authors a graph using some library, perhaps dask.delayed or dask.dataframe or the submit/map functions on the client. They submit these tasks to the scheduler.
2. The scheduler assimilates these tasks into its graph of all tasks to track, and as their dependencies become available it asks workers to run each of these tasks in turn.
3. The worker receives information about how to run the task, communicates with its peer workers to collect data dependencies, and then runs the relevant function on the appropriate data. It reports back to the scheduler that it has finished, keeping the result stored on the worker where it was computed.
4. The scheduler reports back to the user that the task has completed. If the user desires, it then fetches the data from the worker through the scheduler.
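To make this flow concrete, here is a small, hedged example using the public client API (the inc function is made up for illustration); it exercises stages 1 through 4 on a local cluster.

from distributed import Client

def inc(x):
    return x + 1

client = Client()                 # starts a LocalCluster and its scheduler
future = client.submit(inc, 1)    # the client submits a task; the scheduler tracks it (stages 1-2)
assert future.result() == 2       # a worker computes it; the result is fetched back (stages 3-4)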
Most relevant logic is in tracking tasks as they evolve from newly submitted, to waiting for dependencies, to actively running on some worker, to finished in memory, to garbage collected. Tracking this process, and tracking all effects that this task has on other tasks that might depend on it, is the majority of the complexity of the dynamic task scheduler. This section describes the system used to perform this tracking.
For more abstract information about the policies used by the scheduler, see Scheduling Policies.
The scheduler keeps internal state about several kinds of entities:
Individual tasks known to the scheduler
Workers connected to the scheduler
Clients connected to the scheduler
Note
Everything listed in this page is an internal detail of how Dask operates. It may change between versions and you should probably avoid relying on it in user code (including on any APIs explained here).
Task State¶
Internally, the scheduler moves tasks between a fixed set of states, notably released, waiting, no-worker, queued, processing, memory, erred.
Tasks flow along the following states with the following allowed transitions:
Note that tasks may also transition to released
from any state (not shown on diagram).
- released
Known but not actively computing or in memory
- waiting
On track to be computed, waiting on dependencies to arrive in memory
- no-worker
Ready to be computed, but no appropriate worker exists (for example because of resource restrictions, or because no worker is connected at all).
- queued
Ready to be computed, but all workers are already full.
- processing
All dependencies are available and the task is assigned to a worker for compute (the scheduler doesn’t know whether it’s in a worker queue or actively being computed).
- memory
In memory on one or more workers
- erred
Task computation, or one of its dependencies, has encountered an error
- forgotten
Task is no longer needed by any client or dependent task, so it disappears from the scheduler as well. As soon as a task reaches this state, it is immediately dereferenced from the scheduler.
Note
Setting the distributed.scheduler.worker_saturation config value to 1.1 (the default) or any other finite value will queue excess root tasks on the scheduler in the queued state. These tasks are only assigned to workers when workers have capacity for them, reducing the length of task queues on the workers.

When the distributed.scheduler.worker_saturation config value is set to inf, there is no intermediate state between waiting / no-worker and processing: as soon as a task has all of its dependencies in memory somewhere on the cluster, it is immediately assigned to a worker. This can lead to very long task queues on the workers, which are then rebalanced dynamically through Work Stealing.
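As a hedged illustration of how this configuration value is typically set, the snippet below uses dask.config.set before starting a local cluster; Dask's configuration layer treats hyphens and underscores in key names interchangeably, so either spelling of the key should resolve to the same setting.

import dask
from distributed import Client, LocalCluster

# Queue excess root tasks on the scheduler; any finite value enables the "queued" state,
# while float("inf") disables scheduler-side queuing entirely.
dask.config.set({"distributed.scheduler.worker-saturation": 1.1})

cluster = LocalCluster(n_workers=2)
client = Client(cluster)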
In addition to the literal state, though, other information needs to be kept and updated about each task. Individual task state is stored in an object named TaskState; see full API through the link.

The scheduler keeps track of all the TaskState objects (those not in the "forgotten" state) using several containers:
- tasks: {str: TaskState}
A dictionary mapping task keys to TaskState objects. Task keys are how information about tasks is communicated between the scheduler and clients, or the scheduler and workers; this dictionary is then used to find the corresponding TaskState object.
- unrunnable: {TaskState}
A set of TaskState objects in the "no-worker" state. These tasks already have all their dependencies satisfied (their waiting_on set is empty), and are waiting for an appropriate worker to join the network before computing.
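These containers are internal, but they can be inspected for debugging from a client via Client.run_on_scheduler, which passes the live scheduler object to your function. The sketch below is illustrative only; the helper function and submitted task are made up.

from distributed import Client

client = Client()
future = client.submit(lambda x: x + 1, 1)
future.result()  # wait until the task has reached the "memory" state

def peek(key, dask_scheduler=None):
    # dask_scheduler.tasks maps task keys to TaskState objects;
    # dask_scheduler.unrunnable holds TaskState objects in the "no-worker" state.
    ts = dask_scheduler.tasks[key]
    return ts.state, len(dask_scheduler.unrunnable)

print(client.run_on_scheduler(peek, future.key))  # e.g. ('memory', 0)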
Once a task is queued up on a worker, it is also tracked on the worker side by the Worker State Machine.
Worker State¶
Each worker's current state is stored in a WorkerState object; see full API through the link.
This is a scheduler-side object, which holds information about what the scheduler knows about each worker on the cluster, and is not to be confused with distributed.worker_state_machine.WorkerState.
This information is involved in deciding which worker to run a task on.
In addition to individual worker state, the scheduler maintains two containers to help with scheduling tasks:
- Scheduler.saturated: {WorkerState}
A set of workers whose computing power (as measured by WorkerState.nthreads) is fully exploited by processing tasks, and whose current occupancy is a lot greater than the average.
- Scheduler.idle: {WorkerState}
A set of workers whose computing power is not fully exploited. These workers are assumed to be able to start computing new tasks immediately.
These two sets are disjoint. Also, some workers may be neither “idle” nor “saturated”. “Idle” workers will be preferred when deciding a suitable worker to run a new task on. Conversely, “saturated” workers may see their workload lightened through Work Stealing.
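The same run_on_scheduler trick can be used to see how the scheduler currently classifies its workers; again, this is a hedged debugging sketch against internal attributes, not a stable API.

from distributed import Client

client = Client()

def classify_workers(dask_scheduler=None):
    # Iterate generically: the exact container types behind `idle` and
    # `saturated` are internal details and may differ between versions.
    return {
        "idle": [str(ws) for ws in dask_scheduler.idle],
        "saturated": [str(ws) for ws in dask_scheduler.saturated],
    }

print(client.run_on_scheduler(classify_workers))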
Client State¶
Information about each individual client of the scheduler is kept in a ClientState object; see full API through the link.
Understanding a Task’s Flow¶
As seen above, there are numerous pieces of information pertaining to task and worker state, and some of them can be computed, updated or removed during a task’s transitions.
The table below shows in which state variables a task appears, depending on the task's state. Cells with a check mark (✓) indicate the task key must be present in the given state variable; cells with a question mark (?) indicate the task key may be present in the given state variable.
| State variable | Released | Waiting | No-worker | Processing | Memory | Erred |
|---|---|---|---|---|---|---|
| TaskState.dependencies | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| TaskState.dependents | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| TaskState.host_restrictions | ? | ? | ? | ? | ? | ? |
| TaskState.worker_restrictions | ? | ? | ? | ? | ? | ? |
| TaskState.resource_restrictions | ? | ? | ? | ? | ? | ? |
| TaskState.loose_restrictions | ? | ? | ? | ? | ? | ? |
| TaskState.waiting_on | | ✓ | ✓ | | | |
| TaskState.waiters | | ✓ | ✓ | | | |
| TaskState.processing_on | | | | ✓ | | |
| WorkerState.processing | | | | ✓ | | |
| TaskState.who_has | | | | | ✓ | |
| WorkerState.has_what | | | | | ✓ | |
| TaskState.nbytes (1) | ? | ? | ? | ? | ✓ | ? |
| TaskState.exception (2) | | | | | | ? |
| TaskState.traceback (2) | | | | | | ? |
| TaskState.exception_blame | | | | | | ✓ |
| TaskState.retries | ? | ? | ? | ? | ? | ? |
| TaskState.suspicious | ? | ? | ? | ? | ? | ? |
Notes:
1. TaskState.nbytes: this attribute can be known as long as a task has already been computed, even if it has been later released.
2. TaskState.exception and TaskState.traceback should be looked up on the TaskState.exception_blame task.
The table below shows which worker state variables are updated on each task state transition.
| Transition | Affected worker state |
|---|---|
| released → waiting | occupancy, idle, saturated |
| waiting → processing | occupancy, idle, saturated, used_resources |
| waiting → memory | idle, saturated, nbytes |
| processing → memory | occupancy, idle, saturated, used_resources, nbytes |
| processing → erred | occupancy, idle, saturated, used_resources |
| processing → released | occupancy, idle, saturated, used_resources |
| memory → released | nbytes |
| memory → forgotten | nbytes |
Note
Another way of understanding this table is to observe that entering or exiting a specific task state updates a well-defined set of worker state variables. For example, entering and exiting the "memory" state updates WorkerState.nbytes.
Implementation¶
Every transition between states is a separate method in the scheduler. These
task transition functions are prefixed with transition
and then have the
name of the start and finish task state like the following.
def transition_released_waiting(self, key, stimulus_id): ...
def transition_processing_memory(self, key, stimulus_id): ...
def transition_processing_erred(self, key, stimulus_id): ...
These functions each have three effects:

1. They perform the necessary transformations on the scheduler state (the 20 dicts/lists/sets) to move one key between states.
2. They return a dictionary of recommended {key: state} transitions to enact directly afterwards on other keys. For example, after we transition a key into memory, we may find that many waiting keys are now ready to transition from waiting to a ready state.
3. Optionally, they include a set of validation checks that can be turned on for testing.
Rather than call these functions directly we call the central function transition:
def transition(self, key, final_state, stimulus_id): ...
This transition function finds the appropriate path from the current to the final state. It also serves as a central point for logging and diagnostics.
Often we want to enact several transitions at once or want to continually respond to new transitions recommended by initial transitions until we reach a steady state. For that we use the transitions function (note the plural s).
def transitions(self, recommendations, stimulus_id):
    recommendations = recommendations.copy()
    while recommendations:
        key, finish = recommendations.popitem()
        new = self.transition(key, finish, stimulus_id)
        recommendations.update(new)
This function runs transition, takes the recommendations and runs them as well, repeating until no further task-transitions are recommended.
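The following toy sketch mimics that cascade outside the scheduler, purely for illustration: each transition may recommend follow-up transitions, and the loop drains the recommendations until none remain. The state names are the real ones from this page, but the transition logic is invented.

def toy_transition(states, key, finish):
    # Apply one transition and return follow-up recommendations.
    states[key] = finish
    recommendations = {}
    if finish == "memory":
        # Pretend that tasks named "<key>-child-*" were waiting on this key.
        for other, state in states.items():
            if state == "waiting" and other.startswith(key + "-child"):
                recommendations[other] = "processing"
    return recommendations

def toy_transitions(states, recommendations):
    recommendations = recommendations.copy()
    while recommendations:
        key, finish = recommendations.popitem()
        recommendations.update(toy_transition(states, key, finish))

states = {"x": "processing", "x-child-1": "waiting", "x-child-2": "waiting"}
toy_transitions(states, {"x": "memory"})
print(states)  # {'x': 'memory', 'x-child-1': 'processing', 'x-child-2': 'processing'}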
Stimuli¶
Transitions occur from stimuli, which are state-changing messages to the scheduler from workers or clients. The scheduler responds to the following stimuli:
Workers
- task-finished
A task has completed on a worker and is now in memory
- task-erred
A task ran and erred on a worker
- reschedule
A task has completed on a worker by raising
Reschedule
- long-running
A task is still running on the worker, but it called
secede()
- add-keys
Replication finished. One or more tasks, which were previously in memory on other workers, are now in memory on one additional worker. Also used to inform the scheduler of a successful scatter() operation.
- request-refresh-who-has
All peers that hold a replica of a task in memory that a worker knows of are unavailable (temporarily or permanently), so the worker can’t fetch it and is asking the scheduler if it knows of any additional replicas. This call is repeated periodically until a new replica appears.
- release-worker-data
A worker informs the scheduler that it no longer holds the task in memory
- worker-status-change
The global status of a worker has just changed, e.g. between
running and paused.
- log-event
A generic event happened on the worker, which should be logged centrally. Note that this is in addition to the worker’s log, which the client can fetch on request (up to a certain length).
- keep-alive
A worker informs that it’s still online and responsive. This uses the batched stream channel, as opposed to
distributed.worker.Worker.heartbeat() and Scheduler.heartbeat_worker(), which use dedicated RPC comms, and is needed to prevent firewalls from closing down the batched stream.
- register-worker
A new worker was added to the network
- unregister
An existing worker left the network
Clients
- update-graph
The client sends more tasks to the scheduler
- client-releases-keys
The client no longer desires the result of certain keys.
Note that there are many more client API endpoints (e.g. to serve
scatter()
etc.), which are not listed here for the sake of
brevity.
Stimuli functions are prefixed with the text stimulus and take a variety of keyword arguments from the message, as in the following examples:
def stimulus_task_finished(self, key=None, worker=None, nbytes=None,
                           type=None, compute_start=None, compute_stop=None,
                           transfer_start=None, transfer_stop=None): ...

def stimulus_task_erred(self, key=None, worker=None,
                        exception=None, traceback=None): ...
These functions change some non-essential administrative state and then call transition functions.
Note that there are several other non-state-changing messages that we receive from the workers and clients, such as messages requesting information about the current state of the scheduler. These are not considered stimuli.
API¶
- class distributed.scheduler.Scheduler(loop=None, services=None, service_kwargs=None, allowed_failures=None, extensions=None, validate=None, scheduler_file=None, security=None, worker_ttl=None, idle_timeout=None, interface=None, host=None, port=0, protocol=None, dashboard_address=None, dashboard=None, http_prefix='/', preload=None, preload_argv=(), plugins=(), contact_address=None, transition_counter_max=False, jupyter=False, **kwargs)[source]¶
Dynamic distributed task scheduler
The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever growing dask graph.
All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.
The scheduler communicates with the outside world through Comm objects. It maintains a consistent and valid view of the world even when listening to several clients at once.
A Scheduler is typically started either with the dask scheduler executable:

$ dask scheduler
Scheduler started at 127.0.0.1:8786

Or within a LocalCluster a Client starts up without connection information:

>>> c = Client()
>>> c.cluster.scheduler
Scheduler(...)
Users typically do not interact with the scheduler directly but rather with the client object Client.
The contact_address parameter allows advertising a specific address to the workers for communication with the scheduler, which is different from the address the scheduler binds to. This is useful when the scheduler listens on a private address, which therefore cannot be used by the workers to contact it.
State
The scheduler contains the following state variables. Each variable is listed along with what it stores and a brief description.
- tasks: {task key: TaskState}
Tasks currently known to the scheduler
- unrunnable: {TaskState}
Tasks in the "no-worker" state
- workers: {worker key: WorkerState}
Workers currently connected to the scheduler
- idle: {WorkerState}
Set of workers that are not fully utilized
- saturated: {WorkerState}
Set of workers that are over-utilized
- host_info: {hostname: dict}
Information about each worker host
- clients: {client key: ClientState}
Clients currently connected to the scheduler
- services: {str: port}
Other services running on this scheduler, like Bokeh
- loop: IOLoop
The running Tornado IOLoop
- client_comms: {client key: Comm}
For each client, a Comm object used to receive task requests and report task status updates.
- stream_comms: {worker key: Comm}
For each worker, a Comm object from which we both accept stimuli and report results
- task_duration: {key-prefix: time}
Time we expect certain functions to take, e.g. {'sum': 0.25}
- adaptive_target(target_duration=None)[source]¶
Desired number of workers based on the current workload
This looks at the current running tasks and memory use, and returns a number of desired workers. This is often used by adaptive scheduling.
- Parameters
- target_duration: str
A desired duration of time for computations to take. This affects how rapidly the scheduler will ask to scale.
- async add_client(comm: distributed.comm.core.Comm, client: str, versions: dict[str, Any]) None [source]¶
Add client to network
We listen to all future messages from this Comm.
- add_keys(worker: str, keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]] = (), stimulus_id: str | None = None) Literal['OK', 'not found'] [source]¶
Learn that a worker has certain keys
This should not be used in practice and is mostly here for legacy reasons. However, it is sent by workers from time to time.
- add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None [source]¶
Add external plugin to scheduler.
See https://distributed.readthedocs.io/en/latest/plugins.html
- Parameters
- plugin: SchedulerPlugin
SchedulerPlugin instance to add
- idempotent: bool
If true, the plugin is assumed to already exist and no action is taken.
- name: str
A name for the plugin; if None, the name attribute is checked on the Plugin instance and generated if not discovered.
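Scheduler plugins are the supported way to observe the task transitions described earlier on this page. The sketch below is a hedged example of a minimal SchedulerPlugin that counts transitions into the memory state; the plugin name and counting logic are illustrative, and on older versions of distributed the registration call is Client.register_scheduler_plugin instead of Client.register_plugin.

from distributed import Client
from distributed.diagnostics.plugin import SchedulerPlugin

class MemoryTransitionCounter(SchedulerPlugin):
    """Count how many tasks have transitioned into the 'memory' state."""

    name = "memory-transition-counter"  # illustrative name

    def __init__(self):
        self.count = 0

    def transition(self, key, start, finish, *args, **kwargs):
        # Called by the scheduler for every task state transition.
        if finish == "memory":
            self.count += 1

client = Client()
client.register_plugin(MemoryTransitionCounter())  # older versions: client.register_scheduler_plugin(...)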
- add_replica(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) None ¶
Note that a worker holds a replica of a task with state=’memory’
- async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None [source]¶
Add a new worker to the cluster
- property address: str¶
The address this Server can be contacted on. If the server is not up yet, this raises a ValueError.
- property address_safe: str¶
The address this Server can be contacted on. If the server is not up yet, this returns "not-running".
- async benchmark_hardware() dict[str, dict[str, float]] [source]¶
Run a benchmark on the workers for memory, disk, and network bandwidths
- Returns
- result: dict
A dictionary mapping the names “disk”, “memory”, and “network” to dictionaries mapping sizes to bandwidths. These bandwidths are averaged over many workers running computations across the cluster.
- async broadcast(*, msg: dict, workers: collections.abc.Collection[str] | None = None, hosts: collections.abc.Collection[str] | None = None, nanny: bool = False, serializers: Any = None, on_error: Literal['raise', 'return', 'return_pickle', 'ignore'] = 'raise') dict[str, Any] [source]¶
Broadcast message to workers, return all results
- bulk_schedule_unrunnable_after_adding_worker(ws: distributed.scheduler.WorkerState) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']] ¶
Send no-worker tasks to processing that this worker can handle.
Returns priority-ordered recommendations.
- check_idle_saturated(ws: distributed.scheduler.WorkerState, occ: float = -1.0) None ¶
Update the status of the idle and saturated state
The scheduler keeps track of workers that are:
- Saturated: have enough work to stay busy
- Idle: do not have enough work to stay busy
They are considered saturated if they both have enough tasks to occupy all of their threads, and if the expected runtime of those tasks is large enough.
If distributed.scheduler.worker-saturation is not inf (scheduler-side queuing is enabled), they are considered idle if they have fewer tasks processing than the worker-saturation threshold dictates.
Otherwise, they are considered idle if they have fewer tasks processing than threads, or if their tasks' total expected runtime is less than half the expected runtime of the same number of average tasks.
This is useful for load balancing and adaptivity.
- client_releases_keys(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, stimulus_id: str | None = None) None [source]¶
Remove keys from client desired list
- async close(fast=None, close_workers=None, reason='unknown')[source]¶
Send cleanup signal to all coroutines then wait until finished
See also
Scheduler.cleanup
- close_worker(worker: str) None [source]¶
Ask a worker to shut itself down. Do not wait for it to take effect. Note that there is no guarantee that the worker will actually accept the command.
Note that remove_worker() sends the same command internally if close=True.
- coerce_address(addr: str | tuple, resolve: bool = True) str [source]¶
Coerce possible input addresses to canonical form. resolve can be disabled for testing with fake hostnames.
Handles strings, tuples, or aliases.
- coerce_hostname(host: collections.abc.Hashable) str ¶
Coerce the hostname of a worker.
- decide_worker_non_rootish(ts: distributed.scheduler.TaskState) distributed.scheduler.WorkerState | None ¶
Pick a worker for a runnable non-root task, considering dependencies and restrictions.
Out of eligible workers holding dependencies of ts, selects the worker where, considering worker backlog and data-transfer costs, the task is estimated to start running the soonest.
- Returns
- ws: WorkerState | None
The worker to assign the task to. If no workers satisfy the restrictions of
ts
or there are no running workers, returns None, in which case the task should be transitioned tono-worker
.
- decide_worker_rootish_queuing_disabled(ts: distributed.scheduler.TaskState) distributed.scheduler.WorkerState | None ¶
Pick a worker for a runnable root-ish task, without queuing.
This attempts to schedule sibling tasks on the same worker, reducing future data transfer. It does not consider the location of dependencies, since they’ll end up on every worker anyway.
It assumes it’s being called on a batch of tasks in priority order, and maintains state in SchedulerState.last_root_worker and SchedulerState.last_root_worker_tasks_left to achieve this.
This will send every runnable task to a worker, often causing root task overproduction.
- Returns
- ws: WorkerState | None
The worker to assign the task to. If there are no workers in the cluster, returns None, in which case the task should be transitioned to
no-worker
.
- decide_worker_rootish_queuing_enabled() distributed.scheduler.WorkerState | None ¶
Pick a worker for a runnable root-ish task, if not all are busy.
Picks the least-busy worker out of the idle workers (idle workers have fewer tasks running than threads, as set by distributed.scheduler.worker-saturation). It does not consider the location of dependencies, since they'll end up on every worker anyway.
If all workers are full, returns None, meaning the task should transition to queued. The scheduler will wait to send it to a worker until a thread opens up. This ensures that downstream tasks always run before new root tasks are started.
This does not try to schedule sibling tasks on the same worker; in fact, it usually does the opposite. Even though this increases subsequent data transfer, it typically reduces overall memory use by eliminating root task overproduction.
- Returns
- ws: WorkerState | None
The worker to assign the task to. If there are no idle workers, returns None, in which case the task should be transitioned to
queued
.
- async delete_worker_data(worker_address: str, keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], stimulus_id: str) None [source]¶
Delete data from a worker and update the corresponding worker/task states
- Parameters
- worker_address: str
Worker address to delete keys from
- keys: list[Key]
List of keys to delete on the specified worker
- async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) None [source]¶
Write a cluster state dump to an fsspec-compatible URL.
- async feed(comm: distributed.comm.core.Comm, function: bytes | None = None, setup: bytes | None = None, teardown: bytes | None = None, interval: str | float = '1s', **kwargs: Any) None [source]¶
Provides a data Comm to external requester
Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics.
- async finished()¶
Wait until the server has finished
- async gather(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], serializers: list[str] | None = None) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object] [source]¶
Collect data from workers to the scheduler
- async gather_on_worker(worker_address: str, who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) set [source]¶
Peer-to-peer copy of keys from multiple workers to a single worker
- Parameters
- worker_address: str
Recipient worker address to copy keys to
- who_has: dict[Key, list[str]]
{key: [sender address, sender address, …], key: …}
- Returns
- returns:
set of keys that failed to be copied
- async get_cluster_state(exclude: collections.abc.Collection[str]) dict [source]¶
Produce the state dict used in a cluster state dump
- get_comm_cost(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) float ¶
Get the estimated communication cost (in seconds) to compute the task on the given worker.
- get_connection_counters() dict[str, int] ¶
A dict with various connection counters
See also
Server.incoming_comms_open
Server.incoming_comms_active
Server.outgoing_comms_open
Server.outgoing_comms_active
- get_logs(start=0, n=None, timestamps=False)¶
Fetch log entries for this node
- Parameters
- start: float, optional
A time (in seconds) to begin filtering log entries from
- n: int, optional
Maximum number of log entries to return from filtered results
- timestamps: bool, default False
Do we want log entries to include the time they were generated?
- Returns
- List of tuples containing the log level, message, and (optional) timestamp for each filtered entry, newest first
- async get_story(keys_or_stimuli: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) list[distributed.scheduler.Transition] [source]¶
RPC hook for SchedulerState.story().
Note that the msgpack serialization/deserialization round-trip will transform the Transition namedtuples into regular tuples.
- get_task_duration(ts: distributed.scheduler.TaskState) float ¶
Get the estimated computation cost of the given task (not including any communication cost).
If no data has been observed, the value of distributed.scheduler.default-task-durations is used. If none is set for this task, distributed.scheduler.unknown-task-duration is used instead.
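For reference, both of the configuration values mentioned above can be set ahead of time; the snippet below is a hedged sketch using dask.config.set, with a made-up task prefix.

import dask

dask.config.set({
    # Seed the duration estimate for tasks whose key prefix is "my-expensive-op"
    # (a made-up prefix used only for illustration).
    "distributed.scheduler.default-task-durations": {"my-expensive-op": "10s"},
    # Fallback estimate for tasks the scheduler has never seen before.
    "distributed.scheduler.unknown-task-duration": "500ms",
})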
- get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) tuple[str, int] | str | None [source]¶
Get the (host, port) address of the named service on the worker. Returns None if the service doesn’t exist.
- Parameters
- worker: address
- service_name: str
Common services include 'bokeh' and 'nanny'
- protocol: boolean
Whether or not to include a full address with protocol (True) or just a (host, port) pair
- handle_comm(comm: distributed.comm.core.Comm) distributed.utils.NoOpAwaitable ¶
Start a background task that dispatches new communications to coroutine-handlers
- handle_long_running(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], worker: str, compute_duration: float | None, stimulus_id: str) None [source]¶
A task has seceded from the thread pool
We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped.
- handle_request_refresh_who_has(keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], worker: str, stimulus_id: str) None [source]¶
Request from a Worker to refresh the who_has for some keys. Not to be confused with scheduler.who_has, which is a dedicated comm RPC request from a Client.
- async handle_worker(comm: distributed.comm.core.Comm, worker: str) None [source]¶
Listen to responses from a single worker
This is the main loop for scheduler-worker interaction
See also
Scheduler.handle_client
Equivalent coroutine for clients
- property host¶
The host this Server is running on.
This will raise ValueError if the Server is listening on a non-IP based protocol.
- property incoming_comms_open: int¶
The number of total incoming connections listening to remote RPCs
- property is_idle: bool¶
Return True iff there are no tasks that haven’t finished computing.
Unlike testing
self.total_occupancy
, this property returns False if there are long-running tasks, no-worker, or queued tasks (due to not having any workers).Not to be confused with
idle
.
- is_rootish(ts: distributed.scheduler.TaskState) bool ¶
Whether
ts
is a root or root-like task.
Root-ish tasks are part of a group that's much larger than the cluster, and have few or no dependencies. Tasks may also be explicitly marked as rootish to override this heuristic.
- property listen_address¶
The address this Server is listening on. This may be a wildcard address such as tcp://0.0.0.0:1234.
- log_event(topic: str | collections.abc.Collection[str], msg: Any) None [source]¶
Log an event under a given topic
- Parameters
- topic: str, list[str]
Name of the topic under which to log an event. To log the same event under multiple topics, pass a list of topic names.
- msg
Event message to log. Note this must be msgpack serializable.
See also
Client.log_event
- new_task(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], spec: dask._task_spec.GraphNode | None, state: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], computation: distributed.scheduler.Computation | None = None) distributed.scheduler.TaskState ¶
Create a new task, and associated states
- property outgoing_comms_active: int¶
The number of outgoing connections that are currently used to execute a RPC
- property outgoing_comms_open: int¶
The number of connections currently open and waiting for a remote RPC
- property port¶
The port number this Server is listening on.
This will raise ValueError if the Server is listening on a non-IP based protocol.
- async proxy(msg: dict, worker: str, serializers: Any = None) Any [source]¶
Proxy a communication through the scheduler to some other worker
- async rebalance(keys: collections.abc.Iterable[Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]] | None = None, workers: collections.abc.Iterable[str] | None = None, stimulus_id: str | None = None) dict [source]¶
Rebalance keys so that each worker ends up with roughly the same process memory (managed+unmanaged).
Warning
This operation is generally not well tested against normal operation of the scheduler. It is not recommended to use it while waiting on computations.
Algorithm
Find the mean occupancy of the cluster, defined as data managed by dask + unmanaged process memory that has been there for at least 30 seconds (
distributed.worker.memory.recent-to-old-time
). This lets us ignore temporary spikes caused by task heap usage.Alternatively, you may change how memory is measured both for the individual workers as well as to calculate the mean through
distributed.worker.memory.rebalance.measure
. Namely, this can be useful to disregard inaccurate OS memory measurements.Discard workers whose occupancy is within 5% of the mean cluster occupancy (
distributed.worker.memory.rebalance.sender-recipient-gap
/ 2). This helps avoid data from bouncing around the cluster repeatedly.Workers above the mean are senders; those below are recipients.
Discard senders whose absolute occupancy is below 30% (
distributed.worker.memory.rebalance.sender-min
). In other words, no data is moved regardless of imbalancing as long as all workers are below 30%.Discard recipients whose absolute occupancy is above 60% (
distributed.worker.memory.rebalance.recipient-max
). Note that this threshold by default is the same asdistributed.worker.memory.target
to prevent workers from accepting data and immediately spilling it out to disk.Iteratively pick the sender and recipient that are farthest from the mean and move the least recently inserted key between the two, until either all senders or all recipients fall within 5% of the mean.
A recipient will be skipped if it already has a copy of the data. In other words, this method does not degrade replication. A key will be skipped if there are no recipients available with enough memory to accept the key and that don’t already hold a copy.
The least recently inserted (LRI) policy is a greedy choice with the advantage of being O(1), trivial to implement (it relies on Python dict insertion ordering) and hopefully good enough in most cases. Discarded alternative policies were:
Largest first. O(n*log(n)) save for non-trivial additional data structures and risks causing the largest chunks of data to repeatedly move around the cluster like pinballs.
Least recently used (LRU). This information is currently available on the workers only and not trivial to replicate on the scheduler; transmitting it over the network would be very expensive. Also, note that dask will go out of its way to minimise the amount of time intermediate keys are held in memory, so in such a case LRI is a close approximation of LRU.
- Parameters
- keys: optional
allowlist of dask keys that should be considered for moving. All other keys will be ignored. Note that this offers no guarantee that a key will actually be moved (e.g. because it is unnecessary or because there are no viable recipient workers for it).
- workers: optional
allowlist of workers addresses to be considered as senders or recipients. All other workers will be ignored. The mean cluster occupancy will be calculated only using the allowed workers.
- async register_nanny_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage] [source]¶
Registers a nanny plugin on all running and future nannies
- async register_scheduler_plugin(plugin: bytes | distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None) None [source]¶
Register a plugin on the scheduler.
- async register_worker_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage] [source]¶
Registers a worker plugin on all running and future workers
- remove_all_replicas(ts: distributed.scheduler.TaskState) None ¶
Remove all replicas of a task from all workers
- remove_client(client: str, stimulus_id: str | None = None) None [source]¶
Remove client from network
- remove_plugin(name: str | None = None) None [source]¶
Remove external plugin from scheduler
- Parameters
- name: str
Name of the plugin to remove
- remove_replica(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) None ¶
Note that a worker no longer holds a replica of a task
- remove_worker(address: str, *, stimulus_id: str, expected: bool = False, close: bool = True) Literal['OK', 'already-removed'] [source]¶
Remove worker from cluster.
We do this when a worker reports that it plans to leave or when it appears to be unresponsive. This may send its tasks back to a released state.
- async replicate(comm=None, keys=None, n=None, workers=None, branching_factor=2, delete=True, stimulus_id=None)[source]¶
Replicate data throughout cluster
This performs a tree copy of the data throughout the network individually on each piece of data.
- Parameters
- keys: Iterable
list of keys to replicate
- n: int
Number of replications we expect to see within the cluster
- branching_factor: int, optional
The number of workers that can copy data in each generation. The larger the branching factor, the more data we copy in a single step, but the more a given worker risks being swamped by data requests.
- report(msg: dict, ts: distributed.scheduler.TaskState | None = None, client: str | None = None) None [source]¶
Publish updates to all listening Queues and Comms
If the message contains a key then we only send the message to those comms that care about the key.
- request_acquire_replicas(addr: str, keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None [source]¶
Asynchronously ask a worker to acquire a replica of the listed keys from other workers. This is a fire-and-forget operation which offers no feedback for success or failure, and is intended for housekeeping and not for computation.
- request_remove_replicas(addr: str, keys: list[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None [source]¶
Asynchronously ask a worker to discard its replica of the listed keys. This must never be used to destroy the last replica of a key. This is a fire-and-forget operation, intended for housekeeping and not for computation.
The replica disappears immediately from TaskState.who_has on the Scheduler side; if the worker refuses to delete, e.g. because the task is a dependency of another task running on it, it will (also asynchronously) inform the scheduler to re-add itself to who_has. If the worker agrees to discard the task, there is no feedback.
- async restart(*, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, stimulus_id: str) None [source]¶
Forget all tasks and call restart_workers on all workers.
- Parameters
- timeout:
See restart_workers
- wait_for_workers:
See restart_workers
See also
Client.restart
Client.restart_workers
Scheduler.restart_workers
- async restart_workers(workers: list[str] | None = None, *, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, on_error: Literal['raise', 'return'] = 'raise', stimulus_id: str) dict[str, Literal['OK', 'removed', 'timed out']] [source]¶
Restart selected workers. Optionally wait for workers to return.
Workers without nannies are shut down, hoping an external deployment system will restart them. Therefore, if not using nannies and your deployment system does not automatically restart workers, restart will just shut down all workers, then time out!
After restart, all connected workers are new, regardless of whether TimeoutError was raised. Any workers that failed to shut down in time are removed, and may or may not shut down on their own in the future.
- Parameters
- workers:
List of worker addresses to restart. If omitted, restart all workers.
- timeout:
How long to wait for workers to shut down and come back, if wait_for_workers is True, otherwise just how long to wait for workers to shut down. Raises asyncio.TimeoutError if this is exceeded.
- wait_for_workers:
Whether to wait for all workers to reconnect, or just for them to shut down (default True). Use restart(wait_for_workers=False) combined with Client.wait_for_workers() for granular control over how many workers to wait for.
- on_error:
If 'raise' (the default), raise if any nanny times out while restarting the worker. If 'return', return error messages.
- Returns
- {worker address: “OK”, “no nanny”, or “timed out” or error message}
See also
Client.restart
Client.restart_workers
Scheduler.restart
- async retire_workers(workers: list[str], *, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any] [source]¶
- async retire_workers(*, names: list, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any]
- async retire_workers(*, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None', memory_ratio: int | float | None = 'None', n: int | None = 'None', key: Callable[[WorkerState], Hashable] | bytes | None = 'None', minimum: int | None = 'None', target: int | None = 'None', attribute: str = "'address'") dict[str, distributed.utils.Any]
Gracefully retire workers from cluster. Any key that is in memory exclusively on the retired workers is replicated somewhere else.
- Parameters
- workers: list[str] (optional)
List of worker addresses to retire.
- names: list (optional)
List of worker names to retire. Mutually exclusive with workers. If neither workers nor names are provided, we call workers_to_close which finds a good set.
- close_workers: bool (defaults to False)
Whether to actually close the worker explicitly from here. Otherwise, we expect some external job scheduler to finish off the worker.
- remove: bool (defaults to True)
Whether to remove the worker metadata immediately or else wait for the worker to contact us.
If close_workers=False and remove=False, this method just flushes the tasks in memory out of the workers and then returns. If close_workers=True and remove=False, this method will return while the workers are still in the cluster, although they won’t accept new tasks. If close_workers=False or for whatever reason a worker doesn’t accept the close command, it will be left permanently unable to accept new tasks and it is expected to be closed in some other way.
- **kwargs: dict
Extra options to pass to workers_to_close to determine which workers we should drop. Only accepted if workers and names are omitted.
- Returns
Dictionary mapping worker ID/address to dictionary of information about that worker, for each retired worker.
If there are keys that exist in memory only on the workers being retired and it was impossible to replicate them somewhere else (e.g. because there aren't any other running workers), the workers holding such keys won't be retired and won't appear in the returned dict.
- run_function(comm: distributed.comm.core.Comm, function: collections.abc.Callable, args: tuple = (), kwargs: dict | None = None, wait: bool = True) Any [source]¶
Run a function within this process
See also
Client.run_on_scheduler
- async scatter(comm=None, data=None, workers=None, client=None, broadcast=False, timeout=2)[source]¶
Send data out to workers
- send_all(client_msgs: dict[str, list[dict[str, Any]]], worker_msgs: dict[str, list[dict[str, Any]]]) None [source]¶
Send messages to client and workers
- send_task_to_worker(worker: str, ts: distributed.scheduler.TaskState, duration: float = - 1) None [source]¶
Send a single computational task to a worker
- start_http_server(routes, dashboard_address, default_port=0, ssl_options=None)¶
This creates an HTTP Server running on this node
- start_periodic_callbacks()¶
Start Periodic Callbacks consistently
This starts all PeriodicCallbacks stored in self.periodic_callbacks if they are not yet running. It does this safely by checking that it is using the correct event loop.
- stimulus_cancel(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, force: bool = False) None [source]¶
Stop execution on a list of keys
- stimulus_queue_slots_maybe_opened(*, stimulus_id: str) None [source]¶
Respond to an event which may have opened spots on worker threadpools
Selects the appropriate number of tasks from the front of the queue according to the total number of task slots available on workers (potentially 0), and transitions them to processing.
Notes
Other transitions related to this stimulus should be fully processed beforehand, so any tasks that became runnable are already in
processing
. Otherwise, overproduction can occur if queued tasks get scheduled before downstream tasks.Must be called after check_idle_saturated; i.e. idle_task_count must be up to date.
- stimulus_task_erred(key=None, worker=None, exception=None, stimulus_id=None, traceback=None, run_id=None, **kwargs)[source]¶
Mark that a task has erred on a particular worker
- stimulus_task_finished(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], worker: str, stimulus_id: str, run_id: int, **kwargs: Any) tuple[dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]] [source]¶
Mark that a task has finished execution on a particular worker
- story(*keys_or_tasks_or_stimuli: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.scheduler.TaskState]) list[distributed.scheduler.Transition] ¶
Get all transitions that touch one of the input keys or stimulus_id’s
- transition(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], finish: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], stimulus_id: str, **kwargs: Any) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']] [source]¶
Transition a key from its current state to the finish state
- Returns
- Dictionary of recommendations for future transitions
See also
Scheduler.transitions
transitive version of this function
Examples
>>> self.transition('x', 'waiting')
{'x': 'processing'}
- transitions(recommendations: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], stimulus_id: str) None [source]¶
Process transitions until none are left
This includes feedback from previous transitions and continues until we reach a steady state
- async unregister_nanny_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage] [source]¶
Unregisters a worker plugin
- async unregister_worker_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage] [source]¶
Unregisters a worker plugin
- update_data(*, who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]], nbytes: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], int], client: str | None = None) None [source]¶
Learn that new data has entered the network from an external source
- valid_workers(ts: distributed.scheduler.TaskState) set[distributed.scheduler.WorkerState] | None ¶
Return set of currently valid workers for key
If all workers are valid then this returns
None
, in which case any running worker can be used. Otherwise, the subset of running workers valid for this task is returned. This checks tracks the following state:worker_restrictions
host_restrictions
resource_restrictions
- worker_objective(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) tuple ¶
Objective function to determine which worker should get the task
Minimize expected start time. If a tie then break with data storage.
- worker_send(worker: str, msg: dict[str, Any]) None [source]¶
Send message to worker
This also handles connection failures by adding a callback to remove the worker on the next cycle.
- workers_list(workers: collections.abc.Iterable[str] | None) list[str] [source]¶
List of qualifying workers
Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match
- workers_to_close(memory_ratio: int | float | None = None, n: int | None = None, key: collections.abc.Callable[[distributed.scheduler.WorkerState], collections.abc.Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') list[str] [source]¶
Find workers that we can close with low cost
This returns a list of workers that are good candidates to retire. These workers are not running anything and are storing relatively little data relative to their peers. If all workers are idle then we still maintain enough workers to have enough RAM to store our data, with a comfortable buffer.
This is for use with systems like
distributed.deploy.adaptive
.- Parameters
- memory_ratio: Number
Amount of extra space we want to have for our stored data. Defaults to 2, i.e. we want to have twice as much memory as we currently have data.
- n: int
Number of workers to close
- minimum: int
Minimum number of workers to keep around
- key: Callable(WorkerState)
An optional callable mapping a WorkerState object to a group affiliation. Groups will be closed together. This is useful when closing workers must be done collectively, such as by hostname.
- target: int
Target number of workers to have after we close
- attribute: str
The attribute of the WorkerState object to return, like “address” or “name”. Defaults to “address”.
- Returns
- to_close: list of worker addresses that are OK to close
Examples
>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']
Group workers by hostname prior to closing
>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']
Remove two workers
>>> scheduler.workers_to_close(n=2)
Keep enough workers to have twice as much memory as we need.
>>> scheduler.workers_to_close(memory_ratio=2)
- class distributed.scheduler.TaskState(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], run_spec: dask._task_spec.GraphNode | None, state: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], group: distributed.scheduler.TaskGroup, validate: bool)[source]¶
A simple object holding information about a task.
Not to be confused with
distributed.worker_state_machine.TaskState
, which holds similar information on the Worker side.
- add_dependency(other: distributed.scheduler.TaskState) None [source]¶
Add another task as a dependency of this task
- dependencies: set[distributed.scheduler.TaskState]¶
The set of tasks this task depends on for proper execution. Only tasks still alive are listed in this set. If, for whatever reason, this task also depends on a forgotten task, the
has_lost_dependencies
flag is set.A task can only be executed once all its dependencies have already been successfully executed and have their result stored on at least one worker. This is tracked by progressively draining the
waiting_on
set.
- dependents: set[distributed.scheduler.TaskState]¶
The set of tasks which depend on this task. Only tasks still alive are listed in this set. This is the reverse mapping of
dependencies
.
- erred_on: set[str] | None¶
Worker addresses on which errors appeared, causing this task to be in an error state.
- exception: distributed.protocol.serialize.Serialized | None¶
If this task failed executing, the exception object is stored here.
- exception_blame: distributed.scheduler.TaskState | None¶
If this task or one of its dependencies failed executing, the failed task is stored here (possibly itself).
- group: distributed.scheduler.TaskGroup¶
The group of tasks to which this one belongs
- has_lost_dependencies: bool¶
Whether any of the dependencies of this task has been forgotten. For memory consumption reasons, forgotten tasks are not kept in memory even though they may have dependent tasks. When a task is forgotten, therefore, each of its dependents has their
has_lost_dependencies
attribute set to True.
If has_lost_dependencies is true, this task cannot go into the "processing" state anymore.
- host_restrictions: set[str] | None¶
A set of hostnames where this task can be run (or
None
if empty). Usually this is empty unless the task has been specifically restricted to only run on certain hosts. A hostname may correspond to one or several connected workers.
- key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]]¶
The key is the unique identifier of a task, generally formed from the name of the function, followed by a hash of the function and arguments, like
'inc-ab31c010444977004d656610d2d421ec'
.
- loose_restrictions: bool¶
- False
Each of host_restrictions, worker_restrictions and resource_restrictions is a hard constraint: if no worker is available satisfying those restrictions, the task cannot go into the "processing" state and will instead go into the "no-worker" state.
- True
The above restrictions are mere preferences: if no worker is available satisfying those restrictions, the task can still go into the “processing” state and be sent for execution to another connected worker.
- nbytes: int¶
The number of bytes, as determined by
sizeof
, of the result of a finished task. This number is used for diagnostics and to help prioritize work. Set to -1 for unfinished tasks.
- property prefix: distributed.scheduler.TaskPrefix¶
The broad class of tasks to which this task belongs like “inc” or “read_csv”
- priority: tuple[float, ...] | None¶
The priority provides each task with a relative ranking which is used to break ties when many tasks are being considered for execution.
This ranking is generally a 2-item tuple. The first (and dominant) item corresponds to when it was submitted. Generally, earlier tasks take precedence. The second item is determined by the client, and is a way to prioritize tasks within a large graph that may be important, such as if they are on the critical path, or good to run in order to release many dependencies. This is explained further in Scheduling Policy.
- processing_on: distributed.scheduler.WorkerState | None¶
If this task is in the “processing” state, which worker is currently processing it. This attribute is kept in sync with
WorkerState.processing
.
- resource_restrictions: dict[str, float] | None¶
Resources required by this task, such as {'gpu': 1} or {'memory': 1e9}. These are user-defined names and are matched against the contents of each WorkerState.resources dictionary.
- retries: int¶
The number of times this task can automatically be retried in case of failure. If a task fails executing (the worker returns with an error), its
retries
attribute is checked. If it is equal to 0, the task is marked "erred". If it is greater than 0, the retries attribute is decremented and execution is attempted again.
- run_id: int | None¶
The unique identifier of a specific execution of a task. This identifier is used to sign a task such that the assigned worker is expected to return the same identifier in the task-finished message. This is used to correlate responses. Only the most recently assigned worker is trusted. All other results will be rejected.
- run_spec: dask._task_spec.GraphNode | None¶
A specification of how to run the task. The type and meaning of this value is opaque to the scheduler, as it is only interpreted by the worker to which the task is sent for executing.
As a special case, this attribute may also be
None
, in which case the task is "pure data" (such as, for example, a piece of data loaded in the scheduler using Client.scatter()). A "pure data" task cannot be computed again if its value is lost.
- property state: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']¶
This task's current state. Valid states are released, waiting, no-worker, queued, processing, memory, erred and forgotten. If it is forgotten, the task isn't stored in the tasks dictionary anymore and will probably disappear soon from memory.
- suspicious: int¶
The number of times this task has been involved in a worker death.
Some tasks may cause workers to die (such as calling
os._exit(0)
). When a worker dies, all of the tasks on that worker are reassigned to others. This combination of behaviors can cause a bad task to catastrophically destroy all workers on the cluster, one after another. Whenever a worker dies, we mark each task currently processing on that worker (as recorded by WorkerState.processing) as suspicious. If a task is involved in three deaths (or some other fixed constant) then we mark the task as erred.
- traceback: distributed.protocol.serialize.Serialized | None¶
If this task failed executing, the traceback object is stored here.
- waiters: set[distributed.scheduler.TaskState] | None¶
The set of tasks which need this task to remain alive. This is always a subset of
dependents
. Each time one of the dependents has finished processing, it is removed from thewaiters
set.Once both
waiters
andwho_wants
become empty, this task can be released (if it has a non-emptyrun_spec
) or forgotten (otherwise) by the scheduler, and by any workers inwho_has
.Note
Counter-intuitively,
waiting_on
andwaiters
are not reverse mappings of each other.
- waiting_on: set[distributed.scheduler.TaskState] | None¶
The set of tasks this task is waiting on before it can be executed. This is always a subset of
dependencies
. Each time one of the dependencies has finished processing, it is removed from thewaiting_on
set.Once
waiting_on
becomes empty, this task can move from the “waiting” state to the “processing” state (unless one of the dependencies errored out, in which case this task is instead marked “erred”).
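One way to observe waiting_on and waiters is to inspect the scheduler’s TaskState objects through Client.run_on_scheduler(); a rough sketch (the task keys and functions are chosen for illustration):

    from distributed import Client

    def report(dask_scheduler=None):
        # dask_scheduler is the live Scheduler instance when run via run_on_scheduler.
        return {
            key: {
                "state": ts.state,
                "waiting_on": [d.key for d in (ts.waiting_on or ())],
                "waiters": [d.key for d in (ts.waiters or ())],
            }
            for key, ts in dask_scheduler.tasks.items()
        }

    client = Client()  # local cluster for illustration
    a = client.submit(lambda: 1, key="a")
    b = client.submit(lambda x: x + 1, a, key="b")
    print(client.run_on_scheduler(report))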
- who_has: set[distributed.scheduler.WorkerState] | None¶
The set of workers who have this task’s result in memory. It is non-empty iff the task is in the “memory” state. There can be more than one worker in this set if, for example,
Client.scatter()
orClient.replicate()
was used.This is the reverse mapping of
WorkerState.has_what
.
- who_wants: set[distributed.scheduler.ClientState] | None¶
The set of clients who want the result of this task to remain alive. This is the reverse mapping of
ClientState.wants_what
.When a client submits a graph to the scheduler it also specifies which output tasks it desires, such that their results are not released from memory.
Once a task has finished executing (i.e. moves into the “memory” or “erred” state), the clients in
who_wants
are notified.Once both
waiters
andwho_wants
become empty, this task can be released (if it has a non-emptyrun_spec
) or forgotten (otherwise) by the scheduler, and by any workers inwho_has
.
- worker_restrictions: set[str] | None¶
A set of complete worker addresses where this task can be run (or
None
if empty). Usually this is empty unless the task has been specifically restricted to only run on certain workers. Note this is tracking worker addresses, not worker states, since the specific workers may not be connected at this time.
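Such restrictions typically come from the workers= keyword at submit time; a minimal sketch (both addresses are hypothetical):

    from distributed import Client

    client = Client("tcp://scheduler-address:8786")  # hypothetical address

    # The resulting TaskState gets worker_restrictions == {"tcp://10.0.0.5:37011"}
    # and will only ever be assigned to that worker.
    future = client.submit(sum, [1, 2, 3], workers=["tcp://10.0.0.5:37011"])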
- class distributed.scheduler.WorkerState(*, address: str, status: distributed.core.Status, pid: int, name: object, nthreads: int = 0, memory_limit: int, local_directory: str, nanny: str | None, server_id: str, services: dict[str, int] | None = None, versions: dict[str, Any] | None = None, extra: dict[str, Any] | None = None, scheduler: distributed.scheduler.SchedulerState | None = None)[source]¶
A simple object holding information about a worker.
Not to be confused with
distributed.worker_state_machine.WorkerState
.- actors: set[distributed.scheduler.TaskState]¶
A set of all TaskStates on this worker that are actors. This only includes those actors whose state actually lives on this worker, not actors to which this worker has a reference.
- add_replica(ts: distributed.scheduler.TaskState) None [source]¶
The worker acquired a replica of the given task.
- add_to_processing(ts: distributed.scheduler.TaskState) None [source]¶
Assign a task to this worker for compute.
- address: str¶
This worker’s unique key. This can be its connected address (such as
"tcp://127.0.0.1:8891"
) or an alias (such as"alice"
).
- clean() distributed.scheduler.WorkerState [source]¶
Return a version of this object that is appropriate for serialization
- executing: dict[distributed.scheduler.TaskState, float]¶
A dictionary of tasks that are currently being run on this worker. Each task state is associated with the duration in seconds which the task has been running.
- property has_what: collections.abc.Set[distributed.scheduler.TaskState]¶
An insertion-sorted set-like of tasks which currently reside on this worker. All the tasks here are in the “memory” state. This is the reverse mapping of
TaskState.who_has
.This is a read-only public accessor. The data is implemented as a dict without values, because rebalance() relies on dicts being insertion-sorted.
- long_running: set[distributed.scheduler.TaskState]¶
Running tasks that invoked
distributed.secede()
- property memory: distributed.scheduler.MemoryState¶
Polished memory metrics for the worker.
Design note on managed memory
There are two measures available for managed memory:
self.nbytes
self.metrics["managed_bytes"]
At rest, the two numbers must be identical. However,
self.nbytes
is immediately updated through the batched comms as soon as each task lands in memory on the worker;self.metrics["managed_bytes"]
instead is updated by the heartbeat, which can lag several seconds behind.Below we are mixing likely newer managed memory info from
self.nbytes
with process and spilled memory from the heartbeat. This is deliberate, so that managed memory total is updated more frequently.Managed memory directly and immediately contributes to optimistic memory, which is in turn used in Active Memory Manager heuristics (at the moment of writing; more uses will likely be added in the future). So it’s important to have it up to date; much more than it is for process memory.
Having up-to-date managed memory info as soon as the scheduler learns about task completion also substantially simplifies unit tests.
The flip side of this design is that it may cause some noise in the unmanaged_recent measure. For example:
Delete 100MB of managed data
The updated managed memory reaches the scheduler faster than the updated process memory
There’s a blip where the scheduler thinks that there’s a sudden 100MB increase in unmanaged_recent, since process memory hasn’t changed but managed memory has decreased by 100MB
When the heartbeat arrives, process memory goes down and so does the unmanaged_recent.
This is OK - one of the main reasons for the unmanaged_recent / unmanaged_old split is exactly to concentrate all the noise in unmanaged_recent and exclude it from optimistic memory, which is used for heuristics.
Something that is less OK, but also less frequent, is that the sudden deletion of spilled keys will cause a negative blip in managed memory:
Delete 100MB of spilled data
The updated managed memory total reaches the scheduler faster than the updated spilled portion
This causes the managed memory to temporarily plummet and be replaced by unmanaged_recent, while spilled memory remains unaltered
When the heartbeat arrives, managed goes back up, unmanaged_recent goes back down, and spilled goes down by 100MB as it should have to begin with.
GH#6002 will let us solve this.
- nbytes: int¶
The total memory size, in bytes, used by the tasks this worker holds in memory (i.e. the tasks in this worker’s
has_what
).
- needs_what: dict[distributed.scheduler.TaskState, int]¶
Keys that may need to be fetched to this worker, and the number of tasks that need them. All tasks are currently in memory on a worker other than this one. Much like processing, this does not exactly reflect worker state: keys here may be queued to fetch, in flight, or already in memory on the worker.
- processing: set[distributed.scheduler.TaskState]¶
All the tasks here are in the “processing” state. This attribute is kept in sync with
TaskState.processing_on
.
- remove_from_processing(ts: distributed.scheduler.TaskState) None [source]¶
Remove a task from a worker’s processing set.
- remove_replica(ts: distributed.scheduler.TaskState) None [source]¶
The worker no longer has the given task in memory.
- resources: dict[str, float]¶
The available resources on this worker, e.g.
{"GPU": 2}
. These are abstract quantities that constrain certain tasks from running at the same time on this worker.
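These quantities are usually advertised by the worker at startup (for instance via the --resources option of the worker CLI); a minimal in-process sketch:

    import asyncio
    from distributed import Scheduler, Worker

    async def main():
        # A worker advertising two abstract "GPU" units; on the scheduler side
        # this becomes WorkerState.resources == {"GPU": 2}.
        async with Scheduler(port=0) as scheduler:
            async with Worker(scheduler.address, resources={"GPU": 2}):
                for ws in scheduler.workers.values():
                    print(ws.address, ws.resources)

    asyncio.run(main())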
- status: distributed.core.Status¶
Read-only worker status, synced one way from the remote Worker object
- class distributed.scheduler.ClientState(client: str, *, versions: dict[str, Any] | None = None)[source]¶
A simple object holding information about a client.
- client_key: str¶
A unique identifier for this client. This is generally an opaque string generated by the client itself.
- wants_what: set[distributed.scheduler.TaskState]¶
A set of tasks this client wants to be kept in memory, so that it can download its result when desired. This is the reverse mapping of
TaskState.who_wants
. Tasks are typically removed from this set when the corresponding object in the client’s space (for example aFuture
or a Dask collection) gets garbage-collected.
- distributed.scheduler.decide_worker(ts: distributed.scheduler.TaskState, all_workers: set[distributed.scheduler.WorkerState], valid_workers: set[distributed.scheduler.WorkerState] | None, objective: collections.abc.Callable[[distributed.scheduler.WorkerState], Any]) distributed.scheduler.WorkerState | None [source]¶
Decide which worker should take task ts.
We choose the worker that has the data on which ts depends.
If several workers have dependencies then we choose the least-busy worker.
Optionally provide valid_workers of where jobs are allowed to occur (if all workers are allowed to take the task, pass None instead).
If the task requires data communication because no eligible worker has all the dependencies already, then we choose to minimize the number of bytes sent between workers. This is determined by calling the objective function.
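A rough sketch of the selection logic described above, ignoring many real-world details such as occupancy, queuing and root-task handling (this is not the actual implementation):

    def decide_worker_sketch(ts, all_workers, valid_workers, objective):
        # Prefer workers that already hold one of the task's dependencies.
        candidates = {ws for dep in ts.dependencies for ws in (dep.who_has or ())}
        if valid_workers is not None:
            candidates &= valid_workers
        if not candidates:
            # No (valid) worker holds a dependency: fall back to the allowed pool.
            candidates = valid_workers if valid_workers is not None else all_workers
        if not candidates:
            return None
        # The objective typically estimates bytes to transfer plus current load,
        # so taking the minimum picks the cheapest / least-busy worker.
        return min(candidates, key=objective)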
- class distributed.scheduler.MemoryState(*, process: int, unmanaged_old: int, managed: int, spilled: int)[source]¶
Memory readings on a worker or on the whole cluster.
Attributes / properties:
- managed_total
Sum of the output of sizeof() for all dask keys held by the worker in memory, plus number of bytes spilled to disk
- managed
Sum of the output of sizeof() for the dask keys held in RAM. Note that this may be inaccurate, which may cause inaccurate unmanaged memory (see below).
- spilled
Number of bytes for the dask keys spilled to the hard drive. Note that this is the size on disk; size in memory may be different due to compression and inaccuracies in sizeof(). In other words, given the same keys, ‘managed’ will change depending on the keys being in memory or spilled.
- process
Total RSS memory measured by the OS on the worker process. This is always exactly equal to managed + unmanaged.
- unmanaged
process - managed. This is the sum of
Python interpreter and modules
global variables
memory temporarily allocated by the dask tasks that are currently running
memory fragmentation
memory leaks
memory not yet garbage collected
memory not yet free()’d by the Python memory manager to the OS
- unmanaged_old
Minimum of the ‘unmanaged’ measures over the last distributed.memory.recent-to-old-time seconds
- unmanaged_recent
unmanaged - unmanaged_old; in other words process memory that has been recently allocated but is not accounted for by dask; hopefully it’s mostly a temporary spike.
- optimistic
managed + unmanaged_old; in other words the memory held long-term by the process under the hopeful assumption that all unmanaged_recent memory is a temporary spike
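A small worked example of how these quantities relate, with made-up readings for a single worker:

    # Illustrative readings (bytes):
    process = 4_000_000_000        # RSS reported by the OS
    managed = 1_500_000_000        # sizeof() of keys held in RAM
    spilled = 500_000_000          # on-disk size of spilled keys
    unmanaged_old = 2_000_000_000  # long-term floor of unmanaged memory

    managed_total = managed + spilled               # 2.0 GB
    unmanaged = process - managed                   # 2.5 GB
    unmanaged_recent = unmanaged - unmanaged_old    # 0.5 GB (hopefully a spike)
    optimistic = managed + unmanaged_old            # 3.5 GB, used by heuristics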