Flask application showing stdout and stderr of a background job
A multiprocessing.Queue() is used to capture stdout and stderr lines in real time.

In a Flask project, I needed to run a background job, more specifically a command running in a (Linux) terminal, and show its output, stdout and sterr, in real time in a browser window. You can find some solutions on the internet and this is just another one. I am also using some code I found on the web, see links below.
This solution is using:
- multiprocessing, to start a new process from our Flask app
- subprocess, to start the command
- threads, to capture stdin and stdout
- a multiprocessing.Queue to:
- store the output of the background job
- read the output of the background job into our Flask app
As always, I am running this on Ubuntu.
Single page application
The Flask demo app is a single page application. On this page we can start a command, and once started, a window on the page shows the output on the command.
The Flask app has two routes:
start_command()
The GET-method sends out the page. The POST-method is used to start a command and stop a command. The commands you can try in this demo:
- pwd
- ls -lR
- ps -Af
- someunknowncommand
- cat /var/log/syslog
- tail --lines=5000 /var/log/syslog
- tail -f /var/log/syslog
- docker
get_command_result_data(queue_id)
This route is called by the Javascript on the page every second once a command started. Received data is appended to a 'div' on the page.
This project also uses Bootstrap and JQuery.
Services in Flask
For this Flask app I created a new service. As always I put the service in the folder app.services, and initialize the service in factory.py, using init_app(), just like Flask extensions. Then all we have to do is include the following line in our Python files:
import app.services as services
And then we call our service like:
services.our_service.some_method()
This way we do not have to worry about cyclic imports.
The service BackgroundCommandRunner
BackgroundCommandRunner is our new service with two methods:
start_command(command)
This method creates a queue and starts a new background process running the given command. The process captures stdout and stderr and puts this into the queue.
Returns a tuple (process, queue):
- process: return value from multiprocessing.Process
- queue: the return value from multiprocessing.Queue(), including an id and timestamp
To get the process id: process.pid
To get the queue id: queue.id
get_queue_data(queue_id)
Returns all new available data (lines) by reading from the queue until it is empty. The returned data is jsonified meaning we can return it to the client.
When do we have all output data (stdout, stderr) from the process?
In the client we want to know when the background process finished. Normal operation, i.e. running without errors, is that the streams stdout and stderr are closed.
Assuming the streams do not work as expected in some cases, as a final resort, we can wait for subprocess to finish. Then we add a small delay, two seconds, to allow the queue to be filled with remaining data.
There are other conditions, e.g. when starting an unknown command, subprocess generates an exception. In this case, we send the error message and set the 'ready'-flag.
Some notes
- subprocess is not started with 'shell=True' because that would start another process.
- shlex is used to break a command (string) into a sequence of arguments before calling subprocess.
- Errors in the background process are captured and send to the client using the queue.
- Queues are removed after some time of inactivity (60 seconds).
Try yourself
Warning! As long as you start and stop background tasks from the web page, there should be no problems. However, if something goes wrong or you terminate Flask by pressing Control-C on the command line while a background process is running, then you must stop this background process before restarting the Flask app. This is a demo and no provisions have been made to handle such situations gracefully.
In case you want to try yourself, here is the tree of the project:
.
├── project
│ ├── app
│ │ ├── services
│ │ │ ├── background_command_runner.py
│ │ │ └── __init__.py
│ │ ├── templates
│ │ │ ├── base.html
│ │ │ └── start_command.html
│ │ └── factory.py
│ └── run.py
Create a virtual environment first and then install Flask:
pip install Flask
Then create the following files.
run.py
# run.py
from app.factory import create_app
app = create_app()
if __name__ == '__main__':
app.run(
host= '0.0.0.0',
port=5050,
debug=True,
use_reloader=True,
)
app/factory.py
# app/factory.py
import datetime
import logging
import os
import re
import signal
import sys
import string
import time
from flask import current_app, Flask, g, json, redirect, request, render_template
from .services import (
background_command_runner,
)
import app.services as services
def setup_logging():
logger = logging.getLogger(__name__)
logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
logger.setLevel(logging.DEBUG)
# console
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter(logger_format))
logger.addHandler(console_handler)
return logger
def create_app():
app = Flask(__name__)
# reload template if differs from cached
app.jinja_env.auto_reload = True
app.config['TEMPLATES_AUTO_RELOAD'] = True
# logging
app.logger = setup_logging()
# init services
background_command_runner.init_app(app)
# route to start a command
@app.route('/', methods=['GET', 'POST'])
def start_command():
current_app.logger.debug('()')
command_pid = None
command_qid = None
command = None
error_message = None
if request.method == 'POST':
# stop current background process, if running
try:
command_pid = int(request.form.get('command_pid'))
current_app.logger.debug('command_pid = {}'.format(command_pid))
os.kill(command_pid, signal.SIGKILL)
except:
pass
action = request.form.get('action')
if action == 'start_command':
try:
current_app.logger.debug('starting background command ...')
command = request.form.get('command')
p, q = services.background_command_runner.start_command(command)
command_pid = p.pid
command_qid = q.id
except Exception as e:
error_message = 'Error starting command {}: {}, {}'.format(command, type(e).__name__, e.args)
elif action == 'stop_command':
current_app.logger.debug('stopping background command ...')
pass
return render_template(
'/start_command.html',
page_title='Run command in background',
command=command,
command_pid=command_pid,
command_qid=command_qid,
error_message=error_message,
)
# route to get data from a command
@app.route('/get-command-result-data/<command_qid>', methods=['GET'])
def get_command_result_data(command_qid):
current_app.logger.debug('(command_qid = {})'.format(command_qid))
return services.background_command_runner.get_queue_data(command_qid)
return app
app/services/background_command_runner.py
# app/services/background_command_runner.py
import datetime
import multiprocessing
import os
import queue
import shlex
import subprocess
import sys
import threading
import time
import uuid
from flask import current_app, jsonify
class BackgroundCommandRunner:
def __init__(self, app=None):
self.app = app
# storage for queues by id
self.qid_queues = {}
# remove queue if no activity after this time
self.max_queue_secs = 60
# stream end-of-transmission character
self.EOT = None
if app is not None:
self.init_app(app)
def init_app(self, app):
pass
def __create_queue(self):
q = multiprocessing.Queue()
q.id = uuid.uuid4().hex
q.et = int(time.time())
q.stdout_closed = False
q.stderr_closed = False
self.qid_queues[q.id] = q
return q
def __get_queue_by_id(self, qid):
self.__cleanup_queues()
q = self.qid_queues.get(qid)
if q is not None:
q.et = int(time.time())
return q
def __to_json(self, d):
current_app.logger.debug('d = {}'.format(d))
d_json = None
try:
d_json = jsonify(d)
except Exception as e:
current_app.logger.error('jsonify error, exception = {}, e.args = {} for d = {}'.format(type(e).__name__, e.args, d))
return d_json
def get_queue_data(self, qid):
q = self.__get_queue_by_id(qid)
if q is None:
data = {
'lines': [],
'errors': ['Queue disappeared'],
'ready': True,
}
return self.__to_json({'data': data})
errors = None
ready = False
lines = []
# loop while queue not empty or max lines
while len(lines) < 1000:
try:
stream, line = q.get(block=True, timeout=0.2)
except queue.Empty:
break
except Exception as e:
errors = [type(e).__name__ + ', ' + str(e.args)]
current_app.logger.error('exception = {}, e.args = {}'.format(type(e).__name__, e.args))
ready = True
break
current_app.logger.debug('data from queue, stream = {}, type(line) = {}, line = {}'.format(stream, type(line), line))
if line == self.EOT:
if stream == 'stdout':
q.stdout_closed = True
elif stream == 'stderr':
q.stderr_closed = True
if q.stdout_closed and q.stderr_closed:
ready = True
continue
lines.append({
'stream': stream,
'line': line,
})
if stream == 'exit_code':
current_app.logger.debug('exit_code received')
ready = True
data = {
'lines': lines,
'errors': errors,
'ready': ready,
}
return self.__to_json({'data': data})
def __cleanup_queues(self):
et = int(time.time())
to_delete_qids = [q.id for q in self.qid_queues.values() if (et - q.et) > self.max_queue_secs]
for qid in to_delete_qids:
del self.qid_queues[qid]
def __reader(self, stream, pipe, q):
try:
with pipe:
for line in iter(pipe.readline, ''):
q.put((stream, line))
finally:
q.put((stream, self.EOT))
def __run_command_as_subprocess(self, command, q):
try:
if isinstance(command, str):
command = shlex.split(command)
#print('COMMAND = {}'.format(command))
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines = True,
)
threading.Thread(target=self.__reader, args=['stdout', p.stdout, q], daemon=True).start()
threading.Thread(target=self.__reader, args=['stderr', p.stderr, q], daemon=True).start()
# delay: the process may have completed but the output was still not processed
exit_code = p.wait()
time.sleep(2)
#print('EXIT_CODE = {}'.format(exit_code))
q.put(('exit_code', exit_code))
except Exception as e:
error_message = 'There was an error running the command = {}: {}, {}'.format(command, type(e).__name__, e.args)
#print('ERROR_MESSAGE = {}'.format(error_message))
q.put(('stderr', error_message))
q.put(('exit_code', 1))
def start_command(self, command):
# start process and return
q = self.__create_queue()
q.put(('stdout', 'Running command: ' + command))
p = multiprocessing.Process(
name='__run_command_as_subprocess',
target=self.__run_command_as_subprocess,
args=(command, q),
)
p.start()
return (p, q)
app/services/__init__.py
# app/services/__init__.py
from .background_command_runner import BackgroundCommandRunner
background_command_runner = BackgroundCommandRunner()
app/templates/base.html
{# app/templates/base.html #}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>{{ page_title }}</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65" crossorigin="anonymous">
</head>
<body>
<main id="main" class="container-fluid py-3 qps-0 qflex-fill mt-0">
{% block main -%}{% endblock -%}
{%- if command_qid -%}
<div class="row px-2">
<div class="col-8 px-2 py-2 my-0">
Results for command '{{ command }}':
</div>
<div class="col-4 px-2 pt-0 pb-2 my-0 text-end">
<form method="post">
<input type="hidden" name="command_pid" value="{{ command_pid }}">
<button type="submit" name="action" value="stop_command" class="btn btn-outline-dark btn-sm">
Stop command
</button>
</form>
</div>
</div>
<div class="row px-2">
<div class="col border p-3 overflow-scroll small" id="command_result_data" style="height: 400px;">
</div>
</div>
<p>
Lines received: <span id="lines-received">0</span>
</p>
{%- endif -%}
</main>
<script src="https://code.jquery.com/jquery-3.6.2.min.js" integrity="sha256-2krYZKh//PcchRtd+H+VyyQoZ/e3EcrkxhM8ycwASPA=" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4" crossorigin="anonymous"></script>
{%- set get_command_result_data_url = '' -%}
{%- if command_qid -%}
{%- set get_command_result_data_url = url_for('get_command_result_data', command_qid=command_qid) -%}
<script>
var build_result_update_secs = 1000;
var lines_received = 0;
function get_command_result_data(){
var url, data, box, li, i, stream, line, exit_code_received = false;
url = '{{ get_command_result_data_url }}';
console.log('url = ' + url);
if(url == ''){
return;
}
$.ajax({
url: url,
type: 'GET',
dataType: 'json',
success: function(rcvd){
data = rcvd.data;
lines_received += data.lines.length;
box = $('#command_result_data');
for(i = 0; i < data.lines.length; i++){
li = data.lines[i];
stream = li.stream;
if(stream == 'stdout'){
line = li.line;
}else if(stream == 'stderr'){
line = 'stderr: ' + li.line;
}else{
line = stream + ': ' + li.line;
}
box.append(line + '<br>');
}
if(data.errors){
data.errors.forEach(function(li, i){
box.append('ERROR: ' + li + '<br>');
});
}
if(data.ready){
box.append('Ready' + '<br>');
}else{
setTimeout(get_command_result_data, build_result_update_secs);
}
box.scrollTop(box.prop('scrollHeight'));
$('#lines-received').text(lines_received);
}
});
}
$(document).ready(function(){
setTimeout(get_command_result_data, build_result_update_secs);
});
</script>
{%- endif -%}
</body>
</html>
app/templates/start_command.html
{# app/templates/start_command.html #}
{% extends "base.html" %}
{% block main %}
<h2 class="mb-3">
{{ page_title }}
</h2>
{%- if error_message -%}
<p class="text-danger">
{{ error_message }}
</p>
{%- endif -%}
<form method="post">
<input type="hidden" name="command_pid" value="{{ command_pid }}">
<select name="command" class="form-select" aria-label="Select command">
<option value="pwd">
pwd
</option>
<option value="ls -lR">
ls -lR
</option>
<option value="ps -Af">
ps -Af
</option>
<option value="someunknowncommand">
someunknowncommand
</option>
<option value="tail --lines=5000 /var/log/syslog">
tail --lines=5000 /var/log/syslog
</option>
<option value="tail -f /var/log/syslog">
tail -f /var/log/syslog
</option>
<option value="docker">
docker
</option>
</select>
<button type="submit" name="action" value="start_command" class="btn btn-primary my-3">
Start command
</button>
</form>
{%- endblock -%}
Running the project
Start the application by moving to the project directory and typing:
python run.py
Then point your browser to:
http://127.0.0.1:5050
The page should appear. Select a command and observe the output.
Summary
As always this took more time than expected. Initially I passed the command as a string to subprocess. The commands 'pwd' and 'ls' worked, but 'ls -l' produced the message:
FileNotFoundError, (2, 'No such file or directory')
After using shlex, this error went away.
Another difficulty was to decide when we have got all data from the subprocess. First I waited for both stdout and stderr closed. But this sometimes did not work. As a final resort, we wait for the subprocess to finish and add a small delay of two seconds, and consider this the end of the streams.
Links / credits
How to continuously display Python output in a Webpage?
https://stackoverflow.com/questions/15092961/how-to-continuously-display-python-output-in-a-webpage
Python - shlex - Simple lexical analysis
https://docs.python.org/3/library/shlex.html
Python - subprocess - Popen
https://docs.python.org/3/library/subprocess.html#subprocess.Popen
Python read from subprocess stdout and stderr separately while preserving order
https://stackoverflow.com/questions/31833897/python-read-from-subprocess-stdout-and-stderr-separately-while-preserving-order
Read more
Flask Multiprocessing
Most viewed
- Using UUIDs instead of Integer Autoincrement Primary Keys with SQLAlchemy and MariaDb
- Using Python's pyOpenSSL to verify SSL certificates downloaded from a host
- Using PyInstaller and Cython to create a Python executable
- Connect to a service on a Docker host from a Docker container
- SQLAlchemy: Using Cascade Deletes to delete related objects
- Flask RESTful API request parameter validation with Marshmallow schemas