angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Collecter et bloquer IP addresses avec ipset et Python

Et une fois que nous avons une liste de IP addresses, nous pouvons les analyser avec IP2Location et whois.

21 mai 2023
post main image
https://unsplash.com/@enginakyurt

Si vous avez un serveur connecté à l'internet, vous avez probablement vu ceci dans vos fichiers journaux : beaucoup de requêtes externes illégales essayant d'accéder à vos services.

Je maintiens un serveur Debian , et j'utilise Fail2Ban pour la prévention des intrusions. Pratique standard, installer, configurer et oublier. Comme le serveur était mis hors service à certaines heures, j'ai décidé d'y regarder de plus près.

Je m'occupe surtout de programmation et ce travail est plutôt réservé aux administrateurs de système. Oui, ce sont eux les spécialistes, je ne suis qu'un novice. Mais c'est toujours agréable d'apprendre. Dans ce billet, je me limiterai aux scanners de ports qui attaquent le port SMTP . Je parlerai des attaques (D)DOS une autre fois. Mon serveur Debian est connecté à l'Internet en utilisant uniquement des adresses ipv4 (j'hésite encore à ajouter l'ipv6). Si vous essayez cela vous-même :

WARNING : USE AT YOUR OWN RISK / DO NOT LOCK YOURSELF OUT

Utilisation d'ipset et de Python

Je suppose que certaines des IP address impliquées dans l'analyse du port SMTP peuvent également être utilisées pour d'autres attaques, mais est-ce vraiment vrai ? Ce que je voulais faire, c'était collecter autant de IP addresses que possible en scannant le port SMTP . Pour ce faire, je dois bloquer de manière permanente ces IP addresses, puis je suppose (j'espère) que le scanner utilisera un autre IP address pour poursuivre l'attaque, etc.

Avec Fail2Ban, les IP addresses ne sont pas bannis de façon permanente. De plus, par défaut, Fail2Ban ne bloque pas tous les ports, mais seulement un seul.

Comme je ne voulais pas toucher à la configuration actuelle de Fail2Ban, j'ai dû utiliser iptables. Mais je ne voulais pas non plus ajouter de longues listes de IP addresses à iptables, et les supprimer plus tard. ipset à la rescousse. ipset est une extension d'iptables qui nous permet de créer des règles firewall qui correspondent à des ensembles de IP addresses (ou réseaux IP).Les ensembles d'ipset sont stockés dans des structures de données indexées, ce qui rend les recherches très efficaces.
Une fois que nous avons les IP addresses dans un ensemble, par exemple un ensemble appelé blocklist_postfix_sasl, nous pouvons les bloquer en ajoutant
une règle à iptables :

iptables -I INPUT -m set --match-set blocklist_network src -j DROP

Ici, je bloque tous les ports, mais si vous le souhaitez, vous pouvez modifier la règle pour ne bloquer que le port SMTP .

J'utilise Python au lieu d'un script Bash pour les raisons expliquées dans les posts précédents. Bash est très utile pour les petits scripts mais dès que vous avez besoin d'un peu plus de contrôle et/ou de traitement, il vaut mieux utiliser Python. Avec Python , nous pouvons également exécuter des programmes en ligne de commande Linux en utilisant subprocess.

Rendre les règles iptables et ipset persistantes

La première chose à faire est de rendre vos règles iptables persistantes, ce qui signifie qu'elles seront présentes après un redémarrage. Vous devez également rendre vos jeux ipset persistants. Il y a un problème ici car les jeux ipset doivent être chargés avant iptables. Vous pouvez trouver des informations sur internet sur la manière de procéder, j'ai ajouté deux liens ci-dessous.

Collecte de IP addresses

