angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

LogLineFollower: Regels van een groeiend logbestand volgen

Lees en verwerk regels uit een groeiend logbestand en voer de gewenste acties uit.

9 september 2022
post main image
https://unsplash.com/@mailchimp

Ik was op zoek naar een manier om regels van een logbestand te verwerken terwijl het groeit. Ik vond wat snippets en packages op het internet, maar niet precies wat ik wilde. Dat is waarom ik besloot om mijn eigen versie te schrijven.

Nadat ik een eerste versie had gecodeerd, zocht ik opnieuw en vond nog enkele pakketten. Maar, kijkend naar de beschrijving, de code en de problemen, besloot ik het bij mijn eigen code te houden. Het kan niet zo moeilijk zijn om zoiets te maken ... Maar zoals altijd, er komt meer bij kijken dan je in eerste instantie denkt.

Hieronder ga ik nog wat dieper in op details en eindig met code die de eerste versie van LogLineFollower laat zien. Oh, sorry, alleen Linux , geen idee of het op Windows of macOS draait.

String vs bytes

Sommige pakketten openen het logbestand in normale modus, terwijl ik besloten heb het logbestand in binaire modus te openen. Waarom heb ik hiervoor gekozen? De reden is dat ik in het verleden verschillende keren geconfronteerd werd met log bestanden die onverwachte, een beter woord is illegale, karakters bevatten, die log file processoren kunnen laten crashen. Ja, zelfs tools als grep kunnen falen. In één geval raakte syslog meerdere malen gecorrumpeerd, toen een proces dat naar syslog schreef crashte.

Het gebruik van bytes betekent het gebruik van "read(chunk_size)" in plaats van "readline()". En in plaats van '\n' als lijnscheidingsteken te gebruiken, gebruiken we nu b'\n'.

Het hele idee van het gebruik van bytes is om te voorkomen dat dit proces crasht, om (decoderings-)fouten op de een of andere manier af te handelen en om te proberen de normale werking voort te zetten, allemaal zonder onze interactie.

Bestandswijzigingen detecteren

Logbestanden kunnen veranderen, typisch door logbestandrotatie. Het huidige bestand wordt hernoemd en een nieuw bestand wordt gebruikt. Dit kan worden gedetecteerd op Linux door de inode te controleren. Als de inode van het logbestand verandert, dan is het bestand veranderd.

Maar er zijn andere manieren waarop logbestanden kunnen rouleren. Een daarvan is door de inhoud van het logbestand naar een ander bestand te kopiëren en dan het logbestand te truncaten. Of door een nieuw leeg bestand naar het huidige logbestand te kopiëren. In deze gevallen verandert de inode niet en het enige wat we kunnen doen is controleren of het logbestand kleiner is geworden in plaats van groter.

Om het nog erger te maken, kan de naam van het logbestand veranderd zijn na het roteren. Bijvoorbeeld van 'access.log' naar 'access.log.1'.In dit geval hebben we wat slimme logica nodig om het nieuwe logbestand te vinden.

In staat zijn om verder te gaan waar we gebleven waren

We leven niet in een perfecte wereld en onze log file processor kan crashen. Of, we willen het stoppen, wat onderhoud doen en het opnieuw draaien, maar de regels die we al verwerkt hebben overslaan.

Dit betekent dat we moeten onthouden wat de laatste regel was die we verwerkt hebben. Door de huidige offset van het logbestand op te slaan in een apart bestand na het verwerken van een regel, creëren we een manier om door te gaan na een onderbreking: Bij een herstart halen we de opgeslagen offset op en gebruiken deze om de volgende positie in het logbestand in te stellen.

Een andere manier om door te gaan na een onderbreking kan zijn om een kopie van de laatst verwerkte regel op te slaan, en dan deze kopie te gebruiken bij het doorlopen van het bestand.

De code: LogLineFollower

De eerste versie van de code hieronder heeft de volgende beperkingen:

  • Geen voortzetting of wat dan ook bij het roteren van het logbestand.
  • Bij onderbreking en herstart kunnen (sommige) verwerkte laatste regels opnieuw verschijnen of overgeslagen worden, zie code.

De laatste regel(s) kunnen belangrijk zijn, maar dat is aan jou om te beslissen. Dit gebeurt omdat ik om prestatieredenen besloten heb dat we een lijst van regels moeten kunnen verwerken in plaats van een enkele regel per keer.

Als na een herstart dezelfde regels weer opduiken, dan moet de bewerking in je 'process_line()' functie idempotent zijn,
betekent dat dezelfde bewerking meerdere keren kan worden toegepast zonder dat het resultaat verandert.

