PyNesca/MainPresenter.py

95 lines
3.2 KiB
Python
Raw Normal View History

from network_scan.CoreModel import CoreModel
from address_generation.Parser import Parser
2019-01-20 16:59:47 +00:00
import threading
import datetime
2019-01-20 16:59:47 +00:00
from PyQt5.Qt import *
from address_generation.IpGenerator import IpGenerator
2019-01-28 18:57:07 +00:00
from storage.JSONStorage import JSONStorage
2019-01-20 16:59:47 +00:00
2019-01-20 16:59:47 +00:00
class MainPresenter:
def __init__(self, ui):
self.ui = ui
self.threads = []
self.isScanEnabled = False
2019-01-28 18:57:07 +00:00
self.parser = Parser()
#needed config to specify path
self.storage = JSONStorage("results.json")
2019-01-20 16:59:47 +00:00
def startScan(self, ipRanges, portsStr, threadNumber, timeout):
if timeout == '':
timeout = '3'
addresses = self.parser.parse_address_field(ipRanges)
ports = self.parser.parse_port_field(portsStr)
2019-01-28 18:57:07 +00:00
print(ports)
self.ip_generator = IpGenerator(addresses,ports)
2019-01-20 16:59:47 +00:00
for i in range(int(threadNumber)):
self.threads.append(ScanThread(self.ip_generator, ports, timeout, self))
2019-01-20 16:59:47 +00:00
self.setCurrentThreadsLabel(len(self.threads))
for thread in self.threads:
thread.signal.connect(self.setLogText)
thread.exit_signal.connect(self.on_thread_exit)
2019-01-20 16:59:47 +00:00
thread.start()
def on_thread_exit(self):
count = 0
for thr in self.threads:
if thr.is_running:
count = count + 1
if count == len(self.threads):
self.on_end_scanning()
self.setCurrentThreadsLabel(count)
2019-01-20 16:59:47 +00:00
def on_end_scanning(self):
self.isScanEnabled = False
self.ui.startButton.setText("Start")
self.storage.save()
2019-01-20 16:59:47 +00:00
def stopScan(self):
self.isScanEnabled = False
for thread in self.threads:
thread.exit()
for thread in self.threads:
thread.wait()
self.on_end_scanning()
self.threads.clear()
self.ui.currentThreadsLabel.setText("0")
2019-01-20 16:59:47 +00:00
def setLogText(self, string):
self.ui.dataText.append("[" + str(datetime.datetime.now()) + "] " + str(string))
2019-01-20 16:59:47 +00:00
def setCurrentThreadsLabel(self, threadNumber):
self.ui.currentThreadsLabel.setText(str(threadNumber))
class ScanThread(QThread):
signal = pyqtSignal(str)
exit_signal = pyqtSignal()
2019-01-20 16:59:47 +00:00
def __init__(self, ip_generator, ports, timeout, presenter, parent=None):
2019-01-20 16:59:47 +00:00
QThread.__init__(self, parent)
self.ip_generator = ip_generator
2019-01-28 18:57:07 +00:00
self.previous_address = None
self.coreModel = CoreModel(timeout)
2019-01-20 16:59:47 +00:00
self._stop_event = threading.Event()
self.timeout = timeout
self.presenter = presenter
self.is_running = True
2019-01-20 16:59:47 +00:00
def run(self):
while True:
2019-01-28 18:57:07 +00:00
scan_address = self.ip_generator.get_next_address(self.previous_address)
if not scan_address:
break
2019-01-28 18:57:07 +00:00
self.previous_address = scan_address
scan_result = self.coreModel.scan_address(scan_address)
2019-01-28 18:57:07 +00:00
self.presenter.storage.put_responce(scan_address, scan_result)
if scan_result == 0:
self.signal.emit('%s has open port: %s' % scan_address)
2019-01-20 16:59:47 +00:00
else:
self.signal.emit('%s has closed port: %s' % scan_address)
self.is_running = False
self.exit_signal.emit()
self.exit(1)