J'ai créé un script Python , voir ci-dessous, qui fait ce qui suit :

  • Analyser les fichiers journaux de Fail2Ban toutes les quelques heures à la recherche de Banned postfix-sasl IP addresses.
  • Créer un jeu d'ipsets nommé blocklist_postfix_sasl, s'il n'a pas encore été créé.
  • Ajoutez ces IP addresses à cet ensemble, ainsi qu'à un fichier.
  • Sauvegardez le jeu pour que les données ne soient pas perdues lors d'un redémarrage.

Lorsqu'une IP address est ajoutée à un ensemble, elle est immédiatement effective (si l'ensemble a été ajouté à iptables). En utilisant Cron, je lance ce script Python toutes les trois heures.

Le fichier avec le banni IP addresses contient des lignes comme :

2023-05-20T07:00:02.785796 add blocklist_postfix_sasl 123.xxx.xxx.xxx
2023-05-20T07:00:02.801597 add blocklist_postfix_sasl 456.xxx.xxx.xxx

Post-traitement

En une semaine, j'ai collecté quelques milliers de IP addresses uniques bloqués par Fail2Ban. J'ai ensuite copié cette liste sur mon ordinateur pour l'analyser. Dans un premier temps, j'ai voulu créer un fichier CSV avec des lignes de IP addresses et autant d'informations que possible.

Vous pouvez utiliser un service en ligne comme Abuseipdb.com pour vérifier manuellement les IP addresses, ou écrire un script et utiliser leur API. Mais leur service gratuit est limité à 1000 recherches par jour.

J'ai alors trouvé le paquet Python IP2Location avec la base de données gratuite :

IP2LOCATION-LITE-DB11.BIN

Ce paquet permet d'obtenir la géolocalisation, comme les informations sur le pays et la ville. L'avantage est que vous avez cette base de données sur votre machine, pas de recherche à distance.

Sur les 3900 IP addresses uniques, j'ai pu voir que 3750 venaient de Chine, soit 96%, WTF ?

Ensuite, j'ai installé 'whois' sur ma machine :

apt install whois

et je l'ai utilisé, avec un script, pour obtenir plus d'informations. Pour réduire le nombre de recherches whois à distance, j'ai exclu les IP address de Chine pour le moment. Après un peu de codage, j'ai pu extraire les organisations responsables de ces IP addresses et plus encore.

Résultats

Comme indiqué ci-dessus, presque toutes les IP addresses proviennent de Chine. Pourquoi la Chine voudrait-elle scanner les ports de mon serveur ? L'une des raisons peut être que l'analyse des ports est une pratique courante pour la Chine. Se préparer à la troisième guerre mondiale ? Ou peut-être espère-t-elle que je bloquerai ces IP addresses de façon permanente afin que les Chinois ne puissent pas accéder aux sites web hébergés sur mon serveur ?

Quelqu'un peut-il utiliser des IP addresses usurpés pour attaquer mon serveur et suggérer un balayage des ports par la Chine ? Mais il faudrait alors qu'ils piratent les routeurs de mon hébergeur ? Tout est possible, je ne sais pas.

Je ne peux pas mentionner les résultats des autres IP addresses dans detail. Mais je dois mentionner qu'une organisation d'hébergement spécifique des Pays-Bas semble être un fournisseur de nombreux pirates / scanners de ports. Il y a des reviews choquantes à leur sujet sur l'internet. Des personnes non averties qui ont commencé à utiliser leur hébergement et qui découvrent que leurs sites web sont bloqués dans le monde entier.

Le code Python collectant les IP addresses

Vous trouverez ci-dessous le code permettant de collecter les IP addresses, si vous voulez essayer vous-même. Notez la différence entre subprocess.run() et subprocess.popen(). Avec cette dernière, nous pouvons lancer une ligne de commande complète, en utilisant des filtres, des pipes et des redirections.

#!/usr/bin/python3
# manage_ipset_blocklist_postfix_sasl.py
import datetime
import glob
import logging
import os
import re
import shlex
import subprocess
import sys

