影刀RPA分享194期|飞书群消息监听+批量转发完整实现思路(基于消息v2接口)
评论
收藏

影刀RPA分享194期|飞书群消息监听+批量转发完整实现思路(基于消息v2接口)

经验分享
小可耐教你学影刀RPA【哔哩哔哩同名】
2026-01-05 17:53·浏览量:1660
小可耐教你学影刀RPA【哔哩哔哩同名】
影刀高级开发者
发布于 2026-01-05 17:531660浏览


一、核心技术逻辑

整体方案基于飞书开放平台的事件订阅(WebHook)+ 消息v2接口 实现,核心流程是:飞书群消息触发WebHook回调 → 服务端接收并解析消息 → 按规则批量转发(支持飞书内/外转发),以下是分步拆解的可落地思路。


本文是通过我们影刀rpa高级任务【飞书调度】引发的衍生思考


二、前置准备(必做)

1. 飞书开发者平台配置

  • 步骤1:创建应用
    登录飞书开放平台 → 新建企业自建应用 → 记录App ID/App Secret(后续接口调用必备)。
  • 步骤2:配置事件订阅
    • 进入应用「事件订阅」模块 → 开启「启用事件订阅」 → 填写「请求网址」(公网可访问的WebHook服务地址,用于接收飞书推送的消息事件)。
    • 验证请求网址:飞书会发送GET请求带challenge参数,服务端需返回该参数完成验证(示例代码见下文)。
    • 添加事件:订阅「接收消息v2(message.receive_v2)」事件(核心事件,触发条件:群内有新消息)。
  • 步骤3:申请权限
    进入「权限管理」→ 申请以下核心权限(需企业管理员审批):
权限名称权限标识用途
获取群组中所有消息im:message.group:readonly读取群消息内容
发送消息im:message:send批量转发消息(飞书内)
获取用户信息contact:user:readonly解析消息发送人信息
获取群组信息im:chat:readonly解析群信息
  • 步骤4:发布应用
    应用创建后需「发布」→ 选择要监听的飞书群所在的企业/组织 → 管理员审批后生效。

2. 环境准备

  • 公网可访问的服务端(如阿里云ECS、腾讯云服务器,或用内网穿透工具如ngrok临时测试)。
  • 开发语言:Python/Java/Node.js(以下以Python为例)。
  • 依赖:飞书开放平台SDK(Python推荐larksuite-sdk)、Web框架(Flask/FastAPI)。

三、核心功能实现

1. WebHook服务(接收飞书消息事件)

作用:接收飞书推送的message.receive_v2事件,解析消息内容、群ID、发送人等信息。

from flask import Flask, request, jsonify
from larksuiteoapi import Config, Context
from larksuiteoapi.event import handle_event
from larksuiteoapi.model import OapiHeader, OapiRequest

# 初始化飞书配置
app_config = Config.new_internal_app_config(
    app_id="你的App ID",
    app_secret="你的App Secret"
)

app = Flask(__name__)

# 验证WebHook地址(飞书首次配置时触发)
@app.route('/feishu/webhook', methods=['GET'])
def verify_webhook():
    challenge = request.args.get('challenge')
    return jsonify({"challenge": challenge})

# 接收飞书消息事件
@app.route('/feishu/webhook', methods=['POST'])
def receive_message():
    # 解析飞书推送的事件
    header = OapiHeader(request.headers)
    req = OapiRequest(request.data, request.args)
    ctx = Context()
    
    # 处理事件
    def event_handler(ctx, event):
        # 仅处理群消息事件
        if event.header.event_type == "message.receive_v2":
            message = event.body
            # 解析核心字段
            chat_id = message.get("chat_id")  # 群ID
            msg_id = message.get("message_id")  # 消息ID
            msg_type = message.get("msg_type")  # 消息类型(text/image/file等)
            content = message.get("content")  # 消息内容(JSON字符串)
            sender = message.get("sender")  # 发送人信息
            
            # 1. 存储消息(可选:存入数据库,用于批量转发)
            save_message(chat_id, msg_id, msg_type, content, sender)
            
            # 2. 实时转发(或批量转发:定时读取数据库后转发)
            batch_forward_message(chat_id, msg_id, msg_type, content)
        
        return {"code": 0, "msg": "success"}
    
    # 调用飞书SDK处理事件
    handle_event(app_config, ctx, header, req, event_handler)
    return jsonify({"code": 0})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

2. 消息存储(批量转发前提)

批量转发需先缓存消息,推荐用轻量数据库(如SQLite/MySQL)存储核心字段:

字段名类型说明
idint自增主键
chat_idvarchar群ID
msg_idvarchar消息ID(唯一)
msg_typevarchar消息类型(text/image)
contenttext消息内容
sender_idvarchar发送人ID
create_timedatetime消息发送时间
is_forwardedtinyint是否已转发(0/1)

3. 批量转发实现

支持两种转发场景:飞书内转发(转发到其他群/私聊)飞书外转发(如公众号/企业微信/钉钉)

场景1:飞书内批量转发

调用飞书「发送消息v2接口」,批量将消息转发到目标群/用户:

import requests
import json
from larksuiteoapi import Client

