Issue
When I use aiologger, I have to write await logger many times.
For example,
import asyncio
from aiologger import Logger

async def main():
    logger = Logger.with_default_handlers(name='my-logger')
    await logger.debug("debug at stdout")
    await logger.info("info at stdout")
    await logger.warning("warning at stderr")
    await logger.error("error at stderr")
    await logger.critical("critical at stderr")
    await logger.shutdown()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
It would be great if I could write something like al instead of await logger.
Solution
Disclaimer: I've written about this -- https://coxley.org/logging/#logging-over-the-network
Please don't accept a logging interface like this.
You can't avoid using await to yield to the event loop. You just can't. But you can leverage existing features to do I/O outside of the main thread and still use asyncio. You just start a second event loop in a separate thread.
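To make the "second event loop in a thread" idea concrete before the full example, here is a minimal sketch using only the standard library. The names start_background_loop, slow_write, and log_without_await are hypothetical and exist only for illustration; slow_write stands in for whatever async I/O you actually need.

```python
import asyncio
import threading


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Start a second event loop in a daemon thread and return it."""
    loop = asyncio.new_event_loop()

    def run() -> None:
        asyncio.set_event_loop(loop)
        loop.run_forever()

    threading.Thread(target=run, daemon=True).start()
    return loop


async def slow_write(message: str) -> str:
    # Stand-in for real async I/O such as a network upload (assumption).
    await asyncio.sleep(0.01)
    return f"wrote: {message}"


def log_without_await(loop: asyncio.AbstractEventLoop, message: str):
    # Schedules the coroutine on the background loop and returns a
    # concurrent.futures.Future immediately; the caller is not blocked.
    return asyncio.run_coroutine_threadsafe(slow_write(message), loop)


async def main() -> str:
    io_loop = start_background_loop()
    future = log_without_await(io_loop, "hello")
    # Awaiting the result here is only for demonstration; a logger would
    # normally fire and forget.
    result = await asyncio.wrap_future(future)
    io_loop.call_soon_threadsafe(io_loop.stop)
    return result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The call site in the main loop needs no await because the coroutine runs on the other loop. The trade-off is that errors and backpressure are now invisible to the caller unless you inspect the returned future.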
Example
I don't like to recommend third-party libs in answers, but janus.Queue is important here. It makes it easier to bridge between non-asyncio writers (e.g. the log handler) and asyncio readers (the flusher).
Note 1: If you don't actually need asyncio-compatible I/O from the flusher, use the stdlib queue.Queue, remove the async closure, and get rid of the second loop.
Note 2: This example uses an unbounded queue and does I/O for every message. Add a flush interval and/or a message threshold to make it production-ready. Depending on your system, decide whether to accept memory growth during log bursts, drop logs, or block the main code path.
import asyncio
import logging
import time
import threading
import typing as t

# pip install --user janus
import janus

LOG = logging.getLogger(__name__)

# Queue must be created within the event loop it will be used from. Start as
# None since this will not be the main thread.
_QUEUE: t.Optional[janus.Queue] = None


class IOHandler(logging.Handler):
    def __init__(self, *args, **kwargs):
        # This is set from the flusher thread
        global _QUEUE
        while _QUEUE is None:
            time.sleep(0.01)
        self.q = _QUEUE.sync_q
        super().__init__(*args, **kwargs)

    def emit(self, record: logging.LogRecord):
        self.q.put(record)


def flusher():
    async def run():
        global _QUEUE
        if _QUEUE is None:
            _QUEUE = janus.Queue()
        # Upload record instead of print
        # Perhaps flush every n-seconds w/ buffer for upper-bound on inserts.
        q = _QUEUE.async_q
        while True:
            record = await q.get()
            print("woohoo, doing i/o:", record.msg)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(run())


def foo():
    print("foo")


def bar():
    print("bar")


async def baz():
    await asyncio.sleep(1)
    print("baz")


async def main():
    threading.Thread(target=flusher, daemon=True).start()
    LOG.setLevel(logging.INFO)
    LOG.addHandler(IOHandler())
    foo()
    LOG.info("starting program")
    LOG.info("doing some stuff")
    LOG.info("mighty cool")
    bar()
    await baz()


if __name__ == "__main__":
    asyncio.run(main())
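Notes 1 and 2 above can be combined into one rough sketch: a stdlib queue.Queue with a bound, no second event loop, and a flusher thread that batches by message count or elapsed time. Everything named here (QueueingHandler, flush, flusher, max_batch, interval) is hypothetical, and dropping records when the queue is full is just one of the three policies Note 2 mentions, picked for illustration.

```python
import logging
import queue
import threading
import time
import typing as t


class QueueingHandler(logging.Handler):
    """Hypothetical handler: enqueue records, drop them if the queue is full."""

    def __init__(self, q: queue.Queue) -> None:
        super().__init__()
        self.q = q
        self.dropped = 0

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.q.put_nowait(record)
        except queue.Full:
            # Policy choice (assumption): drop rather than block the caller.
            self.dropped += 1


def flush(batch: t.List[logging.LogRecord]) -> None:
    # Stand-in for a single real I/O call that uploads the whole batch.
    print(f"flushing {len(batch)} records:", [r.getMessage() for r in batch])


def flusher(q, stop, flush_fn=flush, max_batch=100, interval=1.0):
    """Flush when max_batch records are buffered or interval seconds pass."""
    batch: t.List[logging.LogRecord] = []
    deadline = time.monotonic() + interval
    while not (stop.is_set() and q.empty()):
        try:
            # Short poll so a stop request is noticed promptly.
            batch.append(q.get(timeout=0.05))
        except queue.Empty:
            pass
        now = time.monotonic()
        if len(batch) >= max_batch or now >= deadline:
            if batch:
                flush_fn(batch)
                batch = []
            deadline = now + interval
    if batch:
        flush_fn(batch)  # final flush on shutdown


def main() -> None:
    q: queue.Queue = queue.Queue(maxsize=1000)  # bounded, unlike the example above
    stop = threading.Event()
    worker = threading.Thread(
        target=flusher, args=(q, stop), kwargs={"max_batch": 2, "interval": 0.5}
    )
    worker.start()

    log = logging.getLogger("batched-example")
    log.setLevel(logging.INFO)
    log.addHandler(QueueingHandler(q))
    for i in range(5):
        log.info("message %d", i)

    stop.set()
    worker.join()


if __name__ == "__main__":
    main()
```

Because the worker is joined on shutdown instead of running as a daemon, buffered records get a final flush before the process exits. Whether that is worth delaying exit is another system-specific decision.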
Answered By - coxley