angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

LogLineFollower : Suivre les lignes d'un fichier journal en expansion

Lisez et traitez les lignes d'un fichier journal en expansion et exécutez les actions souhaitées.

9 septembre 2022
Dans Log file
post main image
https://unsplash.com/@mailchimp

Je cherchais un moyen de traiter les lignes d'un fichier journal pendant sa croissance. J'ai trouvé quelques snippets et paquets sur Internet mais pas exactement ce que je voulais. C'est pourquoi j'ai décidé d'écrire mon propre programme.

Après avoir codé une première version, j'ai cherché à nouveau et j'ai trouvé plusieurs autres paquets. Mais, en regardant la description, le code et les problèmes, j'ai décidé de m'en tenir à mon propre code. Il ne peut pas être si difficile de créer quelque chose comme ça ... Mais comme toujours, il y a plus à faire que ce que l'on pense au départ.

Ci-dessous, j'aborde un peu plus les detail et je termine par un code montrant la première version de LogLineFollower. Oh, désolé, Linux seulement, aucune idée si cela fonctionne sur Windows ou macOS.

String vs bytes

Certains paquets ouvrent le fichier journal en mode normal, alors que j'ai décidé d'ouvrir le fichier journal en mode binaire. Pourquoi ai-je choisi de faire cela ? La raison est que, dans le passé, j'ai été confronté à plusieurs reprises à des fichiers journaux qui contenaient des caractères inattendus, ou plutôt illégaux, qui pouvaient faire planter les processeurs de fichiers journaux. Oui, même des outils comme grep peuvent échouer. Dans un cas, syslog a été corrompu plusieurs fois, lorsqu'un processus écrivant dans syslog s'est écrasé.

Utiliser bytes signifie utiliser 'read(chunk_size)' au lieu de 'readline()'. Et au lieu d'utiliser '\n' comme séparateur de ligne, nous utilisons maintenant b'\n'.

L'idée générale de l'utilisation de bytes est d'empêcher ce processus de se planter, de traiter les erreurs (de décodage) d'une manière ou d'une autre et d'essayer de continuer à fonctionner normalement, le tout sans notre intervention.

Détection des changements de fichiers

Les fichiers journaux peuvent changer, généralement par la rotation des fichiers journaux. Le fichier actuel est renommé et un nouveau fichier est utilisé. Ceci peut être détecté sur Linux en vérifiant le inode. Si la inode du fichier journal change, alors le fichier a changé.

Mais il existe d'autres façons de faire tourner les fichiers journaux. L'une d'entre elles consiste à copier le contenu du fichier journal dans un autre fichier, puis à truncater le fichier journal. Ou en copiant un nouveau fichier vide dans le fichier journal actuel. Dans ces cas, la inode ne change pas et la seule chose que nous pouvons faire est de vérifier si le fichier journal est devenu plus petit au lieu d'être plus grand.

Pour aggraver les choses, le nom du fichier journal peut avoir changé après la rotation. Par exemple, de 'access.log' à 'access.log.1'. Dans ce cas, nous avons besoin d'une logique intelligente pour trouver le nouveau fichier journal.

Pouvoir continuer là où on s'est arrêté

Nous ne vivons pas dans un monde parfait et notre processeur de fichiers journaux peut tomber en panne. Ou bien, nous voulons l'arrêter, faire un peu de maintenance et le relancer mais en sautant les lignes que nous avons déjà traitées.

Cela signifie que nous devons nous souvenir de la dernière ligne que nous avons traitée. En stockant le décalage actuel du fichier journal dans un fichier séparé après avoir traité une ligne, nous créons un moyen de continuer après une interruption : Au redémarrage, nous récupérons le décalage enregistré et l'utilisons pour définir la position suivante dans le fichier journal.

Une autre façon de continuer après une interruption peut être de sauvegarder une copie de la dernière ligne traitée, et ensuite d'utiliser cette copie en parcourant le fichier.

Le code : LogLineFollower

La première version du code ci-dessous présente les limitations suivantes :

  • Pas de continuation ou quoi que ce soit sur la rotation du fichier journal.
  • Lorsqu'il est interrompu et redémarré, (certaines) des dernières lignes traitées peuvent réapparaître ou être sautées, voir le code.

Le ou les derniers éléments peuvent être importants, mais c'est à vous d'en décider. Cela se produit parce que j'ai décidé, pour des raisons de performance, que nous devons être en mesure de traiter une liste de lignes au lieu d'une seule ligne à la fois.

