-
Notifications
You must be signed in to change notification settings - Fork 536
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Race condition when using AsyncTimeout #459
Comments
Just to clarify the desired behavior: It should never move from The issue happens if the coroutines are interleaved like this:
I understand the reason to shield the callback (the timer elapsed, and is no longer cancelable). However, I don't know how to resolve this race. |
Hi @rgov, really thoughtful bug report. Much appreciated! The last action of |
That's a good catch on the assumption. How confident are you that the async state machine does not have race conditions? It seems like the sequence 1-3 above could still happen, even if it is less common now. But I'm fairly new to Python coroutines, so I may miss how there's a guarantee of no race. It feels like what I really want is a synchronous state machine where the enter/leave callbacks fire asynchronously, as long as the ordering is guaranteed. |
As far as I understand async/await logic, there is no classic race condition as we know from threading since everthing runs in the same thread. However, every DEBUG:root:ENTERED WAITING
INFO:transitions.extensions.asyncio:Entered state waiting
DEBUG:transitions.extensions.asyncio:Executed callback after transition.
DEBUG:transitions.extensions.asyncio:Executed machine finalize callbacks
[1] DEBUG:transitions.extensions.asyncio:Timeout state waiting. Timeout triggered. <-- timeout triggered
[2] DEBUG:transitions.extensions.asyncio:Executed machine preparation callbacks before conditions. <--- but task was suspended and the new trigger is processed.
DEBUG:transitions.extensions.asyncio:Initiating transition from state waiting to state completed...
DEBUG:transitions.extensions.asyncio:Executed callbacks before conditions. So suspension after [1] needs to be prevented to prevent [2] here.
Technically, |
One way of making sure that every event is processed without interruption is to introduce locks in import asyncio
from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout, AsyncEvent
from transitions.extensions.states import add_state_features
import logging
class LockedAsyncEvent(AsyncEvent):
async def trigger(self, _model, *args, **kwargs):
self.machine.locks[id(_model)] = self.machine.locks.get(id(_model), asyncio.Lock())
async with self.machine.locks[id(_model)]:
return await super(LockedAsyncEvent, self).trigger(_model, *args, **kwargs)
@add_state_features(AsyncTimeout)
class CustomMachine(AsyncMachine):
event_cls = LockedAsyncEvent
def __init__(self, *args, **kwargs):
self.locks = {}
super(CustomMachine, self).__init__(*args, **kwargs)
class Model:
async def on_enter_completed(self):
logging.debug("ENTERED COMPLETE")
async def on_enter_waiting(self):
logging.debug("ENTERED WAITING")
async def main():
model = Model()
states = [
'start',
{'name': 'waiting', 'timeout': 0.5, 'on_timeout': 'timeout_waiting'},
'completed',
]
transitions = [{'trigger': 'timeout_waiting',
'source': '*',
'dest': 'waiting',
'conditions': 'is_waiting'}]
machine = CustomMachine(model, states, transitions=transitions, initial='start')
machine.add_transition('wait', 'start', 'waiting')
machine.add_transition('complete', 'waiting', 'completed')
await model.wait()
await asyncio.sleep(0.5)
await model.complete()
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
loop.close() Ultimately, it might be better to get rid of the shielding in |
If you hold the lock during the execution of all the callbacks, then if a callback transitions to another state, you will deadlock, won't you? |
jupp, pretty much. Unfortunately, there is no asyncio rlock. The discussion I found were like "when you need it, you are not doing it right." You can avoid it by executing the event in a new task: import asyncio
from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout, AsyncEvent
from transitions.extensions.states import add_state_features
import logging
class LockedAsyncEvent(AsyncEvent):
async def trigger(self, _model, *args, **kwargs):
self.machine.locks[id(_model)] = self.machine.locks.get(id(_model), asyncio.Lock())
async with self.machine.locks[id(_model)]:
return await super(LockedAsyncEvent, self).trigger(_model, *args, **kwargs)
@add_state_features(AsyncTimeout)
class CustomMachine(AsyncMachine):
event_cls = LockedAsyncEvent
def __init__(self, *args, **kwargs):
self.locks = {}
super(CustomMachine, self).__init__(*args, **kwargs)
class Model:
async def on_enter_waiting(self):
print("entered")
asyncio.create_task(self.to_completed()) # this would deadlock if awaited
async def main():
model = Model()
states = [
'start',
'waiting',
'completed',
]
machine = CustomMachine(model, states, initial='start')
await model.to_waiting()
print(model.state)
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main()) |
I investigated the process a bit further and the problem appears when timout AND new event trigger happen in the same context (no cancellation triggered) and both hit the asynchronous state exit functions at the same time. Unfortunately, The only simple solution I see here is to queue tasks by passing
|
Consider the following example:
The intended behavior is for it to loop on the
waiting
state a few times, then transition to thecompleted
state and stop. However, the output looks like this:It seems that the timer that transitions back to the
waiting
state does not get properly canceled when we transition to thecompleted
state.The text was updated successfully, but these errors were encountered: