Worker¶
We build a distributed network from two kinds of nodes.
- A single scheduler node
- Several worker nodes

This page describes the worker nodes.
Serve Data¶
Workers serve data from a local dictionary of data:
{'x': np.array(...),
 'y': pd.DataFrame(...)}
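Get, set, and delete on this store behave like ordinary dictionary operations. A toy sketch, using plain integers in place of arrays and dataframes so it is self-contained:

```python
# A worker's local store is conceptually just a Python dict.
data = {}

data['x'] = 1          # set a key-value pair
data['y'] = 2
assert data['x'] == 1  # get a value by key
del data['y']          # delete a key

print(sorted(data))    # ['x']
```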
The worker supports the normal dictionary operations on this data: getting, setting, and deleting key-value pairs. In the following example we connect to two workers, collect data from one worker, and send it to another.
alice = rpc(ip='192.168.0.101', port=8788)
d = yield alice.get_data(keys=['x', 'y'])  # collect data from alice
bob = rpc(ip='192.168.0.102', port=8788)
yield bob.update_data(data=d)              # send it on to bob
This is only an example, however; one does not typically manage data transfer between workers by hand. Workers handle it on their own as necessary.
Compute¶
Workers evaluate functions provided by the user on their data. If a worker lacks some of the data a function needs, it automatically collects it from peers that do have it (as shown above):
z <- add(x, y) # can be done with only local data
z <- add(x, a) # need to find out where we can get 'a'
The result of such a computation on our end is just the response b'OK'. The actual result stays on the remote worker.
>>> response, metadata = yield alice.compute(function=add, keys=['x', 'a'])
>>> response
b'OK'
>>> metadata
{'nbytes': 1024}
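The nbytes field reports the size of the stored result, which itself never leaves the worker. As an illustrative sketch (the exact accounting is up to the worker), a NumPy result of 128 float64 values would be reported as 1024 bytes:

```python
import numpy as np

# Illustrative only: one plausible way the size metadata could be derived.
# The result itself stays in the worker's local data dictionary.
result = np.zeros(128, dtype='f8')    # 128 values * 8 bytes each
metadata = {'nbytes': result.nbytes}

print(metadata)  # {'nbytes': 1024}
```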
The worker also reports back to the center/scheduler whenever it completes a computation. Metadata storage is centralized but all data transfer is peer-to-peer. Here is a quick example of what happens during a call to compute:
client: Hey Alice! Compute ``z <- add(x, a)``
Alice: Hey Center! Who has a?
Center: Hey Alice! Bob has a.
Alice: Hey Bob! Send me a!
Bob: Hey Alice! Here's a!
Alice: Hey Client! I've computed z and am holding on to it!
Alice: Hey Center! I have z!
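The exchange above can be modeled with plain dictionaries standing in for the network. All the names below are hypothetical (the real worker does this over RPC), but the sketch shows the key point: the metadata lookup goes through the center, while the data itself moves worker-to-worker:

```python
def who_has(center, key):
    """Ask the center which worker holds a key (centralized metadata)."""
    return center[key]

def compute(local_data, peers, center, function, keys):
    # Gather any keys we don't hold locally from peers (peer-to-peer transfer)
    for key in keys:
        if key not in local_data:
            owner = who_has(center, key)
            local_data[key] = peers[owner][key]
    # Evaluate the function on now-local data; the result stays on this worker
    return function(*[local_data[k] for k in keys])

# Toy run: Alice computes z <- add(x, a), fetching 'a' from Bob
alice = {'x': 1, 'y': 2}
bob = {'a': 10}
center = {'a': 'bob'}

alice['z'] = compute(alice, {'bob': bob}, center, lambda x, a: x + a, ['x', 'a'])
print(alice['z'])  # 11
```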
class distributed.worker.Worker(center_ip, center_port, ip=None, ncores=None, loop=None, local_dir=None, services=None, service_ports=None, name=None, **kwargs)¶
Worker Node
Workers perform two functions:
- Serve data from a local dictionary
- Perform computation on that data and on data from peers
Additionally workers keep a Center informed of their data and use that Center to gather data from other workers when necessary to perform a computation.
You can start a worker with the dworker command line application:

$ dworker scheduler-ip:port
State
- data: {key: object}: Dictionary mapping keys to actual values
- active: {key}: Set of keys currently under computation
- ncores: int: Number of cores used by this worker process
- executor: concurrent.futures.ThreadPoolExecutor: Executor used to perform computation
- local_dir: path: Path on local machine to store temporary files
- center: rpc: Location of center or scheduler. See .ip/.port attributes.
- name: string: Alias
- services: {str: Server}: Auxiliary web servers running on this worker
- service_ports: {str: port}:
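To make the relationship between these attributes concrete, here is a rough, hypothetical sketch wiring a data dictionary, an active set, and a ThreadPoolExecutor together. The real Worker class manages all of this internally; the names below only mirror the attribute list above:

```python
import os
from concurrent.futures import ThreadPoolExecutor

ncores = os.cpu_count()                # ncores: cores used by this worker
executor = ThreadPoolExecutor(ncores)  # executor: performs computation
data = {}                              # data: {key: object}
active = set()                         # active: keys under computation

def run(key, function, *args):
    # Mark the key active, compute on the executor, store the result locally
    active.add(key)
    try:
        data[key] = executor.submit(function, *args).result()
    finally:
        active.discard(key)

run('z', lambda x, y: x + y, 1, 2)
print(data['z'])  # 3
```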
See also
distributed.center.Center
Examples
Create centers and workers in Python:
>>> from distributed import Center, Worker
>>> c = Center('192.168.0.100', 8787)
>>> w = Worker(c.ip, c.port)
>>> yield w._start(port=8788)
Or use the command line:

$ dcenter
Start center at 127.0.0.1:8787

$ dworker 127.0.0.1:8787
Start worker at: 127.0.0.1:8788
Registered with center at: 127.0.0.1:8787