Running Python async function
Lately I was playing with asyncio and aiohttp in python 3.5.2. First time I hit the wall was when I was trying to test async methods:
First of all I did not know how to call say_hi() directly without event loop. Reading docs didn't really help me. I already knew I could do:
But I was more interested how asyncio event loop triggers this method by itself. So I decided to dig into asyncio source code.
Summary
I've found two ways to call async say_hi() without event loop.
Send something to coroutine:
Wrap coroutine in Future:
BaseEventLoop
Turns out on Debian asyncio.get_event_loop() returns UnixSelectEventLoop. It extends some other event loops:
UnixSelectEventLoop --> BaseSelectorEventLoop --> BaseEventLoop --> AbstractEventLoop
BaseEventLoop is the one that interests me, because it implements run_until_complete() method.
run_until_complete() calls self.run_forever() which calls self._run_once().
What _run_once() essentially does is it gets the handle from the task queue and calls handle._run():
class BaseEventLoop(events.AbstractEventLoop): def __init__(self): self._ready = collections.deque() def _run_once(self): handle = self._ready.popleft() handle._run()
handle type is asyncio.events.Handle. It's simplified version is:
class Handle: def __init__(self, callback, args, loop): self._loop = loop self._callback = callback self._args = args def _run(self): try: self._callback(*self._args) except Exception as exc: # ...
Now this is where the call chain started from run_until_complete() stops. And it's clear to me that I have to find the spot where handles are created and added to the event_loop._ready queue (or dequeu to be precise).
How my async function is turned into Handle?
Let's go back to loop.run_until_complete(). Simplified version could be read as:
def run_until_complete(self, future): future = tasks.ensure_future(future, loop=self) try: self.run_forever() except: # ... return future.result()
Ok, let's investigate tasks.ensure_future(). In my case it does:
Which by itself does not do a lot:
But Task constructor calls something interesting:
def __init__(self, coro, *, loop=None): super().__init__(loop=loop) self._coro = coro self._loop.call_soon(self._step)
And Task._step() method is even more interesting:
def _step(self, exc=None): # ... coro = self._coro # ... try: result = coro.send(None) except: # ... return result
AHA, seems like coro.send(None) triggers the async function. I quickly test it and indeed it works :) :
Anyway, let's keep hunting how this coroutine (my async function) is turned into Handle...
The next interesting function in Task contructor is loop.call_soon(). It simply calls self._call_soon(). Which does the rest:
def _call_soon(self, callback, args): if (coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with call_soon()") self._check_closed() handle = events.Handle(callback, args, self) if handle._source_traceback: del handle._source_traceback[-1] self._ready.append(handle) return handle
It wraps callback (our async function) into Handle and appends it to the loop._ready queue.
So the call graph is something like this:
loop.run_until_complete(future) | V tasks.ensure_future(future, loop) | V loop.create_task(future) | V task.Task(future, loop) | V loop.call_soon(task._step) | V loop._call_soon(task._step) | V loop._ready.append(Handle(task._step))
Comments