angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

LogLineFollower: Zeilen einer wachsenden Protokolldatei verfolgen

Lesen und verarbeiten Sie Zeilen aus einer wachsenden Protokolldatei und führen Sie die gewünschten Aktionen aus.

9 September 2022
post main image
https://unsplash.com/@mailchimp

Ich war auf der Suche nach einer Möglichkeit, Zeilen einer Protokolldatei zu verarbeiten, während sie wächst. Ich fand einige Schnipsel und Pakete im Internet, aber nicht genau das, was ich wollte. Deshalb beschloss ich, mein eigenes Programm zu schreiben.

Nachdem ich eine erste Version programmiert hatte, suchte ich erneut und fand einige weitere Pakete. Aber als ich mir die Beschreibung, den Code und die Probleme ansah, entschied ich mich, bei meinem eigenen Code zu bleiben. Es kann doch nicht so schwer sein, so etwas zu erstellen ... Aber wie immer steckt mehr dahinter, als man zunächst denkt.

Im Folgenden gehe ich auf einige weitere details ein und schließe mit Code, der die erste Version von LogLineFollower zeigt. Oh, sorry, nur Linux , keine Ahnung, ob es auf Windows oder macOS läuft.

String vs. bytes

Einige Pakete öffnen die Protokolldatei im normalen Modus, während ich mich entschieden habe, die Protokolldatei im Binärmodus zu öffnen. Warum habe ich mich für diese Vorgehensweise entschieden? Der Grund dafür ist, dass ich in der Vergangenheit mehrfach mit Protokolldateien konfrontiert wurde, die unerwartete, besser gesagt illegale Zeichen enthielten, die zum Absturz von Protokolldateiprozessoren führen können. Ja, sogar Tools wie grep können versagen. In einem Fall wurde syslog mehrfach beschädigt, als ein Prozess, der in syslog schrieb, abstürzte.

Die Verwendung von bytes bedeutet die Verwendung von 'read(chunk_size)' anstelle von 'readline()'. Und statt '\n' als Zeilentrennzeichen verwenden wir jetzt b'\n'.

Der Sinn der Verwendung von bytes besteht darin, einen Absturz dieses Prozesses zu verhindern, (Dekodierungs-)Fehler auf irgendeine Weise zu behandeln und zu versuchen, den normalen Betrieb fortzusetzen, und zwar ohne unser Zutun.

Erkennen von Dateiänderungen

Protokolldateien können sich ändern, typischerweise durch Rotation der Protokolldatei. Die aktuelle Datei wird umbenannt und eine neue Datei wird verwendet. Dies kann bei Linux durch Überprüfung der inode festgestellt werden. Wenn sich die inode der Protokolldatei ändert, dann hat sich die Datei geändert.

Es gibt aber noch andere Möglichkeiten, wie Protokolldateien rotieren können. Eine davon ist, den Inhalt der Protokolldatei in eine andere Datei zu kopieren und dann die Protokolldatei zu truncaten. Oder durch Kopieren einer neuen leeren Datei in die aktuelle Protokolldatei. In diesen Fällen ändert sich der inode nicht, und wir können nur prüfen, ob die Protokolldatei kleiner statt größer geworden ist.

Erschwerend kommt hinzu, dass sich der Name der Protokolldatei nach der Rotation geändert haben kann. Zum Beispiel von 'access.log' zu 'access.log.1'. In diesem Fall brauchen wir eine intelligente Logik, um die neue Protokolldatei zu finden.

Weitermachen können, wo wir aufgehört haben

Wir leben nicht in einer perfekten Welt und unser Protokolldateiprozessor kann abstürzen. Oder wir wollen ihn anhalten, einige Wartungsarbeiten durchführen und ihn erneut starten, aber die Zeilen überspringen, die wir bereits verarbeitet haben.

Das bedeutet, dass wir uns merken müssen, welche Zeile wir zuletzt verarbeitet haben. Indem wir den aktuellen Offset der Protokolldatei nach der Verarbeitung einer Zeile in einer separaten Datei speichern, schaffen wir eine Möglichkeit, nach einer Unterbrechung fortzufahren: Bei einem Neustart holen wir uns den gespeicherten Offset und setzen damit die nächste Position in der Protokolldatei.

Eine andere Möglichkeit, nach einer Unterbrechung fortzufahren, kann darin bestehen, eine Kopie der zuletzt verarbeiteten Zeile zu speichern und diese Kopie dann beim Durchlaufen der Datei zu verwenden.

Der Code: LogLineFollower

Die erste Version des nachstehenden Codes hat die folgenden Einschränkungen:

  • Keine Fortsetzung oder ähnliches beim Drehen der Protokolldatei.
  • Bei Unterbrechung und Neustart können (einige) verarbeitete letzte Zeilen wieder auftauchen oder übersprungen werden, siehe Code.

