Scheduler

The scheduler orchestrates which workers work on which tasks in what order. It tracks the current state of the entire cluster. It consists of several coroutines running in a single event loop.
class distributed.scheduler.Scheduler(center=None, loop=None, resource_interval=1, resource_log_size=1000, max_buffer_size=2069891072.0, delete_interval=500, ip=None, services=None, heartbeat_interval=500, **kwargs)

Dynamic distributed task scheduler
The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever growing dask graph.
All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.
The scheduler communicates with the outside world either by adding pairs of in/out queues or by responding to a new IOStream (the Scheduler can operate as a typical distributed Server). It maintains a consistent and valid view of the world even when listening to several clients at once.

A Scheduler is typically started either with the dscheduler executable:

$ dscheduler 127.0.0.1:8787  # address of center

Or when an Executor starts up and connects to a Center:

>>> e = Executor('127.0.0.1:8787')
>>> e.scheduler
Scheduler(...)
Users typically do not interact with the scheduler except through Plugins. See http://distributed.readthedocs.io/en/latest/plugins.html
State
- tasks: {key: task}: Dictionary mapping key to task, either a dask task or a serialized dict like {'function': b'xxx', 'args': b'xxx'} or {'task': b'xxx'}
- dependencies: {key: {key}}: Dictionary showing which keys depend on which others
- dependents: {key: {key}}: Dictionary showing which keys are dependent on which others
- waiting: {key: {key}}: Dictionary like dependencies but excludes keys already computed
- waiting_data: {key: {key}}: Dictionary like dependents but excludes keys already computed
- ready: deque(key): Keys that are ready to run, but not yet assigned to a worker
- ncores: {worker: int}: Number of cores owned by each worker
- idle: {worker}: Set of workers that are not fully utilized
- services: {str: port}: Other services running on this scheduler, like HTTP
- worker_info: {worker: {str: data}}: Information about each worker
- host_info: {hostname: dict}: Information about each worker host
- who_has: {key: {worker}}: Where each key lives. The current state of distributed memory.
- has_what: {worker: {key}}: What worker has what keys. The transpose of who_has.
- who_wants: {key: {client}}: Which clients want each key. The active targets of computation.
- wants_what: {client: {key}}: What keys are wanted by each client. The transpose of who_wants.
- nbytes: {key: int}: Number of bytes for a key as reported by workers holding that key.
- processing: {worker: {key: cost}}: Set of keys currently in execution on each worker and their expected duration
- task_duration: {key-prefix: time}: Time we expect certain functions to take, e.g. {'sum': 0.25}
- occupancy: {worker: time}: Expected runtime for all tasks currently processing on a worker
- stacks: {worker: [keys]}: List of keys waiting to be sent to each worker
- released: {keys}: Set of keys that are known, but released from memory
- unrunnable: {key}: Keys that we are unable to run
- restrictions: {key: {hostnames}}: A set of hostnames per key of where that key can be run. Usually this is empty unless a key has been specifically restricted to only run on certain hosts. These restrictions don't include a worker port. Any worker on that hostname is deemed valid.
- loose_restrictions: {key}: Set of keys for which we are allowed to violate restrictions (see above) if no valid workers are present.
- keyorder: {key: tuple}: A score per key that determines its priority
- scheduler_queues: [Queues]: A list of Tornado Queues from which we accept stimuli
- report_queues: [Queues]: A list of Tornado Queues on which we report results
- streams: [IOStreams]: A list of Tornado IOStreams from which we both accept stimuli and report results
- coroutines: [Futures]: A list of active futures that control operation
- exceptions: {key: Exception}: A dict mapping keys to remote exceptions
- tracebacks: {key: list}: A dict mapping keys to remote tracebacks stored as a list of strings
- exceptions_blame: {key: key}: A dict mapping a key to another key on which it depends that has failed
- deleted_keys: {key: {workers}}: Locations of workers that have keys that should be deleted
- loop: IOLoop: The running Tornado IOLoop
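As an illustration (not taken from the codebase), the core bookkeeping for a tiny graph z = add(x, y), where x already sits in distributed memory on one worker and y has not yet been computed, might look roughly like the following; the worker addresses and byte counts are made up for the example:

dependencies = {'z': {'x', 'y'}, 'x': set(), 'y': set()}
dependents   = {'x': {'z'}, 'y': {'z'}, 'z': set()}      # transpose of dependencies
waiting      = {'z': {'y'}}              # z still waits on y; x is already in memory
waiting_data = {'x': {'z'}, 'y': {'z'}}  # which dependents still need each key in memory
who_has      = {'x': {'alice:8000'}}                     # current state of distributed memory
has_what     = {'alice:8000': {'x'}, 'bob:8000': set()}  # transpose of who_has
ncores       = {'alice:8000': 4, 'bob:8000': 4}
nbytes       = {'x': 8}

# who_has and has_what must remain transposes of one another; every
# scheduler operation preserves invariants of this kind.
assert all(key in has_what[worker]
           for key, workers in who_has.items() for worker in workers)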
- add_plugin(plugin)
  Add external plugin to scheduler.
  See http://distributed.readthedocs.io/en/latest/plugins.html
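  As a rough illustration of what such a plugin could look like: the sketch below assumes a SchedulerPlugin base class importable from distributed.diagnostics.plugin, and the hook name and signature are illustrative rather than authoritative.

  from distributed.diagnostics.plugin import SchedulerPlugin

  class TaskCounter(SchedulerPlugin):
      # Count tasks as the scheduler marks them finished
      def __init__(self):
          self.count = 0

      def task_finished(self, scheduler, key=None, worker=None, **kwargs):
          # Hook called when the scheduler marks a task finished;
          # the exact hook name/signature may differ between versions.
          self.count += 1

  # given a running Scheduler instance `scheduler`:
  # scheduler.add_plugin(TaskCounter())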
- broadcast(stream=None, msg=None, workers=None)
  Broadcast message to workers, return all results.
- clear_data_from_workers()
  This is intended to be run periodically. The self._delete_periodic_callback attribute holds a PeriodicCallback that runs this every self.delete_interval milliseconds.
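  For reference, wiring up a Tornado PeriodicCallback in the way described above looks roughly like this sketch (only delete_interval comes from the text; the rest is illustrative):

  from tornado.ioloop import PeriodicCallback

  delete_interval = 500  # milliseconds, as in the constructor default

  def clear_data_from_workers():
      pass  # stand-in for the scheduler method

  # Run the cleanup on the event loop every delete_interval milliseconds
  pc = PeriodicCallback(clear_data_from_workers, delete_interval)
  pc.start()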
- coerce_address(addr)
  Coerce possible input addresses to canonical form.
  Handles lists, strings, bytes, tuples, or aliases.
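  The kind of normalization this implies can be pictured with the following simplified sketch (illustrative only; the real method also resolves aliases and hostnames):

  def coerce_address_sketch(addr):
      if isinstance(addr, list):            # list of addresses: coerce each
          return [coerce_address_sketch(a) for a in addr]
      if isinstance(addr, bytes):           # b'host:port' -> 'host:port'
          addr = addr.decode()
      if isinstance(addr, tuple):           # ('host', port) -> 'host:port'
          host, port = addr
          return '%s:%d' % (host, int(port))
      return addr                           # already a 'host:port' string

  coerce_address_sketch(('127.0.0.1', 8788))   # '127.0.0.1:8788'
  coerce_address_sketch(b'127.0.0.1:8788')     # '127.0.0.1:8788'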
- ensure_idle_ready()
  Run ready tasks on idle workers.

  Work stealing policy

  If some workers are idle but not others, if there are no globally ready tasks, and if there are tasks in worker stacks, then we start to pull preferred tasks from overburdened workers and deploy them back into the global pool in the following manner.

  We determine the number of tasks to reclaim as the number of all tasks in all stacks times the fraction of idle workers to all workers. We sort the stacks by size and walk through them, reclaiming half of each stack until we have enough tasks to fill the global pool. We are careful not to reclaim tasks that are restricted to run on certain workers.
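  That reclaim computation can be sketched with plain dictionaries standing in for scheduler state (variable contents are made up for the example):

  # How many tasks to pull back into the global pool, and from which stacks
  stacks = {'alice:8000': ['t1', 't2', 't3', 't4'],
            'bob:8000': ['t5', 't6'],
            'carol:8000': []}
  idle = {'carol:8000'}                    # workers with nothing to do
  restrictions = {'t1': {'alice'}}         # restricted tasks are never reclaimed

  total = sum(map(len, stacks.values()))
  n_reclaim = int(total * len(idle) / len(stacks))   # 6 * 1/3 == 2

  reclaimed = []
  for worker, stack in sorted(stacks.items(), key=lambda kv: -len(kv[1])):
      if len(reclaimed) >= n_reclaim:
          break
      reclaimed.extend(key for key in stack[:len(stack) // 2]
                       if key not in restrictions)

  # reclaimed == ['t2', 't5'] for this toy state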
- ensure_in_play(key)
  Ensure that a key is on track to enter memory in the future.
  This will only act on keys currently in self.released.
- ensure_occupied_stacks(worker)
  Send tasks to worker while it has tasks and free cores. These tasks may come from the worker's own stacks or from the global ready deque. We update the idle workers set appropriately.
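  The loop this describes is roughly the following sketch over simplified state (not the actual method):

  from collections import deque

  ncores = {'alice:8000': 2}
  processing = {'alice:8000': set()}
  stacks = {'alice:8000': ['x']}
  ready = deque(['y', 'z'])
  worker = 'alice:8000'

  # Fill free cores from the worker's own stack first, then the ready deque
  while len(processing[worker]) < ncores[worker] and (stacks[worker] or ready):
      key = stacks[worker].pop() if stacks[worker] else ready.popleft()
      processing[worker].add(key)
      # ...a compute message for `key` would be sent to the worker here...

  # processing['alice:8000'] now holds two keys, one per free core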
- forget(key)
  Forget a key if no one cares about it. This removes all knowledge of how to produce a key from the scheduler. This is almost exclusively called by release_held_data.
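  Conceptually this means deleting the key from every state collection listed above; a simplified sketch (illustrative, not the real method):

  def forget_sketch(key, state):
      # `state` holds scheduler-like collections as in the State section
      for worker in state['who_has'].pop(key, set()):
          state['has_what'][worker].discard(key)
      for dep in state['dependencies'].pop(key, set()):
          state['dependents'].get(dep, set()).discard(key)
      for name in ('dependents', 'waiting', 'waiting_data', 'nbytes',
                   'restrictions', 'exceptions', 'tracebacks'):
          state.get(name, {}).pop(key, None)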
- handle_messages(in_queue, report, client=None)
  Master coroutine. Handles inbound messages.
  This runs once per Queue or Stream.
- handle_queues(scheduler_queue, report_queue)
  Register new control and report queues to the Scheduler.
- mark_failed(key, failing_key=None)
  When a task fails, mark it and all dependent tasks as failed.
- mark_key_in_memory(key, workers=None, type=None)
  Mark that a key now lives in distributed memory.
- mark_missing_data(keys=None, key=None, worker=None, **kwargs)
  Mark that certain keys have gone missing. Recover.
- mark_ready_to_run(key)
  Mark a task as ready to run. If the task should be assigned to a worker then make that determination and assign appropriately. Otherwise place the task in the ready queue. Trigger appropriate workers if idle.
  See also: decide_worker, Scheduler.ensure_occupied
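  One plausible way to picture the choice, with a stand-in choose_worker callable (the actual criterion lives in the implementation and works against full scheduler state):

  def mark_ready_to_run_sketch(key, dependencies, restrictions,
                               stacks, ready, choose_worker):
      if dependencies.get(key) or key in restrictions:
          # Data locality or restrictions matter: pick a worker now
          worker = choose_worker(key)
          stacks[worker].append(key)
      else:
          # No preference: any idle core may take it from the global deque
          ready.append(key)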
- mark_task_erred(key=None, worker=None, exception=None, traceback=None, **kwargs)
  Mark that a task has erred on a particular worker.
- mark_task_finished(key=None, worker=None, nbytes=None, type=None, compute_start=None, compute_stop=None, transfer_start=None, transfer_stop=None, **kwargs)
  Mark that a task has finished execution on a particular worker.
- recover_missing(key)
  Recover a recently lost piece of data.
  This assumes that we've already removed this key from who_has/has_what.
- release_held_data(keys=None)
  Mark that a key is no longer externally required to be in memory.
- release_live_dependencies(key)
  We no longer need to keep data in memory to compute this key. This occurs after we've computed it or after we've forgotten it.
- remove_worker(stream=None, address=None)
  Mark that a worker no longer seems responsive.
- replicate(stream=None, keys=None, n=None, workers=None, branching_factor=2)
  Replicate data throughout cluster. This performs a tree copy of the data throughout the network individually on each piece of data.

  Parameters:
  - keys (Iterable): list of keys to replicate
  - n (int): number of replications we expect to see within the cluster
  - branching_factor (int, optional): the number of workers that can copy data in each generation

  See also: Scheduler.rebalance
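  The tree copy grows geometrically: in each generation every worker that already holds a piece of data can seed up to branching_factor additional workers. Illustrative arithmetic for a single key:

  n = 50                 # desired number of replicas
  branching_factor = 2
  holders = 1            # one worker holds the key initially
  generations = 0
  while holders < n:
      # each current holder can copy to branching_factor new workers
      holders = min(n, holders + holders * branching_factor)
      generations += 1

  # holders == 50 after 4 generations (1 -> 3 -> 9 -> 27 -> 50)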
- scatter(stream=None, data=None, workers=None, client=None, broadcast=False)
  Send data out to workers.
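  From the user's side this is normally reached through the Executor rather than called on the scheduler directly; a brief usage sketch, assuming an Executor connected as in the example near the top of this page:

  >>> e = Executor('127.0.0.1:8787')
  >>> futures = e.scatter([1, 2, 3])   # one Future per scattered element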
- update_data(who_has=None, nbytes=None, client=None)
  Learn that new data has entered the network from an external source.
- distributed.scheduler.decide_worker(dependencies, stacks, processing, who_has, has_what, restrictions, loose_restrictions, nbytes, key)
  Decide which worker should take a task.

  >>> dependencies = {'c': {'b'}, 'b': {'a'}}
  >>> stacks = {'alice:8000': ['z'], 'bob:8000': []}
  >>> processing = {'alice:8000': set(), 'bob:8000': set()}
  >>> who_has = {'a': {'alice:8000'}}
  >>> has_what = {'alice:8000': {'a'}}
  >>> nbytes = {'a': 100}
  >>> restrictions = {}
  >>> loose_restrictions = set()

  We choose the worker that has the data on which 'b' depends (alice has 'a')

  >>> decide_worker(dependencies, stacks, processing, who_has, has_what,
  ...               restrictions, loose_restrictions, nbytes, 'b')
  'alice:8000'

  If both Alice and Bob have dependencies then we choose the less-busy worker

  >>> who_has = {'a': {'alice:8000', 'bob:8000'}}
  >>> has_what = {'alice:8000': {'a'}, 'bob:8000': {'a'}}
  >>> decide_worker(dependencies, stacks, processing, who_has, has_what,
  ...               restrictions, loose_restrictions, nbytes, 'b')
  'bob:8000'

  Optionally provide restrictions of where jobs are allowed to occur

  >>> restrictions = {'b': {'alice', 'charlie'}}
  >>> decide_worker(dependencies, stacks, processing, who_has, has_what,
  ...               restrictions, loose_restrictions, nbytes, 'b')
  'alice:8000'

  If the task requires data communication, then we choose to minimize the number of bytes sent between workers. This takes precedence over worker occupancy.

  >>> dependencies = {'c': {'a', 'b'}}
  >>> who_has = {'a': {'alice:8000'}, 'b': {'bob:8000'}}
  >>> has_what = {'alice:8000': {'a'}, 'bob:8000': {'b'}}
  >>> nbytes = {'a': 1, 'b': 1000}
  >>> stacks = {'alice:8000': [], 'bob:8000': []}

  >>> decide_worker(dependencies, stacks, processing, who_has, has_what,
  ...               {}, set(), nbytes, 'c')
  'bob:8000'