

{ "scopes": { "tenant": [ "aily:message:read", "aily:message:write", "application:application.app_message_stats.overview:readonly", "im:app_feed_card:write", "im:message", "im:message.group_at_msg:readonly", "im:message.group_msg", "im:message.p2p_msg:readonly", "im:message.pins:read", "im:message.pins:write_only", "im:message.reactions:read", "im:message.reactions:write_only", "im:message.urgent", "im:message.urgent.status:write", "im:message.urgent:phone", "im:message.urgent:sms", "im:message:readonly", "im:message:recall", "im:message:send_as_bot", "im:message:send_multi_depts", "im:message:send_multi_users", "im:message:send_sys_msg", "im:message:update" ], "user": [] } }
import lark_oapi as lark ## P2ImMessageReceiveV1 为接收消息 v2.0;CustomizedEvent 内的 message 为接收消息 v1.0。 def do_p2_im_message_receive_v1(data: lark.im.v1.P2ImMessageReceiveV1) -> None: print(f'[ do_p2_im_message_receive_v1 access ], data: {lark.JSON.marshal(data, indent=4)}') def do_message_event(data: lark.CustomizedEvent) -> None: print(f'[ do_customized_event access ], type: message, data: {lark.JSON.marshal(data, indent=4)}') event_handler = lark.EventDispatcherHandler.builder("", "") \ .register_p2_im_message_receive_v1(do_p2_im_message_receive_v1) \ .register_p1_customized_event("这里填入你要自定义订阅的 event 的 key,例如 out_approval", do_message_event) \ .build() def main(): cli = lark.ws.Client("YOUR_APP_ID", "YOUR_APP_SECRET", event_handler=event_handler, log_level=lark.LogLevel.DEBUG) cli.start() if __name__ == "__main__": main()
3.2将上面代码的appid appkey改为自己的应用id,订阅的事件key改为“do_p2_im_message_receive_v1”
下面是我的参考代码记得改下appid和key:
# 使用提醒:
# 1. xbot包提供软件自动化、数据表格、Excel、日志、AI等功能
# 2. package包提供访问当前应用数据的功能,如获取元素、访问全局变量、获取资源文件等功能
# 3. 当此模块作为流程独立运行时执行main函数
# 4. 可视化流程中可以通过"调用模块"的指令使用此模块
import xbot
import re
import json
from xbot import print, sleep
from .import package
from .package import variables as glv
import lark_oapi as lark
## P2ImMessageReceiveV1 为接收消息 v2.0;CustomizedEvent 内的 message 为接收消息 v1.0。
from .import process1
from .import process2
def do_p2_im_message_receive_v1(data: lark.im.v1.P2ImMessageReceiveV1) -> None:
content_json = data.event.message.content
print(f"原始消息内容: {content_json}")
try:
# 1. 解析JSON字符串
content_dict = json.loads(content_json)
# 2. 获取消息文本
text_content = content_dict.get('text', '')
print(f"消息文本: {text_content}")
if not text_content:
print("消息文本为空,跳过处理")
return
# 3. 检查是否是告警消息
if "查看告警聊天详情:" in text_content:
print("检测到告警消息,开始处理...")
# 提取URL
url = text_content.split("查看告警聊天详情:")[-1].strip()
print(f"告警详情URL: {url}")
# 提取飞鸽会话ID
ID = extract_id_regex(text_content)
print(f"飞鸽会话ID: {ID}")
if url and ID:
# 打开网页
print("正在打开告警详情页面...")
web_page = xbot.web.create(url, 'edge')
sleep(2) # 等待页面加载
# 调用process1进行截图操作
print(f"调用process1,参数ID={ID}")
process1.main({"ID": ID})
# # 调用process2
# print("调用process2...")
# process2.main({})
else:
print("URL或ID提取失败")
else:
print("非告警消息,跳过处理")
except json.JSONDecodeError as e:
print(f"JSON解析错误: {e}, 原始内容: {content_json}")
except Exception as e:
print(f"处理消息时发生错误: {e}")
import traceback
traceback.print_exc()
# 方法1:正则表达式提取
def extract_id_regex(text):
# 匹配"飞鸽会话ID:"后面的数字
pattern = r'飞鸽会话ID[::]\s*(\d+)'
match = re.search(pattern, text)
if match:
return match.group(1)
return None
def do_message_event(data: lark.CustomizedEvent) -> None:
print(f'[ do_customized_event access ], type: message, data: {lark.JSON.marshal(data, indent=4)}')
# 通过 event_handler 的 register_{事件版本}_{事件类型 或 customized_event} 方法处理不同的事件
event_handler = lark.EventDispatcherHandler.builder("", "") \
.register_p2_im_message_receive_v1(do_p2_im_message_receive_v1) \
.build()
# .register_p1_customized_event("这里填入你要自定义订阅的 event 的 key,例如 out_approval", do_message_event) \
def main(args):
cli = lark.ws.Client("cli_XXXXXXXXc4", "XXXXXXXXXn",
event_handler=event_handler,
log_level=lark.LogLevel.DEBUG)
cli.start()
if __name__ == "__main__":
main()
3.3写完后运行上述代码,然后再到飞书开放平台点击配置长连接,成功之后再添加事件【im.message.receive_v1】即可
后续代码调用可视化流程啥的可以参考下这篇文章