angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Отправка сообщений в Slack с помощью chat_postMessage

Написание класса-обертки - неплохой способ изучить особенности пакета Python .

23 марта 2023 Обновленный 24 марта 2023
В
post main image
https://www.pexels.com/nl-nl/@thngocbich/

Для одного проекта я уже отправлял сообщения по электронной почте, но теперь мне захотелось также отправлять сообщения в Slack. Конечно, мы используем Python Slack SDK.

Документацию можно найти на странице: Python Slack SDK - Web Client. В этом посте я создаю простой класс SlackAPI с собственным классом исключений SlackError.

Создание и настройка нового приложения Slack

Мы будем отправлять наши сообщения в канал Slack. Сначала мы должны создать приложение Slack, установить разрешения (Scopes: 'chat:write') и добавить его в наш канал Slack. В конце у нас будет следующая информация:

  • Токен бота Slack
  • Канал Slack (Id или имя)
  • Slack User (необязательно).

Обратитесь к документации по Slack, чтобы узнать, как это сделать.

Slack и ограничение скорости

Slack может ограничивать скорость отправки наших сообщений. Если мы отправим слишком много сообщений за короткое время, он покажет предупреждающее сообщение:

Due to a high volume of activity, we are not displaying some messages sent by this application.

В нашем коде возникает SlackApiError с ошибкой 'ratelimited'. Код_статуса будет 429, а 'retry_after' дает секунды, когда нужно повторить попытку:

    try:
        ...
    except SlackApiError as e:
        error_message=e.response.get('error', None),
        status_code=e.response.status_code,
        retry_after=e.response.headers.get('Retry-After', None),

К счастью, 'Python Slack SDK' поставляется с RetryHandler, который делает повторные попытки автоматически для нас, когда возвращается status_code 429. Добавив его в наш код, нам не придется обрабатывать эти повторы самостоятельно!

Важно: По умолчанию RetryHandler добавляет (!) одну повторную попытку. Это означает, что если вы инициализируете WebClient следующим образом:

    client = WebClient(
        timeout=10.
    )
    client.chat_postMessage(...)

Тогда chat_postMessage вернется после удвоения указанного таймаута: 20 секунд. Вы не можете изменить это, вызвав RetryHandler с параметром:

max_retry_count

На данный момент я не понял, как можно сделать таймаут для всей операции.

Класс исключений SlackError

В нашем SlackAPI мы distингуем между:

  • постоянными ошибками
  • Временные ошибки

Постоянные ошибки - это, например, плохой или недействительный Slack Bot Token, Slack Channel. В этом случае не следует повторять попытку отправки сообщения. Примером временной ошибки является обрыв соединения со Slack. В этом случае мы должны повторить отправку этого сообщения через некоторое время. Мы также должны повторить отправку при возникновении тайм-аута. Обратите внимание, что в случае тайм-аута мы можем отправить сообщение дважды. Это происходит, когда ответ сервера происходит в то же время, что и тайм-аут клиента.

Slack SDK имеет свой собственный класс ошибок, SlackApiError. Некоторые ошибки таковы:

- invalid_auth        invalid bot_token
- channel_not_found   unknown channel
- no_text             text is missing, e.g. text=None
- invalid_arguments   e.g. channel=None
- ratelimited         when you send too many messages in s short time

Если мы используем RetryHandler, см. выше, ошибка с ограничением по времени не возникает. Мы можем получить данные об ошибке из объекта ошибки следующим образом:

e.response.status_code
e.response.error
e.response.headers

Есть еще одна ошибка, которая может возникнуть здесь. Это когда URL-адрес Slack API неверен (не найден). В этом случае возникает ошибка HTTP 404. Возвращаемые данные об ошибке отличаются от "обычных" данных об ошибке. В этом случае объект ошибки возвращает словарь(!):

e.response['status']
e.response['headers']
e.response['body']

Две другие, временные ошибки:

  • ошибка_соединения
  • ошибка таймаута

Мы создаем класс исключения SlackError, который возвращает все постоянные и временные ошибки, что означает, что нашему приложению нужно перехватывать только ошибки SlackError.

