angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

LogLineFollower: Seguir las líneas de un archivo de registro creciente

Lea y procese las líneas de un archivo de registro creciente y ejecute las acciones que desee.

9 septiembre 2022
post main image
https://unsplash.com/@mailchimp

Estaba buscando una manera de procesar líneas de un archivo de registro mientras crecía. Encontré algunos fragmentos y paquetes en internet pero no exactamente lo que quería. Por eso decidí escribir el mío propio.

Después de haber codificado una primera versión, busqué de nuevo y encontré varios paquetes más. Pero, mirando la descripción, el código y los problemas, decidí quedarme con mi propio código. No puede ser tan difícil crear algo así ... Pero como siempre, hay más de lo que se piensa inicialmente.

A continuación, voy a entrar en algunos más details y terminar con el código que muestra la primera versión de LogLineFollower. Oh, lo siento, sólo Linux , ni idea de si funciona con Windows o macOS.

String vs bytes

Algunos paquetes abren el archivo de registro en modo normal, mientras que yo decidí abrir el archivo de registro en modo binario. ¿Por qué decidí hacer esto? La razón es que en el pasado me enfrenté varias veces con archivos de registro que contenían caracteres inesperados, una palabra mejor es ilegal, que pueden hacer que los procesadores de archivos de registro se bloqueen. Sí, incluso herramientas como grep pueden fallar. En un caso, syslog se corrompió varias veces, cuando un proceso que escribía en syslog se estrelló.

Utilizar bytes significa utilizar 'read(chunk_size)' en lugar de 'readline()'. Y en lugar de utilizar '\n' como separador de líneas, ahora utilizamos b'\n'.

La idea de usar bytes es evitar que este proceso se cuelgue, para manejar los errores (de decodificación) de alguna manera y tratar de continuar la operación normal, todo sin nuestra interacción.

Detección de cambios en los archivos

Los archivos de registro pueden cambiar, normalmente por la rotación de los archivos de registro. El archivo actual es renombrado y se utiliza un nuevo archivo. Esto se puede detectar en Linux comprobando el inode. Si el inode del archivo de registro cambia, entonces el archivo ha cambiado.

Pero hay otras formas en las que los archivos de registro pueden rotar. Una es copiando el contenido del archivo de registro a otro archivo y luego truncatando el archivo de registro. O copiando un nuevo archivo vacío en el archivo de registro actual. En estos casos, el inode no cambia y lo único que podemos hacer es comprobar si el archivo de registro se ha hecho más pequeño en lugar de más grande.

Para empeorar las cosas, el nombre del archivo de registro puede haber cambiado después de la rotación. Por ejemplo de 'access.log' a 'access.log.1'.En este caso necesitamos alguna lógica inteligente para encontrar el nuevo archivo de registro.

Poder continuar donde lo dejamos

No vivimos en un mundo perfecto y nuestro procesador de archivos de registro puede fallar. O bien, queremos detenerlo, hacer algo de mantenimiento y ejecutarlo de nuevo pero saltándonos las líneas que ya hemos procesado.

Esto significa que debemos recordar cuál fue la última línea que procesamos. Al almacenar el desplazamiento actual del archivo de registro en un archivo separado después de procesar una línea, creamos una forma de continuar después de una interrupción: En un reinicio recuperamos el desplazamiento guardado y lo utilizamos para establecer la siguiente posición en el archivo de registro.

Una forma diferente de continuar después de una interrupción puede ser guardar una copia de la última línea procesada, y luego usar esta copia al recorrer el archivo.

El código: LogLineFollower

La primera versión del código de abajo tiene las siguientes limitaciones:

  • No hay continuación o lo que sea en la rotación del archivo de registro.
  • Cuando se interrumpe y se reinicia, (algunas) últimas líneas procesadas pueden aparecer de nuevo o saltarse, ver código.

La(s) última(s) línea(s) puede(n) ser importante(s), pero eso lo decide usted. Esto sucede porque he decidido, por razones de rendimiento, que debemos ser capaces de procesar una lista de líneas en lugar de una sola línea a la vez.

Si las mismas líneas aparecen de nuevo después de un reinicio, entonces la operación en su función 'process_line()' debe ser idempotente,
lo que significa que la misma operación puede ser aplicada múltiples veces sin cambiar el resultado.

