diff --git a/modules/network_scan/FTPScanner.py b/modules/network_scan/FTPScanner.py new file mode 100644 index 0000000..3f2ab40 --- /dev/null +++ b/modules/network_scan/FTPScanner.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +from core.prototypes.AbstractScanner import AbstractScanner +import ftplib +from ftplib import FTP + +MAX_ERRORS = 3 + + +class FTPScanner(AbstractScanner): + def __init__(self, timeout): + self.__timeout__ = timeout + + def scan_address(self, host: 'ipv4_str or hostname', port: 'port', credentials: 'tuples with login and password') -> {'scan_result'}: + result = self.ftp_anonymous_login(host, port, self.__timeout__) + if result['status'] == 'error' or result['anonymous_login']: + return result + result['credentials'] = self.ftp_bruteforce( + host, port, credentials, self.__timeout__) + return result + + @staticmethod + def ftp_anonymous_login(host, port, timeout): + '''Get version and check if anonympous login is enabled''' + result = {} + ftp_connection = FTP(timeout=timeout) + try: + version = ftp_connection.connect(host=host, port=port) + # Get something like "220 Twisted 16.6.0 FTP Server" + result['ftp_version'] = version.lstrip('220 ') + # Try to login as anonymous user + ftp_connection.login() + result['anonymous_login'] = True + result['status'] = 'ok' + except ftplib.error_perm as e: + if str(e).startswith("530"): + result['status'] = 'ok' + result['anonymous_login'] = False + except ftplib.all_errors as e: + result['status'] = 'error' + result['error_type'] = str(e) + return result + finally: + ftp_connection.close() + return result + + @staticmethod + def ftp_bruteforce(host, port, creds, timeout): + '''Attempt to brute force login/password pair''' + # We want maintain connection to speed up bruteforce + # but we also want to reconnect if necessary. + # That is why I use cred iterator to pick up new login/pass only when + # we need to. + error_count = 0 + it = iter(creds) + cred = next(it, "") + ftp_connection = FTP(timeout=timeout) + while error_count < MAX_ERRORS: + try: + # Connecting to server + ftp_connection.connect(host=host, port=port) + while cred and error_count < MAX_ERRORS: + user, password = cred + # Trying to log in + try: + ftp_connection.login(user, password) + ftp_connection.close() + return user, password + except ftplib.error_perm as e: + # Password was wrong, checking another + cred = next(it, "") + continue + except ftplib.all_errors as e: + error_count += 1 + # Connection was dropped or another network error happened + # We must connection, error_count would help us to + # avoid deadlock on mumbling host + break + except ftplib.all_errors as e: + # Cannot reconnect, give up + break + finally: + ftp_connection.close() + return None diff --git a/modules/network_scan/test_FTPScanner.py b/modules/network_scan/test_FTPScanner.py new file mode 100644 index 0000000..e97826e --- /dev/null +++ b/modules/network_scan/test_FTPScanner.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +from modules.network_scan.FTPScanner import FTPScanner +from pyftpdlib.authorizers import DummyAuthorizer +from pyftpdlib.handlers import FTPHandler +from pyftpdlib.servers import FTPServer +import unittest +from tempfile import mkdtemp +import multiprocessing +from time import sleep + +import http.server +import socketserver +import os + +TEST_CREDS = (("admin", "admin"), ("1", "1"), ('user', 'password')) +PORT = 2121 + + +def run_anonymous_ftp(temp_dir): + authorizer = DummyAuthorizer() + authorizer.add_anonymous(temp_dir) + handler = FTPHandler + handler.authorizer = authorizer + server = FTPServer(("127.0.0.1", PORT), handler) + server.serve_forever() + + +def run_bruteforce_ftp(temp_dir): + authorizer = DummyAuthorizer() + user, password = TEST_CREDS[-1] + authorizer.add_user(user, password, temp_dir, perm="elradfmw") + handler = FTPHandler + handler.authorizer = authorizer + handler.max_login_attempts = 2 # Drop connection on each 2 incorrect attempts + server = FTPServer(("127.0.0.1", PORT), handler) + server.serve_forever() + + +def run_mumble(): + handler = http.server.SimpleHTTPRequestHandler + httpd = socketserver.TCPServer(("127.0.0.1", PORT), handler) + httpd.serve_forever() + + +class TestFTPScanner(unittest.TestCase): + def test_closed_port(self): + scanner = FTPScanner(timeout=10) + result = scanner.scan_address('127.0.0.1', 31337, credentials=TEST_CREDS) + print(result) + self.assertEqual(result['status'], 'error', "Should be error") + self.assertTrue("Connection refused" in result['error_type'], "Connection refused") + + def test_mumble(self): + p = multiprocessing.Process(target=run_mumble) + p.start() + sleep(5) + scanner = FTPScanner(timeout=10) + result = scanner.scan_address('127.0.0.1', PORT, credentials=TEST_CREDS) + print(result) + self.assertEqual(result['status'], 'error', "Should be error") + self.assertTrue("timed out" in result['error_type'], "Timed out") + p.terminate() + + def test_anonymous_login(self): + temp_dir = mkdtemp() + p = multiprocessing.Process(target=run_anonymous_ftp, args=(temp_dir,)) + p.start() + sleep(5) + scanner = FTPScanner(timeout=10) + result = scanner.scan_address('127.0.0.1', PORT, credentials=TEST_CREDS) + print(result) + self.assertEqual(result['anonymous_login'], True, "Should be True") + p.terminate() + os.rmdir(temp_dir) + + def test_bruteforce(self): + temp_dir = mkdtemp() + p = multiprocessing.Process(target=run_bruteforce_ftp, args=(temp_dir,)) + p.start() + sleep(5) + scanner = FTPScanner(timeout=10) + result = scanner.scan_address('127.0.0.1', PORT, credentials=TEST_CREDS) + print(result) + self.assertEqual(result['credentials'], TEST_CREDS[-1], "Should be True") + p.terminate() + os.rmdir(temp_dir) + + +if __name__ == '__main__': + unittest.main()