angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

IP addresses verzamelen en blokkeren met ipset en Python

En zodra we een lijst van IP addresses hebben, kunnen we ze analyseren met IP2Location en whois.

21 mei 2023
post main image
https://unsplash.com/@enginakyurt

Als je een server hebt die verbonden is met het internet, heb je dit waarschijnlijk gezien in je logbestanden: veel illegale externe verzoeken die proberen toegang te krijgen tot je diensten.

Ik onderhoud een Debian server, en gebruik Fail2Ban voor inbraakpreventie. Standaard praktijk, installeren, configureren en vergeten. Omdat de server op bepaalde tijden werd platgelegd, besloot ik een kijkje te nemen.

Ik ben vooral bezig met programmeren en dit is meer een taak voor systeembeheerders. Ja, zij zijn de specialisten, ik ben maar een noob. Maar het is altijd leuk om te leren. In deze post zal ik me beperken tot poortscanners, die de SMTP poort aanvallen. Over (D)DOS aanvallen zal ik een andere keer schrijven. Mijn Debian server is verbonden met het Internet met alleen ipv4 adressen (ik twijfel nog om ipv6 toe te voegen). Als u dit zelf probeert:

WAARSCHUWING: GEBRUIK OP EIGEN RISICO / SLUIT JEZELF NIET OP OUT

Gebruik van ipset en Python

Ik neem aan dat sommige van de IP addresses die betrokken zijn bij het scannen van de SMTP poort ook voor andere aanvallen kunnen worden gebruikt, maar is dat echt zo? Wat ik wilde bereiken was het verzamelen van zoveel mogelijk IP addresses die de SMTP poort scannen. Hiervoor moet ik deze IP addresses permanent blokkeren en dan neem ik aan (hoop ik) dat de scanner een andere IP address zal gebruiken om de aanval voort te zetten, enz.

Met Fail2Ban worden IP address's niet permanent geband. Ook blokkeert Fail2Ban standaard niet alle poorten, maar slechts één enkele poort.

Omdat ik de huidige Fail2Ban setup niet wilde aanraken, moest ik iptables gaan gebruiken. Maar ik wilde ook geen lange lijsten met IP addresses toevoegen aan iptables, en ze later weer verwijderen. ipset to the rescue. ipset is een uitbreiding op iptables die ons toestaat om firewall regels te maken die overeenkomen met sets van IP addresses (of IP netwerken).ipset sets worden opgeslagen in geïndexeerde datastructuren, waardoor lookups zeer efficiënt zijn.
Zodra we de IP addresses in een set hebben, bijvoorbeeld een set genaamd blocklist_postfix_sasl, kunnen we ze blokkeren door
een regel toe te voegen aan iptables:

iptables -I INPUT -m set --match-set blocklist_network src -j DROP

Hier blokkeer ik alle poorten, maar als je wilt kun je de regel wijzigen om alleen de poort SMTP te blokkeren.

Ik gebruik Python in plaats van een Bash-script om redenen die in eerdere berichten zijn uitgelegd. Bash is erg nuttig voor kleine scripts, maar zodra je wat meer controle en/of verwerking nodig hebt, ben je beter af met Python. Met Python kunnen we ook Linux commandoregelprogramma's uitvoeren met subprocess.

iptables-regels en ipset persistent maken

Het eerste wat u wilt doen is uw iptables regels persistent maken, wat betekent dat ze er nog zijn na een reboot. U moet ook uw ipset sets persistent maken. Hier zit een addertje onder het gras, want ipset sets moeten geladen worden voordat iptables geladen wordt. U kunt op het internet informatie vinden over hoe dit te doen, ik heb hieronder twee links toegevoegd.

IP addresses verzamelen

Ik heb een Python script gemaakt, zie hieronder, dat het volgende doet:

  • Scan de Fail2Ban logfiles om de paar uur voor Banned postfix-sasl IP addresses.
  • Maak een ipset set genaamd blocklist_postfix_sasl, indien nog niet aangemaakt.
  • Voeg deze IP addresses toe aan deze set, en ook aan een bestand.
  • Sla de set op zodat de gegevens niet verloren gaan bij een reboot.

