Envío de mensajes a Slack mediante chat_postMessage
Escribir una clase de envoltura no es una mala manera de aprender los entresijos de un paquete Python .

Para un proyecto que ya estaba enviando mensajes por correo electrónico, pero ahora también quería enviar mensajes a Slack. Por supuesto, estamos utilizando el Python Slack SDK.
La documentación se puede encontrar en la página: Python Slack SDK - Cliente Web. En este post creo una simple clase SlackAPI con su propia clase de excepción SlackError.
Crear y configurar una nueva Slack App
Vamos a enviar nuestros mensajes a un canal de Slack. Primero debemos crear una Slack App, configurar los permisos (Scopes: 'chat:write'), y añadirla a nuestro Slack Channel. Al final tenemos la siguiente información:
- Slack Bot Token
- Canal Slack (Id o nombre)
- Slack User (opcional)
Consulta la documentación de Slack para saber cómo hacerlo.
Slack y la limitación de velocidad
Slack puede limitar la velocidad de nuestros mensajes. Si enviamos demasiados mensajes en poco tiempo, mostrará el mensaje de alerta:
Due to a high volume of activity, we are not displaying some messages sent by this application.
En nuestro código, se genera un SlackApiError con el error 'ratelimited'. El status_code será 429 y 'retry_after' da los segundos en los que hay que volver a intentarlo:
try:
...
except SlackApiError as e:
error_message=e.response.get('error', None),
status_code=e.response.status_code,
retry_after=e.response.headers.get('Retry-After', None),
Afortunadamente, el 'Python Slack SDK' viene con un RetryHandler que hace los reintentos automáticamente por nosotros cuando se devuelve el status_code 429. Añadiendo esto a nuestro código, no tenemos que manejar estos reintentos nosotros mismos.
Importante: Por defecto, el RetryHandler añade (!) un reintento. Esto significa que si inicializas el WebClient así:
client = WebClient(
timeout=10.
)
client.chat_postMessage(...)
Entonces el chat_postMessage volverá después de dos veces el tiempo de espera especificado: 20 segundos. No puedes cambiar esto llamando al RetryHandler con el parámetro:
max_retry_count
De momento no he averiguado como podemos hacer que el timeout sea para toda la operación.
La clase de excepción SlackError
En nuestro SlackAPI, distinghuish entre:
- Errores permanentes
- Errores temporales
Errores permanentes son, por ejemplo, un mal o no válido Slack Bot Token, Slack Channel. En este caso no debemos reintentar el envío de este mensaje. Un ejemplo de error temporal es cuando la conexión a Slack está caída. En este caso debemos reintentar enviar este mensaje después de algún tiempo. También debemos reintentar el envío cuando se produce un timeout. Tenga en cuenta que en caso de un tiempo de espera podemos terminar enviando un mensaje dos veces. Esto ocurre cuando la respuesta del servidor se produce al mismo tiempo que el tiempo de espera en el cliente.
El SDK de Slack tiene su propia clase de error, SlackApiError. Algunos errores son:
- invalid_auth invalid bot_token
- channel_not_found unknown channel
- no_text text is missing, e.g. text=None
- invalid_arguments e.g. channel=None
- ratelimited when you send too many messages in s short time
Si usamos el RetryHandler, ver arriba, el error ratelimitado no ocurre. Podemos obtener los datos del error del objeto error así:
e.response.status_code
e.response.error
e.response.headers
Hay otro error que puede ocurrir aquí. Esto es cuando la URL al Slack API no es correcta (no encontrada). Esto generará un error HTTP 404. Los datos de error devueltos son diferentes de los datos de error "normales". En este caso, el objeto de error devuelve un diccionario(!):
e.response['status']
e.response['headers']
e.response['body']
Los otros dos errores temporales son
- connection_error
- timeout_error
Creamos una clase de excepción SlackError que devuelve todos los errores permanentes y temporales, lo que significa que nuestra aplicación sólo necesita capturar los errores SlackError.
Añadiendo nuestro objeto SlackResponse
Realmente no estamos interesados en todo tipo de datos, sólo queremos saber qué ha pasado y si debemos reintentar el envío. Podemos devolver un diccionario, pero para hacer el acceso más limpio, aquí usamos un namedtuple :
ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after')
Devolvemos el mismo objeto SlackResponse para ambos casos, sin errores o con errores. En caso de errores, obtenemos el objeto SlackResponse de e.args[0].
El código
Aquí está el código por si quieres probar esto. Primero establece al menos los siguientes parámetros:
SLACK_BOT_TOKEN
SLACK_CHANNEL
Slack Channel puede ser el Id o el nombre del canal. Si desea observar la función de limitación de velocidad, puede establecer:
number_of_messages = 100
El código:
import collections
import copy
import datetime
import logging
import os
import sys
import time
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler
class SlackError(Exception):
pass
class SlackApi:
def __init__(
self,
config=None,
logger=None,
base_url=None,
slack_bot_token=None,
timeout=None,
use_rate_limit_handler=True
):
self.config = config
self.logger = logger
self.base_url = base_url
self.slack_bot_token = slack_bot_token
self.timeout = timeout or 5.
self.use_rate_limit_handler = use_rate_limit_handler
web_client_params = {
'token': self.slack_bot_token,
'timeout': self.timeout
}
if self.base_url is not None:
web_client_params['base_url'] = self.base_url
self.client = WebClient(**web_client_params)
if use_rate_limit_handler:
# this handler does retries when HTTP status 429 is returned
rate_limit_handler = RateLimitErrorRetryHandler(
max_retry_count=1
)
self.client.retry_handlers.append(rate_limit_handler)
def make_response(self, has_errors=False, error_message=None, status_code=None, retry_after=None):
request_msecs = int((datetime.datetime.now() - self.ts).total_seconds() * 1000)
ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after request_msecs')
return ResponseObj(has_errors, error_message, status_code, retry_after, request_msecs)
def chat_post_message(self, channel, text=None, blocks=None, user=None):
self.ts = datetime.datetime.now()
# change empty string user to None
if user is not None:
user = user.strip()
if user == '':
user = None
try:
response = self.client.chat_postMessage(
channel=channel,
text=text,
blocks=blocks,
user=user,
)
return self.make_response(
status_code=response.status_code,
)
except SlackApiError as e:
# first check for http errors
status = e.response.get('status', None)
self.logger.debug('status = {}'.format(status))
if status is not None:
status_code = status
error_message = 'http_' + str(status)
headers = e.response['headers']
else:
status_code = e.response.get('status_code', None)
error_message = e.response.get('error', None)
headers = e.response.headers
raise SlackError(self.make_response(
has_errors=True,
status_code=status_code,
error_message=error_message,
retry_after=headers.get('Retry-After', None),
))
except Exception as e:
exception = type(e).__name__
if exception == 'TimeoutError':
error_message='timeout_error'
else:
# all other errors are considered connection errors
error_message='connection_error'
raise SlackError(self.make_response(
has_errors=True,
error_message=error_message,
))
# the code below is typically in another file
# Example from:
# Stacking multiple blocks
# https://api.slack.com/messaging/composing/layouts
blocks_example = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "New request"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Type:*\nPaid Time Off"
},
{
"type": "mrkdwn",
"text": "*Created by:*\n<example.com|Fred Enriquez>"
}
]
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*When:*\nAug 10 - Aug 13"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "<https://example.com|View request>"
}
}
]
def get_logger(logger_name=None, logger_level=logging.DEBUG):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
# console
console_handler = logging.StreamHandler()
console_logger_format = '%(levelname)-8.8s [%(filename)-15s%(funcName)15s():%(lineno)03s] %(message)s'
console_handler.setFormatter(logging.Formatter(console_logger_format))
console_handler.setLevel(logger_level)
logger.addHandler(console_handler)
return logger
config = {
'SLACK_BOT_TOKEN': '',
'SLACK_CHANNEL': '',
'SLACK_USER': None
}
def main():
logger = get_logger(logger_name='slack_api')
# your bot token and channel
slack_bot_token = config.get('SLACK_BOT_TOKEN')
slack_channel = config.get('SLACK_CHANNEL')
slack_user = config.get('SLACK_USER')
# create 'messages table'
number_of_messages = 100
#number_of_messages = 1
messages = []
for i in range(number_of_messages):
# deepcopy the dictionary here!
blocks = copy.deepcopy(blocks_example)
# patch message number
blocks[0]['text']['text'] = "New request" + ': {:03d}'.format(i)
messages.append({
'slack_bot_token': slack_bot_token,
'channel': slack_channel,
'user': slack_user,
'text': 'message {:03d}: this is a test message'.format(i),
'blocks': blocks,
# delivery status
'is_delivery_completed': False,
'is_delivered': False,
'delivery_tries': 0,
'last_delivery_error': None,
})
# send the messages
for i, message in enumerate(messages):
if message['is_delivered']:
continue
logger.debug('sending message {} ...'.format(i))
slack_api = SlackApi(
logger=logger,
slack_bot_token=message.get('slack_bot_token'),
#timeout=0.1,
)
try:
r = slack_api.chat_post_message(
channel=message.get('channel'),
text=message.get('text'),
blocks=message.get('blocks'),
user=message.get('user'),
)
logger.debug('success sending message[{}]: r = {}'.format(i, r))
# delivery status
message['is_delivery_completed'] = True
message['is_delivered'] = True
message['delivery_tries'] += 1
except SlackError as e:
#logger.exception('error sending message[{}]'.format(i))
r = e.args[0]
logger.debug('problem sending message[{}]: r = {}'.format(i, r))
message['delivery_tries'] += 1
message['last_delivery_error'] = r.error_message
if r.error_message in ['timeout_error', 'connection_error']:
# delivery status, must retry
message['is_delivery_completed'] = False
message['is_delivered'] = False
else:
# delivery status, done
message['is_delivery_completed'] = True
message['is_delivered'] = False
if __name__ == '__main__':
main()
Resumen
Primero tenemos que averiguar cómo crear una aplicación Slack, darle los permisos adecuados y añadirla a un canal Slack. Para enviar mensajes a Slack usamos el SDK de Slack. El RetryHandler se encarga de la función de limitación de velocidad de Slack. Envolvimos el SDK de Slack en nuestra clase SlackApi personalizada. También añadimos nuestra propia clase SlackError que maneja todos (!) los errores que pueden ocurrir, incluyendo timeouts y errores de conexión.
Nuestra aplicación ahora puede crear los mensajes y bloques y enviarlos a Slack usando nuestro SlackApi.
Lo bueno de escribir una clase wrapper, es que tenemos que revisar todos los parámetros y respuestas posibles. Esa es una forma (y no tan mala) de aprender los entresijos de un paquete Python .
Enlaces / créditos
Python Slack SDK - RetryHandler
https://slack.dev/python-slack-sdk/web/index.html#retryhandler
Python Slack SDK - Web Client
https://slack.dev/python-slack-sdk/web/index.html
Slack - Package slack_sdk
https://slack.dev/python-slack-sdk/api-docs/slack_sdk
Slack - Rate Limits
https://api.slack.com/docs/rate-limits
Slack API - Python Slack SDK
https://slack.dev/python-slack-sdk
Using the Slack Web API
https://api.slack.com/web
Leer más
Slack
Recientes
- Obtener una lista de YouTube vídeos de una persona
- De Docker-Composer a Docker Swarm: Configs
- Docker-Componer proyectos con nombres de servicio idénticos
- X Automatización web y scraping con Selenium
- Aiohttp con servidores DNS personalizados, Unbound y Docker
- Devuelve sólo los valores de una lista de registros de FastAPI
Más vistos
- Usando UUIDs en lugar de Integer Autoincrement Primary Keys con SQLAlchemy y MariaDb
- Usando Python's pyOpenSSL para verificar los certificados SSL descargados de un host
- Usando PyInstaller y Cython para crear un ejecutable de Python
- Conectarse a un servicio en un host Docker desde un contenedor Docker
- SQLAlchemy: Uso de Cascade Deletes para eliminar objetos relacionados
- Flask RESTful API validación de parámetros de solicitud con esquemas Marshmallow