angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Flask -Anwendung mit stdout und stderr eines Hintergrundjobs

Ein multiprocessing.Queue() wird verwendet, um stdout und stderr Linien in Echtzeit zu erfassen.

19 Dezember 2022
post main image
https://www.pexels.com/nl-nl/@samerdaboul

In einem Flask -Projekt musste ich einen Hintergrundjob ausführen, genauer gesagt einen Befehl, der in einem (Linux) Terminal läuft, und dessen Ausgabe, stdout und sterr, in Echtzeit in einem Browserfenster anzeigen. Sie können einige Lösungen im Internet finden und dies ist nur eine weitere. Ich verwende auch einen Code, den ich im Internet gefunden habe, siehe Links unten.

Diese Lösung verwendet:

  • multiprocessing, um einen neuen Prozess von unserer Flask Anwendung zu starten
  • subprocess, um den Befehl zu starten
  • Threads, zur Erfassung von stdin und stdout
  • eine multiprocessing.Queue, um:
    - die Ausgabe des Hintergrundjobs zu speichern
    - die Ausgabe des Hintergrundjobs in unsere Flask -App zu lesen

Wie immer führe ich dies auf Ubuntu aus.

Einseitige Anwendung

Die Flask Demo-Anwendung ist eine einseitige Anwendung. Auf dieser Seite kann ein Befehl gestartet werden, und sobald er gestartet ist, zeigt ein Fenster auf der Seite die Ausgabe des Befehls an.

    Die Flask Anwendung hat zwei Routen:

    start_command()

    Die GET-Methode sendet die Seite aus. Die POST-Methode wird verwendet, um einen Befehl zu starten und einen Befehl zu stoppen. Die Befehle, die Sie in dieser Demo ausprobieren können:

    - pwd
    - ls -lR
    - ps -Af
    - someunknowncommand
    - cat /var/log/syslog
    - tail --lines=5000 /var/log/syslog
    - tail -f /var/log/syslog
    - docker

    get_command_result_data(queue_id)

    Diese Route wird vom Javascript auf der Seite jede Sekunde aufgerufen, sobald ein Befehl gestartet wurde. Die empfangenen Daten werden an ein 'div' auf der Seite angehängt.

    Dieses Projekt verwendet auch Bootstrap und JQuery.

    Dienstleistungen in Flask

    Für diese Flask -App habe ich einen neuen Dienst erstellt. Wie immer lege ich den Service im Ordner app.services ab und initialisiere den Service in factory.py, indem ich init_app() verwende, genau wie bei den Flask -Erweiterungen. Dann müssen wir nur noch die folgende Zeile in unsere Python -Dateien einfügen:

    import app.services as services

    Und dann rufen wir unseren Dienst wie folgt auf:

    services.our_service.some_method()

    Auf diese Weise müssen wir uns nicht um zyklische Importe kümmern.

    Der Dienst BackgroundCommandRunner

    BackgroundCommandRunner ist unser neuer Dienst mit zwei Methoden:

    start_command(command)

    Diese Methode erstellt eine Warteschlange und startet einen neuen Hintergrundprozess, der den angegebenen Befehl ausführt. Der Prozess erfasst stdout und stderr und stellt diese in die Warteschlange.

    Gibt einen tuple (Prozess, Warteschlange) zurück:

    • process: Rückgabewert von multiprocessing.Process
    • queue: der Rückgabewert von multiprocessing.Queue(), einschließlich einer ID und eines Zeitstempels

    Um die Prozess-ID zu erhalten: process.pid
    Abfrage der Warteschlangen-ID: queue.id

    get_queue_data(queue_id)

    Gibt alle neuen verfügbaren Daten (Zeilen) zurück, indem es aus der Warteschlange liest, bis diese leer ist. Die zurückgegebenen Daten sind jsonifiziert, d. h. wir können sie an den Client zurückgeben.

    Wann haben wir alle Ausgabedaten (stdout, stderr) vom Prozess?

    Im Client wollen wir wissen, wann der Hintergrundprozess beendet wurde. Im Normalbetrieb, d.h. bei fehlerfreiem Betrieb, werden die Streams stdout und stderr geschlossen.

    In der Annahme, dass die Streams in einigen Fällen nicht wie erwartet funktionieren, können wir als letzten Ausweg warten, bis subprocess beendet ist. Dann fügen wir eine kleine Verzögerung von zwei Sekunden hinzu, damit die Warteschlange mit den restlichen Daten gefüllt werden kann.

    Es gibt noch andere Bedingungen, z. B. wenn ein unbekannter Befehl gestartet wird, erzeugt subprocess eine Ausnahme. In diesem Fall wird die Fehlermeldung gesendet und das 'ready'-Flag gesetzt.

    Einige Hinweise

    • subprocess wird nicht mit 'shell=True' gestartet, da dies einen anderen Prozess starten würde.
    • shlex wird verwendet, um einen Befehl (String) in eine Folge von Argumenten zu zerlegen, bevor subprocess aufgerufen wird.
    • Fehler im Hintergrundprozess werden aufgefangen und über die Warteschlange an den Client gesendet.
    • Die Warteschlangen werden nach einer gewissen Zeit der Inaktivität (60 Sekunden) entfernt.

    Versuchen Sie es selbst

    Warnung! Solange Sie Hintergrundaufgaben von der Webseite aus starten und stoppen, sollte es keine Probleme geben. Wenn jedoch etwas schief geht oder Sie Flask durch Drücken von Control-C in der Kommandozeile beenden, während ein Hintergrundprozess läuft, müssen Sie diesen Hintergrundprozess beenden, bevor Sie die Flask -App neu starten. Dies ist eine Demo, und es wurden keine Vorkehrungen getroffen, um solche Situationen angemessen zu behandeln.

    Falls Sie es selbst ausprobieren möchten, finden Sie hier den Baum des Projekts:

    .
    ├── project
    │   ├── app
    │   │   ├── services
    │   │   │   ├── background_command_runner.py
    │   │   │   └── __init__.py
    │   │   ├── templates
    │   │   │   ├── base.html
    │   │   │   └── start_command.html
    │   │   └── factory.py
    │   └── run.py

    Erstellen Sie zuerst eine virtual environment und installieren Sie dann Flask:

    pip install Flask

    Erstellen Sie dann die folgenden Dateien.

    run.py

    # run.py
    from app.factory import create_app
    
    app = create_app()
    
    if __name__ == '__main__':
        app.run(
            host= '0.0.0.0',
            port=5050,
            debug=True,
            use_reloader=True,
        )

    app/factory.py

    # app/factory.py
    import datetime
    import logging
    import os
    import re
    import signal
    import sys
    import string
    import time
    
    from flask import current_app, Flask, g, json, redirect, request, render_template
    
    from .services import (
        background_command_runner,
    )
    
    import app.services as services
    
    
    def setup_logging():
        logger = logging.getLogger(__name__)
        logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
        logger.setLevel(logging.DEBUG)
        # console
        console_handler = logging.StreamHandler(sys.stdout)                             
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)                                            
        return logger
    
    
    def create_app():
        app = Flask(__name__)
    
        # reload template if differs from cached
        app.jinja_env.auto_reload = True
        app.config['TEMPLATES_AUTO_RELOAD'] = True
    
        # logging
        app.logger = setup_logging()
    
        # init services
        background_command_runner.init_app(app)
    
        # route to start a command
        @app.route('/', methods=['GET', 'POST'])
        def start_command():
            current_app.logger.debug('()')
    
            command_pid = None
            command_qid = None
            command = None
            error_message = None
            if request.method == 'POST':
                # stop current background process, if running
                try:
                    command_pid = int(request.form.get('command_pid'))
                    current_app.logger.debug('command_pid = {}'.format(command_pid))
                    os.kill(command_pid, signal.SIGKILL)
                except:
                    pass
                action = request.form.get('action')
                if action == 'start_command':
                    try:
                        current_app.logger.debug('starting background command ...')
                        command = request.form.get('command')
                        p, q = services.background_command_runner.start_command(command)
                        command_pid = p.pid
                        command_qid = q.id
                    except Exception as e:
                        error_message = 'Error starting command {}: {}, {}'.format(command, type(e).__name__, e.args)
                elif action == 'stop_command':
                    current_app.logger.debug('stopping background command ...')
                    pass
    
            return render_template(
                '/start_command.html',
                page_title='Run command in background',
                command=command,
                command_pid=command_pid,
                command_qid=command_qid,
                error_message=error_message,
            )
    
        # route to get data from a command
        @app.route('/get-command-result-data/<command_qid>', methods=['GET'])
        def get_command_result_data(command_qid):
            current_app.logger.debug('(command_qid = {})'.format(command_qid))
            return services.background_command_runner.get_queue_data(command_qid)
    
        return app

    app/services/background_command_runner.py

    # app/services/background_command_runner.py
    import datetime
    import multiprocessing
    import os
    import queue
    import shlex
    import subprocess
    import sys
    import threading
    import time
    import uuid
    
    from flask import current_app, jsonify
    
    
    class BackgroundCommandRunner:
        def __init__(self, app=None):
            self.app = app
    
            # storage for queues by id
            self.qid_queues = {}
            # remove queue if no activity after this time
            self.max_queue_secs = 60
            # stream end-of-transmission character
            self.EOT = None
    
            if app is not None:
                self.init_app(app)
    
        def init_app(self, app):
            pass
    
        def __create_queue(self):
            q = multiprocessing.Queue()
            q.id = uuid.uuid4().hex
            q.et = int(time.time())
            q.stdout_closed = False
            q.stderr_closed = False
            self.qid_queues[q.id] = q
            return q
    
        def __get_queue_by_id(self, qid):
            self.__cleanup_queues()
            q = self.qid_queues.get(qid)
            if q is not None:
                q.et = int(time.time())
            return q
    
        def __to_json(self, d):
            current_app.logger.debug('d = {}'.format(d))
            d_json = None
            try:
                d_json = jsonify(d)
            except Exception as e:
                current_app.logger.error('jsonify error, exception = {}, e.args = {} for d = {}'.format(type(e).__name__, e.args, d))
            return d_json
    
        def get_queue_data(self, qid):
            q = self.__get_queue_by_id(qid)
            if q is None:
                data = {
                    'lines': [],
                    'errors': ['Queue disappeared'],
                    'ready': True,
                }
                return self.__to_json({'data': data})
    
            errors = None
            ready = False
            lines = []
            # loop while queue not empty or max lines
            while len(lines) < 1000:
                try:
                    stream, line = q.get(block=True, timeout=0.2)
                except queue.Empty:
                    break
                except Exception as e:
                    errors = [type(e).__name__ + ', ' + str(e.args)]
                    current_app.logger.error('exception = {}, e.args = {}'.format(type(e).__name__, e.args))
                    ready = True
                    break
                current_app.logger.debug('data from queue, stream = {}, type(line) = {}, line = {}'.format(stream, type(line), line))
                if line == self.EOT:
                    if stream == 'stdout':
                        q.stdout_closed = True
                    elif stream == 'stderr':
                        q.stderr_closed = True
                    if q.stdout_closed and q.stderr_closed:
                        ready = True
                    continue
                lines.append({
                    'stream': stream,
                    'line': line,
                })
                if stream == 'exit_code':
                    current_app.logger.debug('exit_code received')
                    ready = True
    
            data = {
                'lines': lines,
                'errors': errors,
                'ready': ready,
            }
            return self.__to_json({'data': data})
    
        def __cleanup_queues(self):
            et = int(time.time())
            to_delete_qids = [q.id for q in self.qid_queues.values() if (et - q.et) > self.max_queue_secs]
            for qid in to_delete_qids:
                del self.qid_queues[qid]
    
        def __reader(self, stream, pipe, q):
            try:
                with pipe:
                    for line in iter(pipe.readline, ''):
                        q.put((stream, line))
            finally:
                q.put((stream, self.EOT))
    
        def __run_command_as_subprocess(self, command, q):
            try:
                if isinstance(command, str):
                    command = shlex.split(command)
                #print('COMMAND = {}'.format(command))
                p = subprocess.Popen(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=1,
                    universal_newlines = True,
                )
                threading.Thread(target=self.__reader, args=['stdout', p.stdout, q], daemon=True).start()
                threading.Thread(target=self.__reader, args=['stderr', p.stderr, q], daemon=True).start()
                # delay: the process may have completed but the output was still not processed
                exit_code = p.wait()
                time.sleep(2)
                #print('EXIT_CODE = {}'.format(exit_code))
                q.put(('exit_code', exit_code))
            except Exception as e:
                error_message = 'There was an error running the command = {}: {}, {}'.format(command, type(e).__name__, e.args)
                #print('ERROR_MESSAGE = {}'.format(error_message))
                q.put(('stderr', error_message))
                q.put(('exit_code', 1))
    
        def start_command(self, command):
            # start process and return
            q = self.__create_queue()
            q.put(('stdout', 'Running command: ' + command))
            p = multiprocessing.Process(
                name='__run_command_as_subprocess',
                target=self.__run_command_as_subprocess,
                args=(command, q),
            )
            p.start()
            return (p, q)

    app/services/__init__.py

    # app/services/__init__.py
    from .background_command_runner import BackgroundCommandRunner
    background_command_runner = BackgroundCommandRunner()

    app/templates/base.html

    {# app/templates/base.html #}
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <title>{{ page_title }}</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65" crossorigin="anonymous">
    </head>
    <body>
    
    <main id="main" class="container-fluid py-3 qps-0 qflex-fill mt-0">
        {% block main -%}{% endblock -%}
    
        {%- if command_qid -%}
        <div class="row px-2">
            <div class="col-8 px-2 py-2 my-0">
                Results for command '{{ command }}':
            </div>
            <div class="col-4 px-2 pt-0 pb-2 my-0 text-end">
                <form method="post">
                <input type="hidden" name="command_pid" value="{{ command_pid }}">
                <button type="submit" name="action" value="stop_command" class="btn btn-outline-dark btn-sm">
                    Stop command
                </button>
                </form>
            </div>
        </div>
        <div class="row px-2">
            <div class="col border p-3 overflow-scroll small" id="command_result_data"  style="height: 400px;">
            </div>
        </div>
        <p>
            Lines received: <span id="lines-received">0</span>
        </p>
        {%- endif -%}
    </main>
    
    <script src="https://code.jquery.com/jquery-3.6.2.min.js" integrity="sha256-2krYZKh//PcchRtd+H+VyyQoZ/e3EcrkxhM8ycwASPA=" crossorigin="anonymous"></script>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4" crossorigin="anonymous"></script>
    
    {%- set get_command_result_data_url = '' -%}
    {%- if command_qid -%}
    {%- set get_command_result_data_url = url_for('get_command_result_data', command_qid=command_qid) -%}
    <script>
    var build_result_update_secs = 1000;
    var lines_received = 0;
    function get_command_result_data(){
        var url, data, box, li, i, stream, line, exit_code_received = false;
        url = '{{ get_command_result_data_url }}';
        console.log('url = ' + url);
        if(url == ''){
            return;
        }
        $.ajax({
            url: url,
            type: 'GET',
            dataType: 'json',
            success: function(rcvd){
                data = rcvd.data;
                lines_received += data.lines.length;
                box = $('#command_result_data');
                for(i = 0; i < data.lines.length; i++){
                    li = data.lines[i];
                    stream = li.stream;
                    if(stream == 'stdout'){
                        line = li.line;
                    }else if(stream == 'stderr'){
                        line = 'stderr: ' + li.line;
                    }else{
                        line = stream + ': ' + li.line;
                    }
                    box.append(line + '<br>');
                }
                if(data.errors){
                    data.errors.forEach(function(li, i){
                        box.append('ERROR: ' + li + '<br>');
                    });
                }
                if(data.ready){
                    box.append('Ready' + '<br>');
                }else{
                    setTimeout(get_command_result_data, build_result_update_secs);
                }
                box.scrollTop(box.prop('scrollHeight'));
                $('#lines-received').text(lines_received);
            }
        });
    }
    $(document).ready(function(){
        setTimeout(get_command_result_data, build_result_update_secs);
    });
    </script>
    {%- endif -%}
    
    </body>
    </html>

    app/vorlagen/start_befehl.html

    {# app/templates/start_command.html #}
    {% extends "base.html" %}
    {% block main %}
    
    	<h2 class="mb-3">
    		{{ page_title }}
    	</h2>
    	{%- if error_message -%}
    	<p class="text-danger">
    		{{ error_message }}
    	</p>
    	{%- endif -%}
    	<form method="post">
    	<input type="hidden" name="command_pid" value="{{ command_pid }}">
    		<select name="command" class="form-select" aria-label="Select command">
    			<option value="pwd">
    				pwd
    			</option>
    			<option value="ls -lR">
    				ls -lR
    			</option>
    			<option value="ps -Af">
    				ps -Af
    			</option>
    			<option value="someunknowncommand">
    				someunknowncommand
    			</option>
    			<option value="tail --lines=5000 /var/log/syslog">
    				tail --lines=5000 /var/log/syslog
    			</option>
    			<option value="tail -f /var/log/syslog">
    				tail -f /var/log/syslog
    			</option>
    			<option value="docker">
    				docker
    			</option>
    		</select>
    		<button type="submit" name="action" value="start_command" class="btn btn-primary my-3">
    			Start command
    		</button>
    	</form>
    
    {%- endblock -%}
    

    Ausführen des Projekts

    Starten Sie die Anwendung, indem Sie in das Projektverzeichnis wechseln und den Befehl eingeben:

    python run.py

    Zeigen Sie dann mit Ihrem Browser auf:

    http://127.0.0.1:5050

    Die Seite sollte erscheinen. Wählen Sie einen Befehl aus und beobachten Sie die Ausgabe.

    Zusammenfassung

    Wie immer hat dies mehr Zeit in Anspruch genommen als erwartet. Zunächst übergab ich den Befehl als String an subprocess. Die Befehle "pwd" und "ls" funktionierten, aber "ls -l" ergab die Meldung:

    FileNotFoundError, (2, 'No such file or directory')

    Nachdem wir shlex verwendet hatten, verschwand dieser Fehler.

    Eine weitere Schwierigkeit bestand darin, zu entscheiden, wann wir alle Daten aus subprocess erhalten haben. Zunächst wartete ich, bis sowohl stdout als auch stderr geschlossen waren. Aber das hat manchmal nicht funktioniert. Als letzten Ausweg warten wir, bis subprocess geschlossen ist, fügen eine kleine Verzögerung von zwei Sekunden hinzu und betrachten dies als das Ende der Streams.

    Links / Impressum

    How to continuously display Python output in a Webpage?
    https://stackoverflow.com/questions/15092961/how-to-continuously-display-python-output-in-a-webpage

    Python - shlex - Simple lexical analysis
    https://docs.python.org/3/library/shlex.html

    Python - subprocess - Popen
    https://docs.python.org/3/library/subprocess.html#subprocess.Popen

    Python read from subprocess stdout and stderr separately while preserving order
    https://stackoverflow.com/questions/31833897/python-read-from-subprocess-stdout-and-stderr-separately-while-preserving-order

    Einen Kommentar hinterlassen

    Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

    Kommentare

    Eine Antwort hinterlassen

    Antworten Sie anonym oder melden Sie sich an, um zu antworten.