钉钉和影刀交互。通过钉钉机器人操作影刀。让每个人都成为机器人的主人
评论
收藏

钉钉和影刀交互。通过钉钉机器人操作影刀。让每个人都成为机器人的主人

经验分享
总有人间一两风
2025-07-25 09:40·浏览量:1152
总有人间一两风
影刀专家
发布于 2025-07-25 09:37更新于 2025-07-25 09:401152浏览

业务场景。客户艾特机器人告诉机器人产品型号,机器人接收到之后去操作网页修改产品。然后将修改完后的产品截图返回到钉钉。

实现人机交互,让客服小伙伴们也可以成为机器人的主人


效果如图,修改完之后。发送图片回到钉钉


实现方式如下:在钉钉后台配置一个stream模式的机器人。


接下里给你们上代码了:



import xbot
from xbot import print, sleep
from . import package
from .package import variables as glv
import asyncio
from pyppeteer import launch
import traceback
import argparse
import requests
import logging
import time
from datetime import datetime, timedelta
from dingtalk_stream import AckMessage
import dingtalk_stream
import xbot_visual
from . import process1
from . import process2
from . import aliyun
import os
import queue
import threading
import re

all_values = [item for sublist in glv['good_dict'].values() for item in sublist]
unique_colors = list(set(all_values))


class SingletonQueue:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(SingletonQueue, cls).__new__(cls)
                cls._instance.queue = queue.Queue()
        return cls._instance

    def put(self, item):
        self.queue.put(item)

    def get(self, timeout=None):
        return self.queue.get(timeout=timeout)

    def join(self):
        self.queue.join()

    def task_done(self):
        self.queue.task_done()

    def empty(self):
        return self.queue.empty()

    def size(self):
        return self.queue.qsize()

oss = aliyun.AliyunOss()


def setup_logger():
    logger = logging.getLogger()
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(name)-8s %(levelname)-8s %(message)s [%(filename)s:%(lineno)d]'))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


def define_options():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--client_id', dest='client_id', required=False,
    )
    parser.add_argument(
        '--client_secret', dest='client_secret', required=False,
    )
    options = parser.parse_args()
    options.client_id = '***************************'   #这里是你钉钉后台的Client ID
    options.client_secret = '***************************'#这里是你钉钉后台的Client Secret
    return options


class EchoMarkdownHandler(dingtalk_stream.ChatbotHandler):
    def __init__(self, logger: logging.Logger = None):
        super(dingtalk_stream.ChatbotHandler, self).__init__()
        if logger:
            self.logger = logger

    async def process(self, callback: dingtalk_stream.CallbackMessage):
        queue = SingletonQueue()
        incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
        queue.put(incoming_message)
        print('!!!!!!!!!!!!!!!!!!!!!!!!!')
        return AckMessage.STATUS_OK, "OK"


class message_hanlder(dingtalk_stream.ChatbotHandler):
    def __init__(self, logger: logging.Logger = None):
        super(dingtalk_stream.ChatbotHandler, self).__init__()
        if logger:
            self.logger = logger

    async def get_message(self):
        shared_queue = SingletonQueue()
        print("消费进入")
        while True:
            try:
                # 获取队列中的任务,设置timeout以便能够在循环中检查停止条件
                incoming_message = shared_queue.get(timeout=1)
                print(incoming_message.text.content.strip())
                self.test(incoming_message)
                shared_queue.task_done()
                print(f"队列剩余数量:{shared_queue.size()}")
            except queue.Empty:
                continue
            except Exception as e:
                print(f"出现其他错误{e}")
                continue

    def test(self, incoming_message):
        try:
            quotation_str = incoming_message.text.content.strip()       #这是你艾特机器人的文字
            if quotation_str is None or quotation_str == "":
                raise ValueError(f"请填写数据后在艾特机器人")
            # 输入内容做分割
            quotation_list = [loop_item.upper() for loop_item in quotation_str.split(' ') if
                              loop_item is not '' and loop_item is not None]
            # 意向单简易判断
            if quotation_list[0].strip()[:2].upper() != "BJ":
                raise ValueError(f"当前意向单号没有以【BJ】开头,请规范后重新@机器人")
            elif len(quotation_list) > 3 or len(quotation_list) < 2:
                raise ValueError(
                    f"当前信息超过或未到达预定长度,最多输入三个参数,最少输入两个参数")

           #然后你可以根据你实际的业务情况对文本进行校验了




            # 这一段是代码调用你的可视化流程。就是我去操作网页的,模拟人工的
            p_arg = {
                "意向单号": quotation_list[0].strip().upper(),
                "系列": series,
                "颜色": color
            }
            process1_result = process1.main(p_arg)

            # 我始有个图片回传。这里是将图片上传到图床了
            url = oss.putFile(glv['screenshot_save_file_name'])
            self.reply_markdown(quotation_list[0].strip().upper(), f"![意向单]({url})", incoming_message)
        except Exception as e:
            self.reply_text(str(e), incoming_message)


def main(args):
    logger = setup_logger()
    options = define_options()
    print(f'!!!!!!!!!!!!!!!!{options.client_id, options.client_secret}')
    credential = dingtalk_stream.Credential(options.client_id, options.client_secret)
    client = dingtalk_stream.DingTalkStreamClient(credential)

    # 创建消费者线程
    consumer = message_hanlder(dingtalk_stream.chatbot.ChatbotMessage.TOPIC)
    consumer_thread = threading.Thread(target=lambda: asyncio.run(consumer.get_message()))
    consumer_thread.daemon = True
    consumer_thread.start()

    # 注册生产者处理器并启动客户端
    client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, EchoMarkdownHandler(logger))
    client.start_forever()

上面调用可视化流程的参数如图


然后丢在一台电脑上一直运行,只要有人找机器人。他随时在线回应你


这样大家就可以获得一个24小时的员工。并且还不是普通的机器人。是一个听你指令的机器人



收藏9
全部评论1
最新
发布评论
评论