Source code for distributed.deploy.local

from __future__ import annotations

import logging
import math
import warnings

import toolz

from dask.system import CPU_COUNT
from dask.widgets import get_template

from distributed.deploy.spec import SpecCluster
from distributed.deploy.utils import nprocesses_nthreads
from distributed.nanny import Nanny
from distributed.scheduler import Scheduler
from distributed.security import Security
from distributed.worker import Worker
from distributed.worker_memory import parse_memory_limit

logger = logging.getLogger(__name__)


[docs]class LocalCluster(SpecCluster): """Create local Scheduler and Workers This creates a "cluster" of a scheduler and workers running on the local machine. Parameters ---------- n_workers: int Number of workers to start memory_limit: str, float, int, or None, default "auto" Sets the memory limit *per worker*. Notes regarding argument data type: * If None or 0, no limit is applied. * If "auto", the total system memory is split evenly between the workers. * If a float, that fraction of the system memory is used *per worker*. * If a string giving a number of bytes (like ``"1GiB"``), that amount is used *per worker*. * If an int, that number of bytes is used *per worker*. Note that the limit will only be enforced when ``processes=True``, and the limit is only enforced on a best-effort basis — it's still possible for workers to exceed this limit. processes: bool Whether to use processes (True) or threads (False). Defaults to True, unless worker_class=Worker, in which case it defaults to False. threads_per_worker: int Number of threads per each worker scheduler_port: int Port of the scheduler. Use 0 to choose a random port (default). 8786 is a common choice. silence_logs: logging level Level of logs to print out to stdout. ``logging.WARN`` by default. Use a falsey value like False or None for no change. host: string Host address on which the scheduler will listen, defaults to only localhost ip: string Deprecated. See ``host`` above. dashboard_address: str Address on which to listen for the Bokeh diagnostics server like 'localhost:8787' or '0.0.0.0:8787'. Defaults to ':8787'. Set to ``None`` to disable the dashboard. Use ':0' for a random port. When specifying only a port like ':8787', the dashboard will bind to the given interface from the ``host`` parameter. If ``host`` is empty, binding will occur on all interfaces '0.0.0.0'. To avoid firewall issues when deploying locally, set ``host`` to 'localhost'. worker_dashboard_address: str Address on which to listen for the Bokeh worker diagnostics server like 'localhost:8787' or '0.0.0.0:8787'. Defaults to None which disables the dashboard. Use ':0' for a random port. diagnostics_port: int Deprecated. See dashboard_address. asynchronous: bool (False by default) Set to True if using this cluster within async/await functions or within Tornado gen.coroutines. This should remain False for normal use. blocked_handlers: List[str] A list of strings specifying a blocklist of handlers to disallow on the Scheduler, like ``['feed', 'run_function']`` service_kwargs: Dict[str, Dict] Extra keywords to hand to the running services security : Security or bool, optional Configures communication security in this cluster. Can be a security object, or True. If True, temporary self-signed credentials will be created automatically. protocol: str (optional) Protocol to use like ``tcp://``, ``tls://``, ``inproc://`` This defaults to sensible choice given other keyword arguments like ``processes`` and ``security`` interface: str (optional) Network interface to use. Defaults to lo/localhost worker_class: Worker Worker class used to instantiate workers from. Defaults to Worker if processes=False and Nanny if processes=True or omitted. **worker_kwargs: Extra worker arguments. Any additional keyword arguments will be passed to the ``Worker`` class constructor. Examples -------- >>> cluster = LocalCluster() # Create a local cluster # doctest: +SKIP >>> cluster # doctest: +SKIP LocalCluster("127.0.0.1:8786", workers=8, threads=8) >>> c = Client(cluster) # connect to local cluster # doctest: +SKIP Scale the cluster to three workers >>> cluster.scale(3) # doctest: +SKIP Pass extra keyword arguments to Bokeh >>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}}) # doctest: +SKIP """ def __init__( self, name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=logging.WARN, dashboard_address=":8787", worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs, ): if ip is not None: # In the future we should warn users about this move # warnings.warn("The ip keyword has been moved to host") host = ip if diagnostics_port is not None: warnings.warn( "diagnostics_port has been deprecated. " "Please use `dashboard_address=` instead" ) dashboard_address = diagnostics_port if threads_per_worker == 0: warnings.warn( "Setting `threads_per_worker` to 0 has been deprecated. " "Please set to None or to a specific int." ) threads_per_worker = None if "dashboard" in worker_kwargs: warnings.warn( "Setting `dashboard` is discouraged. " "Please set `dashboard_address` to affect the scheduler (more common) " "and `worker_dashboard_address` for the worker (less common)." ) if processes is None: processes = worker_class is None or issubclass(worker_class, Nanny) if worker_class is None: worker_class = Nanny if processes else Worker self.status = None self.processes = processes if security is None: # Falsey values load the default configuration security = Security() elif security is True: # True indicates self-signed temporary credentials should be used security = Security.temporary() elif not isinstance(security, Security): raise TypeError("security must be a Security object") if protocol is None: if host and "://" in host: protocol = host.split("://")[0] elif security and security.require_encryption: protocol = "tls://" elif not self.processes and not scheduler_port: protocol = "inproc://" else: protocol = "tcp://" if not protocol.endswith("://"): protocol = protocol + "://" if host is None and not protocol.startswith("inproc") and not interface: host = "127.0.0.1" services = services or {} worker_services = worker_services or {} if n_workers is None and threads_per_worker is None: if processes: n_workers, threads_per_worker = nprocesses_nthreads() else: n_workers = 1 threads_per_worker = CPU_COUNT if n_workers is None and threads_per_worker is not None: n_workers = max(1, CPU_COUNT // threads_per_worker) if processes else 1 if n_workers and threads_per_worker is None: # Overcommit threads per worker, rather than undercommit threads_per_worker = max(1, int(math.ceil(CPU_COUNT / n_workers))) if n_workers and "memory_limit" not in worker_kwargs: worker_kwargs["memory_limit"] = parse_memory_limit( "auto", 1, n_workers, logger=logger ) worker_kwargs.update( { "host": host, "nthreads": threads_per_worker, "services": worker_services, "dashboard_address": worker_dashboard_address, "dashboard": worker_dashboard_address is not None, "interface": interface, "protocol": protocol, "security": security, "silence_logs": silence_logs, } ) scheduler = { "cls": Scheduler, "options": toolz.merge( dict( host=host, services=services, service_kwargs=service_kwargs, security=security, port=scheduler_port, interface=interface, protocol=protocol, dashboard=dashboard_address is not None, dashboard_address=dashboard_address, blocked_handlers=blocked_handlers, ), scheduler_kwargs or {}, ), } worker = {"cls": worker_class, "options": worker_kwargs} workers = {i: worker for i in range(n_workers)} super().__init__( name=name, scheduler=scheduler, workers=workers, worker=worker, loop=loop, asynchronous=asynchronous, silence_logs=silence_logs, security=security, scheduler_sync_interval=scheduler_sync_interval, ) def start_worker(self, *args, **kwargs): raise NotImplementedError( "The `cluster.start_worker` function has been removed. " "Please see the `cluster.scale` method instead." ) def _repr_html_(self, cluster_status=None): cluster_status = get_template("local_cluster.html.j2").render( status=self.status.name, processes=self.processes, cluster_status=cluster_status, ) return super()._repr_html_(cluster_status=cluster_status)