Added some abstract parent classes and folders, made tests for parser and IpGenerator, fixed some bugs in parsing.

This commit is contained in:
Zloooy 2019-01-27 23:28:58 +03:00
parent 2b412dd8f7
commit ee2de0decf
15 changed files with 279 additions and 76 deletions

View File

@ -1,17 +0,0 @@
import socket
class CoreModel:
def __init__(self, timeout):
self.defSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.defSocket.settimeout(int(timeout))
def scanIP(self, host, ports):
openPorts = []
for i in ports[0]:
result = self.defSocket.connect_ex((host, int(i)))
if result == 0:
openPorts.append(i)
self.defSocket.close()
self.defSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
return openPorts

View File

@ -1,10 +1,11 @@
import CoreModel from network_scan.CoreModel import CoreModel
import Parser from address_generation.Parser import Parser
import threading import threading
import queue import queue
import datetime import datetime
from PyQt5.Qt import * from PyQt5.Qt import *
from netaddr import IPNetwork from netaddr import IPNetwork
from address_generation.IpGenerator import IpGenerator
class MainPresenter: class MainPresenter:
@ -12,39 +13,48 @@ class MainPresenter:
self.ui = ui self.ui = ui
self.threads = [] self.threads = []
self.isScanEnabled = False self.isScanEnabled = False
self.queue = queue.Queue() self.parser=Parser()
def startScan(self, ipRanges, portsStr, threadNumber, timeout): def startScan(self, ipRanges, portsStr, threadNumber, timeout):
if timeout == '': if timeout == '':
timeout = '3' timeout = '3'
cidrIPRanges = Parser.getCIDRFromRanges(ipRanges) addresses = self.parser.parse_address_field(ipRanges)
ports = Parser.getPortsFromString(portsStr) ports = self.parser.parse_port_field(portsStr)
ips = [] self.ip_generator = IpGenerator(addresses,ports)
for cidr in cidrIPRanges[0]:
for ip in IPNetwork(cidr):
ips.append(str(ip))
for ip in ips:
self.queue.put(ip)
for i in range(int(threadNumber)): for i in range(int(threadNumber)):
self.threads.append(ScanThread(self.queue, ports, timeout, self)) self.threads.append(ScanThread(self.ip_generator, ports, timeout, self))
self.setCurrentThreadsLabel(len(self.threads)) self.setCurrentThreadsLabel(len(self.threads))
for thread in self.threads: for thread in self.threads:
thread.signal.connect(self.setLogText) thread.signal.connect(self.setLogText)
thread.exit_signal.connect(self.on_thread_exit) thread.exit_signal.connect(self.on_thread_exit)
thread.start() thread.start()
def on_thread_exit(self, thread): def on_thread_exit(self, is_last):
self.threads.pop(self.threads.index(thread[0])) if is_last:
self.setCurrentThreadsLabel(len(self.threads))
if len(self.threads) == 0:
self.isScanEnabled = False self.isScanEnabled = False
self.ui.startButton.setText("Start") self.ui.startButton.setText("Start")
return
count = 0
for thr in self.threads:
if thr.is_running:
count = count + 1
self.setCurrentThreadsLabel(count)
def stopScan(self): def stopScan(self):
self.isScanEnabled = False self.isScanEnabled = False
for thread in self.threads: for thread in self.threads:
thread.exit_signal.emit(self.threads.index(thread))
thread.exit() thread.exit()
thread.is_running = False
count = 0
is_last_thread = False
for i in self.threads:
if not i.is_running:
count += 1
if count == len(self.threads):
is_last_thread = True
thread.exit_signal.emit(is_last_thread)
self.threads.clear()
self.ui.currentThreadsLabel.setText("0")
self.queue = queue.Queue() self.queue = queue.Queue()
def setLogText(self, string): def setLogText(self, string):
@ -57,27 +67,34 @@ class MainPresenter:
class ScanThread(QThread): class ScanThread(QThread):
signal = pyqtSignal(str) signal = pyqtSignal(str)
exit_signal = pyqtSignal(list) # This signal for correct pop'ing from thread list exit_signal = pyqtSignal(bool)
def __init__(self, scanQueue, ports, timeout, presenter, parent=None): def __init__(self, ip_generator, ports, timeout, presenter, parent=None):
QThread.__init__(self, parent) QThread.__init__(self, parent)
self.scanQueue = scanQueue self.ip_generator = ip_generator
self.coreModel = CoreModel.CoreModel(timeout) self.coreModel = CoreModel(timeout)
self.ports = ports
self._stop_event = threading.Event() self._stop_event = threading.Event()
self.timeout = timeout self.timeout = timeout
self.presenter = presenter self.presenter = presenter
self.is_running = True
def run(self): def run(self):
while True: while True:
if self.scanQueue.empty(): scan_address = self.ip_generator.get_next_address(None)
self.exit_signal.emit([self]) if not scan_address:
self.exit(1) break
hostObject = self.scanQueue.get() scan_result = self.coreModel.scan_address(scan_address)
open_ports = self.coreModel.scanIP(str(hostObject), self.ports) if scan_result==0:
signalStr = ''.join(str(x) for x in open_ports) self.signal.emit( '%s has open port: %s' % scan_address)
if(signalStr == ''):
self.signal.emit(str(hostObject) + ' has no open ports!')
else: else:
self.signal.emit(str(hostObject) + ' has open ports: ' + signalStr) self.signal.emit( '%s has closed port: %s' % scan_address)
self.scanQueue.task_done() self.is_running = False
count = 0
is_last_thread = False
for i in self.presenter.threads:
if not i.isRunning():
count += 1
if count == len(self.presenter.threads):
is_last_thread = True
self.exit_signal.emit(is_last_thread)
self.exit(1)

