from __future__ import print_function, division, absolute_import

from collections import defaultdict, deque
from datetime import timedelta
import logging
import uuid

from tornado import gen
import tornado.locks

from .client import _get_global_client
from .utils import log_errors
from .worker import get_worker

logger = logging.getLogger(__name__)

class LockExtension(object):
    """ An extension for the scheduler to manage Locks

    This adds the following routes to the scheduler

    *  lock_acquire
    *  lock_release

    State kept on the extension:

    *  ``ids``: maps a lock name to the id of the client currently holding it
    *  ``events``: maps a lock name to a FIFO queue of ``tornado.locks.Event``
       objects, one per coroutine currently waiting for that lock
    """

    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.events = defaultdict(deque)
        self.ids = dict()

        self.scheduler.handlers.update({'lock_acquire': self.acquire,
                                        'lock_release': self.release})

        self.scheduler.extensions['locks'] = self

    @gen.coroutine
    def acquire(self, stream=None, name=None, id=None, timeout=None):
        """ Try to take the lock ``name`` on behalf of holder ``id``

        Parameters
        ----------
        stream : optional
            Not used by this handler.
        name : hashable or list
            Name of the lock.  Lists are converted to tuples so that they
            can be used as dictionary keys.
        id : optional
            Identifier of the requester, recorded as the holder on success.
        timeout : number, optional
            Seconds to wait each time the lock is found to be held.
            ``None`` waits indefinitely.

        Returns
        -------
        True if the lock was acquired, False if waiting timed out
        """
        with log_errors():
            if isinstance(name, list):
                name = tuple(name)
            if name not in self.ids:
                result = True
            else:
                while name in self.ids:
                    # Queue a private event; release() sets the event at the
                    # head of the queue to wake exactly one waiter.
                    event = tornado.locks.Event()
                    self.events[name].append(event)
                    future = event.wait()
                    if timeout is not None:
                        future = gen.with_timeout(timedelta(seconds=timeout),
                                                  future)
                    try:
                        yield future
                    except gen.TimeoutError:
                        result = False
                        break
                    else:
                        result = True
                    finally:
                        # Remove *our own* event rather than blindly popping
                        # the head: a waiter further back in the queue may
                        # time out before the ones ahead of it.
                        self.events[name].remove(event)
            if result:
                assert name not in self.ids
                self.ids[name] = id
            raise gen.Return(result)

    def release(self, stream=None, name=None, id=None):
        """ Release the lock ``name`` held by ``id`` and wake the next waiter

        Parameters
        ----------
        stream : optional
            Not used by this handler.
        name : hashable or list
            Name of the lock.  Lists are converted to tuples.
        id : optional
            Identifier that must match the recorded holder of the lock.

        Raises
        ------
        ValueError
            If the lock is not currently held by ``id``.
        """
        with log_errors():
            if isinstance(name, list):
                name = tuple(name)
            # Check membership explicitly so that an unheld lock released
            # with ``id=None`` raises ValueError instead of a KeyError.
            if name not in self.ids or self.ids[name] != id:
                raise ValueError("This lock has not yet been acquired")
            del self.ids[name]
            waiters = self.events.get(name)
            if waiters:
                # Wake only the longest-waiting coroutine; it removes its
                # own event from the queue once it resumes.
                waiters[0].set()
            else:
                # Nobody is waiting: drop the empty queue so the table does
                # not grow with every lock name ever used.
                self.events.pop(name, None)

class Lock(object):
    """ Distributed Centralized Lock

    Parameters
    ----------
    name: string
        Name of the lock to acquire.  Choosing the same name allows two
        disconnected processes to coordinate a lock.  A random name of the
        form ``'lock-<hex>'`` is generated when omitted.
    client: optional
        Client used to talk to the scheduler.  Defaults to the global
        client, falling back to the client of the current worker.

    Examples
    --------
    >>> lock = Lock('x')  # doctest: +SKIP
    >>> lock.acquire(timeout=1)  # doctest: +SKIP
    >>> # do things with protected resource
    >>> lock.release()  # doctest: +SKIP
    """

    def __init__(self, name=None, client=None):
        self.client = client or _get_global_client() or get_worker().client
        self.name = name or 'lock-' + uuid.uuid4().hex
        # Unique per instance; sent with every acquire and release request.
        self.id = uuid.uuid4().hex
        self._locked = False

    def acquire(self, blocking=True, timeout=None):
        """ Acquire the lock

        Parameters
        ----------
        blocking : bool, optional
            If false, don't wait on the lock in the scheduler at all.
        timeout : number, optional
            Seconds to wait on the lock in the scheduler.  This does not
            include local coroutine time, network transfer time, etc..
            It is forbidden to specify a timeout when blocking is false.

        Examples
        --------
        >>> lock = Lock('x')  # doctest: +SKIP
        >>> lock.acquire(timeout=1)  # doctest: +SKIP

        Returns
        -------
        True or False whether or not it successfully acquired the lock

        Raises
        ------
        ValueError
            If a timeout is given together with ``blocking=False``.
        """
        if not blocking:
            if timeout is not None:
                raise ValueError(
                    "can't specify a timeout for a non-blocking call")
            timeout = 0

        result = self.client.sync(self.client.scheduler.lock_acquire,
                                  name=self.name, id=self.id, timeout=timeout)
        # Only record the lock as held when the attempt did not fail.  The
        # identity test keeps the previous behaviour for any non-boolean
        # result ``sync`` may hand back, and leaves an already-held flag
        # untouched when a repeated attempt times out.
        if result is not False:
            self._locked = True
        return result

    def release(self):
        """ Release the lock if already acquired

        Raises
        ------
        ValueError
            If this object has not recorded a successful acquire.
        """
        if not self.locked():
            raise ValueError("Lock is not yet acquired")
        result = self.client.sync(self.client.scheduler.lock_release,
                                  name=self.name, id=self.id)
        self._locked = False
        return result

    def locked(self):
        """ Return whether this object currently records the lock as held """
        return self._locked

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args, **kwargs):
        self.release()

    @gen.coroutine
    def __aenter__(self):
        yield self.acquire()
        raise gen.Return(self)

    @gen.coroutine
    def __aexit__(self, *args, **kwargs):
        yield self.release()

    def __reduce__(self):
        # Only the name is pickled; the rebuilt Lock runs __init__ again
        # and therefore gets a fresh id and an unlocked local state.
        return (Lock, (self.name,))