angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Recoger y bloquear IP addresses con ipset y Python

Y una vez que tenemos una lista de IP addresses, podemos analizarlos con IP2Location y whois

21 mayo 2023
post main image
https://unsplash.com/@enginakyurt

Si tienes un servidor conectado a Internet, probablemente hayas visto esto en tus archivos de registro: montones de peticiones externas ilegales intentando acceder a tus servicios.

Yo mantengo un servidor Debian , y utilizo Fail2Ban para la prevención de intrusiones. Práctica estándar, instalar, configurar y olvidar. Como el servidor se caía a ciertas horas, decidí echar un vistazo más de cerca.

Me dedico sobre todo a la programación y esto es más bien un trabajo para administradores de sistemas. Sí, ellos son los especialistas, yo sólo soy un novato. Pero siempre es bueno aprender. En este post me limitaré a los escáneres de puertos, que atacan el puerto SMTP . Escribiré sobre ataques (D)DOS en otra ocasión. Mi servidor Debian está conectado a Internet usando sólo direcciones ipv4 (aún dudo en añadir ipv6). Si intentas esto por ti mismo:

ADVERTENCIA: USE BAJO SU PROPIO RIESGO / NO SE ENCIERRE OUT

Usando ipset y Python

Asumo que algunos de los IP addresses involucrados en el escaneo del puerto SMTP también pueden ser utilizados para otros ataques, pero ¿es esto realmente cierto? Lo que quería conseguir era recopilar la mayor cantidad posible de IP addresses escaneando el puerto SMTP . Para esto necesito bloquear permanentemente estos IP addresses y luego asumo (espero) que el escáner usará otro IP address para continuar el ataque, etc.

Con Fail2Ban, los IP addresses no son baneados permanentemente. Además Fail2Ban por defecto no bloquea todos los puertos sino sólo uno.

Como no quería tocar la configuración actual de Fail2Ban, tuve que usar iptables. Pero tampoco quería añadir largas listas de IP addresses a iptables, para luego eliminarlas. ipset al rescate. ipset es una extensión de iptables que nos permite crear reglas firewall que coinciden con conjuntos de IP addresses (o redes IP).Los conjuntos ipset se almacenan en estructuras de datos indexadas, haciendo las búsquedas muy eficientes.
Una vez que tenemos los IP addresses en un conjunto, por ejemplo un conjunto llamado blocklist_postfix_sasl, podemos bloquearlos añadiendo
una regla a iptables:

iptables -I INPUT -m set --match-set blocklist_network src -j DROP

Aquí estoy bloqueando todos los puertos, pero si quieres puedes cambiar la regla para bloquear sólo el puerto SMTP .

Yo uso Python en lugar de un script Bash por razones explicadas en posts anteriores. Bash es muy útil para pequeños scripts pero una vez que necesitas un poco más de control y/o procesamiento, estás mejor con Python. Con Python también podemos ejecutar programas de línea de comandos Linux usando subprocess.

Hacer que las reglas iptables y ipset persistan

Lo primero que debe hacer es hacer que sus reglas iptables persistan, lo que significa que estarán ahí después de un reinicio. También debe hacer que sus conjuntos ipset sean persistentes. Hay una trampa aquí porque los conjuntos ipset deben cargarse antes de que se cargue iptables. Puede encontrar información en internet sobre cómo hacer esto, he añadido dos enlaces más abajo.

Recopilando IP addresses

He creado un script Python , ver abajo, que hace lo siguiente:

  • Escanea los archivos de registro de Fail2Ban cada pocas horas en busca de Banned postfix-sasl IP addresses.
  • Crear un conjunto ipset llamado blocklist_postfix_sasl, si aún no se ha creado.
  • Añade estas IP addresses a este conjunto, y también a un archivo.
  • Guarde el conjunto para que los datos no se pierdan en un reinicio.

Cuando un IP address es añadido a un set, es inmediatamente efectivo (si el set ha sido añadido a iptables). Usando Cron, ejecuto este script Python cada tres horas.

El archivo con el prohibido IP addresses tiene líneas como:

2023-05-20T07:00:02.785796 add blocklist_postfix_sasl 123.xxx.xxx.xxx
2023-05-20T07:00:02.801597 add blocklist_postfix_sasl 456.xxx.xxx.xxx

Post-procesamiento

En una semana recopilé unos cuantos miles de IP addresses únicos bloqueados por Fail2Ban. Después copié esta lista a mi ordenador para analizarla. Como primer paso, quise crear un archivo CSV con filas de IP addresses y tanta información como fuera posible.

Puedes utilizar un servicio en línea como Abuseipdb.com para comprobar manualmente IP addresses, o escribir un script y utilizar su API. Pero su servicio gratuito tiene un límite de 1000 búsquedas al día.

Entonces encontré el paquete Python IP2Location con la base de datos gratuita:

IP2LOCATION-LITE-DB11.BIN

Con esto podemos obtener la geo-localización, como la información del país y la ciudad. Lo bueno es que tienes esta base de datos en tu máquina, sin búsquedas remotas.