Die letzte(n) Zeile(n) können wichtig sein, aber das müssen Sie selbst entscheiden. Dies geschieht, weil ich aus Leistungsgründen beschlossen habe, dass wir in der Lage sein müssen, eine Liste von Zeilen statt einer einzelnen Zeile auf einmal zu verarbeiten.

Wenn dieselben Zeilen nach einem Neustart wieder auftauchen, muss die Operation in Ihrer Funktion 'process_line()' idempotent sein,
was bedeutet, dass dieselbe Operation mehrfach angewendet werden kann, ohne dass sich das Ergebnis ändert.

Ohne Änderungen beginnt der folgende Code in der ersten Zeile von /var/log/syslog. Wenn er das Ende von syslog erreicht, wartet er auf neue Zeilen.

Um es auszuprobieren: Kopieren - Einfügen - Ausführen. Und ändern. Und ausführen.

"""
follow a log file, e.g. syslog, postfix.log

note(s):
- linux only
- in process_lines() you do what you want
- lines assumed to be ending with '\n'
- we only decode bytes after (!) we have a complete line
- file change (rotate) is detected on inode change
"""
import logging
import os
import sys
import time


# console logger
def get_logger():
    log_level = logging.DEBUG
    #log_level = logging.INFO
    log_format = logging.Formatter('%(asctime)s [%(filename)-30s%(funcName)20s():%(lineno)03s] %(levelname)-8.8s %(message)s')
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(log_format)
    logger.addHandler(console_handler)
    return logger

    
class DummyLogger:
    def __getattr__(self, name):
        return lambda *x: None        


