Worker¶
Overview¶
Workers provide two functions:
Compute tasks as directed by the scheduler
Store and serve computed results to other workers or clients
Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler. It stores the results of these tasks locally and serves them to other workers or clients on demand. If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.
A typical conversation between a scheduler and two workers Alice and Bob may look like the following:
Scheduler -> Alice: Compute ``x <- add(1, 2)``!
Alice -> Scheduler: I've computed x and am holding on to it!
Scheduler -> Bob: Compute ``y <- add(x, 10)``!
You will need x. Alice has x.
Bob -> Alice: Please send me x.
Alice -> Bob: Sure. x is 3!
Bob -> Scheduler: I've computed y and am holding on to it!
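For illustration, the following client-side sketch would produce such a conversation (the scheduler address is illustrative and assumes a cluster is already running):

from dask.distributed import Client

client = Client("scheduler:8786")   # hypothetical scheduler address

def add(a, b):
    return a + b

x = client.submit(add, 1, 2)    # the scheduler asks one worker (e.g. Alice) to compute x
y = client.submit(add, x, 10)   # another worker (e.g. Bob) may fetch x from Alice first
print(y.result())               # 13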
Storing Data¶
Data is stored locally in a dictionary in the .data
attribute that
maps keys to the results of function calls.
>>> worker.data
{'x': 3,
'y': 13,
...
'(df, 0)': pd.DataFrame(...),
...
}
This .data
attribute is a MutableMapping
that is typically a
combination of in-memory and on-disk storage with an LRU policy to move data
between them.
Read more: Worker Memory Management
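For a quick look at what each worker is holding, a sketch along the following lines works from the client side (the address is illustrative); Client.has_what reports keys per worker, and Client.run can inspect .data directly:

from dask.distributed import Client

client = Client("scheduler:8786")        # hypothetical address

print(client.has_what())                 # {worker address: (key, key, ...), ...}

def peek_data(dask_worker):
    # Client.run passes the Worker instance as the `dask_worker` argument
    return list(dask_worker.data.keys())

print(client.run(peek_data))             # {worker address: [key, ...], ...}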
Thread Pool¶
Each worker sends computations to a thread in a
concurrent.futures.ThreadPoolExecutor
for computation. These computations occur in the same process as the Worker
communication server so that they can access and share data efficiently between
each other. For the purposes of data locality all threads within a worker are
considered the same worker.
If your computations are mostly numeric in nature (for example NumPy and Pandas
computations) and release the GIL entirely then it is advisable to run
dask-worker
processes with many threads and one process. This reduces
communication costs and generally simplifies deployment.
If your computations are mostly Python code and don’t release the GIL then it
is advisable to run dask-worker
processes with many processes and one
thread per process:
$ dask-worker scheduler:8786 --nworkers 8 --nthreads 1
This will launch 8 worker processes each of which has its own ThreadPoolExecutor of size 1.
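A rough programmatic equivalent of the command above, sketched with LocalCluster (assuming the workers should run on the local machine):

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=8, threads_per_worker=1)  # 8 processes, 1 thread each
client = Client(cluster)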
If your computations are external to Python and long-running and don’t release the GIL then beware that while the computation is running the worker process will not be able to communicate to other workers or to the scheduler. This situation should be avoided. If you don’t link in your own custom C/Fortran code then this topic probably doesn’t apply.
Command Line tool¶
Use the dask-worker
command line tool to start an individual worker. For
more details on the command line options, please have a look at the
command line tools documentation.
Internal Scheduling¶
Internally, tasks that arrive at the worker proceed through the following pipeline as distributed.worker_state_machine.TaskState objects. Tasks which follow this path have a run_spec defined which instructs the worker how to execute them.
Data dependencies are also represented as TaskState objects and follow a simpler path through the execution pipeline. These tasks do not have a run_spec defined and instead contain a listing of workers to collect their result from.
As tasks arrive they are prioritized and put into a heap. They are then taken
from this heap in turn to have any remote dependencies collected. For each
dependency we select a worker at random that has that data and collect the
dependency from that worker. To improve bandwidth we opportunistically gather
other dependencies of other tasks that are known to be on that worker, up to a
maximum of 200MB of data (too little data and bandwidth suffers, too much data
and responsiveness suffers). We use a fixed number of connections (around
10-50) so as to avoid overly-fragmenting our network bandwidth. In the event
that the network comms between two workers are saturated, a dependency task may
cycle between fetch
and flight
until it is successfully collected.
After all dependencies for a task are in memory we transition the task to the ready state and put the task again into a heap of tasks that are ready to run.
We collect from this heap and put the task into a thread from a local thread pool to execute.
Optionally, this task may identify itself as a long-running task (see Tasks launching tasks), at which point it secedes from the thread pool.
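As a minimal sketch of such a long-running task (assuming it is submitted through an existing client), secede and rejoin can be called from inside the task:

from dask.distributed import secede, rejoin

def long_running_task():
    secede()          # leave the worker's thread pool, freeing a slot for other tasks
    result = 42       # placeholder for the long-running portion of the work
    rejoin()          # optionally rejoin the pool before returning
    return result

future = client.submit(long_running_task)   # assumes an existing client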
A task either errs or its result is put into memory. In either case a response is sent back to the scheduler.
Tasks slated for execution and tasks marked for collection from other workers must follow their respective transition paths as defined above. The only exceptions to this are when:
A task is stolen, in which case a task which might have been collected will instead be executed on the thieving worker
Scheduler intercession, in which the scheduler reassigns a task that was previously assigned to a separate worker to a new worker. This most commonly occurs when a worker dies during computation.
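These transitions can be observed from the client side. The following sketch (assuming an existing client; the task key "x" is purely illustrative) queries each worker's transition log via the WorkerState.story method documented below:

def task_story(dask_worker, key):
    # dask_worker.state is the WorkerState documented below;
    # story() returns the transition log entries involving the given key
    return dask_worker.state.story(key)

stories = client.run(task_story, key="x")   # "x" is a hypothetical task key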
Nanny¶
Dask workers are by default launched, monitored, and managed by a small Nanny process.
- class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[source]¶
A process to manage worker processes
The nanny spins up Worker processes, watches them, and kills or restarts them as necessary. It is necessary if you want to use the Client.restart method, or to restart the worker automatically if it reaches the terminate fraction of its memory limit. The parameters for the Nanny are mostly the same as those for the Worker, with exceptions listed below.
- Parameters
- env: dict, optional
Environment variables set at the time of Nanny initialization will also be set in the Worker process. This argument allows one to overwrite or otherwise set environment variables for the Worker. It is also possible to set environment variables using the option distributed.nanny.environ. Precedence is as follows:
Nanny arguments
Existing environment variables
Dask configuration
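A minimal sketch of both mechanisms; the scheduler address and variable values are assumptions for illustration only:

import asyncio
import dask
from dask.distributed import Nanny

# Dask configuration: lowest precedence of the three mechanisms above.
dask.config.set({"distributed.nanny.environ": {"MALLOC_TRIM_THRESHOLD_": "65536"}})

async def start_worker():
    # Nanny arguments: highest precedence; env= is applied to the spawned Worker process.
    async with Nanny("tcp://scheduler:8786", env={"OMP_NUM_THREADS": "1"}):
        await asyncio.sleep(3600)   # keep the worker alive for an hour

asyncio.run(start_worker())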
See also
Worker
- close_gracefully()[source]¶
A signal that we shouldn’t try to restart workers if they go away
This is used as part of the cluster shutdown process.
- async instantiate() distributed.core.Status [source]¶
Start a local worker process
Blocks until the process is up and the scheduler is properly informed
API Documentation¶
- class distributed.worker_state_machine.TaskState(key: str, run_spec: SerializedTask | None = None, dependencies: set[TaskState] = <factory>, dependents: set[TaskState] = <factory>, waiting_for_data: set[TaskState] = <factory>, waiters: set[TaskState] = <factory>, state: TaskStateState = 'released', previous: TaskStateState | None = None, next: TaskStateState | None = None, duration: float | None = None, priority: tuple[int, ...] | None = None, who_has: set[str] = <factory>, coming_from: str | None = None, resource_restrictions: dict[str, float] = <factory>, exception: Serialize | None = None, traceback: Serialize | None = None, exception_text: str = '', traceback_text: str = '', type: type | None = None, suspicious_count: int = 0, startstops: list[StartStop] = <factory>, start_time: float | None = None, stop_time: float | None = None, metadata: dict = <factory>, nbytes: int | None = None, annotations: dict | None = None, done: bool = False)[source]¶
Holds volatile state relating to an individual Dask task.
Not to be confused with distributed.scheduler.TaskState, which holds similar information on the scheduler side.
- coming_from: str | None = None¶
The worker that the current task data is coming from, if the task is in flight
- metadata: dict¶
Metadata related to the task. Stored metadata should be msgpack serializable (e.g. int, string, list, dict).
- next: TaskStateState | None = None¶
The next state of the task. It is not None iff state == resumed.
- previous: TaskStateState | None = None¶
The previous state of the task. It is not None iff state in (cancelled, resumed).
- priority: tuple[int, ...] | None = None¶
The priority of this task, as assigned by the scheduler. Determines run order.
- run_spec: SerializedTask | None = None¶
A named tuple containing the function, args, kwargs and task associated with this TaskState instance. This defaults to None and can remain empty if it is a dependency that this worker will receive from another worker.
- state: TaskStateState = 'released'¶
The current state of the task
- class distributed.worker_state_machine.WorkerState(*, nthreads: int = 1, address: str | None = None, data: MutableMapping[str, object] | None = None, threads: dict[str, int] | None = None, plugins: dict[str, WorkerPlugin] | None = None, resources: Mapping[str, float] | None = None, total_out_connections: int = 9999, validate: bool = True, transition_counter_max: int | Literal[False] = False)[source]¶
State machine encapsulating the lifetime of all tasks on a worker.
Not to be confused with distributed.scheduler.WorkerState.
Note
The data attributes of this class are implementation details and may be changed without a deprecation cycle.
Warning
The attributes of this class are all heavily correlated with each other. Do not modify them directly, ever, as it is extremely easy to obtain a broken state this way, which in turn will likely result in cluster-wide deadlocks.
The state should be exclusively mutated through handle_stimulus().
- address: str¶
Worker <IP address>:<port>. This is used in decision-making by the state machine, e.g. to determine if a peer worker is running on the same host or not. This attribute may not be known when the WorkerState is initialised. It must be set before the first call to handle_stimulus().
- property all_running_tasks: set[distributed.worker_state_machine.TaskState]¶
All tasks that are currently occupying a thread. They may or may not count towards the maximum number of threads.
These are:
ts.status in (executing, long-running)
ts.status in (cancelled, resumed) and ts.previous in (executing, long-running)
See also
- available_resources: dict[str, float]¶
{resource name: amount}. Current resources that are not currently being consumed by task execution. Always less than or equal to total_resources. See Worker Resources.
- busy_workers: set[str]¶
Peer workers that recently returned a busy status. Workers in this set won’t be asked for additional dependencies for some time.
- comm_threshold_bytes: int¶
Ignore total_out_connections as long as comm_nbytes is less than this value.
- constrained: HeapSet[TaskState]¶
Priority heap of tasks that are ready to run, but are waiting on abstract resources like GPUs. Mutually exclusive with ready. See available_resources and Worker Resources.
- data: MutableMapping[str, object]¶
In-memory tasks data. This collection is shared by reference between Worker, WorkerMemoryManager, and this class.
- data_needed: defaultdict[str, HeapSet[TaskState]]¶
The tasks which still require data in order to execute and are in memory on at least one other worker, prioritized as per-worker heaps. All and only tasks with TaskState.state == 'fetch' are in this collection. A TaskState with multiple entries in who_has will appear multiple times here.
- executed_count: int¶
The number of tasks that this worker has run in its lifetime; this includes failed and cancelled tasks. See also executing_count().
- executing: set[TaskState]¶
Set of tasks that are currently running. See also executing_count() and long_running.
- property executing_count: int¶
Count of tasks currently executing on this worker and counting towards the maximum number of threads.
It includes cancelled tasks, but does not include long running (a.k.a. seceded) tasks.
- generation: int¶
Counter that decreases every time the compute-task handler is invoked by the Scheduler. It is appended to
TaskState.priority
and acts as a tie-breaker between tasks that have the same priority on the Scheduler, determining a last-in-first-out order between them.
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) list [source]¶
Process one or more external events, transition relevant tasks to new states, and return a list of instructions to be executed as a consequence.
See also
- has_what: defaultdict[str, set[str]]¶
{worker address: {ts.key, ...}}. The data that we care about and that we think a worker has.
- in_flight_tasks: set[TaskState]¶
Tasks that are coming to us in current peer-to-peer connections. All and only tasks with TaskState.state == 'flight'. See also in_flight_tasks_count().
- property in_flight_tasks_count: int¶
Count of tasks currently being replicated from other workers to this one.
See also
- in_flight_workers: dict[str, set[str]]¶
{worker address: {ts.key, ...}}
The workers from which we are currently gathering data and the dependencies we expect from those connections. Workers in this dict won’t be asked for additional dependencies until the current query returns.
- log: deque[tuple]¶
Transition log:
[(..., stimulus_id: str | None, timestamp: float), ...]. The number of stimuli logged is capped. See also story() and stimulus_log.
- long_running: set[TaskState]¶
Set of tasks that are currently running and have called secede(), so they no longer count towards the maximum number of concurrent tasks (nthreads). These tasks do not appear in the executing set.
- nthreads: int¶
Number of tasks that can be executing in parallel. At any given time,
executing_count()
<= nthreads.
- plugins: dict[str, WorkerPlugin]¶
{name: worker plugin}. This collection is shared by reference between Worker and this class. The Worker manages adding and removing plugins, while the WorkerState invokes the WorkerPlugin.transition method, if available.
- ready: HeapSet[TaskState]¶
Priority heap of tasks that are ready to run and have no resource constraints. Mutually exclusive with constrained.
- rng: random.Random¶
Statically-seeded random state, used to guarantee determinism whenever a pseudo-random choice is required
- running: bool¶
True if the state machine should start executing more tasks and fetch dependencies whenever a slot is available. This property must be kept aligned with the Worker: WorkerState.running == (Worker.status is Status.running).
- stimulus_log: deque[StateMachineEvent]¶
Log of all stimuli received by handle_stimulus(). The number of events logged is capped. See also log and stimulus_story().
- stimulus_story(*keys_or_tasks: str | TaskState) list[StateMachineEvent] [source]¶
Return all state machine events involving one or more tasks
- story(*keys_or_tasks_or_stimuli: str | TaskState) list[tuple] [source]¶
Return all records from the transitions log involving one or more tasks or stimulus_id’s
- target_message_size: int¶
Number of bytes to fetch from the same worker in a single call to BaseWorker.gather_dep(). Multiple small tasks that can be fetched from the same worker will be clustered in a single instruction as long as their combined size doesn't exceed this value.
- tasks: dict[str, TaskState]¶
{key: TaskState}. The tasks currently executing on this worker (and any dependencies of those tasks)
- threads: dict[str, int]¶
{ts.key: thread ID}. This collection is shared by reference between Worker and this class. While the WorkerState is thread-agnostic, it still needs access to this information in some cases. This collection is populated by distributed.worker.Worker.execute(). It does not need to be populated for the WorkerState to work.
- total_out_connections: int¶
The maximum number of concurrent outgoing requests for data. See also distributed.worker.Worker.total_in_connections.
- total_resources: dict[str, float]¶
{resource name: amount}. Total resources available for task execution. See Worker Resources.
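As a hedged illustration of how these resource attributes are populated and consumed (a sketch assuming a local cluster; the "GPU" resource name is purely illustrative):

from dask.distributed import Client, LocalCluster

# Each worker advertises one unit of an abstract "GPU" resource.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, resources={"GPU": 1})
client = Client(cluster)

# This task only runs on a worker with a free "GPU" unit.
future = client.submit(lambda x: x + 1, 10, resources={"GPU": 1})
print(future.result())   # 11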
- transition_counter: int¶
Total number of state transitions so far. See also log and transition_counter_max.
- transition_counter_max: int | Literal[False]¶
Raise an error if the
transition_counter
ever reaches this value. This is meant for debugging only, to catch infinite recursion loops. In production, it should always be set to False.
- class distributed.worker_state_machine.BaseWorker(state: distributed.worker_state_machine.WorkerState)[source]¶
Wrapper around the WorkerState that implements instructions handling. This is an abstract class with several @abc.abstractmethod methods, to be subclassed by Worker and by unit test mock-ups.
- abstract batched_send(msg: dict[str, Any]) None [source]¶
Send a fire-and-forget message to the scheduler through bulk comms.
- Parameters
- msg: dict
msgpack-serializable message to send to the scheduler. Must have an 'op' key which is registered in Scheduler.stream_handlers.
- abstract async execute(key: str, *, stimulus_id: str) StateMachineEvent | None [source]¶
Execute a task
- abstract async gather_dep(worker: str, to_gather: Collection[str], total_nbytes: int, *, stimulus_id: str) StateMachineEvent | None [source]¶
Gather dependencies for a task from a worker who has them
- Parameters
- worker: str
Address of worker to gather dependencies from
- to_gather: list
Keys of dependencies to gather from worker – this is not necessarily equivalent to the full list of dependencies of dep, as some dependencies may already be present on this worker.
- total_nbytes: int
Total number of bytes for all the dependencies in to_gather combined
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None [source]¶
Forward one or more external stimuli to WorkerState.handle_stimulus() and process the returned instructions, invoking the relevant Worker callbacks (@abc.abstractmethod methods below). Spawn asyncio tasks for all asynchronous instructions and start tracking them.
See also
- class distributed.worker.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: MutableMapping[str, Any] | Callable[[], MutableMapping[str, Any]] | tuple[Callable[..., MutableMapping[str, Any]], dict[str, Any]] | None = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, **kwargs)[source]¶
Worker node in a Dask distributed cluster
Workers perform two functions:
Serve data from a local dictionary
Perform computation on that data and on data from peers
Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.
You can start a worker with the dask-worker command line application:
$ dask-worker scheduler-ip:port
Use the --help flag to see more options:
$ dask-worker --help
The rest of this docstring is about the internal state that the worker uses to manage and track internal computations.
State
Informational State
These attributes don’t change significantly during execution.
- nthreads: int
Number of nthreads used by this worker process
- executors: dict[str, concurrent.futures.Executor]
Executors used to perform computation. Always contains the default executor.
- local_directory: path
Path on local machine to store temporary files
- scheduler: rpc
Location of scheduler. See .ip/.port attributes.
- name: string
Alias
- services: {str: Server}
Auxiliary web servers running on this worker
- service_ports: {str: port}
- total_in_connections: int
The maximum number of concurrent incoming requests for data. See also distributed.worker_state_machine.WorkerState.total_out_connections.
- batched_stream: BatchedSend
A batched stream along which we communicate to the scheduler
- log: [(message)]
A structured and queryable log. See Worker.story
Volatile State
These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.
- threads: {key: int}
The ID of the thread on which the task ran
- active_threads: {int: key}
The keys currently running on active threads
- state: WorkerState
Encapsulated state machine. See BaseWorker and WorkerState
- Parameters
- scheduler_ip: str, optional
- scheduler_port: int, optional
- scheduler_file: str, optional
- host: str, optional
- data: MutableMapping, type, None
The object to use for storage, builds a disk-backed LRU dict by default
- nthreads: int, optional
- local_directory: str, optional
Directory where we place local resources
- name: str, optional
- memory_limit: int, float, string
Number of bytes of memory that this worker should use. Set to zero for no limit. Set to 'auto' to calculate as system.MEMORY_LIMIT * min(1, nthreads / total_cores). Use strings or numbers like 5GB or 5e9.
- memory_target_fraction: float or False
Fraction of memory to try to stay beneath (default: read from config key distributed.worker.memory.target)
- memory_spill_fraction: float or False
Fraction of memory at which we start spilling to disk (default: read from config key distributed.worker.memory.spill)
- memory_pause_fraction: float or False
Fraction of memory at which we stop running new tasks (default: read from config key distributed.worker.memory.pause)
- max_spill: int, string or False
Limit of number of bytes to be spilled on disk. (default: read from config key distributed.worker.memory.max-spill)
- executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
- The executor(s) to use. Depending on the type, it has the following meanings:
Executor instance: The default executor.
Dict[str, Executor]: mapping names to Executor instances. If the "default" key isn't in the dict, a "default" executor will be created using ThreadPoolExecutor(nthreads).
Str: The string "offload", which refers to the same thread pool used for offloading communications. This results in the same thread being used for deserialization and computation.
- resources: dict
Resources that this worker has like
{'GPU': 2}
- nanny: str
Address on which to contact nanny, if it exists
- lifetime: str
Amount of time like “1 hour” after which we gracefully shut down the worker. This defaults to None, meaning no explicit shutdown time.
- lifetime_stagger: str
Amount of time like "5 minutes" to stagger the lifetime value. The actual lifetime will be selected uniformly at random between lifetime +/- lifetime_stagger.
- lifetime_restart: bool
Whether or not to restart a worker after it has reached its lifetime. Default False.
- kwargs: optional
Additional parameters to ServerNode constructor
Examples
Use the command line to start a worker:
$ dask-scheduler
Start scheduler at 127.0.0.1:8786

$ dask-worker 127.0.0.1:8786
Start worker at:              127.0.0.1:1234
Registered with scheduler at: 127.0.0.1:8786
- batched_send(msg: dict[str, Any]) None [source]¶
Implements BaseWorker abstract method.
Send a fire-and-forget message to the scheduler through bulk comms.
If we’re not currently connected to the scheduler, the message will be silently dropped!
- async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True) str | None [source]¶
Close the worker
Close asynchronous operations running on the worker, stop all executors and comms. If requested, this also closes the nanny.
- Parameters
- timeout: float, default 30
Timeout in seconds for shutting down individual instructions
- executor_wait: bool, default True
If True, shut down executors synchronously, otherwise asynchronously
- nanny: bool, default True
If True, close the nanny
- Returns
- str | None
None if worker already in closing state or failed, “OK” otherwise
- async close_gracefully(restart=None)[source]¶
Gracefully shut down a worker
This first informs the scheduler that we’re shutting down, and asks it to move our data elsewhere. Afterwards, we close as normal
- property data: collections.abc.MutableMapping[str, Any]¶
{task key: task payload} of all completed tasks, whether they were computed on this Worker or computed somewhere else and then transferred here over the network.
When using the default configuration, this is a zict buffer that automatically spills to disk whenever the target threshold is exceeded. If spilling is disabled, it is a plain dict instead. It could also be a user-defined arbitrary dict-like passed when initialising the Worker or the Nanny. Worker logic should treat this opaquely and stick to the MutableMapping API.
Note
This same collection is also available at self.state.data and self.memory_manager.data.
- async execute(key: str, *, stimulus_id: str) StateMachineEvent | None [source]¶
Execute a task. Implements BaseWorker abstract method.
- async gather_dep(worker: str, to_gather: Collection[str], total_nbytes: int, *, stimulus_id: str) StateMachineEvent | None [source]¶
Implements BaseWorker abstract method
- get_current_task() str [source]¶
Get the key of the task we are currently running
This only makes sense to run within a task
See also
get_worker
Examples
>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()

>>> future = client.submit(f)
>>> future.result()
'f-1234'
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None [source]¶
Override BaseWorker method for added validation
- async retry_busy_worker_later(worker: str) StateMachineEvent | None [source]¶
Wait some time, then take a peer worker out of busy state. Implements BaseWorker abstract method.
- async start_unsafe()[source]¶
Attempt to start the server. This is not idempotent and not protected against concurrent startup attempts.
This is intended to be overwritten or called by subclasses. For a safe startup, please use Server.start instead.
If death_timeout is configured, we will require this coroutine to finish before this timeout is reached. If the timeout is reached we will close the instance and raise an asyncio.TimeoutError
- trigger_profile() None [source]¶
Get a frame from all actively computing threads
Merge these frames into existing profile counts
- property worker_address¶
For API compatibility with Nanny