De los 3900 IP addresses únicos, ahora pude ver que 3750 eran de China, lo cual es 96%, WTF?

A continuación, instalé 'whois' en mi máquina:

apt install whois

y usé esto, con un script, para obtener más información. Para reducir el número de búsquedas whois remotas, excluí por el momento los IP addresses de China. Después de un poco de codificación, pude extraer las organizaciones responsables de estos IP addresses y más.

Resultados

Como ya se ha mencionado, casi todas las IP addresses proceden de China. Ahora bien, ¿por qué querría China escanear los puertos de mi servidor? Una razón puede ser que escanear puertos es algo habitual para China. ¿Prepararse para la Tercera Guerra Mundial? ¿O tal vez esperan que bloquee estos IP addresses permanentemente para que la gente de China no pueda acceder a los sitios web que están alojados en mi servidor?

¿Puede alguien utilizar IP addresses falsos para atacar mi servidor, y sugerir el escaneo de puertos por parte de China? ¿Pero entonces tendrían que hackear los routers de mi proveedor de alojamiento? Todo es posible, no lo sé.

No puedo mencionar los resultados de los otros IP addresses en detail. Pero debo mencionar que una organización de alojamiento específico de los Países Bajos parece ser un proveedor de muchos hackers / escáneres de puertos. Hay impactantes reviews sobre ellos en internet. Personas inconscientes que comenzaron a utilizar su alojamiento y encontrar que sus sitios web están bloqueados en todo el mundo.

El código Python que recoge IP addresses

A continuación se muestra el código para la recogida de la IP addresses, si quieres probar a ti mismo. Observa la diferencia entre subprocess.run() y subprocess.popen(). Con esta última podemos ejecutar una línea de comandos completa, usando filtros, tuberías y redireccionamiento.

#!/usr/bin/python3
# manage_ipset_blocklist_postfix_sasl.py
import datetime
import glob
import logging
import os
import re
import shlex
import subprocess
import sys

# CONSTANTS
PROGRAM_NAME = 'manage_ipset_blocklist_postfix_sasl'
# never add these ips to ipset
NEVER_ADD_IPS = [
    'xxx.xxx.xxx.xxx',
]
FAIL2BAN_FILES_DIR = '/var/log'
FILES_DIR = '/root/f2b_bans'
# log file
LOG_FILE = os.path.join(FILES_DIR, PROGRAM_NAME + '.log')
# temporary file(s)
BANS_FILE = os.path.join(FILES_DIR, 'bans')
# ipset
IPSET_EXE = '/usr/sbin/ipset'
IPSET_BLOCKLIST_NAME = 'blocklist_postfix_sasl'
IPSET_BLOCKLIST_LOG_FILE = os.path.join(FILES_DIR, IPSET_BLOCKLIST_NAME + '.log')
# this file is loaded by ipset on reboot
IPSET_SAVE_FILE = '/etc/iptables/ipset'

def get_logger(
    console_log_level=logging.DEBUG,
    file_log_level=logging.DEBUG,
    log_file=os.path.splitext(__file__)[0] + '.log',
):
    logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    if console_log_level:
        # console
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(console_log_level)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)
    if file_log_level:
        # file
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(file_log_level)
        file_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(file_handler)
    return logger

logger = get_logger(
    #console_log_level=logging.INFO,
    #console_log_level=None,
    log_file=LOG_FILE,
)
logger.debug('START')

