$ pip install distributed --upgrade

See installation document for more information.

Setup Dask.distributed the Hard Way

Set up scheduler and worker processes on your local computer:

$ dask-scheduler
Scheduler started at

$ dask-worker
$ dask-worker
$ dask-worker


At least one dask-worker must be running after launching a scheduler.

Launch an Executor and point it to the IP/port of the scheduler.

>>> from distributed import Executor
>>> executor = Executor('')

See setup for advanced use.

Setup Dask.distributed the Easy Way

If you create an executor without providing an address it will start up a local scheduler and worker for you.

>>> from distributed import Executor
>>> executor = Executor()
>>> executor
<Executor: scheduler="" processes=8 cores=8>

Map and Submit Functions

Use the map and submit methods to launch computations on the cluster. The map/submit functions send the function and arguments to the remote workers for processing. They return Future objects that refer to remote data on the cluster. The Future returns immediately while the computations run remotely in the background.

>>> def square(x):
        return x ** 2

>>> def neg(x):
        return -x

>>> A = executor.map(square, range(10))
>>> B = executor.map(neg, A)
>>> total = executor.submit(sum, B)
>>> total.result()


The map/submit functions return Future objects, lightweight tokens that refer to results on the cluster. By default the results of computations stay on the cluster.

>>> total  # Function hasn't yet completed
<Future: status: waiting, key: sum-58999c52e0fa35c7d7346c098f5085c7>

>>> total  # Function completed, result ready on remote worker
<Future: status: finished, key: sum-58999c52e0fa35c7d7346c098f5085c7>

Gather results to your local machine either with the Future.result method for a single future, or with the Executor.gather method for many futures at once.

>>> total.result()     # result for single future
>>> executor.gather(A) # gather for many futures
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


When things go wrong, or when you want to reset the cluster state, call the restart method.

>>> executor.restart()

See executor for advanced use.