【分享】通过飞书群通知方式发送异常通知,以及日常的报表截图等等等………
评论
收藏

【分享】通过飞书群通知方式发送异常通知,以及日常的报表截图等等等………

经验分享
出道即颠峰
2023-12-19 22:21·浏览量:1142
出道即颠峰
影刀高级开发者
发布于 2023-12-19 21:10更新于 2023-12-19 22:211142浏览

前言:在社区看到一篇帖子  刚好之前有写过这个代码 找了找库存顺便分享给有需要的通知呦~~~

帖子


1.飞书开发者后台创建应用

https://open.feishu.cn/

2.飞书开放平台添加机器人

https://open.feishu.cn/app/cli_a4647eb479b8d00b/baseinfo

3.开通权限

4.飞书群里添加群机器人 添加创建的群机器人

5.安装requests包和requests_toolbelt包

6.代码部分

# 使用提醒:
# 1. xbot包提供软件自动化、数据表格、Excel、日志、AI等功能
# 2. package包提供访问当前应用数据的功能,如获取元素、访问全局变量、获取资源文件等功能
# 3. 当此模块作为流程独立运行时执行main函数
# 4. 可视化流程中可以通过"调用模块"的指令使用此模块

import xbot
from xbot import print, sleep
from .import package
from .package import variables as glv
import json
import requests
from requests_toolbelt import MultipartEncoder

#指令缺点,开启vpn时无法执行
#指令优点,省去找图床


class FeishuBot:
    def __init__(self, app_id, app_secret):
        self.app_id = app_id
        self.app_secret = app_secret
        self.base_url = "https://open.feishu.cn/open-apis"
        self.access_token = self._get_tenant_access_token()

    def _get_tenant_access_token(self):
        url = f"{self.base_url}/auth/v3/tenant_access_token/internal/"
        post_data = {"app_id": self.app_id, "app_secret": self.app_secret}
        response = requests.post(url, data=post_data)
        return response.json()["tenant_access_token"]

    def _get_chat_id(self, send_name):
        url = f"{self.base_url}/im/v1/chats?page_size=20"
        headers = {'Authorization': f'Bearer {self.access_token}'}
        response = requests.get(url, headers=headers)
        for chat in response.json()['data']['items']:
            if chat['name'] == send_name:
                return chat['chat_id']
        raise Exception("请核对输入群名字是否正确")

    def _get_image_key(self, image_path):
        url = f"{self.base_url}/im/v1/images"
        with open(image_path, 'rb') as f:
            form = {
                'image_type': 'message',
                'image': f
            }
            multi_form = MultipartEncoder(form)
            headers = {
                'Authorization': f"Bearer {self.access_token}",
                'Content-Type': multi_form.content_type
            }
            response = requests.post(url, headers=headers, data=multi_form)
            if response.status_code == 200:
                return response.json()["data"]["image_key"]
            else:
                raise Exception("Failed to upload image")

    def send_message(self, chat_name, image_path, message):
        chat_id = self._get_chat_id(chat_name)
        image_key = self._get_image_key(image_path)
        url = f"{self.base_url}/im/v1/messages"
        params = {"receive_id_type": "chat_id"}
        msg_content = {
            "elements": [
                {
                    "tag": "div",
                    "text": {"content": message, "tag": "plain_text"}
                },
                {
                    "tag": "img",
                    "img_key": image_key,
                    "alt": {"tag": "plain_text", "content": ""},
                    "mode": "fit_horizontal",
                    "preview": False  #是否允许用户查看原图,False为不允许,True可以点开图片查看原图
                }
            ]
        }
        req = {
            "receive_id": chat_id,
            "msg_type": "interactive",
            "content": json.dumps(msg_content)
        }
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json'
        }
        response = requests.post(url, params=params, headers=headers, json=req)
        return response

def main(args):
    appid = args.get('appid')
    app_secret = args.get('app_secret')
    send_name = args.get('send_name')
    image_path = args.get('image_path')
    messages_content = args.get('messages_content')
    bot = FeishuBot(appid, app_secret)
    bot.send_message(send_name, image_path, messages_content)
    pass


7.指令流程参数设置

需要点亮流程名字右侧小眼睛哦 编辑完之后一定要发版哦

8.指令实现效果展示

收藏1
全部评论1
最新
发布评论
评论