Recoger y bloquear IP addresses con ipset y Python
Y una vez que tenemos una lista de IP addresses, podemos analizarlos con IP2Location y whois
Si tienes un servidor conectado a Internet, probablemente hayas visto esto en tus archivos de registro: montones de peticiones externas ilegales intentando acceder a tus servicios.
Yo mantengo un servidor Debian , y utilizo Fail2Ban para la prevención de intrusiones. Práctica estándar, instalar, configurar y olvidar. Como el servidor se caía a ciertas horas, decidí echar un vistazo más de cerca.
Me dedico sobre todo a la programación y esto es más bien un trabajo para administradores de sistemas. Sí, ellos son los especialistas, yo sólo soy un novato. Pero siempre es bueno aprender. En este post me limitaré a los escáneres de puertos, que atacan el puerto SMTP . Escribiré sobre ataques (D)DOS en otra ocasión. Mi servidor Debian está conectado a Internet usando sólo direcciones ipv4 (aún dudo en añadir ipv6). Si intentas esto por ti mismo:
ADVERTENCIA: USE BAJO SU PROPIO RIESGO / NO SE ENCIERRE OUT
Usando ipset y Python
Asumo que algunos de los IP addresses involucrados en el escaneo del puerto SMTP también pueden ser utilizados para otros ataques, pero ¿es esto realmente cierto? Lo que quería conseguir era recopilar la mayor cantidad posible de IP addresses escaneando el puerto SMTP . Para esto necesito bloquear permanentemente estos IP addresses y luego asumo (espero) que el escáner usará otro IP address para continuar el ataque, etc.
Con Fail2Ban, los IP addresses no son baneados permanentemente. Además Fail2Ban por defecto no bloquea todos los puertos sino sólo uno.
Como no quería tocar la configuración actual de Fail2Ban, tuve que usar iptables. Pero tampoco quería añadir largas listas de IP addresses a iptables, para luego eliminarlas. ipset al rescate. ipset es una extensión de iptables que nos permite crear reglas firewall que coinciden con conjuntos de IP addresses (o redes IP).Los conjuntos ipset se almacenan en estructuras de datos indexadas, haciendo las búsquedas muy eficientes.
Una vez que tenemos los IP addresses en un conjunto, por ejemplo un conjunto llamado blocklist_postfix_sasl, podemos bloquearlos añadiendo
una regla a iptables:
iptables -I INPUT -m set --match-set blocklist_network src -j DROP
Aquí estoy bloqueando todos los puertos, pero si quieres puedes cambiar la regla para bloquear sólo el puerto SMTP .
Yo uso Python en lugar de un script Bash por razones explicadas en posts anteriores. Bash es muy útil para pequeños scripts pero una vez que necesitas un poco más de control y/o procesamiento, estás mejor con Python. Con Python también podemos ejecutar programas de línea de comandos Linux usando subprocess.
Hacer que las reglas iptables y ipset persistan
Lo primero que debe hacer es hacer que sus reglas iptables persistan, lo que significa que estarán ahí después de un reinicio. También debe hacer que sus conjuntos ipset sean persistentes. Hay una trampa aquí porque los conjuntos ipset deben cargarse antes de que se cargue iptables. Puede encontrar información en internet sobre cómo hacer esto, he añadido dos enlaces más abajo.
Recopilando IP addresses
He creado un script Python , ver abajo, que hace lo siguiente:
- Escanea los archivos de registro de Fail2Ban cada pocas horas en busca de Banned postfix-sasl IP addresses.
- Crear un conjunto ipset llamado blocklist_postfix_sasl, si aún no se ha creado.
- Añade estas IP addresses a este conjunto, y también a un archivo.
- Guarde el conjunto para que los datos no se pierdan en un reinicio.
Cuando un IP address es añadido a un set, es inmediatamente efectivo (si el set ha sido añadido a iptables). Usando Cron, ejecuto este script Python cada tres horas.
El archivo con el prohibido IP addresses tiene líneas como:
2023-05-20T07:00:02.785796 add blocklist_postfix_sasl 123.xxx.xxx.xxx
2023-05-20T07:00:02.801597 add blocklist_postfix_sasl 456.xxx.xxx.xxx
Post-procesamiento
En una semana recopilé unos cuantos miles de IP addresses únicos bloqueados por Fail2Ban. Después copié esta lista a mi ordenador para analizarla. Como primer paso, quise crear un archivo CSV con filas de IP addresses y tanta información como fuera posible.
Puedes utilizar un servicio en línea como Abuseipdb.com para comprobar manualmente IP addresses, o escribir un script y utilizar su API. Pero su servicio gratuito tiene un límite de 1000 búsquedas al día.
Entonces encontré el paquete Python IP2Location con la base de datos gratuita:
IP2LOCATION-LITE-DB11.BIN
Con esto podemos obtener la geo-localización, como la información del país y la ciudad. Lo bueno es que tienes esta base de datos en tu máquina, sin búsquedas remotas.
De los 3900 IP addresses únicos, ahora pude ver que 3750 eran de China, lo cual es 96%, WTF?
A continuación, instalé 'whois' en mi máquina:
apt install whois
y usé esto, con un script, para obtener más información. Para reducir el número de búsquedas whois remotas, excluí por el momento los IP addresses de China. Después de un poco de codificación, pude extraer las organizaciones responsables de estos IP addresses y más.
Resultados
Como ya se ha mencionado, casi todas las IP addresses proceden de China. Ahora bien, ¿por qué querría China escanear los puertos de mi servidor? Una razón puede ser que escanear puertos es algo habitual para China. ¿Prepararse para la Tercera Guerra Mundial? ¿O tal vez esperan que bloquee estos IP addresses permanentemente para que la gente de China no pueda acceder a los sitios web que están alojados en mi servidor?
¿Puede alguien utilizar IP addresses falsos para atacar mi servidor, y sugerir el escaneo de puertos por parte de China? ¿Pero entonces tendrían que hackear los routers de mi proveedor de alojamiento? Todo es posible, no lo sé.
No puedo mencionar los resultados de los otros IP addresses en detail. Pero debo mencionar que una organización de alojamiento específico de los Países Bajos parece ser un proveedor de muchos hackers / escáneres de puertos. Hay impactantes reviews sobre ellos en internet. Personas inconscientes que comenzaron a utilizar su alojamiento y encontrar que sus sitios web están bloqueados en todo el mundo.
El código Python que recoge IP addresses
A continuación se muestra el código para la recogida de la IP addresses, si quieres probar a ti mismo. Observa la diferencia entre subprocess.run() y subprocess.popen(). Con esta última podemos ejecutar una línea de comandos completa, usando filtros, tuberías y redireccionamiento.
#!/usr/bin/python3
# manage_ipset_blocklist_postfix_sasl.py
import datetime
import glob
import logging
import os
import re
import shlex
import subprocess
import sys
# CONSTANTS
PROGRAM_NAME = 'manage_ipset_blocklist_postfix_sasl'
# never add these ips to ipset
NEVER_ADD_IPS = [
'xxx.xxx.xxx.xxx',
]
FAIL2BAN_FILES_DIR = '/var/log'
FILES_DIR = '/root/f2b_bans'
# log file
LOG_FILE = os.path.join(FILES_DIR, PROGRAM_NAME + '.log')
# temporary file(s)
BANS_FILE = os.path.join(FILES_DIR, 'bans')
# ipset
IPSET_EXE = '/usr/sbin/ipset'
IPSET_BLOCKLIST_NAME = 'blocklist_postfix_sasl'
IPSET_BLOCKLIST_LOG_FILE = os.path.join(FILES_DIR, IPSET_BLOCKLIST_NAME + '.log')
# this file is loaded by ipset on reboot
IPSET_SAVE_FILE = '/etc/iptables/ipset'
def get_logger(
console_log_level=logging.DEBUG,
file_log_level=logging.DEBUG,
log_file=os.path.splitext(__file__)[0] + '.log',
):
logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
if console_log_level:
# console
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(console_log_level)
console_handler.setFormatter(logging.Formatter(logger_format))
logger.addHandler(console_handler)
if file_log_level:
# file
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(file_log_level)
file_handler.setFormatter(logging.Formatter(logger_format))
logger.addHandler(file_handler)
return logger
logger = get_logger(
#console_log_level=logging.INFO,
#console_log_level=None,
log_file=LOG_FILE,
)
logger.debug('START')
class CmdLine:
def __init__(
self,
logger=None,
):
self.logger = logger
def run(self, command, stdout=None, check=False, stdout_file=None, stdout_file_mode='w'):
if stdout_file is None:
result = subprocess.run(shlex.split(command), capture_output=True, text=True, stdout=stdout, check=check)
else:
with open(stdout_file, stdout_file_mode) as fo:
result = subprocess.run(shlex.split(command), text=True, stdout=fo, check=check)
self.logger.debug(f'type(result.stdout) = {type(result.stdout)}, result.stdout = {result.stdout}')
self.logger.debug(f'type(result.stderr) = {type(result.stderr)}, result.stderr = {result.stderr}')
self.logger.debug(f'type(result.returncode) = {type(result.returncode)}, result.returncode = {result.returncode}')
return result
def popen(self, cmd):
self.logger.debug(f'cmd = {cmd}')
p = subprocess.Popen(cmd, shell=True)
returncode = p.wait()
self.logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
return returncode
def get_file_line_count(self, f):
p = subprocess.Popen(['wc', '-l', f], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result, err = p.communicate()
if p.returncode != 0:
raise IOError(err)
return int(result.strip().split()[0])
def get_file_size(self, f):
return os.path.getsize(f)
def create_dir(self, d):
os.makedirs(d, exist_ok=True)
return True
def get_dir_files(self, d, pattern):
self.logger.debug(f'd = {d}, pattern = {pattern}')
pathname = os.path.join(d, pattern)
self.logger.debug(f'pathname = {pathname}')
return glob.glob(os.path.join(d, pattern))
def remove_file(self, f):
self.logger.debug(f'(f = {f})')
try:
os.remove(f)
except OSError:
pass
def main():
# 2023-04-16 17:28:50,860 fail2ban.actions [790]: NOTICE [postfix-sasl] Ban xxx.xxx.xxx.xxx
ban_pattern = re.compile(r'^.*?\[postfix\-sasl\]\s+Ban\s+(.*?)\s*$')
ip4_pattern = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])$')
cmd_line = CmdLine(
logger=logger,
)
# create ipset for blocklist if not there yet
command = f'{IPSET_EXE} create {IPSET_BLOCKLIST_NAME} hash:ip hashsize 4096'
cmd_line.run(command)
# get ipset blocklist members to avoid excessive calling 'ipset add <ip address>'
blocklist_members = []
command = f'{IPSET_EXE} list {IPSET_BLOCKLIST_NAME}'
result = cmd_line.run(command)
for line in result.stdout.split('\n'):
if ':' in line:
continue
line = line.strip()
if ip4_pattern.match(line):
blocklist_members.append(line)
blocklist_members_len = len(blocklist_members)
logger.debug(f'blocklist_members_len = {blocklist_members_len}')
# filter Fail2Ban logs Bans to BANS_FILE
logger.debug(f'filter fail2ban.log Bans to bans file {BANS_FILE} ...')
cmd_line.remove_file(BANS_FILE)
files = cmd_line.get_dir_files(FAIL2BAN_FILES_DIR, 'fail2ban.log*')
for f in files:
logger.debug(f'iterating file {f}')
grep = 'grep'
if f.endswith('.gz'):
grep = 'zgrep'
command = f'{grep} NOTICE "{f}" | grep "postfix-sasl" | grep Ban >> {BANS_FILE}'
cmd_line.popen(command)
# get ips from BANS_FILE
ban_ips = []
with open(BANS_FILE, 'r') as fo:
data = fo.read()
lines = data.split('\n')
for line in lines:
m = ban_pattern.search(line)
if m is None:
continue
ip = m.group(1)
if ip4_pattern.match(ip) is None:
continue
# never
if ip in NEVER_ADD_IPS:
continue
# no duplicates
if ip in ban_ips:
continue
# not if already in members
if ip in blocklist_members:
continue
ban_ips.append(ip)
ban_ips_count = len(ban_ips)
logger.debug(f'ban_ips_count = {ban_ips_count}')
# add ips to ipset blocklist
added_count = 0
rejected_count = 0
for ban_ip in ban_ips:
command = f'{IPSET_EXE} add {IPSET_BLOCKLIST_NAME} {ban_ip}'
result = cmd_line.run(command)
returncode = result.returncode
logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
if returncode == 0:
added_count += 1
# also add ip with timestamp to ipset_added file
dt = datetime.datetime.utcnow().isoformat()
with open(IPSET_BLOCKLIST_LOG_FILE, 'a') as fo_add_file:
fo_add_file.write(f'{dt} add {IPSET_BLOCKLIST_NAME} {ban_ip}\n')
else:
rejected_count += 1
logger.info(f'added_count = {added_count}, rejected_count = {rejected_count}')
logger.debug(f'saving ipset to {IPSET_SAVE_FILE}')
command = f'{IPSET_EXE} save -file {IPSET_SAVE_FILE}'
cmd_line.run(command)
logger.debug(f'to save again: {command}')
main()
Resumen
Nunca antes había usado ipset, es bueno aprender a usarlo. Fail2Ban también puede ser usado con ipset, pero no quise cambiar mi configuración de Fail2Ban. Con ipset y el script Python puedo recolectar IP addresses malos y bloquear automáticamente el acceso a mi servidor. Y puedo obtener fácilmente más información de estos IP addresses utilizando el paquete Python IP2Location y la utilidad de línea de comandos whois.
Enlaces / créditos
Fail2ban
https://www.fail2ban.org
How to find the actual address of spoofed IPs?
https://security.stackexchange.com/questions/48523/how-to-find-the-actual-address-of-spoofed-ips
How to Make iptables Firewall Rules Persistent on Debian/Ubuntu
https://linuxiac.com/persistent-iptables-firewall-rules
IP2Location
https://pypi.org/project/IP2Location
ipset
https://ipset.netfilter.org/ipset.man.html
Persistent ipset for Ubuntu/Debian compatible with ufw and iptables-persistent
https://selivan.github.io/2018/07/27/ipset-save-with-ufw-and-iptables-persistent-and.html
WHOIS(1)
https://manpages.debian.org/bullseye/whois/whois.1.en.html
Leer más
Internet Security testing
Recientes
- Uso de Ingress para acceder a RabbitMQ en un clúster Microk8s
- Galería de vídeo simple con Flask, Jinja, Bootstrap y JQuery
- Programación básica de trabajos con APScheduler
- Un conmutador de base de datos con HAProxy y el HAProxy Runtime API
- Docker Swarm rolling updates
- Cómo ocultar las claves primarias de la base de datos UUID de su aplicación web
Más vistos
- Usando PyInstaller y Cython para crear un ejecutable de Python
- Reducir los tiempos de respuesta de las páginas de un sitio Flask SQLAlchemy web
- Usando Python's pyOpenSSL para verificar los certificados SSL descargados de un host
- Conectarse a un servicio en un host Docker desde un contenedor Docker
- Usando UUIDs en lugar de Integer Autoincrement Primary Keys con SQLAlchemy y MariaDb
- SQLAlchemy: Uso de Cascade Deletes para eliminar objetos relacionados