diff --git a/data/config.yaml b/data/config.yaml index 1234fc7..69cda7f 100644 --- a/data/config.yaml +++ b/data/config.yaml @@ -20,6 +20,7 @@ services: package: lib.exec.Executor service: RQExecutor storage: pool + delay: 2 redis: host: redis GC: @@ -165,7 +166,7 @@ tasks: service: FTPListFilesTask ftp_filter_files: package: lib.plugin.iscan.tasks.ftp - service: FTPListFilesTask + service: FTPFilterFilesTask ftp_apply_tpl: package: lib.plugin.base.tasks.text service: Jinja2TemplateTask diff --git a/lib/exec/Executor.py b/lib/exec/Executor.py index 3458fee..079eaf3 100644 --- a/lib/exec/Executor.py +++ b/lib/exec/Executor.py @@ -22,7 +22,8 @@ class RQExecutor(Executor): def __run(self): redis_conn = Redis(host=self.lcnf.get('redis').get('host')) jobs = [] - + known_sources = {} + while self._running: sleep(self.lcnf.get('delay', 0.5)) try: @@ -48,7 +49,13 @@ class RQExecutor(Executor): items = self._data.get(block=False, count=count, filter=filter) # obtain everything else from source if len(items) < count: - source = Loader.by_id('storage', pipeline.get('source')) + source = None + source_id = pipeline.get('source') + if source_id in known_sources: + source = known_sources[source_id] + else: + source = Loader.by_id('storage', source_id) + known_sources[source_id] = source new_items = source.get(block=False, count=(count - len(items)), filter=filter) items.extend(new_items) source.remove(new_items) diff --git a/lib/exec/Task.py b/lib/exec/Task.py index 7137175..309b35a 100644 --- a/lib/exec/Task.py +++ b/lib/exec/Task.py @@ -8,21 +8,21 @@ class Task(Loadable): self._logger = Logger(self.__class__.__name__) def run(self, items): - result = [] - try: - result = self._run(items) - except Exception as e: - self._logger.debug("Error occured while executing: %s", e) + result = self._run(items) return result def _run(self, items): for item in items: - item['steps'][self._id] = self._process(item) + try: + item['steps'][self._id] = self._process(item) + except Exception as e: + self._logger.debug("Error occured while executing: %s", e) + item['steps'][self._id] = False return items - + def _process(self, item): return True def run(task_name, items): result = Loader.by_id('tasks', task_name).run(items) - return result \ No newline at end of file + return result diff --git a/lib/plugin/iscan/tasks/gopher.py b/lib/plugin/iscan/tasks/gopher.py index f1a575a..2a46ec9 100644 --- a/lib/plugin/iscan/tasks/gopher.py +++ b/lib/plugin/iscan/tasks/gopher.py @@ -27,7 +27,7 @@ class GopherFindTask(Task): # pylint: disable=too-few-public-methods response = self._recv(sock) sock.close() - self._logger.debug("Parsing result") + self._logger.debug("Parsing result: %s", response) item['data']['files'] = [] item['data']['filter'] = False for s in [s for s in response.split("\r\n") if s]: @@ -43,4 +43,5 @@ class GopherFindTask(Task): # pylint: disable=too-few-public-methods if not item['data']['files']: raise Exception("Empty server (not Gopher?)") - item['steps'][self._id] = True + + return True