angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Приложение Flask , показывающее stdout и stderr фонового задания

multiprocessing.Queue() используется для захвата линий stdout и stderr в режиме реального времени.

19 декабря 2022
post main image
https://www.pexels.com/nl-nl/@samerdaboul

В проекте Flask мне нужно было запустить фоновое задание, а точнее команду, запущенную в терминале (Linux), и показать ее вывод, stdout и sterr, в реальном времени в окне браузера. Вы можете найти несколько решений в интернете, и это просто еще одно. Я также использую некоторый код, который я нашел в Интернете, см. ссылки ниже.

Это решение использует:

  • мультипроцессинг, для запуска нового процесса из нашего приложения Flask
  • subprocess, для запуска команды
  • потоков, для захвата stdin и stdout
  • multiprocessing.Queue для:
    - хранения вывода фонового задания
    - чтения вывода фонового задания в наше приложение Flask .

Как обычно, я запускаю это на Ubuntu.

Одностраничное приложение

Демонстрационное приложение Flask - это одностраничное приложение. На этой странице мы можем запустить команду, и после запуска в окне на странице отображается вывод по команде.

    Приложение Flask имеет два маршрута:

    start_command()

    GET-метод отправляет страницу. Метод POST-метод используется для запуска команды и остановки команды. Команды, которые вы можете попробовать в этой демонстрации:

    - pwd
    - ls -lR
    - ps -Af
    - someunknowncommand
    - cat /var/log/syslog
    - tail --lines=5000 /var/log/syslog
    - tail -f /var/log/syslog
    - docker

    get_command_result_data(queue_id)

    Этот маршрут вызывается Javascript на странице каждую секунду после запуска команды. Полученные данные добавляются в 'div' на странице.

    В этом проекте также используются Bootstrap и JQuery.

    Услуги в Flask

    Для этого приложения Flask я создал новый сервис. Как обычно, я поместил службу в папку app.services, а инициализировал службу в factory.py, используя init_app(), как и расширения Flask . Тогда все, что нам нужно сделать, это включить следующую строку в наши файлы Python :

    import app.services as services

    И затем мы вызываем нашу службу следующим образом:

    services.our_service.some_method()

    Таким образом, нам не придется беспокоиться о циклическом импорте.

    Служба BackgroundCommandRunner

    BackgroundCommandRunner - это наш новый сервис с двумя методами:

    start_command(command)

    Этот метод создает очередь и запускает новый фоновый процесс, выполняющий заданную команду. Процесс перехватывает stdout и stderr и помещает это в очередь.

    Возвращает tuple (процесс, очередь):

    • процесс: возвращаемое значение из multiprocessing.Process
    • очередь: возвращаемое значение из multiprocessing.Queue(), включая идентификатор и метку времени.

    Чтобы получить идентификатор процесса: process.pid
    Чтобы получить идентификатор очереди: queue.id

    get_queue_data(queue_id)

    Возвращает все новые доступные данные (строки) путем чтения из очереди, пока она не опустеет. Возвращенные данные являются jsonified, что означает, что мы можем вернуть их клиенту.

    Когда у нас будут все выходные данные (stdout, stderr) из процесса?

    В клиенте мы хотим знать, когда завершился фоновый процесс. При нормальной работе, т.е. работе без ошибок, потоки stdout и stderr закрываются.

    Предполагая, что в некоторых случаях потоки не работают так, как ожидалось, в качестве последнего средства мы можем подождать, пока завершится subprocess . Затем мы добавляем небольшую задержку, две секунды, чтобы очередь заполнилась оставшимися данными.

    Существуют и другие условия, например, при запуске неизвестной команды subprocess генерирует исключение. В этом случае мы отправляем сообщение об ошибке и устанавливаем флаг 'ready'-.

    Некоторые примечания

    • subprocess не запускается с 'shell=True', так как это привело бы к запуску другого процесса.
    • shlex используется для разбиения команды (строки) на последовательность аргументов перед вызовом subprocess.
    • Ошибки в фоновом процессе перехватываются и отправляются клиенту с помощью очереди.
    • Очереди удаляются после некоторого времени бездействия (60 секунд).

    Попробуйте сами

    Внимание! Пока вы запускаете и останавливаете фоновые задачи с веб-страницы, проблем быть не должно. Однако, если что-то пойдет не так или вы завершите Flask , нажав Control-C в командной строке, когда фоновый процесс запущен, то вы должны остановить этот фоновый процесс перед перезапуском приложения Flask . Это демо-версия, и не было предусмотрено никаких мер по изящной обработке таких ситуаций.

    Если вы хотите попробовать сами, вот дерево проекта:

    .
    ├── project
    │   ├── app
    │   │   ├── services
    │   │   │   ├── background_command_runner.py
    │   │   │   └── __init__.py
    │   │   ├── templates
    │   │   │   ├── base.html
    │   │   │   └── start_command.html
    │   │   └── factory.py
    │   └── run.py

    Сначала создайте virtual environment , а затем установите Flask:

    pip install Flask

    Затем создайте следующие файлы.

    run.py

    # run.py
    from app.factory import create_app
    
    app = create_app()
    
    if __name__ == '__main__':
        app.run(
            host= '0.0.0.0',
            port=5050,
            debug=True,
            use_reloader=True,
        )

    app/factory.py

    # app/factory.py
    import datetime
    import logging
    import os
    import re
    import signal
    import sys
    import string
    import time
    
    from flask import current_app, Flask, g, json, redirect, request, render_template
    
    from .services import (
        background_command_runner,
    )
    
    import app.services as services
    
    
    def setup_logging():
        logger = logging.getLogger(__name__)
        logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
        logger.setLevel(logging.DEBUG)
        # console
        console_handler = logging.StreamHandler(sys.stdout)                             
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)                                            
        return logger
    
    
    def create_app():
        app = Flask(__name__)
    
        # reload template if differs from cached
        app.jinja_env.auto_reload = True
        app.config['TEMPLATES_AUTO_RELOAD'] = True
    
        # logging
        app.logger = setup_logging()
    
        # init services
        background_command_runner.init_app(app)
    
        # route to start a command
        @app.route('/', methods=['GET', 'POST'])
        def start_command():
            current_app.logger.debug('()')
    
            command_pid = None
            command_qid = None
            command = None
            error_message = None
            if request.method == 'POST':
                # stop current background process, if running
                try:
                    command_pid = int(request.form.get('command_pid'))
                    current_app.logger.debug('command_pid = {}'.format(command_pid))
                    os.kill(command_pid, signal.SIGKILL)
                except:
                    pass
                action = request.form.get('action')
                if action == 'start_command':
                    try:
                        current_app.logger.debug('starting background command ...')
                        command = request.form.get('command')
                        p, q = services.background_command_runner.start_command(command)
                        command_pid = p.pid
                        command_qid = q.id
                    except Exception as e:
                        error_message = 'Error starting command {}: {}, {}'.format(command, type(e).__name__, e.args)
                elif action == 'stop_command':
                    current_app.logger.debug('stopping background command ...')
                    pass
    
            return render_template(
                '/start_command.html',
                page_title='Run command in background',
                command=command,
                command_pid=command_pid,
                command_qid=command_qid,
                error_message=error_message,
            )
    
        # route to get data from a command
        @app.route('/get-command-result-data/<command_qid>', methods=['GET'])
        def get_command_result_data(command_qid):
            current_app.logger.debug('(command_qid = {})'.format(command_qid))
            return services.background_command_runner.get_queue_data(command_qid)
    
        return app

    app/services/background_command_runner.py

    # app/services/background_command_runner.py
    import datetime
    import multiprocessing
    import os
    import queue
    import shlex
    import subprocess
    import sys
    import threading
    import time
    import uuid
    
    from flask import current_app, jsonify
    
    
    class BackgroundCommandRunner:
        def __init__(self, app=None):
            self.app = app
    
            # storage for queues by id
            self.qid_queues = {}
            # remove queue if no activity after this time
            self.max_queue_secs = 60
            # stream end-of-transmission character
            self.EOT = None
    
            if app is not None:
                self.init_app(app)
    
        def init_app(self, app):
            pass
    
        def __create_queue(self):
            q = multiprocessing.Queue()
            q.id = uuid.uuid4().hex
            q.et = int(time.time())
            q.stdout_closed = False
            q.stderr_closed = False
            self.qid_queues[q.id] = q
            return q
    
        def __get_queue_by_id(self, qid):
            self.__cleanup_queues()
            q = self.qid_queues.get(qid)
            if q is not None:
                q.et = int(time.time())
            return q
    
        def __to_json(self, d):
            current_app.logger.debug('d = {}'.format(d))
            d_json = None
            try:
                d_json = jsonify(d)
            except Exception as e:
                current_app.logger.error('jsonify error, exception = {}, e.args = {} for d = {}'.format(type(e).__name__, e.args, d))
            return d_json
    
        def get_queue_data(self, qid):
            q = self.__get_queue_by_id(qid)
            if q is None:
                data = {
                    'lines': [],
                    'errors': ['Queue disappeared'],
                    'ready': True,
                }
                return self.__to_json({'data': data})
    
            errors = None
            ready = False
            lines = []
            # loop while queue not empty or max lines
            while len(lines) < 1000:
                try:
                    stream, line = q.get(block=True, timeout=0.2)
                except queue.Empty:
                    break
                except Exception as e:
                    errors = [type(e).__name__ + ', ' + str(e.args)]
                    current_app.logger.error('exception = {}, e.args = {}'.format(type(e).__name__, e.args))
                    ready = True
                    break
                current_app.logger.debug('data from queue, stream = {}, type(line) = {}, line = {}'.format(stream, type(line), line))
                if line == self.EOT:
                    if stream == 'stdout':
                        q.stdout_closed = True
                    elif stream == 'stderr':
                        q.stderr_closed = True
                    if q.stdout_closed and q.stderr_closed:
                        ready = True
                    continue
                lines.append({
                    'stream': stream,
                    'line': line,
                })
                if stream == 'exit_code':
                    current_app.logger.debug('exit_code received')
                    ready = True
    
            data = {
                'lines': lines,
                'errors': errors,
                'ready': ready,
            }
            return self.__to_json({'data': data})
    
        def __cleanup_queues(self):
            et = int(time.time())
            to_delete_qids = [q.id for q in self.qid_queues.values() if (et - q.et) > self.max_queue_secs]
            for qid in to_delete_qids:
                del self.qid_queues[qid]
    
        def __reader(self, stream, pipe, q):
            try:
                with pipe:
                    for line in iter(pipe.readline, ''):
                        q.put((stream, line))
            finally:
                q.put((stream, self.EOT))
    
        def __run_command_as_subprocess(self, command, q):
            try:
                if isinstance(command, str):
                    command = shlex.split(command)
                #print('COMMAND = {}'.format(command))
                p = subprocess.Popen(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=1,
                    universal_newlines = True,
                )
                threading.Thread(target=self.__reader, args=['stdout', p.stdout, q], daemon=True).start()
                threading.Thread(target=self.__reader, args=['stderr', p.stderr, q], daemon=True).start()
                # delay: the process may have completed but the output was still not processed
                exit_code = p.wait()
                time.sleep(2)
                #print('EXIT_CODE = {}'.format(exit_code))
                q.put(('exit_code', exit_code))
            except Exception as e:
                error_message = 'There was an error running the command = {}: {}, {}'.format(command, type(e).__name__, e.args)
                #print('ERROR_MESSAGE = {}'.format(error_message))
                q.put(('stderr', error_message))
                q.put(('exit_code', 1))
    
        def start_command(self, command):
            # start process and return
            q = self.__create_queue()
            q.put(('stdout', 'Running command: ' + command))
            p = multiprocessing.Process(
                name='__run_command_as_subprocess',
                target=self.__run_command_as_subprocess,
                args=(command, q),
            )
            p.start()
            return (p, q)

    app/services/__init__.py

    # app/services/__init__.py
    from .background_command_runner import BackgroundCommandRunner
    background_command_runner = BackgroundCommandRunner()

    app/templates/base.html

    {# app/templates/base.html #}
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <title>{{ page_title }}</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65" crossorigin="anonymous">
    </head>
    <body>
    
    <main id="main" class="container-fluid py-3 qps-0 qflex-fill mt-0">
        {% block main -%}{% endblock -%}
    
        {%- if command_qid -%}
        <div class="row px-2">
            <div class="col-8 px-2 py-2 my-0">
                Results for command '{{ command }}':
            </div>
            <div class="col-4 px-2 pt-0 pb-2 my-0 text-end">
                <form method="post">
                <input type="hidden" name="command_pid" value="{{ command_pid }}">
                <button type="submit" name="action" value="stop_command" class="btn btn-outline-dark btn-sm">
                    Stop command
                </button>
                </form>
            </div>
        </div>
        <div class="row px-2">
            <div class="col border p-3 overflow-scroll small" id="command_result_data"  style="height: 400px;">
            </div>
        </div>
        <p>
            Lines received: <span id="lines-received">0</span>
        </p>
        {%- endif -%}
    </main>
    
    <script src="https://code.jquery.com/jquery-3.6.2.min.js" integrity="sha256-2krYZKh//PcchRtd+H+VyyQoZ/e3EcrkxhM8ycwASPA=" crossorigin="anonymous"></script>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4" crossorigin="anonymous"></script>
    
    {%- set get_command_result_data_url = '' -%}
    {%- if command_qid -%}
    {%- set get_command_result_data_url = url_for('get_command_result_data', command_qid=command_qid) -%}
    <script>
    var build_result_update_secs = 1000;
    var lines_received = 0;
    function get_command_result_data(){
        var url, data, box, li, i, stream, line, exit_code_received = false;
        url = '{{ get_command_result_data_url }}';
        console.log('url = ' + url);
        if(url == ''){
            return;
        }
        $.ajax({
            url: url,
            type: 'GET',
            dataType: 'json',
            success: function(rcvd){
                data = rcvd.data;
                lines_received += data.lines.length;
                box = $('#command_result_data');
                for(i = 0; i < data.lines.length; i++){
                    li = data.lines[i];
                    stream = li.stream;
                    if(stream == 'stdout'){
                        line = li.line;
                    }else if(stream == 'stderr'){
                        line = 'stderr: ' + li.line;
                    }else{
                        line = stream + ': ' + li.line;
                    }
                    box.append(line + '<br>');
                }
                if(data.errors){
                    data.errors.forEach(function(li, i){
                        box.append('ERROR: ' + li + '<br>');
                    });
                }
                if(data.ready){
                    box.append('Ready' + '<br>');
                }else{
                    setTimeout(get_command_result_data, build_result_update_secs);
                }
                box.scrollTop(box.prop('scrollHeight'));
                $('#lines-received').text(lines_received);
            }
        });
    }
    $(document).ready(function(){
        setTimeout(get_command_result_data, build_result_update_secs);
    });
    </script>
    {%- endif -%}
    
    </body>
    </html>

    app/templates/start_command.html

    {# app/templates/start_command.html #}
    {% extends "base.html" %}
    {% block main %}
    
    	<h2 class="mb-3">
    		{{ page_title }}
    	</h2>
    	{%- if error_message -%}
    	<p class="text-danger">
    		{{ error_message }}
    	</p>
    	{%- endif -%}
    	<form method="post">
    	<input type="hidden" name="command_pid" value="{{ command_pid }}">
    		<select name="command" class="form-select" aria-label="Select command">
    			<option value="pwd">
    				pwd
    			</option>
    			<option value="ls -lR">
    				ls -lR
    			</option>
    			<option value="ps -Af">
    				ps -Af
    			</option>
    			<option value="someunknowncommand">
    				someunknowncommand
    			</option>
    			<option value="tail --lines=5000 /var/log/syslog">
    				tail --lines=5000 /var/log/syslog
    			</option>
    			<option value="tail -f /var/log/syslog">
    				tail -f /var/log/syslog
    			</option>
    			<option value="docker">
    				docker
    			</option>
    		</select>
    		<button type="submit" name="action" value="start_command" class="btn btn-primary my-3">
    			Start command
    		</button>
    	</form>
    
    {%- endblock -%}
    

    Запуск проекта

    Запустите приложение, перейдя в каталог проекта и набрав:

    python run.py

    Затем направьте браузер на:

    http://127.0.0.1:5050

    Должна появиться страница. Выберите команду и наблюдайте за выводом.

    Резюме

    Как всегда, это заняло больше времени, чем ожидалось. Сначала я передал команду в виде строки в subprocess. Команды 'pwd' и 'ls' сработали, но 'ls -l' выдала сообщение:

    FileNotFoundError, (2, 'No such file or directory')

    После использования shlex эта ошибка исчезла.

    Другая трудность заключалась в том, чтобы определить, когда мы получили все данные из subprocess. Сначала я ждал, пока закроются оба stdout и stderr . Но это иногда не срабатывало. В качестве последнего средства, мы ждем завершения subprocess , добавляем небольшую задержку в две секунды и считаем это окончанием потоков.

    Ссылки / кредиты

    How to continuously display Python output in a Webpage?
    https://stackoverflow.com/questions/15092961/how-to-continuously-display-python-output-in-a-webpage

    Python - shlex - Simple lexical analysis
    https://docs.python.org/3/library/shlex.html

    Python - subprocess - Popen
    https://docs.python.org/3/library/subprocess.html#subprocess.Popen

    Python read from subprocess stdout and stderr separately while preserving order
    https://stackoverflow.com/questions/31833897/python-read-from-subprocess-stdout-and-stderr-separately-while-preserving-order

    Подробнее

    Flask Multiprocessing

    Оставить комментарий

    Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

    Комментарии

    Оставьте ответ

    Ответьте анонимно или войдите в систему, чтобы ответить.