mirror of
https://github.com/ChronosX88/PyNesca.git
synced 2024-11-21 20:52:18 +00:00
Merged with Zloooy repository
This commit is contained in:
commit
584be50437
17
CoreModel.py
17
CoreModel.py
@ -1,17 +0,0 @@
|
||||
import socket
|
||||
|
||||
|
||||
class CoreModel:
|
||||
def __init__(self, timeout):
|
||||
self.defSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.defSocket.settimeout(int(timeout))
|
||||
|
||||
def scanIP(self, host, ports):
|
||||
openPorts = []
|
||||
for i in ports[0]:
|
||||
result = self.defSocket.connect_ex((host, int(i)))
|
||||
if result == 0:
|
||||
openPorts.append(i)
|
||||
self.defSocket.close()
|
||||
self.defSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
return openPorts
|
162
MainPresenter.py
162
MainPresenter.py
@ -1,10 +1,11 @@
|
||||
import CoreModel
|
||||
import Parser
|
||||
import threading
|
||||
import queue
|
||||
from network_scan.CoreModel import CoreModel
|
||||
from address_generation.Parser import Parser
|
||||
from threading import RLock
|
||||
import datetime
|
||||
from PyQt5.Qt import *
|
||||
from netaddr import IPNetwork
|
||||
from PyQt5.Qt import QThread, pyqtSignal
|
||||
from PyQt5.QtCore import QObject, pyqtSlot
|
||||
from address_generation.IpGenerator import IpGenerator
|
||||
from storage.JSONStorage import JSONStorage
|
||||
|
||||
|
||||
class MainPresenter:
|
||||
@ -12,93 +13,94 @@ class MainPresenter:
|
||||
self.ui = ui
|
||||
self.threads = []
|
||||
self.isScanEnabled = False
|
||||
self.queue = queue.Queue()
|
||||
self.parser = Parser()
|
||||
#needed config to specify path
|
||||
self.storage = JSONStorage("results.json")
|
||||
self.exit_lock = RLock()
|
||||
|
||||
def startScan(self, ipRanges, portsStr, threadNumber, timeout):
|
||||
if timeout == '':
|
||||
timeout = '3'
|
||||
cidrIPRanges = Parser.getCIDRFromRanges(ipRanges)
|
||||
ports = Parser.getPortsFromString(portsStr)
|
||||
ips = []
|
||||
for cidr in cidrIPRanges[0]:
|
||||
for ip in IPNetwork(cidr):
|
||||
ips.append(str(ip))
|
||||
for ip in ips:
|
||||
self.queue.put(ip)
|
||||
for i in range(int(threadNumber)):
|
||||
self.threads.append(ScanThread(self.queue, ports, timeout, self))
|
||||
self.setCurrentThreadsLabel(len(self.threads))
|
||||
timeout = 3 if not timeout else int(timeout)
|
||||
addresses = self.parser.parse_address_field(ipRanges)
|
||||
ports = self.parser.parse_port_field(portsStr)
|
||||
self.ip_generator = IpGenerator(addresses, ports)
|
||||
self.scanner = CoreModel(timeout)
|
||||
threadNumber = int(threadNumber)
|
||||
for i in range(threadNumber):
|
||||
scan_worker = ScanWorker(
|
||||
self.ip_generator,
|
||||
self.scanner,
|
||||
self.storage
|
||||
)
|
||||
scan_thread = QThread()
|
||||
scan_worker.log_signal.connect(self.log_text)
|
||||
scan_worker.moveToThread(scan_thread)
|
||||
scan_worker.exit_signal.connect(scan_thread.exit)
|
||||
scan_worker.exit_signal.connect(self.on_worker_exit)
|
||||
scan_thread.started.connect(scan_worker.work)
|
||||
self.threads.append((scan_worker, scan_thread))
|
||||
self.changeThreadLabel(threadNumber)
|
||||
for thread in self.threads:
|
||||
thread.signal.connect(self.setLogText)
|
||||
thread.exit_signal.connect(self.on_thread_exit)
|
||||
thread.start()
|
||||
scan_worker, scan_thread = thread
|
||||
scan_thread.start()
|
||||
|
||||
def on_thread_exit(self, is_last):
|
||||
if is_last:
|
||||
def changeThreadLabel(self, number_of_threads):
|
||||
self.number_of_threads = number_of_threads
|
||||
self.ui.currentThreadsLabel.setText(str(number_of_threads))
|
||||
|
||||
def on_worker_exit(self):
|
||||
self.changeThreadLabel(self.number_of_threads - 1)
|
||||
with self.exit_lock:
|
||||
for num, thread in enumerate(self.threads):
|
||||
scan_worker, scan_thread = thread
|
||||
if not scan_worker.isRunning:
|
||||
self.threads.pop(num)
|
||||
break
|
||||
if self.number_of_threads == 0:
|
||||
self.on_end_scanning()
|
||||
|
||||
def on_end_scanning(self):
|
||||
self.isScanEnabled = False
|
||||
self.ui.startButton.setText("Start")
|
||||
return
|
||||
count = 0
|
||||
for thr in self.threads:
|
||||
if thr.is_running:
|
||||
count = count + 1
|
||||
self.setCurrentThreadsLabel(count)
|
||||
self.storage.save()
|
||||
|
||||
def stopScan(self):
|
||||
self.isScanEnabled = False
|
||||
for thread in self.threads:
|
||||
thread.exit()
|
||||
thread.is_running = False
|
||||
count = 0
|
||||
is_last_thread = False
|
||||
for i in self.threads:
|
||||
if not i.is_running:
|
||||
count += 1
|
||||
if count == len(self.threads):
|
||||
is_last_thread = True
|
||||
thread.exit_signal.emit(is_last_thread)
|
||||
self.threads.clear()
|
||||
self.ui.currentThreadsLabel.setText("0")
|
||||
self.queue = queue.Queue()
|
||||
while self.threads:
|
||||
scan_worker, scan_thread = self.threads[0]
|
||||
if scan_worker.isRunning:
|
||||
scan_worker.stop()
|
||||
|
||||
def setLogText(self, string):
|
||||
def log_text(self, string):
|
||||
self.ui.dataText.append("[" + str(datetime.datetime.now()) + "] " + str(string))
|
||||
|
||||
def setCurrentThreadsLabel(self, threadNumber):
|
||||
self.ui.currentThreadsLabel.setText(str(threadNumber))
|
||||
|
||||
class ScanWorker(QObject):
|
||||
|
||||
class ScanThread(QThread):
|
||||
log_signal = pyqtSignal(str)
|
||||
exit_signal = pyqtSignal()
|
||||
|
||||
signal = pyqtSignal(str)
|
||||
exit_signal = pyqtSignal(bool)
|
||||
def __init__(self, ip_generator, scanner, storage, **kwargs):
|
||||
super().__init__()
|
||||
self.ip_generator = ip_generator
|
||||
self.storage = storage
|
||||
self.scanner = scanner
|
||||
self.previous_address = None
|
||||
self.isRunning = True
|
||||
|
||||
def __init__(self, scanQueue, ports, timeout, presenter, parent=None):
|
||||
QThread.__init__(self, parent)
|
||||
self.scanQueue = scanQueue
|
||||
self.coreModel = CoreModel.CoreModel(timeout)
|
||||
self.ports = ports
|
||||
self._stop_event = threading.Event()
|
||||
self.timeout = timeout
|
||||
self.presenter = presenter
|
||||
self.is_running = True
|
||||
@pyqtSlot()
|
||||
def work(self):
|
||||
while self.isRunning:
|
||||
scan_address = self.ip_generator.get_next_address(self.previous_address)
|
||||
if not scan_address:
|
||||
break
|
||||
self.previous_address = scan_address
|
||||
scan_result = self.scanner.scan_address(scan_address)
|
||||
self.storage.put_responce(scan_address, scan_result)
|
||||
if scan_result == 0:
|
||||
self.log_signal.emit('%s has open port: %s' % scan_address)
|
||||
else:
|
||||
self.log_signal.emit('%s has closed port: %s' % scan_address)
|
||||
self.stop()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
if self.scanQueue.empty():
|
||||
self.is_running = False
|
||||
count = 0
|
||||
is_last_thread = False
|
||||
for i in self.presenter.threads:
|
||||
if not i.isRunning():
|
||||
count += 1
|
||||
if count == len(self.presenter.threads):
|
||||
is_last_thread = True
|
||||
self.exit_signal.emit(is_last_thread)
|
||||
self.exit(1)
|
||||
hostObject = self.scanQueue.get()
|
||||
open_ports = self.coreModel.scanIP(str(hostObject), self.ports)
|
||||
signalStr = ', '.join(open_ports)
|
||||
if signalStr != '':
|
||||
self.signal.emit(str(hostObject) + ' has open ports: ' + signalStr)
|
||||
self.scanQueue.task_done()
|
||||
def stop(self):
|
||||
self.isRunning = False
|
||||
self.exit_signal.emit()
|
||||
|
76
Parser.py
76
Parser.py
@ -1,76 +0,0 @@
|
||||
import ipaddress
|
||||
|
||||
|
||||
def getPortsFromString(ports):
|
||||
"""
|
||||
Parses ports from string, returns them as integers in the list.
|
||||
Handles non-existent ports and non-port values.
|
||||
"""
|
||||
if ports:
|
||||
# Using set to avoid repetitions
|
||||
parsed = set()
|
||||
ports = ports.split(",")
|
||||
for port in ports:
|
||||
try:
|
||||
# Input is in range form ("100-200"):
|
||||
if '-' in port:
|
||||
start, end = map(int, port.split('-'))
|
||||
parsed.update(
|
||||
[p for p in range(start, end + 1) if 65355 >= p > 0])
|
||||
# Input is a single port ("80"):
|
||||
else:
|
||||
parsed.add(int(port))
|
||||
except ValueError as e:
|
||||
# If we get any not integer just ignore it
|
||||
pass
|
||||
return sorted(list(parsed))
|
||||
else:
|
||||
# Change to default ports from constant
|
||||
return [21, 22, 23, 25, 80, 443, 110, 111, 135, 139, 445, 8080, 8443, 53, 143, 989, 990, 3306, 1080, 5554, 6667, 2222, 4444, 666, 6666, 1337, 2020, 31337]
|
||||
|
||||
|
||||
def getCIDRFromRanges(ips):
|
||||
"""
|
||||
Parses ip input string, returns the generator over them.
|
||||
|
||||
Supports next inputs:
|
||||
1) 1.2.3.4
|
||||
2) 192.168.0.0/24
|
||||
3) 1.2.3.4 - 5.6.7.8
|
||||
Any non-ip value will be ignored.
|
||||
"""
|
||||
|
||||
# A set to contain non repeating ip objects from ipaddress
|
||||
ip_objects = set()
|
||||
inputs = [ip.strip() for ip in ips.split(',')]
|
||||
|
||||
for input_ in inputs:
|
||||
try:
|
||||
# Input is in range form ("1.2.3.4 - 5.6.7.8"):
|
||||
if '-' in input_:
|
||||
input_ips = input_.split('-')
|
||||
ranges = {ipaddr for ipaddr in ipaddress.summarize_address_range(
|
||||
ipaddress.IPv4Address(input_ips[0]),
|
||||
ipaddress.IPv4Address(input_ips[1]))
|
||||
}
|
||||
ip_objects.update(ranges)
|
||||
# Input is in CIDR form ("192.168.0.0/24"):
|
||||
elif '/' in input_:
|
||||
network = ipaddress.ip_network(input_)
|
||||
ip_objects.add(network)
|
||||
# Input is a single ip ("1.1.1.1"):
|
||||
else:
|
||||
ip = ipaddress.ip_address(input_)
|
||||
ip_objects.add(ip)
|
||||
except ValueError as e:
|
||||
# If we get any non-ip value just ignore it
|
||||
pass
|
||||
|
||||
for ip_obj in ip_objects:
|
||||
# The object is just one ip, simply yield it:
|
||||
if isinstance(ip_obj, ipaddress.IPv4Address):
|
||||
yield ip_obj
|
||||
# The object is a network, yield every host in it:
|
||||
else:
|
||||
for host in ip_obj.hosts():
|
||||
yield host
|
26
README.md
Normal file
26
README.md
Normal file
@ -0,0 +1,26 @@
|
||||
PySca - a NEtwork SCAnner rewritten in Python
|
||||
=============================
|
||||
[Вариант на русском](README.ru.md)
|
||||
------------
|
||||
What is it?
|
||||
------------
|
||||
According to [Wikipedia](https://en.wikipedia.org/wiki/Network_enumeration#Software),
|
||||
network scanner is a computer program used to retrieve usernames and info on groups, shares, and services of networked computers.
|
||||
This project is based on ideas of scanner [NESCA](https://github.com/pantyusha/nesca) - it is simple scanner for everyone. PySca includes GUI-interface. The project is not just another fork of nesca - it is analogue built from scratch using Python language. Another difference from original is modularity of code - PySca has ability to get scan results from wherever you want and put wherever you can with the same user interface.
|
||||
|
||||
INSTALLATION
|
||||
------------
|
||||
Just run
|
||||
```bash
|
||||
git clone http://github.com/ChronosX88/PySca.git
|
||||
cd PySca
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
RUN
|
||||
------------
|
||||
Run
|
||||
```bash
|
||||
python main.py
|
||||
```
|
||||
in root PySca folder
|
24
README.ru.md
Normal file
24
README.ru.md
Normal file
@ -0,0 +1,24 @@
|
||||
PySca - сетевой сканер, переписанный на Python
|
||||
=============================
|
||||
Что это?
|
||||
------------
|
||||
Согласно [Википедии](https://en.wikipedia.org/wiki/Network_enumeration#Software),
|
||||
сетевой сканер это компьютерная программа, используемая для получения информации о группах, общих ресурсах и сервисах компьютеров, подключённых к Сети.
|
||||
Этот проект основывается на идеях небезызвестного сканера [NESCA](https://github.com/pantyusha/nesca) - сканера, понятныого каждому за счёт удобного [GUI](https://ru.wikipedia.org/wiki/%D0%93%D1%80%D0%B0%D1%84%D0%B8%D1%87%D0%B5%D1%81%D0%BA%D0%B8%D0%B9_%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D1%84%D0%B5%D0%B9%D1%81_%D0%BF%D0%BE%D0%BB%D1%8C%D0%B7%D0%BE%D0%B2%D0%B0%D1%82%D0%B5%D0%BB%D1%8F). PySca - не очередной форк NESCA - это её аналог, созданный с нуля на Python. Другое важное отличие от оригинала - модульность архитектуры сканера - PySca может брать данные (адреса для сканирования) из любого источника и сохранять результат куда возможно (при наличии соответствующих модулей), сохраняя тот же пользовательский интерфейс.
|
||||
|
||||
УСТАНОВКА
|
||||
------------
|
||||
Введите в терминале
|
||||
```bash
|
||||
git clone http://github.com/ChronosX88/PySca.git
|
||||
cd PySca
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
ЗАПУСК
|
||||
------------
|
||||
Введите в терминале
|
||||
```bash
|
||||
python main.py
|
||||
```
|
||||
Находясь в корневой папке PySca
|
0
__init__.py
Normal file
0
__init__.py
Normal file
15
address_generation/AbstractAddressGenerator.py
Normal file
15
address_generation/AbstractAddressGenerator.py
Normal file
@ -0,0 +1,15 @@
|
||||
from abc import abstractmethod, ABC
|
||||
|
||||
|
||||
class AbstractAddressGenerator(ABC):
|
||||
'''The class describes addess generation mechanism.
|
||||
In __init__ method it should get results of parsing fields
|
||||
and then it returns addresses.'''
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def get_next_address(self, previous_address, **kwargs):
|
||||
'''Address - an only, indivisible object, that describes single scan
|
||||
target address. This method should return next address to scan based on
|
||||
previous scanned address and result of scanning previous address, that
|
||||
can be placed in kwargs.'''
|
||||
pass
|
18
address_generation/AbstractParser.py
Normal file
18
address_generation/AbstractParser.py
Normal file
@ -0,0 +1,18 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class AbstractParser(ABC):
|
||||
'''The class describes fields parsing mechanisms'''
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def parse_address_field(self, field):
|
||||
'''In address field can be plased any text, describing address of
|
||||
scanning target'''
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def parse_port_field(self, field):
|
||||
'''In port field only numbers, whitespaces, comma and '-' allowed'''
|
||||
pass
|
28
address_generation/IpGenerator.py
Normal file
28
address_generation/IpGenerator.py
Normal file
@ -0,0 +1,28 @@
|
||||
from AbstractAddressGenerator import AbstractAddressGenerator
|
||||
from threading import RLock
|
||||
|
||||
|
||||
class IpGenerator(AbstractAddressGenerator):
|
||||
|
||||
def __init__(self, ip_generator, ports):
|
||||
self.ip_generator = ip_generator
|
||||
self.ports = ports
|
||||
self.lock = RLock()
|
||||
|
||||
def get_next_port_number(self, previous_port):
|
||||
return (self.ports.index(previous_port) + 1) % len(self.ports)
|
||||
|
||||
def get_next_address(self, previous_address):
|
||||
with self.lock:
|
||||
portnum = 0
|
||||
next_ip = None
|
||||
if previous_address:
|
||||
next_ip, port = previous_address
|
||||
portnum = self.get_next_port_number(port)
|
||||
if (portnum == 0):
|
||||
try:
|
||||
next_ip = str(next(self.ip_generator))
|
||||
except StopIteration:
|
||||
return None
|
||||
result = (next_ip, self.ports[portnum])
|
||||
return result
|
80
address_generation/Parser.py
Normal file
80
address_generation/Parser.py
Normal file
@ -0,0 +1,80 @@
|
||||
import ipaddress
|
||||
from AbstractParser import AbstractParser
|
||||
|
||||
|
||||
class Parser(AbstractParser):
|
||||
|
||||
def parse_port_field(self, ports):
|
||||
"""
|
||||
Parses ports from string, returns them as integers in the list.
|
||||
Handles non-existent ports and non-port values.
|
||||
"""
|
||||
if ports:
|
||||
# Using set to avoid repetitions
|
||||
parsed = set()
|
||||
ports = ports.split(",")
|
||||
for port in ports:
|
||||
try:
|
||||
# Input is in range form ("100-200"):
|
||||
if '-' in port:
|
||||
start, end = map(int, port.split('-'))
|
||||
parsed.update(
|
||||
[p for p in range(start, end + 1) if 65355 >= p > 0])
|
||||
# Input is a single port ("80"):
|
||||
else:
|
||||
parsed.add(int(port))
|
||||
except ValueError as e:
|
||||
# If we get any not integer just ignore it
|
||||
pass
|
||||
return sorted(list(parsed))
|
||||
else:
|
||||
# Change to default ports from constant
|
||||
return [21, 22, 23, 25, 80, 443, 110, 111, 135, 139, 445, 8080, 8443, 53, 143, 989, 990, 3306, 1080, 5554, 6667, 2222, 4444, 666, 6666, 1337, 2020, 31337]
|
||||
|
||||
def parse_address_field(self, ips):
|
||||
"""
|
||||
Parses ip input string, returns the generator over them.
|
||||
|
||||
Supports next inputs:
|
||||
1) 1.2.3.4
|
||||
2) 192.168.0.0/24
|
||||
3) 1.2.3.4 - 5.6.7.8
|
||||
Any non-ip value will be ignored.
|
||||
"""
|
||||
|
||||
# A set to contain non repeating ip objects from ipaddress
|
||||
ip_objects = set()
|
||||
inputs = [ip.strip() for ip in ips.split(',')]
|
||||
|
||||
for input_ in inputs:
|
||||
try:
|
||||
# Input is in range form ("1.2.3.4 - 5.6.7.8"):
|
||||
if '-' in input_:
|
||||
input_ips = input_.split('-')
|
||||
ranges = set(
|
||||
ipaddress.summarize_address_range(
|
||||
*map(lambda x: ipaddress.IPv4Address(x.strip()), input_ips)
|
||||
)
|
||||
)
|
||||
ip_objects.update(ranges)
|
||||
# Input is in CIDR form ("192.168.0.0/24"):
|
||||
elif '/' in input_:
|
||||
network = ipaddress.ip_network(input_)
|
||||
ip_objects.add(network)
|
||||
# Input is a single ip ("1.1.1.1"):
|
||||
else:
|
||||
ip = ipaddress.ip_address(input_)
|
||||
ip_objects.add(ip)
|
||||
except ValueError as e:
|
||||
print(e)
|
||||
# If we get any non-ip value just ignore it
|
||||
pass
|
||||
|
||||
for ip_obj in ip_objects:
|
||||
# The object is just one ip, simply yield it:
|
||||
if isinstance(ip_obj, ipaddress.IPv4Address):
|
||||
yield ip_obj
|
||||
# The object is a network, yield every host in it:
|
||||
else:
|
||||
for host in ip_obj:
|
||||
yield host
|
6
address_generation/__init__.py
Normal file
6
address_generation/__init__.py
Normal file
@ -0,0 +1,6 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
|
||||
fil = __file__[:__file__.rindex(os.sep)]
|
||||
sys.path.insert(0,fil)
|
27
address_generation/test_IpGenerator.py
Normal file
27
address_generation/test_IpGenerator.py
Normal file
@ -0,0 +1,27 @@
|
||||
from unittest import TestCase, main
|
||||
from ipaddress import IPv4Address
|
||||
from IpGenerator import IpGenerator
|
||||
from Parser import Parser
|
||||
|
||||
|
||||
class testIpGenerator(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
p = Parser()
|
||||
self.ipgen = IpGenerator(
|
||||
p.parse_address_field("192.168.1.1 - 192.168.1.10"), [80, 90])
|
||||
|
||||
def testIpGeneration(self):
|
||||
'''self.assertEqual(
|
||||
self.ipgen.get_next_address(None),
|
||||
("192.168.1.1", 80))'''
|
||||
previous_address = None
|
||||
a = True
|
||||
while previous_address or a:
|
||||
previous_address=self.ipgen.get_next_address(previous_address)
|
||||
a = False
|
||||
self.assertEqual(self.ipgen.get_next_address(None), None)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
37
address_generation/test_Parser.py
Normal file
37
address_generation/test_Parser.py
Normal file
@ -0,0 +1,37 @@
|
||||
from unittest import main, TestCase
|
||||
from Parser import Parser
|
||||
import ipaddress
|
||||
|
||||
|
||||
class TestParser(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.parser = Parser()
|
||||
|
||||
def test_port_parsing(self):
|
||||
self.assertEqual(self.parser.parse_port_field("80,90"), [80, 90])
|
||||
|
||||
def test_ip_parsing(self):
|
||||
self.assertEqual(
|
||||
set(self.parser.parse_address_field("192.168.1.1 - 192.168.1.3")),
|
||||
{ipaddress.IPv4Address(ip) for ip in
|
||||
[
|
||||
"192.168.1.1",
|
||||
"192.168.1.2",
|
||||
"192.168.1.3"
|
||||
]})
|
||||
self.assertEqual(
|
||||
set(self.parser.parse_address_field("192.168.1.1")),
|
||||
{ipaddress.IPv4Address("192.168.1.1")}
|
||||
)
|
||||
self.assertEqual(
|
||||
set(self.parser.parse_address_field("192.168.1.0/31")),
|
||||
{ipaddress.IPv4Address(ip) for ip in
|
||||
[
|
||||
'192.168.1.1',
|
||||
'192.168.1.0'
|
||||
]})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
19
network_scan/AbstractScanner.py
Normal file
19
network_scan/AbstractScanner.py
Normal file
@ -0,0 +1,19 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class AbstractScanner(ABC):
|
||||
'''The class is used by one thread to scan targets'''
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def __init__(self, **kwargs):
|
||||
'''In this method you can init some
|
||||
reusable resources needed for scan'''
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def scan_address(self, address):
|
||||
'''This method should contain scanning process of given address. All
|
||||
items returned will be passed to AbstractStorage and ui'''
|
||||
pass
|
15
network_scan/CoreModel.py
Normal file
15
network_scan/CoreModel.py
Normal file
@ -0,0 +1,15 @@
|
||||
import socket
|
||||
from AbstractScanner import AbstractScanner
|
||||
|
||||
|
||||
class CoreModel(AbstractScanner):
|
||||
def __init__(self, timeout):
|
||||
self.defSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.defSocket.settimeout(int(timeout))
|
||||
|
||||
def scan_address(self, address):
|
||||
host, port = address
|
||||
result = self.defSocket.connect_ex((host, port))
|
||||
self.defSocket.close()
|
||||
self.defSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
return result
|
6
network_scan/__init__.py
Normal file
6
network_scan/__init__.py
Normal file
@ -0,0 +1,6 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
|
||||
fil = __file__[:__file__.rindex(os.sep)]
|
||||
sys.path.insert(0,fil)
|
14
storage/AbstractStorage.py
Normal file
14
storage/AbstractStorage.py
Normal file
@ -0,0 +1,14 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class AbstractStorage(ABC):
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def put_responce(self, address, responce):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def save(self):
|
||||
pass
|
23
storage/JSONStorage.py
Normal file
23
storage/JSONStorage.py
Normal file
@ -0,0 +1,23 @@
|
||||
from AbstractStorage import AbstractStorage
|
||||
import json
|
||||
from threading import RLock
|
||||
|
||||
|
||||
class JSONStorage(AbstractStorage):
|
||||
|
||||
def __init__(self, path):
|
||||
self.path = path
|
||||
self.respdict = dict()
|
||||
self.lock = RLock()
|
||||
|
||||
def put_responce(self, address, responce):
|
||||
ip, port = address
|
||||
if ip not in self.respdict.keys():
|
||||
self.respdict[ip] = {"open": [], "close": []}
|
||||
self.respdict[ip]["open" if responce != 0 else "close"].append(port)
|
||||
|
||||
def save(self):
|
||||
print("saving")
|
||||
with open(self.path, "w") as f:
|
||||
json.dump(self.respdict, f)
|
||||
self.respdict = {}
|
6
storage/__init__.py
Normal file
6
storage/__init__.py
Normal file
@ -0,0 +1,6 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
|
||||
fil = __file__[:__file__.rindex(os.sep)]
|
||||
sys.path.insert(0,fil)
|
Loading…
Reference in New Issue
Block a user