Добавление нашего объекта SlackResponse

Нас не очень интересуют всевозможные данные, мы просто хотим знать, что произошло и нужно ли нам повторить попытку отправки. Мы можем вернуть словарь, но чтобы сделать доступ более чистым, мы используем здесь объект с именемtuple :

ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after')

Мы возвращаем один и тот же объект SlackResponse для обоих случаев - без ошибок или с ошибками. В случае ошибок мы получаем объект SlackResponse из e.args[0].

Код

Вот код, если вы хотите попробовать. Сначала установите хотя бы следующие параметры:

SLACK_BOT_TOKEN
SLACK_CHANNEL

Slack Channel - это может быть Id или имя канала. Если вы хотите наблюдать функцию ограничения скорости, вы можете установить:

number_of_messages = 100

Код:

import collections
import copy
import datetime
import logging
import os
import sys
import time    

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler

class SlackError(Exception):
    pass
    
class SlackApi:
    def __init__(
        self, 
        config=None,
        logger=None,
        base_url=None,
        slack_bot_token=None,
        timeout=None,
        use_rate_limit_handler=True
    ):
        self.config = config
        self.logger = logger
        self.base_url = base_url
        self.slack_bot_token = slack_bot_token
        self.timeout = timeout or 5.
        self.use_rate_limit_handler = use_rate_limit_handler
        web_client_params = {
            'token': self.slack_bot_token,
            'timeout': self.timeout
        }
        if self.base_url is not None:
            web_client_params['base_url'] = self.base_url
        self.client = WebClient(**web_client_params)
        if use_rate_limit_handler:
            # this handler does retries when HTTP status 429 is returned
            rate_limit_handler = RateLimitErrorRetryHandler(
                max_retry_count=1
            )
            self.client.retry_handlers.append(rate_limit_handler)

    def make_response(self, has_errors=False, error_message=None, status_code=None, retry_after=None):
        request_msecs = int((datetime.datetime.now() - self.ts).total_seconds() * 1000)
        ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after request_msecs')
        return ResponseObj(has_errors, error_message, status_code, retry_after, request_msecs)

    def chat_post_message(self, channel, text=None, blocks=None, user=None):
        self.ts = datetime.datetime.now()
        # change empty string user to None
        if user is not None:
            user = user.strip()
            if user == '':
                user = None
        try:
            response = self.client.chat_postMessage(
                channel=channel,
                text=text,
                blocks=blocks,
                user=user,
            )
            return self.make_response(
                status_code=response.status_code,
            )
        except SlackApiError as e:
            # first check for http errors
            status = e.response.get('status', None)
            self.logger.debug('status = {}'.format(status))
            if status is not None:
                status_code = status
                error_message = 'http_' + str(status)
                headers = e.response['headers']
            else:
                status_code = e.response.get('status_code', None)
                error_message = e.response.get('error', None)
                headers = e.response.headers
            raise SlackError(self.make_response(
                has_errors=True,
                status_code=status_code,
                error_message=error_message,
                retry_after=headers.get('Retry-After', None),
            ))
        except Exception as e:
            exception = type(e).__name__
            if exception == 'TimeoutError':
                error_message='timeout_error'
            else:
                # all other errors are considered connection errors
                error_message='connection_error'
            raise SlackError(self.make_response(
                has_errors=True,
                error_message=error_message,
            ))

# the code below is typically in another file

# Example from:
# Stacking multiple blocks
# https://api.slack.com/messaging/composing/layouts
blocks_example = [
    {
        "type": "header",
        "text": {
            "type": "plain_text",
            "text": "New request"
        }
    },
    {
        "type": "section",
        "fields": [
            {
                "type": "mrkdwn",
                "text": "*Type:*\nPaid Time Off"
            },
            {
                "type": "mrkdwn",
                "text": "*Created by:*\n<example.com|Fred Enriquez>"
            }
        ]
    },
    {
        "type": "section",
        "fields": [
            {
                "type": "mrkdwn",
                "text": "*When:*\nAug 10 - Aug 13"
            }
        ]
    },
    {
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": "<https://example.com|View request>"
        }
    }
]


