Envoi de messages à Slack à l'aide de chat_postMessage
L'écriture d'une classe enveloppante n'est pas un mauvais moyen d'apprendre les tenants et les aboutissants d'un paquetage Python .

Dans le cadre d'un projet, j'envoyais déjà des messages par courrier électronique, mais je voulais aussi envoyer des messages sur Slack. Bien sûr, nous utilisons le SDK Slack Python .
La documentation se trouve sur la page : Python Slack SDK - Web Client. Dans ce billet, je crée une classe SlackAPI simple avec sa propre classe d'exception SlackError.
Créer et configurer une nouvelle application Slack
Nous allons envoyer nos messages à un canal Slack. Tout d'abord, nous devons créer une application Slack, définir les permissions (Scopes : 'chat:write') et l'ajouter à notre canal Slack. A la fin, nous avons les informations suivantes :
- Token du bot Slack
- Canal Slack (Id ou nom)
- Slack User (optionnel)
Reportez-vous à la documentation de Slack pour savoir comment procéder.
Slack et la limitation du débit
Slack peut limiter le débit de nos messages. Si nous envoyons trop de messages dans un court laps de temps, il affichera un message d'alerte :
Due to a high volume of activity, we are not displaying some messages sent by this application.
Dans notre code, une erreur SlackApiError avec l'erreur 'ratelimited' est levée. Le code de statut sera 429 et 'retry_after' donne les secondes pendant lesquelles il faut réessayer :
try:
...
except SlackApiError as e:
error_message=e.response.get('error', None),
status_code=e.response.status_code,
retry_after=e.response.headers.get('Retry-After', None),
Heureusement, le 'Python Slack SDK' est livré avec un RetryHandler qui effectue les réessais automatiquement pour nous lorsque le code de statut 429 est renvoyé. En l'ajoutant à notre code, nous n'avons pas à gérer ces tentatives nous-mêmes !
Important : par défaut, le RetryHandler ajoute ( !) une tentative. Cela signifie que si vous initialisez le WebClient comme ceci :
client = WebClient(
timeout=10.
)
client.chat_postMessage(...)
Le chat_postMessage sera renvoyé après deux fois le délai spécifié : 20 secondes. Vous ne pouvez pas changer cela en appelant le RetryHandler avec le paramètre :
max_retry_count
Pour l'instant, je n'ai pas trouvé comment fixer le délai d'attente pour l'ensemble de l'opération.
La classe d'exception SlackError
Dans notre SlackAPI, nous distinghuish entre :
- les erreurs permanentes
- Les erreurs temporaires
Les erreurs permanentes sont par exemple un Token Slack Bot ou un Canal Slack mauvais ou invalide. Dans ce cas, il ne faut pas réessayer d'envoyer ce message. Un exemple d'erreur temporaire est lorsque la connexion à Slack est interrompue. Dans ce cas, nous devons réessayer d'envoyer ce message après un certain temps. Nous devons également réessayer d'envoyer le message en cas de dépassement de délai. Notez qu'en cas de dépassement de délai, il se peut que nous envoyions un message deux fois. Cela se produit lorsque la réponse du serveur survient au même moment que le délai d'attente du client.
Le SDK Slack possède sa propre classe d'erreur, SlackApiError. Voici quelques exemples d'erreurs :
- invalid_auth invalid bot_token
- channel_not_found unknown channel
- no_text text is missing, e.g. text=None
- invalid_arguments e.g. channel=None
- ratelimited when you send too many messages in s short time
Si nous utilisons le RetryHandler, voir ci-dessus, l'erreur ratelimited ne se produit pas. Nous pouvons obtenir les données d'erreur à partir de l'objet error comme ceci :
e.response.status_code
e.response.error
e.response.headers
Une autre erreur peut se produire ici. C'est le cas lorsque l'URL du Slack API n'est pas correcte (introuvable). Une erreur HTTP 404 est alors générée. Les données d'erreur renvoyées sont différentes des données d'erreur "normales". Dans ce cas, l'objet d'erreur renvoie un dictionnaire( !):
e.response['status']
e.response['headers']
e.response['body']
Les deux autres erreurs, temporaires, sont les suivantes
- erreur_connexion
- timeout_error
Nous créons une classe d'exception SlackError qui renvoie toutes les erreurs permanentes et temporaires, ce qui signifie que notre application n'a besoin de capturer que les erreurs SlackError.
Ajouter notre objet SlackResponse
Nous ne sommes pas vraiment intéressés par toutes sortes de données, nous voulons juste savoir ce qui s'est passé et si nous devons réessayer l'envoi. Nous pouvons retourner un dictionnaire, mais pour rendre l'accès plus propre, nous utilisons ici un objet nommétuple :
ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after')
Nous renvoyons le même objet SlackResponse pour les deux cas, pas d'erreurs ou erreurs. En cas d'erreur, nous obtenons l'objet SlackResponse à partir de e.args[0].
Le code
Voici le code si vous voulez essayer ceci. Définissez d'abord au moins les paramètres suivants :
SLACK_BOT_TOKEN
SLACK_CHANNEL
Slack Channel peut être l'Id ou le nom du canal. Si vous souhaitez observer la fonction de limitation du débit, vous pouvez définir :
number_of_messages = 100
Le code :
import collections
import copy
import datetime
import logging
import os
import sys
import time
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler
class SlackError(Exception):
pass
class SlackApi:
def __init__(
self,
config=None,
logger=None,
base_url=None,
slack_bot_token=None,
timeout=None,
use_rate_limit_handler=True
):
self.config = config
self.logger = logger
self.base_url = base_url
self.slack_bot_token = slack_bot_token
self.timeout = timeout or 5.
self.use_rate_limit_handler = use_rate_limit_handler
web_client_params = {
'token': self.slack_bot_token,
'timeout': self.timeout
}
if self.base_url is not None:
web_client_params['base_url'] = self.base_url
self.client = WebClient(**web_client_params)
if use_rate_limit_handler:
# this handler does retries when HTTP status 429 is returned
rate_limit_handler = RateLimitErrorRetryHandler(
max_retry_count=1
)
self.client.retry_handlers.append(rate_limit_handler)
def make_response(self, has_errors=False, error_message=None, status_code=None, retry_after=None):
request_msecs = int((datetime.datetime.now() - self.ts).total_seconds() * 1000)
ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after request_msecs')
return ResponseObj(has_errors, error_message, status_code, retry_after, request_msecs)
def chat_post_message(self, channel, text=None, blocks=None, user=None):
self.ts = datetime.datetime.now()
# change empty string user to None
if user is not None:
user = user.strip()
if user == '':
user = None
try:
response = self.client.chat_postMessage(
channel=channel,
text=text,
blocks=blocks,
user=user,
)
return self.make_response(
status_code=response.status_code,
)
except SlackApiError as e:
# first check for http errors
status = e.response.get('status', None)
self.logger.debug('status = {}'.format(status))
if status is not None:
status_code = status
error_message = 'http_' + str(status)
headers = e.response['headers']
else:
status_code = e.response.get('status_code', None)
error_message = e.response.get('error', None)
headers = e.response.headers
raise SlackError(self.make_response(
has_errors=True,
status_code=status_code,
error_message=error_message,
retry_after=headers.get('Retry-After', None),
))
except Exception as e:
exception = type(e).__name__
if exception == 'TimeoutError':
error_message='timeout_error'
else:
# all other errors are considered connection errors
error_message='connection_error'
raise SlackError(self.make_response(
has_errors=True,
error_message=error_message,
))
# the code below is typically in another file
# Example from:
# Stacking multiple blocks
# https://api.slack.com/messaging/composing/layouts
blocks_example = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "New request"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Type:*\nPaid Time Off"
},
{
"type": "mrkdwn",
"text": "*Created by:*\n<example.com|Fred Enriquez>"
}
]
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*When:*\nAug 10 - Aug 13"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "<https://example.com|View request>"
}
}
]
def get_logger(logger_name=None, logger_level=logging.DEBUG):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
# console
console_handler = logging.StreamHandler()
console_logger_format = '%(levelname)-8.8s [%(filename)-15s%(funcName)15s():%(lineno)03s] %(message)s'
console_handler.setFormatter(logging.Formatter(console_logger_format))
console_handler.setLevel(logger_level)
logger.addHandler(console_handler)
return logger
config = {
'SLACK_BOT_TOKEN': '',
'SLACK_CHANNEL': '',
'SLACK_USER': None
}
def main():
logger = get_logger(logger_name='slack_api')
# your bot token and channel
slack_bot_token = config.get('SLACK_BOT_TOKEN')
slack_channel = config.get('SLACK_CHANNEL')
slack_user = config.get('SLACK_USER')
# create 'messages table'
number_of_messages = 100
#number_of_messages = 1
messages = []
for i in range(number_of_messages):
# deepcopy the dictionary here!
blocks = copy.deepcopy(blocks_example)
# patch message number
blocks[0]['text']['text'] = "New request" + ': {:03d}'.format(i)
messages.append({
'slack_bot_token': slack_bot_token,
'channel': slack_channel,
'user': slack_user,
'text': 'message {:03d}: this is a test message'.format(i),
'blocks': blocks,
# delivery status
'is_delivery_completed': False,
'is_delivered': False,
'delivery_tries': 0,
'last_delivery_error': None,
})
# send the messages
for i, message in enumerate(messages):
if message['is_delivered']:
continue
logger.debug('sending message {} ...'.format(i))
slack_api = SlackApi(
logger=logger,
slack_bot_token=message.get('slack_bot_token'),
#timeout=0.1,
)
try:
r = slack_api.chat_post_message(
channel=message.get('channel'),
text=message.get('text'),
blocks=message.get('blocks'),
user=message.get('user'),
)
logger.debug('success sending message[{}]: r = {}'.format(i, r))
# delivery status
message['is_delivery_completed'] = True
message['is_delivered'] = True
message['delivery_tries'] += 1
except SlackError as e:
#logger.exception('error sending message[{}]'.format(i))
r = e.args[0]
logger.debug('problem sending message[{}]: r = {}'.format(i, r))
message['delivery_tries'] += 1
message['last_delivery_error'] = r.error_message
if r.error_message in ['timeout_error', 'connection_error']:
# delivery status, must retry
message['is_delivery_completed'] = False
message['is_delivered'] = False
else:
# delivery status, done
message['is_delivery_completed'] = True
message['is_delivered'] = False
if __name__ == '__main__':
main()
Résumé
Tout d'abord, nous devons comprendre comment créer une application Slack, lui donner les permissions appropriées et l'ajouter à un canal Slack. Pour envoyer des messages à Slack, nous utilisons le SDK de Slack. Le RetryHandler prend en charge la fonction de limitation du taux de Slack. Nous avons intégré le SDK de Slack dans notre classe SlackApi personnalisée. Nous avons également ajouté notre propre classe SlackError qui gère toutes les erreurs ( !) qui peuvent survenir, y compris les dépassements de temps et les erreurs de connexion.
Notre application peut maintenant créer les messages et les blocs et les envoyer à Slack en utilisant notre SlackApi.
L'avantage d'écrire une classe wrapper, c'est que nous devons passer en revue tous les paramètres et réponses possibles. C'est une façon (et pas si mauvaise) d'apprendre les tenants et les aboutissants d'un paquetage Python .
Liens / crédits
Python Slack SDK - RetryHandler
https://slack.dev/python-slack-sdk/web/index.html#retryhandler
Python Slack SDK - Web Client
https://slack.dev/python-slack-sdk/web/index.html
Slack - Package slack_sdk
https://slack.dev/python-slack-sdk/api-docs/slack_sdk
Slack - Rate Limits
https://api.slack.com/docs/rate-limits
Slack API - Python Slack SDK
https://slack.dev/python-slack-sdk
Using the Slack Web API
https://api.slack.com/web
En savoir plus...
Slack
Récent
- Obtenir une liste des vidéos YouTube d'une personne
- De Docker-Composer à Docker Swarm : Configs
- Docker-Composer des projets avec des noms de services identiques
- X automatisation du web et scraping avec Selenium
- Aiohttp avec serveurs DNS personnalisés, Unbound et Docker
- Renvoyer uniquement les valeurs d'une liste d'enregistrements de FastAPI
Les plus consultés
- Utiliser UUIDs au lieu de Integer Autoincrement Primary Keys avec SQLAlchemy et MariaDb
- Utilisation des Python's pyOpenSSL pour vérifier les certificats SSL téléchargés d'un hôte
- Utiliser PyInstaller et Cython pour créer un exécutable Python
- Connexion à un service sur un hôte Docker à partir d'un conteneur Docker
- SQLAlchemy : Utilisation de Cascade Deletes pour supprimer des objets connexes
- Flask RESTful API validation des paramètres de la requête avec les schémas Marshmallow