diff --git a/Config.py b/Config.py
index 117f34c..3008191 100644
--- a/Config.py
+++ b/Config.py
@@ -1,33 +1,8 @@
 #!/usr/bin/python3
 # -*- coding: utf-8 -*-
-import json
 import yaml
-import logging
-from collections import namedtuple
-
-
-def namedtupledict(*a, **kw):
-    nt = namedtuple(*a, **kw)
-
-    def getitem(self, key):
-        try:
-            if type(key) == str:
-                return getattr(self, key)
-            return tuple.__getitem__(self, key)
-        except Exception:
-            raise Exception("Could not get %s" % key)
-
-    def ntiter(self):
-        for name in self._fields:
-            yield name, getattr(self, name)
-
-    nt.__iter__ = ntiter
-    nt.__getitem__ = getitem
-
-    return nt
 
 cnf = {}
-# dafuq
 with open('data/config.yaml') as config_file:
-    cnf = json.loads(json.dumps(yaml.load(config_file)), object_hook=lambda d: namedtupledict('X', d.keys())(*d.values()))
\ No newline at end of file
+    cnf = yaml.safe_load(config_file)
\ No newline at end of file
diff --git a/README.md b/README.md
index fd2e49b..1592c1c 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,145 @@
 # medved
+alpha, beta, whatever
+partly works
 
-to setup, update token and posting channels in docker/confd/templates/config.json.tmpl. you may also choose which plugins to use and tune their options.
+## roadmap
+- refactor README and lib/plugin/plugins/*
+- cleanup *
+
+probably Listener should be part of Core in order to supervise everything
+
+tasks don't store results currently; some validation of performed tasks is needed (at least in the Executor thread, before spreading new tasks):
+http://python-rq.org/docs/results/
+
+## configuration
+
+`data/config.yaml`
+```
+# lists the enabled top-level services (listeners, executors, data-manager) and pipelines
+core:
+  services: # each entry must match a key in the services section below
+    - data_manager
+    - rq_executor
+  pipelines:
+    - ftp
+
+# describes top-level services and their configurations
+services:
+  data_manager:
+    # REQUIRED package name (dotted module path)
+    package: lib.data.Manager
+    # REQUIRED class inherited from Service (lib.Service)
+    service: DataManager
+    # REQUIRED selects one of the storages
+    data:
+      id: pool
+    # there can be more service-specific configuration fields
+    # for now they can be found in code :)
+    sources:
+      - random_ip
+    feeds:
+      - test_telegram
+  rq_executor:
+    package: lib.exeq.Executor
+    service: RQExecutor
+    data:
+      id: pool
+    redis:
+      host: "127.0.0.1"
+
+# describes datasources for data_manager
+sources:
+  random_ip:
+    package: lib.plugin.base.lib.IP
+    service: RandomIP
+    data:
+      id: random_ip
+
+# describes datafeeds for data_manager
+feeds:
+  test_telegram: # doesn't work yet, eh
+    package: lib.plugin.base.lib.Telegram
+    service: TelegramFeed
+    data:
+      id: pool
+    token:
+    chats:
+      - id: good_evening
+        pipelines: [ftp, gopher]
+        filter:
+          clause: any-of
+          equal:
+            - ftp_list_files_status: success
+            - gopher_collect_status: success
+
+# describes various storages, e.g. the data pool for pipelines or queues for datasources
+storage:
+  pool:
+    # REQUIRED
+    package: lib.plugin.base.lib.Mongo
+    service: MongoStorage
+    size: 40960
+    # service-specific
+    db: "medved"
+    coll: 'pool'
+  random_ip:
+    package: lib.plugin.base.lib.Mongo
+    service: MongoStorage
+    size: 500
+    db: "medved"
+    coll: 'randomipsource'
+
+# describes available pipelines
+pipelines:
+  ftp:
+    # list of steps with dependencies
+    steps:
+      # will pass 10 items to lib.plugin.iscan.tasks.common.scan
+      - name: scan
+        package: lib.plugin.iscan.tasks.common
+        service: scan
+        multiple: 10 # default: False
+        requires: []
+      # will pass 1 item marked with ftp_scan to lib.plugin.iscan.tasks.ftp.connect
+      - name: connect
+        package: lib.plugin.iscan.tasks.ftp
+        service: connect
+        requires:
+          - ftp_scan
+      - name: list_files
+        package: lib.plugin.iscan.tasks.ftp
+        service: list_files
+        requires:
+          - ftp_connect
+
+# various configurations for tasks
+tasks:
+  ftp_scan:
+    ports:
+      - 21
+  ftp_connect:
+    logins: data/ftp/logins.txt
+    passwords: data/ftp/passwords.txt
+    bruteforce: true
+    timeout: 15
+  ftp_list_files:
+
+logging:
+  Storage: INFO
+```
+it can probably be launched with docker, however I didn't test it yet: run `make base && docker-compose up --build --scale worker=5`
+
+or simply `python medved.py`
+
+you'll need working Redis and MongoDB instances for the default configuration
+
+## top-level services
+
+### lib.data.Manager.DataManager
+Orchestrates datasources and datafeeds: starts and stops them, and checks the pool size. If it drops too low, it pulls fresh data from the datasources.
+### lib.exeq.Executor.RQExecutor
+Runs the pipelines described in the configuration. Works via [RQ (Redis Queue)](http://python-rq.org/), so it needs a Redis instance up and running.
+Basically it takes data from the pool and submits it to workers.
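+A pipeline step is a plain callable: `package` is its module and `service` its name, and the executor enqueues it by that dotted path so an RQ worker can import and call it with items taken from the pool. A minimal sketch of such a step (a hypothetical `example` pipeline with a `ping` step; the item layout and tagging follow the conventions above but are assumptions, not a fixed API):
+```python
+# hypothetical lib/plugin/example/tasks.py
+import socket
+
+def ping(items):
+    """Try TCP/21 on each pool item and record the outcome."""
+    for item in items:
+        try:
+            with socket.create_connection((item['data']['ip'], 21), timeout=5):
+                item['data']['ping_status'] = 'success'
+        except OSError:
+            item['data']['ping_status'] = 'fail'
+        # executor filters match on '<pipeline>_<step>' tags
+        item.setdefault('tags', []).append('example_ping')
+    return items
+```
+(per the roadmap above, return values aren't written back to the pool yet; RQ keeps them only for a while)
+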
+RQ workers should be launched separately (`rqworker worker` from code root) diff --git a/bin/config-linter.py b/bin/config-linter.py deleted file mode 100644 index b1c25d1..0000000 --- a/bin/config-linter.py +++ /dev/null @@ -1,46 +0,0 @@ -import json, yaml -import sys, os -import importlib - -sys.path.append(os.path.join(os.path.dirname(__file__), '..')) - -from Config import namedtupledict -from lib import Logger, Loader - -logger = Logger('Linter') -logger.info('Reading config') - -with open('data/config.yaml', 'r') as config: - data = yaml.load(config) - -cnf = json.loads(json.dumps(data), object_hook=lambda d: namedtupledict('X', d.keys())(*d.values())) - -logger.info("Config: \n%s", json.dumps(data, indent=2)) - -logger.info('CORE') -services_check = ["datamgr", "listener", "executor"] -services = [] -logger.info("Checking services: %s", ", ".join(services)) -for s, sv in cnf.core: - services.append(s) -print(services) -for s in services_check: - if not s in services: - logger.error("Service %s is not defined!", s) - else: - logger.info("Service %s is defined, continuing", s) - for k, v in cnf.core[s]: - if k == 'package': - try: - importlib.import_module(v) - except Exception as e: - logger.error("Unable to load package %s!\n%s", v, e) - else: - logger.info("Package %s was imported successfully", v) - elif k == 'service': - try: - Loader(cnf.core[s]['package']).get(v) - except Exception as e: - logger.error("Unable to load package %s!\n%s", v, e) - else: - logger.info("Service %s was imported successfully", v) diff --git a/data/config.yaml b/data/config.yaml index 0e00494..8bf4c6e 100644 --- a/data/config.yaml +++ b/data/config.yaml @@ -1,8 +1,11 @@ -Core: +--- +dsl_version: 1 + +core: services: - - data_manager -# - zmq_listener -# - rq_executor + - data_manager +# - zmq_listener + - rq_executor pipelines: - ftp @@ -11,13 +14,12 @@ services: data_manager: package: lib.data.Manager service: DataManager - oneshot: 500 data: id: pool sources: - - random_ip + - random_ip feeds: - - test_telegram + - test_telegram zmq_listener: package: lib.net.Listener service: ZMQListener @@ -40,8 +42,6 @@ sources: service: RandomIP data: id: random_ip - oneshot: 100 - delay: 0.5 feeds: @@ -57,9 +57,8 @@ feeds: filter: clause: any-of equal: - - ftp_list_files_status: success - - gopher_collect_status: success - delay: 10 + - ftp_list_files_status: success + - gopher_collect_status: success storage: @@ -67,14 +66,12 @@ storage: package: lib.plugin.base.lib.Mongo service: MongoStorage size: 40960 - url: "127.0.0.1" db: "medved" coll: 'pool' random_ip: package: lib.plugin.base.lib.Mongo service: MongoStorage size: 500 - url: "127.0.0.1" db: "medved" coll: 'randomipsource' @@ -82,32 +79,34 @@ storage: pipelines: ftp: steps: - - name: scan - package: lib.plugin.iscan.tasks.common - service: scan - multiple: 200 - requires: [] - - name: connect - package: lib.plugin.iscan.tasks.ftp - service: connect - multiple: False - requires: [ - ftp_scan - ] - - name: list_files - package: lib.plugin.iscan.tasks.ftp - service: list_files - multiple: False - requires: [ - ftp_connect - ] + - name: scan + package: lib.plugin.iscan.tasks.common + service: scan + multiple: 10 + requires: [] + - name: connect + package: lib.plugin.iscan.tasks.ftp + service: connect + multiple: False + requires: + - ftp_scan + - name: list_files + package: lib.plugin.iscan.tasks.ftp + service: list_files + multiple: False + requires: + - ftp_connect tasks: ftp_scan: - ports: [21] + ports: + - 21 ftp_connect: logins: data/ftp/logins.txt 
     passwords: data/ftp/passwords.txt
     bruteforce: true
     timeout: 15
   ftp_list_files:
+
+logging:
+  Storage: INFO
\ No newline at end of file
diff --git a/lib/Service.py b/lib/Service.py
index 542895b..52db01f 100644
--- a/lib/Service.py
+++ b/lib/Service.py
@@ -9,7 +9,7 @@ class Service(Loadable):
     def __init__(self, thread, id, root=cnf):
         super().__init__(id, root)
 
-        self._data = Loader.by_id('storage', self.lcnf.data.id)
+        self._data = Loader.by_id('storage', self.lcnf.get("data").get("id"))
         self._stop_timeout = 10
         self._running = False
diff --git a/lib/data/Manager.py b/lib/data/Manager.py
index 7fcbff3..b068d85 100644
--- a/lib/data/Manager.py
+++ b/lib/data/Manager.py
@@ -9,10 +9,10 @@ class DataManager(Service):
         self._logger.add_field('service', 'DataManager')
 
         self.sources = {}
-        for s in self.lcnf.sources:
+        for s in self.lcnf.get("sources"):
             self.attach_source(s)
         self.feeds = {}
-        for f in self.lcnf.feeds:
+        for f in self.lcnf.get("feeds"):
             self.attach_feed(f)
 
     def _pre_start(self):
@@ -46,12 +46,13 @@ class DataManager(Service):
         return self.feeds.get(name)
 
     def __run(self):
+        oneshot = self.lcnf.get("oneshot", 500)
         while self._running:
-            if self._data.count() < self.lcnf.oneshot:
-                while self._running and (self._data.count() + self.lcnf.oneshot < self._data.size()):
+            if self._data.count() < oneshot:
+                while self._running and (self._data.count() + oneshot < self._data.size()):
                     self._logger.debug("fill %s OF %s", self._data.count(), self._data.size())
                     for _,source in self.sources.items():
-                        items = source.next(count=self.lcnf.oneshot)
+                        items = source.next(count=oneshot)
                         if items:
                             self._data.put(items)
             sleep(1)
diff --git a/lib/data/Storage.py b/lib/data/Storage.py
index 8c565a9..f28475c 100644
--- a/lib/data/Storage.py
+++ b/lib/data/Storage.py
@@ -7,7 +7,7 @@ class Storage(Loadable):
     def __init__(self, id, root):
         super().__init__(id, root)
 
-        self._size = self.lcnf.size
+        self._size = self.lcnf.get("size")
 
         self._logger = Logger("Storage")
         self._logger.add_field('vname', self.__class__.__name__)
diff --git a/lib/exeq/Executor.py b/lib/exeq/Executor.py
index 4c88896..cf42d07 100644
--- a/lib/exeq/Executor.py
+++ b/lib/exeq/Executor.py
@@ -2,6 +2,8 @@ from lib import Service, Loader, Loadable
 
 from lib.tasks.worker import worker
 
+from time import sleep
+
 from rq import Queue
 from redis import Redis
 
@@ -22,24 +24,31 @@ class RQExecutor(Executor):
     def __run(self):
         while self._running:
             try:
-                #TODO
-                for pn, pipeline in self.cnf.pipelines:
+                redis_conn = Redis(host=self.lcnf.get('redis').get('host'))
+                q = Queue('worker', connection=redis_conn)
+                if q.count + 1 > self.lcnf.get('size', 100):
+                    sleep(self.lcnf.get('delay', 2))
+                    continue
+                for pn, pipeline in self.cnf.get("pipelines").items():
                     self._logger.debug("pipeline: %s", pn)
-                    for step in pipeline.steps:
-                        self._logger.debug("step: %s", step.name)
-                        redis_conn = Redis(host=self.lcnf.redis.host)
-                        q = Queue('worker', connection=redis_conn)
+                    for step in pipeline['steps']:
+                        self._logger.debug("step: %s", step['name'])
                         filter = {
                             "not_exist": [
-                                pn + '_' + step.name
-                            ]
+                                pn + '_' + step['name']
+                            ],
+                            "exist": [
+                                tag for tag in step.get("requires", [])
+                            ]
                         }
                         items = []
-                        if step.multiple != False:
-                            items = self._data.get(count=step.multiple, filter=filter)
+                        multiple = step.get('multiple', False)
+                        if multiple != False:
+                            items = self._data.get(block=False, count=multiple, filter=filter)
                         else:
-                            items = self._data.get(filter=filter)
-                        for i in items:
-                            q.enqueue(Loader(step.package).get(step.service), i)
+                            items = self._data.get(block=False, filter=filter)
+                        if items:
+                            self._logger.debug("enqueueing %s.%s with %s", step['package'], step['service'], items)
+                            q.enqueue("%s.%s" % (step['package'], step['service']), items)
             except Exception as e:
                 self._logger.error(e)
diff --git a/lib/net/Listener.py b/lib/net/Listener.py
index 3ce80f4..d9075bb 100644
--- a/lib/net/Listener.py
+++ b/lib/net/Listener.py
@@ -23,7 +23,7 @@ class ZMQListener(Listener):
         self._running = True
         self._z_ctx = zmq.Context()
         self._z_sck = self._z_ctx.socket(zmq.REP)
-        self._z_sck.bind("tcp://%s:%s" % (self.lcnf.listen, self.lcnf.port))
+        self._z_sck.bind("tcp://%s:%s" % (self.lcnf.get('listen', '127.0.0.1'), self.lcnf.get('port', 12321)))
 
     def _post_stop(self):
         self._z_ctx.destroy()
diff --git a/lib/plugin/base/lib/IP.py b/lib/plugin/base/lib/IP.py
index 9c6ffcd..b49a14c 100644
--- a/lib/plugin/base/lib/IP.py
+++ b/lib/plugin/base/lib/IP.py
@@ -31,7 +31,7 @@ class IPRange(IPSource):
 
     def load_ip_range(self):
         ip_range = []
-        with open(self.lcnf.path, "r") as text:
+        with open(self.lcnf.get('path'), "r") as text:
             for line in text:
                 try:
                     diap = line.split('-')
@@ -42,13 +42,14 @@ class IPRange(IPSource):
                 except Exception as e:
                     raise Exception("Error while adding range {}: {}".format(line, e))
         self._iprange = ip_range
+
     def __run(self):
         npos = 0
         apos = 0
         while self._running:
             try:
-                for _ in itertools.repeat(None, self.lcnf.oneshot):
-                    if self.lcnf.ordered:
+                for _ in itertools.repeat(None, self.lcnf.get('oneshot', 100)):
+                    if self.lcnf.get('ordered', True):
                         # put currently selected element
                         self._data.put(str(self._iprange[npos][apos]))
                         # rotate next element through networks and addresses
@@ -59,13 +60,13 @@ class IPRange(IPSource):
                             if npos + 1 < len(self._iprange):
                                 npos += 1
                             else:
-                                if self.lcnf.repeat:
+                                if self.lcnf.get('repeat', True):
                                     npos = 0
                                 else:
                                     self.stop()
                     else:
                         self._data.put(str(random.choice(random.choice(self._iprange))))
-                sleep(self.lcnf.delay)
+                sleep(self.lcnf.get('delay', 0.5))
             except Exception as e:
                 self._logger.warn(e)
@@ -78,10 +79,10 @@ class RandomIP(IPSource):
         while self._running:
             try:
                 items = []
-                for _ in itertools.repeat(None, self.lcnf.oneshot):
+                for _ in itertools.repeat(None, self.lcnf.get("oneshot", 100)):
                     randomip = socket.inet_ntoa(struct.pack('>I', random.randint(1, 0xffffffff)))
                     items.append(self.item(str(randomip)))
                 self._data.put(items)
-                sleep(self.lcnf.delay)
+                sleep(self.lcnf.get("delay", 0.5))
             except Exception as e:
                 self._logger.warn(e)
diff --git a/lib/plugin/base/lib/Mongo.py b/lib/plugin/base/lib/Mongo.py
index 3660383..0cac822 100644
--- a/lib/plugin/base/lib/Mongo.py
+++ b/lib/plugin/base/lib/Mongo.py
@@ -6,19 +6,35 @@ class MongoStorage(Storage):
     """Mongo storage. Currently the only working correctly."""
    def __init__(self, id, root):
         super().__init__(id, root)
-        self._client = MongoClient(self.lcnf.url)
-        self._db = self._client.get_database(self.lcnf.db)
-        self._coll = self._db.get_collection(self.lcnf.coll)
-        self._logger.add_field("db", self.lcnf.db, 7)
-        self._logger.add_field("coll", self.lcnf.coll, 7)
+        self._client = MongoClient(self.lcnf.get("url", "127.0.0.1"))
+        self._db = self._client.get_database(self.lcnf.get("db"))
+        self._coll = self._db.get_collection(self.lcnf.get("coll"))
+        self._logger.add_field("db", self.lcnf.get("db"), 7)
+        self._logger.add_field("coll", self.lcnf.get("coll"), 7)
         self._logger.debug("Connecting")
 
     def count(self):
         return self._coll.count()
 
     def _get(self, block, filter):
+        # TODO cleanup dat BS
         if filter is None:
             filter = {}
+        ne_tags = {}
+        e_tags = {}
+        if filter.get('not_exist'):
+            tags = []
+            for ne in filter.get('not_exist'):
+                tags.append(ne)
+            ne_tags = {'tags': {'$not': {'$all': tags}}}
+            del filter['not_exist']
+        if filter.get('exist'):
+            tags = []
+            for e in filter.get('exist'):
+                tags.append(e)
+            e_tags = {'tags': {'$all': tags}}
+            del filter['exist']
+        filter = {'$and': [ne_tags, e_tags]}
         item = self._coll.find_one_and_delete(filter=filter)
         if block:
             while not item:
@@ -29,14 +45,14 @@
 
     def _put(self, item, block):
         if block:
-            while self.count() + 1 >= self.size():
+            while self.count() + 1 > self.size():
                 self._logger.debug('Collection full: %s of %s', self.count(), self.size())
                 sleep(1)
         self._coll.insert_one(item)
 
     def _put_many(self, items, block):
         if block:
-            while self.count() + len(items) >= self.size():
+            while self.count() + len(items) > self.size():
                 self._logger.debug('Collection full: %s of %s', self.count(), self.size())
                 sleep(1)
         self._coll.insert_many(items)
diff --git a/lib/plugin/base/lib/Remote.py b/lib/plugin/base/lib/Remote.py
index 1df0e2e..56b2b3e 100644
--- a/lib/plugin/base/lib/Remote.py
+++ b/lib/plugin/base/lib/Remote.py
@@ -10,14 +10,14 @@ class RemoteFeed(Feed):
     def __run(self):
         while self._running:
             try:
-                remote = Remote(self.lcnf.ip, self.lcnf.port)
+                remote = Remote(self.lcnf.get('ip'), self.lcnf.get('port'))
                 remote.connect()
 
                 items = self.get(5)
                 if items:
                     self._logger.info("Sending %s items" % len(items))
                     remote.run('data.put', {'items': items})
-                sleep(self.lcnf.delay)
+                sleep(self.lcnf.get('delay', 10))
             except Exception as e:
                 self._logger.warn(e)
@@ -27,11 +27,11 @@ class RemoteSource(Source):
         super().__init__(self.__run, id, root)
 
     def __run(self):
-        remote = Remote(self.lcnf.ip, self.lcnf.port)
+        remote = Remote(self.lcnf.get('ip'), self.lcnf.get('port'))
         remote.connect()
         while self._running:
-            self._logger.debug("Requesting from %s", self.lcnf.ip)
-            rep = remote.run('data.get', {'count': self.lcnf.oneshot})
+            self._logger.debug("Requesting from %s", self.lcnf.get('ip'))
+            rep = remote.run('data.get', {'count': self.lcnf.get('oneshot', 100)})
             if rep.get('status'):
                 targets = rep.get("data")
             else:
@@ -39,4 +39,4 @@
             self._logger.debug("Got %s items", len(targets))
             for t in targets:
                 self._data.put(t)
-            sleep(self.lcnf.delay)
\ No newline at end of file
+            sleep(self.lcnf.get('delay', 10))
\ No newline at end of file
diff --git a/lib/plugin/base/lib/Telegram.py b/lib/plugin/base/lib/Telegram.py
index 35b8338..ffd7e25 100644
--- a/lib/plugin/base/lib/Telegram.py
+++ b/lib/plugin/base/lib/Telegram.py
@@ -12,11 +12,12 @@ class TelegramFeed(Feed):
         super().__init__(self.__run, id, root)
 
     def __run(self):
+        delay = self.lcnf.get("delay", 2)
         while self._running:
             try:
-                for chat in self.lcnf.chats:
-                    chat_id = chat.id
-                    sleep(self.lcnf.delay)
+                for chat in self.lcnf.get("chats"):
+                    chat_id = chat['id']
+                    sleep(delay)
                     continue
                     # plugins -> pipelines
                     # it is in progress
@@ -24,7 +25,7 @@
                     msg = Manager.get_plugin(plugin).Plugin.TelegramMessage(host)
                     msg.run()
                     if msg.data['txt']:
-                        tbot = telebot.TeleBot(self.lcnf.token, threaded=False)
+                        tbot = telebot.TeleBot(self.lcnf.get('token'), threaded=False)
                         if msg.data['img']:
                             self._logger.debug("Send IP with img %s:%s to %s" % (host['ip'], host['port'], chat_id))
                             tbot.send_photo("@" + chat_id, msg.data['img'], caption=msg.data['txt'])
@@ -33,6 +34,6 @@
                             tbot.send_message("@" + chat_id, msg.data['txt'])
                         else:
                             self._logger.error('Empty text!')
-                    sleep(self.lcnf.delay)
+                    sleep(delay)
             except Exception as e:
                 self._logger.warn(e)
diff --git a/lib/plugin/iscan/tasks/common.py b/lib/plugin/iscan/tasks/common.py
index 1a42930..07cd998 100644
--- a/lib/plugin/iscan/tasks/common.py
+++ b/lib/plugin/iscan/tasks/common.py
@@ -30,11 +30,13 @@ class MasScan:
         result = parser.loads(out)
         return result
 
-def scan(items, taskid):
-    gi = GeoIP.open(cnf.geoip_dat, GeoIP.GEOIP_INDEX_CACHE | GeoIP.GEOIP_CHECK_CACHE)
+def scan(items):
+    gi = GeoIP.open(cnf.get("geoip_dat", "/usr/share/GeoIP/GeoIP.dat"), GeoIP.GEOIP_INDEX_CACHE | GeoIP.GEOIP_CHECK_CACHE)
     logger.debug("Starting scan")
     ms = MasScan()
-    hosts = ms.scan(ip_list=[i['data']['ip'] for i in items], port_list=cnf.Tasks[taskid].ports)
+    hosts = ms.scan(ip_list=[i['data']['ip'] for i in items],
+                    port_list=cnf.get("tasks").get('ftp_scan').get("ports"))
+    logger.debug(hosts)
     for h in hosts:
         for port in h['ports']:
             host = {
diff --git a/lib/plugin/plugins/README b/lib/plugin/plugins/README
index 697c45e..305d454 100644
--- a/lib/plugin/plugins/README
+++ b/lib/plugin/plugins/README
@@ -1,4 +1 @@
-plugins formed in old plugin format
-
-however, currently there is no other format
-
+plugins formed in old plugin format
\ No newline at end of file
diff --git a/lib/tasks/worker.py b/lib/tasks/worker.py
index 1f18157..9ccf515 100644
--- a/lib/tasks/worker.py
+++ b/lib/tasks/worker.py
@@ -1,5 +1,6 @@
 from lib.plugin import Manager
 
 # legacy
+# why legacy?
 def worker(host, plugin):
     p = Manager.get_plugin(plugin)
     p.Plugin.Pipeline(host, plugin)
diff --git a/lib/util.py b/lib/util.py
index d1c6fb1..4d35e8a 100644
--- a/lib/util.py
+++ b/lib/util.py
@@ -15,7 +15,7 @@ class Logger(logging.Logger):
         self._update()
 
     def get_level(self, name):
-        return logging.DEBUG
+        return logging.getLevelName(config.get("logging", {}).get(name, "DEBUG"))
 
     def add_field(self, name, default, size=3):
         if not name in self._lf_extra:
@@ -84,27 +84,17 @@ class Loader:
     def __init__(self, path):
         self._path = path
         self._name = path.split('.')[-1]
-        self._dir = ".".join(path.split('.')[:-1]) # shiiiet
+        # TODO remove # self._dir = ".".join(path.split('.')[:-1])
         self._logger = Logger('Loader')
         self._logger.add_field('path', self._path)
 
     def get(self, name):
-        sys.path.append( os.path.join( os.path.dirname( __file__ ), '..' ))
+        sys.path.append(os.path.join( os.path.dirname( __file__ ), '..' ))
         self._logger.debug('load %s', name)
         result = importlib.import_module(self._path)
         return getattr(result, name)
 
     @classmethod
     def by_id(cls, section, id):
-        l = cls(config[section][id].package)
-        return l.get(config[section][id].service)(id=id, root=config[section])
-
-"""
-section:
-    package: lib.package
-    service: Service
-    id:
-        qwer: asdf
-        tyui: ghjk
-"""
-
+        l = cls(config[section][id].get('package'))
+        return l.get(config[section][id].get('service'))(id=id, root=config[section])
diff --git a/medved.py b/medved.py
index 0385ec1..0894c27 100755
--- a/medved.py
+++ b/medved.py
@@ -9,13 +9,14 @@ from lib import Logger, Loader
 
 class Core:
     """Core class, contains core services (like listeners, executors, datapool)"""
     def __init__(self):
-        self.cnf = cnf.Core
+        self.cnf = cnf.get("core")
         self.logger = Logger("Core")
         self.logger.debug("Loading services")
         self._services = []
-        for service in self.cnf.services:
-            service = Loader.by_id('services', service)
+
+        for service_name in self.cnf.get("services"):
+            service = Loader.by_id('services', service_name)
         self._services.append(service)
 
     def start(self):