class LogLineFollower:
    """
    wait for new lines in the file to follow and calls process_line() or your callback cb_line() where you can do your work.

    arguments:

    logger              (optional) the logger.
    follow_file         the file to follow, e.g. /var/log/syslog.
    cb_process_line     (optional) callback. your line processing function.
    cb_process_lines    (optional) callback. your multiple lines processing function.
    offset_file         (optional) file holding the information to continue on a restart.
                        the offset file is always present. remove this file to make a fresh start.
                        if not specified its path is follow_file with the extension replaced by '.offset'.
    offset_type         (optional) where to start in follow_file, can be one of:
                        - 'begin': start at beginning of file
                        - 'continue': start where we left off using data from offset_file
                        - 'end': (default) start at end of file
                        important: if offset_file is present, its values will be used to continue
                        regardless of the value of offset_type! see also offset_file.
    chunk_size          (optional) maximum number of bytes we are trying to read at a time from follow_file.
                        default: 4096
    followed_lines_file (optional) receives all lines read.
                        can be used for testing and/or checking.
    """
    def __init__(
        self,
        logger=None,
        follow_file=None,
        cb_process_line=None,
        cb_process_lines=None,
        offset_file=None,
        offset_type=None,
        chunk_size=None,
        followed_lines_file=None,
    ):
        self.logger = logger or DummyLogger()
        self.follow_file = follow_file
        self.cb_process_line = cb_process_line
        self.cb_process_lines = cb_process_lines
        self.offset_file = offset_file or os.path.splitext(self.follow_file)[0] + '.offset'
        self.offset_type = offset_type or 'end'
        self.chunk_size = chunk_size or 4096
        self.followed_lines_file = followed_lines_file

    def append_lines_to_followed_lines_file(self, blines):
        if self.followed_lines_file is None:
            return
        #self.logger.debug('appending blines = {}'.format(blines))
        # decode to utf-8 before appending
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            lines.append(bline.decode('utf-8'))
        with open(self.followed_lines_file, 'a') as fo:
            fo.write('\n'.join(lines) + '\n')

    def process_lines(self, blines):
        # decode to utf-8 before printing
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            line = bline.decode('utf-8')
            lines.append(line)
            self.process_line(line)
        if self.cb_process_lines is not None:
            self.cb_process_lines(lines)


    def process_line(self, line):
        if self.cb_process_line is not None:
            self.cb_process_line(line)
        else:
            self.logger.info('line = {}'.format(line))

    def write_offset_file(self, pos, incomplete_line_chunk):
        # write a comma-separated bytes string: b'inode,pos,incomplete_line_chunk'
        self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(self.inode, pos, incomplete_line_chunk))
        bs = self.inode.to_bytes(8, 'big') + b','
        bs += pos.to_bytes(8, 'big') + b','
        bs += incomplete_line_chunk or b''
        self.logger.debug('bs = {}'.format(bs))
        with open(self.offset_file, 'wb') as fo:
            fo.write(bs)

    def read_offset_file(self):
        bs = None
        try:
            with open(self.offset_file, 'rb') as fo:
                bs = fo.read()
            if bs is not None:
                bs_parts = bs.split(b',', 2)
                bs_parts_len = len(bs_parts)
                if bs_parts_len == 3:
                    inode = int.from_bytes(bs_parts[0], 'big')
                    pos = int.from_bytes(bs_parts[1], 'big')
                    incomplete_line_chunk = bs_parts[2]
                    if incomplete_line_chunk == b'':
                        incomplete_line_chunk = None
                    self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(inode, pos, incomplete_line_chunk))
                    return inode, pos, incomplete_line_chunk
        except Exception as e:
            pass
        return None, None, None

    def set_file(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            self.inode = inode
            return True
        except Exception as e:
            self.logger.debug('exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return False

    def file_changed(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            if inode != self.inode:
                return True
            return False
        except Exception as e:
            self.logger.debug('get_inode: exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return True

    def run(self):
        incomplete_line_chunk = None
        wait_count = 0
        self.set_file()
        with open(self.follow_file, 'rb') as fo:
            # continue where we left off, remove offset_file to start fresh
            start_pos_set = False
            inode, pos, incomplete_line_chunk = self.read_offset_file()
            if inode is not None:
                # check saved inode against current inode
                # ...
                # seek to pos in offset_file
                fo.seek(pos, 0)
                start_pos_set = True
            if not start_pos_set:
                if self.offset_type == 'end':
                    # start at end
                    fo.seek(0, 2)
                else:
                    # start at beginning
                    fo.seek(0, 0)

            while True:
                if self.file_changed():
                    self.logger.debug('file not present or changed')
                    break
                # read new chunk
                chunk = fo.read(self.chunk_size)
                chunk_size_read = len(chunk)
                self.logger.debug('chunk_size_read = {}'.format(chunk_size_read))
                if chunk_size_read == 0:
                    self.logger.debug('waiting for new data ... {}'.format(wait_count))
                    time.sleep(1)
                    wait_count += 1
                    continue
                wait_count = 0

                # prepend incomplete_line_chunk if any
                if incomplete_line_chunk is not None:
                    chunk = incomplete_line_chunk + chunk
                    incomplete_line_chunk = None

                # split chunk on new lines
                blines = chunk.split(b'\n')

                # get completed lines, and incomplete_line
                if blines[-1] == b'':
                    # last line is empty means last line -1 is complete
                    del blines[-1]
                else:
                    # last line is not empty means last line is not complete
                    incomplete_line_chunk = blines[-1]
                    del blines[-1]
                # do something with complete lines (bytes)
                if len(blines) > 0:
                    self.append_lines_to_followed_lines_file(blines)
                    self.process_lines(blines)
                # show incomplete_line_chunk
                if incomplete_line_chunk is not None:
                    self.logger.debug('incomplete_line_chunk = {}'.format(incomplete_line_chunk))

                # here we write the file offset pos AFTER 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be read (and processed) AGAIN the next time.
                #
                # the alternative is to write the offset file BEFORE 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be SKIPPED the next time 
                # 
                fo.seek(0, 1)
                pos = fo.tell()
                self.logger.debug('pos = {}'.format(pos))
                self.write_offset_file(pos, incomplete_line_chunk)
                #self.read_offset_file()

def process_line(line):
    print('processing line = {}'.format(line))

if __name__ == '__main__':
    f = '/var/log/syslog'
    llf = LogLineFollower(
        logger=get_logger(),
        follow_file=f,
        offset_file='file_follower.offset',
        cb_process_line=process_line,
        offset_type='begin',
        #offset_type='end',
        chunk_size=4096,
        followed_lines_file='followed_lines.log',
    )
    llf.run()

Zusammenfassung

Wie in vielen Fällen beginnen Sie mit ein paar Zeilen Code, und schon sind Sie auf dem richtigen Weg. Bald werden Sie nach Möglichkeiten suchen, Ihren Code zu testen. Deshalb habe ich die optionale 'followed_lines_file' hinzugefügt. Sie können die Daten dieser Datei mit den Daten der 'follow_file' vergleichen.

Habe ich Zeit gespart, indem ich das selbst geschrieben habe, oder hätte ich eines der Pypi.org -Pakete verwenden sollen? Leider lassen die meisten Pakete auf Pypi.org , die eine vergleichbare Funktionalität haben, einige meiner Anforderungen vermissen, haben Probleme oder werden nicht wirklich gepflegt. Aber es war lehrreich, sie zu studieren. Ich denke, ich hatte in diesem Fall keine große Wahl.

Links / Impressum

logtailtool
https://github.com/minyong-jeong/logtailtool

py-logwatcher
https://github.com/tonytan4ever/py-logwatcher

pygtail
https://github.com/bgreenlee/pygtail

tail-uwsgi-log
https://pypi.org/project/tail-uwsgi-log

Mehr erfahren

Log file

Einen Kommentar hinterlassen

Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

Kommentare

Eine Antwort hinterlassen

Antworten Sie anonym oder melden Sie sich an, um zu antworten.