angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

LogLineFollower: Следите за строками растущего файла журнала

Считывайте и обрабатывайте строки из растущего файла журнала и выполняйте необходимые действия.

9 сентября 2022
post main image
https://unsplash.com/@mailchimp

Я искал способ обработки строк лог-файла в процессе его роста. В интернете я нашел несколько фрагментов и пакетов, но не совсем то, что мне было нужно. Поэтому я решил написать свой собственный.

После того как я написал первую версию, я поискал снова и нашел еще несколько пакетов. Но, посмотрев на описание, код и проблемы, я решил остановиться на собственном коде. Не может быть так сложно создать что-то подобное... Но, как всегда, в этом деле есть нечто большее, чем кажется на первый взгляд.

Ниже я рассмотрю еще несколько detail и закончу кодом, показывающим первую версию LogLineFollower. О, извините, только Linux , не знаю, работает ли это на Windows или macOS.

Строка против bytes

Некоторые пакеты открывают файл журнала в обычном режиме, в то время как я решил открыть файл журнала в двоичном режиме. Почему я решил поступить именно так? Причина в том, что в прошлом я несколько раз сталкивался с лог-файлами, содержащими неожиданные, лучше сказать, недопустимые символы, которые могут привести к сбою процессоров лог-файлов. Да, даже такие инструменты, как grep, могут дать сбой. В одном случае syslog был поврежден несколько раз, когда процесс, записывающий в syslog , потерпел крах.

Использование bytes означает использование 'read(chunk_size)' вместо 'readline()'. И вместо использования '\n' в качестве разделителя строк мы теперь используем b'\n'.

Вся идея использования bytes заключается в том, чтобы предотвратить падение этого процесса, обработать ошибки (декодирования) определенным образом и попытаться продолжить нормальную работу, все без нашего вмешательства.

Обнаружение изменений файлов

Файлы журналов могут изменяться, как правило, в результате ротации файлов журналов. Текущий файл переименовывается и используется новый файл. Это можно обнаружить на Linux , проверив inode. Если inode файла журнала изменяется, значит, файл изменился.

Но есть и другие способы, с помощью которых файлы журналов могут вращаться. Один из них - это копирование содержимого файла журнала в другой файл, а затем truncatирование файла журнала. Или путем копирования нового пустого файла в текущий файл журнала. В этих случаях inode не меняется, и единственное, что мы можем сделать, это проверить, стал ли файл журнала меньше, а не больше.

Что еще хуже, имя файла журнала могло измениться после поворота. Например, с 'access.log' на 'access.log.1'. В этом случае нам нужна умная логика, чтобы найти новый файл журнала.

Возможность продолжить работу с того места, где остановились

Мы живем не в идеальном мире, и наш процессор журнальных файлов может сломаться. Или же мы хотим остановить его, провести некоторое обслуживание и запустить снова, но пропустить строки, которые мы уже обработали.

Это означает, что мы должны помнить, какая последняя строка была обработана. Сохраняя текущее смещение файла журнала в отдельном файле после обработки строки, мы создаем возможность продолжить работу после прерывания: При перезапуске мы получаем сохраненное смещение и используем его для установки следующей позиции в файле журнала.

Другим способом продолжения работы после прерывания может быть сохранение копии последней обработанной строки, а затем использование этой копии при проходе по файлу.

Код: LogLineFollower

Первая версия приведенного ниже кода имеет следующие ограничения:

  • Нет продолжения или чего-либо еще при вращении файла журнала.
  • При прерывании и перезапуске, (некоторые) обработанные последние строки могут появиться снова или быть пропущены, см. код.

Последний элемент(ы) может быть важным, но это вам решать. Это происходит потому, что я решил, что по соображениям производительности мы должны иметь возможность обрабатывать список строк, а не одну строку за раз.

Если после перезапуска снова появятся те же строки, то операция в вашей функции 'process_line()' должна быть идемпотентной,
означает, что одна и та же операция может быть применена несколько раз без изменения результата.

Без изменений приведенный ниже код начинается с первой строки /var/log/syslog. Когда он достигает конца syslog, он ожидает новых строк.

Чтобы попробовать: Скопируйте-вставьте и запустите. И измените. И запустить.

