How to await a select.select call in python asyncio

I have a python 3.6 program where I am using the asyncio package event loops. One of my data sources comes from an api which was not build around asyncio. My connection object contains a member called _connection which is just a python socket. Right now I can use this in a select statement to tell when data is ready.

async def run(self):
    while True:
        if select.select([self._q._connection], [], [])[0]:
            msg = self._q.receive()
            print(msg)

What I would really like is...

async def run(self):
    while True:
        if await select.select([self._q._connection], [], [])[0]:
            msg = self._q.receive()
            print(msg)

I know there is a sock_recv function in the asyncio event loop however I need the api to do the actual reading and decoding. I tried this but it would just fall through the await which I guess makes sense since I said 0 bytes.

async def run(self):
    while True:
        print('A')
        await asyncio.get_event_loop().sock_recv(self._q._connection, 0)
        print('B')
        msg = self._q.receive()
        print(msg)

The only solution I can think of for now is to add a small timeout to the select and then call asyncio.sleep while there is no data but this seems like an inefficent approach. I wish there was something like asyncio.select. Do anyone want to recommend another approach?

EDIT: Right now I have come up with this. I don't like it because it adds an extra quarter second latency (probably doesn't matter much for my application but it still bugs me.)

async def run(self):
    while True:
        if select.select([self._q._connection], [], [], 0)[0]:
           print(self._q.receive())
        else:
            await asyncio.sleep(0.25)

1 answer

  • answered 2018-01-14 10:16 Vincent

    You could use loop.add_reader to wait for the read availability of your socket:

    async def watch(fd):
        future = asyncio.Future()
        loop.add_reader(fd, future.set_result, None)
        future.add_done_callback(lambda f: loop.remove_reader(fd))
        await future
    
    async def run(self):
        while True:
            await watch(self._q._connection)
            msg = self._q.receive()
            print(msg)
    

    However, it'll be very tricky to avoid all the blocking IO calls of the library you mentioned without rewriting it completely. Instead, I'd recommend to use the loop.run_in_executor method to schedule the blocking IO calls in a thread pool:

    async def run(self):
        loop = asyncio.get_event_loop()
        while True:
            msg = await loop.run_in_executor(None, self._q.receive)
            print(msg)