# CONSTANTS
PROGRAM_NAME = 'manage_ipset_blocklist_postfix_sasl'
# never add these ips to ipset
NEVER_ADD_IPS = [
    'xxx.xxx.xxx.xxx',
]
FAIL2BAN_FILES_DIR = '/var/log'
FILES_DIR = '/root/f2b_bans'
# log file
LOG_FILE = os.path.join(FILES_DIR, PROGRAM_NAME + '.log')
# temporary file(s)
BANS_FILE = os.path.join(FILES_DIR, 'bans')
# ipset
IPSET_EXE = '/usr/sbin/ipset'
IPSET_BLOCKLIST_NAME = 'blocklist_postfix_sasl'
IPSET_BLOCKLIST_LOG_FILE = os.path.join(FILES_DIR, IPSET_BLOCKLIST_NAME + '.log')
# this file is loaded by ipset on reboot
IPSET_SAVE_FILE = '/etc/iptables/ipset'

def get_logger(
    console_log_level=logging.DEBUG,
    file_log_level=logging.DEBUG,
    log_file=os.path.splitext(__file__)[0] + '.log',
):
    logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    if console_log_level:
        # console
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(console_log_level)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)
    if file_log_level:
        # file
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(file_log_level)
        file_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(file_handler)
    return logger

logger = get_logger(
    #console_log_level=logging.INFO,
    #console_log_level=None,
    log_file=LOG_FILE,
)
logger.debug('START')