Wanneer een IP address is toegevoegd aan een set, is deze onmiddellijk van kracht (als de set is toegevoegd aan iptables). Met behulp van Cron voer ik dit Python script elke drie uur uit.

Het bestand met de verboden IP addresses heeft regels als:

2023-05-20T07:00:02.785796 add blocklist_postfix_sasl 123.xxx.xxx.xxx
2023-05-20T07:00:02.801597 add blocklist_postfix_sasl 456.xxx.xxx.xxx

Post-processing

In een week heb ik een paar duizend unieke IP address's verzameld die door Fail2Ban zijn geblokkeerd. Vervolgens heb ik deze lijst naar mijn computer gekopieerd om te analyseren. Als eerste stap wilde ik een CSV-bestand maken met rijen IP addresses en zoveel mogelijk informatie.

U kunt een online dienst zoals Abuseipdb.com gebruiken om handmatig IP addresses te controleren, of een script schrijven en hun API gebruiken. Maar hun gratis dienst heeft een limiet van 1000 lookups per dag.

Toen vond ik het Python pakket IP2Location met de gratis database:

IP2LOCATION-LITE-DB11.BIN

Hiermee kunnen we de geo-locatie krijgen, zoals land en stad informatie. Het mooie is dat je deze database op je eigen machine hebt, geen remote lookups.

Van de 3900 unieke IP addresses kon ik nu zien dat de 3750 uit China kwamen, dat is 96%, WTF?

Vervolgens installeerde ik 'whois' op mijn machine:

apt install whois

en gebruikte dit, met een script, om meer informatie te krijgen. Om het aantal remote whois lookups te verminderen heb ik de IP addresses uit China voorlopig uitgesloten. Na wat codering kon ik de organisaties die verantwoordelijk zijn voor deze IP addresses en meer eruit halen.

Resultaten

Zoals hierboven reeds vermeld, komen bijna alle IP addresses uit China. Waarom zou China de poorten op mijn server willen scannen? Eén reden kan zijn dat het scannen van poorten business-as-usual is voor China. Voorbereiden op WO3? Of misschien hopen ze dat ik deze IP addresses permanent blokkeer, zodat de mensen uit China geen toegang hebben tot websites die op mijn server staan?

Kan iemand gespoofde IP addresses gebruiken om mijn server aan te vallen, en suggereren dat China poorten scant? Maar dan zouden ze de routers van mijn hosting provider moeten hacken? Alles is mogelijk, ik weet het niet.

Ik kan de resultaten van de andere IP addresses in detail niet vermelden. Maar ik moet wel vermelden dat één specifieke hosting organisatie uit Nederland een leverancier lijkt te zijn van veel hackers / poortscanners. Er staan schokkende reviews over hen op het internet. Onwetende mensen die hun hosting zijn gaan gebruiken en ontdekken dat hun websites over de hele wereld geblokkeerd worden.

De Python code voor het verzamelen van IP addresses

Hieronder staat de code voor het ophalen van de IP addresses, als je het zelf wilt proberen. Let op het verschil tussen subprocess.run() en subprocess.popen(). Met de laatste kunnen we een volledige opdrachtregel uitvoeren, met behulp van filters, pijpen en omleiding.

#!/usr/bin/python3
# manage_ipset_blocklist_postfix_sasl.py
import datetime
import glob
import logging
import os
import re
import shlex
import subprocess
import sys