Zonder wijzigingen begint de onderstaande code op de eerste regel van /var/log/syslog. Wanneer het het einde van syslog bereikt, wacht het op nieuwe regels.

Om het te proberen: Kopieer-plak en voer uit. En veranderen. En uitvoeren.

"""
follow a log file, e.g. syslog, postfix.log

note(s):
- linux only
- in process_lines() you do what you want
- lines assumed to be ending with '\n'
- we only decode bytes after (!) we have a complete line
- file change (rotate) is detected on inode change
"""
import logging
import os
import sys
import time


# console logger
def get_logger():
    log_level = logging.DEBUG
    #log_level = logging.INFO
    log_format = logging.Formatter('%(asctime)s [%(filename)-30s%(funcName)20s():%(lineno)03s] %(levelname)-8.8s %(message)s')
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(log_format)
    logger.addHandler(console_handler)
    return logger

    
class DummyLogger:
    def __getattr__(self, name):
        return lambda *x: None        


class LogLineFollower:
    """
    wait for new lines in the file to follow and calls process_line() or your callback cb_line() where you can do your work.

    arguments:

    logger              (optional) the logger.
    follow_file         the file to follow, e.g. /var/log/syslog.
    cb_process_line     (optional) callback. your line processing function.
    cb_process_lines    (optional) callback. your multiple lines processing function.
    offset_file         (optional) file holding the information to continue on a restart.
                        the offset file is always present. remove this file to make a fresh start.
                        if not specified its path is follow_file with the extension replaced by '.offset'.
    offset_type         (optional) where to start in follow_file, can be one of:
                        - 'begin': start at beginning of file
                        - 'continue': start where we left off using data from offset_file
                        - 'end': (default) start at end of file
                        important: if offset_file is present, its values will be used to continue
                        regardless of the value of offset_type! see also offset_file.
    chunk_size          (optional) maximum number of bytes we are trying to read at a time from follow_file.
                        default: 4096
    followed_lines_file (optional) receives all lines read.
                        can be used for testing and/or checking.
    """
    def __init__(
        self,
        logger=None,
        follow_file=None,
        cb_process_line=None,
        cb_process_lines=None,
        offset_file=None,
        offset_type=None,
        chunk_size=None,
        followed_lines_file=None,
    ):
        self.logger = logger or DummyLogger()
        self.follow_file = follow_file
        self.cb_process_line = cb_process_line
        self.cb_process_lines = cb_process_lines
        self.offset_file = offset_file or os.path.splitext(self.follow_file)[0] + '.offset'
        self.offset_type = offset_type or 'end'
        self.chunk_size = chunk_size or 4096
        self.followed_lines_file = followed_lines_file

    def append_lines_to_followed_lines_file(self, blines):
        if self.followed_lines_file is None:
            return
        #self.logger.debug('appending blines = {}'.format(blines))
        # decode to utf-8 before appending
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            lines.append(bline.decode('utf-8'))
        with open(self.followed_lines_file, 'a') as fo:
            fo.write('\n'.join(lines) + '\n')

    def process_lines(self, blines):
        # decode to utf-8 before printing
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            line = bline.decode('utf-8')
            lines.append(line)
            self.process_line(line)
        if self.cb_process_lines is not None:
            self.cb_process_lines(lines)


    def process_line(self, line):
        if self.cb_process_line is not None:
            self.cb_process_line(line)
        else:
            self.logger.info('line = {}'.format(line))

    def write_offset_file(self, pos, incomplete_line_chunk):
        # write a comma-separated bytes string: b'inode,pos,incomplete_line_chunk'
        self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(self.inode, pos, incomplete_line_chunk))
        bs = self.inode.to_bytes(8, 'big') + b','
        bs += pos.to_bytes(8, 'big') + b','
        bs += incomplete_line_chunk or b''
        self.logger.debug('bs = {}'.format(bs))
        with open(self.offset_file, 'wb') as fo:
            fo.write(bs)

    def read_offset_file(self):
        bs = None
        try:
            with open(self.offset_file, 'rb') as fo:
                bs = fo.read()
            if bs is not None:
                bs_parts = bs.split(b',', 2)
                bs_parts_len = len(bs_parts)
                if bs_parts_len == 3:
                    inode = int.from_bytes(bs_parts[0], 'big')
                    pos = int.from_bytes(bs_parts[1], 'big')
                    incomplete_line_chunk = bs_parts[2]
                    if incomplete_line_chunk == b'':
                        incomplete_line_chunk = None
                    self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(inode, pos, incomplete_line_chunk))
                    return inode, pos, incomplete_line_chunk
        except Exception as e:
            pass
        return None, None, None

    def set_file(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            self.inode = inode
            return True
        except Exception as e:
            self.logger.debug('exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return False

    def file_changed(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            if inode != self.inode:
                return True
            return False
        except Exception as e:
            self.logger.debug('get_inode: exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return True

    def run(self):
        incomplete_line_chunk = None
        wait_count = 0
        self.set_file()
        with open(self.follow_file, 'rb') as fo:
            # continue where we left off, remove offset_file to start fresh
            start_pos_set = False
            inode, pos, incomplete_line_chunk = self.read_offset_file()
            if inode is not None:
                # check saved inode against current inode
                # ...
                # seek to pos in offset_file
                fo.seek(pos, 0)
                start_pos_set = True
            if not start_pos_set:
                if self.offset_type == 'end':
                    # start at end
                    fo.seek(0, 2)
                else:
                    # start at beginning
                    fo.seek(0, 0)

            while True:
                if self.file_changed():
                    self.logger.debug('file not present or changed')
                    break
                # read new chunk
                chunk = fo.read(self.chunk_size)
                chunk_size_read = len(chunk)
                self.logger.debug('chunk_size_read = {}'.format(chunk_size_read))
                if chunk_size_read == 0:
                    self.logger.debug('waiting for new data ... {}'.format(wait_count))
                    time.sleep(1)
                    wait_count += 1
                    continue
                wait_count = 0

                # prepend incomplete_line_chunk if any
                if incomplete_line_chunk is not None:
                    chunk = incomplete_line_chunk + chunk
                    incomplete_line_chunk = None

                # split chunk on new lines
                blines = chunk.split(b'\n')

                # get completed lines, and incomplete_line
                if blines[-1] == b'':
                    # last line is empty means last line -1 is complete
                    del blines[-1]
                else:
                    # last line is not empty means last line is not complete
                    incomplete_line_chunk = blines[-1]
                    del blines[-1]
                # do something with complete lines (bytes)
                if len(blines) > 0:
                    self.append_lines_to_followed_lines_file(blines)
                    self.process_lines(blines)
                # show incomplete_line_chunk
                if incomplete_line_chunk is not None:
                    self.logger.debug('incomplete_line_chunk = {}'.format(incomplete_line_chunk))

                # here we write the file offset pos AFTER 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be read (and processed) AGAIN the next time.
                #
                # the alternative is to write the offset file BEFORE 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be SKIPPED the next time 
                # 
                fo.seek(0, 1)
                pos = fo.tell()
                self.logger.debug('pos = {}'.format(pos))
                self.write_offset_file(pos, incomplete_line_chunk)
                #self.read_offset_file()

def process_line(line):
    print('processing line = {}'.format(line))

if __name__ == '__main__':
    f = '/var/log/syslog'
    llf = LogLineFollower(
        logger=get_logger(),
        follow_file=f,
        offset_file='file_follower.offset',
        cb_process_line=process_line,
        offset_type='begin',
        #offset_type='end',
        chunk_size=4096,
        followed_lines_file='followed_lines.log',
    )
    llf.run()

Samenvatting

Zoals in veel gevallen, begin je met een paar regels code, en je bent op weg. Al snel ga je op zoek naar manieren om je code te testen. Daarom heb ik de optionele 'followed_lines_file' toegevoegd. Je kunt de gegevens van dit bestand vergelijken met de gegevens van het 'follow_file'.

Heb ik tijd bespaard door dit zelf te schrijven, of had ik beter een van de Pypi.org pakketten kunnen gebruiken? Helaas, de meeste pakketten op Pypi.org die vergelijkbare functionaliteit hebben, missen enkele van mijn eisen, hebben problemen, of worden niet echt onderhouden. Maar het was leerzaam om ze te bestuderen. Ik denk dat ik in dit geval niet veel keus had.

Links / credits

logtailtool
https://github.com/minyong-jeong/logtailtool

py-logwatcher
https://github.com/tonytan4ever/py-logwatcher

pygtail
https://github.com/bgreenlee/pygtail

tail-uwsgi-log
https://pypi.org/project/tail-uwsgi-log

Lees meer

Log file

Laat een reactie achter

Reageer anoniem of log in om commentaar te geven.

Opmerkingen

Laat een antwoord achter

Antwoord anoniem of log in om te antwoorden.