mirror of
https://github.com/ChronosX88/PyNesca.git
synced 2024-11-21 20:52:18 +00:00
Fixed trouble with IpGenerator
This commit is contained in:
parent
acbfb6607b
commit
aa3c795f9a
@ -1,12 +1,10 @@
|
||||
from network_scan.CoreModel import CoreModel
|
||||
from address_generation.Parser import Parser
|
||||
import threading
|
||||
import queue
|
||||
import datetime
|
||||
from PyQt5.Qt import *
|
||||
from netaddr import IPNetwork
|
||||
from address_generation.IpGenerator import IpGenerator
|
||||
|
||||
from storage.JSONStorage import JSONStorage
|
||||
|
||||
class MainPresenter:
|
||||
def __init__(self, ui):
|
||||
@ -14,12 +12,15 @@ class MainPresenter:
|
||||
self.threads = []
|
||||
self.isScanEnabled = False
|
||||
self.parser = Parser()
|
||||
#needed config to specify path
|
||||
self.storage = JSONStorage("results.json")
|
||||
|
||||
def startScan(self, ipRanges, portsStr, threadNumber, timeout):
|
||||
if timeout == '':
|
||||
timeout = '3'
|
||||
addresses = self.parser.parse_address_field(ipRanges)
|
||||
ports = self.parser.parse_port_field(portsStr)
|
||||
print(ports)
|
||||
self.ip_generator = IpGenerator(addresses,ports)
|
||||
for i in range(int(threadNumber)):
|
||||
self.threads.append(ScanThread(self.ip_generator, ports, timeout, self))
|
||||
@ -33,6 +34,7 @@ class MainPresenter:
|
||||
if is_last:
|
||||
self.isScanEnabled = False
|
||||
self.ui.startButton.setText("Start")
|
||||
self.storage.save()
|
||||
return
|
||||
count = 0
|
||||
for thr in self.threads:
|
||||
@ -55,8 +57,6 @@ class MainPresenter:
|
||||
thread.exit_signal.emit(is_last_thread)
|
||||
self.threads.clear()
|
||||
self.ui.currentThreadsLabel.setText("0")
|
||||
self.queue = queue.Queue()
|
||||
|
||||
def setLogText(self, string):
|
||||
self.ui.dataText.append("[" + str(datetime.datetime.now()) + "] " + str(string))
|
||||
|
||||
@ -72,6 +72,7 @@ class ScanThread(QThread):
|
||||
def __init__(self, ip_generator, ports, timeout, presenter, parent=None):
|
||||
QThread.__init__(self, parent)
|
||||
self.ip_generator = ip_generator
|
||||
self.previous_address = None
|
||||
self.coreModel = CoreModel(timeout)
|
||||
self._stop_event = threading.Event()
|
||||
self.timeout = timeout
|
||||
@ -79,16 +80,20 @@ class ScanThread(QThread):
|
||||
self.is_running = True
|
||||
|
||||
def run(self):
|
||||
print("Thread sterted")
|
||||
while True:
|
||||
scan_address = self.ip_generator.get_next_address(None)
|
||||
scan_address = self.ip_generator.get_next_address(self.previous_address)
|
||||
if not scan_address:
|
||||
break
|
||||
self.previous_address = scan_address
|
||||
scan_result = self.coreModel.scan_address(scan_address)
|
||||
self.presenter.storage.put_responce(scan_address, scan_result)
|
||||
if scan_result==0:
|
||||
self.signal.emit( '%s has open port: %s' % scan_address)
|
||||
else:
|
||||
self.signal.emit( '%s has closed port: %s' % scan_address)
|
||||
self.is_running = False
|
||||
print("Thread stopped")
|
||||
count = 0
|
||||
is_last_thread = False
|
||||
for i in self.presenter.threads:
|
||||
|
@ -1,18 +1,28 @@
|
||||
from AbstractAddressGenerator import AbstractAddressGenerator
|
||||
from threading import RLock
|
||||
|
||||
|
||||
class IpGenerator(AbstractAddressGenerator):
|
||||
|
||||
def __init__(self, address_generator, ports):
|
||||
self.address_generator = address_generator
|
||||
def __init__(self, ip_generator, ports):
|
||||
self.ip_generator = ip_generator
|
||||
self.ports = ports
|
||||
self.portnum = -1
|
||||
self.lock = RLock()
|
||||
|
||||
def get_next_port_number(self, previous_port):
|
||||
return (self.ports.index(previous_port) + 1) % len(self.ports)
|
||||
|
||||
def get_next_address(self, previous_address):
|
||||
with self.lock:
|
||||
self.portnum = (self.portnum + 1) % len(self.ports)
|
||||
portnum = 0
|
||||
next_ip = None
|
||||
if previous_address:
|
||||
next_ip, port = previous_address
|
||||
portnum = self.get_next_port_number(port)
|
||||
if (portnum == 0):
|
||||
try:
|
||||
return (str(next(self.address_generator)), self.ports[self.portnum])
|
||||
next_ip = str(next(self.ip_generator))
|
||||
except StopIteration:
|
||||
return None
|
||||
result = (next_ip, self.ports[portnum])
|
||||
return result
|
||||
|
@ -3,5 +3,4 @@ import os
|
||||
|
||||
|
||||
fil = __file__[:__file__.rindex(os.sep)]
|
||||
print(fil)
|
||||
sys.path.insert(0,fil)
|
||||
|
@ -8,13 +8,19 @@ class testIpGenerator(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
p = Parser()
|
||||
self.ipgen = IpGenerator(p.parse_address_field("192.168.1.1"), [80])
|
||||
self.ipgen = IpGenerator(
|
||||
p.parse_address_field("192.168.1.1 - 192.168.1.10"), [80, 90])
|
||||
|
||||
def testIpGeneration(self):
|
||||
self.assertEquals(
|
||||
'''self.assertEqual(
|
||||
self.ipgen.get_next_address(None),
|
||||
(IPv4Address("192.168.1.1"), 80))
|
||||
print(self.ipgen.get_next_address(None))
|
||||
("192.168.1.1", 80))'''
|
||||
previous_address = None
|
||||
a = True
|
||||
while previous_address or a:
|
||||
previous_address=self.ipgen.get_next_address(previous_address)
|
||||
a = False
|
||||
self.assertEqual(self.ipgen.get_next_address(None), None)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
Reference in New Issue
Block a user