Source code for distributed.asyncio

"""Experimental interface for asyncio, may disappear without warning"""

# flake8: noqa

import asyncio
from functools import wraps

from toolz import merge

from tornado.platform.asyncio import BaseAsyncIOLoop
from tornado.platform.asyncio import to_asyncio_future

from . import client
from .client import Client, Future
from .variable import Variable
from .utils import ignoring


def to_asyncio(fn, **default_kwargs):
    """Converts Tornado gen.coroutines and futures to asyncio ones"""
    @wraps(fn)
    def convert(*args, **kwargs):
        if default_kwargs:
            kwargs = merge(default_kwargs, kwargs)
        return to_asyncio_future(fn(*args, **kwargs))
    return convert


[docs]class AioClient(Client): """ Connect to and drive computation on a distributed Dask cluster This class provides an asyncio compatible async/await interface for dask.distributed. The Client connects users to a dask.distributed compute cluster. It provides an asynchronous user interface around functions and futures. This class resembles executors in ``concurrent.futures`` but also allows ``Future`` objects within ``submit/map`` calls. AioClient is an **experimental** interface for distributed and may disappear without warning! Parameters ---------- address: string, or Cluster This can be the address of a ``Scheduler`` server like a string ``'127.0.0.1:8786'`` or a cluster object like ``LocalCluster()`` Examples -------- Provide cluster's scheduler address on initialization:: client = AioClient('127.0.0.1:8786') Start the client:: async def start_the_client(): client = await AioClient() # Use the client.... await client.close() An ``async with`` statement is a more convenient way to start and shut down the client:: async def start_the_client(): async with AioClient() as client: # Use the client within this block. pass Use the ``submit`` method to send individual computations to the cluster, and await the returned future to retrieve the result:: async def add_two_numbers(): async with AioClient() as client: a = client.submit(add, 1, 2) result = await a Continue using submit or map on results to build up larger computations, and gather results with the ``gather`` method:: async def gather_some_results(): async with AioClient() as client: a = client.submit(add, 1, 2) b = client.submit(add, 10, 20) c = client.submit(add, a, b) result = await client.gather([c]) See Also -------- distributed.client.Client: Blocking Client distributed.scheduler.Scheduler: Internal scheduler """ def __init__(self, *args, **kwargs): loop = asyncio.get_event_loop() ioloop = BaseAsyncIOLoop(loop) super().__init__(*args, loop=ioloop, asynchronous=True, **kwargs) def __enter__(self): raise RuntimeError("Use AioClient in an 'async with' block, not 'with'") async def __aenter__(self): await to_asyncio_future(self._started) return self async def __aexit__(self, type, value, traceback): await to_asyncio_future(self._close()) def __await__(self): return to_asyncio_future(self._started).__await__() get = to_asyncio(Client.get, sync=False) sync = to_asyncio(Client.sync) close = to_asyncio(Client.close) shutdown = to_asyncio(Client.shutdown)
class as_completed(client.as_completed): __anext__ = to_asyncio(client.as_completed.__anext__) wait = to_asyncio(client._wait)