Si les mêmes lignes réapparaissent après un redémarrage, l'opération dans votre fonction 'process_line()' doit être idempotente,
signifiant que la même opération peut être appliquée plusieurs fois sans changer le résultat.

Sans changement, le code ci-dessous commence à la première ligne de /var/log/syslog. Lorsqu'il atteint la fin de syslog, il attend de nouvelles lignes.

Pour l'essayer : Copier-coller et exécuter. Et changez. Et exécutez.

"""
follow a log file, e.g. syslog, postfix.log

note(s):
- linux only
- in process_lines() you do what you want
- lines assumed to be ending with '\n'
- we only decode bytes after (!) we have a complete line
- file change (rotate) is detected on inode change
"""
import logging
import os
import sys
import time


# console logger
def get_logger():
    log_level = logging.DEBUG
    #log_level = logging.INFO
    log_format = logging.Formatter('%(asctime)s [%(filename)-30s%(funcName)20s():%(lineno)03s] %(levelname)-8.8s %(message)s')
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(log_format)
    logger.addHandler(console_handler)
    return logger

    
class DummyLogger:
    def __getattr__(self, name):
        return lambda *x: None        


class LogLineFollower:
    """
    wait for new lines in the file to follow and calls process_line() or your callback cb_line() where you can do your work.

    arguments:

    logger              (optional) the logger.
    follow_file         the file to follow, e.g. /var/log/syslog.
    cb_process_line     (optional) callback. your line processing function.
    cb_process_lines    (optional) callback. your multiple lines processing function.
    offset_file         (optional) file holding the information to continue on a restart.
                        the offset file is always present. remove this file to make a fresh start.
                        if not specified its path is follow_file with the extension replaced by '.offset'.
    offset_type         (optional) where to start in follow_file, can be one of:
                        - 'begin': start at beginning of file
                        - 'continue': start where we left off using data from offset_file
                        - 'end': (default) start at end of file
                        important: if offset_file is present, its values will be used to continue
                        regardless of the value of offset_type! see also offset_file.
    chunk_size          (optional) maximum number of bytes we are trying to read at a time from follow_file.
                        default: 4096
    followed_lines_file (optional) receives all lines read.
                        can be used for testing and/or checking.
    """
    def __init__(
        self,
        logger=None,
        follow_file=None,
        cb_process_line=None,
        cb_process_lines=None,
        offset_file=None,
        offset_type=None,
        chunk_size=None,
        followed_lines_file=None,
    ):
        self.logger = logger or DummyLogger()
        self.follow_file = follow_file
        self.cb_process_line = cb_process_line
        self.cb_process_lines = cb_process_lines
        self.offset_file = offset_file or os.path.splitext(self.follow_file)[0] + '.offset'
        self.offset_type = offset_type or 'end'
        self.chunk_size = chunk_size or 4096
        self.followed_lines_file = followed_lines_file

    def append_lines_to_followed_lines_file(self, blines):
        if self.followed_lines_file is None:
            return
        #self.logger.debug('appending blines = {}'.format(blines))
        # decode to utf-8 before appending
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            lines.append(bline.decode('utf-8'))
        with open(self.followed_lines_file, 'a') as fo:
            fo.write('\n'.join(lines) + '\n')

    def process_lines(self, blines):
        # decode to utf-8 before printing
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            line = bline.decode('utf-8')
            lines.append(line)
            self.process_line(line)
        if self.cb_process_lines is not None:
            self.cb_process_lines(lines)


    def process_line(self, line):
        if self.cb_process_line is not None:
            self.cb_process_line(line)
        else:
            self.logger.info('line = {}'.format(line))

    def write_offset_file(self, pos, incomplete_line_chunk):
        # write a comma-separated bytes string: b'inode,pos,incomplete_line_chunk'
        self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(self.inode, pos, incomplete_line_chunk))
        bs = self.inode.to_bytes(8, 'big') + b','
        bs += pos.to_bytes(8, 'big') + b','
        bs += incomplete_line_chunk or b''
        self.logger.debug('bs = {}'.format(bs))
        with open(self.offset_file, 'wb') as fo:
            fo.write(bs)

    def read_offset_file(self):
        bs = None
        try:
            with open(self.offset_file, 'rb') as fo:
                bs = fo.read()
            if bs is not None:
                bs_parts = bs.split(b',', 2)
                bs_parts_len = len(bs_parts)
                if bs_parts_len == 3:
                    inode = int.from_bytes(bs_parts[0], 'big')
                    pos = int.from_bytes(bs_parts[1], 'big')
                    incomplete_line_chunk = bs_parts[2]
                    if incomplete_line_chunk == b'':
                        incomplete_line_chunk = None
                    self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(inode, pos, incomplete_line_chunk))
                    return inode, pos, incomplete_line_chunk
        except Exception as e:
            pass
        return None, None, None

    def set_file(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            self.inode = inode
            return True
        except Exception as e:
            self.logger.debug('exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return False

    def file_changed(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            if inode != self.inode:
                return True
            return False
        except Exception as e:
            self.logger.debug('get_inode: exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return True

    def run(self):
        incomplete_line_chunk = None
        wait_count = 0
        self.set_file()
        with open(self.follow_file, 'rb') as fo:
            # continue where we left off, remove offset_file to start fresh
            start_pos_set = False
            inode, pos, incomplete_line_chunk = self.read_offset_file()
            if inode is not None:
                # check saved inode against current inode
                # ...
                # seek to pos in offset_file
                fo.seek(pos, 0)
                start_pos_set = True
            if not start_pos_set:
                if self.offset_type == 'end':
                    # start at end
                    fo.seek(0, 2)
                else:
                    # start at beginning
                    fo.seek(0, 0)

            while True:
                if self.file_changed():
                    self.logger.debug('file not present or changed')
                    break
                # read new chunk
                chunk = fo.read(self.chunk_size)
                chunk_size_read = len(chunk)
                self.logger.debug('chunk_size_read = {}'.format(chunk_size_read))
                if chunk_size_read == 0:
                    self.logger.debug('waiting for new data ... {}'.format(wait_count))
                    time.sleep(1)
                    wait_count += 1
                    continue
                wait_count = 0

                # prepend incomplete_line_chunk if any
                if incomplete_line_chunk is not None:
                    chunk = incomplete_line_chunk + chunk
                    incomplete_line_chunk = None

                # split chunk on new lines
                blines = chunk.split(b'\n')

                # get completed lines, and incomplete_line
                if blines[-1] == b'':
                    # last line is empty means last line -1 is complete
                    del blines[-1]
                else:
                    # last line is not empty means last line is not complete
                    incomplete_line_chunk = blines[-1]
                    del blines[-1]
                # do something with complete lines (bytes)
                if len(blines) > 0:
                    self.append_lines_to_followed_lines_file(blines)
                    self.process_lines(blines)
                # show incomplete_line_chunk
                if incomplete_line_chunk is not None:
                    self.logger.debug('incomplete_line_chunk = {}'.format(incomplete_line_chunk))

                # here we write the file offset pos AFTER 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be read (and processed) AGAIN the next time.
                #
                # the alternative is to write the offset file BEFORE 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be SKIPPED the next time 
                # 
                fo.seek(0, 1)
                pos = fo.tell()
                self.logger.debug('pos = {}'.format(pos))
                self.write_offset_file(pos, incomplete_line_chunk)
                #self.read_offset_file()

def process_line(line):
    print('processing line = {}'.format(line))

if __name__ == '__main__':
    f = '/var/log/syslog'
    llf = LogLineFollower(
        logger=get_logger(),
        follow_file=f,
        offset_file='file_follower.offset',
        cb_process_line=process_line,
        offset_type='begin',
        #offset_type='end',
        chunk_size=4096,
        followed_lines_file='followed_lines.log',
    )
    llf.run()

Résumé

Comme dans de nombreux cas, vous commencez avec quelques lignes de code, et vous êtes sur la bonne voie. Bientôt, vous chercherez des moyens de tester votre code. C'est pourquoi j'ai ajouté l'option 'followed_lines_file'. Vous pouvez comparer les données de ce fichier avec celles du fichier 'follow_file'.

Ai-je gagné du temps en écrivant moi-même ce programme, ou aurais-je dû utiliser l'un des paquets de Pypi.org ? Malheureusement, la plupart des paquets de Pypi.org qui ont des fonctionnalités comparables, ne répondent pas à certaines de mes exigences, ont des problèmes ou ne sont pas vraiment maintenus. Mais il était instructif de les étudier. Je suppose que je n'avais pas vraiment le choix dans ce cas.

Liens / crédits

logtailtool
https://github.com/minyong-jeong/logtailtool

py-logwatcher
https://github.com/tonytan4ever/py-logwatcher

pygtail
https://github.com/bgreenlee/pygtail

tail-uwsgi-log
https://pypi.org/project/tail-uwsgi-log

En savoir plus...

Log file

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.