开发者通过集成飞书 SDK 与开放平台建立一条 WebSocket 全双工通道,当有事件回调发生时,开放平台会通过该通道向开发者发送消息。
与传统的 Webhook 模式相比,长连接模式大大降低了接入成本,将原先 1 周左右的开发周期降低到 5 分钟。具体优势如下:
接入限制:
如果你已经使用飞书 SDK 通过 Webhook 模式订阅事件,那几乎是零成本切换,因为所有的业务逻辑都可以复用,只需修改启动方式即可。
pip install lark-oapi==1.1.0b1import requests
import json,time
from lark_oapi import EventDispatcherHandler, ws, JSON, im, LogLevel
class FeishuConfig:
'''飞书API的配置信息'''
APP_ID = 'cli_xxxxxxxxxxxxxxx'
APP_SECRET = '7Tjxxxxxxxxxxxxxxxxxxxxxxxxx'
class FeishuApi:
'''FeishuApi类用于处理与飞书API的交互'''
TOKEN_URL = 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal'
REPLY_MESSAGE_URL_TEMPLATE = 'https://open.feishu.cn/open-apis/im/v1/messages/{message_id}/reply'
HEADERS = {'Content-Type': 'application/json; charset=utf-8'}
def __init__(self):
self.session = requests.Session()
self.token = self.get_token()
def get_token(self):
'''获取飞书API的访问令牌'''
data = {'app_id': FeishuConfig.APP_ID, 'app_secret': FeishuConfig.APP_SECRET}
response = self.session.post(self.TOKEN_URL, headers=self.HEADERS, json=data)
response.raise_for_status()
return response.json().get('tenant_access_token')
def reply_message(self, message_id, user_id, message):
'''回复飞书群聊消息'''
url = self.REPLY_MESSAGE_URL_TEMPLATE.format(message_id=message_id)
content = f'{{"text":"<at user_id=\\"{user_id}\\"></at> {message}"}}'
data = {"content": content, "msg_type": "text"}
headers = {'Authorization': 'Bearer ' + self.token, **self.HEADERS}
response = self.session.post(url, headers=headers, json=data)
response.raise_for_status()
return response.json()
def handle_p2_im_message(data: im.v1.P2ImMessageReceiveV1):
'''处理接收到的个人或群聊消息'''
data_dict = json.loads(JSON.marshal(data))
message_id = data_dict["event"]["message"]["message_id"]
content = eval(data.event.message.content).get("text")
content = content.split(" ")[-1].replace('"}', '').strip()
user_id = data_dict["event"]["sender"]["sender_id"]["user_id"]
# time.sleep(5)
feishu = FeishuApi()
feishu.reply_message(message_id=message_id, user_id=user_id, message=f'已经收到您的消息啦:{content}')
def main():
'''启动飞书长连接 WebSocket客户端'''
event_handler = EventDispatcherHandler.builder("", "") \
.register_p2_im_message_receive_v1(handle_p2_im_message) \
.build()
cli = ws.Client(FeishuConfig.APP_ID, FeishuConfig.APP_SECRET, event_handler=event_handler, log_level=LogLevel.DEBUG)
cli.start()
if __name__ == "__main__":
main()
步骤说明:



5.页面下方按需可添加事件即可

开始发挥你的想象定制脚本了,常见的有以下:
1.聊天触发器:接收消息后修改本地txt或其他文件,然后触发影刀运行
2.聊天调度:接受消息后调用影刀api,这样你就可以调度多台电脑运行任务(企业版功能)
3.远程关机:接收消息后执行一条cmd命令,让电脑关机
4.发送电脑文件:接收消息后,让机器人发送指定电脑文件给你
5.........发挥自己的想象哈

