Worker

Overview

Workers provide two functions:

  1. Compute tasks as directed by the scheduler

  2. Store and serve computed results to other workers or clients

Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler. It stores the results of these tasks locally and serves them to other workers or clients on demand. If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.

A typical conversation between a scheduler and two workers Alice and Bob may look like the following:

Scheduler -> Alice:  Compute ``x <- add(1, 2)``!
Alice -> Scheduler:  I've computed x and am holding on to it!

Scheduler -> Bob:    Compute ``y <- add(x, 10)``!
                     You will need x.  Alice has x.
Bob -> Alice:        Please send me x.
Alice -> Bob:        Sure.  x is 3!
Bob -> Scheduler:    I've computed y and am holding on to it!
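
In user code this exchange corresponds to submitting dependent tasks through a client. A minimal sketch (the scheduler address is hypothetical):

from operator import add

from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # hypothetical address

x = client.submit(add, 1, 2)    # the scheduler assigns x to some worker ("Alice")
y = client.submit(add, x, 10)   # whichever worker runs y fetches x from its peer
y.result()                      # 13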

Storing Data

Data is stored locally in a dictionary in the .data attribute that maps keys to the results of function calls.

>>> worker.data
{'x': 3,
 'y': 13,
 ...
 '(df, 0)': pd.DataFrame(...),
 ...
 }

This .data attribute is a MutableMapping that is typically a combination of in-memory and on-disk storage with an LRU policy to move data between them.
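
You can inspect what each worker currently holds by running a small function on every worker. A minimal sketch (Client.run passes the local Worker object to functions that accept a dask_worker argument; the scheduler address is hypothetical):

from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # hypothetical address

def stored_keys(dask_worker):
    # Keys currently held in this worker's .data mapping
    return list(dask_worker.data.keys())

client.run(stored_keys)  # {worker_address: [keys], ...}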

Thread Pool

Each worker sends computations to a thread in a concurrent.futures.ThreadPoolExecutor for computation. These computations occur in the same process as the Worker communication server so that they can access and share data efficiently with one another. For the purposes of data locality, all threads within a worker are considered the same worker.

If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely, then it is advisable to run dask-worker processes with many threads and one process. This reduces communication costs and generally simplifies deployment.

If your computations are mostly Python code and don’t release the GIL, then it is advisable to run dask-worker processes with many processes and one thread per process:

$ dask-worker scheduler:8786 --nprocs 8 --nthreads 1

This will launch 8 worker processes each of which has its own ThreadPoolExecutor of size 1.
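
The same trade-off applies when starting workers from Python. A minimal sketch using LocalCluster (the sizes are illustrative):

from dask.distributed import Client, LocalCluster

# Mostly pure-Python, GIL-holding work: many single-threaded processes
cluster = LocalCluster(n_workers=8, threads_per_worker=1)

# Mostly numeric, GIL-releasing work: fewer processes with more threads, e.g.
# cluster = LocalCluster(n_workers=1, threads_per_worker=8)

client = Client(cluster)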

If your computations are external to Python, long-running, and don’t release the GIL, then beware that while the computation is running the worker process will not be able to communicate with other workers or with the scheduler. This situation should be avoided. If you don’t link in your own custom C/Fortran code, then this topic probably doesn’t apply.

Command Line tool

Use the dask-worker command line tool to start an individual worker. For more details on the command line options, please have a look at the command line tools documentation.

Internal Scheduling

Internally, tasks that come to the worker from the scheduler proceed through the following pipeline as distributed.worker.TaskState objects. Tasks which follow this path have a distributed.worker.TaskState.runspec defined, which instructs the worker how to execute them.

[Figure: Dask worker task states]

Data dependencies are also represented as distributed.worker.TaskState objects and follow a simpler path through the execution pipeline. These tasks do not have a distributed.worker.TaskState.runspec defined and instead contain a listing of workers to collect their result from.

[Figure: Dask worker dependency states]

As tasks arrive they are prioritized and put into a heap. They are then taken from this heap in turn to have any remote dependencies collected. For each dependency we select a worker at random that has that data and collect the dependency from that worker. To improve bandwidth we opportunistically gather other dependencies of other tasks that are known to be on that worker, up to a maximum of 200MB of data (too little data and bandwidth suffers, too much data and responsiveness suffers). We use a fixed number of connections (around 10-50) so as to avoid overly-fragmenting our network bandwidth. In the event that the network comms between two workers are saturated, a dependency task may cycle between fetch and flight until it is successfully collected.

After all dependencies for a task are in memory we transition the task to the ready state and put the task again into a heap of tasks that are ready to run.

We collect from this heap and put the task into a thread from a local thread pool to execute.

Optionally, this task may identify itself as a long-running task (see Tasks launching tasks), at which point it secedes from the thread pool.

A task either errs or its result is put into memory. In either case a response is sent back to the scheduler.

Tasks slated for execution and tasks marked for collection from other workers must follow their respective transition paths as defined above. The only exceptions to this are when:

  • A task is stolen, in which case a task which might have been collected will instead be executed on the thieving worker

  • Scheduler intercession, in which the scheduler reassigns a task that was previously assigned to a separate worker to a new worker. This most commonly occurs when a worker dies during computation.

Memory Management

Workers are given a target memory limit to stay under with the command line --memory-limit keyword or the memory_limit= Python keyword argument, which sets the memory limit per worker process launched by dask-worker:

$ dask-worker tcp://scheduler:port --memory-limit=auto  # TOTAL_MEMORY * min(1, nthreads / total_nthreads)
$ dask-worker tcp://scheduler:port --memory-limit="4 GiB"  # four gigabytes per worker process.

Workers use a few different heuristics to keep memory use beneath this limit:

  1. At 60% of memory load (as estimated by sizeof), spill least recently used data to disk

  2. At 70% of memory load (as reported by the OS), spill least recently used data to disk regardless of what is reported by sizeof; this accounts for memory used by the python interpreter, modules, global variables, memory leaks, etc.

  3. At 80% of memory load (as reported by the OS), stop accepting new work on the local thread pool

  4. At 95% of memory load (as reported by the OS), terminate and restart the worker

These values can be configured by modifying the ~/.config/dask/distributed.yaml file:

distributed:
  worker:
    # Fractions of worker memory at which we take action to avoid memory blowup
    # Set any of the lower three values to False to turn off the behavior entirely
    memory:
      target: 0.60  # target fraction to stay below
      spill: 0.70  # fraction at which we spill to disk
      pause: 0.80  # fraction at which we pause worker threads
      terminate: 0.95  # fraction at which we terminate the worker
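
These thresholds can also be set programmatically in the process that creates the workers (for example with LocalCluster). A sketch equivalent to the configuration above:

import dask

dask.config.set({
    "distributed.worker.memory.target": 0.60,
    "distributed.worker.memory.spill": 0.70,
    "distributed.worker.memory.pause": 0.80,
    "distributed.worker.memory.terminate": 0.95,
})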

Spill data to disk

Every time the worker finishes a task, it estimates the size in bytes that the result takes up in memory using the sizeof function. This function defaults to sys.getsizeof() for arbitrary objects, which uses the standard Python __sizeof__ protocol, but also has special-cased implementations for common data types like NumPy arrays and Pandas dataframes.
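
If you store custom objects whose footprint sizeof estimates poorly, you can register your own implementation. A sketch with a hypothetical PackedRecords class (the registration must happen in the worker process, for example through a preload script):

from dask.sizeof import sizeof

class PackedRecords:
    # Hypothetical container holding a packed bytes payload
    def __init__(self, payload: bytes):
        self.payload = payload

@sizeof.register(PackedRecords)
def sizeof_packed_records(obj):
    # Report the payload size plus a small constant for object overhead
    return len(obj.payload) + 64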

When the sum of the number of bytes of the data in memory exceeds 60% of the memory limit, the worker will begin to dump the least recently used data to disk. You can control this location with the --local-directory keyword:

$ dask-worker tcp://scheduler:port --memory-limit="4 GiB" --local-directory /scratch

That data is still available and will be read back from disk when necessary. On the diagnostic dashboard status page disk I/O will show up in the task stream plot as orange blocks. Additionally, the memory plot in the upper left will become yellow and then red.

Monitor process memory load

The approach above can fail for a few reasons:

  1. Custom objects may not report their memory size accurately

  2. User functions may take up more RAM than expected

  3. Significant amounts of data may accumulate in network I/O buffers

To address this we monitor the memory of the worker process every 200 ms. If the system-reported memory use is above 70% of the target memory usage, then the worker will start dumping unused data to disk, even if internal sizeof recording hasn’t yet reached the normal 60% limit.

Halt worker threads

At 80% load, the worker’s thread pool will stop accepting new tasks. This gives time for the write-to-disk functionality to take effect even in the face of rapidly accumulating data.

Kill Worker

At 95% memory load, a worker’s nanny process will terminate it. This is to avoid having the worker job terminated by an external job scheduler (like YARN, Mesos, SGE, etc.). After termination, the nanny will restart the worker in a fresh state.

Using the dashboard to monitor memory usage

The dashboard (typically available on port 8787) shows a summary of the overall memory usage on the cluster, as well as the individual usage on each worker. It provides different memory readings:

process

Overall memory used by the worker process (RSS), as measured by the OS

managed

This is the sum of the sizeof of all dask data stored on the worker, excluding spilled data.

unmanaged

This is the memory usage that dask is not directly aware of. It is estimated by subtracting managed memory from the total process memory and typically includes:

  • The Python interpreter code, loaded modules, and global variables

  • Memory temporarily used by running tasks

  • Dereferenced Python objects that have not been garbage-collected yet

  • Unused memory that the Python memory allocator did not return to libc through free yet

  • Unused memory that the user-space libc free function did not release to the OS yet (see memory allocators below)

  • Memory fragmentation

  • Memory leaks

unmanaged recent

Unmanaged memory that has appeared within the last 30 seconds. This is not included in the ‘unmanaged’ memory measure above. Ideally, this memory should be for the most part a temporary spike caused by tasks’ heap use plus soon-to-be garbage collected objects.

The time it takes for unmanaged memory to transition away from its “recent” state can be tweaked through the distributed.worker.memory.recent-to-old-time key in the ~/.config/dask/distributed.yaml file. If your tasks typically run for longer than 30 seconds, it’s recommended that you increase this setting accordingly.
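
For example, if your tasks typically run for a couple of minutes each, you might raise the threshold accordingly. A sketch (the value is illustrative):

import dask

dask.config.set({"distributed.worker.memory.recent-to-old-time": "120s"})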

By default, distributed.Client.rebalance() and distributed.scheduler.Scheduler.rebalance() ignore unmanaged recent memory. This behavior can also be tweaked using the dask config - see the methods’ documentation.

spilled

Managed memory that has been spilled to disk. This is not included in the ‘managed’ measure above.

The sum of managed + unmanaged + unmanaged recent is equal by definition to the process memory.
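
As a rough cross-check of the process reading, you can ask each worker for the resident set size that the OS reports for it. A minimal sketch (the scheduler address is hypothetical):

import psutil
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # hypothetical address

def process_rss():
    # Resident set size (RSS) of the worker process, as seen by the OS
    return psutil.Process().memory_info().rss

client.run(process_rss)  # {worker_address: bytes, ...}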

Memory not released back to the OS

In many cases, high unmanaged memory usage or “memory leak” warnings on workers can be misleading: a worker may not actually be using its memory for anything, but simply hasn’t returned that unused memory back to the operating system, and is hoarding it just in case it needs the memory capacity again. This is not a bug in your code, nor in Dask — it’s actually normal behavior for all processes on Linux and MacOS, and is a consequence of how the low-level memory allocator works (see below for details).

Because Dask makes decisions (spill-to-disk, pause, terminate, rebalance()) based on the worker’s memory usage as reported by the OS, and is unaware of how much of this memory is actually in use versus empty and “hoarded”, it can overestimate — sometimes significantly — how much memory the process is using and think the worker is running out of memory when in fact it isn’t.

More in detail: both the Linux and MacOS memory allocators try to avoid performing a brk kernel call every time the application calls free by implementing a user-space memory management system. Upon free, memory can remain allocated in user space and potentially reusable by the next malloc - which in turn won’t require a kernel call either. This is generally very desirable for C/C++ applications which have no memory allocator of their own, as it can drastically boost performance at the cost of a larger memory footprint. CPython however adds its own memory allocator on top, which reduces the need for this additional abstraction (with caveats).

There are steps you can take to alleviate situations where worker memory is not released back to the OS. These steps are discussed in the following sections.

Manually trim memory

Linux workers only

It is possible to forcefully release allocated but unutilized memory as follows:

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)

This should be only used as a one-off debugging experiment. Watch the dashboard while running the above code. If unmanaged worker memory (on the “Bytes stored” plot) decreases significantly after calling client.run(trim_memory), then move on to the next section. Otherwise, you likely do have a memory leak.

Note that you should only run this malloc_trim if you are using the default glibc memory allocator. When using a custom allocator such as jemalloc (see below), this could cause unexpected behavior including segfaults. (If you don’t know what this means, you’re probably using the default glibc allocator and are safe to run this).

Automatically trim memory

Linux workers only

To aggressively and automatically trim the memory in a production environment, you should instead set the environment variable MALLOC_TRIM_THRESHOLD_ (note the final underscore) to 0 or a low number; see the mallopt man page for details. Reducing this value will increase the number of syscalls, and as a consequence may degrade performance.

Note

The variable must be set before starting the dask-worker process.

Note

If using a Nanny, the MALLOC_TRIM_THRESHOLD_ environment variable will automatically be set to 65536 for the worker process which the nanny is monitoring. You can modify this behavior using the distributed.nanny.environ configuration value.
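
For example, to have the nanny export a more aggressive trim threshold to the worker it manages, you might set (before the nanny starts; the value is illustrative):

import dask

# Override the default 65536 exported to the worker process
dask.config.set({"distributed.nanny.environ": {"MALLOC_TRIM_THRESHOLD_": 0}})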

jemalloc

Linux and MacOS workers

Alternatively to the above, you may experiment with the jemalloc memory allocator, as follows:

On Linux:

conda install jemalloc
LD_PRELOAD=$CONDA_PREFIX/lib/libjemalloc.so dask-worker <...>

On macOS:

conda install jemalloc
DYLD_INSERT_LIBRARIES=$CONDA_PREFIX/lib/libjemalloc.dylib dask-worker <...>

Alternatively on macOS, install globally with homebrew:

brew install jemalloc
DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib dask-worker <...>

jemalloc offers a wealth of configuration settings; please refer to its documentation.

Ignore process memory

If all else fails, you may want to stop dask from using memory metrics from the OS (RSS) in its decision-making:

distributed:
  worker:
    memory:
      rebalance:
        measure: managed_in_memory
      spill: false
      pause: false
      terminate: false

This will of course be problematic if you have a genuine issue with unmanaged memory, e.g. memory leaks, or if you suffer from heavy fragmentation.

Nanny

Dask workers are by default launched, monitored, and managed by a small Nanny process.

class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port=0, nthreads=None, ncores=None, loop=None, local_dir=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port=None, protocol=None, config=None, **worker_kwargs)[source]

A process to manage worker processes

The nanny spins up Worker processes, watches them, and kills or restarts them as necessary. It is necessary if you want to use the Client.restart method, or to restart the worker automatically if it reaches the terminate fraction of its memory limit.

The parameters for the Nanny are mostly the same as those for the Worker with exceptions listed below.

Parameters
env: dict, optional

Environment variables set at the time of Nanny initialization are guaranteed to also be set in the Worker process. This argument allows you to overwrite or otherwise set environment variables for the Worker. It is also possible to set environment variables using the option distributed.nanny.environ. Precedence is as follows (see the sketch after this list):

  1. Nanny arguments

  2. Existing environment variables

  3. Dask configuration
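
A minimal sketch of passing env when starting a nanny-managed worker programmatically (the scheduler address and the OMP_NUM_THREADS choice are illustrative):

import asyncio

from dask.distributed import Nanny

async def main():
    # Entries in env take precedence over pre-existing environment
    # variables and over the distributed.nanny.environ configuration
    async with Nanny("tcp://127.0.0.1:8786", nthreads=2,
                     env={"OMP_NUM_THREADS": "1"}) as nanny:
        print("Nanny managing worker at", nanny.worker_address)

asyncio.run(main())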

See also

Worker

API Documentation

class distributed.worker.TaskState(key, runspec=None)[source]

Holds volatile state relating to an individual Dask task

  • dependencies: set(TaskState instances)

    The data needed by this key to run

  • dependents: set(TaskState instances)

    The keys that use this dependency.

  • duration: float

    Expected duration of a task

  • priority: tuple

    The priority of this task as given by the scheduler. Determines run order.

  • state: str

    The current state of the task. One of [“waiting”, “ready”, “executing”, “fetch”, “memory”, “flight”, “long-running”, “rescheduled”, “error”]

  • who_has: set(worker)

    Workers that we believe have this data

  • coming_from: str

    The worker that the current task data is coming from, if the task is in flight

  • waiting_for_data: set(keys of dependencies)

    A dynamic version of dependencies. All dependencies that we still don’t have for a particular key.

  • resource_restrictions: {str: number}

    Abstract resources required to run a task

  • exception: str

    The exception caused by running a task if it erred

  • traceback: str

    The traceback caused by running a task if it erred

  • type: type

    The type of a particular piece of data

  • suspicious_count: int

    The number of times a dependency has not been where we expected it

  • startstops: [{startstop}]

    Log of transfer, load, and compute times for a task

  • start_time: float

    Time at which task begins running

  • stop_time: float

    Time at which task finishes running

  • metadata: dict

    Metadata related to task. Stored metadata should be msgpack serializable (e.g. int, string, list, dict).

  • nbytes: int

    The size of a particular piece of data

  • annotations: dict

    Task annotations

Parameters
key: str
runspec: SerializedTask

A named tuple containing the function, args, kwargs and task associated with this TaskState instance. This defaults to None and can remain empty if it is a dependency that this worker will receive from another worker.

class distributed.worker.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, ncores: None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_dir: None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool = True, memory_limit: str | float = 'auto', executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, memory_monitor_interval: Any = '200ms', memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, extensions: list[type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, data: MutableMapping[str, Any] | Callable[[], MutableMapping[str, Any]] | tuple[Callable[..., MutableMapping[str, Any]], dict[str, Any]] | None = None, interface: str | None = None, host: str | None = None, port: int | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, **kwargs)[source]

Worker node in a Dask distributed cluster

Workers perform two functions:

  1. Serve data from a local dictionary

  2. Perform computation on that data and on data from peers

Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.

You can start a worker with the dask-worker command line application:

$ dask-worker scheduler-ip:port

Use the --help flag to see more options:

$ dask-worker --help

The rest of this docstring is about the internal state that the worker uses to manage and track internal computations.

State

Informational State

These attributes don’t change significantly during execution.

  • nthreads: int:

    Number of threads used by this worker process

  • executors: dict[str, concurrent.futures.Executor]:

    Executors used to perform computation. Always contains the default executor.

  • local_directory: path:

    Path on local machine to store temporary files

  • scheduler: rpc:

    Location of scheduler. See .ip/.port attributes.

  • name: string:

    Alias

  • services: {str: Server}:

    Auxiliary web servers running on this worker

  • service_ports: {str: port}:

  • total_out_connections: int

    The maximum number of concurrent outgoing requests for data

  • total_in_connections: int

    The maximum number of concurrent incoming requests for data

  • comm_threshold_bytes: int

    As long as the total number of bytes in flight is below this threshold we will not limit the number of outgoing connections for a single task's dependency fetch.

  • batched_stream: BatchedSend

    A batched stream along which we communicate to the scheduler

  • log: [(message)]

    A structured and queryable log. See Worker.story

Volatile State

These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.

  • tasks: {key: TaskState}

    The tasks currently executing on this worker (and any dependencies of those tasks)

  • data: {key: object}:

    Prefer using the host attribute instead of this, unless memory_limit and at least one of memory_target_fraction or memory_spill_fraction are defined; in that case, this attribute is a zict.Buffer, from which information on the LRU cache can be queried.

  • data.memory: {key: object}:

    Dictionary mapping keys to actual values stored in memory. Only available if condition for data being a zict.Buffer is met.

  • data.disk: {key: object}:

    Dictionary mapping keys to actual values stored on disk. Only available if condition for data being a zict.Buffer is met.

  • data_needed: deque(keys)

    The keys which still require data in order to execute, arranged in a deque

  • ready: [keys]

    Keys that are ready to run. Stored in a LIFO stack

  • constrained: [keys]

    Keys for which we have the data to run, but are waiting on abstract resources like GPUs. Stored in a FIFO deque

  • executing_count: int

    A count of tasks currently executing on this worker

  • executed_count: int

    The number of tasks that this worker has run in its lifetime

  • long_running: {keys}

    A set of keys of tasks that are running and have started their own long-running clients.

  • has_what: {worker: {deps}}

    The data that we care about that we think a worker has

  • pending_data_per_worker: {worker: [dep]}

    The data on each worker that we still want, prioritized as a deque

  • in_flight_tasks: int

    A count of the number of tasks that are coming to us in current peer-to-peer connections

  • in_flight_workers: {worker: {task}}

    The workers from which we are currently gathering data and the dependencies we expect from those connections

  • comm_bytes: int

    The total number of bytes in flight

  • threads: {key: int}

    The ID of the thread on which the task ran

  • active_threads: {int: key}

    The keys currently running on active threads

  • waiting_for_data_count: int

    A count of how many tasks are currently waiting for data

Parameters
scheduler_ip: str, optional
scheduler_port: int, optional
scheduler_file: str, optional
ip: str, optional
data: MutableMapping, type, None

The object to use for storage, builds a disk-backed LRU dict by default

nthreads: int, optional
loop: tornado.ioloop.IOLoop
local_directory: str, optional

Directory where we place local resources

name: str, optional
memory_limit: int, float, string

Number of bytes of memory that this worker should use. Set to zero for no limit. Set to ‘auto’ to calculate as system.MEMORY_LIMIT * min(1, nthreads / total_cores). Use strings or numbers like 5GB or 5e9.

memory_target_fraction: float or False

Fraction of memory to try to stay beneath (default: read from config key distributed.worker.memory.target)

memory_spill_fraction: float or False

Fraction of memory at which we start spilling to disk (default: read from config key distributed.worker.memory.spill)

memory_pause_fraction: float or False

Fraction of memory at which we stop running new tasks (default: read from config key distributed.worker.memory.pause)

executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
The executor(s) to use. Depending on the type, it has the following meanings:
  • Executor instance: The default executor.

  • Dict[str, Executor]: mapping names to Executor instances. If the “default” key isn’t in the dict, a “default” executor will be created using ThreadPoolExecutor(nthreads).

  • Str: The string “offload”, which refers to the same thread pool used for offloading communications. This results in the same thread being used for deserialization and computation.

resources: dict

Resources that this worker has like {'GPU': 2}

nanny: str

Address on which to contact nanny, if it exists

lifetime: str

Amount of time like “1 hour” after which we gracefully shut down the worker. This defaults to None, meaning no explicit shutdown time.

lifetime_stagger: str

Amount of time like “5 minutes” to stagger the lifetime value. The actual lifetime will be selected uniformly at random between lifetime +/- lifetime_stagger.

lifetime_restart: bool

Whether or not to restart a worker after it has reached its lifetime. Default False.

kwargs: optional

Additional parameters to ServerNode constructor

Examples

Use the command line to start a worker:

$ dask-scheduler
Start scheduler at 127.0.0.1:8786

$ dask-worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
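
A worker can also be started from Python. A minimal sketch (the scheduler address is hypothetical):

import asyncio

from dask.distributed import Worker

async def main():
    # Connect to an already-running scheduler and serve until closed
    async with Worker("tcp://127.0.0.1:8786", nthreads=4) as worker:
        await worker.finished()

asyncio.run(main())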