angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Application Flask montrant stdout et stderr d'un travail en arrière-plan

Un multiprocessing.Queue() est utilisé pour capturer les lignes stdout et stderr en temps réel.

19 décembre 2022
post main image
https://www.pexels.com/nl-nl/@samerdaboul

Dans le cadre d'un projet Flask , j'avais besoin d'exécuter un travail en arrière-plan, plus précisément une commande s'exécutant dans un terminal (Linux), et d'afficher ses résultats, stdout et sterr, en temps réel dans une fenêtre de navigateur. Vous pouvez trouver quelques solutions sur Internet et celle-ci en est une autre. J'utilise également du code que j'ai trouvé sur le web, voir les liens ci-dessous.

Cette solution utilise :

  • le multiprocessing, pour démarrer un nouveau processus depuis notre application Flask
  • subprocess, pour lancer la commande
  • threads, pour capturer stdin et stdout
  • une file d'attente multiprocessing.Queue pour :
    - stocker le résultat du travail en arrière-plan
    - lire le résultat du travail en arrière-plan dans notre application Flask .

Comme toujours, j'exécute cette application sur Ubuntu.

Application à page unique

L'application de démonstration Flask est une application à page unique. Sur cette page, nous pouvons lancer une commande, et une fois lancée, une fenêtre sur la page montre la sortie de la commande.

    L'application Flask a deux routes :

    start_command()

    La méthode GET envoie la page. La méthode POST est utilisée pour lancer et arrêter une commande. Les commandes que vous pouvez essayer dans cette démo :

    - pwd
    - ls -lR
    - ps -Af
    - someunknowncommand
    - cat /var/log/syslog
    - tail --lines=5000 /var/log/syslog
    - tail -f /var/log/syslog
    - docker

    get_command_result_data(queue_id)

    Cette route est appelée par le Javascript sur la page toutes les secondes dès qu'une commande est lancée. Les données reçues sont ajoutées à un 'div' sur la page.

    Ce projet utilise également Bootstrap et JQuery.

    Services dans Flask

    Pour cette application Flask , j'ai créé un nouveau service. Comme toujours, je place le service dans le dossier app.services, et j'initialise le service dans factory.py, en utilisant init_app(), tout comme les extensions Flask . Ensuite, tout ce que nous avons à faire est d'inclure la ligne suivante dans nos fichiers Python :

    import app.services as services

    Et ensuite nous appelons notre service comme :

    services.our_service.some_method()

    De cette façon, nous n'avons pas à nous soucier des importations cycliques.

    Le service BackgroundCommandRunner

    BackgroundCommandRunner est notre nouveau service avec deux méthodes :

    start_command(command)

    Cette méthode crée une file d'attente et démarre un nouveau processus en arrière-plan exécutant la commande donnée. Le processus capture stdout et stderr et les place dans la file d'attente.

    Retourne un tuple (processus, file d'attente) :

    • processus : valeur de retour de multiprocessing.Process
    • file d'attente : la valeur de retour de multiprocessing.Queue(), y compris un identifiant et un horodatage.

    Pour obtenir l'id du processus : process.pid
    Pour obtenir l'identifiant de la file d'attente : queue.id

    get_queue_data(queue_id)

    Renvoie toutes les nouvelles données disponibles (lignes) en lisant la file d'attente jusqu'à ce qu'elle soit vide. Les données retournées sont jsonifiées, ce qui signifie que nous pouvons les renvoyer au client.

    Quand avons-nous toutes les données de sortie (stdout, stderr) du processus ?

    Dans le client, nous voulons savoir quand le processus d'arrière-plan s'est terminé. En fonctionnement normal, c'est-à-dire sans erreur, les flux stdout et stderr sont fermés.

    En supposant que les flux ne fonctionnent pas comme prévu dans certains cas, en dernier recours, nous pouvons attendre que subprocess se termine. Ensuite, nous ajoutons un petit délai, deux secondes, pour permettre à la file d'attente d'être remplie avec les données restantes.

    Il existe d'autres conditions, par exemple, lors du lancement d'une commande inconnue, subprocess génère une exception. Dans ce cas, nous envoyons le message d'erreur et activons le drapeau 'ready'.

    Quelques notes

    • subprocess n'est pas lancée avec 'shell=True' car cela lancerait un autre processus.
    • shlex est utilisé pour décomposer une commande (chaîne) en une séquence d'arguments avant d'appeler subprocess.
    • Les erreurs dans le processus de fond sont capturées et envoyées au client en utilisant la file d'attente.
    • Les files d'attente sont supprimées après un certain temps d'inactivité (60 secondes).

    Essayez vous-même

    Attention ! Tant que vous démarrez et arrêtez les tâches d'arrière-plan à partir de la page Web, il ne devrait y avoir aucun problème. Toutefois, si quelque chose ne va pas ou si vous arrêtez Flask en appuyant sur Control-C sur la ligne de commande alors qu'un processus d'arrière-plan est en cours d'exécution, vous devez arrêter ce processus d'arrière-plan avant de redémarrer l'application Flask . Il s'agit d'une démo et aucune disposition n'a été prise pour gérer gracieusement de telles situations.

    Au cas où vous voudriez essayer vous-même, voici l'arbre du projet :

    .
    ├── project
    │   ├── app
    │   │   ├── services
    │   │   │   ├── background_command_runner.py
    │   │   │   └── __init__.py
    │   │   ├── templates
    │   │   │   ├── base.html
    │   │   │   └── start_command.html
    │   │   └── factory.py
    │   └── run.py

    Créez d'abord une virtual environment et installez ensuite Flask :

    pip install Flask

    Puis créez les fichiers suivants.

    run.py

    # run.py
    from app.factory import create_app
    
    app = create_app()
    
    if __name__ == '__main__':
        app.run(
            host= '0.0.0.0',
            port=5050,
            debug=True,
            use_reloader=True,
        )

    app/factory.py

    # app/factory.py
    import datetime
    import logging
    import os
    import re
    import signal
    import sys
    import string
    import time
    
    from flask import current_app, Flask, g, json, redirect, request, render_template
    
    from .services import (
        background_command_runner,
    )
    
    import app.services as services
    
    
    def setup_logging():
        logger = logging.getLogger(__name__)
        logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
        logger.setLevel(logging.DEBUG)
        # console
        console_handler = logging.StreamHandler(sys.stdout)                             
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)                                            
        return logger
    
    
    def create_app():
        app = Flask(__name__)
    
        # reload template if differs from cached
        app.jinja_env.auto_reload = True
        app.config['TEMPLATES_AUTO_RELOAD'] = True
    
        # logging
        app.logger = setup_logging()
    
        # init services
        background_command_runner.init_app(app)
    
        # route to start a command
        @app.route('/', methods=['GET', 'POST'])
        def start_command():
            current_app.logger.debug('()')
    
            command_pid = None
            command_qid = None
            command = None
            error_message = None
            if request.method == 'POST':
                # stop current background process, if running
                try:
                    command_pid = int(request.form.get('command_pid'))
                    current_app.logger.debug('command_pid = {}'.format(command_pid))
                    os.kill(command_pid, signal.SIGKILL)
                except:
                    pass
                action = request.form.get('action')
                if action == 'start_command':
                    try:
                        current_app.logger.debug('starting background command ...')
                        command = request.form.get('command')
                        p, q = services.background_command_runner.start_command(command)
                        command_pid = p.pid
                        command_qid = q.id
                    except Exception as e:
                        error_message = 'Error starting command {}: {}, {}'.format(command, type(e).__name__, e.args)
                elif action == 'stop_command':
                    current_app.logger.debug('stopping background command ...')
                    pass
    
            return render_template(
                '/start_command.html',
                page_title='Run command in background',
                command=command,
                command_pid=command_pid,
                command_qid=command_qid,
                error_message=error_message,
            )
    
        # route to get data from a command
        @app.route('/get-command-result-data/<command_qid>', methods=['GET'])
        def get_command_result_data(command_qid):
            current_app.logger.debug('(command_qid = {})'.format(command_qid))
            return services.background_command_runner.get_queue_data(command_qid)
    
        return app

    app/services/background_command_runner.py

    # app/services/background_command_runner.py
    import datetime
    import multiprocessing
    import os
    import queue
    import shlex
    import subprocess
    import sys
    import threading
    import time
    import uuid
    
    from flask import current_app, jsonify
    
    
    class BackgroundCommandRunner:
        def __init__(self, app=None):
            self.app = app
    
            # storage for queues by id
            self.qid_queues = {}
            # remove queue if no activity after this time
            self.max_queue_secs = 60
            # stream end-of-transmission character
            self.EOT = None
    
            if app is not None:
                self.init_app(app)
    
        def init_app(self, app):
            pass
    
        def __create_queue(self):
            q = multiprocessing.Queue()
            q.id = uuid.uuid4().hex
            q.et = int(time.time())
            q.stdout_closed = False
            q.stderr_closed = False
            self.qid_queues[q.id] = q
            return q
    
        def __get_queue_by_id(self, qid):
            self.__cleanup_queues()
            q = self.qid_queues.get(qid)
            if q is not None:
                q.et = int(time.time())
            return q
    
        def __to_json(self, d):
            current_app.logger.debug('d = {}'.format(d))
            d_json = None
            try:
                d_json = jsonify(d)
            except Exception as e:
                current_app.logger.error('jsonify error, exception = {}, e.args = {} for d = {}'.format(type(e).__name__, e.args, d))
            return d_json
    
        def get_queue_data(self, qid):
            q = self.__get_queue_by_id(qid)
            if q is None:
                data = {
                    'lines': [],
                    'errors': ['Queue disappeared'],
                    'ready': True,
                }
                return self.__to_json({'data': data})
    
            errors = None
            ready = False
            lines = []
            # loop while queue not empty or max lines
            while len(lines) < 1000:
                try:
                    stream, line = q.get(block=True, timeout=0.2)
                except queue.Empty:
                    break
                except Exception as e:
                    errors = [type(e).__name__ + ', ' + str(e.args)]
                    current_app.logger.error('exception = {}, e.args = {}'.format(type(e).__name__, e.args))
                    ready = True
                    break
                current_app.logger.debug('data from queue, stream = {}, type(line) = {}, line = {}'.format(stream, type(line), line))
                if line == self.EOT:
                    if stream == 'stdout':
                        q.stdout_closed = True
                    elif stream == 'stderr':
                        q.stderr_closed = True
                    if q.stdout_closed and q.stderr_closed:
                        ready = True
                    continue
                lines.append({
                    'stream': stream,
                    'line': line,
                })
                if stream == 'exit_code':
                    current_app.logger.debug('exit_code received')
                    ready = True
    
            data = {
                'lines': lines,
                'errors': errors,
                'ready': ready,
            }
            return self.__to_json({'data': data})
    
        def __cleanup_queues(self):
            et = int(time.time())
            to_delete_qids = [q.id for q in self.qid_queues.values() if (et - q.et) > self.max_queue_secs]
            for qid in to_delete_qids:
                del self.qid_queues[qid]
    
        def __reader(self, stream, pipe, q):
            try:
                with pipe:
                    for line in iter(pipe.readline, ''):
                        q.put((stream, line))
            finally:
                q.put((stream, self.EOT))
    
        def __run_command_as_subprocess(self, command, q):
            try:
                if isinstance(command, str):
                    command = shlex.split(command)
                #print('COMMAND = {}'.format(command))
                p = subprocess.Popen(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=1,
                    universal_newlines = True,
                )
                threading.Thread(target=self.__reader, args=['stdout', p.stdout, q], daemon=True).start()
                threading.Thread(target=self.__reader, args=['stderr', p.stderr, q], daemon=True).start()
                # delay: the process may have completed but the output was still not processed
                exit_code = p.wait()
                time.sleep(2)
                #print('EXIT_CODE = {}'.format(exit_code))
                q.put(('exit_code', exit_code))
            except Exception as e:
                error_message = 'There was an error running the command = {}: {}, {}'.format(command, type(e).__name__, e.args)
                #print('ERROR_MESSAGE = {}'.format(error_message))
                q.put(('stderr', error_message))
                q.put(('exit_code', 1))
    
        def start_command(self, command):
            # start process and return
            q = self.__create_queue()
            q.put(('stdout', 'Running command: ' + command))
            p = multiprocessing.Process(
                name='__run_command_as_subprocess',
                target=self.__run_command_as_subprocess,
                args=(command, q),
            )
            p.start()
            return (p, q)

    app/services/__init__.py

    # app/services/__init__.py
    from .background_command_runner import BackgroundCommandRunner
    background_command_runner = BackgroundCommandRunner()

    app/templates/base.html

    {# app/templates/base.html #}
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <title>{{ page_title }}</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65" crossorigin="anonymous">
    </head>
    <body>
    
    <main id="main" class="container-fluid py-3 qps-0 qflex-fill mt-0">
        {% block main -%}{% endblock -%}
    
        {%- if command_qid -%}
        <div class="row px-2">
            <div class="col-8 px-2 py-2 my-0">
                Results for command '{{ command }}':
            </div>
            <div class="col-4 px-2 pt-0 pb-2 my-0 text-end">
                <form method="post">
                <input type="hidden" name="command_pid" value="{{ command_pid }}">
                <button type="submit" name="action" value="stop_command" class="btn btn-outline-dark btn-sm">
                    Stop command
                </button>
                </form>
            </div>
        </div>
        <div class="row px-2">
            <div class="col border p-3 overflow-scroll small" id="command_result_data"  style="height: 400px;">
            </div>
        </div>
        <p>
            Lines received: <span id="lines-received">0</span>
        </p>
        {%- endif -%}
    </main>
    
    <script src="https://code.jquery.com/jquery-3.6.2.min.js" integrity="sha256-2krYZKh//PcchRtd+H+VyyQoZ/e3EcrkxhM8ycwASPA=" crossorigin="anonymous"></script>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4" crossorigin="anonymous"></script>
    
    {%- set get_command_result_data_url = '' -%}
    {%- if command_qid -%}
    {%- set get_command_result_data_url = url_for('get_command_result_data', command_qid=command_qid) -%}
    <script>
    var build_result_update_secs = 1000;
    var lines_received = 0;
    function get_command_result_data(){
        var url, data, box, li, i, stream, line, exit_code_received = false;
        url = '{{ get_command_result_data_url }}';
        console.log('url = ' + url);
        if(url == ''){
            return;
        }
        $.ajax({
            url: url,
            type: 'GET',
            dataType: 'json',
            success: function(rcvd){
                data = rcvd.data;
                lines_received += data.lines.length;
                box = $('#command_result_data');
                for(i = 0; i < data.lines.length; i++){
                    li = data.lines[i];
                    stream = li.stream;
                    if(stream == 'stdout'){
                        line = li.line;
                    }else if(stream == 'stderr'){
                        line = 'stderr: ' + li.line;
                    }else{
                        line = stream + ': ' + li.line;
                    }
                    box.append(line + '<br>');
                }
                if(data.errors){
                    data.errors.forEach(function(li, i){
                        box.append('ERROR: ' + li + '<br>');
                    });
                }
                if(data.ready){
                    box.append('Ready' + '<br>');
                }else{
                    setTimeout(get_command_result_data, build_result_update_secs);
                }
                box.scrollTop(box.prop('scrollHeight'));
                $('#lines-received').text(lines_received);
            }
        });
    }
    $(document).ready(function(){
        setTimeout(get_command_result_data, build_result_update_secs);
    });
    </script>
    {%- endif -%}
    
    </body>
    </html>

    app/templates/start_command.html

    {# app/templates/start_command.html #}
    {% extends "base.html" %}
    {% block main %}
    
    	<h2 class="mb-3">
    		{{ page_title }}
    	</h2>
    	{%- if error_message -%}
    	<p class="text-danger">
    		{{ error_message }}
    	</p>
    	{%- endif -%}
    	<form method="post">
    	<input type="hidden" name="command_pid" value="{{ command_pid }}">
    		<select name="command" class="form-select" aria-label="Select command">
    			<option value="pwd">
    				pwd
    			</option>
    			<option value="ls -lR">
    				ls -lR
    			</option>
    			<option value="ps -Af">
    				ps -Af
    			</option>
    			<option value="someunknowncommand">
    				someunknowncommand
    			</option>
    			<option value="tail --lines=5000 /var/log/syslog">
    				tail --lines=5000 /var/log/syslog
    			</option>
    			<option value="tail -f /var/log/syslog">
    				tail -f /var/log/syslog
    			</option>
    			<option value="docker">
    				docker
    			</option>
    		</select>
    		<button type="submit" name="action" value="start_command" class="btn btn-primary my-3">
    			Start command
    		</button>
    	</form>
    
    {%- endblock -%}
    

    Exécution du projet

    Démarrez l'application en vous plaçant dans le répertoire du projet et en tapant :

    python run.py

    Puis dirigez votre navigateur vers :

    http://127.0.0.1:5050

    La page devrait apparaître. Sélectionnez une commande et observez la sortie.

    Résumé

    Comme toujours, cela a pris plus de temps que prévu. Au départ, j'ai transmis la commande sous forme de chaîne à subprocess. Les commandes 'pwd' et 'ls' ont fonctionné, mais 'ls -l' a produit le message :

    FileNotFoundError, (2, 'No such file or directory')

    Après avoir utilisé shlex, cette erreur a disparu.

    Une autre difficulté était de décider quand nous avions obtenu toutes les données de la subprocess. J'ai d'abord attendu la fermeture des deux stdout et stderr . Mais cela n'a parfois pas fonctionné. En dernier recours, nous attendons la fin de la subprocess et ajoutons un petit délai de deux secondes, et considérons que c'est la fin des flux.

    Liens / crédits

    How to continuously display Python output in a Webpage?
    https://stackoverflow.com/questions/15092961/how-to-continuously-display-python-output-in-a-webpage

    Python - shlex - Simple lexical analysis
    https://docs.python.org/3/library/shlex.html

    Python - subprocess - Popen
    https://docs.python.org/3/library/subprocess.html#subprocess.Popen

    Python read from subprocess stdout and stderr separately while preserving order
    https://stackoverflow.com/questions/31833897/python-read-from-subprocess-stdout-and-stderr-separately-while-preserving-order

    En savoir plus...

    Flask Multiprocessing

    Laissez un commentaire

    Commentez anonymement ou connectez-vous pour commenter.

    Commentaires

    Laissez une réponse

    Répondez de manière anonyme ou connectez-vous pour répondre.