angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Aplicación Flask que muestra stdout y stderr de un trabajo de fondo

Se utiliza un multiprocessing.Queue() para capturar las líneas stdout y stderr en tiempo real.

19 diciembre 2022
post main image
https://www.pexels.com/nl-nl/@samerdaboul

En un proyecto Flask , necesitaba ejecutar un trabajo en segundo plano, más concretamente un comando que se ejecuta en un terminal (Linux), y mostrar su salida, stdout y sterr, en tiempo real en una ventana del navegador. Puedes encontrar algunas soluciones en internet y esta es una más. También estoy usando un código que encontré en la web, ver enlaces más abajo.

Esta solución está utilizando:

  • multiproceso, para iniciar un nuevo proceso desde nuestra aplicación Flask
  • subprocess, para iniciar el comando
  • threads, para capturar stdin y stdout
  • un multiprocessing.Queue para:
    - almacenar la salida del trabajo en segundo plano
    - leer la salida del trabajo en segundo plano en nuestra aplicación Flask

Como siempre, estoy ejecutando esto en Ubuntu.

Aplicación de una sola página

La aplicación de demostración Flask es una aplicación de una sola página. En esta página podemos iniciar un comando, y una vez iniciado, una ventana en la página muestra la salida en el comando.

    La app Flask tiene dos rutas:

    start_command()

    El método GET envía la página. El método POST se utiliza para iniciar y detener un comando. Los comandos que puedes probar en esta demo:

    - pwd
    - ls -lR
    - ps -Af
    - someunknowncommand
    - cat /var/log/syslog
    - tail --lines=5000 /var/log/syslog
    - tail -f /var/log/syslog
    - docker

    get_command_result_data(queue_id)

    Esta ruta es llamada por el Javascript en la página cada segundo una vez que se inicia un comando. Los datos recibidos se añaden a un 'div' en la página.

    Este proyecto también utiliza Bootstrap y JQuery.

    Servicios en Flask

    Para esta aplicación Flask he creado un nuevo servicio. Como siempre pongo el servicio en la carpeta app.services, e inicializo el servicio en factory.py, usando init_app(), igual que las extensiones Flask . Entonces todo lo que tenemos que hacer es incluir la siguiente línea en nuestros archivos Python :

    import app.services as services

    Y luego llamamos a nuestro servicio como:

    services.our_service.some_method()

    De esta forma no tenemos que preocuparnos por las importaciones cíclicas.

    El servicio BackgroundCommandRunner

    BackgroundCommandRunner es nuestro nuevo servicio con dos métodos:

    start_command(comando)

    Este método crea una cola e inicia un nuevo proceso en segundo plano que ejecuta el comando dado. El proceso captura stdout y stderr y lo pone en la cola.

    Devuelve un tuple (proceso, cola):

    • proceso: valor de retorno de multiprocessing.Process
    • cola: el valor devuelto por multiprocessing.Queue(), incluyendo un id y una marca de tiempo

    Para obtener el identificador del proceso: process.pid
    Para obtener el identificador de la cola: queue.id

    get_queue_data(queue_id)

    Devuelve todos los nuevos datos disponibles (líneas) leyendo de la cola hasta que esté vacía. Los datos devueltos están jsonificados, lo que significa que podemos devolverlos al cliente.

    ¿Cuándo tenemos todos los datos de salida (stdout, stderr) del proceso?

    En el cliente queremos saber cuándo ha finalizado el proceso en segundo plano. El funcionamiento normal, es decir, sin errores, es que los flujos stdout y stderr están cerrados.

    Suponiendo que los flujos no funcionen como se espera en algunos casos, como último recurso, podemos esperar a que subprocess termine. Entonces añadimos un pequeño retraso, de dos segundos, para permitir que la cola se llene con los datos restantes.

    Hay otras condiciones, por ejemplo, al iniciar un comando desconocido, subprocess genera una excepción. En este caso, enviamos el mensaje de error y activamos la bandera 'ready'.

    Algunas notas

    • subprocess no se inicia con 'shell=True' porque eso iniciaría otro proceso.
    • shlex se utiliza para descomponer un comando (cadena) en una secuencia de argumentos antes de llamar a subprocess.
    • Los errores en el proceso en segundo plano se capturan y se envían al cliente utilizando la cola.
    • Las colas se eliminan tras un tiempo de inactividad (60 segundos).

    Pruebe usted mismo

    ¡Atención! Mientras inicie y detenga las tareas en segundo plano desde la página web, no debería haber problemas. Sin embargo, si algo va mal o finalizas Flask pulsando Control-C en la línea de comandos mientras se está ejecutando un proceso en segundo plano, deberás detener este proceso en segundo plano antes de reiniciar la aplicación Flask . Esto es una demo y no se han tomado medidas para manejar estas situaciones con elegancia.

    Por si quieres probar tú mismo, aquí tienes el árbol del proyecto:

    .
    ├── project
    │   ├── app
    │   │   ├── services
    │   │   │   ├── background_command_runner.py
    │   │   │   └── __init__.py
    │   │   ├── templates
    │   │   │   ├── base.html
    │   │   │   └── start_command.html
    │   │   └── factory.py
    │   └── run.py

    Crea primero un virtual environment y luego instala Flask:

    pip install Flask

    A continuación, cree los siguientes archivos.

    run.py

    # run.py
    from app.factory import create_app
    
    app = create_app()
    
    if __name__ == '__main__':
        app.run(
            host= '0.0.0.0',
            port=5050,
            debug=True,
            use_reloader=True,
        )

    app/factory.py

    # app/factory.py
    import datetime
    import logging
    import os
    import re
    import signal
    import sys
    import string
    import time
    
    from flask import current_app, Flask, g, json, redirect, request, render_template
    
    from .services import (
        background_command_runner,
    )
    
    import app.services as services
    
    
    def setup_logging():
        logger = logging.getLogger(__name__)
        logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
        logger.setLevel(logging.DEBUG)
        # console
        console_handler = logging.StreamHandler(sys.stdout)                             
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)                                            
        return logger
    
    
    def create_app():
        app = Flask(__name__)
    
        # reload template if differs from cached
        app.jinja_env.auto_reload = True
        app.config['TEMPLATES_AUTO_RELOAD'] = True
    
        # logging
        app.logger = setup_logging()
    
        # init services
        background_command_runner.init_app(app)
    
        # route to start a command
        @app.route('/', methods=['GET', 'POST'])
        def start_command():
            current_app.logger.debug('()')
    
            command_pid = None
            command_qid = None
            command = None
            error_message = None
            if request.method == 'POST':
                # stop current background process, if running
                try:
                    command_pid = int(request.form.get('command_pid'))
                    current_app.logger.debug('command_pid = {}'.format(command_pid))
                    os.kill(command_pid, signal.SIGKILL)
                except:
                    pass
                action = request.form.get('action')
                if action == 'start_command':
                    try:
                        current_app.logger.debug('starting background command ...')
                        command = request.form.get('command')
                        p, q = services.background_command_runner.start_command(command)
                        command_pid = p.pid
                        command_qid = q.id
                    except Exception as e:
                        error_message = 'Error starting command {}: {}, {}'.format(command, type(e).__name__, e.args)
                elif action == 'stop_command':
                    current_app.logger.debug('stopping background command ...')
                    pass
    
            return render_template(
                '/start_command.html',
                page_title='Run command in background',
                command=command,
                command_pid=command_pid,
                command_qid=command_qid,
                error_message=error_message,
            )
    
        # route to get data from a command
        @app.route('/get-command-result-data/<command_qid>', methods=['GET'])
        def get_command_result_data(command_qid):
            current_app.logger.debug('(command_qid = {})'.format(command_qid))
            return services.background_command_runner.get_queue_data(command_qid)
    
        return app

    app/services/background_command_runner.py

    # app/services/background_command_runner.py
    import datetime
    import multiprocessing
    import os
    import queue
    import shlex
    import subprocess
    import sys
    import threading
    import time
    import uuid
    
    from flask import current_app, jsonify
    
    
    class BackgroundCommandRunner:
        def __init__(self, app=None):
            self.app = app
    
            # storage for queues by id
            self.qid_queues = {}
            # remove queue if no activity after this time
            self.max_queue_secs = 60
            # stream end-of-transmission character
            self.EOT = None
    
            if app is not None:
                self.init_app(app)
    
        def init_app(self, app):
            pass
    
        def __create_queue(self):
            q = multiprocessing.Queue()
            q.id = uuid.uuid4().hex
            q.et = int(time.time())
            q.stdout_closed = False
            q.stderr_closed = False
            self.qid_queues[q.id] = q
            return q
    
        def __get_queue_by_id(self, qid):
            self.__cleanup_queues()
            q = self.qid_queues.get(qid)
            if q is not None:
                q.et = int(time.time())
            return q
    
        def __to_json(self, d):
            current_app.logger.debug('d = {}'.format(d))
            d_json = None
            try:
                d_json = jsonify(d)
            except Exception as e:
                current_app.logger.error('jsonify error, exception = {}, e.args = {} for d = {}'.format(type(e).__name__, e.args, d))
            return d_json
    
        def get_queue_data(self, qid):
            q = self.__get_queue_by_id(qid)
            if q is None:
                data = {
                    'lines': [],
                    'errors': ['Queue disappeared'],
                    'ready': True,
                }
                return self.__to_json({'data': data})
    
            errors = None
            ready = False
            lines = []
            # loop while queue not empty or max lines
            while len(lines) < 1000:
                try:
                    stream, line = q.get(block=True, timeout=0.2)
                except queue.Empty:
                    break
                except Exception as e:
                    errors = [type(e).__name__ + ', ' + str(e.args)]
                    current_app.logger.error('exception = {}, e.args = {}'.format(type(e).__name__, e.args))
                    ready = True
                    break
                current_app.logger.debug('data from queue, stream = {}, type(line) = {}, line = {}'.format(stream, type(line), line))
                if line == self.EOT:
                    if stream == 'stdout':
                        q.stdout_closed = True
                    elif stream == 'stderr':
                        q.stderr_closed = True
                    if q.stdout_closed and q.stderr_closed:
                        ready = True
                    continue
                lines.append({
                    'stream': stream,
                    'line': line,
                })
                if stream == 'exit_code':
                    current_app.logger.debug('exit_code received')
                    ready = True
    
            data = {
                'lines': lines,
                'errors': errors,
                'ready': ready,
            }
            return self.__to_json({'data': data})
    
        def __cleanup_queues(self):
            et = int(time.time())
            to_delete_qids = [q.id for q in self.qid_queues.values() if (et - q.et) > self.max_queue_secs]
            for qid in to_delete_qids:
                del self.qid_queues[qid]
    
        def __reader(self, stream, pipe, q):
            try:
                with pipe:
                    for line in iter(pipe.readline, ''):
                        q.put((stream, line))
            finally:
                q.put((stream, self.EOT))
    
        def __run_command_as_subprocess(self, command, q):
            try:
                if isinstance(command, str):
                    command = shlex.split(command)
                #print('COMMAND = {}'.format(command))
                p = subprocess.Popen(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=1,
                    universal_newlines = True,
                )
                threading.Thread(target=self.__reader, args=['stdout', p.stdout, q], daemon=True).start()
                threading.Thread(target=self.__reader, args=['stderr', p.stderr, q], daemon=True).start()
                # delay: the process may have completed but the output was still not processed
                exit_code = p.wait()
                time.sleep(2)
                #print('EXIT_CODE = {}'.format(exit_code))
                q.put(('exit_code', exit_code))
            except Exception as e:
                error_message = 'There was an error running the command = {}: {}, {}'.format(command, type(e).__name__, e.args)
                #print('ERROR_MESSAGE = {}'.format(error_message))
                q.put(('stderr', error_message))
                q.put(('exit_code', 1))
    
        def start_command(self, command):
            # start process and return
            q = self.__create_queue()
            q.put(('stdout', 'Running command: ' + command))
            p = multiprocessing.Process(
                name='__run_command_as_subprocess',
                target=self.__run_command_as_subprocess,
                args=(command, q),
            )
            p.start()
            return (p, q)

    app/services/__init__.py

    # app/services/__init__.py
    from .background_command_runner import BackgroundCommandRunner
    background_command_runner = BackgroundCommandRunner()

    app/templates/base.html

    {# app/templates/base.html #}
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <title>{{ page_title }}</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65" crossorigin="anonymous">
    </head>
    <body>
    
    <main id="main" class="container-fluid py-3 qps-0 qflex-fill mt-0">
        {% block main -%}{% endblock -%}
    
        {%- if command_qid -%}
        <div class="row px-2">
            <div class="col-8 px-2 py-2 my-0">
                Results for command '{{ command }}':
            </div>
            <div class="col-4 px-2 pt-0 pb-2 my-0 text-end">
                <form method="post">
                <input type="hidden" name="command_pid" value="{{ command_pid }}">
                <button type="submit" name="action" value="stop_command" class="btn btn-outline-dark btn-sm">
                    Stop command
                </button>
                </form>
            </div>
        </div>
        <div class="row px-2">
            <div class="col border p-3 overflow-scroll small" id="command_result_data"  style="height: 400px;">
            </div>
        </div>
        <p>
            Lines received: <span id="lines-received">0</span>
        </p>
        {%- endif -%}
    </main>
    
    <script src="https://code.jquery.com/jquery-3.6.2.min.js" integrity="sha256-2krYZKh//PcchRtd+H+VyyQoZ/e3EcrkxhM8ycwASPA=" crossorigin="anonymous"></script>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4" crossorigin="anonymous"></script>
    
    {%- set get_command_result_data_url = '' -%}
    {%- if command_qid -%}
    {%- set get_command_result_data_url = url_for('get_command_result_data', command_qid=command_qid) -%}
    <script>
    var build_result_update_secs = 1000;
    var lines_received = 0;
    function get_command_result_data(){
        var url, data, box, li, i, stream, line, exit_code_received = false;
        url = '{{ get_command_result_data_url }}';
        console.log('url = ' + url);
        if(url == ''){
            return;
        }
        $.ajax({
            url: url,
            type: 'GET',
            dataType: 'json',
            success: function(rcvd){
                data = rcvd.data;
                lines_received += data.lines.length;
                box = $('#command_result_data');
                for(i = 0; i < data.lines.length; i++){
                    li = data.lines[i];
                    stream = li.stream;
                    if(stream == 'stdout'){
                        line = li.line;
                    }else if(stream == 'stderr'){
                        line = 'stderr: ' + li.line;
                    }else{
                        line = stream + ': ' + li.line;
                    }
                    box.append(line + '<br>');
                }
                if(data.errors){
                    data.errors.forEach(function(li, i){
                        box.append('ERROR: ' + li + '<br>');
                    });
                }
                if(data.ready){
                    box.append('Ready' + '<br>');
                }else{
                    setTimeout(get_command_result_data, build_result_update_secs);
                }
                box.scrollTop(box.prop('scrollHeight'));
                $('#lines-received').text(lines_received);
            }
        });
    }
    $(document).ready(function(){
        setTimeout(get_command_result_data, build_result_update_secs);
    });
    </script>
    {%- endif -%}
    
    </body>
    </html>

    app/templates/start_command.html

    {# app/templates/start_command.html #}
    {% extends "base.html" %}
    {% block main %}
    
    	<h2 class="mb-3">
    		{{ page_title }}
    	</h2>
    	{%- if error_message -%}
    	<p class="text-danger">
    		{{ error_message }}
    	</p>
    	{%- endif -%}
    	<form method="post">
    	<input type="hidden" name="command_pid" value="{{ command_pid }}">
    		<select name="command" class="form-select" aria-label="Select command">
    			<option value="pwd">
    				pwd
    			</option>
    			<option value="ls -lR">
    				ls -lR
    			</option>
    			<option value="ps -Af">
    				ps -Af
    			</option>
    			<option value="someunknowncommand">
    				someunknowncommand
    			</option>
    			<option value="tail --lines=5000 /var/log/syslog">
    				tail --lines=5000 /var/log/syslog
    			</option>
    			<option value="tail -f /var/log/syslog">
    				tail -f /var/log/syslog
    			</option>
    			<option value="docker">
    				docker
    			</option>
    		</select>
    		<button type="submit" name="action" value="start_command" class="btn btn-primary my-3">
    			Start command
    		</button>
    	</form>
    
    {%- endblock -%}
    

    Ejecutar el proyecto

    Inicie la aplicación desplazándose al directorio del proyecto y escribiendo:

    python run.py

    A continuación, dirija su navegador a:

    http://127.0.0.1:5050

    Debería aparecer la página. Seleccione un comando y observe la salida.

    Resumen

    Como siempre esto tomó más tiempo de lo esperado. Inicialmente pasé el comando como una cadena a subprocess. Los comandos 'pwd' y 'ls' funcionaron, pero 'ls -l' produjo el mensaje:

    FileNotFoundError, (2, 'No such file or directory')

    Después de utilizar shlex, este error desapareció.

    Otra dificultad era decidir cuándo habíamos obtenido todos los datos del subprocess. Primero esperé a que se cerraran tanto stdout como stderr . Pero esto a veces no funcionaba. Como último recurso, esperamos a que termine el subprocess y añadimos un pequeño retardo de dos segundos, y consideramos que éste es el final de los flujos.

    Enlaces / créditos

    How to continuously display Python output in a Webpage?
    https://stackoverflow.com/questions/15092961/how-to-continuously-display-python-output-in-a-webpage

    Python - shlex - Simple lexical analysis
    https://docs.python.org/3/library/shlex.html

    Python - subprocess - Popen
    https://docs.python.org/3/library/subprocess.html#subprocess.Popen

    Python read from subprocess stdout and stderr separately while preserving order
    https://stackoverflow.com/questions/31833897/python-read-from-subprocess-stdout-and-stderr-separately-while-preserving-order

    Deje un comentario

    Comente de forma anónima o inicie sesión para comentar.

    Comentarios

    Deje una respuesta.

    Responda de forma anónima o inicie sesión para responder.