# CONSTANTS
PROGRAM_NAME = 'manage_ipset_blocklist_postfix_sasl'
# never add these ips to ipset
NEVER_ADD_IPS = [
    'xxx.xxx.xxx.xxx',
]
FAIL2BAN_FILES_DIR = '/var/log'
FILES_DIR = '/root/f2b_bans'
# log file
LOG_FILE = os.path.join(FILES_DIR, PROGRAM_NAME + '.log')
# temporary file(s)
BANS_FILE = os.path.join(FILES_DIR, 'bans')
# ipset
IPSET_EXE = '/usr/sbin/ipset'
IPSET_BLOCKLIST_NAME = 'blocklist_postfix_sasl'
IPSET_BLOCKLIST_LOG_FILE = os.path.join(FILES_DIR, IPSET_BLOCKLIST_NAME + '.log')
# this file is loaded by ipset on reboot
IPSET_SAVE_FILE = '/etc/iptables/ipset'

def get_logger(
    console_log_level=logging.DEBUG,
    file_log_level=logging.DEBUG,
    log_file=os.path.splitext(__file__)[0] + '.log',
):
    logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    if console_log_level:
        # console
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(console_log_level)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)
    if file_log_level:
        # file
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(file_log_level)
        file_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(file_handler)
    return logger

logger = get_logger(
    #console_log_level=logging.INFO,
    #console_log_level=None,
    log_file=LOG_FILE,
)
logger.debug('START')

class CmdLine:
    def __init__(
        self,
        logger=None,
    ):
        self.logger = logger

    def run(self, command, stdout=None, check=False, stdout_file=None, stdout_file_mode='w'):
        if stdout_file is None:
            result = subprocess.run(shlex.split(command), capture_output=True, text=True, stdout=stdout, check=check)
        else:
            with open(stdout_file, stdout_file_mode) as fo:
                result = subprocess.run(shlex.split(command), text=True, stdout=fo, check=check)
        self.logger.debug(f'type(result.stdout) = {type(result.stdout)}, result.stdout = {result.stdout}')
        self.logger.debug(f'type(result.stderr) = {type(result.stderr)}, result.stderr = {result.stderr}')
        self.logger.debug(f'type(result.returncode) = {type(result.returncode)}, result.returncode = {result.returncode}')
        return result

    def popen(self, cmd):
        self.logger.debug(f'cmd = {cmd}')
        p = subprocess.Popen(cmd, shell=True)
        returncode = p.wait()
        self.logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        return returncode

    def get_file_line_count(self, f):
        p = subprocess.Popen(['wc', '-l', f], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        result, err = p.communicate()
        if p.returncode != 0:
            raise IOError(err)
        return int(result.strip().split()[0])

    def get_file_size(self, f):
        return os.path.getsize(f)

    def create_dir(self, d):
        os.makedirs(d, exist_ok=True)
        return True

    def get_dir_files(self, d, pattern):
        self.logger.debug(f'd = {d}, pattern = {pattern}')
        pathname = os.path.join(d, pattern)
        self.logger.debug(f'pathname = {pathname}')
        return glob.glob(os.path.join(d, pattern))

    def remove_file(self, f):
        self.logger.debug(f'(f = {f})')
        try:
            os.remove(f)
        except OSError:
            pass

def main():

    # 2023-04-16 17:28:50,860 fail2ban.actions        [790]: NOTICE  [postfix-sasl] Ban xxx.xxx.xxx.xxx
    ban_pattern = re.compile(r'^.*?\[postfix\-sasl\]\s+Ban\s+(.*?)\s*$')
    ip4_pattern = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])$')

    cmd_line = CmdLine(
        logger=logger,
    )

    # create ipset for blocklist if not there yet
    command = f'{IPSET_EXE} create {IPSET_BLOCKLIST_NAME} hash:ip hashsize 4096'
    cmd_line.run(command)

    # get ipset blocklist members to avoid excessive calling 'ipset add <ip address>'
    blocklist_members = []
    command = f'{IPSET_EXE} list {IPSET_BLOCKLIST_NAME}'
    result = cmd_line.run(command)
    for line in result.stdout.split('\n'):
        if ':' in line:
            continue
        line = line.strip()
        if ip4_pattern.match(line):
            blocklist_members.append(line)
    blocklist_members_len = len(blocklist_members)
    logger.debug(f'blocklist_members_len = {blocklist_members_len}')

    # filter Fail2Ban logs Bans to BANS_FILE
    logger.debug(f'filter fail2ban.log Bans to bans file {BANS_FILE} ...')
    cmd_line.remove_file(BANS_FILE)
    files = cmd_line.get_dir_files(FAIL2BAN_FILES_DIR, 'fail2ban.log*')
    for f in files:
        logger.debug(f'iterating file {f}')
        grep = 'grep'
        if f.endswith('.gz'):
            grep = 'zgrep'
        command = f'{grep} NOTICE "{f}" | grep "postfix-sasl" | grep Ban >> {BANS_FILE}'
        cmd_line.popen(command)

    # get ips from BANS_FILE
    ban_ips = []
    with open(BANS_FILE, 'r') as fo:
        data = fo.read()
    lines = data.split('\n')
    for line in lines:
        m = ban_pattern.search(line)
        if m is None:
            continue
        ip = m.group(1)
        if ip4_pattern.match(ip) is None:
            continue
        # never
        if ip in NEVER_ADD_IPS:
            continue
        # no duplicates
        if ip in ban_ips:
            continue
        # not if already in members
        if ip in blocklist_members:
            continue
        ban_ips.append(ip)
    ban_ips_count = len(ban_ips)
    logger.debug(f'ban_ips_count = {ban_ips_count}')

    # add ips to ipset blocklist
    added_count = 0
    rejected_count = 0
    for ban_ip in ban_ips:
        command = f'{IPSET_EXE} add {IPSET_BLOCKLIST_NAME} {ban_ip}'
        result = cmd_line.run(command)
        returncode = result.returncode
        logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        if returncode == 0:
            added_count += 1
            # also add ip with timestamp to ipset_added file
            dt = datetime.datetime.utcnow().isoformat()
            with open(IPSET_BLOCKLIST_LOG_FILE, 'a') as fo_add_file:
                fo_add_file.write(f'{dt} add {IPSET_BLOCKLIST_NAME} {ban_ip}\n')
        else:
            rejected_count += 1
    logger.info(f'added_count = {added_count}, rejected_count = {rejected_count}')

    logger.debug(f'saving ipset to {IPSET_SAVE_FILE}')
    command = f'{IPSET_EXE} save -file {IPSET_SAVE_FILE}'
    cmd_line.run(command)

    logger.debug(f'to save again: {command}')

