
LogLineFollower: Follow lines of a growing log file

Read and process lines from a growing log file and run your desired actions.

9 September 2022 Updated 10 September 2022

I was looking for a way to process lines of a log file while it was growing. I found some snippets and packages on the internet but not exactly what I wanted. That is why I decided to write my own.

After having coded a first version, I searched again and found several more packages. But, looking at the description, the code and issues, I decided to stick with my own code. It cannot be that hard to create something like this ... But as always, there is more to it than you initially think.

Below I go into some more details and end with code showing the first version of LogLineFollower. Oh, sorry, Linux only, no idea if it runs on Windows or macOS.

String vs bytes

Some packages open the log file in normal (text) mode, while I decided to open the log file in binary mode. Why did I choose to do this? The reason is that in the past I was confronted several times with log files containing unexpected, or rather illegal, characters that can cause log file processors to crash. Yes, even tools like grep can fail. In one case, syslog got corrupted multiple times when a process writing to syslog crashed.

Using bytes means using 'read(chunk_size)' instead of 'readline()'. And instead of using '\n' as the line separator, we now use b'\n'.

The whole idea of using bytes is to prevent the process from crashing, to handle (decoding) errors in some way, and to try to continue normal operation, all without manual intervention.
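
To make this concrete, here is a minimal sketch of reading in binary mode and decoding defensively. The path and the errors='replace' strategy are just examples:

# minimal sketch: read a file in binary mode and split on the byte newline
with open('/var/log/syslog', 'rb') as fo:
    chunk = fo.read(4096)                 # bytes, not str
    for bline in chunk.split(b'\n'):
        # decode only for display; a bad byte cannot crash the process
        print(bline.decode('utf-8', errors='replace'))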

Detecting file changes

Log files can change, typically by log file rotation. The current file is renamed and a new file is used. This can be detected on Linux by checking the inode. If the inode of the log file changes, then the file has changed.

But there are other ways in which log files can rotate. One is by copying the contents of the log file to another file and then truncating the log file. Another is by copying a new empty file over the current log file. In these cases the inode does not change, and the only thing we can do is check whether the log file has become smaller instead of bigger.
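
Both checks fit in a few lines. A sketch, where 'last_inode' and 'last_size' are assumed to come from a previous os.stat() call; note that the code further down only implements the inode check:

import os

def log_file_changed(path, last_inode, last_size):
    # sketch: True if the log file was rotated, truncated or removed
    try:
        st = os.stat(path)
    except FileNotFoundError:
        return True                       # file is gone
    if st.st_ino != last_inode:
        return True                       # renamed and recreated: new inode
    if st.st_size < last_size:
        return True                       # file shrank: truncated or overwritten
    return False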

To make things worse, the log file name may have changed after rotation, for example from 'access.log' to 'access.log.1'. In this case we need some smart logic to find the new log file.
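
One possible approach, assuming the rotated file keeps the original name plus a suffix, could be to pick the most recently modified candidate:

import glob
import os

def find_rotated_file(follow_file):
    # sketch: 'access.log' -> 'access.log.1', 'access.log.2.gz', ...
    candidates = glob.glob(follow_file + '.*')
    return max(candidates, key=os.path.getmtime) if candidates else None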

Being able to continue where we left off

We don't live in a perfect world and our log file processor can crash. Or, we want to stop it, do some maintenance and run it again but skip the lines we already processed.

This means that we must remember the last line we processed. By storing the current offset of the log file in a separate file after processing a line, we create a way to continue after an interruption: on a restart we fetch the saved offset and use it to set the next read position in the log file.
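
Stripped of details like the inode and the incomplete line chunk, the mechanism is just this. 'syslog.offset' is a hypothetical name:

# sketch: save the offset after processing, restore it on a restart
offset_file = 'syslog.offset'

def save_offset(pos):
    with open(offset_file, 'w') as fo:
        fo.write(str(pos))

def restore_offset(fo_log):
    # seek the (binary) log file handle to the saved position, if any
    try:
        with open(offset_file) as fo:
            fo_log.seek(int(fo.read()), 0)
    except (FileNotFoundError, ValueError):
        pass                              # no usable offset yet: start fresh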

A different way to continue after an interruption is to save a copy of the last line processed, and then look for this copy when walking through the file.
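
A sketch of that alternative; it assumes the saved line occurs only once in the file, which usually holds when lines carry timestamps:

def skip_until_after(fo, last_line_bytes):
    # sketch: advance the binary file handle past the saved last line
    for bline in fo:
        if bline.rstrip(b'\n') == last_line_bytes:
            return True                   # positioned right after the saved line
    fo.seek(0, 0)                         # saved line not found: start from the top
    return False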

The code: LogLineFollower

The first version of the code below has the following limitations:

  • No continuation whatsoever on log file rotation.
  • When interrupted and restarted, (some of) the last processed lines may show up again or be skipped, see the code.

The last item can be important, but that is for you to decide. It happens because I decided, for performance reasons, that we must be able to process a list of lines instead of a single line at a time.

If the same lines show up again after a restart, then the operation in your 'process_line()' function must be idempotent, meaning that the same operation can be applied multiple times without changing the result.
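
For example, inserting into a database table with a unique key on the line's timestamp is idempotent, blindly appending to a list is not. A hypothetical sketch that deduplicates within a single run; across restarts you would key on something persistent instead:

seen = set()

def process_line(line):
    # idempotent within this run: the same line is processed at most once
    if line in seen:
        return
    seen.add(line)
    print('processing line = {}'.format(line))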

Without changes, the code below begins at the first line of /var/log/syslog. When it reaches the end of syslog, it waits for new lines.

To try it: Copy-paste and run. And change. And run.

"""
follow a log file, e.g. syslog, postfix.log

note(s):
- linux only
- in process_lines() you do what you want
- lines are assumed to end with '\n'
- we only decode bytes after (!) we have a complete line
- file change (rotate) is detected on inode change
"""
import logging
import os
import sys
import time


# console logger
def get_logger():
    log_level = logging.DEBUG
    #log_level = logging.INFO
    log_format = logging.Formatter('%(asctime)s [%(filename)-30s%(funcName)20s():%(lineno)03s] %(levelname)-8.8s %(message)s')
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(log_format)
    logger.addHandler(console_handler)
    return logger

    
class DummyLogger:
    def __getattr__(self, name):
        return lambda *x: None        


class LogLineFollower:
    """
    waits for new lines in the file to follow and calls process_line() or your callback cb_process_line() where you can do your work.

    arguments:

    logger              (optional) the logger.
    follow_file         the file to follow, e.g. /var/log/syslog.
    cb_process_line     (optional) callback. your line processing function.
    cb_process_lines    (optional) callback. your multiple lines processing function.
    offset_file         (optional) file holding the information to continue on a restart.
                        the offset file is always present. remove this file to make a fresh start.
                        if not specified its path is follow_file with the extension replaced by '.offset'.
    offset_type         (optional) where to start in follow_file, can be one of:
                        - 'begin': start at beginning of file
                        - 'continue': start where we left off using data from offset_file
                        - 'end': (default) start at end of file
                        important: if offset_file is present, its values will be used to continue
                        regardless of the value of offset_type! see also offset_file.
    chunk_size          (optional) maximum number of bytes we are trying to read at a time from follow_file.
                        default: 4096
    followed_lines_file (optional) receives all lines read.
                        can be used for testing and/or checking.
    """
    def __init__(
        self,
        logger=None,
        follow_file=None,
        cb_process_line=None,
        cb_process_lines=None,
        offset_file=None,
        offset_type=None,
        chunk_size=None,
        followed_lines_file=None,
    ):
        self.logger = logger or DummyLogger()
        self.follow_file = follow_file
        self.cb_process_line = cb_process_line
        self.cb_process_lines = cb_process_lines
        self.offset_file = offset_file or os.path.splitext(self.follow_file)[0] + '.offset'
        self.offset_type = offset_type or 'end'
        self.chunk_size = chunk_size or 4096
        self.followed_lines_file = followed_lines_file

    def append_lines_to_followed_lines_file(self, blines):
        if self.followed_lines_file is None:
            return
        #self.logger.debug('appending blines = {}'.format(blines))
        # decode to utf-8 before appending
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            lines.append(bline.decode('utf-8'))
        with open(self.followed_lines_file, 'a') as fo:
            fo.write('\n'.join(lines) + '\n')

    def process_lines(self, blines):
        # decode to utf-8 before printing
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            line = bline.decode('utf-8')
            lines.append(line)
            self.process_line(line)
        if self.cb_process_lines is not None:
            self.cb_process_lines(lines)

    def process_line(self, line):
        if self.cb_process_line is not None:
            self.cb_process_line(line)
        else:
            self.logger.info('line = {}'.format(line))

    def write_offset_file(self, pos, incomplete_line_chunk):
        # write fixed-width fields as bytes: 8-byte inode + b',' + 8-byte pos + b',' + incomplete_line_chunk
        self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(self.inode, pos, incomplete_line_chunk))
        bs = self.inode.to_bytes(8, 'big') + b','
        bs += pos.to_bytes(8, 'big') + b','
        bs += incomplete_line_chunk or b''
        self.logger.debug('bs = {}'.format(bs))
        with open(self.offset_file, 'wb') as fo:
            fo.write(bs)

    def read_offset_file(self):
        bs = None
        try:
            with open(self.offset_file, 'rb') as fo:
                bs = fo.read()
            if bs is not None and len(bs) >= 18:
                # parse by fixed offsets: the 8-byte inode and pos fields may
                # themselves contain a comma byte, so splitting on b',' is unsafe
                inode = int.from_bytes(bs[0:8], 'big')
                pos = int.from_bytes(bs[9:17], 'big')
                incomplete_line_chunk = bs[18:]
                if incomplete_line_chunk == b'':
                    incomplete_line_chunk = None
                self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(inode, pos, incomplete_line_chunk))
                return inode, pos, incomplete_line_chunk
        except Exception as e:
            pass
        return None, None, None

    def set_file(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            self.inode = inode
            return True
        except Exception as e:
            self.logger.debug('set_file: exception = {}, e.args = {}'.format(type(e).__name__, e.args))
            return False

    def file_changed(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            if inode != self.inode:
                return True
            return False
        except Exception as e:
            self.logger.debug('file_changed: exception = {}, e.args = {}'.format(type(e).__name__, e.args))
            return True

    def run(self):
        incomplete_line_chunk = None
        wait_count = 0
        self.set_file()
        with open(self.follow_file, 'rb') as fo:
            # continue where we left off, remove offset_file to start fresh
            start_pos_set = False
            inode, pos, incomplete_line_chunk = self.read_offset_file()
            if inode is not None:
                # check saved inode against current inode
                # ...
                # seek to the saved pos in follow_file
                fo.seek(pos, 0)
                start_pos_set = True
            if not start_pos_set:
                if self.offset_type == 'end':
                    # start at end
                    fo.seek(0, 2)
                else:
                    # start at beginning
                    fo.seek(0, 0)

            while True:
                if self.file_changed():
                    self.logger.debug('file not present or changed')
                    break
                # read new chunk
                chunk = fo.read(self.chunk_size)
                chunk_size_read = len(chunk)
                self.logger.debug('chunk_size_read = {}'.format(chunk_size_read))
                if chunk_size_read == 0:
                    self.logger.debug('waiting for new data ... {}'.format(wait_count))
                    time.sleep(1)
                    wait_count += 1
                    continue
                wait_count = 0

                # prepend incomplete_line_chunk if any
                if incomplete_line_chunk is not None:
                    chunk = incomplete_line_chunk + chunk
                    incomplete_line_chunk = None

                # split chunk on new lines
                blines = chunk.split(b'\n')

                # get completed lines, and incomplete_line
                if blines[-1] == b'':
                    # last line is empty means last line -1 is complete
                    del blines[-1]
                else:
                    # last line is not empty means last line is not complete
                    incomplete_line_chunk = blines[-1]
                    del blines[-1]
                # do something with complete lines (bytes)
                if len(blines) > 0:
                    self.append_lines_to_followed_lines_file(blines)
                    self.process_lines(blines)
                # show incomplete_line_chunk
                if incomplete_line_chunk is not None:
                    self.logger.debug('incomplete_line_chunk = {}'.format(incomplete_line_chunk))

                # here we write the file offset pos AFTER 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be read (and processed) AGAIN the next time.
                #
                # the alternative is to write the offset file BEFORE 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be SKIPPED the next time 
                # 
                fo.seek(0, 1)
                pos = fo.tell()
                self.logger.debug('pos = {}'.format(pos))
                self.write_offset_file(pos, incomplete_line_chunk)
                #self.read_offset_file()

def process_line(line):
    print('processing line = {}'.format(line))

if __name__ == '__main__':
    f = '/var/log/syslog'
    llf = LogLineFollower(
        logger=get_logger(),
        follow_file=f,
        offset_file='file_follower.offset',
        cb_process_line=process_line,
        offset_type='begin',
        #offset_type='end',
        chunk_size=4096,
        followed_lines_file='followed_lines.log',
    )
    llf.run()

Summary

As in many cases, you start with a few lines of code, and you are on your way. Soon you will look for ways to test your code. That's why I added the optional 'followed_lines_file'. You can compare the data of this file with the data of the 'follow_file'.
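
A quick check could be as simple as the sketch below, assuming the log file did not rotate during the run and all lines decoded cleanly:

# sketch: the followed lines should be literally a block of the log file
with open('followed_lines.log', 'rb') as fo:
    followed = fo.read()
with open('/var/log/syslog', 'rb') as fo:
    source = fo.read()
print('ok' if followed in source else 'mismatch')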

Did I save time writing this myself, or should I have used one of the PyPI packages? Unfortunately, most packages on PyPI that have comparable functionality miss some of my requirements, have issues, or are not really maintained. But it was instructive to study them. I guess I did not have much of a choice in this case.

Links / credits

logtailtool
https://github.com/minyong-jeong/logtailtool

py-logwatcher
https://github.com/tonytan4ever/py-logwatcher

pygtail
https://github.com/bgreenlee/pygtail

tail-uwsgi-log
https://pypi.org/project/tail-uwsgi-log

