Worker

Overview

Workers provide two functions:

  1. Compute tasks as directed by the scheduler

  2. Store and serve computed results to other workers or clients

Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler. It stores the results of these tasks locally and serves them to other workers or clients on demand. If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.

A typical conversation between a scheduler and two workers Alice and Bob may look like the following:

Scheduler -> Alice:  Compute ``x <- add(1, 2)``!
Alice -> Scheduler:  I've computed x and am holding on to it!

Scheduler -> Bob:    Compute ``y <- add(x, 10)``!
                     You will need x.  Alice has x.
Bob -> Alice:        Please send me x.
Alice -> Bob:        Sure.  x is 3!
Bob -> Scheduler:    I've computed y and am holding on to it!
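
The same exchange can be driven from Python with the client API. A minimal sketch (the scheduler address and the add function are illustrative):

>>> from dask.distributed import Client
>>> client = Client("scheduler:8786")
>>> def add(a, b):
...     return a + b
>>> x = client.submit(add, 1, 2)    # the scheduler asks one worker (Alice) to compute x
>>> y = client.submit(add, x, 10)   # another worker (Bob) may fetch x from Alice first
>>> y.result()
13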

Storing Data

Data is stored locally in a dictionary in the .data attribute that maps keys to the results of function calls.

>>> worker.data
{'x': 3,
 'y': 13,
 ...
 '(df, 0)': pd.DataFrame(...),
 ...
 }

This .data attribute is a MutableMapping that is typically a combination of in-memory and on-disk storage with an LRU policy to move data between them.
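
To see what each worker currently holds, one option is to run a small function on every worker with Client.run; functions that accept a dask_worker argument receive the worker instance itself. A hedged sketch:

>>> def held_keys(dask_worker):
...     return list(dask_worker.data.keys())
>>> client.run(held_keys)  # {worker address: [keys in that worker's .data], ...}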

Read more: Worker Memory Management

Thread Pool

Each worker sends computations to a thread in a concurrent.futures.ThreadPoolExecutor. These computations occur in the same process as the Worker communication server so that they can access and share data efficiently with one another. For the purposes of data locality all threads within a worker are considered the same worker.

If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely then it is advisable to run dask-worker processes with many threads and one process. This reduces communication costs and generally simplifies deployment.
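
For example, a single worker process with eight threads might be started like this:

$ dask-worker scheduler:8786 --nworkers 1 --nthreads 8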

If your computations are mostly Python code and don’t release the GIL then it is advisable to run dask-worker processes with many processes and one thread per process:

$ dask-worker scheduler:8786 --nworkers 8 --nthreads 1

This will launch 8 worker processes each of which has its own ThreadPoolExecutor of size 1.
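
The same layout can be requested programmatically; a minimal sketch using LocalCluster for a single-machine deployment:

>>> from dask.distributed import Client, LocalCluster
>>> cluster = LocalCluster(n_workers=8, threads_per_worker=1)
>>> client = Client(cluster)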

If your computations are external to Python, long-running, and don’t release the GIL, then beware that while the computation is running the worker process will not be able to communicate with other workers or with the scheduler. This situation should be avoided. If you don’t link in your own custom C/Fortran code then this topic probably doesn’t apply.

Command Line Tool

Use the dask-worker command line tool to start an individual worker. For more details on the command line options, please have a look at the command line tools documentation.

Internal Scheduling

Internally, tasks that arrive from the scheduler proceed through the following pipeline as distributed.worker_state_machine.TaskState objects. Tasks which follow this path have a run_spec defined, which instructs the worker how to execute them.

Dask worker task states

Data dependencies are also represented as TaskState objects and follow a simpler path through the execution pipeline. These tasks do not have a run_spec defined and instead contain a listing of workers from which to collect their result.

Dask worker dependency states

As tasks arrive they are prioritized and put into a heap. They are then taken from this heap in turn to have any remote dependencies collected. For each dependency we select a worker at random that has that data and collect the dependency from that worker. To improve bandwidth we opportunistically gather other dependencies of other tasks that are known to be on that worker, up to a maximum of 200MB of data (too little data and bandwidth suffers, too much data and responsiveness suffers). We use a fixed number of connections (around 10-50) so as to avoid overly-fragmenting our network bandwidth. In the event that the network comms between two workers are saturated, a dependency task may cycle between fetch and flight until it is successfully collected.

After all dependencies for a task are in memory we transition the task to the ready state and put the task again into a heap of tasks that are ready to run.

We collect from this heap and put the task into a thread from a local thread pool to execute.

Optionally, this task may identify itself as a long-running task (see Tasks launching tasks), at which point it secedes from the thread pool.
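
For example, a task can secede explicitly by calling secede from inside the task body; this sketch assumes do_slow_external_work is some long-running call of your own:

>>> from distributed import secede, rejoin
>>> def long_task():
...     secede()                          # leave the worker's thread pool
...     result = do_slow_external_work()  # hypothetical long-running call
...     rejoin()                          # re-enter the thread pool before returning
...     return result
>>> future = client.submit(long_task)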

A task either errs or its result is put into memory. In either case a response is sent back to the scheduler.

Tasks slated for execution and tasks marked for collection from other workers must follow their respective transition paths as defined above. The only exceptions to this are when:

  • A task is stolen, in which case a task which might have been collected will instead be executed on the thieving worker

  • Scheduler intercession, in which the scheduler reassigns a task that was previously assigned to a separate worker to a new worker. This most commonly occurs when a worker dies during computation.
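
You can inspect the transitions a given key has gone through on a worker with Worker.story, for example via Client.run; a hedged sketch (the key "x" is illustrative):

>>> def story_of_x(dask_worker):
...     return dask_worker.story("x")
>>> client.run(story_of_x)  # {worker address: [transition records for "x"], ...}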

Nanny

Dask workers are by default launched, monitored, and managed by a small Nanny process.

class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | Collection[int] | None = 0, nthreads=None, loop=None, local_dir=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[source]

A process to manage worker processes

The nanny spins up Worker processes, watches them, and kills or restarts them as necessary. It is necessary if you want to use the Client.restart method, or to restart the worker automatically if it reaches the terminate fraction of its memory limit.

The parameters for the Nanny are mostly the same as those for the Worker with exceptions listed below.

Parameters
env: dict, optional

Environment variables set at the time of Nanny initialization are also set in the Worker process. This argument allows you to overwrite or otherwise set environment variables for the Worker. It is also possible to set environment variables using the configuration option distributed.nanny.environ. Precedence is as follows (see the sketch after this list):

  1. Nanny arguments

  2. Existing environment variables

  3. Dask configuration
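
As a hedged sketch (the scheduler address and variable name are illustrative), the same variable can be supplied either way; per the precedence above, the Nanny argument wins:

>>> import dask
>>> from dask.distributed import Nanny
>>> dask.config.set({"distributed.nanny.environ": {"OMP_NUM_THREADS": "2"}})
>>> nanny = await Nanny("tcp://scheduler:8786",        # inside an async context
...                     env={"OMP_NUM_THREADS": "1"})  # the worker process sees "1"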

See also

Worker
async close(comm=None, timeout=5, report=None)[source]

Close the worker process, stop all comms.

close_gracefully()[source]

A signal that we shouldn’t try to restart workers if they go away

This is used as part of the cluster shutdown process.

async instantiate() distributed.core.Status[source]

Start a local worker process

Blocks until the process is up and the scheduler is properly informed

async kill(timeout=2)[source]

Kill the local worker process

Blocks until both the process is down and the scheduler is properly informed

property local_dir

For API compatibility with Worker

async start()[source]

Start nanny, start local process, start watching

API Documentation

class distributed.worker_state_machine.TaskState(key: str, run_spec: SerializedTask | None = None, dependencies: set[TaskState] = <factory>, dependents: set[TaskState] = <factory>, waiting_for_data: set[TaskState] = <factory>, waiters: set[TaskState] = <factory>, state: TaskStateState = 'released', _previous: TaskStateState | None = None, _next: TaskStateState | None = None, duration: float | None = None, priority: tuple[int, ...] | None = None, who_has: set[str] = <factory>, coming_from: str | None = None, resource_restrictions: dict[str, float] = <factory>, exception: Serialize | None = None, traceback: Serialize | None = None, exception_text: str = '', traceback_text: str = '', type: type | None = None, suspicious_count: int = 0, startstops: list[StartStop] = <factory>, start_time: float | None = None, stop_time: float | None = None, metadata: dict = <factory>, nbytes: int | None = None, annotations: dict | None = None, done: bool = False)[source]

Holds volatile state relating to an individual Dask task.

Not to be confused with distributed.scheduler.TaskState, which holds similar information on the scheduler side.

annotations: dict | None = None

Arbitrary task annotations

coming_from: str | None = None

The worker from which this task’s data is currently being transferred, if the task is in flight

dependencies: set[TaskState]

The data needed by this key to run

dependents: set[TaskState]

The keys that use this dependency

done: bool = False

True if the task is in memory or erred; False otherwise

duration: float | None = None

Expected duration of the task

exception: Serialize | None = None

The exception caused by running a task if it erred (serialized)

exception_text: str = ''

String representation of the exception

key: str

Task key. Mandatory.

metadata: dict

Metadata related to the task. Stored metadata should be msgpack serializable (e.g. int, string, list, dict).

nbytes: int | None = None

The size of the value of the task, if in memory

priority: tuple[int, ...] | None = None

The priority assigned to this task by the scheduler. Determines run order.

resource_restrictions: dict[str, float]

Abstract resources required to run a task

run_spec: SerializedTask | None = None

A named tuple containing the function, args, kwargs and task associated with this TaskState instance. This defaults to None and can remain empty if it is a dependency that this worker will receive from another worker.

start_time: float | None = None

Time at which task begins running

startstops: list[StartStop]

Log of transfer, load, and compute times for a task

state: TaskStateState = 'released'

The current state of the task

stop_time: float | None = None

Time at which task finishes running

suspicious_count: int = 0

The number of times a dependency has not been where we expected it

traceback: Serialize | None = None

The traceback caused by running a task if it erred (serialized)

traceback_text: str = ''

String representation of the traceback

type: type | None = None

The type of a particular piece of data

waiters: set[TaskState]

Subset of dependents that are not in memory

waiting_for_data: set[TaskState]

Subset of dependencies that are not in memory

who_has: set[str]

Addresses of workers that we believe have this data

class distributed.worker.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_dir: None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool = True, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, memory_limit: str | float = 'auto', data=None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, **kwargs)[source]

Worker node in a Dask distributed cluster

Workers perform two functions:

  1. Serve data from a local dictionary

  2. Perform computation on that data and on data from peers

Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.

You can start a worker with the dask-worker command line application:

$ dask-worker scheduler-ip:port

Use the --help flag to see more options:

$ dask-worker --help

The rest of this docstring is about the internal state that the worker uses to manage and track internal computations.

State

Informational State

These attributes don’t change significantly during execution.

  • nthreads: int:

Number of threads used by this worker process

  • executors: dict[str, concurrent.futures.Executor]:

    Executors used to perform computation. Always contains the default executor.

  • local_directory: path:

    Path on local machine to store temporary files

  • scheduler: rpc:

    Location of scheduler. See .ip/.port attributes.

  • name: string:

    Alias

  • services: {str: Server}:

    Auxiliary web servers running on this worker

  • service_ports: {str: port}:

    Ports of the auxiliary web servers listed in services

  • total_out_connections: int

    The maximum number of concurrent outgoing requests for data

  • total_in_connections: int

    The maximum number of concurrent incoming requests for data

  • comm_threshold_bytes: int

As long as the total number of bytes in flight is below this threshold we will not limit the number of outgoing connections for a single task’s dependency fetch.

  • batched_stream: BatchedSend

    A batched stream along which we communicate to the scheduler

  • log: [(message)]

    A structured and queryable log. See Worker.story

Volatile State

These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.

  • tasks: {key: TaskState}

    The tasks currently executing on this worker (and any dependencies of those tasks)

  • data_needed: UniqueTaskHeap

The tasks which still require data in order to execute, where that data is in memory on at least one other worker; prioritized as a heap

  • data_needed_per_worker: {worker: UniqueTaskHeap}

    Same as data_needed, split by worker

  • ready: [keys]

    Keys that are ready to run. Stored in a LIFO stack

  • constrained: [keys]

    Keys for which we have the data to run, but are waiting on abstract resources like GPUs. Stored in a FIFO deque

  • executing_count: int

    A count of tasks currently executing on this worker

  • executed_count: int

The number of tasks that this worker has run in its lifetime

  • long_running: {keys}

    A set of keys of tasks that are running and have started their own long-running clients.

  • has_what: {worker: {deps}}

The data that we care about and that we believe a worker holds

  • in_flight_tasks: int

    A count of the number of tasks that are coming to us in current peer-to-peer connections

  • in_flight_workers: {worker: {task}}

    The workers from which we are currently gathering data and the dependencies we expect from those connections. Workers in this dict won’t be asked for additional dependencies until the current query returns.

  • busy_workers: {worker}

    Workers that recently returned a busy status. Workers in this set won’t be asked for additional dependencies for some time.

  • comm_bytes: int

    The total number of bytes in flight

  • threads: {key: int}

    The ID of the thread on which the task ran

  • active_threads: {int: key}

    The keys currently running on active threads

  • waiting_for_data_count: int

    A count of how many tasks are currently waiting for data

  • generation: int

    Counter that decreases every time the compute-task handler is invoked by the Scheduler. It is appended to TaskState.priority and acts as a tie-breaker between tasks that have the same priority on the Scheduler, determining a last-in-first-out order between them.

Parameters
scheduler_ip: str, optional
scheduler_port: int, optional
scheduler_file: str, optional
ip: str, optional
data: MutableMapping, type, None

The object to use for storage, builds a disk-backed LRU dict by default

nthreads: int, optional
loop: tornado.ioloop.IOLoop
local_directory: str, optional

Directory where we place local resources

name: str, optional
memory_limit: int, float, string

Number of bytes of memory that this worker should use. Set to zero for no limit. Set to ‘auto’ to calculate as system.MEMORY_LIMIT * min(1, nthreads / total_cores). Use strings or numbers like 5GB or 5e9. For example, on an 8-core machine with 16 GB of RAM, a worker started with nthreads=4 and memory_limit='auto' would be limited to 16 GB * min(1, 4 / 8) = 8 GB.

memory_target_fraction: float or False

Fraction of memory to try to stay beneath (default: read from config key distributed.worker.memory.target)

memory_spill_fraction: float or False

Fraction of memory at which we start spilling to disk (default: read from config key distributed.worker.memory.spill)

memory_pause_fraction: float or False

Fraction of memory at which we stop running new tasks (default: read from config key distributed.worker.memory.pause)

max_spill: int, string or False

Limit of number of bytes to be spilled on disk. (default: read from config key distributed.worker.memory.max-spill)

executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”

The executor(s) to use. Depending on the type, it has the following meanings (see the sketch after this list):

  • Executor instance: The default executor.

  • Dict[str, Executor]: mapping names to Executor instances. If the “default” key isn’t in the dict, a “default” executor will be created using ThreadPoolExecutor(nthreads).

  • Str: The string “offload”, which refers to the same thread pool used for offloading communications. This results in the same thread being used for deserialization and computation.
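
A hedged sketch of the dictionary form (executor names and sizes are illustrative, and the worker is started inside an async context):

>>> from concurrent.futures import ThreadPoolExecutor
>>> from dask.distributed import Worker
>>> w = await Worker(
...     "tcp://scheduler:8786",               # address is an assumption
...     executor={
...         "default": ThreadPoolExecutor(4), # regular tasks
...         "io": ThreadPoolExecutor(16),     # illustrative named executor
...     },
... )

Where your Dask version supports executor annotations, tasks can then be routed to a named executor via the executor annotation, e.g. with dask.annotate(executor="io") around graph construction.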

resources: dict

Resources that this worker has like {'GPU': 2}

nanny: str

Address on which to contact nanny, if it exists

lifetime: str

Amount of time like “1 hour” after which we gracefully shut down the worker. This defaults to None, meaning no explicit shutdown time.

lifetime_stagger: str

Amount of time like “5 minutes” to stagger the lifetime value. The actual lifetime will be selected uniformly at random between lifetime +/- lifetime_stagger.

lifetime_restart: bool

Whether or not to restart a worker after it has reached its lifetime. Default False.

kwargs: optional

Additional parameters to ServerNode constructor

Examples

Use the command line to start a worker:

$ dask-scheduler
Start scheduler at 127.0.0.1:8786

$ dask-worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
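
Workers can also be started programmatically. A minimal self-contained sketch using the async API (port and task are illustrative):

>>> import asyncio
>>> from dask.distributed import Scheduler, Worker, Client
>>> async def main():
...     async with Scheduler(port=8786) as scheduler:
...         async with Worker(scheduler.address, nthreads=2) as worker:
...             async with Client(scheduler.address, asynchronous=True) as client:
...                 future = client.submit(sum, [1, 2, 3])
...                 print(await future)
>>> asyncio.run(main())
6
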
async close_gracefully(restart=None)[source]

Gracefully shut down a worker

This first informs the scheduler that we’re shutting down, and asks it to move our data elsewhere. Afterwards, we close as normal

property data: collections.abc.MutableMapping[str, Any]

{task key: task payload} of all completed tasks, whether they were computed on this Worker or computed somewhere else and then transferred here over the network.

When using the default configuration, this is a zict buffer that automatically spills to disk whenever the target threshold is exceeded. If spilling is disabled, it is a plain dict instead. It could also be a user-defined arbitrary dict-like passed when initialising the Worker or the Nanny. Worker logic should treat this opaquely and stick to the MutableMapping API.
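
For example, spilling can be bypassed entirely by passing a MutableMapping factory such as the plain dict type when constructing the worker; a hedged sketch (the address is an assumption, run inside an async context):

>>> w = await Worker("tcp://scheduler:8786", data=dict)  # keep everything in an in-memory dict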

async gather_dep(worker: str, to_gather: collections.abc.Iterable[str], total_nbytes: int, *, stimulus_id: str) None[source]

Gather dependencies for a task from a worker who has them

Parameters
worker: str

Address of worker to gather dependencies from

to_gather: list

Keys of dependencies to gather from worker – this is not necessarily equivalent to the full list of dependencies of the task, as some dependencies may already be present on this worker.

total_nbytes: int

Total number of bytes for all the dependencies in to_gather combined

get_current_task() str[source]

Get the key of the task we are currently running

This only makes sense to run within a task

See also

get_worker

Examples

>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()
>>> future = client.submit(f)  
>>> future.result()  
'f-1234'

handle_cancel_compute(key: str, stimulus_id: str) None[source]

Cancel a task on a best effort basis. This is only possible while a task is in state waiting or ready. Nothing will happen otherwise.

handle_free_keys(keys: list[str], stimulus_id: str) None[source]

Handler to be called by the scheduler.

The given keys are no longer referred to and required by the scheduler. The worker is now allowed to release the key, if applicable.

This does not guarantee that the memory is released since the worker may still decide to hold on to the data and task since it is required by an upstream dependency.

handle_remove_replicas(keys: list[str], stimulus_id: str) str[source]

Stream handler notifying the worker that it might be holding unreferenced, superfluous data.

This should not actually happen during ordinary operations and is only intended to correct any erroneous state. An example where this is necessary is if a worker fetches data for a downstream task but that task is released before the data arrives. In this case, the scheduler will notify the worker that it may be holding this unnecessary data, if the worker hasn’t already released the data itself.

This handler does not guarantee that the task or the data will actually be released; it only asks the worker to release the data on a best-effort basis. This protects against race conditions where the given keys may already have been rescheduled for compute, in which case the compute wins and this handler is ignored.

For stronger guarantees, see the free_keys handler.

stimulus_story(*keys_or_tasks: str | TaskState) list[StateMachineEvent][source]

Return all state machine events involving one or more tasks

story(*keys_or_tasks: str | TaskState) list[tuple][source]

Return all transitions involving one or more tasks

transition(ts: distributed.worker_state_machine.TaskState, finish: str, *, stimulus_id: str, **kwargs) None[source]

Transition a key from its current state to the finish state

Returns
Dictionary of recommendations for future transitions

See also

Scheduler.transitions

transitive version of this function

Examples

>>> self.transition('x', 'waiting')
{'x': 'processing'}

transition_resumed_fetch(ts: distributed.worker_state_machine.TaskState, *, stimulus_id: str) tuple[source]

See Worker._transition_from_resumed

transition_resumed_missing(ts: distributed.worker_state_machine.TaskState, *, stimulus_id: str) tuple[source]

See Worker._transition_from_resumed

transition_resumed_waiting(ts: distributed.worker_state_machine.TaskState, *, stimulus_id: str)[source]

See Worker._transition_from_resumed

transitions(recommendations: dict, *, stimulus_id: str) None[source]

Process transitions until none are left

This includes feedback from previous transitions and continues until we reach a steady state

trigger_profile() None[source]

Get a frame from all actively computing threads

Merge these frames into existing profile counts

property worker_address

For API compatibility with Nanny