

发布于 2026-01-05 17:531660浏览整体方案基于飞书开放平台的事件订阅(WebHook)+ 消息v2接口 实现,核心流程是:飞书群消息触发WebHook回调 → 服务端接收并解析消息 → 按规则批量转发(支持飞书内/外转发),以下是分步拆解的可落地思路。
本文是通过我们影刀rpa高级任务【飞书调度】引发的衍生思考
App ID/App Secret(后续接口调用必备)。GET请求带challenge参数,服务端需返回该参数完成验证(示例代码见下文)。| 权限名称 | 权限标识 | 用途 |
| 获取群组中所有消息 | im:message.group:readonly | 读取群消息内容 |
| 发送消息 | im:message:send | 批量转发消息(飞书内) |
| 获取用户信息 | contact:user:readonly | 解析消息发送人信息 |
| 获取群组信息 | im:chat:readonly | 解析群信息 |

larksuite-sdk)、Web框架(Flask/FastAPI)。作用:接收飞书推送的message.receive_v2事件,解析消息内容、群ID、发送人等信息。
from flask import Flask, request, jsonify
from larksuiteoapi import Config, Context
from larksuiteoapi.event import handle_event
from larksuiteoapi.model import OapiHeader, OapiRequest
# 初始化飞书配置
app_config = Config.new_internal_app_config(
app_id="你的App ID",
app_secret="你的App Secret"
)
app = Flask(__name__)
# 验证WebHook地址(飞书首次配置时触发)
@app.route('/feishu/webhook', methods=['GET'])
def verify_webhook():
challenge = request.args.get('challenge')
return jsonify({"challenge": challenge})
# 接收飞书消息事件
@app.route('/feishu/webhook', methods=['POST'])
def receive_message():
# 解析飞书推送的事件
header = OapiHeader(request.headers)
req = OapiRequest(request.data, request.args)
ctx = Context()
# 处理事件
def event_handler(ctx, event):
# 仅处理群消息事件
if event.header.event_type == "message.receive_v2":
message = event.body
# 解析核心字段
chat_id = message.get("chat_id") # 群ID
msg_id = message.get("message_id") # 消息ID
msg_type = message.get("msg_type") # 消息类型(text/image/file等)
content = message.get("content") # 消息内容(JSON字符串)
sender = message.get("sender") # 发送人信息
# 1. 存储消息(可选:存入数据库,用于批量转发)
save_message(chat_id, msg_id, msg_type, content, sender)
# 2. 实时转发(或批量转发:定时读取数据库后转发)
batch_forward_message(chat_id, msg_id, msg_type, content)
return {"code": 0, "msg": "success"}
# 调用飞书SDK处理事件
handle_event(app_config, ctx, header, req, event_handler)
return jsonify({"code": 0})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)批量转发需先缓存消息,推荐用轻量数据库(如SQLite/MySQL)存储核心字段:
| 字段名 | 类型 | 说明 |
| id | int | 自增主键 |
| chat_id | varchar | 群ID |
| msg_id | varchar | 消息ID(唯一) |
| msg_type | varchar | 消息类型(text/image) |
| content | text | 消息内容 |
| sender_id | varchar | 发送人ID |
| create_time | datetime | 消息发送时间 |
| is_forwarded | tinyint | 是否已转发(0/1) |
支持两种转发场景:飞书内转发(转发到其他群/私聊)、飞书外转发(如公众号/企业微信/钉钉)。
调用飞书「发送消息v2接口」,批量将消息转发到目标群/用户:
import requests
import json
from larksuiteoapi import Client
# 初始化飞书客户端
client = Client.new_internal_app_client(app_config)
def batch_forward_message(chat_id, msg_id, msg_type, content):
# 目标转发群/用户ID列表
target_chat_ids = ["oc_xxxxxx", "ou_xxxxxx"] # 群ID/用户ID
# 构建转发消息体(适配不同消息类型)
content_dict = json.loads(content)
if msg_type == "text":
msg_content = {
"text": f"【转发自{chat_id}】{content_dict.get('text')}"
}
elif msg_type == "image":
msg_content = {
"image_key": content_dict.get("image_key")
}
# 其他类型(file/video等)同理适配
# 批量发送
for target_chat_id in target_chat_ids:
# 调用飞书发送消息接口
resp = client.im.message.create(
params={"receive_id_type": "chat_id"},
body={
"receive_id": target_chat_id,
"msg_type": msg_type,
"content": json.dumps(msg_content)
}
)
if resp.code == 0:
# 更新数据库:标记为已转发
update_message_status(msg_id, is_forwarded=1)
else:
print(f"转发失败:{resp.msg}")需对接微信公众号API(前提:公众号已认证,获取AppID/AppSecret):
def forward_to_wechat(content, sender):
# 1. 获取微信access_token
wechat_appid = "你的公众号AppID"
wechat_secret = "你的公众号AppSecret"
token_url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={wechat_appid}&secret={wechat_secret}"
token_resp = requests.get(token_url).json()
access_token = token_resp.get("access_token")
# 2. 发送模板消息/客服消息到公众号
send_url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={access_token}"
wechat_data = {
"touser": "公众号用户OpenID",
"msgtype": "text",
"text": {
"content": f"飞书群消息转发:{content}"
}
}
resp = requests.post(send_url, json=wechat_data)
return resp.json()msg_id去重,避免重复转发。将批量转发的消息整理成公众号文章,核心步骤:
def generate_wechat_article(chat_id, start_time, end_time):
# 1. 查询指定时间段的群消息
messages = query_messages(chat_id, start_time, end_time)
# 2. 拼接文章内容
article_content = f"""
<h1>飞书群{chat_id}消息汇总({start_time} - {end_time})</h1>
<ul>
"""
for msg in messages:
article_content += f"""
<li>
<p>发送人:{msg['sender_id']} | 时间:{msg['create_time']}</p>
<p>内容:{msg['content']}</p>
</li>
"""
article_content += "</ul>"
# 3. 导出为HTML(可直接复制到公众号编辑器)
with open("feishu_message_article.html", "w", encoding="utf-8") as f:
f.write(article_content)
return "文章生成完成:feishu_message_article.html"time.sleep(0.1))。ngrok http 8080,将生成的https://xxx.ngrok.io/feishu/webhook填入飞书事件订阅的请求网址。该方案可适配影刀RPA等自动化工具(影刀可调用上述接口,实现可视化配置监听/转发规则),核心是依托飞书消息v2接口的事件推送能力,实现消息的监听-存储-转发闭环。
其他扩展学习 影刀高级任务触发
https://www.yingdao.com/yddoc/iPaaS/zh-CN/748145303894437888