Worker¶
Overview¶
Workers provide two functions:
- Compute tasks as directed by the scheduler
- Store and serve computed results to other workers or clients
Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler. It stores the results of these tasks locally and serves them to other workers or clients on demand. If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.
A typical conversation between a scheduler and two workers Alice and Bob may look like the following:
Scheduler -> Alice: Compute x <- add(1, 2)!
Alice -> Scheduler: I've computed x and am holding on to it!
Scheduler -> Bob: Compute y <- add(x, 10)!
You will need x. Alice has x.
Bob -> Alice: Please send me x.
Alice -> Bob: Sure. x is 3!
Bob -> Scheduler: I've computed y and am holding on to it!
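From the client's perspective this exchange is driven by ordinary task submission; a minimal sketch, assuming a scheduler is reachable at the illustrative address below:
from dask.distributed import Client

def add(x, y):
    return x + y

client = Client("tcp://scheduler:8786")   # illustrative address

x = client.submit(add, 1, 2)    # the scheduler might assign this to Alice
y = client.submit(add, x, 10)   # Bob gathers x from Alice before running
print(y.result())               # 13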
Storing Data¶
Data is stored locally in a dictionary in the .data attribute that maps keys to the results of function calls.
>>> worker.data
{'x': 3,
 'y': 13,
 ...
 '(df, 0)': pd.DataFrame(...),
 ...
}
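You can also inspect these dictionaries from a client. A small sketch using Client.run, which passes the worker object to functions that accept an argument named dask_worker (the address is illustrative):
from dask.distributed import Client

client = Client("tcp://scheduler:8786")
# Returns a dict mapping each worker's address to the keys it currently stores
stored_keys = client.run(lambda dask_worker: list(dask_worker.data))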
This .data attribute is a MutableMapping that is typically a combination of in-memory and on-disk storage with an LRU policy to move data between them.
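Because any MutableMapping is accepted, you can swap in your own storage backend through the data= keyword documented in the Worker parameters below. A minimal sketch that keeps everything in a plain in-memory dict, and therefore never spills to disk:
import asyncio
from distributed import Worker

async def main():
    # data=dict tells the worker to use a plain dictionary for storage
    # instead of the default disk-backed LRU buffer (illustrative address)
    worker = await Worker("tcp://scheduler:8786", data=dict)
    await worker.finished()

asyncio.run(main())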
Thread Pool¶
Each worker sends computations to a thread in a concurrent.futures.ThreadPoolExecutor. These computations occur in the same process as the Worker communication server so that they can access and share data efficiently with each other. For the purposes of data locality, all threads within a worker are considered the same worker.
If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely then it is advisable to run dask-worker processes with many threads and one process. This reduces communication costs and generally simplifies deployment.
If your computations are mostly Python code and don't release the GIL then it is advisable to run dask-worker processes with many processes and one thread per process:
$ dask-worker scheduler:8786 --nprocs 8 --nthreads 1
This will launch 8 worker processes each of which has its own ThreadPoolExecutor of size 1.
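When deploying locally through the Python API, a rough equivalent of the command above is a LocalCluster with many single-threaded workers (a sketch):
from dask.distributed import Client, LocalCluster

# Eight single-threaded worker processes, mirroring --nprocs 8 --nthreads 1
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)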
If your computations are external to Python, long-running, and don't release the GIL, then beware that while the computation is running the worker process will not be able to communicate with other workers or with the scheduler. This situation should be avoided. If you don't link in your own custom C/Fortran code then this topic probably doesn't apply.
Command Line tool¶
Use the dask-worker command line tool to start an individual worker. For more details on the command line options, please have a look at the command line tools documentation.
Internal Scheduling¶
Internally, tasks that arrive at the worker proceed through the following pipeline as distributed.worker.TaskState objects. Tasks which follow this path have a distributed.worker.TaskState.runspec defined which instructs the worker how to execute them.
Data dependencies are also represented as distributed.worker.TaskState objects and follow a simpler path through the execution pipeline. These tasks do not have a distributed.worker.TaskState.runspec defined and instead contain a listing of workers to collect their result from.
As tasks arrive they are prioritized and put into a heap. They are then taken
from this heap in turn to have any remote dependencies collected. For each
dependency we select a worker at random that has that data and collect the
dependency from that worker. To improve bandwidth we opportunistically gather
other dependencies of other tasks that are known to be on that worker, up to a
maximum of 200MB of data (too little data and bandwidth suffers, too much data
and responsiveness suffers). We use a fixed number of connections (around
10-50) so as to avoid overly-fragmenting our network bandwidth. In the event
that the network comms between two workers are saturated, a dependency task may cycle between fetch and flight until it is successfully collected.
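The gathering policy can be pictured with a purely illustrative sketch (this is not the worker's actual code; select_keys_to_gather, who_has, and sizes are hypothetical names): choose one holder of the highest-priority missing key, then batch other missing keys that the same worker is believed to hold, up to a size budget.
import random

def select_keys_to_gather(missing, who_has, sizes, budget=200e6):
    # 'missing' is assumed to be ordered by priority
    first = missing[0]
    worker = random.choice(list(who_has[first]))   # any worker that holds it
    batch, total = [], 0
    for key in missing:
        size = sizes.get(key, 0)
        if worker in who_has.get(key, ()) and total + size <= budget:
            batch.append(key)
            total += size
    return worker, batch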
After all dependencies for a task are in memory we transition the task to the ready state and put the task again into a heap of tasks that are ready to run.
We collect from this heap and put the task into a thread from a local thread pool to execute.
Optionally, this task may identify itself as a long-running task (see Tasks launching tasks), at which point it secedes from the thread pool.
A task either errs or its result is put into memory. In either case a response is sent back to the scheduler.
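For example, a task that launches its own sub-tasks can secede while it waits so that its thread-pool slot is freed (a sketch using get_client, secede, and rejoin; process is just a stand-in function):
from distributed import get_client, rejoin, secede

def process(x):
    return x + 1

def launch_and_gather(chunks):
    client = get_client()                  # client connected to the same scheduler
    futures = client.map(process, chunks)  # submit sub-tasks from inside a task
    secede()                               # leave the worker's thread pool while waiting
    results = client.gather(futures)
    rejoin()                               # re-enter the thread pool before returning
    return results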
Tasks slated for execution and tasks marked for collection from other workers must follow their respective transition paths as defined above. The only exceptions to this are when:
- A task is stolen, in which case a task which might have been collected will instead be executed on the thieving worker
- Scheduler intercession, in which the scheduler reassigns a task that was previously assigned to a separate worker to a new worker. This most commonly occurs when a worker dies during computation.
Memory Management¶
Workers are given a target memory limit to stay under with the command line --memory-limit keyword or the memory_limit= Python keyword argument, which sets the memory limit per worker process launched by dask-worker:
$ dask-worker tcp://scheduler:port --memory-limit=auto # TOTAL_MEMORY * min(1, nthreads / total_nthreads)
$ dask-worker tcp://scheduler:port --memory-limit=4e9 # four gigabytes per worker process.
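The Python keyword works the same way; for example, a local deployment sketch where each of four worker processes gets a four-gigabyte budget:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, memory_limit="4GB")  # limit is per worker process
client = Client(cluster)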
Workers use a few different heuristics to keep memory use beneath this limit:
- At 60% of memory load (as estimated by sizeof), spill least recently used data to disk
- At 70% of memory load, spill least recently used data to disk regardless of what is reported by sizeof
- At 80% of memory load, stop accepting new work on local thread pool
- At 95% of memory load, terminate and restart the worker
These values can be configured by modifying the ~/.config/dask/distributed.yaml file:
distributed:
  worker:
    # Fractions of worker memory at which we take action to avoid memory blowup
    # Set any of the lower three values to False to turn off the behavior entirely
    memory:
      target: 0.60     # target fraction to stay below
      spill: 0.70      # fraction at which we spill to disk
      pause: 0.80      # fraction at which we pause worker threads
      terminate: 0.95  # fraction at which we terminate the worker
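The same fractions can also be set from Python before the worker starts, for example with dask.config.set; the values below simply restate the defaults shown above:
import dask

dask.config.set({
    "distributed.worker.memory.target": 0.60,
    "distributed.worker.memory.spill": 0.70,
    "distributed.worker.memory.pause": 0.80,
    "distributed.worker.memory.terminate": 0.95,
})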
Spill data to Disk¶
Every time the worker finishes a task it estimates the size in bytes that the result costs to keep in memory using the sizeof function. This function defaults to sys.getsizeof for arbitrary objects, which uses the standard Python __sizeof__ protocol, but also has special-cased implementations for common data types like NumPy arrays and Pandas dataframes.
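A quick sketch of that function in action:
import numpy as np
from dask.sizeof import sizeof

x = np.ones(1_000_000)   # one million float64 values, roughly 8 MB
nbytes = sizeof(x)       # special-cased for NumPy: roughly x.nbytes
n = sizeof(object())     # arbitrary objects fall back to sys.getsizeof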
When the sum of the number of bytes of the data in memory exceeds 60% of the available threshold, the worker will begin to dump the least recently used data to disk. You can control this location with the --local-directory keyword:
$ dask-worker tcp://scheduler:port --memory-limit 4e9 --local-directory /scratch
That data is still available and will be read back from disk when necessary. On the diagnostic dashboard status page disk I/O will show up in the task stream plot as orange blocks. Additionally the memory plot in the upper left will become orange and then red.
Monitor process memory load¶
The approach above can fail for a few reasons:
- Custom objects may not report their memory size accurately
- User functions may take up more RAM than expected
- Significant amounts of data may accumulate in network I/O buffers
To address this we monitor the memory of the worker process every 200 ms. If the system-reported memory use is above 70% of the target memory usage then the worker will start dumping unused data to disk, even if internal sizeof recording hasn't yet reached the normal 60% limit.
Halt worker threads¶
At 80% load the worker’s thread pool will stop accepting new tasks. This gives time for the write-to-disk functionality to take effect even in the face of rapidly accumulating data.
Kill Worker¶
At 95% memory load a worker's nanny process will terminate it. This is to avoid having our worker job be terminated by an external job scheduler (like YARN, Mesos, SGE, etc.). After termination the nanny will restart the worker in a fresh state.
Nanny¶
Dask workers are by default launched, monitored, and managed by a small Nanny process.
class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port=0, nthreads=None, ncores=None, loop=None, local_dir=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port=None, protocol=None, config=None, **worker_kwargs)[source]¶
A process to manage worker processes
The nanny spins up Worker processes, watches them, and kills or restarts them as necessary. It is necessary if you want to use the Client.restart method, or to restart the worker automatically if it reaches the terminate fraction of its memory limit. The parameters for the Nanny are mostly the same as those for the Worker.
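For example, restarting all workers from a client only works when each worker runs under a nanny (a sketch; the address is illustrative):
from dask.distributed import Client

client = Client("tcp://scheduler:8786")
client.restart()   # asks each nanny to kill and relaunch its worker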
See also
Worker
API Documentation¶
class distributed.worker.TaskState(key, runspec=None)[source]¶
Holds volatile state relating to an individual Dask task
- dependencies: set(TaskState instances)
  The data needed by this key to run
- dependents: set(TaskState instances)
  The keys that use this dependency. Only keys which are not available already are tracked in this structure and dependents made available are actively removed. Only after all dependents have been removed is this task allowed to be forgotten.
- duration: float
  Expected duration of the task
- priority: tuple
  The priority of this task as given by the scheduler. Determines run order.
- state: str
  The current state of the task. One of ["waiting", "ready", "executing", "fetch", "memory", "flight", "long-running", "rescheduled", "error"]
- who_has: set(worker)
  Workers that we believe have this data
- coming_from: str
  The worker that the current task data is coming from if the task is in flight
- waiting_for_data: set(keys of dependencies)
  A dynamic version of dependencies. All dependencies that we still don't have for a particular key.
- resource_restrictions: {str: number}
  Abstract resources required to run a task
- exception: str
  The exception caused by running a task if it erred
- traceback: str
  The traceback from a task that erred
- type: type
  The type of a particular piece of data
- suspicious_count: int
  The number of times a dependency has not been where we expected it
- startstops: [{startstop}]
  Log of transfer, load, and compute times for a task
- start_time: float
  Time at which task begins running
- stop_time: float
  Time at which task finishes running
- metadata: dict
  Metadata related to task. Stored metadata should be msgpack serializable (e.g. int, string, list, dict).
- nbytes: int
  The size of a particular piece of data
Parameters:
- key: str
- runspec: SerializedTask
  A named tuple containing the function, args, kwargs and task associated with this TaskState instance. This defaults to None and can remain empty if it is a dependency that this worker will receive from another worker.
class distributed.worker.Worker(scheduler_ip=None, scheduler_port=None, scheduler_file=None, ncores=None, nthreads=None, loop=None, local_dir=None, local_directory=None, services=None, service_ports=None, service_kwargs=None, name=None, reconnect=True, memory_limit='auto', executor=None, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, security=None, contact_address=None, memory_monitor_interval='200ms', extensions=None, metrics={}, startup_information={}, data=None, interface=None, host=None, port=None, protocol=None, dashboard_address=None, dashboard=False, http_prefix='/', nanny=None, plugins=(), low_level_profiler=False, validate=None, profile_cycle_interval=None, lifetime=None, lifetime_stagger=None, lifetime_restart=None, **kwargs)[source]¶
Worker node in a Dask distributed cluster
Workers perform two functions:
- Serve data from a local dictionary
- Perform computation on that data and on data from peers
Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.
You can start a worker with the dask-worker command line application:
$ dask-worker scheduler-ip:port
Use the --help flag to see more options:
$ dask-worker --help
The rest of this docstring is about the internal state that the worker uses to manage and track internal computations.
State
Informational State
These attributes don’t change significantly during execution.
- nthreads: int
  Number of threads used by this worker process
- executor: concurrent.futures.ThreadPoolExecutor
  Executor used to perform computation. This can also be the string "offload" in which case this uses the same thread pool used for offloading communications. This results in the same thread being used for deserialization and computation.
- local_directory: path
  Path on local machine to store temporary files
- scheduler: rpc
  Location of scheduler. See .ip/.port attributes.
- name: string
  Alias
- services: {str: Server}
  Auxiliary web servers running on this worker
- service_ports: {str: port}
- total_out_connections: int
  The maximum number of concurrent outgoing requests for data
- total_in_connections: int
  The maximum number of concurrent incoming requests for data
- total_comm_nbytes: int
- batched_stream: BatchedSend
  A batched stream along which we communicate to the scheduler
- log: [(message)]
  A structured and queryable log. See Worker.story
Volatile State
These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.
- tasks: {key: TaskState}
  The tasks currently executing on this worker (and any dependencies of those tasks)
- data: {key: object}
  Prefer using the host attribute instead of this, unless memory_limit and at least one of memory_target_fraction or memory_spill_fraction values are defined; in that case this attribute is a zict.Buffer, from which information on the LRU cache can be queried.
- data.memory: {key: object}
  Dictionary mapping keys to actual values stored in memory. Only available if the condition for data being a zict.Buffer is met.
- data.disk: {key: object}
  Dictionary mapping keys to actual values stored on disk. Only available if the condition for data being a zict.Buffer is met.
- data_needed: deque(keys)
  The keys which still require data in order to execute, arranged in a deque
- ready: [keys]
  Keys that are ready to run. Stored in a LIFO stack
- constrained: [keys]
  Keys for which we have the data to run, but are waiting on abstract resources like GPUs. Stored in a FIFO deque
- executing_count: int
  A count of tasks currently executing on this worker
- executed_count: int
  The number of tasks that this worker has run in its lifetime
- long_running: {keys}
  A set of keys of tasks that are running and have started their own long-running clients.
- has_what: {worker: {deps}}
  The data that we care about that we think a worker has
- pending_data_per_worker: {worker: [dep]}
  The data on each worker that we still want, prioritized as a deque
- in_flight_tasks: int
  A count of the number of tasks that are coming to us in current peer-to-peer connections
- in_flight_workers: {worker: {task}}
  The workers from which we are currently gathering data and the dependencies we expect from those connections
- comm_bytes: int
  The total number of bytes in flight
- threads: {key: int}
  The ID of the thread on which the task ran
- active_threads: {int: key}
  The keys currently running on active threads
- waiting_for_data_count: int
  A count of how many tasks are currently waiting for data
Parameters:
- scheduler_ip: str
- scheduler_port: int
- ip: str, optional
- data: MutableMapping, type, None
The object to use for storage, builds a disk-backed LRU dict by default
- nthreads: int, optional
- loop: tornado.ioloop.IOLoop
- local_directory: str, optional
Directory where we place local resources
- name: str, optional
- memory_limit: int, float, string
Number of bytes of memory that this worker should use. Set to zero for no limit. Set to 'auto' to calculate as system.MEMORY_LIMIT * min(1, nthreads / total_cores). Use strings or numbers like 5GB or 5e9.
- memory_target_fraction: float
Fraction of memory to try to stay beneath
- memory_spill_fraction: float
Fraction of memory at which we start spilling to disk
- memory_pause_fraction: float
Fraction of memory at which we stop running new tasks
- executor: concurrent.futures.Executor
- resources: dict
Resources that this worker has like
{'GPU': 2}
- nanny: str
Address on which to contact nanny, if it exists
- lifetime: str
Amount of time like “1 hour” after which we gracefully shut down the worker. This defaults to None, meaning no explicit shutdown time.
- lifetime_stagger: str
Amount of time like "5 minutes" to stagger the lifetime value. The actual lifetime will be selected uniformly at random between lifetime +/- lifetime_stagger.
- lifetime_restart: bool
Whether or not to restart a worker after it has reached its lifetime. Default False.
Examples
Use the command line to start a worker:
$ dask-scheduler
Start scheduler at 127.0.0.1:8786

$ dask-worker 127.0.0.1:8786
Start worker at:              127.0.0.1:1234
Registered with scheduler at: 127.0.0.1:8786
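A worker can also be started from Python; a sketch assuming a scheduler is already running at the address shown and that the worker is driven from an asyncio event loop:
import asyncio
from distributed import Worker

async def main():
    async with Worker("tcp://127.0.0.1:8786", nthreads=2) as worker:
        await worker.finished()   # run until the worker is closed

asyncio.run(main())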