def get_logger(logger_name=None, logger_level=logging.DEBUG):
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    # console
    console_handler = logging.StreamHandler()
    console_logger_format = '%(levelname)-8.8s [%(filename)-15s%(funcName)15s():%(lineno)03s] %(message)s'
    console_handler.setFormatter(logging.Formatter(console_logger_format))
    console_handler.setLevel(logger_level)
    logger.addHandler(console_handler)
    return logger


config = {
    'SLACK_BOT_TOKEN': '',
    'SLACK_CHANNEL': '',
    'SLACK_USER': None
}

def main():
    logger = get_logger(logger_name='slack_api')

    # your bot token and channel
    slack_bot_token = config.get('SLACK_BOT_TOKEN')
    slack_channel = config.get('SLACK_CHANNEL')
    slack_user = config.get('SLACK_USER')

    # create 'messages table'
    number_of_messages = 100
    #number_of_messages = 1
    messages = []
    for i in range(number_of_messages):
        # deepcopy the dictionary here!
        blocks = copy.deepcopy(blocks_example)
        # patch message number
        blocks[0]['text']['text'] = "New request" + ': {:03d}'.format(i)
        messages.append({
            'slack_bot_token': slack_bot_token,
            'channel': slack_channel,
            'user': slack_user,
            'text': 'message {:03d}: this is a test message'.format(i),
            'blocks': blocks,
            # delivery status
            'is_delivery_completed': False,
            'is_delivered': False,
            'delivery_tries': 0,
            'last_delivery_error': None,
        })

    # send the messages
    for i, message in enumerate(messages):
        if message['is_delivered']:
            continue
        logger.debug('sending message {} ...'.format(i))
        slack_api = SlackApi(
            logger=logger,
            slack_bot_token=message.get('slack_bot_token'),
            #timeout=0.1,
        )
        try:
            r = slack_api.chat_post_message(
                channel=message.get('channel'),
                text=message.get('text'),
                blocks=message.get('blocks'),
                user=message.get('user'),
            )
            logger.debug('success sending message[{}]: r = {}'.format(i, r))
            # delivery status
            message['is_delivery_completed'] = True
            message['is_delivered'] = True
            message['delivery_tries'] += 1
        except SlackError as e:
            #logger.exception('error sending message[{}]'.format(i))
            r = e.args[0]
            logger.debug('problem sending message[{}]: r = {}'.format(i, r))
            message['delivery_tries'] += 1
            message['last_delivery_error'] = r.error_message
            if r.error_message in ['timeout_error', 'connection_error']:
                # delivery status, must retry
                message['is_delivery_completed'] = False
                message['is_delivered'] = False
            else:
                # delivery status, done
                message['is_delivery_completed'] = True
                message['is_delivered'] = False


if __name__ == '__main__':
    main()

Summary

Сначала мы должны понять, как создать приложение Slack, дать ему соответствующие разрешения и добавить его в канал Slack. Для отправки сообщений в Slack мы используем Slack SDK. RetryHandler позаботится о функции ограничения скорости Slack. Это делает наш код намного компактнее. Мы обернули Slack SDK в наш пользовательский класс SlackApi. Мы также добавили наш собственный класс SlackError, который обрабатывает все (!) ошибки, которые могут возникнуть, включая таймауты и ошибки соединения.

Теперь наше приложение может создавать сообщения и блоки и отправлять их в Slack с помощью нашего SlackApi.

Приятным моментом написания класса-обертки является то, что мы перебираем все возможные параметры и ответы. Это один из способов (и не такой уж плохой) изучить особенности пакета Python .

Ссылки / кредиты

Python Slack SDK - RetryHandler
https://slack.dev/python-slack-sdk/web/index.html#retryhandler

Python Slack SDK - Web Client
https://slack.dev/python-slack-sdk/web/index.html

Slack - Package slack_sdk
https://slack.dev/python-slack-sdk/api-docs/slack_sdk

Slack - Rate Limits
https://api.slack.com/docs/rate-limits

Slack API - Python Slack SDK
https://slack.dev/python-slack-sdk

Using the Slack Web API
https://api.slack.com/web

Подробнее

Slack

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.