medved/lib/exeq/Executor.py

64 lines
2.4 KiB
Python
Raw Normal View History

2018-04-02 22:41:10 +00:00
from lib import Service, Loader, Loadable
from time import sleep
2018-04-02 22:41:10 +00:00
from rq import Queue
from redis import Redis
class Executor(Service):
    """Common base class for executor services.

    Forwards ``thread``, ``id`` and ``root`` unchanged to ``Service.__init__``
    and then tags ``self._logger`` with the service kind and the name of the
    concrete executor class.
    """

    def __init__(self, thread, id, root):
        super().__init__(thread, id, root)
        # Fields attached to the logger: a fixed service kind plus the
        # concrete subclass name.
        log_fields = (
            ('service', 'Executor'),
            ('vname', self.__class__.__name__),
        )
        for field_name, field_value in log_fields:
            self._logger.add_field(field_name, field_value)
class RQExecutor(Executor):
    """Executor that hands pipeline steps to rq (Redis Queue) workers."""

    def __init__(self, id, root):
        # The private main loop is passed to the base class as the ``thread``
        # argument.
        super().__init__(self.__run, id, root)

    def __run(self):
        """Main loop: poll finished jobs and enqueue new work until stopped.

        On every iteration (paced by the ``delay`` setting, default 0.5 s) each
        pipeline listed under ``core.pipelines`` is visited; for each of its
        steps finished job results are stored, and, if fewer than ``qsize``
        (default 200) jobs are being tracked, a new batch is enqueued.

        Any exception raised while processing is logged and the loop goes on
        with the next iteration.
        """
        redis_conn = Redis(host=self.lcnf.get('redis').get('host'))
        # Ids of enqueued jobs whose results have not been stored yet.
        jobs = []

        while self._running:
            sleep(self.lcnf.get('delay', 0.5))
            try:
                for name, pipeline in self.cnf.get("pipelines").items():
                    # Only pipelines enabled in the core config are processed.
                    if name not in self.cnf.get('core').get('pipelines'):
                        continue
                    for step in pipeline['steps']:
                        queue = Queue(step.get('priority', 'normal'), connection=redis_conn)
                        self._collect_results(queue, jobs)
                        # Do not enqueue more work once the tracking list is full.
                        if len(jobs) + 1 > self.lcnf.get('qsize', 200):
                            continue
                        self._dispatch(queue, pipeline, step, jobs)
            except Exception as e:
                self._logger.error("Error in executor main thread: %s", e)

    def _collect_results(self, queue, jobs):
        """Store results of finished jobs found in ``queue`` and stop tracking them.

        ``jobs`` is mutated in place: ids whose job has a non-None result are
        removed after the result has been written to ``self._data``.
        """
        # Iterate over a snapshot: removing from the list that is being
        # iterated would skip the id following every removed one.
        for job_id in list(jobs):
            job = queue.fetch_job(job_id)
            if not job or job.result is None:
                continue
            self._logger.debug("%s|%s", job_id, job._status)
            self._data.update(job.result)
            job.cleanup()
            jobs.remove(job_id)
        # NOTE(review): ids whose job never yields a non-None result (for
        # example a failed or expired job) appear to stay in ``jobs`` forever
        # and keep counting against ``qsize`` - confirm whether that is intended.

    def _dispatch(self, queue, pipeline, step, jobs):
        """Pick up to ``parallel`` items for ``step`` and enqueue them as one job.

        Items are taken from ``self._data`` first; any shortfall is pulled from
        the pipeline's source storage and removed from it. The id of the
        enqueued job is appended to ``jobs``. Nothing is enqueued when no item
        matches.
        """
        task = step['task']
        # Select items that do not have an entry for this task yet, narrowed by
        # the step's optional "if" conditions (presumably a Mongo-style query -
        # verify against the storage implementation).
        query = {"steps.%s" % task: {'$exists': False}}
        query.update(step.get("if", {}))

        count = step.get('parallel', 1)
        # get as much as possible from own pool
        items = self._data.get(block=False, count=count, filter=query)
        # obtain everything else from source
        if len(items) < count:
            source = Loader.by_id('storage', pipeline.get('source'))
            new_items = source.get(block=False, count=(count - len(items)), filter=query)
            items.extend(new_items)
            source.remove(new_items)

        if not items:
            return

        # Give every item an entry for this task before enqueueing, so the
        # "$exists: False" condition above no longer matches it.
        for item in items:
            item['steps'][task] = None
        self._data.update(items)

        job = queue.enqueue("lib.exeq.Task.run", task, items)
        self._logger.info("%s|%s|%s|%s", job.id, step.get('priority', 'normal'), task, len(items))
        jobs.append(job.id)