Worker State Machine

Task states

When the Scheduler asks a Worker to compute a task, it is tracked by the Worker through a distributed.worker_state_machine.TaskState object - not to be confused with the matching scheduler-side class distributed.scheduler.TaskState.

The class has a key attribute, TaskState.state, which can assume the following values:

released

Known but not actively computing or in memory. A task can stay in this state when the scheduler asked to forget it, but it has dependent tasks on the same worker.

waiting

The scheduler has added the task to the worker queue. All of its dependencies are in memory somewhere on the cluster, but not all of them are in memory on the current worker, so they need to be fetched.

fetch

This task is in memory on one or more peer workers, but not on this worker. Its data is queued to be transferred over the network, either because it’s a dependency of a task in waiting state, or because the Active Memory Manager requested it to be replicated here. The task can be found in the WorkerState.data_needed heap.

missing

Like fetch, but all peer workers that were listed by the scheduler are either unreachable or have responded they don’t actually have the task data. The worker will periodically ask the scheduler if it knows of additional replicas; when it does, the task will transition again to fetch. The task can be found in the WorkerState.missing_dep_flight set.

flight

The task data is currently being transferred over the network from another worker. The task can be found in the WorkerState.in_flight_tasks and WorkerState.in_flight_workers collections.

ready

The task is ready to be computed; all of its dependencies are in memory on the current worker and it’s waiting for an available thread. The task can be found in the WorkerState.ready heap.

constrained

Like ready, but the user specified resource constraints for this task. The task can be found in the WorkerState.constrained queue.

executing

The task is currently being computed on a thread. It can be found in the WorkerState.executing set and in the distributed.worker.Worker.active_threads dict.

long-running

Like executing, but the user code called distributed.secede() so the task no longer counts towards the maximum number of concurrent tasks. It can be found in the WorkerState.long_running set and in the distributed.worker.Worker.active_threads dict.

rescheduled

The task just raised the Reschedule exception. This is a transitory state, which is not stored permanently.

cancelled

The scheduler asked to forget about this task, but it’s technically impossible at the moment. See Task cancellation. The task can be found in whatever collections it was in its previous state.

resumed

The task was recovered from cancelled state. See Task cancellation. The task can be found in whatever collections it was in its previous state.

memory

Task execution completed, or the task was successfully transferred from another worker, and is now held in either WorkerState.data or WorkerState.actors.

error

Task execution failed. Alternatively, task execution completed successfully, or the task data transferred successfully over the network, but it failed to serialize or deserialize. The full exception and traceback are stored in the task itself, so that they can be re-raised on the client.

forgotten

The scheduler asked this worker to forget about the task, and there are neither dependents nor dependencies on the same worker. As soon as a task reaches this state, it is immediately dereferenced from the WorkerState and will be soon garbage-collected. This is the only case where two instances of a TaskState object with the same key can (transitorily) exist in the same interpreter at the same time.

Fetching dependencies

Worker states for dependencies

As tasks that need to be computed arrive on the Worker, any dependencies that are not already in memory on the same worker are wrapped by a TaskState object and contain a listing of workers (TaskState.who_has) to collect their result from.

These TaskState objects have their state set to fetch, are put in the data_needed heap, and are progressively transferred over the network. For each dependency we select a worker at random that has that data and collect the dependency from that worker. To improve bandwidth, we opportunistically gather other dependencies of other tasks that are known to be on that worker, up to a maximum of 50MB of data (transfer_message_bytes_limit, which is acquired from the configuration key distributed.worker.transfer.message-bytes-limit) - too little data and bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of 50 connections (transfer_incoming_count_limit, which is in turn acquired from the configuration key distributed.worker.connections.outgoing) so as to avoid overly-fragmenting our network bandwidth.
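The batching rule described above can be illustrated with a minimal sketch. It is not the library's implementation: `Dep` and `select_keys_for_gather` are hypothetical names, the candidates are assumed to be already sorted by priority, and the rule that the first dependency is always taken (even when it alone exceeds the limit) follows the description of transfer_message_bytes_limit in the API section.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Dep:
    """Hypothetical stand-in for a dependency in fetch state."""
    key: str
    nbytes: int


def select_keys_for_gather(candidates, message_bytes_limit):
    """Pick the dependencies to request from one peer in a single message.

    The first candidate is always selected so that progress is guaranteed;
    later candidates are added only while the total stays within the limit.
    """
    selected = []
    total = 0
    for dep in candidates:
        if selected and total + dep.nbytes > message_bytes_limit:
            break
        selected.append(dep.key)
        total += dep.nbytes
    return selected, total


deps = [Dep("x", 30_000_000), Dep("y", 15_000_000), Dep("z", 10_000_000)]
keys, total = select_keys_for_gather(deps, message_bytes_limit=50_000_000)
```

Here `x` and `y` fit in one 50MB message, while `z` has to wait for a later request to the same peer.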

In the event that the network comms between two workers are saturated, a dependency task may cycle between fetch and flight until it is successfully collected. It may also happen that a peer worker responds that it doesn’t have a replica of the requested data anymore; finally, the peer worker may be unreachable or unresponsive. When that happens, the peer is removed from who_has and the task is transitioned back to fetch, so that the Worker will try gathering the same key from a different peer. If who_has becomes empty due to this process, the task transitions to missing and the Worker starts periodically asking the Scheduler if additional peers are available.
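As a rough illustration of the fallback logic in the paragraph above, the sketch below models only who_has and the resulting state. The function names and worker addresses are made up for the example; the real transitions involve considerably more bookkeeping.

```python
def on_gather_failed(who_has, peer):
    """Drop ``peer`` from the replica set and return the task's next state."""
    who_has.discard(peer)
    return "fetch" if who_has else "missing"


def on_scheduler_reply(who_has, known_replicas):
    """Merge the scheduler's answer; a missing task with replicas returns to fetch."""
    who_has.update(known_replicas)
    return "fetch" if who_has else "missing"


who_has = {"tcp://a:1234", "tcp://b:1234"}
state_1 = on_gather_failed(who_has, "tcp://a:1234")      # one replica left
state_2 = on_gather_failed(who_has, "tcp://b:1234")      # no replicas left
state_3 = on_scheduler_reply(who_has, {"tcp://c:1234"})  # scheduler found another
```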

The same system used for fetching dependencies is also used by Active Memory Manager replication.

Note

There is at most one gather_dep() asyncio task running at any given time for any given peer worker. If all workers holding a replica of a task in fetch state are already in flight, the task will remain in fetch state until a worker becomes available again.

Computing tasks

A TaskState that needs to be computed proceeds on the Worker through the following pipeline. It has its run_spec defined, which instructs the worker how to execute it.

Worker states for computing tasks

After all dependencies for a task are in memory, the task transitions from waiting to ready or constrained and is added to the ready heap.

As soon as a thread is available, we pop a task from the top of the heap and put the task into a thread from a local thread pool to execute.
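A toy version of this step, using the standard library heapq module, might look as follows. It assumes that smaller priority tuples run first and ignores resource constraints; `pop_runnable` is a hypothetical helper, not a WorkerState method.

```python
import heapq

# Heap of (priority, key) pairs; the smallest priority tuple is popped first
ready = []
for priority, key in [((0, 3), "c"), ((0, 1), "a"), ((0, 2), "b")]:
    heapq.heappush(ready, (priority, key))


def pop_runnable(ready, executing, nthreads):
    """Move tasks from the ready heap to executing while threads are free."""
    started = []
    while ready and len(executing) < nthreads:
        _, key = heapq.heappop(ready)
        executing.add(key)
        started.append(key)
    return started


executing = set()
started = pop_runnable(ready, executing, nthreads=2)
```

With two threads, `a` and `b` start immediately and `c` stays on the heap until one of them finishes.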

Optionally, while it’s running, this task may identify itself as a long-running task (see Tasks launching tasks), at which point it secedes from the thread pool and changes state to long-running. executing and long-running are almost identical states, the only difference being that tasks in the latter state don’t count towards the maximum number of tasks running in parallel at the same time.

A task can terminate in three ways:

  • Complete successfully; its return value is stored in either data or actors

  • Raise an exception; the exception and traceback are stored on the TaskState object

  • Raise Reschedule; it is immediately forgotten.

In all cases, the outcome is sent back to the scheduler.

Scattered data

Scattered data follows an even simpler path, landing directly in memory:

Worker states for scattered data

Forgetting tasks

Once a task is in memory or error, the Worker will hold onto it indefinitely, until the Scheduler explicitly asks the Worker to forget it. This happens when there are no more Clients holding a reference to the key and there are no more waiter tasks (that is, dependents that have not been computed). Additionally, the Active Memory Manager may ask to drop excess replicas of a task.

In the case of rescheduled, the task will instead immediately transition to released and then forgotten without waiting for the scheduler.

Worker states for computing tasks

Irregular flow

There are a few important exceptions to the flow diagrams above:

  • A task is stolen, in which case it transitions from waiting, ready, or constrained directly to released. Note that steal requests for tasks that are currently executing are rejected.

  • Scheduler intercession, in which the scheduler reassigns a task that was previously assigned to a separate worker to a new worker. This most commonly occurs when a worker dies during computation.

  • Client intercession, where a client either explicitly releases a Future or descopes it; alternatively the whole client may shut down or become unresponsive. When there are no more clients holding references to a key or one of its dependents, the Scheduler will release it.

In short:

Important

A task can transition to released from any state, not just those in the diagrams above.

If there are no dependents, the task immediately transitions to forgotten and is descoped. However, there is an important exception, Task cancellation.

Task cancellation

The Worker may receive a request to release a key while it is currently in flight, executing, or long-running. Due to technical limitations around cancelling Python threads, and the way data fetching from peer workers is currently implemented, such an event cannot cause the related asyncio task (and, in the case of executing / long-running, the thread running the user code) to be immediately aborted. Instead, tasks in these three states are transitioned to another state, cancelled, which means that the asyncio task will proceed to completion (outcome is irrelevant) and then the Dask task will be released.

The cancelled state has a substate, previous, which is set to one of the above three states. The common notation for this is <state>(<previous>), e.g. cancelled(flight).

While a task is cancelled, one of three things will happen:

  • Nothing happens before the asyncio task completes; e.g. the Scheduler does not change its mind and still wants the Worker to forget about the task until the very end. When that happens, the task transitions from cancelled to released and, typically, forgotten.

  • The scheduler switches back to its original request:

    • The scheduler asks the Worker to fetch a task that is currently cancelled(flight); at which point the task will immediately revert to flight, forget that cancellation ever happened, and continue waiting on the data fetch that’s already running;

    • The scheduler asks the Worker to compute a task that is currently cancelled(executing) or cancelled(long-running). The Worker will completely disregard the new run_spec (if it changed), switch back to the previous state, and wait for the already executing thread to finish.

  • The scheduler flips to the opposite request, from fetch to computation or the other way around.

To serve this last use case there is another special state, resumed. A task can enter resumed state exclusively from cancelled. resumed retains the previous attribute from the cancelled state and adds another attribute, next, which is always:

  • fetch, if previous is executing or long-running

  • waiting, if previous is flight

To recap, these are all possible permutations of states and substates to handle cancelled tasks:

state      previous      next
cancelled  flight        None
cancelled  executing     None
cancelled  long-running  None
resumed    flight        waiting
resumed    executing     fetch
resumed    long-running  fetch

If a resumed task completes successfully, it will transition to memory (as opposed to a cancelled task, where the output is disregarded) and the Scheduler will be informed with a spoofed termination message, that is the expected end message for flight if the task is resumed(executing->fetch) or resumed(long-running->fetch), and the expected end message for execute if the task is resumed(flight->waiting).

If the task fails or raises Reschedule, the Worker will instead silently ignore the exception and switch to its intended course, so resumed(executing->fetch) or resumed(long-running->fetch) will transition to fetch and resumed(flight->waiting) will transition to waiting.

Finally, the scheduler can change its mind multiple times over the lifetime of the task, so a resumed(executing->fetch) or resumed(long-running->fetch) task may be requested to transition to waiting again, at which point it will just revert to its previous state and forget the whole incident; likewise a resumed(flight->waiting) task could be requested to transition to fetch again, so it will just transition to flight instead.
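The rules in this section can be condensed into a small toy model. This is a sketch for illustration only: `ToyTask`, `release`, `request` and `finish` are hypothetical names, `request` expects a task that is already cancelled or resumed, and messages to the scheduler are left out entirely.

```python
from dataclasses import dataclass
from typing import Optional

# Value of ``next`` implied by ``previous``, as in the recap table
NEXT_FOR_PREVIOUS = {"flight": "waiting", "executing": "fetch", "long-running": "fetch"}


@dataclass
class ToyTask:
    state: str
    previous: Optional[str] = None
    next: Optional[str] = None


def release(ts):
    """The scheduler asks to forget a task whose asyncio task cannot be aborted."""
    if ts.state in NEXT_FOR_PREVIOUS:
        ts.previous, ts.state = ts.state, "cancelled"


def request(ts, wanted):
    """The scheduler asks for 'fetch' or 'waiting' on a cancelled/resumed task."""
    same_kind = (wanted == "fetch") == (ts.previous == "flight")
    if same_kind:
        # Back to the original request: revert and forget the whole incident
        ts.state, ts.previous, ts.next = ts.previous, None, None
    else:
        # Opposite request: remember what to do once the asyncio task ends
        ts.state, ts.next = "resumed", NEXT_FOR_PREVIOUS[ts.previous]


def finish(ts, ok):
    """The asyncio task ended; ``ok`` tells whether it completed successfully."""
    if ts.state == "cancelled":
        ts.state = "released"  # the outcome is irrelevant
    elif ts.state == "resumed":
        ts.state = "memory" if ok else ts.next
    ts.previous = ts.next = None


a = ToyTask("flight")
release(a)             # cancelled(flight)
request(a, "waiting")  # resumed(flight->waiting)
snapshot = (a.state, a.previous, a.next)
finish(a, ok=False)    # the fetch failed, so follow the intended course

b = ToyTask("executing")
release(b)             # cancelled(executing)
request(b, "waiting")  # same request as before: back to executing
```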

Worker states for cancel/resume

A common real-life use case

  1. There are at least two workers on the cluster, A and B.

  2. Task x is computed successfully on worker A.

  3. When task x transitions to memory on worker A, the scheduler asks worker B to compute task y, which depends on task x.

  4. B starts acquiring the key x from A, which sends the task into flight mode.

  5. Worker A crashes, and for whatever reason the scheduler notices before worker B does.

  6. The scheduler will release task y (because it’s waiting on dependencies that are nowhere to be found in memory anymore) and reschedule task x somewhere else on the cluster. Task x will transition to cancelled(flight) on worker B.

  7. If the scheduler randomly chooses worker B to compute task x, the task will transition to resumed(flight->waiting).

  8. When, and only when, the TCP socket from A to B collapses (e.g. due to timeout), the task will transition to waiting and will eventually be recomputed on B.

Important

You always have at most one compute() or gather_dep() asyncio task running for any one given key; you never have both.

Task state mapping between Scheduler and Worker

The task states on the scheduler and the worker are different, and their mapping is somewhat nuanced:

Scheduler states               Typical worker states                                  Edge case worker states
released, waiting, no-worker   (unknown), released                                    cancelled
processing                     waiting, ready, constrained, executing, long-running   resumed(waiting)
memory                         memory, fetch, flight                                  error, missing, resumed(fetch)
erred                          error

In addition to the above states, a worker may not know about a specific task at all. The opposite, where the worker knows about a task but it is nowhere to be found on the scheduler, happens exclusively in the case of Task cancellation.

There are also race conditions to be considered, where a worker (or some workers) know something before the scheduler does, or the other way around. For example,

  • A task will always transition from executing to memory on the worker before it can transition from processing to memory on the scheduler

  • A task will always transition to released or forgotten on the scheduler first; only when the message reaches the worker will it be released there too.

Flow control

Worker state machine control flow

There are several classes involved in the worker state machine:

TaskState includes all the information related to a single task; it also includes references to dependent and dependency tasks. This is just a data holder, with no mutating methods. Note that this is a distinct class from distributed.scheduler.TaskState.

WorkerState encapsulates the state of the worker as a whole. It holds references to TaskState in its tasks dictionary and in several other secondary collections. Crucially, this class has no knowledge or visibility whatsoever on asyncio, networking, disk I/O, threads, etc. Note that this is a distinct class from distributed.scheduler.WorkerState.

WorkerState offers a single method to mutate the state: handle_stimulus(). The state must not be altered in any other way. The method accepts a StateMachineEvent, a.k.a. stimulus, which is a data class signalling that something happened which may cause the worker state to mutate. A stimulus can arrive from either the scheduler (e.g. a request to compute a task) or from the worker itself (e.g. a task has finished computing).

WorkerState.handle_stimulus() alters the internal state (e.g., it could transition a task from executing to memory) and returns a list of Instruction objects, which are actions that the worker needs to take but are external to the state itself:

  • send a message to the scheduler

  • compute a task

  • gather a task from a peer worker

WorkerState.handle_stimulus() is wrapped by BaseWorker.handle_stimulus(), which consumes the Instruction objects. BaseWorker deals with asyncio task creation, tracking, and cleanup, but does not itself implement task execution or gathering; instead it exposes abstract async methods execute() and gather_dep(), which are overridden by its subclass Worker, which actually runs tasks and performs network I/O. When the implemented methods finish, they must return a StateMachineEvent, which is fed back into BaseWorker.handle_stimulus().

Note

This can create a (potentially very long) chain of events internal to the worker; e.g. if there are more tasks in the ready queue than there are threads, then the termination StateMachineEvent of one task will trigger the Instruction to execute the next one.
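The feedback loop between instructions and events can be sketched with plain asyncio. `ToyBaseWorker` is a hypothetical, heavily simplified stand-in that merges the state machine and its wrapper into one class; events and instructions are plain tuples, and the ready queue is a FIFO list instead of a heap.

```python
import asyncio


class ToyBaseWorker:
    def __init__(self, nthreads):
        self.nthreads = nthreads
        self.ready = []          # FIFO list here; a priority heap in reality
        self.executing = set()
        self.done = []
        self._async_tasks = set()

    def _state_handle(self, event):
        """Pure state mutation: returns instructions, performs no I/O."""
        kind, key = event
        if kind == "compute":
            self.ready.append(key)
        elif kind == "finished":
            self.executing.discard(key)
            self.done.append(key)
        instructions = []
        while self.ready and len(self.executing) < self.nthreads:
            nxt = self.ready.pop(0)
            self.executing.add(nxt)
            instructions.append(("execute", nxt))
        return instructions

    def handle_stimulus(self, *events):
        """Consume instructions by spawning and tracking asyncio tasks."""
        for event in events:
            for _, key in self._state_handle(event):
                task = asyncio.create_task(self._run(key))
                self._async_tasks.add(task)
                task.add_done_callback(self._async_tasks.discard)

    async def _run(self, key):
        event = await self.execute(key)
        self.handle_stimulus(event)  # feed the termination event back in

    async def execute(self, key):
        await asyncio.sleep(0)       # placeholder for running user code
        return ("finished", key)

    async def wait_idle(self):
        while self._async_tasks:
            await asyncio.gather(*list(self._async_tasks))


async def main():
    worker = ToyBaseWorker(nthreads=1)
    worker.handle_stimulus(("compute", "a"), ("compute", "b"), ("compute", "c"))
    await worker.wait_idle()
    return worker.done


result = asyncio.run(main())
```

With a single thread, each termination event triggers the instruction to execute the next queued task, which is the chain of events mentioned in the note above.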

To summarize:

Internal state permutation

Internally, WorkerState.handle_stimulus() works very similarly to the same process on the scheduler side:

  1. WorkerState.handle_stimulus() calls WorkerState._handle_<stimulus name>(),

  2. which returns a tuple of (recommendations, instructions)

  3. WorkerState.handle_stimulus() then passes the recommendations to WorkerState._transitions()

  4. For each recommendation, WorkerState._transitions() calls WorkerState._transition(),

  5. which in turn calls WorkerState._transition_<start state>_<end state>(),

  6. which in turn returns an additional tuple of (recommendations, instructions)

  7. the new recommendations are consumed by WorkerState._transitions(), until no more recommendations are returned.

  8. WorkerState.handle_stimulus() finally returns the list of instructions, which has been progressively extended by the transitions.
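The steps above can be sketched as a miniature state machine. `ToyState` is a hypothetical stand-in with only three transitions; stimuli and instructions are plain tuples, and recommendations are a `{key: new state}` dict.

```python
class ToyState:
    """Hypothetical, heavily simplified stand-in for WorkerState."""

    def __init__(self):
        self.tasks = {}  # key -> state name

    def handle_stimulus(self, stimulus):
        name, key = stimulus
        handler = getattr(self, f"_handle_{name}")
        recommendations, instructions = handler(key)
        instructions += self._transitions(recommendations)
        return instructions

    def _handle_compute_task(self, key):
        self.tasks.setdefault(key, "released")
        return {key: "waiting"}, []

    def _transitions(self, recommendations):
        """Process recommendations until no new ones are returned."""
        instructions = []
        pending = dict(recommendations)
        while pending:
            key, finish = pending.popitem()
            new_recs, new_instructions = self._transition(key, finish)
            pending.update(new_recs)
            instructions += new_instructions
        return instructions

    def _transition(self, key, finish):
        start = self.tasks[key]
        func = getattr(self, f"_transition_{start}_{finish}")
        return func(key)

    def _transition_released_waiting(self, key):
        self.tasks[key] = "waiting"
        # This toy task has no dependencies, so recommend ready right away
        return {key: "ready"}, []

    def _transition_waiting_ready(self, key):
        self.tasks[key] = "ready"
        return {key: "executing"}, []

    def _transition_ready_executing(self, key):
        self.tasks[key] = "executing"
        return {}, [("execute", key)]


state = ToyState()
instructions = state.handle_stimulus(("compute_task", "x"))
```

A single stimulus drives the task through three transitions and produces one instruction for the caller to act on.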

API Documentation

class distributed.worker_state_machine.TaskState(key: Key, run_id: int = -1, run_spec: T_runspec | None = None, dependencies: set[TaskState] = <factory>, dependents: set[TaskState] = <factory>, waiting_for_data: set[TaskState] = <factory>, waiters: set[TaskState] = <factory>, state: TaskStateState = 'released', previous: Literal['executing', 'long-running', 'flight', None] = None, next: Literal['fetch', 'waiting', None] = None, duration: float | None = None, priority: tuple[int, ...] | None = None, who_has: set[str] = <factory>, coming_from: str | None = None, resource_restrictions: dict[str, float] = <factory>, exception: Serialize | None = None, traceback: Serialize | None = None, exception_text: str = '', traceback_text: str = '', type: type | None = None, suspicious_count: int = 0, startstops: list[StartStop] = <factory>, start_time: float | None = None, stop_time: float | None = None, metadata: dict = <factory>, nbytes: int | None = None, annotations: dict | None = None, span_id: str | None = None, done: bool = False)

Holds volatile state relating to an individual Dask task.

Not to be confused with distributed.scheduler.TaskState, which holds similar information on the scheduler side.

annotations: dict | None

Arbitrary task annotations

coming_from: str | None

The worker that current task data is coming from if task is in flight

dependencies: set[TaskState]

The data needed by this key to run

dependents: set[TaskState]

The keys that use this dependency

done: bool

True if the execute() or gather_dep() coroutine servicing this task completed; False otherwise. This flag changes the behaviour of transitions out of the executing, flight etc. states.

duration: float | None

Expected duration of the task

exception: Serialize | None

The exception caused by running a task if it erred (serialized)

exception_text: str

string representation of exception

key: Key

Task key. Mandatory.

metadata: dict

Metadata related to the task. Stored metadata should be msgpack serializable (e.g. int, string, list, dict).

nbytes: int | None

The size of the value of the task, if in memory

next: Literal['fetch', 'waiting', None]

The next state of the task. It is not None iff state == resumed.

prefix: str

Task prefix (leftmost part of the key)

previous: Literal['executing', 'long-running', 'flight', None]

The previous state of the task. It is not None iff state in (cancelled, resumed).

priority: tuple[int, ...] | None

The priority given to this task by the scheduler. Determines run order.

resource_restrictions: dict[str, float]

Abstract resources required to run a task

run_id: int

Task run ID.

run_spec: T_runspec | None

A tuple containing the function, args, kwargs and task associated with this TaskState instance. This defaults to None and can remain empty if it is a dependency that this worker will receive from another worker.

span_id: str | None

unique span id (see distributed.spans). Matches distributed.scheduler.TaskState.group.span_id.

start_time: float | None

Time at which task begins running

startstops: list[StartStop]

Log of transfer, load, and compute times for a task

state: TaskStateState

The current state of the task

stop_time: float | None

Time at which task finishes running

suspicious_count: int

The number of times a dependency has not been where we expected it

traceback: Serialize | None

The traceback caused by running a task if it erred (serialized)

traceback_text: str

string representation of traceback

type: type | None

The type of a particular piece of data

waiters: set[TaskState]

Subset of dependents that are not in memory

waiting_for_data: set[TaskState]

Subset of dependencies that are not in memory

who_has: set[str]

Addresses of workers that we believe have this data

class distributed.worker_state_machine.WorkerState(*, nthreads: int = 1, address: str | None = None, data: MutableMapping[Key, object] | None = None, threads: dict[Key, int] | None = None, plugins: dict[str, WorkerPlugin] | None = None, resources: Mapping[str, float] | None = None, transfer_incoming_count_limit: int = 9999, validate: bool = True, transition_counter_max: int | Literal[False] = False, transfer_incoming_bytes_limit: float = inf, transfer_message_bytes_limit: float = inf)

State machine encapsulating the lifetime of all tasks on a worker.

Not to be confused with distributed.scheduler.WorkerState.

Note

The data attributes of this class are implementation details and may be changed without a deprecation cycle.

Warning

The attributes of this class are all heavily correlated with each other. Do not modify them directly, ever, as it is extremely easy to obtain a broken state this way, which in turn will likely result in cluster-wide deadlocks.

The state should be exclusively mutated through handle_stimulus().

actors: dict[Key, object]

Actor tasks. See Actors.

address: str

Worker <IP address>:<port>. This is used in decision-making by the state machine, e.g. to determine if a peer worker is running on the same host or not. This attribute may not be known when the WorkerState is initialised. It must be set before the first call to handle_stimulus().

property all_running_tasks: set[distributed.worker_state_machine.TaskState]

All tasks that are currently occupying a thread. They may or may not count towards the maximum number of threads.

These are:

  • ts.state in (executing, long-running)

  • ts.state in (cancelled, resumed) and ts.previous in (executing, long-running)

available_resources: dict[str, float]

{resource name: amount}. Resources that are not currently consumed by task execution. Always less than or equal to total_resources. See Worker Resources.

busy_workers: set[str]

Peer workers that recently returned a busy status. Workers in this set won’t be asked for additional dependencies for some time.

constrained: HeapSet[TaskState]

Priority heap of tasks that are ready to run, but are waiting on abstract resources like GPUs. Mutually exclusive with ready. See available_resources and Worker Resources.

data: MutableMapping[Key, object]

In-memory tasks data. This collection is shared by reference between Worker, WorkerMemoryManager, and this class.

data_needed: defaultdict[str, HeapSet[TaskState]]

The tasks which still require data in order to execute and are in memory on at least another worker, prioritized as per-worker heaps. All and only tasks with TaskState.state == 'fetch' are in this collection. A TaskState with multiple entries in who_has will appear multiple times here.

executed_count: int

Number of tasks that this worker has run in its lifetime; this includes failed and cancelled tasks. See also executing_count().

executing: set[TaskState]

Set of tasks that are currently running.

This set includes exclusively tasks with state == ‘executing’ as well as tasks with state in (‘cancelled’, ‘resumed’) and previous == ‘executing’.

See also executing_count() and long_running.

property executing_count: int

Count of tasks currently executing on this worker and counting towards the maximum number of threads.

It includes cancelled tasks, but does not include long running (a.k.a. seceded) tasks.

fetch_count: int

Total number of tasks in fetch state. If a task is in more than one data_needed heap, it’s only counted once.
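To make the relationship between data_needed and fetch_count concrete, here is a small sketch. It uses plain sets in place of the per-worker heaps, and the keys and worker addresses are invented for the example.

```python
from collections import defaultdict

# Hypothetical replica locations for two tasks in fetch state
who_has = {"x": {"tcp://a:1234", "tcp://b:1234"}, "y": {"tcp://b:1234"}}

# One collection per peer worker; a task appears once for every peer holding it
data_needed = defaultdict(set)
for key, workers in who_has.items():
    for worker in workers:
        data_needed[worker].add(key)

# Each task is counted once, regardless of how many collections contain it
fetch_count = len(set().union(*data_needed.values()))
total_entries = sum(len(keys) for keys in data_needed.values())
```

Task `x` is listed under both peers, so there are three entries in total but only two tasks in fetch state.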

generation: int

Counter that decreases every time the compute-task handler is invoked by the Scheduler. It is appended to TaskState.priority and acts as a tie-breaker between tasks that have the same priority on the Scheduler, determining a last-in-first-out order between them.
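The tie-breaking effect can be reproduced with the standard library heapq module. This is only a sketch: `compute_task` is a hypothetical handler, the scheduler priorities are invented, and it assumes that smaller priority tuples are popped first.

```python
import heapq

generation = 0
ready = []


def compute_task(key, scheduler_priority):
    """Decrease the counter and append it to the scheduler-provided priority."""
    global generation
    generation -= 1
    priority = scheduler_priority + (generation,)
    heapq.heappush(ready, (priority, key))


compute_task("first", (0, 1))
compute_task("second", (0, 1))  # same scheduler priority as "first"
compute_task("urgent", (0, 0))  # smaller tuple, so it runs before both

order = [heapq.heappop(ready)[1] for _ in range(3)]
```

Because the counter decreases, the most recently received task among equal-priority tasks has the smallest tuple and is popped first, giving last-in-first-out order.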

handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) -> list[distributed.worker_state_machine.Instruction]

Process one or more external events, transition relevant tasks to new states, and return a list of instructions to be executed as a consequence.

has_what: defaultdict[str, set[Key]]

{worker address: {ts.key, ...}}. The data that we care about and that we think a worker has.

in_flight_tasks: set[TaskState]

Tasks that are coming to us in current peer-to-peer connections.

This set includes exclusively tasks with state == ‘flight’ as well as tasks with state in (‘cancelled’, ‘resumed’) and previous == ‘flight’.

See also in_flight_tasks_count().

property in_flight_tasks_count: int

Number of tasks currently being replicated from other workers to this one.

in_flight_workers: dict[str, set[Key]]

{worker address: {ts.key, ...}} The workers from which we are currently gathering data and the dependencies we expect from those connections. Workers in this dict won’t be asked for additional dependencies until the current query returns.

log: deque[tuple]

Transition log: [(..., stimulus_id: str | None, timestamp: float), ...] The number of stimuli logged is capped. See also story() and stimulus_log.

long_running: set[TaskState]

Set of tasks that are currently running and have called secede(), so they no longer count towards the maximum number of concurrent tasks (nthreads). These tasks do not appear in the executing set.

This set includes exclusively tasks with state == ‘long-running’ as well as tasks with state in (‘cancelled’, ‘resumed’) and previous == ‘long-running’.

missing_dep_flight: set[TaskState]

All and only tasks with TaskState.state == 'missing'.

nbytes: int

Total size of all tasks in memory

nthreads: int

Number of tasks that can be executing in parallel. At any given time, executing_count() <= nthreads.

plugins: dict[str, WorkerPlugin]

{name: worker plugin}. This collection is shared by reference between Worker and this class. The Worker manages adding and removing plugins, while the WorkerState invokes the WorkerPlugin.transition method, if available.

ready: HeapSet[TaskState]

Priority heap of tasks that are ready to run and have no resource constraints. Mutually exclusive with constrained.

rng: random.Random

Statically-seeded random state, used to guarantee determinism whenever a pseudo-random choice is required

running: bool

True if the state machine should start executing more tasks and fetch dependencies whenever a slot is available. This property must be kept aligned with the Worker: WorkerState.running == (Worker.status is Status.running).

stimulus_log: deque[StateMachineEvent]

Log of all stimuli received by handle_stimulus(). The number of events logged is capped. See also log and stimulus_story().

stimulus_story(*keys_or_tasks: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) -> list[distributed.worker_state_machine.StateMachineEvent]

Return all state machine events involving one or more tasks

story(*keys_or_tasks_or_stimuli: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) -> list[tuple]

Return all records from the transitions log involving one or more tasks or stimulus_id’s

task_counter: TaskCounter

Current number of tasks and cumulative elapsed time in each state, both broken down by prefix

tasks: dict[Key, TaskState]

{key: TaskState}. The tasks currently executing on this worker (and any dependencies of those tasks)

threads: dict[Key, int]

{ts.key: thread ID}. This collection is shared by reference between Worker and this class. While the WorkerState is thread-agnostic, it still needs access to this information in some cases. This collection is populated by distributed.worker.Worker.execute(). It does not need to be populated for the WorkerState to work.

total_resources: dict[str, float]

{resource name: amount}. Total resources available for task execution. See Worker Resources.

transfer_incoming_bytes: int

Current total size of open data transfers from other workers

transfer_incoming_bytes_limit: float

Limit of bytes for incoming data transfers; this is used for throttling.

transfer_incoming_bytes_throttle_threshold: int

Ignore transfer_incoming_count_limit as long as transfer_incoming_bytes is less than this value.
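One plausible reading of how the count limit, the bytes limit and the throttle threshold interact is sketched below. The function `may_start_transfer` is hypothetical and the exact order of the checks is an assumption; only the three settings themselves come from the descriptions above.

```python
def may_start_transfer(count, count_limit, nbytes, bytes_limit, throttle_threshold):
    """Decide whether another incoming transfer may be started (assumed logic).

    count and nbytes describe the transfers that are currently open.
    """
    if nbytes >= bytes_limit:
        return False  # hard throttle on the total bytes in flight
    if nbytes < throttle_threshold:
        return True   # few bytes in flight: the count limit is ignored
    return count < count_limit


small = may_start_transfer(count=60, count_limit=50, nbytes=1_000,
                           bytes_limit=10**9, throttle_threshold=10**7)
busy = may_start_transfer(count=60, count_limit=50, nbytes=5 * 10**7,
                          bytes_limit=10**9, throttle_threshold=10**7)
```

In the first call the worker exceeds the count limit but carries little data, so a new transfer is still allowed; in the second call the count limit applies.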

property transfer_incoming_count: int

Current number of open data transfers from other workers.

transfer_incoming_count_limit: int

Maximum number of concurrent incoming data transfers from other workers. See also distributed.worker.Worker.transfer_outgoing_count_limit.

transfer_incoming_count_total: int

Total number of data transfers from other workers since the worker was started.

transfer_message_bytes_limit: float

Number of bytes to gather from the same worker in a single call to BaseWorker.gather_dep(). Multiple small tasks that can be gathered from the same worker will be batched in a single instruction as long as their combined size doesn’t exceed this value. If the first task to be gathered exceeds this limit, it will still be gathered to ensure progress. Hence, this limit is not absolute.

transition_counter: int

Total number of state transitions so far. See also log and transition_counter_max.

transition_counter_max: int | Literal[False]

Raise an error if the transition_counter ever reaches this value. This is meant for debugging only, to catch infinite recursion loops. In production, it should always be set to False.

validate: bool

If True, enable expensive internal consistency checks. Typically disabled in production.

waiting: set[TaskState]

Tasks that are currently waiting for data

class distributed.worker_state_machine.BaseWorker(state: distributed.worker_state_machine.WorkerState)

Wrapper around the WorkerState that implements instructions handling. This is an abstract class with several @abc.abstractmethod methods, to be subclassed by Worker and by unit test mock-ups.

abstract batched_send(msg: dict[str, Any]) -> None

Send a fire-and-forget message to the scheduler through bulk comms.

Parameters
msg: dict

msgpack-serializable message to send to the scheduler. Must have an ‘op’ key which is registered in Scheduler.stream_handlers.

async close(timeout: float = 30) -> None

Cancel all asynchronous instructions

abstract digest_metric(name: collections.abc.Hashable, value: float) -> None

Log an arbitrary numerical metric

abstract async execute(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) -> distributed.worker_state_machine.StateMachineEvent

Execute a task

abstract async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) -> distributed.worker_state_machine.StateMachineEvent

Gather dependencies for a task from a worker who has them

Parameters
worker: str

Address of worker to gather dependencies from

to_gather: list

Keys of dependencies to gather from worker; this is not necessarily equivalent to the full list of dependencies of dep as some dependencies may already be present on this worker.

total_nbytes: int

Total number of bytes for all the dependencies in to_gather combined

handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) -> None

Forward one or more external stimuli to WorkerState.handle_stimulus() and process the returned instructions, invoking the relevant Worker callbacks (@abc.abstractmethod methods below).

Spawn asyncio tasks for all asynchronous instructions and start tracking them.

abstract async retry_busy_worker_later(worker: str) -> distributed.worker_state_machine.StateMachineEvent

Wait some time, then take a peer worker out of busy state

class distributed.worker_state_machine.StateMachineEvent(*args: Any, **kwargs: Any)

Base abstract class for all stimuli that can modify the worker state

static from_dict(d: dict) -> distributed.worker_state_machine.StateMachineEvent

Convert the output of recursive_to_dict back into the original object. The output object is meaningful for the purpose of rebuilding the state machine, but not necessarily identical to the original.

stimulus_id: str

Unique ID of the event

to_loggable(*, handled: float) -> distributed.worker_state_machine.StateMachineEvent

Produce a variant version of self that is small enough to be stored in memory in the medium term and contains meaningful information for debugging

class distributed.worker_state_machine.Instruction(stimulus_id: str)

Command from the worker state machine to the Worker, in response to an event

classmethod match(**kwargs: Any) -> distributed.worker_state_machine._InstructionMatch

Generate a partial match to compare against an Instruction instance. The typical usage is to compare a list of instructions returned by WorkerState.handle_stimulus() or in WorkerState.stimulus_log vs. an expected list of matches.

Examples

instructions = ws.handle_stimulus(...)
assert instructions == [
    TaskFinishedMsg.match(key="x"),
    ...
]
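The idea of a partial match can be illustrated with a self-contained toy. The classes below (`_ToyMatch`, `ToyInstruction`, `ToyTaskFinishedMsg`) are hypothetical and only mimic the pattern; they are not the classes from distributed.worker_state_machine.

```python
from dataclasses import dataclass


class _ToyMatch:
    """Compares equal to any instance of ``cls`` whose listed fields match."""

    def __init__(self, cls, **kwargs):
        self.cls = cls
        self.kwargs = kwargs

    def __eq__(self, other):
        if type(other) is not self.cls:
            return False
        return all(getattr(other, k) == v for k, v in self.kwargs.items())


@dataclass
class ToyInstruction:
    stimulus_id: str

    @classmethod
    def match(cls, **kwargs):
        return _ToyMatch(cls, **kwargs)


@dataclass
class ToyTaskFinishedMsg(ToyInstruction):
    key: str
    nbytes: int


instructions = [ToyTaskFinishedMsg(stimulus_id="s1", key="x", nbytes=8)]

# Only the fields passed to match() are compared; the others are ignored
matched = instructions == [ToyTaskFinishedMsg.match(key="x")]
not_matched = instructions == [ToyTaskFinishedMsg.match(key="y")]
```

This keeps test assertions short: a test can pin down the fields it cares about without spelling out every attribute of the expected instruction.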

Note

StateMachineEvent and Instruction are abstract classes, with many subclasses which are not listed here for the sake of brevity. Refer to the implementation module distributed.worker_state_machine for the full list.