Typo fix; gopher plugin fix

This commit is contained in:
Good Evening 2018-10-10 00:14:55 +03:00
parent 5148691174
commit 999c644869
4 changed files with 22 additions and 13 deletions

View File

@ -20,6 +20,7 @@ services:
package: lib.exec.Executor package: lib.exec.Executor
service: RQExecutor service: RQExecutor
storage: pool storage: pool
delay: 2
redis: redis:
host: redis host: redis
GC: GC:
@ -165,7 +166,7 @@ tasks:
service: FTPListFilesTask service: FTPListFilesTask
ftp_filter_files: ftp_filter_files:
package: lib.plugin.iscan.tasks.ftp package: lib.plugin.iscan.tasks.ftp
service: FTPListFilesTask service: FTPFilterFilesTask
ftp_apply_tpl: ftp_apply_tpl:
package: lib.plugin.base.tasks.text package: lib.plugin.base.tasks.text
service: Jinja2TemplateTask service: Jinja2TemplateTask

View File

@ -22,7 +22,8 @@ class RQExecutor(Executor):
def __run(self): def __run(self):
redis_conn = Redis(host=self.lcnf.get('redis').get('host')) redis_conn = Redis(host=self.lcnf.get('redis').get('host'))
jobs = [] jobs = []
known_sources = {}
while self._running: while self._running:
sleep(self.lcnf.get('delay', 0.5)) sleep(self.lcnf.get('delay', 0.5))
try: try:
@ -48,7 +49,13 @@ class RQExecutor(Executor):
items = self._data.get(block=False, count=count, filter=filter) items = self._data.get(block=False, count=count, filter=filter)
# obtain everything else from source # obtain everything else from source
if len(items) < count: if len(items) < count:
source = Loader.by_id('storage', pipeline.get('source')) source = None
source_id = pipeline.get('source')
if source_id in known_sources:
source = known_sources[source_id]
else:
source = Loader.by_id('storage', source_id)
known_sources[source_id] = source
new_items = source.get(block=False, count=(count - len(items)), filter=filter) new_items = source.get(block=False, count=(count - len(items)), filter=filter)
items.extend(new_items) items.extend(new_items)
source.remove(new_items) source.remove(new_items)

View File

@ -8,21 +8,21 @@ class Task(Loadable):
self._logger = Logger(self.__class__.__name__) self._logger = Logger(self.__class__.__name__)
def run(self, items): def run(self, items):
result = [] result = self._run(items)
try:
result = self._run(items)
except Exception as e:
self._logger.debug("Error occured while executing: %s", e)
return result return result
def _run(self, items): def _run(self, items):
for item in items: for item in items:
item['steps'][self._id] = self._process(item) try:
item['steps'][self._id] = self._process(item)
except Exception as e:
self._logger.debug("Error occured while executing: %s", e)
item['steps'][self._id] = False
return items return items
def _process(self, item): def _process(self, item):
return True return True
def run(task_name, items): def run(task_name, items):
result = Loader.by_id('tasks', task_name).run(items) result = Loader.by_id('tasks', task_name).run(items)
return result return result

View File

@ -27,7 +27,7 @@ class GopherFindTask(Task): # pylint: disable=too-few-public-methods
response = self._recv(sock) response = self._recv(sock)
sock.close() sock.close()
self._logger.debug("Parsing result") self._logger.debug("Parsing result: %s", response)
item['data']['files'] = [] item['data']['files'] = []
item['data']['filter'] = False item['data']['filter'] = False
for s in [s for s in response.split("\r\n") if s]: for s in [s for s in response.split("\r\n") if s]:
@ -43,4 +43,5 @@ class GopherFindTask(Task): # pylint: disable=too-few-public-methods
if not item['data']['files']: if not item['data']['files']:
raise Exception("Empty server (not Gopher?)") raise Exception("Empty server (not Gopher?)")
item['steps'][self._id] = True
return True