Active Memory Manager¶
The Active Memory Manager, or AMM, is an experimental daemon that optimizes memory usage of workers across the Dask cluster. It is enabled by default but can be disabled/configured. See Enabling the Active Memory Manager for details.
Memory imbalance and duplication¶
Whenever a Dask task returns data, it is stored on the worker that executed the task for as long as it's a dependency of other tasks, is referenced by a Client through a Future, or is part of a published dataset.
Dask assigns tasks to workers following criteria of CPU occupancy, Worker Resources, and locality. In the trivial use case of tasks that are not connected to each other, take the same time to compute, return data of the same size, and have no resource constraints, one will observe a perfect balance in memory occupation across workers too. In all other use cases, however, memory usage across workers may become imbalanced as the computation progresses.
When a task runs on a worker and requires as input the output of a task from a different worker, Dask transparently transfers the data between workers, ending up with multiple copies of the same data on different workers. This is generally desirable, as it avoids re-transferring the data if it's required again later on. However, it also increases overall memory usage across the cluster.
Enabling the Active Memory Manager¶
The AMM is enabled by default. It can be disabled or tweaked through the Dask configuration file:
distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      measure: optimistic
The above is the recommended setup and will run all enabled AMM policies (see below) every two seconds. Alternatively, you can manually start/stop the AMM from the Client or trigger a one-off iteration:
>>> client.amm.start() # Start running every 2 seconds
>>> client.amm.stop() # Stop running periodically
>>> client.amm.running()
False
>>> client.amm.run_once()
Policies¶
The AMM by itself doesn’t do anything. The user must enable policies which suggest actions regarding Dask data. The AMM runs the policies and enacts their suggestions, as long as they don’t harm data integrity. These suggestions can be of two types:
Replicate the data of an in-memory Dask task from one worker to another. This should not be confused with replication caused by task dependencies.
Delete one or more replicas of an in-memory task. The AMM will never delete the last replica of a task, even if a policy asks to.
There are no “move” operations. A move is performed in two passes: first a policy creates a copy; in the next AMM iteration, the same or another policy deletes the original (if the copy succeeded).
Unless a policy puts constraints on which workers should be impacted, the AMM will automatically create replicas on workers with the lowest memory usage first and delete them from workers with the highest memory usage first.
Individual policies are enabled, disabled, and configured through the Dask config:
distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      measure: optimistic
      policies:
        - class: distributed.active_memory_manager.ReduceReplicas
        - class: my_package.MyPolicy
          arg1: foo
          arg2: bar
See below for custom policies like the one in the example above.
The default Dask config file contains a sane selection of built-in policies that should be generally desirable. You should first try with just start: true in your Dask config and check whether it fits your needs before tweaking individual policies.
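If you prefer not to edit the YAML file, the same settings can be set programmatically before the cluster is started. A minimal sketch, assuming a LocalCluster created from the same Python process (the worker count is illustrative):
import dask
from dask.distributed import Client, LocalCluster

# Mirror the YAML settings shown above; the scheduler reads these keys at startup
dask.config.set({
    "distributed.scheduler.active-memory-manager.start": True,
    "distributed.scheduler.active-memory-manager.interval": "2s",
    "distributed.scheduler.active-memory-manager.measure": "optimistic",
})

cluster = LocalCluster(n_workers=4)  # illustrative worker count
client = Client(cluster)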
Built-in policies¶
ReduceReplicas¶
class: distributed.active_memory_manager.ReduceReplicas
parameters: None
This policy is enabled in the default Dask config. Whenever a Dask task is replicated on more than one worker and the additional replicas don’t appear to serve an ongoing computation, this policy drops all excess replicas.
Note
This policy is incompatible with replicate() and with the broadcast=True parameter of scatter(). If you invoke replicate() to create additional replicas and then later run this policy, it will delete all replicas but one (but not necessarily the new ones).
RetireWorker¶
class: distributed.active_memory_manager.RetireWorker
parameters:
  address (str): The address of the worker being retired.
This is a special policy, which should never appear in the Dask configuration file. It is injected on the fly by distributed.Client.retire_workers() and whenever an adaptive cluster is being scaled down.
This policy supervises moving all tasks that are in memory exclusively on the retiring worker to different workers. Once the worker no longer uniquely holds the data for any task, this policy uninstalls itself automatically from the Active Memory Manager and the worker is shut down.
If multiple workers are being retired at the same time, there will be multiple instances of this policy installed in the AMM.
If the Active Memory Manager is disabled, distributed.Client.retire_workers() and adaptive scaling will start a temporary one, install this policy into it, and then shut it down once it's finished.
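For reference, a hedged sketch of triggering this policy by hand through distributed.Client.retire_workers(); the scheduler and worker addresses below are purely illustrative:
from dask.distributed import Client

client = Client("tcp://scheduler-host:8786")  # hypothetical scheduler address

# Injects a RetireWorker policy for the given worker into the AMM
# (or into a temporary AMM if the configured one is disabled)
client.retire_workers(workers=["tcp://10.0.0.5:40331"])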
Custom policies¶
Power users can write their own policies by subclassing ActiveMemoryManagerPolicy. The class should define two methods:

__init__
    A custom policy may load parameters from the Dask config through __init__ parameters. If you don't need configuration, you don't need to implement this method.

run
    This method accepts no parameters and is invoked by the AMM every 2 seconds (or whatever the AMM interval is). It must yield zero or more of the following Suggestion namedtuples:

    yield Suggestion("replicate", <TaskState>)
        Create one replica of the target task on the worker with the lowest memory usage that doesn't hold a replica yet. To create more than one replica, you need to yield the same command more than once.

    yield Suggestion("replicate", <TaskState>, {<WorkerState>, <WorkerState>, ...})
        Create one replica of the target task on the worker with the lowest memory among the listed candidates.

    yield Suggestion("drop", <TaskState>)
        Delete one replica of the target task on the worker with the highest memory usage across the whole cluster.

    yield Suggestion("drop", <TaskState>, {<WorkerState>, <WorkerState>, ...})
        Delete one replica of the target task on the worker with the highest memory among the listed candidates.
The AMM will silently reject unacceptable suggestions, such as:
Delete the last replica of a task
Delete a replica from a subset of workers that don’t hold any
Delete a replica from a worker that currently needs it for computation
Replicate a task that is not yet in memory
Create more replicas of a task than there are workers
Create replicas of a task on workers that already hold them
Create replicas on paused or retiring workers
It is generally a good idea to design policies to be as simple as possible and let the AMM take care of the edge cases above by ignoring some of the suggestions.
Optionally, the run method may retrieve which worker the AMM just selected, as follows:

ws = (yield Suggestion("drop", ts))
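For instance, a policy could log which worker was chosen for each drop. A minimal sketch (the policy name and logging are illustrative; ws is None whenever the AMM rejects the suggestion):
import logging

from distributed.active_memory_manager import ActiveMemoryManagerPolicy, Suggestion

logger = logging.getLogger(__name__)


class VerboseDrop(ActiveMemoryManagerPolicy):  # hypothetical policy
    def run(self):
        for ts in list(self.manager.scheduler.tasks.values()):
            # Only consider tasks that are in memory on more than one worker
            if ts.who_has and len(ts.who_has) > 1:
                ws = (yield Suggestion("drop", ts))
                if ws is not None:
                    logger.info("Dropping %s from %s", ts.key, ws.address)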
The run method can access the following attributes:

self.manager
    The ActiveMemoryManagerExtension that the policy is attached to.

self.manager.scheduler
    The Scheduler to which the suggestions will be applied. From there you can access various attributes such as tasks and workers.

self.manager.workers_memory
    Read-only mapping of {WorkerState: bytes}. bytes is the expected RAM usage of the worker after all suggestions accepted so far in the current AMM iteration, from all policies, have been enacted. Note that you don't need to access this if you are happy to always create/delete replicas on the workers with the lowest and highest memory usage respectively - the AMM will handle it for you.

self.manager.pending
    Read-only mapping of {TaskState: ({<WorkerState>, ...}, {<WorkerState>, ...})}. The first set contains the workers that will receive a new replica of the task according to the suggestions accepted so far; the second set contains the workers which will lose a replica.

self.manager.policies
    Set of policies registered in the AMM. A policy can deregister itself as follows:

    def run(self):
        self.manager.policies.remove(self)
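For example, a one-shot policy could yield its suggestions once and then remove itself from the AMM. A minimal sketch, assuming the target key only needs to be replicated a single time (the policy name is hypothetical):
from distributed.active_memory_manager import ActiveMemoryManagerPolicy, Suggestion


class ReplicateOnce(ActiveMemoryManagerPolicy):  # hypothetical policy
    def __init__(self, key):
        self.key = key

    def run(self):
        ts = self.manager.scheduler.tasks.get(self.key)
        if ts:
            yield Suggestion("replicate", ts)
        # Deregister so the AMM does not invoke this policy again
        self.manager.policies.remove(self)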
Example¶
The following custom policy ensures that keys “foo” and “bar” are replicated on all workers at all times. New workers will receive a replica soon after connecting to the scheduler. The policy will do nothing if the target keys are not in memory somewhere or if all workers already hold a replica. Note that this example is incompatible with the ReduceReplicas built-in policy.
In mymodule.py (it must be accessible by the scheduler):
from distributed.active_memory_manager import ActiveMemoryManagerPolicy, Suggestion


class EnsureBroadcast(ActiveMemoryManagerPolicy):
    def __init__(self, key):
        self.key = key

    def run(self):
        ts = self.manager.scheduler.tasks.get(self.key)
        if not ts:
            return
        for _ in range(len(self.manager.scheduler.workers) - len(ts.who_has)):
            yield Suggestion("replicate", ts)
Note that the policy doesn’t bother testing for edge cases such as paused workers or other policies also requesting replicas; the AMM takes care of it. In theory you could rewrite the last two lines as follows (at the cost of some wasted CPU cycles):
for _ in range(1000):
    yield Suggestion("replicate", ts)
In distributed.yaml:
distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      policies:
        - class: mymodule.EnsureBroadcast
          key: foo
        - class: mymodule.EnsureBroadcast
          key: bar
We could have alternatively used a single policy instance with a list of keys - the above design merely illustrates that you may have multiple instances of the same policy running side by side.
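As a hedged usage sketch, you can check that the policy is working by submitting a task under one of the targeted keys and inspecting its replicas; the inc function and the scheduler address are illustrative:
from dask.distributed import Client

client = Client("tcp://scheduler-host:8786")  # hypothetical address


def inc(x):
    return x + 1


fut = client.submit(inc, 1, key="foo")
fut.result()  # wait until the task result is in memory

# After a couple of AMM iterations, every worker should be listed for this key
print(client.who_has([fut]))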
API reference¶
- class distributed.active_memory_manager.ActiveMemoryManagerExtension(scheduler: Scheduler, policies: set[ActiveMemoryManagerPolicy] | None = None, *, measure: str | None = None, register: bool = True, start: bool | None = None, interval: float | None = None)[source]¶
Scheduler extension that optimizes memory usage across the cluster. It can be either triggered by hand or automatically every few seconds; at every iteration it performs one or both of the following:
create new replicas of in-memory tasks
destroy replicas of in-memory tasks; this never destroys the last available copy.
There are no ‘move’ operations. A move is performed in two passes: first you create a copy and, in the next iteration, you delete the original (if the copy succeeded).
This extension is configured by the dask config section distributed.scheduler.active-memory-manager.

- amm_handler(method: str) → Any [source]¶
Scheduler handler, invoked from the Client by AMMClientProxy
- measure: str¶
Memory measure to use. Must be one of the attributes or properties of distributed.scheduler.MemoryState.
- pending: dict[TaskState, tuple[set[WorkerState], set[WorkerState]]]¶
Pending replications and deletions for each task. This attribute only exists within the scope of self.run().
- policies: set[ActiveMemoryManagerPolicy]¶
All active policies
- run_once() None [source]¶
Run all policies once and asynchronously (fire and forget) enact their recommendations to replicate/drop tasks
- workers_memory: dict[WorkerState, int]¶
Current memory (in bytes) allocated on each worker, plus/minus pending actions. This attribute only exists within the scope of self.run().
- class distributed.active_memory_manager.ActiveMemoryManagerPolicy[source]¶
Abstract parent class
- abstract run() → SuggestionGenerator [source]¶
This method is invoked by the ActiveMemoryManager every few seconds, or whenever the user invokes client.amm.run_once. It is an iterator that must emit Suggestion objects:

Suggestion("replicate", <TaskState>)
Suggestion("replicate", <TaskState>, {subset of potential workers to replicate to})
Suggestion("drop", <TaskState>)
Suggestion("drop", <TaskState>, {subset of potential workers to drop from})

Each element yielded indicates the desire to create or destroy a single replica of a key. If a subset of workers is not provided, it defaults to all workers on the cluster. Either the ActiveMemoryManager or the Worker may later decide to disregard the request, e.g. because it would delete the last copy of a key or because the key is currently needed on that worker.
You may optionally retrieve which worker it was decided the key will be replicated to or dropped from, as follows:

choice = (yield Suggestion("replicate", ts))

choice is either a WorkerState or None; the latter is returned if the ActiveMemoryManager chose to disregard the request.

The current pending (accepted) suggestions can be inspected on self.manager.pending; this includes the suggestions previously yielded by this same method.

The current memory usage on each worker, downstream of all pending suggestions, can be inspected on self.manager.workers_memory.
- class distributed.active_memory_manager.Suggestion(op, ts, candidates)[source]¶
- candidates: set[WorkerState] | None¶
Alias for field number 2
- op: Literal['replicate', 'drop']¶
Alias for field number 0
- class distributed.active_memory_manager.AMMClientProxy(client: Client)[source]¶
Convenience accessors to operate the AMM from the dask client
Usage:
client.amm.start() etc.

All methods are asynchronous if the client is asynchronous and synchronous if the client is synchronous.
- class distributed.active_memory_manager.ReduceReplicas[source]¶
Make sure that in-memory tasks are not replicated on more workers than desired; drop the excess replicas.
- class distributed.active_memory_manager.RetireWorker(address: str)[source]¶
Replicate somewhere else all unique in-memory tasks on a worker, preparing for its shutdown.
At any given time, the AMM may have registered multiple instances of this policy, one for each worker currently being retired - meaning that most of the time no instances will be registered at all. For this reason, this policy doesn't figure in the dask config (distributed.yaml). Instances are added by distributed.Scheduler.retire_workers() and automatically remove themselves once the worker has been retired. If the AMM is disabled in the dask config, retire_workers() will start a temporary ad-hoc one.

Failure condition

There may not be any suitable workers to receive the tasks from the retiring worker. This happens in two use cases:

This is the only worker in the cluster, or
All workers are either paused or being retired at the same time

In either case, this policy will fail to move out all keys and set the no_recipients boolean to True. retire_workers() will abort the retirement.

There is a third use case, where a task fails to be replicated away for whatever reason, e.g. because its recipient is unresponsive but the Scheduler doesn't know yet. In this case we'll just wait for the next AMM iteration and try again (possibly with a different receiving worker, e.g. if the receiving worker was hung but not yet declared dead).
Retiring a worker with spilled tasks

On its very first iteration, this policy suggests that other workers should fetch all unique in-memory tasks of the retiring worker. Frequently, this means that in the next few moments the retiring worker will be bombarded by distributed.worker.Worker.get_data() calls from the rest of the cluster. This can be a problem if most of the managed memory of the worker has been spilled out, as it could send the worker above its terminate threshold. Two measures are in place in order to prevent this:

At every iteration, this policy drops all tasks on the retiring worker that have already been replicated somewhere else. This makes room for further tasks to be moved out of the spill file in order to be replicated onto another worker.
Once the worker passes the pause threshold, get_data() throttles the number of outgoing connections to 1.
- Parameters
- address: str
URI of the worker to be retired