Sin cambios, el código siguiente comienza en la primera línea de /var/log/syslog. Cuando llega al final de syslog, espera nuevas líneas.

Para probarlo: Copia-pega y ejecuta. Y cambiar. Y ejecuta.

"""
follow a log file, e.g. syslog, postfix.log

note(s):
- linux only
- in process_lines() you do what you want
- lines assumed to be ending with '\n'
- we only decode bytes after (!) we have a complete line
- file change (rotate) is detected on inode change
"""
import logging
import os
import sys
import time


# console logger
def get_logger():
    log_level = logging.DEBUG
    #log_level = logging.INFO
    log_format = logging.Formatter('%(asctime)s [%(filename)-30s%(funcName)20s():%(lineno)03s] %(levelname)-8.8s %(message)s')
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(log_format)
    logger.addHandler(console_handler)
    return logger

    
class DummyLogger:
    def __getattr__(self, name):
        return lambda *x: None        


class LogLineFollower:
    """
    wait for new lines in the file to follow and calls process_line() or your callback cb_line() where you can do your work.

    arguments:

    logger              (optional) the logger.
    follow_file         the file to follow, e.g. /var/log/syslog.
    cb_process_line     (optional) callback. your line processing function.
    cb_process_lines    (optional) callback. your multiple lines processing function.
    offset_file         (optional) file holding the information to continue on a restart.
                        the offset file is always present. remove this file to make a fresh start.
                        if not specified its path is follow_file with the extension replaced by '.offset'.
    offset_type         (optional) where to start in follow_file, can be one of:
                        - 'begin': start at beginning of file
                        - 'continue': start where we left off using data from offset_file
                        - 'end': (default) start at end of file
                        important: if offset_file is present, its values will be used to continue
                        regardless of the value of offset_type! see also offset_file.
    chunk_size          (optional) maximum number of bytes we are trying to read at a time from follow_file.
                        default: 4096
    followed_lines_file (optional) receives all lines read.
                        can be used for testing and/or checking.
    """
    def __init__(
        self,
        logger=None,
        follow_file=None,
        cb_process_line=None,
        cb_process_lines=None,
        offset_file=None,
        offset_type=None,
        chunk_size=None,
        followed_lines_file=None,
    ):
        self.logger = logger or DummyLogger()
        self.follow_file = follow_file
        self.cb_process_line = cb_process_line
        self.cb_process_lines = cb_process_lines
        self.offset_file = offset_file or os.path.splitext(self.follow_file)[0] + '.offset'
        self.offset_type = offset_type or 'end'
        self.chunk_size = chunk_size or 4096
        self.followed_lines_file = followed_lines_file

    def append_lines_to_followed_lines_file(self, blines):
        if self.followed_lines_file is None:
            return
        #self.logger.debug('appending blines = {}'.format(blines))
        # decode to utf-8 before appending
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            lines.append(bline.decode('utf-8'))
        with open(self.followed_lines_file, 'a') as fo:
            fo.write('\n'.join(lines) + '\n')

    def process_lines(self, blines):
        # decode to utf-8 before printing
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            line = bline.decode('utf-8')
            lines.append(line)
            self.process_line(line)
        if self.cb_process_lines is not None:
            self.cb_process_lines(lines)


    def process_line(self, line):
        if self.cb_process_line is not None:
            self.cb_process_line(line)
        else:
            self.logger.info('line = {}'.format(line))

    def write_offset_file(self, pos, incomplete_line_chunk):
        # write a comma-separated bytes string: b'inode,pos,incomplete_line_chunk'
        self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(self.inode, pos, incomplete_line_chunk))
        bs = self.inode.to_bytes(8, 'big') + b','
        bs += pos.to_bytes(8, 'big') + b','
        bs += incomplete_line_chunk or b''
        self.logger.debug('bs = {}'.format(bs))
        with open(self.offset_file, 'wb') as fo:
            fo.write(bs)

    def read_offset_file(self):
        bs = None
        try:
            with open(self.offset_file, 'rb') as fo:
                bs = fo.read()
            if bs is not None:
                bs_parts = bs.split(b',', 2)
                bs_parts_len = len(bs_parts)
                if bs_parts_len == 3:
                    inode = int.from_bytes(bs_parts[0], 'big')
                    pos = int.from_bytes(bs_parts[1], 'big')
                    incomplete_line_chunk = bs_parts[2]
                    if incomplete_line_chunk == b'':
                        incomplete_line_chunk = None
                    self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(inode, pos, incomplete_line_chunk))
                    return inode, pos, incomplete_line_chunk
        except Exception as e:
            pass
        return None, None, None

    def set_file(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            self.inode = inode
            return True
        except Exception as e:
            self.logger.debug('exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return False

    def file_changed(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            if inode != self.inode:
                return True
            return False
        except Exception as e:
            self.logger.debug('get_inode: exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return True

    def run(self):
        incomplete_line_chunk = None
        wait_count = 0
        self.set_file()
        with open(self.follow_file, 'rb') as fo:
            # continue where we left off, remove offset_file to start fresh
            start_pos_set = False
            inode, pos, incomplete_line_chunk = self.read_offset_file()
            if inode is not None:
                # check saved inode against current inode
                # ...
                # seek to pos in offset_file
                fo.seek(pos, 0)
                start_pos_set = True
            if not start_pos_set:
                if self.offset_type == 'end':
                    # start at end
                    fo.seek(0, 2)
                else:
                    # start at beginning
                    fo.seek(0, 0)

            while True:
                if self.file_changed():
                    self.logger.debug('file not present or changed')
                    break
                # read new chunk
                chunk = fo.read(self.chunk_size)
                chunk_size_read = len(chunk)
                self.logger.debug('chunk_size_read = {}'.format(chunk_size_read))
                if chunk_size_read == 0:
                    self.logger.debug('waiting for new data ... {}'.format(wait_count))
                    time.sleep(1)
                    wait_count += 1
                    continue
                wait_count = 0

                # prepend incomplete_line_chunk if any
                if incomplete_line_chunk is not None:
                    chunk = incomplete_line_chunk + chunk
                    incomplete_line_chunk = None

                # split chunk on new lines
                blines = chunk.split(b'\n')

                # get completed lines, and incomplete_line
                if blines[-1] == b'':
                    # last line is empty means last line -1 is complete
                    del blines[-1]
                else:
                    # last line is not empty means last line is not complete
                    incomplete_line_chunk = blines[-1]
                    del blines[-1]
                # do something with complete lines (bytes)
                if len(blines) > 0:
                    self.append_lines_to_followed_lines_file(blines)
                    self.process_lines(blines)
                # show incomplete_line_chunk
                if incomplete_line_chunk is not None:
                    self.logger.debug('incomplete_line_chunk = {}'.format(incomplete_line_chunk))

                # here we write the file offset pos AFTER 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be read (and processed) AGAIN the next time.
                #
                # the alternative is to write the offset file BEFORE 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be SKIPPED the next time 
                # 
                fo.seek(0, 1)
                pos = fo.tell()
                self.logger.debug('pos = {}'.format(pos))
                self.write_offset_file(pos, incomplete_line_chunk)
                #self.read_offset_file()

def process_line(line):
    print('processing line = {}'.format(line))

if __name__ == '__main__':
    f = '/var/log/syslog'
    llf = LogLineFollower(
        logger=get_logger(),
        follow_file=f,
        offset_file='file_follower.offset',
        cb_process_line=process_line,
        offset_type='begin',
        #offset_type='end',
        chunk_size=4096,
        followed_lines_file='followed_lines.log',
    )
    llf.run()

Resumen

Como en muchos casos, empiezas con unas pocas líneas de código, y estás en el camino. Pronto buscarás la manera de probar tu código. Por eso he añadido el archivo opcional 'líneas_seguidas'. Puedes comparar los datos de este archivo con los datos del 'archivo_seguido'.

¿He ahorrado tiempo escribiendo esto yo mismo, o debería haber utilizado uno de los paquetes de Pypi.org ? Desgraciadamente, la mayoría de los paquetes de Pypi.org que tienen una funcionalidad comparable, no cumplen algunos de mis requisitos, tienen problemas o no están realmente mantenidos. Pero fue instructivo estudiarlos. Supongo que no tenía muchas opciones en este caso.

Enlaces / créditos

logtailtool
https://github.com/minyong-jeong/logtailtool

py-logwatcher
https://github.com/tonytan4ever/py-logwatcher

pygtail
https://github.com/bgreenlee/pygtail

tail-uwsgi-log
https://pypi.org/project/tail-uwsgi-log

Leer más

Log file

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.