Scheduler

The scheduler orchestrates all work across the cluster. It tracks all active workers and clients and manages the workers to perform the tasks requested by the clients. It tracks the current state of the entire cluster, determining which tasks execute on which workers in what order. It is a Tornado TCPServer consisting of several concurrent coroutines running in a single event loop.

Design

The scheduler tracks the state of many tasks among many workers. It updates the state of tasks in response to stimuli from workers and from clients. After incorporating any new information it ensures that the entire system is on track to complete the desired tasks.

For performance, all updates happen in constant time, regardless of the number of known tasks. To achieve constant-time performance the scheduler state is heavily indexed, with several data structures (around 20) indexing each other to achieve fast lookup. All state is a mixture of interwoven dictionaries, lists, and sets.
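
For example, the location of data in distributed memory is tracked in two mutually indexing structures, who_has and has_what (both described under State below), so that lookups in either direction take constant time. Below is a minimal sketch of keeping such a pair in sync; the helper function is illustrative, not actual scheduler code.

from collections import defaultdict

who_has = defaultdict(set)   # key -> {workers holding that key}
has_what = defaultdict(set)  # worker -> {keys held by that worker}

def record_key_in_memory(key, worker):
    # Update both indexes together so each remains the transpose of the other.
    # Each update is O(1) regardless of how many tasks the scheduler tracks.
    who_has[key].add(worker)
    has_what[worker].add(key)

record_key_in_memory('x', 'alice:8000')
assert 'x' in has_what['alice:8000']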

This interwoven collection of dictionaries, sets, and lists is difficult to update consistently without error. The introduction of small errors and race conditions leads to infrequent but deadlocking errors that erode confidence in the scheduler. To mitigate this complexity we introduce coarser scale task transitions that are easier to reason about and are themselves heavily tested. Most code that responds to external stimuli/events is then written as a sequence of task transitions.

Transitions

A task is a single Python function call on data intended to be run within a single worker. Tasks fall into the following states with the following allowed transitions

Dask scheduler task states
  • Released: known but not actively computing or in memory
  • Waiting: On track to be computed, waiting on dependencies to arrive in memory
  • Queue (ready): Ready to be computed by any worker
  • Stacks (ready): Ready to be computed by a particular preferred worker
  • No-worker (ready, rare): Ready to be computed, but no appropriate worker exists
  • Processing: Actively being computed by one or more workers
  • Memory: In memory on one or more workers
  • Erred: Task has computed and erred
  • Removed (not actually a state): Task is no longer needed by any client and so is removed from state

Every transition between states is a separate method in the scheduler. These task transition functions are prefixed with transition followed by the names of the start and finish task states, like the following.

def transition_released_waiting(self, key):

def transition_processing_memory(self, key):

def transition_processing_erred(self, key):

These functions each have three effects.

  1. They perform the necessary transformations on the scheduler state (the 20 dicts/lists/sets) to move one key between states.
  2. They return a dictionary of recommended {key: state} transitions to enact directly afterwards. For example after we transition a key into memory we may find that many waiting keys are now ready to transition from waiting to a ready state.
  3. Optionally they include a set of validation checks that can be turned on for testing.
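
As a minimal sketch of the shape such a function takes, using the state structures listed under State below (the body is illustrative, not the scheduler's actual logic):

def transition_waiting_ready(self, key):
    # 3. optional validation, enabled only during testing
    if self.validate:
        assert not self.waiting[key]       # no outstanding dependencies

    # 1. update the interwoven state structures for this single key
    del self.waiting[key]
    self.ready.append(key)
    self.task_state[key] = 'ready'

    # 2. recommend any follow-up transitions (none in this simple sketch)
    return {}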

Rather than call these functions directly we call the central function transition:

def transition(self, key, final_state):
    """ Transition key to the suggested state """

This transition function finds the appropriate path from the current to the final state. It also serves as a central point for logging and diagnostics.
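
A sketch of what that dispatch could look like, assuming a lookup table keyed by (start, finish) state pairs; the _transitions attribute below is an assumption made for illustration:

def transition(self, key, final_state):
    """ Transition key to the suggested state """
    start_state = self.task_state[key]
    # Assumed table of {(start, finish): handler method}. The real scheduler
    # may route through intermediate states when there is no direct edge.
    func = self._transitions[(start_state, final_state)]
    recommendations = func(key)   # {key: state} suggestions for what to do next
    return recommendations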

Often we want to enact several transitions at once or want to continually respond to new transitions recommended by initial transitions until we reach a steady state. For that we use the transitions function (note the plural s).

def transitions(self, recommendations):
    recommendations = recommendations.copy()
    while recommendations:
        key, finish = recommendations.popitem()
        new = self.transition(key, finish)
        recommendations.update(new)

This function runs transition, takes the recommendations and runs them as well, repeating until no further task-transitions are recommended.
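
For example, after a task arrives in memory we can seed the loop with that single recommendation and let the cascade (such as waiting dependents becoming ready) run until it settles. The key name here is hypothetical:

# 'x' has just finished on a worker
recommendations = {'x': 'memory'}
self.transitions(recommendations)   # may also move waiting dependents of 'x'
                                    # to a ready state, and so on, until no
                                    # further transitions are recommended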

Stimuli

Transitions occur from stimuli, which are state-changing messages to the scheduler from workers or clients. The scheduler responds to the following stimuli:

  • Workers
    • Task finished: A task has completed on a worker and is now in memory
    • Task erred: A task ran and erred on a worker
    • Task missing data: A task tried to run but was unable to find necessary data on other workers
    • Worker added: A new worker was added to the network
    • Worker removed: An existing worker left the network
  • Clients
    • Update graph: The client sends more tasks to the scheduler
    • Release keys: The client no longer desires the result of certain keys

Stimulus functions are prefixed with the text stimulus, and take a variety of keyword arguments from the message, as in the following examples:

def stimulus_task_finished(self, key=None, worker=None, nbytes=None,
                           type=None, compute_start=None, compute_stop=None,
                           transfer_start=None, transfer_stop=None):

def stimulus_task_erred(self, key=None, worker=None,
                        exception=None, traceback=None):

These functions change some non-essential administrative state and then call transition functions.
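
A sketch of their overall shape, simplified relative to the real implementation:

def stimulus_task_finished(self, key=None, worker=None, nbytes=None, **kwargs):
    if self.task_state.get(key) != 'processing':
        return {}                          # stale or duplicate message

    if nbytes is not None:
        self.nbytes[key] = nbytes          # non-essential bookkeeping

    # the actual state change is delegated to the transition machinery
    return self.transition(key, 'memory', worker=worker)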

Note that there are several other non-state-changing messages that we receive from the workers and clients, such as messages requesting information about the current state of the scheduler. These are not considered stimuli.

API

class distributed.scheduler.Scheduler(center=None, loop=None, max_buffer_size=2069891072.0, delete_interval=500, synchronize_worker_interval=5000, ip=None, services=None, allowed_failures=3, validate=False, **kwargs)[source]

Dynamic distributed task scheduler

The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever growing dask graph.

All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.

The scheduler communicates with the outside world through Tornado IOStreams. It maintains a consistent and valid view of the world even when listening to several clients at once.

A Scheduler is typically started either with the dask-scheduler executable:

$ dask-scheduler
Scheduler started at 127.0.0.1:8786

Or within a LocalCluster, which an Executor starts up when given no connection information:

>>> e = Executor()  
>>> e.cluster.scheduler  
Scheduler(...)

Users typically do not interact with the scheduler directly but rather with the client object Executor.

State

The scheduler contains the following state variables. Each variable is listed along with what it stores and a brief description.

  • tasks: {key: task}:

    Dictionary mapping key to a serialized task like the following: {'function': b'...', 'args': b'...'} or {'task': b'...'}

  • dependencies: {key: {keys}}:

    Dictionary showing which keys depend on which others

  • dependents: {key: {keys}}:

    Dictionary showing which keys are dependent on which others

  • task_state: {key: string}:

    Dictionary listing the current state of every task among the following: released, waiting, stacks, queue, no-worker, processing, memory, erred

  • priority: {key: tuple}:

    A score per key that determines its priority

  • waiting: {key: {key}}:

    Dictionary like dependencies but excludes keys already computed

  • waiting_data: {key: {key}}:

    Dictionary like dependents but excludes keys already computed

  • ready: deque(key)

    Keys that are ready to run, but not yet assigned to a worker

  • stacks: {worker: [keys]}:

    List of keys waiting to be sent to each worker

  • processing: {worker: {key: cost}}:

    Set of keys currently in execution on each worker and their expected duration

  • rprocessing: {key: {worker}}:

    Set of workers currently executing a particular task

  • who_has: {key: {worker}}:

    Where each key lives. The current state of distributed memory.

  • has_what: {worker: {key}}:

    What worker has what keys. The transpose of who_has.

  • released: {keys}

    Set of keys that are known, but released from memory

  • unrunnable: {key}

    Keys that we are unable to run

  • restrictions: {key: {hostnames}}:

    A set of hostnames per key of where that key can be run. Usually this is empty unless a key has been specifically restricted to only run on certain hosts. These restrictions don’t include a worker port. Any worker on that hostname is deemed valid.

  • loose_restrictions: {key}:

    Set of keys for which we are allowed to violate restrictions (see above) if no valid workers are present.

  • exceptions: {key: Exception}:

    A dict mapping keys to remote exceptions

  • tracebacks: {key: list}:

    A dict mapping keys to remote tracebacks stored as a list of strings

  • exceptions_blame: {key: key}:

    A dict mapping a key to another key on which it depends that has failed

  • suspicious_tasks: {key: int}

    Number of times a task has been involved in a worker failure

  • deleted_keys: {key: {workers}}

    Locations of workers that have keys that should be deleted

  • wants_what: {client: {key}}:

    What keys are wanted by each client. The transpose of who_wants.

  • who_wants: {key: {client}}:

    Which clients want each key. The active targets of computation.

  • nbytes: {key: int}:

    Number of bytes for a key as reported by workers holding that key.

  • stealable: [[key]]

    A list of stacks of stealable keys, ordered by stealability

  • ncores: {worker: int}:

    Number of cores owned by each worker

  • idle: {worker}:

    Set of workers that are not fully utilized

  • worker_info: {worker: {str: data}}:

    Information about each worker

  • host_info: {hostname: dict}:

    Information about each worker host

  • occupancy: {worker: time}

    Expected runtime for all tasks currently processing on a worker

  • services: {str: port}:

    Other services running on this scheduler, like HTTP

  • loop: IOLoop:

    The running Tornado IOLoop

  • streams: [IOStreams]:

    A list of Tornado IOStreams from which we both accept stimuli and report results

  • task_duration: {key-prefix: time}

    Time we expect certain functions to take, e.g. {'sum': 0.25}

  • coroutines: [Futures]:

    A list of active futures that control operation

  • scheduler_queues: [Queues]:

    A list of Tornado Queues from which we accept stimuli

  • report_queues: [Queues]:

    A list of Tornado Queues on which we report results
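
Many of these structures are transposes or filtered views of one another. When the scheduler is started with validate=True, cross-checks of this kind can be run; the function below is an illustrative sketch of such invariants, not the scheduler's actual validation code.

def check_invariants(scheduler):
    # who_has and has_what must be transposes of each other
    for key, workers in scheduler.who_has.items():
        for worker in workers:
            assert key in scheduler.has_what[worker]

    # wants_what and who_wants must also be transposes
    for client, keys in scheduler.wants_what.items():
        for key in keys:
            assert client in scheduler.who_wants[key]

    # waiting only contains dependencies that have not yet been computed
    for key, deps in scheduler.waiting.items():
        assert deps <= scheduler.dependencies[key]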

add_client(stream, client=None)[source]

Add client to network

We listen to all future messages from this IOStream.

add_keys(stream=None, address=None, keys=())[source]

Learn that a worker has certain keys

This should not be used in practice and is mostly here for legacy reasons.

add_plugin(plugin)[source]

Add external plugin to scheduler

See http://distributed.readthedocs.io/en/latest/plugins.html

add_worker(stream=None, address=None, keys=(), ncores=None, name=None, coerce_address=True, nbytes=None, now=None, host_info=None, **info)[source]

Add a new worker to the cluster

broadcast(stream=None, msg=None, workers=None, hosts=None)[source]

Broadcast message to workers, return all results

cancel_key(key, client, retries=5)[source]

Cancel a particular key and all dependents

change_worker_cores(stream=None, worker=None, diff=0)[source]

Add or remove cores from a worker

This is used when a worker wants to spin off a long-running task

cleanup()[source]

Clean up queues and coroutines, prepare to stop

clear_data_from_workers()[source]

Send delete signals to clear unused data from workers

This watches the .deleted_keys attribute, which stores a set of keys to be deleted from each worker. This function is run periodically by the ._delete_periodic_callback to actually remove the data.

This runs every self.delete_interval milliseconds.

client_releases_keys(keys=None, client=None)[source]

Remove keys from client desired list

close(stream=None, fast=False)[source]

Send cleanup signal to all coroutines then wait until finished

close_streams()[source]

Close all active IOStreams

coerce_address(addr)[source]

Coerce possible input addresses to canonical form

Handles lists, strings, bytes, tuples, or aliases

correct_time_delay(worker, msg)[source]

Apply offset time delay in message times.

Clocks on different workers differ. We keep track of a relative “now” through periodic heartbeats. We use this known delay to align message times to Scheduler local time. In particular this helps with diagnostics.

Operates in place
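
A sketch of what this alignment could look like, assuming the per-host clock offset is stored in host_info under a 'time-delay' entry (that field name is an assumption):

def correct_time_delay(self, worker, msg):
    host = worker.split(':')[0]
    delay = self.host_info[host]['time-delay']   # assumed field name
    for key in ['compute_start', 'compute_stop',
                'transfer_start', 'transfer_stop']:
        if key in msg:
            msg[key] += delay                     # modifies msg in place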

ensure_occupied()[source]

Run ready tasks on idle workers

Work stealing policy

If some workers are idle but not others, if there are no globally ready tasks, and if there are tasks in worker stacks, then we start to pull preferred tasks from overburdened workers and deploy them back into the global pool in the following manner.

We determine the number of tasks to reclaim as the number of all tasks in all stacks times the fraction of idle workers to all workers. We sort the stacks by size and walk through them, reclaiming half of each stack until we have enough tasks to fill the global pool. We are careful not to reclaim tasks that are restricted to run on certain workers.
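
The arithmetic behind the reclaim count described above, with illustrative numbers:

stacks = {'alice:8000': ['w', 'x', 'y', 'z'], 'bob:8000': []}
idle_workers = {'bob:8000'}

total_stacked = sum(map(len, stacks.values()))           # 4 tasks in all stacks
idle_fraction = float(len(idle_workers)) / len(stacks)   # 1 idle of 2 workers
n_to_reclaim = int(total_stacked * idle_fraction)        # reclaim 2 tasks into
                                                         # the global ready pool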

ensure_occupied_queue(worker, count)[source]

Send at most count tasks from the ready queue to the specified worker

ensure_occupied_stacks(worker)[source]

Send tasks to worker while it has tasks and free cores

These tasks may come from the worker’s own stacks or from the global ready deque.

We update the idle workers set appropriately.

feed(stream, function=None, setup=None, teardown=None, interval=1, **kwargs)[source]

Provides a data stream to external requester

Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics.

finished()[source]

Wait until all coroutines have ceased

gather(stream=None, keys=None)[source]

Collect data in from workers

handle_messages(in_queue, report, client=None)[source]

The master client coroutine. Handles all inbound messages from clients.

This runs once per Client IOStream or Queue.

See also

Scheduler.worker_stream
The equivalent function for workers

handle_queues(scheduler_queue, report_queue)[source]

Register new control and report queues to the Scheduler

Queues are not in common use. This may be deprecated in the future.

identity(stream)[source]

Basic information about ourselves and our cluster

issaturated(worker, latency=0.005)[source]

Determine if a worker has enough work to avoid being idle

A worker is saturated if the following criteria are met

  1. It is working on at least as many tasks as it has cores
  2. The expected time it will take to complete all of its currently assigned tasks is at least a full round-trip time. This is relevant when it has many small tasks.
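
A sketch of this check, using the processing, ncores, and occupancy structures listed under State (the exact form in the scheduler may differ):

def issaturated(self, worker, latency=0.005):
    # 1. at least one task per core, and
    # 2. enough expected work to cover a network round trip
    return (len(self.processing[worker]) >= self.ncores[worker]
            and self.occupancy[worker] >= latency)
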
rebalance(stream=None, keys=None, workers=None)[source]

Rebalance keys so that each worker stores roughly equal bytes

remove_client(client=None)[source]

Remove client from network

remove_plugin(plugin)[source]

Remove external plugin from scheduler

remove_worker(stream=None, address=None)[source]

Remove worker from cluster

We do this when a worker reports that it plans to leave or when it appears to be unresponsive. This may send its tasks back to a released state.

replicate(stream=None, keys=None, n=None, workers=None, branching_factor=2)[source]

Replicate data throughout cluster

This performs a tree copy of the data throughout the network individually on each piece of data.

Parameters:

keys: Iterable

list of keys to replicate

n: int

Number of replications we expect to see within the cluster

branching_factor: int, optional

The number of workers that can copy data in each generation

report(msg)[source]

Publish updates to all listening Queues and Streams

If the message contains a key then we only send the message to those streams that care about the key.

restart()[source]

Restart all workers. Reset local state.

scatter(stream=None, data=None, workers=None, client=None, broadcast=False)[source]

Send data out to workers

send_task_to_worker(worker, key)[source]

Send a single computational task to a worker

start(port=8786, start_queues=True)[source]

Clear out old state and restart all running coroutines

start_ipython(stream=None)[source]

Start an IPython kernel

Returns Jupyter connection info dictionary.

steal_time_ratio(key, bandwidth=100000000)[source]

The compute to communication time ratio of a key

Returns:

ratio: The compute/communication time ratio of the task

loc: The self.stealable bin into which this key should go
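
For intuition, the ratio compares how long a task is expected to compute against how long its input data would take to move at the assumed bandwidth (100000000 bytes per second by default). A rough worked example with illustrative numbers:

bandwidth = 100000000            # bytes per second (the default above)
compute_time = 0.010             # expected duration, e.g. from task_duration
dep_nbytes = 50000000            # total bytes of the task's dependencies

transfer_time = dep_nbytes / float(bandwidth)   # 0.5 seconds to move inputs
ratio = compute_time / transfer_time            # 0.02: transfer dominates, so
                                                # this task is a poor candidate
                                                # to steal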

stimulus_cancel(stream, keys=None, client=None)[source]

Stop execution on a list of keys

stimulus_missing_data(keys=None, key=None, worker=None, ensure=True, **kwargs)[source]

Mark that certain keys have gone missing. Recover.

stimulus_task_erred(key=None, worker=None, exception=None, traceback=None, **kwargs)[source]

Mark that a task has erred on a particular worker

stimulus_task_finished(key=None, worker=None, **kwargs)[source]

Mark that a task has finished execution on a particular worker

transition(key, finish, *args, **kwargs)[source]

Transition a key from its current state to the finish state

Returns: Dictionary of recommendations for future transitions

See also

Scheduler.transitions
transitive version of this function

Examples

>>> self.transition('x', 'waiting')
{'x': 'ready'}

transition_story(*keys)[source]

Get all transitions that touch one of the input keys

transitions(recommendations)[source]

Process transitions until none are left

This includes feedback from previous transitions and continues until we reach a steady state

update_data(stream=None, who_has=None, nbytes=None, client=None)[source]

Learn that new data has entered the network from an external source

See also

Scheduler.mark_key_in_memory

update_graph(client=None, tasks=None, keys=None, dependencies=None, restrictions=None, priority=None, loose_restrictions=None)[source]

Add new computations to the internal dask graph

This happens whenever the Executor calls submit, map, get, or compute.

work_steal()[source]

Steal tasks from saturated workers to idle workers

This moves tasks from the bottom of the stacks of over-occupied workers to the stacks of idling workers.

worker_stream(worker)[source]

Listen to responses from a single worker

This is the main loop for scheduler-worker interaction

See also

Scheduler.handle_messages
Equivalent coroutine for clients

workers_list(workers)[source]

List of qualifying workers

Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match
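
A hypothetical example of this matching (addresses and output are illustrative):

>>> self.workers_list(['alice'])         # a bare hostname matches any port
['alice:8000', 'alice:8001']
>>> self.workers_list(['alice:8000'])    # a full address matches exactly
['alice:8000']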

distributed.scheduler.decide_worker(dependencies, stacks, processing, who_has, has_what, restrictions, loose_restrictions, nbytes, key)[source]

Decide which worker should take task

>>> dependencies = {'c': {'b'}, 'b': {'a'}}
>>> stacks = {'alice:8000': ['z'], 'bob:8000': []}
>>> processing = {'alice:8000': set(), 'bob:8000': set()}
>>> who_has = {'a': {'alice:8000'}}
>>> has_what = {'alice:8000': {'a'}}
>>> nbytes = {'a': 100}
>>> restrictions = {}
>>> loose_restrictions = set()

We choose the worker that has the data on which ‘b’ depends (alice has ‘a’)

>>> decide_worker(dependencies, stacks, processing, who_has, has_what,
...               restrictions, loose_restrictions, nbytes, 'b')
'alice:8000'

If both Alice and Bob have dependencies then we choose the less-busy worker

>>> who_has = {'a': {'alice:8000', 'bob:8000'}}
>>> has_what = {'alice:8000': {'a'}, 'bob:8000': {'a'}}
>>> decide_worker(dependencies, stacks, processing, who_has, has_what,
...               restrictions, loose_restrictions, nbytes, 'b')
'bob:8000'

Optionally provide restrictions of where jobs are allowed to occur

>>> restrictions = {'b': {'alice', 'charlie'}}
>>> decide_worker(dependencies, stacks, processing, who_has, has_what,
...               restrictions, loose_restrictions, nbytes, 'b')
'alice:8000'

If the task requires data communication, then we choose to minimize the number of bytes sent between workers. This takes precedence over worker occupancy.

>>> dependencies = {'c': {'a', 'b'}}
>>> who_has = {'a': {'alice:8000'}, 'b': {'bob:8000'}}
>>> has_what = {'alice:8000': {'a'}, 'bob:8000': {'b'}}
>>> nbytes = {'a': 1, 'b': 1000}
>>> stacks = {'alice:8000': [], 'bob:8000': []}
>>> decide_worker(dependencies, stacks, processing, who_has, has_what,
...               {}, set(), nbytes, 'c')
'bob:8000'