Collect and block IP addresses with ipset and Python

And once we have a list of IP addresses, we can analyze them with IP2Location and whois

21 May 2023 Updated 21 May 2023

If you have a server connected to the Internet, you've probably seen this in your log files: lots of illegal external requests trying to access your services.

I maintain a Debian server and use Fail2Ban for intrusion prevention. Standard practice: install, configure and forget. Since the server was getting pulled down at certain times, I decided to take a closer look.

I'm mostly into programming and this is more of a job for system administrators. Yes, they are the specialists, I am just a noob. But it's always nice to learn. In this post I will limit myself to port scanners that attack the SMTP port. I will write about (D)DoS attacks another time. My Debian server is connected to the Internet using only IPv4 addresses (I am still hesitant to add IPv6). If you try this yourself:


Using ipset and Python

I assume some of the IP addresses involved in scanning the SMTP port may also be used for other attacks, but is this really true? What I wanted to achieve was to collect as many IP addresses scanning the SMTP port as possible. For this I need to block these IP addresses permanently. I then assume (hope) that the scanner will switch to another IP address to continue the attack, which I can collect and block as well, and so on.

With Fail2Ban, IP addresses do not get banned permanently. Also, by default Fail2Ban does not block all ports, only the ports configured for the jail that triggered the ban.

As I did not want to touch the current Fail2Ban setup, I had to use iptables directly. But I also did not want to add long lists of IP addresses to iptables, and later remove them. ipset to the rescue. ipset is an extension to iptables that allows us to create firewall rules that match sets of IP addresses (or IP networks). ipset sets are stored in indexed data structures, making lookups very efficient.

Once we have the IP addresses in a set, for example a set called blocklist_postfix_sasl, we can block them by adding a rule to iptables:

iptables -I INPUT -m set --match-set blocklist_postfix_sasl src -j DROP

Here I am blocking all ports, but if you want you can change the rule to only block the SMTP port.
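
As a minimal sketch of the port-specific variant mentioned above: the helper below only builds the iptables argument list, it does not run anything. The function name build_block_rule is hypothetical, and port 25 assumes you only care about plain SMTP.

```python
import shlex


def build_block_rule(set_name, port=None, chain="INPUT"):
    """Return the iptables argv that drops traffic from addresses in an ipset set."""
    argv = ["iptables", "-I", chain]
    if port is not None:
        # limit the rule to one TCP destination port, e.g. 25 for SMTP
        argv += ["-p", "tcp", "--dport", str(port)]
    argv += ["-m", "set", "--match-set", set_name, "src", "-j", "DROP"]
    return argv


all_ports = build_block_rule("blocklist_postfix_sasl")
smtp_only = build_block_rule("blocklist_postfix_sasl", port=25)

# print the commands so they can be reviewed before running them as root
print(shlex.join(all_ports))
print(shlex.join(smtp_only))
```

To apply a rule from Python you could pass the list to subprocess.run(smtp_only, check=True), as root. Printing it first makes it easier to check the rule before it can lock you out.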

I use Python instead of a Bash script for reasons explained in previous posts. Bash is very useful for small scripts but once you need a bit more control and/or processing, you are better off with Python. With Python we can also run Linux command line programs using subprocess.

Making iptables rules and ipset persistent

The first thing you want to do is to make your iptables rules persistent, meaning that they will still be there after a reboot. You must also make your ipset sets persistent. There is a catch here, because the ipset sets must be loaded before the iptables rules that reference them are loaded. You can find information on the internet on how to do this; I added two links below.

Collecting IP addresses

I created a Python script, see below, that does the following:

  • Scan the Fail2Ban logfiles every few hours for Banned postfix-sasl IP addresses.
  • Create an ipset set named blocklist_postfix_sasl, if not yet created.
  • Add these IP addresses to this set, and also to a file.
  • Save the set so the data does not get lost on a reboot.

When an IP address is added to a set, it is immediately effective (if the set has been added to iptables). Using Cron, I run this Python script every three hours.
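
The first step in the list above, pulling the banned addresses out of Fail2Ban log lines, can be sketched on its own. This is a simplified stand-in for the full script further down: the function name extract_banned_ips is hypothetical, the log lines are made up (they use documentation-only IP ranges), and the IP check uses the standard ipaddress module instead of a regular expression.

```python
import ipaddress
import re

# matches "[postfix-sasl] Ban <something>" at the end of a line, but not "Unban"
BAN_PATTERN = re.compile(r'\[postfix-sasl\]\s+Ban\s+(\S+)\s*$')


def extract_banned_ips(lines, never_add=()):
    """Return the unique, valid IPv4 addresses banned by the postfix-sasl jail."""
    ips = []
    seen = set(never_add)
    for line in lines:
        m = BAN_PATTERN.search(line)
        if m is None:
            continue
        candidate = m.group(1)
        try:
            ipaddress.IPv4Address(candidate)
        except ValueError:
            continue  # not a valid IPv4 address
        if candidate in seen:
            continue  # duplicate, or an address we never want to block
        seen.add(candidate)
        ips.append(candidate)
    return ips


# made-up sample lines in the Fail2Ban log format
sample_log = [
    '2023-04-16 17:28:50,860 fail2ban.actions        [790]: NOTICE  [postfix-sasl] Ban 192.0.2.10',
    '2023-04-16 17:29:10,101 fail2ban.actions        [790]: NOTICE  [postfix-sasl] Unban 192.0.2.10',
    '2023-04-16 17:30:02,412 fail2ban.actions        [790]: NOTICE  [postfix-sasl] Ban 198.51.100.7',
    '2023-04-16 17:31:44,007 fail2ban.actions        [790]: NOTICE  [postfix-sasl] Ban 192.0.2.10',
    '2023-04-16 17:32:00,000 fail2ban.actions        [790]: NOTICE  [sshd] Ban 203.0.113.5',
]
print(extract_banned_ips(sample_log))
```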

The file with the banned IP addresses has lines like:

2023-05-20T07:00:02.785796 add blocklist_postfix_sasl <ip address>
2023-05-20T07:00:02.801597 add blocklist_postfix_sasl <ip address>


In a week I collected a few thousand unique IP addresses blocked by Fail2Ban. Then I copied this list to my computer for analysis. As a first step, I wanted to create a CSV file with one row per IP address and as much information as possible.
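
A possible starting point for that CSV file, as a sketch only: it assumes every line of the collected file has the layout "timestamp add set-name ip-address", keeps the first timestamp per address, and writes two columns. The function names and the sample addresses (documentation ranges) are made up for illustration.

```python
import csv
import io


def unique_ips_from_added_file(lines):
    """Return (first_seen, ip) pairs, one per unique IP, in file order."""
    first_seen = {}
    for line in lines:
        parts = line.split()
        # expected layout: <timestamp> add <set name> <ip address>
        if len(parts) != 4 or parts[1] != 'add':
            continue
        timestamp, _, _, ip = parts
        first_seen.setdefault(ip, timestamp)
    return [(ts, ip) for ip, ts in first_seen.items()]


def write_csv(rows, fo):
    """Write a header and the (first_seen, ip) rows to an open file object."""
    writer = csv.writer(fo)
    writer.writerow(['first_seen', 'ip'])
    writer.writerows(rows)


sample = [
    '2023-05-20T07:00:02.785796 add blocklist_postfix_sasl 192.0.2.10',
    '2023-05-20T07:00:02.801597 add blocklist_postfix_sasl 198.51.100.7',
    '2023-05-20T10:00:01.120000 add blocklist_postfix_sasl 192.0.2.10',
]
rows = unique_ips_from_added_file(sample)
buf = io.StringIO()  # use open('ips.csv', 'w', newline='') for a real file
write_csv(rows, buf)
print(buf.getvalue())
```

More columns (country, organization) can be appended to each row once the lookups are done.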

You can check IP addresses manually with an online lookup service, or write a script that uses its API. But the free service I looked at has a limit of 1000 lookups a day.

Then I found the Python package IP2Location, which comes with a free database.


With this we can get the geo-location, like country and city information. The nice thing is that you have this database on your machine, no remote lookups.

Of the 3900 unique IP addresses, I could now see that 3750 were from China. That is 96%, WTF?
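
A rough sketch of how such a country breakdown can be computed. The lookup helper assumes the IP2Location package's get_all() call and its country_short attribute. The database path in the comment is a placeholder, and the 'other' bucket is just a stand-in for the 150 addresses that were not from China.

```python
from collections import Counter


def lookup_country(db, ip):
    """Return the two-letter country code for ip from an opened IP2Location database."""
    record = db.get_all(ip)
    return record.country_short


def country_share(country_codes):
    """Return (code, count, percentage) tuples, most common country first."""
    counts = Counter(country_codes)
    total = sum(counts.values())
    if total == 0:
        return []
    return [(code, n, round(100 * n / total, 1)) for code, n in counts.most_common()]


# With the package and a database file installed (placeholder path):
#   import IP2Location
#   db = IP2Location.IP2Location('/path/to/ip2location.bin')
#   codes = [lookup_country(db, ip) for ip in ips]

# Stand-in data reproducing the totals from this post
codes = ['CN'] * 3750 + ['other'] * 150
for code, count, pct in country_share(codes):
    print(code, count, pct)
```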

Next, I installed 'whois' on my machine:

apt install whois

and used this, with a script, to get more information. To reduce the number of remote whois lookups I excluded the IP addresses from China for the moment. After some coding, I could extract the organizations responsible for these IP addresses and more.
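
My whois script is not shown here, but the idea can be sketched as follows. The field names differ per registry, so the list below is an assumption that you will probably need to extend. The function names and the sample output are made up.

```python
import subprocess

# fields that often carry the organization; registries use different names
ORG_FIELDS = ('orgname', 'org-name', 'organization', 'netname', 'descr', 'owner')


def parse_whois(text, fields=ORG_FIELDS):
    """Return the first value found for each field of interest in whois output."""
    found = {}
    for line in text.splitlines():
        # skip comment lines and lines without a "key: value" layout
        if line.startswith(('%', '#')) or ':' not in line:
            continue
        key, _, value = line.partition(':')
        key = key.strip().lower()
        value = value.strip()
        if key in fields and value and key not in found:
            found[key] = value
    return found


def whois_lookup(ip, timeout=30):
    """Run the whois command line utility for one IP address and parse its output."""
    result = subprocess.run(['whois', ip], capture_output=True, text=True, timeout=timeout)
    return parse_whois(result.stdout)


# made-up output, only to show the parsing
sample_output = """\
% This is a made-up example of whois output.
inetnum:        192.0.2.0 - 192.0.2.255
netname:        EXAMPLE-NET
descr:          Example Hosting B.V.
country:        NL
org-name:       Example Hosting B.V.
"""
print(parse_whois(sample_output))
```

Calling whois_lookup() does a remote lookup for every address, so it is worth adding a pause between calls and caching the results.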


As already mentioned above, almost all IP addresses are from China. Now why in the world would China want to scan the ports on my server? One reason can be that scanning ports is business-as-usual for China. Prepare for WW3? Or maybe they hope that I will block these IP addresses permanently so that people from China cannot access websites that are hosted on my server?

Can someone use spoofed IP addresses to attack my server, and suggest port scanning by China? But then they would have to hack the routers of my hosting provider? Everything is possible, I don't know.

I cannot mention the results for the other IP addresses in detail. But I must mention that one specific hosting organization from the Netherlands seems to be a provider for many hackers / port scanners. There are shocking reviews about them on the internet: unsuspecting people started using their hosting and then found that their websites were blocked all over the world.

The Python code collecting IP addresses

Below is the code for collecting the IP addresses, if you want to try it yourself. Note the difference between subprocess.run() and subprocess.Popen(). With the latter (and shell=True) we can run a full-blown command line, using filters, pipes and redirection.

import datetime
import glob
import logging
import os
import re
import shlex
import subprocess
import sys

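PROGRAM_NAME = 'manage_ipset_blocklist_postfix_sasl'
# never add these ips to ipset (example value, put your own trusted addresses here)
NEVER_ADD_IPS = ['127.0.0.1']
FAIL2BAN_FILES_DIR = '/var/log'
FILES_DIR = '/root/f2b_bans'
# log file
LOG_FILE = os.path.join(FILES_DIR, PROGRAM_NAME + '.log')
# temporary file(s)
BANS_FILE = os.path.join(FILES_DIR, 'bans')
# file with a timestamped line for every ip added to the set (file name is an assumption)
IPSET_BLOCKLIST_LOG_FILE = os.path.join(FILES_DIR, 'ipset_added')
# ipset
IPSET_EXE = '/usr/sbin/ipset'
IPSET_BLOCKLIST_NAME = 'blocklist_postfix_sasl'
# this file is loaded by ipset on reboot
IPSET_SAVE_FILE = '/etc/iptables/ipset'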

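# note: the parameter defaults below are a reconstruction
def get_logger(
    console_log_level=None,
    file_log_level=logging.DEBUG,
    log_file=os.path.splitext(__file__)[0] + '.log',
):
    logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(logger_format)
    if console_log_level:
        # console
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(console_log_level)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    if file_log_level:
        # file
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(file_log_level)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    return logger

logger = get_logger(log_file=LOG_FILE)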

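class CmdLine:
    def __init__(self, logger=None):
        self.logger = logger or logging.getLogger(__name__)

    def run(self, command, stdout=None, check=False, stdout_file=None, stdout_file_mode='w'):
        # split the command into an argument list; no shell is involved here
        args = shlex.split(command)
        if stdout_file is None:
            # capture stdout unless the caller passed its own target
            # (capture_output=True cannot be combined with an explicit stdout)
            out = subprocess.PIPE if stdout is None else stdout
            result = subprocess.run(args, stdout=out, stderr=subprocess.PIPE, text=True, check=check)
        else:
            with open(stdout_file, stdout_file_mode) as fo:
                result = subprocess.run(args, stdout=fo, stderr=subprocess.PIPE, text=True, check=check)
        self.logger.debug(f'type(result.stdout) = {type(result.stdout)}, result.stdout = {result.stdout}')
        self.logger.debug(f'type(result.stderr) = {type(result.stderr)}, result.stderr = {result.stderr}')
        self.logger.debug(f'type(result.returncode) = {type(result.returncode)}, result.returncode = {result.returncode}')
        return result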

    def popen(self, cmd):
        self.logger.debug(f'cmd = {cmd}')
        p = subprocess.Popen(cmd, shell=True)
        returncode = p.wait()
        self.logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        return returncode

    def get_file_line_count(self, f):
        p = subprocess.Popen(['wc', '-l', f], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        result, err = p.communicate()
        if p.returncode != 0:
            raise IOError(err)
        return int(result.strip().split()[0])

    def get_file_size(self, f):
        return os.path.getsize(f)

    def create_dir(self, d):
        os.makedirs(d, exist_ok=True)
        return True

    def get_dir_files(self, d, pattern):
        self.logger.debug(f'd = {d}, pattern = {pattern}')
        pathname = os.path.join(d, pattern)
        self.logger.debug(f'pathname = {pathname}')
        return glob.glob(pathname)

    def remove_file(self, f):
        self.logger.debug(f'(f = {f})')
        try:
            os.remove(f)
        except OSError:
            pass

def main():

    # 2023-04-16 17:28:50,860 fail2ban.actions        [790]: NOTICE  [postfix-sasl] Ban <ip address>
    ban_pattern = re.compile(r'^.*?\[postfix\-sasl\]\s+Ban\s+(.*?)\s*$')
    ip4_pattern = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])$')

    cmd_line = CmdLine(logger=logger)

    # create ipset for blocklist if not there yet
    # (if the set already exists, ipset returns an error that we ignore)
    command = f'{IPSET_EXE} create {IPSET_BLOCKLIST_NAME} hash:ip hashsize 4096'
    cmd_line.run(command)

    # get ipset blocklist members to avoid excessive calling 'ipset add <ip address>'
    blocklist_members = []
    command = f'{IPSET_EXE} list {IPSET_BLOCKLIST_NAME}'
    result = cmd_line.run(command)
    for line in result.stdout.split('\n'):
        # skip header lines like 'Name: ...' and 'Members:'
        if ':' in line:
            continue
        line = line.strip()
        if ip4_pattern.match(line):
            blocklist_members.append(line)
    blocklist_members_len = len(blocklist_members)
    logger.debug(f'blocklist_members_len = {blocklist_members_len}')

    # filter Fail2Ban logs Bans to BANS_FILE
    logger.debug(f'filter fail2ban.log Bans to bans file {BANS_FILE} ...')
    # start with an empty bans file, the commands below append to it
    cmd_line.remove_file(BANS_FILE)
    files = cmd_line.get_dir_files(FAIL2BAN_FILES_DIR, 'fail2ban.log*')
    for f in files:
        logger.debug(f'iterating file {f}')
        grep = 'grep'
        if f.endswith('.gz'):
            grep = 'zgrep'
        command = f'{grep} NOTICE "{f}" | grep "postfix-sasl" | grep Ban >> {BANS_FILE}'
        # pipes and redirection need a shell, so use popen() here
        cmd_line.popen(command)

    # get ips from BANS_FILE
    ban_ips = []
    with open(BANS_FILE, 'r') as fo:
        data = fo.read()
    lines = data.split('\n')
    for line in lines:
        m = ban_pattern.search(line)
        if m is None:
            continue
        ip = m.group(1)
        if ip4_pattern.match(ip) is None:
            continue
        # never
        if ip in NEVER_ADD_IPS:
            continue
        # no duplicates
        if ip in ban_ips:
            continue
        # not if already in members
        if ip in blocklist_members:
            continue
        ban_ips.append(ip)
    ban_ips_count = len(ban_ips)
    logger.debug(f'ban_ips_count = {ban_ips_count}')

    # add ips to ipset blocklist
    added_count = 0
    rejected_count = 0
    for ban_ip in ban_ips:
        command = f'{IPSET_EXE} add {IPSET_BLOCKLIST_NAME} {ban_ip}'
        result = cmd_line.run(command)
        returncode = result.returncode
        logger.debug(f'type(returncode) = {type(returncode)}, returncode = {returncode}')
        if returncode == 0:
            added_count += 1
            # also add ip with timestamp to ipset_added file
            dt = datetime.datetime.utcnow().isoformat()
            with open(IPSET_BLOCKLIST_LOG_FILE, 'a') as fo_add_file:
                fo_add_file.write(f'{dt} add {IPSET_BLOCKLIST_NAME} {ban_ip}\n')
        else:
            rejected_count += 1
    logger.debug(f'added_count = {added_count}, rejected_count = {rejected_count}')

    # save the set so it survives a reboot
    logger.debug(f'saving ipset to {IPSET_SAVE_FILE}')
    command = f'{IPSET_EXE} save -file {IPSET_SAVE_FILE}'
    cmd_line.run(command)

    logger.debug(f'to save again: {command}')


if __name__ == '__main__':
    main()
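
To make the difference between subprocess.run() and subprocess.Popen() with shell=True concrete, here is a small standalone sketch. It only uses echo and wc, so it is safe to run anywhere; the variable names are mine.

```python
import shlex
import subprocess

# subprocess.run() with an argument list: one program, no shell involved
plain = subprocess.run(shlex.split('echo one two three'), capture_output=True, text=True)
print(plain.returncode, plain.stdout.strip())

# the same call with a pipe in the string: the '|' is NOT interpreted,
# it is passed to echo as an ordinary argument
unpiped = subprocess.run(shlex.split('echo one two three | wc -w'), capture_output=True, text=True)
print(unpiped.stdout.strip())

# Popen() with shell=True hands the whole line to the shell, so the pipe works
p = subprocess.Popen('echo one two three | wc -w', shell=True, stdout=subprocess.PIPE, text=True)
out, _ = p.communicate()
print(p.returncode, out.strip())
```

Because shell=True lets the shell interpret the whole string, only use it with command lines you build yourself, never with untrusted input.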



I had never used ipset before, and it was good to learn how to use it. Fail2Ban can also be used with ipset, but I didn't want to change my Fail2Ban configuration. With ipset and the Python script I can collect bad IP addresses and automatically block their access to my server. And I can easily get more information about these IP addresses using the Python package IP2Location and the command line utility whois.

Links / credits


How to find the actual address of spoofed IPs?

How to Make iptables Firewall Rules Persistent on Debian/Ubuntu



Persistent ipset for Ubuntu/Debian compatible with ufw and iptables-persistent

