angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Flask toepassing met stdout en stderr van een achtergrondjob.

Een multiprocessing.Queue() wordt gebruikt om stdout en stderr lijnen in real time vast te leggen.

19 december 2022
post main image
https://www.pexels.com/nl-nl/@samerdaboul

In een Flask project moest ik een achtergrondjob uitvoeren, meer bepaald een commando dat draait in een (Linux) terminal, en de uitvoer ervan, stdout en sterr, in real time tonen in een browservenster. Er zijn enkele oplossingen te vinden op het internet en dit is er nog een. Ik gebruik ook wat code die ik op het web heb gevonden, zie onderstaande links.

Deze oplossing gebruikt:

  • multiprocessing, om een nieuw proces te starten vanuit onze Flask app
  • subprocess, om het commando te starten.
  • threads, om stdin op te vangen en stdout
  • een multiprocessing.Queue om:
    - de uitvoer van de achtergrondjob op te slaan
    - de uitvoer van de achtergrondjob in te lezen in onze Flask app.

Zoals altijd draai ik dit op Ubuntu.

Enkele pagina toepassing

De Flask demo app is een single page applicatie. Op deze pagina kunnen we een commando starten, en eenmaal gestart toont een venster op de pagina de uitvoer van het commando.

    De Flask app heeft twee routes:

    start_command()

    De GET-methode stuurt de pagina uit. De POST-methode wordt gebruikt om een commando te starten en een commando te stoppen. De commando's die u in deze demo kunt proberen:

    - pwd
    - ls -lR
    - ps -Af
    - someunknowncommand
    - cat /var/log/syslog
    - tail --lines=5000 /var/log/syslog
    - tail -f /var/log/syslog
    - docker

    get_command_result_data(queue_id)

    Deze route wordt door de Javascript op de pagina elke seconde aangeroepen zodra een commando is gestart. Ontvangen gegevens worden toegevoegd aan een 'div' op de pagina.

    Dit project maakt ook gebruik van Bootstrap en JQuery.

    Diensten in Flask

    Voor deze Flask app heb ik een nieuwe service gemaakt. Zoals altijd zet ik de service in de map app.services, en initialiseer de service in factory.py, met behulp van init_app(), net als de Flask extensies. Dan hoeven we alleen maar de volgende regel op te nemen in onze Python bestanden:

    import app.services as services

    En dan roepen we onze dienst aan zoals:

    services.our_service.some_method()

    Op deze manier hoeven we ons geen zorgen te maken over cyclische invoer.

    De dienst BackgroundCommandRunner

    BackgroundCommandRunner is onze nieuwe dienst met twee methoden:

    start_command(command)

    Deze methode maakt een wachtrij aan en start een nieuw achtergrondproces dat het gegeven commando uitvoert. Het proces vangt stdout en stderr op en zet deze in de wachtrij.

    Geeft als resultaat een tuple (proces, wachtrij):

    • proces: retourwaarde van multiprocessing.Process
    • wachtrij: de retourwaarde van multiprocessing.Queue(), inclusief een id en timestamp.

    Om het proces-id te krijgen: process.pid
    Om de wachtrij-id te krijgen: queue.id

    get_queue_data(queue_id)

    Retourneert alle nieuwe beschikbare gegevens (regels) door uit de wachtrij te lezen totdat deze leeg is. De geretourneerde gegevens zijn jsonified, wat betekent dat we ze kunnen terugsturen naar de client.

    Wanneer hebben we alle uitvoergegevens (stdout, stderr) van het proces?

    In de client willen we weten wanneer het achtergrondproces is beëindigd. Bij normale werking, d.w.z. zonder fouten, worden de streams stdout en stderr gesloten.

    Aangenomen dat de streams in sommige gevallen niet werken zoals verwacht, kunnen we als laatste redmiddel wachten tot subprocess is afgesloten. Dan voegen we een kleine vertraging toe, twee seconden, om de wachtrij te laten vullen met resterende gegevens.

    Er zijn andere omstandigheden, bijvoorbeeld wanneer een onbekend commando wordt gestart, genereert subprocess een uitzondering. In dat geval sturen we de foutmelding en zetten we de 'ready'-flag.

    Enkele opmerkingen

    • subprocess wordt niet gestart met 'shell=True' omdat dat een ander proces zou starten.
    • shlex wordt gebruikt om een commando (string) op te breken in een reeks argumenten voordat subprocess wordt aangeroepen.
    • Fouten in het achtergrondproces worden opgevangen en via de wachtrij naar de cliënt gestuurd.
    • Wachtrijen worden verwijderd na enige tijd van inactiviteit (60 seconden).

    Probeer uzelf

    Waarschuwing! Zolang u achtergrondtaken start en stopt vanaf de webpagina, zouden er geen problemen moeten zijn. Als er echter iets misgaat of u beëindigt Flask door op Control-C op de opdrachtregel te drukken terwijl er een achtergrondproces loopt, dan moet u dit achtergrondproces stoppen voordat u de Flask app opnieuw start. Dit is een demo en er zijn geen voorzieningen getroffen om dergelijke situaties netjes af te handelen.

    Voor het geval u het zelf wilt proberen, is hier de boom van het project:

    .
    ├── project
    │   ├── app
    │   │   ├── services
    │   │   │   ├── background_command_runner.py
    │   │   │   └── __init__.py
    │   │   ├── templates
    │   │   │   ├── base.html
    │   │   │   └── start_command.html
    │   │   └── factory.py
    │   └── run.py

    Maak eerst een virtual environment en installeer dan Flask:

    pip install Flask

    Maak vervolgens de volgende bestanden aan.

    run.py

    # run.py
    from app.factory import create_app
    
    app = create_app()
    
    if __name__ == '__main__':
        app.run(
            host= '0.0.0.0',
            port=5050,
            debug=True,
            use_reloader=True,
        )

    app/factory.py

    # app/factory.py
    import datetime
    import logging
    import os
    import re
    import signal
    import sys
    import string
    import time
    
    from flask import current_app, Flask, g, json, redirect, request, render_template
    
    from .services import (
        background_command_runner,
    )
    
    import app.services as services
    
    
    def setup_logging():
        logger = logging.getLogger(__name__)
        logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
        logger.setLevel(logging.DEBUG)
        # console
        console_handler = logging.StreamHandler(sys.stdout)                             
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)                                            
        return logger
    
    
    def create_app():
        app = Flask(__name__)
    
        # reload template if differs from cached
        app.jinja_env.auto_reload = True
        app.config['TEMPLATES_AUTO_RELOAD'] = True
    
        # logging
        app.logger = setup_logging()
    
        # init services
        background_command_runner.init_app(app)
    
        # route to start a command
        @app.route('/', methods=['GET', 'POST'])
        def start_command():
            current_app.logger.debug('()')
    
            command_pid = None
            command_qid = None
            command = None
            error_message = None
            if request.method == 'POST':
                # stop current background process, if running
                try:
                    command_pid = int(request.form.get('command_pid'))
                    current_app.logger.debug('command_pid = {}'.format(command_pid))
                    os.kill(command_pid, signal.SIGKILL)
                except:
                    pass
                action = request.form.get('action')
                if action == 'start_command':
                    try:
                        current_app.logger.debug('starting background command ...')
                        command = request.form.get('command')
                        p, q = services.background_command_runner.start_command(command)
                        command_pid = p.pid
                        command_qid = q.id
                    except Exception as e:
                        error_message = 'Error starting command {}: {}, {}'.format(command, type(e).__name__, e.args)
                elif action == 'stop_command':
                    current_app.logger.debug('stopping background command ...')
                    pass
    
            return render_template(
                '/start_command.html',
                page_title='Run command in background',
                command=command,
                command_pid=command_pid,
                command_qid=command_qid,
                error_message=error_message,
            )
    
        # route to get data from a command
        @app.route('/get-command-result-data/<command_qid>', methods=['GET'])
        def get_command_result_data(command_qid):
            current_app.logger.debug('(command_qid = {})'.format(command_qid))
            return services.background_command_runner.get_queue_data(command_qid)
    
        return app

    app/services/background_command_runner.py

    # app/services/background_command_runner.py
    import datetime
    import multiprocessing
    import os
    import queue
    import shlex
    import subprocess
    import sys
    import threading
    import time
    import uuid
    
    from flask import current_app, jsonify
    
    
    class BackgroundCommandRunner:
        def __init__(self, app=None):
            self.app = app
    
            # storage for queues by id
            self.qid_queues = {}
            # remove queue if no activity after this time
            self.max_queue_secs = 60
            # stream end-of-transmission character
            self.EOT = None
    
            if app is not None:
                self.init_app(app)
    
        def init_app(self, app):
            pass
    
        def __create_queue(self):
            q = multiprocessing.Queue()
            q.id = uuid.uuid4().hex
            q.et = int(time.time())
            q.stdout_closed = False
            q.stderr_closed = False
            self.qid_queues[q.id] = q
            return q
    
        def __get_queue_by_id(self, qid):
            self.__cleanup_queues()
            q = self.qid_queues.get(qid)
            if q is not None:
                q.et = int(time.time())
            return q
    
        def __to_json(self, d):
            current_app.logger.debug('d = {}'.format(d))
            d_json = None
            try:
                d_json = jsonify(d)
            except Exception as e:
                current_app.logger.error('jsonify error, exception = {}, e.args = {} for d = {}'.format(type(e).__name__, e.args, d))
            return d_json
    
        def get_queue_data(self, qid):
            q = self.__get_queue_by_id(qid)
            if q is None:
                data = {
                    'lines': [],
                    'errors': ['Queue disappeared'],
                    'ready': True,
                }
                return self.__to_json({'data': data})
    
            errors = None
            ready = False
            lines = []
            # loop while queue not empty or max lines
            while len(lines) < 1000:
                try:
                    stream, line = q.get(block=True, timeout=0.2)
                except queue.Empty:
                    break
                except Exception as e:
                    errors = [type(e).__name__ + ', ' + str(e.args)]
                    current_app.logger.error('exception = {}, e.args = {}'.format(type(e).__name__, e.args))
                    ready = True
                    break
                current_app.logger.debug('data from queue, stream = {}, type(line) = {}, line = {}'.format(stream, type(line), line))
                if line == self.EOT:
                    if stream == 'stdout':
                        q.stdout_closed = True
                    elif stream == 'stderr':
                        q.stderr_closed = True
                    if q.stdout_closed and q.stderr_closed:
                        ready = True
                    continue
                lines.append({
                    'stream': stream,
                    'line': line,
                })
                if stream == 'exit_code':
                    current_app.logger.debug('exit_code received')
                    ready = True
    
            data = {
                'lines': lines,
                'errors': errors,
                'ready': ready,
            }
            return self.__to_json({'data': data})
    
        def __cleanup_queues(self):
            et = int(time.time())
            to_delete_qids = [q.id for q in self.qid_queues.values() if (et - q.et) > self.max_queue_secs]
            for qid in to_delete_qids:
                del self.qid_queues[qid]
    
        def __reader(self, stream, pipe, q):
            try:
                with pipe:
                    for line in iter(pipe.readline, ''):
                        q.put((stream, line))
            finally:
                q.put((stream, self.EOT))
    
        def __run_command_as_subprocess(self, command, q):
            try:
                if isinstance(command, str):
                    command = shlex.split(command)
                #print('COMMAND = {}'.format(command))
                p = subprocess.Popen(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=1,
                    universal_newlines = True,
                )
                threading.Thread(target=self.__reader, args=['stdout', p.stdout, q], daemon=True).start()
                threading.Thread(target=self.__reader, args=['stderr', p.stderr, q], daemon=True).start()
                # delay: the process may have completed but the output was still not processed
                exit_code = p.wait()
                time.sleep(2)
                #print('EXIT_CODE = {}'.format(exit_code))
                q.put(('exit_code', exit_code))
            except Exception as e:
                error_message = 'There was an error running the command = {}: {}, {}'.format(command, type(e).__name__, e.args)
                #print('ERROR_MESSAGE = {}'.format(error_message))
                q.put(('stderr', error_message))
                q.put(('exit_code', 1))
    
        def start_command(self, command):
            # start process and return
            q = self.__create_queue()
            q.put(('stdout', 'Running command: ' + command))
            p = multiprocessing.Process(
                name='__run_command_as_subprocess',
                target=self.__run_command_as_subprocess,
                args=(command, q),
            )
            p.start()
            return (p, q)

    app/services/__init__.py

    # app/services/__init__.py
    from .background_command_runner import BackgroundCommandRunner
    background_command_runner = BackgroundCommandRunner()

    app/templates/base.html

    {# app/templates/base.html #}
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <title>{{ page_title }}</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65" crossorigin="anonymous">
    </head>
    <body>
    
    <main id="main" class="container-fluid py-3 qps-0 qflex-fill mt-0">
        {% block main -%}{% endblock -%}
    
        {%- if command_qid -%}
        <div class="row px-2">
            <div class="col-8 px-2 py-2 my-0">
                Results for command '{{ command }}':
            </div>
            <div class="col-4 px-2 pt-0 pb-2 my-0 text-end">
                <form method="post">
                <input type="hidden" name="command_pid" value="{{ command_pid }}">
                <button type="submit" name="action" value="stop_command" class="btn btn-outline-dark btn-sm">
                    Stop command
                </button>
                </form>
            </div>
        </div>
        <div class="row px-2">
            <div class="col border p-3 overflow-scroll small" id="command_result_data"  style="height: 400px;">
            </div>
        </div>
        <p>
            Lines received: <span id="lines-received">0</span>
        </p>
        {%- endif -%}
    </main>
    
    <script src="https://code.jquery.com/jquery-3.6.2.min.js" integrity="sha256-2krYZKh//PcchRtd+H+VyyQoZ/e3EcrkxhM8ycwASPA=" crossorigin="anonymous"></script>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4" crossorigin="anonymous"></script>
    
    {%- set get_command_result_data_url = '' -%}
    {%- if command_qid -%}
    {%- set get_command_result_data_url = url_for('get_command_result_data', command_qid=command_qid) -%}
    <script>
    var build_result_update_secs = 1000;
    var lines_received = 0;
    function get_command_result_data(){
        var url, data, box, li, i, stream, line, exit_code_received = false;
        url = '{{ get_command_result_data_url }}';
        console.log('url = ' + url);
        if(url == ''){
            return;
        }
        $.ajax({
            url: url,
            type: 'GET',
            dataType: 'json',
            success: function(rcvd){
                data = rcvd.data;
                lines_received += data.lines.length;
                box = $('#command_result_data');
                for(i = 0; i < data.lines.length; i++){
                    li = data.lines[i];
                    stream = li.stream;
                    if(stream == 'stdout'){
                        line = li.line;
                    }else if(stream == 'stderr'){
                        line = 'stderr: ' + li.line;
                    }else{
                        line = stream + ': ' + li.line;
                    }
                    box.append(line + '<br>');
                }
                if(data.errors){
                    data.errors.forEach(function(li, i){
                        box.append('ERROR: ' + li + '<br>');
                    });
                }
                if(data.ready){
                    box.append('Ready' + '<br>');
                }else{
                    setTimeout(get_command_result_data, build_result_update_secs);
                }
                box.scrollTop(box.prop('scrollHeight'));
                $('#lines-received').text(lines_received);
            }
        });
    }
    $(document).ready(function(){
        setTimeout(get_command_result_data, build_result_update_secs);
    });
    </script>
    {%- endif -%}
    
    </body>
    </html>

    app/templates/start_command.html

    {# app/templates/start_command.html #}
    {% extends "base.html" %}
    {% block main %}
    
    	<h2 class="mb-3">
    		{{ page_title }}
    	</h2>
    	{%- if error_message -%}
    	<p class="text-danger">
    		{{ error_message }}
    	</p>
    	{%- endif -%}
    	<form method="post">
    	<input type="hidden" name="command_pid" value="{{ command_pid }}">
    		<select name="command" class="form-select" aria-label="Select command">
    			<option value="pwd">
    				pwd
    			</option>
    			<option value="ls -lR">
    				ls -lR
    			</option>
    			<option value="ps -Af">
    				ps -Af
    			</option>
    			<option value="someunknowncommand">
    				someunknowncommand
    			</option>
    			<option value="tail --lines=5000 /var/log/syslog">
    				tail --lines=5000 /var/log/syslog
    			</option>
    			<option value="tail -f /var/log/syslog">
    				tail -f /var/log/syslog
    			</option>
    			<option value="docker">
    				docker
    			</option>
    		</select>
    		<button type="submit" name="action" value="start_command" class="btn btn-primary my-3">
    			Start command
    		</button>
    	</form>
    
    {%- endblock -%}
    

    Het project uitvoeren

    Start de applicatie door naar de projectdirectory te gaan en te typen:

    python run.py

    Richt vervolgens uw browser op:

    http://127.0.0.1:5050

    De pagina zou moeten verschijnen. Selecteer een commando en bekijk de uitvoer.

    Samenvatting

    Zoals altijd kostte dit meer tijd dan verwacht. Aanvankelijk gaf ik het commando als een string door aan subprocess. De commando's 'pwd' en 'ls' werkten, maar 'ls -l' produceerde de boodschap:

    FileNotFoundError, (2, 'No such file or directory')

    Na gebruik van shlex verdween deze fout.

    Een andere moeilijkheid was te bepalen wanneer we alle gegevens van de subprocess hebben gekregen. Eerst wachtte ik tot zowel stdout als stderr gesloten waren. Maar dit werkte soms niet. Als laatste redmiddel wachten we tot de subprocess is afgesloten en voegen een kleine vertraging van twee seconden toe, en beschouwen dit als het einde van de streams.

    Links / credits

    How to continuously display Python output in a Webpage?
    https://stackoverflow.com/questions/15092961/how-to-continuously-display-python-output-in-a-webpage

    Python - shlex - Simple lexical analysis
    https://docs.python.org/3/library/shlex.html

    Python - subprocess - Popen
    https://docs.python.org/3/library/subprocess.html#subprocess.Popen

    Python read from subprocess stdout and stderr separately while preserving order
    https://stackoverflow.com/questions/31833897/python-read-from-subprocess-stdout-and-stderr-separately-while-preserving-order

    Laat een reactie achter

    Reageer anoniem of log in om commentaar te geven.

    Opmerkingen

    Laat een antwoord achter

    Antwoord anoniem of log in om te antwoorden.