View File

@ -1,21 +0,0 @@
import netaddr
def getCIDRFromRanges(str_ranges):
str_ranges = str_ranges.replace(' ', '')
ranges = []
ips = []
splitted_ranges = str_ranges.split(",")
for i in splitted_ranges:
ranges.append(i.split("-"))
for i in ranges:
if len(ranges[ranges.index(i)]) == 1:
ips.append(netaddr.iprange_to_cidrs(i[0], i[0]))
else:
ips.append(netaddr.iprange_to_cidrs(i[0], i[1]))
return ips
def getPortsFromString(str_ports):
str_ports = str_ports.replace(" ", "")
return [str_ports.split(",")]

0
__init__.py Normal file
View File

View File

@ -0,0 +1,9 @@
from abc import abstractmethod, ABC
class AbstractAddressGenerator(ABC):
@classmethod
@abstractmethod
def get_next_address(self, previous_address, **kwargs):
pass

View File

@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
class AbstractParser(ABC):
@classmethod
@abstractmethod
def parse_address_field(self, field):
pass
@classmethod
@abstractmethod
def parse_port_field(self, field):
pass

View File

@ -0,0 +1,18 @@
from AbstractAddressGenerator import AbstractAddressGenerator
from threading import RLock
class IpGenerator(AbstractAddressGenerator):
def __init__(self, address_generator, ports):
self.address_generator = address_generator
self.ports = ports
self.portnum = -1
self.lock = RLock()
def get_next_address(self, previous_address):
with self.lock:
self.portnum = (self.portnum + 1) % len(self.ports)
try:
return (str(next(self.address_generator)), self.ports[self.portnum])
except StopIteration:
return None

View File

@ -0,0 +1,80 @@
import ipaddress
from AbstractParser import AbstractParser
class Parser(AbstractParser):
def parse_port_field(self, ports):
"""
Parses ports from string, returns them as integers in the list.
Handles non-existent ports and non-port values.
"""
if ports:
# Using set to avoid repetitions
parsed = set()
ports = ports.split(",")
for port in ports:
try:
# Input is in range form ("100-200"):
if '-' in port:
start, end = map(int, port.split('-'))
parsed.update(
[p for p in range(start, end + 1) if 65355 >= p > 0])
# Input is a single port ("80"):
else:
parsed.add(int(port))
except ValueError as e:
# If we get any not integer just ignore it
pass
return sorted(list(parsed))
else:
# Change to default ports from constant
return [21, 22, 23, 25, 80, 443, 110, 111, 135, 139, 445, 8080, 8443, 53, 143, 989, 990, 3306, 1080, 5554, 6667, 2222, 4444, 666, 6666, 1337, 2020, 31337]
def parse_address_field(self, ips):
"""
Parses ip input string, returns the generator over them.
Supports next inputs:
1) 1.2.3.4
2) 192.168.0.0/24
3) 1.2.3.4 - 5.6.7.8
Any non-ip value will be ignored.
"""
# A set to contain non repeating ip objects from ipaddress
ip_objects = set()
inputs = [ip.strip() for ip in ips.split(',')]
for input_ in inputs:
try:
# Input is in range form ("1.2.3.4 - 5.6.7.8"):
if '-' in input_:
input_ips = input_.split('-')
ranges = set(
ipaddress.summarize_address_range(
*map(lambda x: ipaddress.IPv4Address(x.strip()), input_ips)
)
)
ip_objects.update(ranges)
# Input is in CIDR form ("192.168.0.0/24"):
elif '/' in input_:
network = ipaddress.ip_network(input_)
ip_objects.add(network)
# Input is a single ip ("1.1.1.1"):
else:
ip = ipaddress.ip_address(input_)
ip_objects.add(ip)
except ValueError as e:
print(e)
# If we get any non-ip value just ignore it
pass
for ip_obj in ip_objects:
# The object is just one ip, simply yield it:
if isinstance(ip_obj, ipaddress.IPv4Address):
yield ip_obj
# The object is a network, yield every host in it:
else:
for host in ip_obj:
yield host

