angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Сбор и блокировка IP addresses с помощью ipset и Python

И как только мы получим список IP addresses, мы можем проанализировать их с помощью IP2Location и whois

21 мая 2023
post main image
https://unsplash.com/@enginakyurt

Если у вас есть сервер, подключенный к Интернету, вы, вероятно, видели это в своих журналах: множество незаконных внешних запросов, пытающихся получить доступ к вашим службам.

Я обслуживаю сервер Debian и использую Fail2Ban для предотвращения вторжений. Стандартная практика, установил, настроил и забыл. Поскольку сервер стал падать в определенное время, я решил присмотреться повнимательнее.

Я в основном занимаюсь программированием, а это больше работа для системных администраторов. Да, они специалисты, а я просто новичок. Но учиться всегда приятно. В этом посте я ограничусь сканерами портов, которые атакуют порт SMTP . Об атаках (D)DOS я напишу в другой раз. Мой сервер Debian подключен к Интернету, используя только ipv4 адреса (все еще не решаюсь добавить ipv6). Если вы попробуете это сами:

WARNING: USE AT YOUR OWN RISK / DO NOT LOCK YOURSELF OUT

Использование ipset и Python

Я предполагаю, что некоторые из IP address, участвующие в сканировании порта SMTP , могут также использоваться для других атак, но так ли это на самом деле? Я хотел добиться того, чтобы собрать как можно больше IP addresses, сканирующих порт SMTP . Для этого мне нужно постоянно блокировать эти IP addresses, а затем я предполагаю (надеюсь), что сканер будет использовать другой IP address для продолжения атаки и т.д.

С Fail2Ban, IP addresses не будут забанены навсегда. Также Fail2Ban по умолчанию блокирует не все порты, а только один порт.

Поскольку я не хотел трогать текущую настройку Fail2Ban, мне пришлось использовать iptables. Но я также не хотел добавлять длинные списки IP addresses в iptables, а затем удалять их. На помощь пришел ipset. ipset - это расширение iptables, которое позволяет нам создавать правила firewall , которые соответствуют наборам IP addresses (или IP-сетям).Наборы ipset хранятся в индексированных структурах данных, что делает поиск очень эффективным.
После того, как у нас есть IP addresses в наборе, например, набор под названием blocklist_postfix_sasl, мы можем заблокировать их, добавив
правило в iptables:

iptables -I INPUT -m set --match-set blocklist_network src -j DROP

Здесь я блокирую все порты, но если вы хотите, вы можете изменить правило, чтобы блокировать только порт SMTP .

Я использую Python вместо Bash-скрипта по причинам, описанным в предыдущих сообщениях. Bash очень полезен для небольших сценариев, но если вам нужно немного больше контроля и/или обработки, лучше использовать Python. С помощью Python мы также можем запускать программы командной строки Linux , используя subprocess.

Создание постоянных правил iptables и ipset

Первое, что вы хотите сделать, это сделать правила iptables постоянными, то есть, чтобы они сохранялись после перезагрузки. Вы также должны сделать наборы ipset постоянными. Здесь есть одна загвоздка, поскольку наборы ipset должны быть загружены до загрузки iptables. Вы можете найти в интернете информацию о том, как это сделать, я добавил две ссылки ниже.

Сбор IP addresses

Я создал скрипт Python , см. ниже, который делает следующее:

  • Сканирует лог-файлы Fail2Ban каждые несколько часов на предмет запрета postfix-sasl IP addresses.
  • Создайте набор ipset с именем blocklist_postfix_sasl, если он еще не создан.
  • Добавьте эти IP addresses в этот набор, а также в файл.
  • Сохраните набор, чтобы данные не были потеряны при перезагрузке.

Когда IP address добавляется в набор, он немедленно вступает в силу (если набор был добавлен в iptables). Используя Cron, я запускаю этот скрипт Python каждые три часа.

В файле с запрещенным IP addresses есть такие строки, как:

2023-05-20T07:00:02.785796 add blocklist_postfix_sasl 123.xxx.xxx.xxx
2023-05-20T07:00:02.801597 add blocklist_postfix_sasl 456.xxx.xxx.xxx

Постобработка

За неделю я собрал несколько тысяч уникальных IP addresses, заблокированных Fail2Ban. Затем я скопировал этот список на свой компьютер для анализа. В качестве первого шага я хотел создать CSV-файл со строками IP addresses и как можно больше информации.

Для ручной проверки IP addresses можно использовать онлайн-сервис, например Abuseipdb.com, или написать скрипт и использовать их API. Но их бесплатный сервис имеет ограничение в 1000 поисков в день.

Затем я нашел пакет Python IP2Location с бесплатной базой данных:

IP2LOCATION-LITE-DB11.BIN

С его помощью мы можем получить геолокацию, например, информацию о стране и городе. Самое приятное, что эта база данных находится на вашей машине, никаких удаленных поисков.