# 初始化飞书客户端
client = Client.new_internal_app_client(app_config)

def batch_forward_message(chat_id, msg_id, msg_type, content):
    # 目标转发群/用户ID列表
    target_chat_ids = ["oc_xxxxxx", "ou_xxxxxx"]  # 群ID/用户ID
    
    # 构建转发消息体(适配不同消息类型)
    content_dict = json.loads(content)
    if msg_type == "text":
        msg_content = {
            "text": f"【转发自{chat_id}】{content_dict.get('text')}"
        }
    elif msg_type == "image":
        msg_content = {
            "image_key": content_dict.get("image_key")
        }
    # 其他类型(file/video等)同理适配
    
    # 批量发送
    for target_chat_id in target_chat_ids:
        # 调用飞书发送消息接口
        resp = client.im.message.create(
            params={"receive_id_type": "chat_id"},
            body={
                "receive_id": target_chat_id,
                "msg_type": msg_type,
                "content": json.dumps(msg_content)
            }
        )
        if resp.code == 0:
            # 更新数据库:标记为已转发
            update_message_status(msg_id, is_forwarded=1)
        else:
            print(f"转发失败:{resp.msg}")
场景2:转发到微信公众号

需对接微信公众号API(前提:公众号已认证,获取AppID/AppSecret):

def forward_to_wechat(content, sender):
    # 1. 获取微信access_token
    wechat_appid = "你的公众号AppID"
    wechat_secret = "你的公众号AppSecret"
    token_url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={wechat_appid}&secret={wechat_secret}"
    token_resp = requests.get(token_url).json()
    access_token = token_resp.get("access_token")
    
    # 2. 发送模板消息/客服消息到公众号
    send_url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={access_token}"
    wechat_data = {
        "touser": "公众号用户OpenID",
        "msgtype": "text",
        "text": {
            "content": f"飞书群消息转发:{content}"
        }
    }
    resp = requests.post(send_url, json=wechat_data)
    return resp.json()

四、批量转发策略设计

  1. 触发方式
    • 定时批量:用定时任务(如APScheduler)每5分钟读取未转发的消息,批量转发。
    • 手动触发:提供接口/按钮,手动选择时间范围/群聊,触发批量转发。
  2. 去重&过滤
    • msg_id去重,避免重复转发。
    • 过滤敏感消息:可配置关键词过滤(如“测试”“无关”),跳过无需转发的消息。
  3. 失败重试
    转发失败的消息标记为“重试中”,设置最大重试次数(如3次),避免因网络问题导致转发丢失。

五、公众号文章生成(扩展功能)

将批量转发的消息整理成公众号文章,核心步骤:

  1. 从数据库读取指定时间范围的群消息,按发送时间排序。
  2. 拼接HTML格式的文章内容(标题+消息列表+发送人+时间)。
  3. 对接微信公众号「永久素材上传接口」,生成图文素材,或直接生成markdown格式导出。
def generate_wechat_article(chat_id, start_time, end_time):
    # 1. 查询指定时间段的群消息
    messages = query_messages(chat_id, start_time, end_time)
    
    # 2. 拼接文章内容
    article_content = f"""
    <h1>飞书群{chat_id}消息汇总({start_time} - {end_time})</h1>
    <ul>
    """
    for msg in messages:
        article_content += f"""
        <li>
            <p>发送人:{msg['sender_id']} | 时间:{msg['create_time']}</p>
            <p>内容:{msg['content']}</p>
        </li>
        """
    article_content += "</ul>"
    
    # 3. 导出为HTML(可直接复制到公众号编辑器)
    with open("feishu_message_article.html", "w", encoding="utf-8") as f:
        f.write(article_content)
    
    return "文章生成完成:feishu_message_article.html"

六、注意事项

  1. 权限合规
    • 「获取群组中所有消息」属于敏感权限,需企业管理员审批,且需告知群成员消息监听/转发规则(合规要求)。
    • 避免转发隐私信息(如手机号、身份证),可添加敏感信息脱敏逻辑。
  2. 接口限流
    飞书开放平台接口有调用频率限制(如发送消息接口600次/分钟),批量转发时需控制速率(如添加延时time.sleep(0.1))。
  3. 服务稳定性
    生产环境建议用Gunicorn+Nginx部署WebHook服务,添加日志记录(如ELK),便于排查问题。
  4. 内网穿透(测试用)
    本地开发时,可用ngrok生成公网地址:ngrok http 8080,将生成的https://xxx.ngrok.io/feishu/webhook填入飞书事件订阅的请求网址。

七、整体流程总结

  1. 飞书开放平台配置应用→申请权限→订阅消息事件。
  2. 部署WebHook服务,接收并存储群消息。
  3. 设计批量转发策略(定时/手动),调用飞书/公众号API转发。
  4. 扩展:将消息汇总生成公众号文章。

该方案可适配影刀RPA等自动化工具(影刀可调用上述接口,实现可视化配置监听/转发规则),核心是依托飞书消息v2接口的事件推送能力,实现消息的监听-存储-转发闭环。



其他扩展学习 影刀高级任务触发

https://www.yingdao.com/yddoc/iPaaS/zh-CN/748145303894437888




收藏7
全部评论1
最新
发布评论
评论