LogLineFollower: Follow lines of a growing log file
Read and process lines from a growing log file and run your desired actions.
I was looking for a way to process the lines of a log file while it is growing. I found some snippets and packages on the internet, but none did exactly what I wanted. That is why I decided to write my own.
After coding a first version, I searched again and found several more packages. But looking at their descriptions, code and issues, I decided to stick with my own code. It cannot be that hard to create something like this ... But as always, there is more to it than you initially think.
Below I go into some more details and end with code showing the first version of LogLineFollower. Oh, sorry, Linux only, no idea if it runs on Windows or macOS.
String vs bytes
Some packages open the log file in text mode, but I decided to open it in binary mode. Why? Because in the past I was confronted several times with log files containing unexpected, or better, illegal characters that can cause log file processors to crash. Yes, even tools like grep can fail. In one case, syslog got corrupted multiple times when a process writing to syslog crashed.
Using bytes means using 'read(chunk_size)' instead of 'readline()', and using b'\n' instead of '\n' as the line separator.
The whole idea of using bytes is to prevent this process from crashing: handle (decoding) errors in some way and try to continue normal operation, all without our intervention.
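To illustrate, here is a minimal sketch (not the code of LogLineFollower, and 'example.log' is just a placeholder name) of reading in binary mode, splitting on b'\n' and decoding only complete lines, replacing illegal bytes instead of crashing:
buffer = b''
with open('example.log', 'rb') as fo:
    while True:
        chunk = fo.read(4096)
        if not chunk:
            break
        buffer += chunk
        blines = buffer.split(b'\n')
        # the last part is an incomplete line (or empty), keep it for the next chunk
        buffer = blines.pop()
        for bline in blines:
            # errors='replace' keeps us running when we hit illegal bytes
            print(bline.decode('utf-8', errors='replace'))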
Detecting file changes
Log files can change, typically by log file rotation. The current file is renamed and a new file is used. This can be detected on Linux by checking the inode. If the inode of the log file changes, then the file has changed.
But there are other ways log files can rotate. One is by copying the contents of the log file to another file and then truncating the log file. Another is by copying a new, empty file over the current log file. In these cases the inode does not change, and the only thing we can do is check whether the log file has become smaller instead of bigger.
To make things worse, the log file name may have changed after the rotate, for example from 'access.log' to 'access.log.1'. In this case we need some smart logic to find the new log file.
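Here is a sketch of both checks, using a hypothetical helper that receives the inode and size we saw previously:
import os

def rotated(path, last_inode, last_size):
    # hypothetical helper: did the file at 'path' rotate?
    # covers rename rotation (inode changed) and copy-truncate (file shrank)
    try:
        st = os.stat(path)
    except FileNotFoundError:
        # file is gone, e.g. renamed and the new one not yet created
        return True
    return st.st_ino != last_inode or st.st_size < last_size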
Being able to continue where we left off
We don't live in a perfect world and our log file processor can crash. Or we want to stop it, do some maintenance and run it again, but skip the lines we already processed.
This means that we must remember the last line we processed. By storing the current offset of the log file in a separate file after processing a line, we create a way to continue after an interruption: on a restart we fetch the saved offset and use it to set the next position in the log file.
A different way to continue after an interruption is to save a copy of the last line processed, and then look for this line when walking through the file again.
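In its simplest form the offset approach looks like this, a sketch with placeholder file names; the real code below also stores the inode and any incomplete line:
with open('example.log', 'rb') as fo:
    try:
        with open('example.offset') as ofo:
            # restore the saved position
            fo.seek(int(ofo.read()), 0)
    except (FileNotFoundError, ValueError):
        # no (valid) offset file: start at the beginning
        pass
    chunk = fo.read(4096)
    # ... process the chunk ...
    # save the position for the next run
    with open('example.offset', 'w') as ofo:
        ofo.write(str(fo.tell()))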
The code: LogLineFollower
The first version of the code below has the following limitations:
- No continuation whatsoever on log file rotation.
- When interrupted and restarted, (some of) the last processed lines may show up again or be skipped, see code.
The last item can be important, but that is for you to decide. It happens because I decided, for performance reasons, that we must be able to process a list of lines instead of a single line at a time.
If the same lines can show up again after a restart, then the operation in your 'process_line()' function must be idempotent, meaning that the same operation can be applied multiple times without changing the result.
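For example, a 'process_line()' that writes to a database can be made idempotent by making the line, or a key derived from it, unique. A hypothetical sqlite3 sketch:
import sqlite3

con = sqlite3.connect('lines.db')  # placeholder database
with con:
    con.execute('CREATE TABLE IF NOT EXISTS lines (line TEXT PRIMARY KEY)')

def process_line(line):
    # 'INSERT OR IGNORE' makes processing the same line twice a no-op
    with con:
        con.execute('INSERT OR IGNORE INTO lines (line) VALUES (?)', (line,))
Note that this also drops genuinely duplicate lines; in practice the timestamp in a log line usually makes every line unique.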
Without changes, the code below begins at the first line of /var/log/syslog. When it reaches the end of syslog, it waits for new lines.
To try it: Copy-paste and run. And change. And run.
"""
follow a log file, e.g. syslog, postfix.log
note(s):
- linux only
- in process_lines() you do what you want
- lines assumed to be ending with '\n'
- we only decode bytes after (!) we have a complete line
- file change (rotate) is detected on inode change
"""
import logging
import os
import sys
import time
# console logger
def get_logger():
    log_level = logging.DEBUG
    #log_level = logging.INFO
    log_format = logging.Formatter('%(asctime)s [%(filename)-30s%(funcName)20s():%(lineno)03s] %(levelname)-8.8s %(message)s')
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(log_format)
    logger.addHandler(console_handler)
    return logger
class DummyLogger:
    def __getattr__(self, name):
        return lambda *x: None
class LogLineFollower:
    """
    waits for new lines in the file to follow and calls process_line() or your callback cb_process_line() where you can do your work.
    arguments:
    logger               (optional) the logger.
    follow_file          the file to follow, e.g. /var/log/syslog.
    cb_process_line      (optional) callback. your line processing function.
    cb_process_lines     (optional) callback. your multiple lines processing function.
    offset_file          (optional) file holding the information to continue on a restart.
                         the offset file is always present. remove this file to make a fresh start.
                         if not specified, its path is follow_file with the extension replaced by '.offset'.
    offset_type          (optional) where to start in follow_file, can be one of:
                         - 'begin': start at beginning of file
                         - 'continue': start where we left off using data from offset_file
                         - 'end': (default) start at end of file
                         important: if offset_file is present, its values will be used to continue,
                         regardless of the value of offset_type! see also offset_file.
    chunk_size           (optional) maximum number of bytes we try to read at a time from follow_file.
                         default: 4096
    followed_lines_file  (optional) receives all lines read.
                         can be used for testing and/or checking.
    """
    def __init__(
        self,
        logger=None,
        follow_file=None,
        cb_process_line=None,
        cb_process_lines=None,
        offset_file=None,
        offset_type=None,
        chunk_size=None,
        followed_lines_file=None,
    ):
        self.logger = logger or DummyLogger()
        self.follow_file = follow_file
        self.cb_process_line = cb_process_line
        self.cb_process_lines = cb_process_lines
        self.offset_file = offset_file or os.path.splitext(self.follow_file)[0] + '.offset'
        self.offset_type = offset_type or 'end'
        self.chunk_size = chunk_size or 4096
        self.followed_lines_file = followed_lines_file
    def append_lines_to_followed_lines_file(self, blines):
        if self.followed_lines_file is None:
            return
        #self.logger.debug('appending blines = {}'.format(blines))
        # decode to utf-8 before appending
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            lines.append(bline.decode('utf-8'))
        with open(self.followed_lines_file, 'a') as fo:
            fo.write('\n'.join(lines) + '\n')
    def process_lines(self, blines):
        # decode to utf-8 before printing
        lines = []
        for bline in blines:
            # 'try-except' this if decode has errors
            line = bline.decode('utf-8')
            lines.append(line)
            self.process_line(line)
        if self.cb_process_lines is not None:
            self.cb_process_lines(lines)

    def process_line(self, line):
        if self.cb_process_line is not None:
            self.cb_process_line(line)
        else:
            self.logger.info('line = {}'.format(line))
    def write_offset_file(self, pos, incomplete_line_chunk):
        # write a comma-separated bytes string: b'inode,pos,incomplete_line_chunk'
        # inode and pos are fixed-width 8-byte big-endian integers
        self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(self.inode, pos, incomplete_line_chunk))
        bs = self.inode.to_bytes(8, 'big') + b','
        bs += pos.to_bytes(8, 'big') + b','
        bs += incomplete_line_chunk or b''
        self.logger.debug('bs = {}'.format(bs))
        with open(self.offset_file, 'wb') as fo:
            fo.write(bs)
    def read_offset_file(self):
        # read back: 8 bytes inode, b',', 8 bytes pos, b',', incomplete_line_chunk.
        # parse by fixed offsets instead of splitting on b',': the 8-byte
        # integers may themselves contain a comma byte (0x2c).
        try:
            with open(self.offset_file, 'rb') as fo:
                bs = fo.read()
            if len(bs) >= 18:
                inode = int.from_bytes(bs[0:8], 'big')
                pos = int.from_bytes(bs[9:17], 'big')
                incomplete_line_chunk = bs[18:]
                if incomplete_line_chunk == b'':
                    incomplete_line_chunk = None
                self.logger.debug('inode = {}, pos = {}, incomplete_line_chunk = {}'.format(inode, pos, incomplete_line_chunk))
                return inode, pos, incomplete_line_chunk
        except Exception:
            pass
        return None, None, None
    def set_file(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            self.inode = inode
            return True
        except Exception as e:
            self.logger.debug('exception = {}, e.args = {}'.format(type(e).__name__, e.args))
            return False
    def file_changed(self):
        try:
            inode = os.stat(self.follow_file).st_ino
            self.logger.debug('inode = {}'.format(inode))
            if inode != self.inode:
                return True
            return False
        except Exception as e:
            self.logger.debug('file_changed: exception = {}, e.args = {}'.format(type(e).__name__, e.args))
            return True
    def run(self):
        incomplete_line_chunk = None
        wait_count = 0
        self.set_file()
        with open(self.follow_file, 'rb') as fo:
            # continue where we left off, remove offset_file to start fresh
            start_pos_set = False
            inode, pos, incomplete_line_chunk = self.read_offset_file()
            if inode is not None:
                # check saved inode against current inode
                # ...
                # seek to the pos from offset_file
                fo.seek(pos, 0)
                start_pos_set = True
            if not start_pos_set:
                if self.offset_type == 'end':
                    # start at end
                    fo.seek(0, 2)
                else:
                    # start at beginning
                    fo.seek(0, 0)
            while True:
                if self.file_changed():
                    self.logger.debug('file not present or changed')
                    break
                # read new chunk
                chunk = fo.read(self.chunk_size)
                chunk_size_read = len(chunk)
                self.logger.debug('chunk_size_read = {}'.format(chunk_size_read))
                if chunk_size_read == 0:
                    self.logger.debug('waiting for new data ... {}'.format(wait_count))
                    time.sleep(1)
                    wait_count += 1
                    continue
                wait_count = 0
                # prepend incomplete_line_chunk if any
                if incomplete_line_chunk is not None:
                    chunk = incomplete_line_chunk + chunk
                    incomplete_line_chunk = None
                # split chunk on newlines
                blines = chunk.split(b'\n')
                # get completed lines, and incomplete_line_chunk
                if blines[-1] == b'':
                    # last item is empty, meaning the line before it is complete
                    del blines[-1]
                else:
                    # last item is not empty, meaning the last line is not complete
                    incomplete_line_chunk = blines[-1]
                    del blines[-1]
                # do something with complete lines (bytes)
                if len(blines) > 0:
                    self.append_lines_to_followed_lines_file(blines)
                    self.process_lines(blines)
                # show incomplete_line_chunk
                if incomplete_line_chunk is not None:
                    self.logger.debug('incomplete_line_chunk = {}'.format(incomplete_line_chunk))
                # here we write the file offset pos AFTER 'do something with complete lines'.
                # if 'do something with complete lines' is interrupted,
                # (some of) these lines will be read (and processed) AGAIN the next time.
                #
                # the alternative is to write the offset file BEFORE 'do something with complete lines'.
                # if 'do something with complete lines' is interrupted,
                # (some of) these lines will be SKIPPED the next time.
                #
                fo.seek(0, 1)
                pos = fo.tell()
                self.logger.debug('pos = {}'.format(pos))
                self.write_offset_file(pos, incomplete_line_chunk)
                #self.read_offset_file()
def process_line(line):
    print('processing line = {}'.format(line))
if __name__ == '__main__':
    f = '/var/log/syslog'
    llf = LogLineFollower(
        logger=get_logger(),
        follow_file=f,
        offset_file='file_follower.offset',
        cb_process_line=process_line,
        offset_type='begin',
        #offset_type='end',
        chunk_size=4096,
        followed_lines_file='followed_lines.log',
    )
    llf.run()
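If you prefer batches, pass 'cb_process_lines' instead; it receives the decoded lines of a whole chunk at once. Note that without 'cb_process_line' the default 'process_line()' still logs every single line:
def process_lines(lines):
    print('processing {} lines'.format(len(lines)))

llf = LogLineFollower(
    logger=get_logger(),
    follow_file='/var/log/syslog',
    offset_file='file_follower.offset',
    cb_process_lines=process_lines,
    offset_type='begin',
)
llf.run()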
Summary
As in many cases, you start with a few lines of code, and you are on your way. Soon you will look for ways to test your code. That's why I added the optional 'followed_lines_file'. You can compare the data of this file with the data of the 'follow_file'.
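For example, after following a log file from 'begin', a byte comparison should show no difference; a sketch, using the file names from the example above:
import filecmp

# only meaningful when we started at 'begin', the log did not rotate,
# and the log ends with a complete ('\n'-terminated) line
print(filecmp.cmp('/var/log/syslog', 'followed_lines.log', shallow=False))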
Did I save time writing this myself, or should I have used one of the PyPI packages? Unfortunately, most packages on PyPI with comparable functionality miss some of my requirements, have issues, or are not really maintained. But it was instructive to study them. I guess I did not have much of a choice in this case.
Links / credits
logtailtool
https://github.com/minyong-jeong/logtailtool
py-logwatcher
https://github.com/tonytan4ever/py-logwatcher
pygtail
https://github.com/bgreenlee/pygtail
tail-uwsgi-log
https://pypi.org/project/tail-uwsgi-log