View File

@ -0,0 +1,7 @@
import sys
import os
fil = __file__[:__file__.rindex(os.sep)]
print(fil)
sys.path.insert(0,fil)

View File

@ -0,0 +1,21 @@
from unittest import TestCase, main
from ipaddress import IPv4Address
from IpGenerator import IpGenerator
from Parser import Parser
class testIpGenerator(TestCase):
def setUp(self):
p = Parser()
self.ipgen = IpGenerator(p.parse_address_field("192.168.1.1"), [80])
def testIpGeneration(self):
self.assertEquals(
self.ipgen.get_next_address(None),
(IPv4Address("192.168.1.1"), 80))
print(self.ipgen.get_next_address(None))
if __name__ == "__main__":
main()

View File

@ -0,0 +1,37 @@
from unittest import main, TestCase
from Parser import Parser
import ipaddress
class TestParser(TestCase):
def setUp(self):
self.parser = Parser()
def test_port_parsing(self):
self.assertEqual(self.parser.parse_port_field("80,90"), [80, 90])
def test_ip_parsing(self):
self.assertEqual(
set(self.parser.parse_address_field("192.168.1.1 - 192.168.1.3")),
{ipaddress.IPv4Address(ip) for ip in
[
"192.168.1.1",
"192.168.1.2",
"192.168.1.3"
]})
self.assertEqual(
set(self.parser.parse_address_field("192.168.1.1")),
{ipaddress.IPv4Address("192.168.1.1")}
)
self.assertEqual(
set(self.parser.parse_address_field("192.168.1.0/31")),
{ipaddress.IPv4Address(ip) for ip in
[
'192.168.1.1',
'192.168.1.0'
]})
if __name__ == "__main__":
main()

14
main.py
View File

@ -19,15 +19,17 @@ class MyWin(QtWidgets.QMainWindow):
self.isScanActive = False self.isScanActive = False
def startButtonClicked(self): def startButtonClicked(self):
if self.isScanActive == False: if self.presenter.isScanEnabled:
self.presenter.isScanEnabled = True
self.ui.startButton.setText("Stop")
self.presenter.startScan(self.ui.ipLine.text(), self.ui.portsLine.text(), self.ui.threadsLine.text(),
self.ui.timeoutLine.text())
else:
self.presenter.isScanEnabled = False self.presenter.isScanEnabled = False
self.ui.startButton.setText("Start") self.ui.startButton.setText("Start")
self.presenter.stopScan() self.presenter.stopScan()
else:
self.presenter.isScanEnabled = True
self.ui.startButton.setText("Stop")
self.presenter.startScan(self.ui.ipLine.text(),
self.ui.portsLine.text(),
self.ui.threadsLine.text(),
self.ui.timeoutLine.text())
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
class AbstractScanner(ABC):
@classmethod
@abstractmethod
def __init__(self, **kwargs):
pass
@classmethod
@abstractmethod
def scan_address(self, address):
pass

15
network_scan/CoreModel.py Normal file
View File

@ -0,0 +1,15 @@
import socket
from AbstractScanner import AbstractScanner
class CoreModel(AbstractScanner):
def __init__(self, timeout):
self.defSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.defSocket.settimeout(int(timeout))
def scan_address(self, address):
host, port = address
result = self.defSocket.connect_ex((host, port))
self.defSocket.close()
self.defSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
return result

7
network_scan/__init__.py Normal file
View File

@ -0,0 +1,7 @@
import sys
import os
fil = __file__[:__file__.rindex(os.sep)]
print(fil)
sys.path.insert(0,fil)