class CmdLine:
    def __init__(
        self,
        logger=None,
    ):
        self.logger = logger

    def run(self, command, stdout=None, check=False, stdout_file=None, stdout_file_mode='w'):
        if stdout_file is None:
            result = subprocess.run(shlex.split(command), capture_output=True, text=True, stdout=stdout, check=check)
        else:
            with open(stdout_file, stdout_file_mode) as fo:
                result = subprocess.run(shlex.split(command), text=True, stdout=fo, check=check)
        self.logger.debug(f'type(result.stdout) = {type(result.stdout)}, result.stdout = {result.stdout}')
        self.logger.debug(f'type(result.stderr) = {type(result.stderr)}, result.stderr = {result.stderr}')
        self.logger.debug(f'type(result.returncode) = {type(result.returncode)}, result.returncode = {result.returncode}')
        return result

    def popen(self, cmd):
        self.logger.debug(f'cmd = {cmd}')
        p = subprocess.Popen(cmd, shell=True)
        returncode = p.wait()
        self.logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        return returncode

    def get_file_line_count(self, f):
        p = subprocess.Popen(['wc', '-l', f], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        result, err = p.communicate()
        if p.returncode != 0:
            raise IOError(err)
        return int(result.strip().split()[0])

    def get_file_size(self, f):
        return os.path.getsize(f)

    def create_dir(self, d):
        os.makedirs(d, exist_ok=True)
        return True

    def get_dir_files(self, d, pattern):
        self.logger.debug(f'd = {d}, pattern = {pattern}')
        pathname = os.path.join(d, pattern)
        self.logger.debug(f'pathname = {pathname}')
        return glob.glob(os.path.join(d, pattern))

    def remove_file(self, f):
        self.logger.debug(f'(f = {f})')
        try:
            os.remove(f)
        except OSError:
            pass

def main():

    # 2023-04-16 17:28:50,860 fail2ban.actions        [790]: NOTICE  [postfix-sasl] Ban xxx.xxx.xxx.xxx
    ban_pattern = re.compile(r'^.*?\[postfix\-sasl\]\s+Ban\s+(.*?)\s*$')
    ip4_pattern = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])$')

    cmd_line = CmdLine(
        logger=logger,
    )

    # create ipset for blocklist if not there yet
    command = f'{IPSET_EXE} create {IPSET_BLOCKLIST_NAME} hash:ip hashsize 4096'
    cmd_line.run(command)

    # get ipset blocklist members to avoid excessive calling 'ipset add <ip address>'
    blocklist_members = []
    command = f'{IPSET_EXE} list {IPSET_BLOCKLIST_NAME}'
    result = cmd_line.run(command)
    for line in result.stdout.split('\n'):
        if ':' in line:
            continue
        line = line.strip()
        if ip4_pattern.match(line):
            blocklist_members.append(line)
    blocklist_members_len = len(blocklist_members)
    logger.debug(f'blocklist_members_len = {blocklist_members_len}')

    # filter Fail2Ban logs Bans to BANS_FILE
    logger.debug(f'filter fail2ban.log Bans to bans file {BANS_FILE} ...')
    cmd_line.remove_file(BANS_FILE)
    files = cmd_line.get_dir_files(FAIL2BAN_FILES_DIR, 'fail2ban.log*')
    for f in files:
        logger.debug(f'iterating file {f}')
        grep = 'grep'
        if f.endswith('.gz'):
            grep = 'zgrep'
        command = f'{grep} NOTICE "{f}" | grep "postfix-sasl" | grep Ban >> {BANS_FILE}'
        cmd_line.popen(command)

    # get ips from BANS_FILE
    ban_ips = []
    with open(BANS_FILE, 'r') as fo:
        data = fo.read()
    lines = data.split('\n')
    for line in lines:
        m = ban_pattern.search(line)
        if m is None:
            continue
        ip = m.group(1)
        if ip4_pattern.match(ip) is None:
            continue
        # never
        if ip in NEVER_ADD_IPS:
            continue
        # no duplicates
        if ip in ban_ips:
            continue
        # not if already in members
        if ip in blocklist_members:
            continue
        ban_ips.append(ip)
    ban_ips_count = len(ban_ips)
    logger.debug(f'ban_ips_count = {ban_ips_count}')

    # add ips to ipset blocklist
    added_count = 0
    rejected_count = 0
    for ban_ip in ban_ips:
        command = f'{IPSET_EXE} add {IPSET_BLOCKLIST_NAME} {ban_ip}'
        result = cmd_line.run(command)
        returncode = result.returncode
        logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        if returncode == 0:
            added_count += 1
            # also add ip with timestamp to ipset_added file
            dt = datetime.datetime.utcnow().isoformat()
            with open(IPSET_BLOCKLIST_LOG_FILE, 'a') as fo_add_file:
                fo_add_file.write(f'{dt} add {IPSET_BLOCKLIST_NAME} {ban_ip}\n')
        else:
            rejected_count += 1
    logger.info(f'added_count = {added_count}, rejected_count = {rejected_count}')

    logger.debug(f'saving ipset to {IPSET_SAVE_FILE}')
    command = f'{IPSET_EXE} save -file {IPSET_SAVE_FILE}'
    cmd_line.run(command)

    logger.debug(f'to save again: {command}')

main()


Resumen

Nunca antes había usado ipset, es bueno aprender a usarlo. Fail2Ban también puede ser usado con ipset, pero no quise cambiar mi configuración de Fail2Ban. Con ipset y el script Python puedo recolectar IP addresses malos y bloquear automáticamente el acceso a mi servidor. Y puedo obtener fácilmente más información de estos IP addresses utilizando el paquete Python IP2Location y la utilidad de línea de comandos whois.

Enlaces / créditos

Fail2ban
https://www.fail2ban.org

How to find the actual address of spoofed IPs?
https://security.stackexchange.com/questions/48523/how-to-find-the-actual-address-of-spoofed-ips

How to Make iptables Firewall Rules Persistent on Debian/Ubuntu
https://linuxiac.com/persistent-iptables-firewall-rules

IP2Location
https://pypi.org/project/IP2Location

ipset
https://ipset.netfilter.org/ipset.man.html

Persistent ipset for Ubuntu/Debian compatible with ufw and iptables-persistent
https://selivan.github.io/2018/07/27/ipset-save-with-ufw-and-iptables-persistent-and.html

WHOIS(1)
https://manpages.debian.org/bullseye/whois/whois.1.en.html

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.