Synchronous Blocking Of Multiple Resources
Abstract situation. We have 2 sheeps we can asynchronously use at time (Semaphore(2)) and 1 gate we can use at time. We want to spend sheep through gate 2 times (each time we need
Solution 1:
You could implement an asynchronous context manager that handles multiple locks. This object should make sure it doesn't hold any lock while waiting for another non-available lock:
classmultilock(asyncio.locks._ContextManagerMixin):
def__init__(self, *locks):
self.released = list(locks)
self.acquired = []
asyncdefacquire(self):
while self.released:
lock = self.released.pop()
if lock.locked():
self.release()
await lock.acquire()
self.acquired.append(lock)
defrelease(self):
while self.acquired:
lock = self.acquired.pop()
lock.release()
self.released.append(lock)
Example:
asyncdeftest(lock1, lock2):
asyncwith multilock(lock1, lock2):
print('Do something')
Solution 2:
Based on this solution I created solution for this example. We need two things:
add
locked()
function toSheep
andGate
, that's checking if object can be acquired right nowadd and use new
MultiAcquire
task that would acquire objects only if it all can be acquired right now (and suspend for release event otherwise)
Here's final code, see MultiAcquire
- it's main:
import asyncio
classSheep:
_sem = asyncio.Semaphore(2) # we have 2 avaliable sheeps at timedef__init__(self, reason):
self._reason = reason
asyncdefacquire(self):
awaittype(self)._sem.acquire()
print('acquire sheep ({})'.format(self._reason))
defrelease(self):
print('release sheep ({})'.format(self._reason))
type(self)._sem.release()
deflocked(self):
returntype(self)._sem.locked()
classGate:
_sem = asyncio.Semaphore(1) # we have 1 avaliable gate at timedef__init__(self, reason):
self._reason = reason
asyncdefacquire(self):
awaittype(self)._sem.acquire()
print('acquire gate ({})'.format(self._reason))
defrelease(self):
print('release gate ({})'.format(self._reason))
type(self)._sem.release()
deflocked(self):
returntype(self)._sem.locked()
classMultiAcquire(asyncio.Task):
_check_lock = asyncio.Lock() # to suspend for creating task that acquires objects
_release_event = asyncio.Event() # to suspend for any object was releaseddef__init__(self, locks):
super().__init__(self._task_coro())
self._locks = locks
# Here we use decorator to subscribe all release() calls,# _release_event would be set in this case:for l in self._locks:
l.release = self._notify(l.release)
asyncdef_task_coro(self):
whileTrue:
# Create task to acquire all locks and break on success:asyncwithtype(self)._check_lock:
ifnotany(l.locked() for l in self._locks): # task would be created only if all objects can be acquired
task = asyncio.gather(*[l.acquire() for l in self._locks]) # create task to acquire all objects await asyncio.sleep(0) # start task without waiting for itbreak# Wait for any release() to try again:awaittype(self)._release_event.wait()
# Wait for task:returnawait task
def_notify(self, func):
defwrapper(*args, **kwargs):
type(self)._release_event.set()
type(self)._release_event.clear()
return func(*args, **kwargs)
return wrapper
asyncdefspend(reason):
sheep = Sheep(reason)
gate = Gate(reason)
await MultiAcquire([sheep, gate]) # block 1 sheep, 1 gateawait asyncio.sleep(1) # 1 secondprint('Spend sheep through a gate')
sheep.release()
gate.release()
asyncdeffeed(reason):
sheep = Sheep(reason)
await MultiAcquire([sheep]) # block 1 sheepawait asyncio.sleep(2) # 2 secondsprint('Feed sheep')
sheep.release()
asyncdefmain():
await asyncio.gather(
spend('spend 1'),
feed('feed 1'),
spend('spend 2')
) # spend 2 times, feed 1 timeif __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Output:
acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]
Post a Comment for "Synchronous Blocking Of Multiple Resources"