Issue
I have a main class which creates an instance from a list of plugins. In the example below, the 'dhcpv6' plugin is launched and it listens for subscriptions on the 'subject_test_subscribe" subject. As soon as the handler intercepts a new message on this subject, it creates an new asynchronous task by instantiating the classe "Dhcpv6_child().run()" which represents a child of the class Dhcpv6.py. Each child have a timeout to kill the process child. For information, the class plugin definied in init.py is an abstract class which allows to load plugins.
I'm not able to dynamically manage the addition of new tasks with asyncio.gather (until the current task is finished, the new task is not executed). The problem is solved with asyncio.create_subprocess_exec but I'd prefer to use the possibilities of asyncio
main.py
import argparse
import asyncio
import importlib
parser = argparse.ArgumentParser()
parser.add_argument('--nats', '-n', nargs="?", type=str, required=True, help="adresse IP du serveur NATS")
parser.add_argument('--plugins', '-p', nargs="+", help="Liste de plugins a utiliser")
args = parser.parse_args()
print("************** main() *****************")
print(f"NATS server IP: {args.nats}")
print(f"List of plugins to load: {args.plugins}")
async def main():
tasks = []
for type_plugin in args.plugins:
try:
module = importlib.import_module(f'Plugins.{type_plugin}')
my_class = getattr(module, type_plugin)
my_instance = my_class(nats=args.nats)
tasks.append(asyncio.create_task(my_instance.run()))
except Exception as e:
print("Erreur chargement du plugin : ", type_plugin, ":", e)
try:
await asyncio.gather(*tasks)
except asyncio.TimeoutError:
print("timeout main")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.create_task(main())
loop.run_forever()
except KeyboardInterrupt:
pass
except Exception as e:
print(e)
finally:
loop.close()
Dhcpv6.py
import asyncio
import json
import logging
import sys
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from Plugins import Plugin
from Plugins.Dhcpv6_child import Dhcpv6_child
logger = logging.getLogger("dhcpv6")
logging.basicConfig(level=logging.DEBUG)
class Dhcpv6(Plugin):
name = "Dhcpv6 plugin"
subject = "subject_test_subscribe"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.nats = kwargs.get('nats')
self.list_task = []
self.timeout_task = 5
print("Class dhcpv6 ==> constructor / nats_ip : {}".format(self.nats))
async def run(self):
print("******* run DHCPV6 ***********")
nc = NATS()
try:
await nc.connect("127.0.0.1", verbose=True, pedantic=True)
except ErrConnectionClosed:
print(f"La connexion a ete fermee inopinement")
return
except ErrTimeout:
print(f"Le delai imparti a la connexion est depasse")
return
except ErrNoServers:
print(f"Aucun serveur n'a repondu a temps")
return
except Exception as e:
print(f"Exception inattendue: {e}")
return
async def plugin_handler(msg):
print(msg)
try:
# Creating and running a new task on demand
self.list_task.append(asyncio.wait_for(
asyncio.create_task(Dhcpv6_child().run()), timeout=self.timeout_task))
print("append a new task : ", self.list_task)
except:
print("Error append new task")
try:
print("Running a new task : ", self.list_task)
await asyncio.wait_for(asyncio.gather(*self.list_task), timeout=3600.0)
except asyncio.TimeoutError:
print("Fin du main : timeout atteint")
print(f"Subscribing test on : {self.subject}")
await nc.subscribe(f"{self.subject}", cb=plugin_handler)
while nc.is_connected:
await asyncio.sleep(0.5)
await nc.drain()
Here is an output screen :
************** main() *****************
NATS server IP: 127.0.0.1
List of plugins to load: : ['Dhcpv6']
------------ Load a list of plugins : {Dhcpv6} -------
Class plugin ==> constructor
Class dhcpv6 ==> constructor / nats_ip : 127.0.0.1
******* run DHCPV6 ***********
INFO:Plugin:Plugin Dhcpv6 plugin loaded
Subscribing test on : subject_test_subscribe
<Msg: subject='subject_test_subscribe' reply='' data='{"query": ...'>
Class CHILD ==> constructor
append a new task : [<coroutine object wait_for at 0x0000026D0310F040>]
Runnig a new task : [<coroutine object wait_for at 0x0000026D0310F040>] ```
Solution
Ok Cornelius and thank for yours advices. You're right, asyncio.create_task() allows to create task on demand. If I want to process one task after another, we need to add asyncio.wait(self.list_task) but it's not my purpose. Here is how i create a new child in "Dhcpv6.py". For information, the main() routine, called by asyncio.create_task(), is imported from the child script "Dhcpv6_child.py" :
if not mac in self.list_mac:
self.list_task.append(asyncio.create_task(main(self.nats,
self.min_inventory['name'],
self.min_inventory,
str(datetime.now()),
str(datetime.now() + self.timeout))))
self.date_start.append(datetime.now())
self.date_end.append(datetime.now() + self.timeout)
self.lb_name.append(self.min_inventory['name'])
self.list_mac.append(mac)
# logger.info(f"{self.lb_name} {task.get_name()} created")
await nc.subscribe(f"call.dhcpv6.{self.min_inventory['name']}", cb=cancel_tasks)
else :
logger.warning(f"[{__name__}] Task adress %s already in list_mac", mac)
Finally, as I have problems to correctly manage the list of tasks, I associated a time counter for each child (as described above in the main routine). When a task is finished, the child send a message to the parent which updates the current task list.
Answered By - tperrot
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.