Из 3900 уникальных IP addresses, теперь я мог видеть, что 3750 были из Китая, что составляет 96%, WTF?

Далее, я установил "whois" на свою машину:

apt install whois

и использовал его со скриптом, чтобы получить больше информации. Чтобы уменьшить количество удаленных поисков whois, я пока исключил IP address из Китая. После некоторого кодирования я смог извлечь организации, ответственные за эти IP addresses и многое другое.

Результаты

Как уже упоминалось выше, почти все IP addresses - из Китая. Так почему же Китай хочет сканировать порты на моем сервере? Одна из причин может заключаться в том, что сканирование портов - это обычное дело для Китая. Готовимся к третьей мировой войне? Или они надеются, что я заблокирую эти IP addresses навсегда, чтобы люди из Китая не могли получить доступ к веб-сайтам, размещенным на моем сервере?

Может ли кто-то использовать поддельные IP addresses для атаки на мой сервер и предложить сканирование портов Китаем? Но тогда им придется взломать маршрутизаторы моего хостинг-провайдера? Все возможно, я не знаю.

Я не могу упомянуть результаты других IP addresses в detail. Но я должен упомянуть, что одна конкретная хостинговая организация из Нидерландов, похоже, является поставщиком многих хакеров / сканеров портов. В интернете о них ходят шокирующие отзывы reviews . Неосведомленные люди, которые начали использовать их хостинг и обнаружили, что их сайты заблокированы по всему миру.

Код Python для сбора IP addresses

Ниже приведен код для сбора IP addresses, если вы хотите попробовать сами. Обратите внимание на разницу между subprocess.run() и subprocess.popen(). С помощью последней мы можем запустить полноценную командную строку, используя фильтры, трубы и перенаправление.

#!/usr/bin/python3
# manage_ipset_blocklist_postfix_sasl.py
import datetime
import glob
import logging
import os
import re
import shlex
import subprocess
import sys

# CONSTANTS
PROGRAM_NAME = 'manage_ipset_blocklist_postfix_sasl'
# never add these ips to ipset
NEVER_ADD_IPS = [
    'xxx.xxx.xxx.xxx',
]
FAIL2BAN_FILES_DIR = '/var/log'
FILES_DIR = '/root/f2b_bans'
# log file
LOG_FILE = os.path.join(FILES_DIR, PROGRAM_NAME + '.log')
# temporary file(s)
BANS_FILE = os.path.join(FILES_DIR, 'bans')
# ipset
IPSET_EXE = '/usr/sbin/ipset'
IPSET_BLOCKLIST_NAME = 'blocklist_postfix_sasl'
IPSET_BLOCKLIST_LOG_FILE = os.path.join(FILES_DIR, IPSET_BLOCKLIST_NAME + '.log')
# this file is loaded by ipset on reboot
IPSET_SAVE_FILE = '/etc/iptables/ipset'

def get_logger(
    console_log_level=logging.DEBUG,
    file_log_level=logging.DEBUG,
    log_file=os.path.splitext(__file__)[0] + '.log',
):
    logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    if console_log_level:
        # console
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(console_log_level)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)
    if file_log_level:
        # file
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(file_log_level)
        file_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(file_handler)
    return logger

logger = get_logger(
    #console_log_level=logging.INFO,
    #console_log_level=None,
    log_file=LOG_FILE,
)
logger.debug('START')