class CmdLine:
    def __init__(
        self,
        logger=None,
    ):
        self.logger = logger

    def run(self, command, stdout=None, check=False, stdout_file=None, stdout_file_mode='w'):
        if stdout_file is None:
            result = subprocess.run(shlex.split(command), capture_output=True, text=True, stdout=stdout, check=check)
        else:
            with open(stdout_file, stdout_file_mode) as fo:
                result = subprocess.run(shlex.split(command), text=True, stdout=fo, check=check)
        self.logger.debug(f'type(result.stdout) = {type(result.stdout)}, result.stdout = {result.stdout}')
        self.logger.debug(f'type(result.stderr) = {type(result.stderr)}, result.stderr = {result.stderr}')
        self.logger.debug(f'type(result.returncode) = {type(result.returncode)}, result.returncode = {result.returncode}')
        return result

    def popen(self, cmd):
        self.logger.debug(f'cmd = {cmd}')
        p = subprocess.Popen(cmd, shell=True)
        returncode = p.wait()
        self.logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        return returncode

    def get_file_line_count(self, f):
        p = subprocess.Popen(['wc', '-l', f], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        result, err = p.communicate()
        if p.returncode != 0:
            raise IOError(err)
        return int(result.strip().split()[0])

    def get_file_size(self, f):
        return os.path.getsize(f)

    def create_dir(self, d):
        os.makedirs(d, exist_ok=True)
        return True

    def get_dir_files(self, d, pattern):
        self.logger.debug(f'd = {d}, pattern = {pattern}')
        pathname = os.path.join(d, pattern)
        self.logger.debug(f'pathname = {pathname}')
        return glob.glob(os.path.join(d, pattern))

    def remove_file(self, f):
        self.logger.debug(f'(f = {f})')
        try:
            os.remove(f)
        except OSError:
            pass

def main():

    # 2023-04-16 17:28:50,860 fail2ban.actions        [790]: NOTICE  [postfix-sasl] Ban xxx.xxx.xxx.xxx
    ban_pattern = re.compile(r'^.*?\[postfix\-sasl\]\s+Ban\s+(.*?)\s*$')
    ip4_pattern = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])$')

    cmd_line = CmdLine(
        logger=logger,
    )

    # create ipset for blocklist if not there yet
    command = f'{IPSET_EXE} create {IPSET_BLOCKLIST_NAME} hash:ip hashsize 4096'
    cmd_line.run(command)

    # get ipset blocklist members to avoid excessive calling 'ipset add <ip address>'
    blocklist_members = []
    command = f'{IPSET_EXE} list {IPSET_BLOCKLIST_NAME}'
    result = cmd_line.run(command)
    for line in result.stdout.split('\n'):
        if ':' in line:
            continue
        line = line.strip()
        if ip4_pattern.match(line):
            blocklist_members.append(line)
    blocklist_members_len = len(blocklist_members)
    logger.debug(f'blocklist_members_len = {blocklist_members_len}')

    # filter Fail2Ban logs Bans to BANS_FILE
    logger.debug(f'filter fail2ban.log Bans to bans file {BANS_FILE} ...')
    cmd_line.remove_file(BANS_FILE)
    files = cmd_line.get_dir_files(FAIL2BAN_FILES_DIR, 'fail2ban.log*')
    for f in files:
        logger.debug(f'iterating file {f}')
        grep = 'grep'
        if f.endswith('.gz'):
            grep = 'zgrep'
        command = f'{grep} NOTICE "{f}" | grep "postfix-sasl" | grep Ban >> {BANS_FILE}'
        cmd_line.popen(command)

    # get ips from BANS_FILE
    ban_ips = []
    with open(BANS_FILE, 'r') as fo:
        data = fo.read()
    lines = data.split('\n')
    for line in lines:
        m = ban_pattern.search(line)
        if m is None:
            continue
        ip = m.group(1)
        if ip4_pattern.match(ip) is None:
            continue
        # never
        if ip in NEVER_ADD_IPS:
            continue
        # no duplicates
        if ip in ban_ips:
            continue
        # not if already in members
        if ip in blocklist_members:
            continue
        ban_ips.append(ip)
    ban_ips_count = len(ban_ips)
    logger.debug(f'ban_ips_count = {ban_ips_count}')

    # add ips to ipset blocklist
    added_count = 0
    rejected_count = 0
    for ban_ip in ban_ips:
        command = f'{IPSET_EXE} add {IPSET_BLOCKLIST_NAME} {ban_ip}'
        result = cmd_line.run(command)
        returncode = result.returncode
        logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        if returncode == 0:
            added_count += 1
            # also add ip with timestamp to ipset_added file
            dt = datetime.datetime.utcnow().isoformat()
            with open(IPSET_BLOCKLIST_LOG_FILE, 'a') as fo_add_file:
                fo_add_file.write(f'{dt} add {IPSET_BLOCKLIST_NAME} {ban_ip}\n')
        else:
            rejected_count += 1
    logger.info(f'added_count = {added_count}, rejected_count = {rejected_count}')

    logger.debug(f'saving ipset to {IPSET_SAVE_FILE}')
    command = f'{IPSET_EXE} save -file {IPSET_SAVE_FILE}'
    cmd_line.run(command)

    logger.debug(f'to save again: {command}')

main()


Résumé

Je n'avais jamais utilisé ipset auparavant, il est bon d'apprendre à l'utiliser. Fail2Ban peut également être utilisé avec ipset, mais je ne voulais pas changer ma configuration Fail2Ban. Avec ipset et le script Python , je peux collecter les mauvais IP addresses et bloquer automatiquement l'accès à mon serveur. Et je peux facilement obtenir plus d'informations à partir de ces IP address en utilisant le paquetage IP2Location de Python et l'utilitaire de ligne de commande whois.

Liens / crédits

Fail2ban
https://www.fail2ban.org

How to find the actual address of spoofed IPs?
https://security.stackexchange.com/questions/48523/how-to-find-the-actual-address-of-spoofed-ips

How to Make iptables Firewall Rules Persistent on Debian/Ubuntu
https://linuxiac.com/persistent-iptables-firewall-rules

IP2Location
https://pypi.org/project/IP2Location

ipset
https://ipset.netfilter.org/ipset.man.html

Persistent ipset for Ubuntu/Debian compatible with ufw and iptables-persistent
https://selivan.github.io/2018/07/27/ipset-save-with-ufw-and-iptables-persistent-and.html

WHOIS(1)
https://manpages.debian.org/bullseye/whois/whois.1.en.html

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.