main()


Samenvatting

Ik heb ipset nooit eerder gebruikt, het is goed om te leren hoe het werkt. Fail2Ban kan ook gebruikt worden met ipset, maar ik wilde mijn Fail2Ban configuratie niet veranderen. Met ipset en het Python script kan ik slechte IP address's verzamelen en automatisch de toegang tot mijn server blokkeren. En ik kan gemakkelijk meer informatie uit deze IP addresses halen met behulp van het Python pakket IP2Location en het command line utility whois.

Links / credits

Fail2ban
https://www.fail2ban.org

How to find the actual address of spoofed IPs?
https://security.stackexchange.com/questions/48523/how-to-find-the-actual-address-of-spoofed-ips

How to Make iptables Firewall Rules Persistent on Debian/Ubuntu
https://linuxiac.com/persistent-iptables-firewall-rules

IP2Location
https://pypi.org/project/IP2Location

ipset
https://ipset.netfilter.org/ipset.man.html

Persistent ipset for Ubuntu/Debian compatible with ufw and iptables-persistent
https://selivan.github.io/2018/07/27/ipset-save-with-ufw-and-iptables-persistent-and.html

WHOIS(1)
https://manpages.debian.org/bullseye/whois/whois.1.en.html

Laat een reactie achter

Reageer anoniem of log in om commentaar te geven.

Opmerkingen

Laat een antwoord achter

Antwoord anoniem of log in om te antwoorden.