class CmdLine:
    def __init__(
        self,
        logger=None,
    ):
        self.logger = logger

    def run(self, command, stdout=None, check=False, stdout_file=None, stdout_file_mode='w'):
        if stdout_file is None:
            result = subprocess.run(shlex.split(command), capture_output=True, text=True, stdout=stdout, check=check)
        else:
            with open(stdout_file, stdout_file_mode) as fo:
                result = subprocess.run(shlex.split(command), text=True, stdout=fo, check=check)
        self.logger.debug(f'type(result.stdout) = {type(result.stdout)}, result.stdout = {result.stdout}')
        self.logger.debug(f'type(result.stderr) = {type(result.stderr)}, result.stderr = {result.stderr}')
        self.logger.debug(f'type(result.returncode) = {type(result.returncode)}, result.returncode = {result.returncode}')
        return result

    def popen(self, cmd):
        self.logger.debug(f'cmd = {cmd}')
        p = subprocess.Popen(cmd, shell=True)
        returncode = p.wait()
        self.logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        return returncode

    def get_file_line_count(self, f):
        p = subprocess.Popen(['wc', '-l', f], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        result, err = p.communicate()
        if p.returncode != 0:
            raise IOError(err)
        return int(result.strip().split()[0])

    def get_file_size(self, f):
        return os.path.getsize(f)

    def create_dir(self, d):
        os.makedirs(d, exist_ok=True)
        return True

    def get_dir_files(self, d, pattern):
        self.logger.debug(f'd = {d}, pattern = {pattern}')
        pathname = os.path.join(d, pattern)
        self.logger.debug(f'pathname = {pathname}')
        return glob.glob(os.path.join(d, pattern))

    def remove_file(self, f):
        self.logger.debug(f'(f = {f})')
        try:
            os.remove(f)
        except OSError:
            pass

def main():

    # 2023-04-16 17:28:50,860 fail2ban.actions        [790]: NOTICE  [postfix-sasl] Ban xxx.xxx.xxx.xxx
    ban_pattern = re.compile(r'^.*?\[postfix\-sasl\]\s+Ban\s+(.*?)\s*$')
    ip4_pattern = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])$')

    cmd_line = CmdLine(
        logger=logger,
    )

    # create ipset for blocklist if not there yet
    command = f'{IPSET_EXE} create {IPSET_BLOCKLIST_NAME} hash:ip hashsize 4096'
    cmd_line.run(command)

    # get ipset blocklist members to avoid excessive calling 'ipset add <ip address>'
    blocklist_members = []
    command = f'{IPSET_EXE} list {IPSET_BLOCKLIST_NAME}'
    result = cmd_line.run(command)
    for line in result.stdout.split('\n'):
        if ':' in line:
            continue
        line = line.strip()
        if ip4_pattern.match(line):
            blocklist_members.append(line)
    blocklist_members_len = len(blocklist_members)
    logger.debug(f'blocklist_members_len = {blocklist_members_len}')

    # filter Fail2Ban logs Bans to BANS_FILE
    logger.debug(f'filter fail2ban.log Bans to bans file {BANS_FILE} ...')
    cmd_line.remove_file(BANS_FILE)
    files = cmd_line.get_dir_files(FAIL2BAN_FILES_DIR, 'fail2ban.log*')
    for f in files:
        logger.debug(f'iterating file {f}')
        grep = 'grep'
        if f.endswith('.gz'):
            grep = 'zgrep'
        command = f'{grep} NOTICE "{f}" | grep "postfix-sasl" | grep Ban >> {BANS_FILE}'
        cmd_line.popen(command)

    # get ips from BANS_FILE
    ban_ips = []
    with open(BANS_FILE, 'r') as fo:
        data = fo.read()
    lines = data.split('\n')
    for line in lines:
        m = ban_pattern.search(line)
        if m is None:
            continue
        ip = m.group(1)
        if ip4_pattern.match(ip) is None:
            continue
        # never
        if ip in NEVER_ADD_IPS:
            continue
        # no duplicates
        if ip in ban_ips:
            continue
        # not if already in members
        if ip in blocklist_members:
            continue
        ban_ips.append(ip)
    ban_ips_count = len(ban_ips)
    logger.debug(f'ban_ips_count = {ban_ips_count}')

    # add ips to ipset blocklist
    added_count = 0
    rejected_count = 0
    for ban_ip in ban_ips:
        command = f'{IPSET_EXE} add {IPSET_BLOCKLIST_NAME} {ban_ip}'
        result = cmd_line.run(command)
        returncode = result.returncode
        logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        if returncode == 0:
            added_count += 1
            # also add ip with timestamp to ipset_added file
            dt = datetime.datetime.utcnow().isoformat()
            with open(IPSET_BLOCKLIST_LOG_FILE, 'a') as fo_add_file:
                fo_add_file.write(f'{dt} add {IPSET_BLOCKLIST_NAME} {ban_ip}\n')
        else:
            rejected_count += 1
    logger.info(f'added_count = {added_count}, rejected_count = {rejected_count}')

    logger.debug(f'saving ipset to {IPSET_SAVE_FILE}')
    command = f'{IPSET_EXE} save -file {IPSET_SAVE_FILE}'
    cmd_line.run(command)

    logger.debug(f'to save again: {command}')

main()


Резюме

Я никогда раньше не использовал ipset, полезно узнать, как его использовать. Fail2Ban также можно использовать с ipset, но я не хотел менять конфигурацию Fail2Ban. С помощью ipset и скрипта Python я могу собирать плохие IP address и автоматически блокировать доступ к моему серверу. И я могу легко получить больше информации из этих IP addresses, используя Python пакет IP2Location и утилиту командной строки whois.

Ссылки / кредиты

Fail2ban
https://www.fail2ban.org

How to find the actual address of spoofed IPs?
https://security.stackexchange.com/questions/48523/how-to-find-the-actual-address-of-spoofed-ips

How to Make iptables Firewall Rules Persistent on Debian/Ubuntu
https://linuxiac.com/persistent-iptables-firewall-rules

IP2Location
https://pypi.org/project/IP2Location

ipset
https://ipset.netfilter.org/ipset.man.html

Persistent ipset for Ubuntu/Debian compatible with ufw and iptables-persistent
https://selivan.github.io/2018/07/27/ipset-save-with-ufw-and-iptables-persistent-and.html

WHOIS(1)
https://manpages.debian.org/bullseye/whois/whois.1.en.html

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.