Synchronous Blocking Of Multiple Resources

Abstract situation: we have 2 sheep that can be used concurrently (Semaphore(2)) and 1 gate that can be used by only one task at a time. We want to spend a sheep through the gate 2 times (each time we need

Solution 1:

You could implement an asynchronous context manager that handles multiple locks. This object should make sure it doesn't hold any lock while waiting for another non-available lock:

class multilock(asyncio.locks._ContextManagerMixin):

    def __init__(self, *locks):
        self.released = list(locks)
        self.acquired = []

    async def acquire(self):
        while self.released:
            lock = self.released.pop()
            if lock.locked():
                self.release()
            await lock.acquire()
            self.acquired.append(lock)

    def release(self):
        while self.acquired:
            lock = self.acquired.pop()
            lock.release()
            self.released.append(lock)

Example:

async def test(lock1, lock2):
    async with multilock(lock1, lock2):
        print('Do something')
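The class above inherits from a private asyncio mixin, which may change between Python versions. Here is a minimal sketch of the same idea that defines `__aenter__` and `__aexit__` itself. The names `MultiLock`, `worker` and `demo` are hypothetical (not from the original answer), and the short sleep only stands in for real work:

```python
import asyncio


class MultiLock:
    """Hypothetical variant of multilock that does not rely on a private mixin."""

    def __init__(self, *locks):
        self.released = list(locks)
        self.acquired = []

    async def acquire(self):
        while self.released:
            lock = self.released.pop()
            if lock.locked():
                # We are about to wait: give back everything held so far.
                self.release()
            await lock.acquire()
            self.acquired.append(lock)

    def release(self):
        while self.acquired:
            lock = self.acquired.pop()
            lock.release()
            self.released.append(lock)

    async def __aenter__(self):
        await self.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        self.release()


async def worker(name, first, second, log):
    async with MultiLock(first, second):
        log.append(f"{name} start")
        await asyncio.sleep(0.01)  # stand-in for real work
        log.append(f"{name} end")


async def demo():
    lock_a, lock_b = asyncio.Lock(), asyncio.Lock()
    log = []
    # The two workers list the same locks in opposite order.
    await asyncio.gather(
        worker("w1", lock_a, lock_b, log),
        worker("w2", lock_b, lock_a, log),
    )
    return log, lock_a.locked(), lock_b.locked()
```

Calling `asyncio.run(demo())` should show that the two workers never overlap inside the `async with` block and that both locks are free again at the end.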

Solution 2:

Based on this solution, I created a solution for this example. We need two things:

  1. Add a locked() method to Sheep and Gate that tells whether the object can be acquired right now (it returns True when it cannot).

  2. Add and use a new MultiAcquire task that acquires the objects only if all of them can be acquired right now (and otherwise suspends until a release event).

Here's the final code. MultiAcquire is the main part:

import asyncio


class Sheep:
    _sem = asyncio.Semaphore(2)  # we have 2 available sheep at a time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class Gate:
    _sem = asyncio.Semaphore(1)  # we have 1 available gate at a time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class MultiAcquire(asyncio.Task):
    _check_lock = asyncio.Lock()  # serializes the "check, then create the acquiring task" step
    _release_event = asyncio.Event()  # lets waiters suspend until any object is released

    def __init__(self, locks):
        super().__init__(self._task_coro())
        self._locks = locks
        # Here we use a decorator to subscribe to all release() calls,
        # _release_event will be set in that case:
        for l in self._locks:
            l.release = self._notify(l.release)

    async def _task_coro(self):
        while True:
            # Create task to acquire all locks and break on success:
            async with type(self)._check_lock:
                if not any(l.locked() for l in self._locks):  # task is created only if all objects can be acquired
                    task = asyncio.gather(*[l.acquire() for l in self._locks])  # create task to acquire all objects
                    await asyncio.sleep(0)  # start task without waiting for it
                    break
            # Wait for any release() to try again:
            await type(self)._release_event.wait()
        # Wait for task:
        return await task

    def _notify(self, func):
        def wrapper(*args, **kwargs):
            type(self)._release_event.set()
            type(self)._release_event.clear()
            return func(*args, **kwargs)
        return wrapper


async def spend(reason):
    sheep = Sheep(reason)
    gate = Gate(reason)
    await MultiAcquire([sheep, gate])  # block 1 sheep, 1 gate
    await asyncio.sleep(1)  # 1 second
    print('Spend sheep through a gate')
    sheep.release()
    gate.release()


async def feed(reason):
    sheep = Sheep(reason)
    await MultiAcquire([sheep])  # block 1 sheep
    await asyncio.sleep(2)  # 2 seconds
    print('Feed sheep')
    sheep.release()


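async def main():
    await asyncio.gather(
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2')
    )  # spend 2 times, feed 1 time


if __name__ == "__main__":
    # asyncio.run() replaces the older get_event_loop()/run_until_complete() pair.
    # The class-level Semaphore/Lock/Event above bind to the running loop lazily
    # on Python 3.10+, which is what this form assumes.
    asyncio.run(main())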

Output:

acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]
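The same "acquire only when everything is free right now" rule can also be sketched with a single asyncio.Condition instead of a Task subclass and patched release() methods. This is a different technique from the code above, not a rewrite of it: `ResourcePool`, `job` and `demo` are hypothetical names, the resources are plain counters rather than semaphores, and the durations are shortened so the example finishes quickly:

```python
import asyncio


class ResourcePool:
    """Hypothetical helper: free-resource counters guarded by one Condition."""

    def __init__(self, **capacity):
        self._free = dict(capacity)
        self._cond = asyncio.Condition()

    async def acquire(self, **needed):
        async with self._cond:
            # Sleep until every requested resource is free at the same moment,
            # so nothing is held while waiting for something else.
            await self._cond.wait_for(
                lambda: all(self._free[k] >= n for k, n in needed.items())
            )
            for k, n in needed.items():
                self._free[k] -= n

    async def release(self, **returned):
        async with self._cond:
            for k, n in returned.items():
                self._free[k] += n
            self._cond.notify_all()  # let waiters re-check their predicate


async def job(pool, name, seconds, log, **needed):
    await pool.acquire(**needed)
    log.append(f"start {name}")
    try:
        await asyncio.sleep(seconds)
    finally:
        log.append(f"end {name}")
        await pool.release(**needed)


async def demo():
    pool = ResourcePool(sheep=2, gate=1)
    log = []
    await asyncio.gather(
        job(pool, "spend 1", 0.02, log, sheep=1, gate=1),
        job(pool, "feed 1", 0.04, log, sheep=1),
        job(pool, "spend 2", 0.02, log, sheep=1, gate=1),
    )
    return log
```

In this sketch the second spend can only start after the first one has given back both its sheep and the gate, while feeding runs alongside because it needs a sheep only.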