"""
follow a log file, e.g. syslog, postfix.log

note(s):
- linux only
- in process_lines() you do what you want
- lines assumed to be ending with '\n'
- we only decode bytes after (!) we have a complete line
- file change (rotate) is detected on inode change
"""
import logging
import os
import sys
import time


# console logger
def get_logger():
    log_level = logging.DEBUG
    #log_level = logging.INFO
    log_format = logging.Formatter('%(asctime)s [%(filename)-30s%(funcName)20s():%(lineno)03s] %(levelname)-8.8s %(message)s')
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(log_format)
    logger.addHandler(console_handler)
    return logger

    
class DummyLogger:
    def __getattr__(self, name):
        return lambda *x: None        


class LogLineFollower:
    """
    wait for new lines in the file to follow and calls process_line() or your callback cb_line() where you can do your work.

    arguments:

    logger              (optional) the logger.
    follow_file         the file to follow, e.g. /var/log/syslog.
    cb_process_line     (optional) callback. your line processing function.
    cb_process_lines    (optional) callback. your multiple lines processing function.
    offset_file         (optional) file holding the information to continue on a restart.
                        the offset file is always present. remove this file to make a fresh start.
                        if not specified its path is follow_file with the extension replaced by '.offset'.
    offset_type         (optional) where to start in follow_file, can be one of:
                        - 'begin': start at beginning of file
                        - 'continue': start where we left off using data from offset_file
                        - 'end': (default) start at end of file
                        important: if offset_file is present, its values will be used to continue
                        regardless of the value of offset_type! see also offset_file.
    chunk_size          (optional) maximum number of bytes we are trying to read at a time from follow_file.
                        default: 4096
    followed_lines_file (optional) receives all lines read.
                        can be used for testing and/or checking.
    """
    def __init__(
        self,
        logger=None,
        follow_file=None,
        cb_process_line=None,
        cb_process_lines=None,
        offset_file=None,
        offset_type=None,
        chunk_size=None,
        followed_lines_file=None,
    ):
        self.logger = logger or DummyLogger()
        self.follow_file = follow_file
        self.cb_process_line = cb_process_line
        self.cb_process_lines = cb_process_lines
        self.offset_file = offset_file or os.path.splitext(self.follow_file)[0] + '.offset'
        self.offset_type = offset_type or 'end'
        self.chunk_size = chunk_size or 4096
        self.followed_lines_file = followed_lines_file

    def append_lines_to_followed_lines_file(self, blines):
        if self.followed_lines_file is None:
            return
        #self.logger.debug('appending blines = {}'.format(blines))
        # decode to utf-8 before appending
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            lines.append(bline.decode('utf-8'))
        with open(self.followed_lines_file, 'a') as fo:
            fo.write('\n'.join(lines) + '\n')

    def process_lines(self, blines):
        # decode to utf-8 before printing
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            line = bline.decode('utf-8')
            lines.append(line)
            self.process_line(line)
        if self.cb_process_lines is not None:
            self.cb_process_lines(lines)


    def process_line(self, line):
        if self.cb_process_line is not None:
            self.cb_process_line(line)
        else:
            self.logger.info('line = {}'.format(line))

    def write_offset_file(self, pos, incomplete_line_chunk):
        # write a comma-separated bytes string: b'inode,pos,incomplete_line_chunk'
        self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(self.inode, pos, incomplete_line_chunk))
        bs = self.inode.to_bytes(8, 'big') + b','
        bs += pos.to_bytes(8, 'big') + b','
        bs += incomplete_line_chunk or b''
        self.logger.debug('bs = {}'.format(bs))
        with open(self.offset_file, 'wb') as fo:
            fo.write(bs)

    def read_offset_file(self):
        bs = None
        try:
            with open(self.offset_file, 'rb') as fo:
                bs = fo.read()
            if bs is not None:
                bs_parts = bs.split(b',', 2)
                bs_parts_len = len(bs_parts)
                if bs_parts_len == 3:
                    inode = int.from_bytes(bs_parts[0], 'big')
                    pos = int.from_bytes(bs_parts[1], 'big')
                    incomplete_line_chunk = bs_parts[2]
                    if incomplete_line_chunk == b'':
                        incomplete_line_chunk = None
                    self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(inode, pos, incomplete_line_chunk))
                    return inode, pos, incomplete_line_chunk
        except Exception as e:
            pass
        return None, None, None

    def set_file(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            self.inode = inode
            return True
        except Exception as e:
            self.logger.debug('exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return False

    def file_changed(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            if inode != self.inode:
                return True
            return False
        except Exception as e:
            self.logger.debug('get_inode: exception = {}, e.args = {}'.format(type(e).__name, e.args))
            return True

    def run(self):
        incomplete_line_chunk = None
        wait_count = 0
        self.set_file()
        with open(self.follow_file, 'rb') as fo:
            # continue where we left off, remove offset_file to start fresh
            start_pos_set = False
            inode, pos, incomplete_line_chunk = self.read_offset_file()
            if inode is not None:
                # check saved inode against current inode
                # ...
                # seek to pos in offset_file
                fo.seek(pos, 0)
                start_pos_set = True
            if not start_pos_set:
                if self.offset_type == 'end':
                    # start at end
                    fo.seek(0, 2)
                else:
                    # start at beginning
                    fo.seek(0, 0)

            while True:
                if self.file_changed():
                    self.logger.debug('file not present or changed')
                    break
                # read new chunk
                chunk = fo.read(self.chunk_size)
                chunk_size_read = len(chunk)
                self.logger.debug('chunk_size_read = {}'.format(chunk_size_read))
                if chunk_size_read == 0:
                    self.logger.debug('waiting for new data ... {}'.format(wait_count))
                    time.sleep(1)
                    wait_count += 1
                    continue
                wait_count = 0

                # prepend incomplete_line_chunk if any
                if incomplete_line_chunk is not None:
                    chunk = incomplete_line_chunk + chunk
                    incomplete_line_chunk = None

                # split chunk on new lines
                blines = chunk.split(b'\n')

                # get completed lines, and incomplete_line
                if blines[-1] == b'':
                    # last line is empty means last line -1 is complete
                    del blines[-1]
                else:
                    # last line is not empty means last line is not complete
                    incomplete_line_chunk = blines[-1]
                    del blines[-1]
                # do something with complete lines (bytes)
                if len(blines) > 0:
                    self.append_lines_to_followed_lines_file(blines)
                    self.process_lines(blines)
                # show incomplete_line_chunk
                if incomplete_line_chunk is not None:
                    self.logger.debug('incomplete_line_chunk = {}'.format(incomplete_line_chunk))

                # here we write the file offset pos AFTER 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be read (and processed) AGAIN the next time.
                #
                # the alternative is to write the offset file BEFORE 'do something with complete lines'
                # if 'do something with complete lines' is interrupted, 
                # (some of) these lines will be SKIPPED the next time 
                # 
                fo.seek(0, 1)
                pos = fo.tell()
                self.logger.debug('pos = {}'.format(pos))
                self.write_offset_file(pos, incomplete_line_chunk)
                #self.read_offset_file()

def process_line(line):
    print('processing line = {}'.format(line))

if __name__ == '__main__':
    f = '/var/log/syslog'
    llf = LogLineFollower(
        logger=get_logger(),
        follow_file=f,
        offset_file='file_follower.offset',
        cb_process_line=process_line,
        offset_type='begin',
        #offset_type='end',
        chunk_size=4096,
        followed_lines_file='followed_lines.log',
    )
    llf.run()

Резюме

Как и во многих других случаях, вы начинаете с нескольких строк кода, и вы в пути. Вскоре вы будете искать способы протестировать свой код. Именно поэтому я добавил дополнительный файл 'followed_lines_file'. Вы можете сравнить данные этого файла с данными 'follow_file'.

Сэкономил ли я время, написав это сам, или мне следовало использовать один из пакетов Pypi.org ? К сожалению, большинство пакетов на Pypi.org , имеющих сопоставимую функциональность, не удовлетворяют некоторым из моих требований, имеют проблемы или не поддерживаются. Но изучать их было познавательно. Думаю, в этом случае у меня не было особого выбора.

Ссылки / кредиты

logtailtool
https://github.com/minyong-jeong/logtailtool

py-logwatcher
https://github.com/tonytan4ever/py-logwatcher

pygtail
https://github.com/bgreenlee/pygtail

tail-uwsgi-log
https://pypi.org/project/tail-uwsgi-log

Подробнее

Log file

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.