Worker State Machine
Task states¶
When the Scheduler asks a Worker to compute a task, it is tracked by the Worker through a distributed.worker_state_machine.TaskState object - not to be confused with the matching scheduler-side class distributed.scheduler.TaskState.

The class has a key attribute, TaskState.state, which can assume the following values:
- released
Known but not actively computing or in memory. A task can stay in this state when the scheduler asked to forget it, but it has dependent tasks on the same worker.
- waiting
The scheduler has added the task to the worker queue. All of its dependencies are in memory somewhere on the cluster, but not all of them are in memory on the current worker, so they need to be fetched.
- fetch
This task is in memory on one or more peer workers, but not on this worker. Its data is queued to be transferred over the network, either because it's a dependency of a task in waiting state, or because the Active Memory Manager requested it to be replicated here. The task can be found in the WorkerState.data_needed heap.
- missing
Like fetch, but all peer workers that were listed by the scheduler are either unreachable or have responded they don't actually have the task data. The worker will periodically ask the scheduler if it knows of additional replicas; when it does, the task will transition again to fetch. The task can be found in the WorkerState.missing_dep_flight set.
- flight
The task data is currently being transferred over the network from another worker. The task can be found in the WorkerState.in_flight_tasks and WorkerState.in_flight_workers collections.
- ready
The task is ready to be computed; all of its dependencies are in memory on the current worker and it's waiting for an available thread. The task can be found in the WorkerState.ready heap.
- constrained
Like ready, but the user specified resource constraints for this task. The task can be found in the WorkerState.constrained queue.
- executing
The task is currently being computed on a thread. It can be found in the WorkerState.executing set and in the distributed.worker.Worker.active_threads dict.
- long-running
Like executing, but the user code called distributed.secede(), so the task no longer counts towards the maximum number of concurrent tasks. It can be found in the WorkerState.long_running set and in the distributed.worker.Worker.active_threads dict.
- rescheduled
The task just raised the Reschedule exception. This is a transitory state, which is not stored permanently.
- cancelled
The scheduler asked to forget about this task, but it's technically impossible at the moment. See Task cancellation. The task can be found in whatever collections it was in during its previous state.
- resumed
The task was recovered from the cancelled state. See Task cancellation. The task can be found in whatever collections it was in during its previous state.
- memory
Task execution completed, or the task was successfully transferred from another worker, and is now held in either WorkerState.data or WorkerState.actors.
- error
Task execution failed. Alternatively, task execution completed successfully, or the task data transferred successfully over the network, but it failed to serialize or deserialize. The full exception and traceback are stored in the task itself, so that they can be re-raised on the client.
- forgotten
The scheduler asked this worker to forget about the task, and there are neither dependents nor dependencies on the same worker. As soon as a task reaches this state, it is immediately dereferenced from the WorkerState and will soon be garbage-collected. This is the only case where two instances of a TaskState object with the same key can (transitorily) exist in the same interpreter at the same time.
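As a practical illustration, the distribution of task states on each worker can be summarized from a client. This is a minimal sketch, assuming a running cluster; the summary function is ad hoc, while Worker.state (the WorkerState instance described later on this page) and its tasks dictionary are the documented attributes:

```python
from collections import Counter

from distributed import Client


def summarize_task_states(dask_worker):
    # dask_worker.state is the worker's WorkerState; its .tasks dict maps
    # key -> TaskState, and each TaskState.state is one of the values above.
    return Counter(ts.state for ts in dask_worker.state.tasks.values())


client = Client()  # or Client("tcp://<scheduler>:8786") for an existing cluster
futures = client.map(lambda x: x + 1, range(100))

# Client.run executes the function on every worker; functions that accept a
# dask_worker argument receive the Worker instance itself.
print(client.run(summarize_task_states))
```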
Fetching dependencies¶
As tasks that need to be computed arrive on the Worker, any dependencies that are not
already in memory on the same worker are wrapped by a TaskState
object and
contain a listing of workers (TaskState.who_has
) to collect their result from.
These TaskState
objects have their state set to fetch
, are put in the
data_needed
heap, and are progressively transferred over the
network. For each dependency we select a worker at random that has that data and collect
the dependency from that worker. To improve bandwidth, we opportunistically gather other
dependencies of other tasks that are known to be on that worker, up to a maximum of 50MB
of data (transfer_message_bytes_limit
, which is acquired from the
configuration key distributed.worker.transfer.message-bytes-limit
) - too little data
and bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of
50 connections (transfer_incoming_count_limit
, which is in turn
acquired from the configuration key distributed.worker.connections.outgoing
) so as
to avoid overly-fragmenting our network bandwidth.
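Both values can be tuned through the Dask configuration system using the keys quoted above. A sketch, assuming a local cluster started in the same process (for remote workers, the same keys would go in a config file or DASK_* environment variables, since they are read at worker startup):

```python
import dask
from distributed import Client, LocalCluster

# Illustrative values only; byte sizes accept human-readable strings.
dask.config.set({
    "distributed.worker.transfer.message-bytes-limit": "100MB",
    "distributed.worker.connections.outgoing": 20,
})

cluster = LocalCluster(n_workers=2)  # workers started now pick up these values
client = Client(cluster)
```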
In the event that the network comms between two workers are saturated, a dependency task
may cycle between fetch
and flight
until it is successfully collected. It may
also happen that a peer worker responds that it doesn’t have a replica of the requested
data anymore; finally, the peer worker may be unreachable or unresponsive. When that
happens, the peer is removed from who_has
and the task is
transitioned back to fetch
, so that the Worker will try gathering the same key from
a different peer. If who_has
becomes empty due to this process, the
task transitions to missing
and the Worker starts periodically asking the Scheduler
if additional peers are available.
The same system used for fetching dependencies is also used by Active Memory Manager replication.
Note
There is at most one gather_dep()
asyncio task running at any
given time for any given peer worker. If all workers holding a replica of a task
in fetch
state are already in flight, the task will remain in fetch
state
until a worker becomes available again.
Computing tasks¶
A TaskState
that needs to be computed proceeds on the Worker through the
following pipeline. It has its run_spec
defined, which instructs the
worker how to execute it.
After all dependencies for a task are in memory, the task transitions from waiting
to ready
or constrained
and is added to the ready
heap.
As soon as a thread is available, we pop a task from the top of the heap and put the task into a thread from a local thread pool to execute.
Optionally, while it’s running, this task may identify itself as a long-running task
(see Tasks launching tasks), at which point it secedes from the
thread pool and changes state to long-running. executing and long-running are almost identical states, the only difference being that tasks in the latter don't count towards the maximum number of tasks running in parallel at the same time.
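For reference, this is what the seceding pattern above looks like in user code (a small sketch on a local cluster; the task body is arbitrary, while distributed.secede() and distributed.get_client() are the relevant calls):

```python
from distributed import Client, get_client, secede


def launch_and_wait(n):
    # Runs in a worker thread: submit more work, then secede while waiting for
    # it, so this task moves from "executing" to "long-running" and stops
    # counting towards the worker's thread limit.
    client = get_client()
    futures = client.map(lambda i: i * i, range(n))
    secede()
    return sum(client.gather(futures))


client = Client()  # local cluster for illustration
print(client.submit(launch_and_wait, 10).result())  # 285
```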
A task can terminate in three ways:
- Complete successfully; its return value is stored in either data or actors
- Raise an exception; the exception and traceback are stored on the TaskState object
- Raise Reschedule; it is immediately forgotten.
In all cases, the outcome is sent back to the scheduler.
Forgetting tasks¶
Once a task is in memory
or error
, the Worker will hold onto it indefinitely,
until the Scheduler explicitly asks the Worker to forget it.
This happens when there are no more Clients holding a reference to the key and there are
no more waiter tasks (that is, dependents that have not been computed). Additionally,
the Active Memory Manager may ask to drop excess replicas of a task.
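In practice, this means a key stays on a worker for as long as some client holds a Future to it (or to an uncomputed dependent). A minimal sketch, assuming a local cluster:

```python
from distributed import Client

client = Client()

x = client.submit(sum, [1, 2, 3])  # the key is computed and held in "memory"
assert x.result() == 6             # the worker still holds the result

del x  # no client references and no waiters left: the scheduler now asks the
       # worker to forget the key (memory -> released -> forgotten)
```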
In the case of rescheduled
, the task will instead immediately transition to
released
and then forgotten
without waiting for the scheduler.
Irregular flow¶
There are a few important exceptions to the flow diagrams above:
- A task is stolen, in which case it transitions from waiting, ready, or constrained directly to released. Note that steal requests for tasks that are currently executing are rejected.
- Scheduler intercession, in which the scheduler reassigns a task that was previously assigned to a separate worker to a new worker. This most commonly occurs when a worker dies during computation.
- Client intercession, where a client either explicitly releases a Future or descopes it; alternatively the whole client may shut down or become unresponsive. When there are no more clients holding references to a key or one of its dependents, the Scheduler will release it.
In short:
Important
A task can transition to released
from any state, not just those in the
diagrams above.
If there are no dependents, the task immediately transitions to forgotten and is descoped. However, there is an important exception: Task cancellation.
Task cancellation¶
The Worker may receive a request to release a key while it is currently in flight, executing, or long-running. Due to technical limitations around cancelling Python threads, and the way data fetching from peer workers is currently implemented, such an event cannot cause the related asyncio task (and, in the case of executing / long-running, the thread running the user code) to be immediately aborted. Instead, tasks in these three states are transitioned to another state, cancelled, which means that the asyncio task will proceed to completion (its outcome is irrelevant) and only then will the Dask task be released.

The cancelled state has a substate, previous, which is set to one of the above three states. The common notation for this is <state>(<previous>), e.g. cancelled(flight).
While a task is cancelled, one of three things will happen:
- Nothing happens before the asyncio task completes; e.g. the Scheduler does not change its mind and still wants the Worker to forget about the task until the very end. When that happens, the task transitions from cancelled to released and, typically, forgotten.
- The scheduler switches back to its original request:
  - The scheduler asks the Worker to fetch a task that is currently cancelled(flight); at which point the task will immediately revert to flight, forget that cancellation ever happened, and continue waiting on the data fetch that's already running;
  - The scheduler asks the Worker to compute a task that is currently cancelled(executing) or cancelled(long-running). The Worker will completely disregard the new run_spec (if it changed), switch back to the previous state, and wait for the already executing thread to finish.
- The scheduler flips to the opposite request, from fetch to computation or the other way around.
To serve this last use case there is another special state, resumed. A task can enter the resumed state exclusively from cancelled. resumed retains the previous attribute from the cancelled state and adds another attribute, next, which is always:

- fetch, if previous is executing or long-running
- waiting, if previous is flight

To recap, these are all possible permutations of states and substates to handle cancelled tasks:
state | previous | next
---|---|---
cancelled | flight | None
cancelled | executing | None
cancelled | long-running | None
resumed | flight | waiting
resumed | executing | fetch
resumed | long-running | fetch
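In terms of the TaskState attributes documented in the API section below, the table corresponds to the following invariants (an illustrative check, not code from the library):

```python
from distributed.worker_state_machine import TaskState


def check_cancellation_substates(ts: TaskState) -> None:
    # Invariants implied by the table above and by the previous/next
    # attributes documented below.
    if ts.state == "cancelled":
        assert ts.previous in ("flight", "executing", "long-running")
        assert ts.next is None
    elif ts.state == "resumed":
        assert ts.previous in ("flight", "executing", "long-running")
        expected_next = "waiting" if ts.previous == "flight" else "fetch"
        assert ts.next == expected_next
    else:
        # previous and next are only populated for cancelled/resumed tasks
        assert ts.previous is None and ts.next is None
```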
If a resumed task completes successfully, it will transition to memory (as opposed to a cancelled task, where the output is disregarded) and the Scheduler will be informed with a spoofed termination message: the expected end message for flight if the task is resumed(executing->fetch) or resumed(long-running->fetch), and the expected end message for execute if the task is resumed(flight->waiting).
If the task fails or raises Reschedule
, the Worker will instead
silently ignore the exception and switch to its intended course, so
resumed(executing->fetch)
or resumed(long-running->fetch)
will transition to
fetch
and resumed(flight->waiting)
will transition to waiting
.
Finally, the scheduler can change its mind multiple times over the lifetime of the task,
so a resumed(executing->fetch)
or resumed(long-running->fetch)
task may be
requested to transition to waiting
again, at which point it will just revert to its
previous
state and forget the whole incident; likewise a
resumed(flight->waiting)
task could be requested to transition to fetch
again,
so it will just transition to flight
instead.
A common real-life use case:
1. There are at least two workers on the cluster, A and B.
2. Task x is computed successfully on worker A.
3. When task x transitions to memory on worker A, the scheduler asks worker B to compute task y, which depends on task x.
4. B starts acquiring the key x from A, which sends the task into flight mode on worker B.
5. Worker A crashes, and for whatever reason the scheduler notices before worker B does.
6. The scheduler will release task y (because it's waiting on dependencies that are nowhere to be found in memory anymore) and reschedule task x somewhere else on the cluster. Task x will transition to cancelled(flight) on worker B.
7. If the scheduler randomly chooses worker B to compute task x, the task will transition to resumed(flight->waiting).
8. When, and only when, the TCP socket between B and A collapses (e.g. due to timeout), the task will transition to waiting and will eventually be recomputed on B.
Important
You always have at most one compute()
or
gather_dep()
asyncio task running for any one given key; you
never have both.
Task state mapping between Scheduler and Worker¶
The task states on the scheduler and the worker are different, and their mapping is somewhat nuanced:
Scheduler states | Typical worker states | Edge case worker states
---|---|---
In addition to the above states, a worker may not know about a specific task at all. The opposite, where the worker knows about a task but it is nowhere to be found on the scheduler, happens exclusively in the case of Task cancellation.
There are also race conditions to be considered, where a worker (or some workers) know something before the scheduler does, or the other way around. For example:

- A task will always transition from executing to memory on the worker before it can transition from processing to memory on the scheduler.
- A task will always transition to released or forgotten on the scheduler first, and only when the message reaches the worker will it be released there too.
Flow control¶
There are several classes involved in the worker state machine:
TaskState
includes all the information related to a single task; it also
includes references to dependent and dependency tasks. This is just a data holder, with
no mutating methods. Note that this is a distinct class from
distributed.scheduler.TaskState
.
WorkerState
encapsulates the state of the worker as a whole. It holds
references to TaskState
in its tasks
dictionary and in
several other secondary collections. Crucially, this class has no knowledge or
visibility whatsoever on asyncio, networking, disk I/O, threads, etc.
Note that this is a distinct class from distributed.scheduler.WorkerState
.
WorkerState
offers a single method to mutate the state:
handle_stimulus()
. The state must not be altered in any other way.
The method accepts a StateMachineEvent, a.k.a. stimulus, which is a data class describing something that happened which may cause the worker state to mutate. A stimulus can arrive from either the scheduler (e.g. a request to compute a task) or from the worker itself (e.g. a task has finished computing).
WorkerState.handle_stimulus()
alters the internal state (e.g., it could transition
a task from executing
to memory
) and returns a list of Instruction
objects, which are actions that the worker needs to take but are external to the state
itself:
- send a message to the scheduler
- compute a task
- gather a task from a peer worker
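The state machine can be exercised in isolation, which is how the library's own unit tests drive it. A rough sketch (ComputeTaskEvent.dummy is a convenience constructor used by the test suite; its exact signature, and the precise instructions returned, may differ between versions):

```python
from distributed.worker_state_machine import ComputeTaskEvent, WorkerState

# A stand-alone state machine: no networking, threads, or event loop involved.
ws = WorkerState(nthreads=1, address="tcp://127.0.0.1:1234")

# Pretend the scheduler asked us to compute a task with no dependencies.
instructions = ws.handle_stimulus(ComputeTaskEvent.dummy("x", stimulus_id="s1"))

print(ws.tasks["x"].state)  # typically "executing", since a thread is free
print(instructions)         # e.g. [Execute(key="x", stimulus_id="s1")]
```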
WorkerState.handle_stimulus()
is wrapped by BaseWorker.handle_stimulus()
,
which consumes the Instruction
objects. BaseWorker
deals with asyncio
task creation, tracking, and cleanup, but does not implement the actual task execution or gathering; instead it exposes the abstract async methods
execute()
and gather_dep()
, which are then
overridden by its subclass Worker
, which actually runs tasks and
performs network I/O. When the implemented methods finish, they must return a
StateMachineEvent
, which is fed back into BaseWorker.handle_stimulus()
.
Note
This can create a (potentially very long) chain of events internal to the worker;
e.g. if there are more tasks in the ready
queue than there are
threads, then the termination StateMachineEvent
of one task will trigger the
Instruction
to execute the next one.
To summarize:
- WorkerState is agnostic to asyncio, networking, threading, and disk I/O; it includes collections of TaskState objects.
- BaseWorker encapsulates WorkerState and adds awareness of asyncio.
- Worker subclasses BaseWorker and adds awareness of networking, threading, and disk I/O.
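To make the layering concrete, here is a rough sketch of what a BaseWorker subclass can look like (the class itself is hypothetical; the abstract methods are the ones documented in the API section below, and the library's unit test mock-ups follow the same pattern):

```python
from distributed.worker_state_machine import BaseWorker, StateMachineEvent, WorkerState


class LoggingWorker(BaseWorker):
    """Toy subclass: it satisfies BaseWorker's abstract interface but only
    prints what a real Worker would do (run threads, open connections, ...).
    Each async method must return a StateMachineEvent describing the outcome,
    which BaseWorker.handle_stimulus() feeds back into the state machine."""

    def __init__(self, address: str) -> None:
        super().__init__(WorkerState(nthreads=1, address=address))

    def batched_send(self, msg: dict) -> None:
        print("would send to the scheduler:", msg)

    def digest_metric(self, name, value: float) -> None:
        print("metric:", name, "=", value)

    async def execute(self, key, *, stimulus_id: str) -> StateMachineEvent:
        raise NotImplementedError("a real Worker runs the task in a thread here")

    async def gather_dep(self, worker, to_gather, total_nbytes, *, stimulus_id):
        raise NotImplementedError("a real Worker performs network I/O here")

    async def retry_busy_worker_later(self, worker: str) -> StateMachineEvent:
        raise NotImplementedError("a real Worker sleeps, then retries the peer")
```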
Internal state permutation¶
Internally, WorkerState.handle_stimulus()
works very similarly to
the same process on the scheduler side:
1. WorkerState.handle_stimulus() calls WorkerState._handle_<stimulus name>(), which returns a tuple of:
   - recommendations to transition tasks: {TaskState: <new state>}
   - a list of Instruction objects
2. WorkerState.handle_stimulus() then passes the recommendations to WorkerState._transitions().
3. For each recommendation, WorkerState._transitions() calls WorkerState._transition(), which in turn calls WorkerState._transition_<start state>_<end state>(), which in turn returns an additional tuple of (recommendations, instructions).
4. The new recommendations are consumed by WorkerState._transitions(), until no more recommendations are returned.
5. WorkerState.handle_stimulus() finally returns the list of instructions, which has been progressively extended by the transitions.
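In pseudocode, the loop looks roughly like this (an illustrative sketch of the control flow only; handle_event and transition stand in for the per-stimulus _handle_<stimulus name>() handlers and _transition(), and the real method signatures differ):

```python
def handle_stimulus_sketch(ws, stim):
    # 1. Dispatch to the event-specific handler (_handle_<stimulus name>()),
    #    which returns initial recommendations and instructions.
    recommendations, instructions = handle_event(ws, stim)

    # 2. Consume recommendations until none are left; each individual
    #    transition may recommend further transitions and emit instructions.
    while recommendations:
        ts, new_state = recommendations.popitem()
        more_recs, more_instr = transition(ws, ts, new_state)
        recommendations.update(more_recs)
        instructions.extend(more_instr)

    # 3. Return the accumulated instructions for BaseWorker to act upon.
    return instructions
```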
API Documentation¶
- class distributed.worker_state_machine.TaskState(key: Key, run_id: int = -1, run_spec: T_runspec | None = None, dependencies: set[TaskState] = <factory>, dependents: set[TaskState] = <factory>, waiting_for_data: set[TaskState] = <factory>, waiters: set[TaskState] = <factory>, state: TaskStateState = 'released', previous: Literal['executing', 'long-running', 'flight', None] = None, next: Literal['fetch', 'waiting', None] = None, duration: float | None = None, priority: tuple[int, ...] | None = None, who_has: set[str] = <factory>, coming_from: str | None = None, resource_restrictions: dict[str, float] = <factory>, exception: Serialize | None = None, traceback: Serialize | None = None, exception_text: str = '', traceback_text: str = '', type: type | None = None, suspicious_count: int = 0, startstops: list[StartStop] = <factory>, start_time: float | None = None, stop_time: float | None = None, metadata: dict = <factory>, nbytes: int | None = None, annotations: dict | None = None, span_id: str | None = None, done: bool = False)[source]¶
Holds volatile state relating to an individual Dask task.
Not to be confused with distributed.scheduler.TaskState, which holds similar information on the scheduler side.
- done: bool¶
True if the
execute()
orgather_dep()
coroutine servicing this task completed; False otherwise. This flag changes the behaviour of transitions out of theexecuting
,flight
etc. states.
- key: Key¶
Task key. Mandatory.
- metadata: dict¶
Metadata related to the task. Stored metadata should be msgpack serializable (e.g. int, string, list, dict).
- next: Literal['fetch', 'waiting', None]¶
The next state of the task. It is not None iff
state
== resumed.
- previous: Literal['executing', 'long-running', 'flight', None]¶
The previous state of the task. It is not None iff
state
in (cancelled, resumed).
- priority: tuple[int, ...] | None¶
The priority given to this task by the scheduler. Determines run order.
- run_spec: T_runspec | None¶
A tuple containing the
function
,args
,kwargs
andtask
associated with this TaskState instance. This defaults toNone
and can remain empty if it is a dependency that this worker will receive from another worker.
- span_id: str | None¶
unique span id (see
distributed.spans
). Matchesdistributed.scheduler.TaskState.group.span_id
.
- state: TaskStateState¶
The current state of the task
- class distributed.worker_state_machine.WorkerState(*, nthreads: int = 1, address: str | None = None, data: MutableMapping[Key, object] | None = None, threads: dict[Key, int] | None = None, plugins: dict[str, WorkerPlugin] | None = None, resources: Mapping[str, float] | None = None, transfer_incoming_count_limit: int = 9999, validate: bool = True, transition_counter_max: int | Literal[False] = False, transfer_incoming_bytes_limit: float = inf, transfer_message_bytes_limit: float = inf)[source]¶
State machine encapsulating the lifetime of all tasks on a worker.
Not to be confused with distributed.scheduler.WorkerState.
Note
The data attributes of this class are implementation details and may be changed without a deprecation cycle.
Warning
The attributes of this class are all heavily correlated with each other. Do not modify them directly, ever, as it is extremely easy to obtain a broken state this way, which in turn will likely result in cluster-wide deadlocks.
The state should be exclusively mutated through handle_stimulus().
- address: str¶
Worker <IP address>:<port>. This is used in decision-making by the state machine, e.g. to determine if a peer worker is running on the same host or not. This attribute may not be known when the WorkerState is initialised. It must be set before the first call to
handle_stimulus()
.
- property all_running_tasks: set[distributed.worker_state_machine.TaskState]¶
All tasks that are currently occupying a thread. They may or may not count towards the maximum number of threads.
These are:
- ts.status in (executing, long-running)
- ts.status in (cancelled, resumed) and ts.previous in (executing, long-running)
- available_resources: dict[str, float]¶
{resource name: amount}. Current resources that aren't currently being consumed by task execution. Always less than or equal to total_resources. See Worker Resources.
- busy_workers: set[str]¶
Peer workers that recently returned a busy status. Workers in this set won’t be asked for additional dependencies for some time.
- constrained: HeapSet[TaskState]¶
Priority heap of tasks that are ready to run, but are waiting on abstract resources like GPUs. Mutually exclusive with ready. See available_resources and Worker Resources.
- data: MutableMapping[Key, object]¶
In-memory tasks data. This collection is shared by reference between Worker, WorkerMemoryManager, and this class.
- data_needed: defaultdict[str, HeapSet[TaskState]]¶
The tasks which still require data in order to execute and are in memory on at least another worker, prioritized as per-worker heaps. All and only tasks with TaskState.state == 'fetch' are in this collection. A TaskState with multiple entries in who_has will appear multiple times here.
- executed_count: int¶
A number of tasks that this worker has run in its lifetime; this includes failed and cancelled tasks. See also
executing_count()
.
- executing: set[TaskState]¶
Set of tasks that are currently running.
This set includes exclusively tasks with state == 'executing', as well as tasks with state in ('cancelled', 'resumed') and previous == 'executing'.
See also executing_count() and long_running.
- property executing_count: int¶
Count of tasks currently executing on this worker and counting towards the maximum number of threads.
It includes cancelled tasks, but does not include long running (a.k.a. seceded) tasks.
- fetch_count: int¶
Total number of tasks in fetch state. If a task is in more than one data_needed heap, it’s only counted once.
- generation: int¶
Counter that decreases every time the compute-task handler is invoked by the Scheduler. It is appended to
TaskState.priority
and acts as a tie-breaker between tasks that have the same priority on the Scheduler, determining a last-in-first-out order between them.
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) list[distributed.worker_state_machine.Instruction] [source]¶
Process one or more external events, transition relevant tasks to new states, and return a list of instructions to be executed as a consequence.
- has_what: defaultdict[str, set[Key]]¶
{worker address: {ts.key, ...}}. The data that we care about that we think a worker has.
- in_flight_tasks: set[TaskState]¶
Tasks that are coming to us in current peer-to-peer connections.
This set includes exclusively tasks with state == 'flight', as well as tasks with state in ('cancelled', 'resumed') and previous == 'flight'.
See also in_flight_tasks_count().
- property in_flight_tasks_count: int¶
Number of tasks currently being replicated from other workers to this one.
- in_flight_workers: dict[str, set[Key]]¶
{worker address: {ts.key, ...}}
The workers from which we are currently gathering data and the dependencies we expect from those connections. Workers in this dict won’t be asked for additional dependencies until the current query returns.
- log: deque[tuple]¶
Transition log:
[(..., stimulus_id: str | None, timestamp: float), ...]. The number of stimuli logged is capped. See also story() and stimulus_log.
- long_running: set[TaskState]¶
Set of tasks that are currently running and have called secede(), so they no longer count towards the maximum number of concurrent tasks (nthreads). These tasks do not appear in the executing set.
This set includes exclusively tasks with state == 'long-running', as well as tasks with state in ('cancelled', 'resumed') and previous == 'long-running'.
- nthreads: int¶
Number of tasks that can be executing in parallel. At any given time,
executing_count()
<= nthreads.
- plugins: dict[str, WorkerPlugin]¶
{name: worker plugin}. This collection is shared by reference between Worker and this class. The Worker manages adding and removing plugins, while the WorkerState invokes the WorkerPlugin.transition method, if available.
- ready: HeapSet[TaskState]¶
Priority heap of tasks that are ready to run and have no resource constraints. Mutually exclusive with constrained.
- rng: random.Random¶
Statically-seeded random state, used to guarantee determinism whenever a pseudo-random choice is required
- running: bool¶
True if the state machine should start executing more tasks and fetch dependencies whenever a slot is available. This property must be kept aligned with the Worker:
WorkerState.running == (Worker.status is Status.running)
.
- stimulus_log: deque[StateMachineEvent]¶
Log of all stimuli received by
handle_stimulus()
. The number of events logged is capped. See also log and stimulus_story().
- stimulus_story(*keys_or_tasks: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[distributed.worker_state_machine.StateMachineEvent] [source]¶
Return all state machine events involving one or more tasks
- story(*keys_or_tasks_or_stimuli: Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[tuple] [source]¶
Return all records from the transitions log involving one or more tasks or stimulus_id’s
- task_counter: TaskCounter¶
Current number of tasks and cumulative elapsed time in each state, both broken down by
prefix
- tasks: dict[Key, TaskState]¶
{key: TaskState}
. The tasks currently executing on this worker (and any dependencies of those tasks)
- threads: dict[Key, int]¶
{ts.key: thread ID}
. This collection is shared by reference betweenWorker
and this class. While the WorkerState is thread-agnostic, it still needs access to this information in some cases. This collection is populated bydistributed.worker.Worker.execute()
. It does not need to be populated for the WorkerState to work.
- total_resources: dict[str, float]¶
{resource name: amount}. Total resources available for task execution. See Worker Resources.
- transfer_incoming_bytes_limit: float¶
Limit of bytes for incoming data transfers; this is used for throttling.
- transfer_incoming_bytes_throttle_threshold: int¶
Ignore transfer_incoming_count_limit as long as transfer_incoming_bytes is less than this value.
- property transfer_incoming_count: int¶
Current number of open data transfers from other workers.
- transfer_incoming_count_limit: int¶
Maximum number of concurrent incoming data transfers from other workers. See also
distributed.worker.Worker.transfer_outgoing_count_limit
.
- transfer_incoming_count_total: int¶
Total number of data transfers from other workers since the worker was started.
- transfer_message_bytes_limit: float¶
Number of bytes to gather from the same worker in a single call to
BaseWorker.gather_dep()
. Multiple small tasks that can be gathered from the same worker will be batched in a single instruction as long as their combined size doesn’t exceed this value. If the first task to be gathered exceeds this limit, it will still be gathered to ensure progress. Hence, this limit is not absolute.
- transition_counter: int¶
Total number of state transitions so far. See also log and transition_counter_max.
- transition_counter_max: int | Literal[False]¶
Raise an error if the
transition_counter
ever reaches this value. This is meant for debugging only, to catch infinite recursion loops. In production, it should always be set to False.
- class distributed.worker_state_machine.BaseWorker(state: distributed.worker_state_machine.WorkerState)[source]¶
Wrapper around the WorkerState that implements instruction handling. This is an abstract class with several @abc.abstractmethod methods, to be subclassed by Worker and by unit test mock-ups.
- abstract batched_send(msg: dict[str, Any]) None [source]¶
Send a fire-and-forget message to the scheduler through bulk comms.
- Parameters
- msg: dict
msgpack-serializable message to send to the scheduler. Must have an 'op' key which is registered in Scheduler.stream_handlers.
- abstract digest_metric(name: collections.abc.Hashable, value: float) None [source]¶
Log an arbitrary numerical metric
- abstract async execute(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
Execute a task
- abstract async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
Gather dependencies for a task from a worker who has them
- Parameters
- worker: str
Address of worker to gather dependencies from
- to_gather: list
Keys of dependencies to gather from worker – this is not necessarily equivalent to the full list of dependencies of dep, as some dependencies may already be present on this worker.
- total_nbytes: int
Total number of bytes for all the dependencies in to_gather combined
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None [source]¶
Forward one or more external stimuli to
WorkerState.handle_stimulus()
and process the returned instructions, invoking the relevant Worker callbacks (@abc.abstractmethod
methods below). Spawn asyncio tasks for all asynchronous instructions and start tracking them.
- abstract async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent [source]¶
Wait some time, then take a peer worker out of busy state
- class distributed.worker_state_machine.StateMachineEvent(*args: Any, **kwargs: Any)[source]¶
Base abstract class for all stimuli that can modify the worker state
- static from_dict(d: dict) distributed.worker_state_machine.StateMachineEvent [source]¶
Convert the output of
recursive_to_dict
back into the original object. The output object is meaningful for the purpose of rebuilding the state machine, but not necessarily identical to the original.
- to_loggable(*, handled: float) distributed.worker_state_machine.StateMachineEvent [source]¶
Produce a variant version of self that is small enough to be stored in memory in the medium term and contains meaningful information for debugging
- class distributed.worker_state_machine.Instruction(stimulus_id: str)[source]¶
Command from the worker state machine to the Worker, in response to an event
- classmethod match(**kwargs: Any) distributed.worker_state_machine._InstructionMatch [source]¶
Generate a partial match to compare against an Instruction instance. The typical usage is to compare a list of instructions returned by
WorkerState.handle_stimulus() or in WorkerState.stimulus_log vs. an expected list of matches.
Examples
instructions = ws.handle_stimulus(...)
assert instructions == [
    TaskFinishedMsg.match(key="x"),
    ...,
]
Note
StateMachineEvent
and Instruction
are abstract classes, with many
subclasses which are not listed here for the sake of brevity.
Refer to the implementation module distributed.worker_state_machine
for the
full list.