开源免费用!WPS 多维表格 AirScript 影刀指令合集,百万数据查询仅 10 分 35 秒
评论
收藏

开源免费用!WPS 多维表格 AirScript 影刀指令合集,百万数据查询仅 10 分 35 秒

经验分享
【耐家军】DC
2026-04-24 11:55·浏览量:208
【耐家军】DC
影刀专家
影刀认证工程师
发布于 2026-04-24 11:55208浏览

一、前言

WPS 多维表格 AirScript 作为国产轻量化高效自动化接口,完美适配影刀 RPA 自动化流程,全套覆盖数据查询、新增、批量新增、数据更新、批量删除、图片文件获取六大业务场景,对应影刀标准化流程:

二、WPS AirScript 多维表格核心指令

AirScript 是 WPS 多维表格专属在线脚本引擎,兼容标准 JS 语法,配套影刀 Python 封装 SDK,一站式搞定多维表全流程自动化。

1. 多维表格核心业务指令

  • 记录全量 CRUD:单条 / 批量新增、多条件筛选查询、更新、删除多维表数据

  • 智能分页拉取:超大数据集自动分段循环查询,避免接口超时、页面卡顿

  • 跨字段关联联动:多表数据自动匹配、汇总计算、实时同步

2. 高级拓展指令

  • 云文档互通:跨多维表格、跨文档数据调用、文件归档管理

  • HTTP 网络请求:对接第三方系统、Webhook 双向数据流转

  • 自动化触发:定时任务、事件触发脚本,无人值守批量处理数据

  • 附件图片管理:Base64 上传图片、获取文件永久下载链接

三、WPS VS 钉钉 & 飞书:三大碾压级核心优势

1. 接口永久免费,无额度、无会员门槛

  • WPS 多维表格:AirScript 开放接口全免费使用,不限调用次数、不限数据量级,个人 / 企业均可零成本对接集成

  • 飞书多维表格:高级 API、批量接口、Webhook 联动必须开通 VIP 付费,免费版严格限制接口频次与数据权限

  • 钉钉多维表格:接口权限单独付费申请,免费版仅基础读写功能,大数据批量接口全面收费

2. 毫秒级极速响应,低延迟高并发不卡顿

WPS 自研分布式高性能引擎,日常接口响应极低,千级并发、多人协同依旧流畅不锁表。

钉钉、飞书同等数据规模下,接口响应延迟更高,频繁批量调用极易卡顿、请求超时。

3. 百万级大数据超强算力,实测仅 10 分 35 秒

同等百万行海量业务数据,WPS AirScript 获取百万数据仅耗时 10 分 35 秒

钉钉单表数据行数硬性上限极低,大数据处理严重受限;飞书百万数据处理耗时远高于 WPS,成本更高、稳定性更差。

WPS AirScript VS 钉钉 / 飞书多维表格核心优势

  1. 零成本永久免费

    WPS 全量 AirScript 接口无调用次数限制、无数据量级门槛、无需开通付费 VIP;钉钉 / 飞书批量大数据、高级筛选、Webhook 联动全部单独收费,免费版严重限流限数据。

  2. 接口响应极速低延迟

    原生分布式引擎,日常接口毫秒级响应,高并发调用不卡顿、不超时,远优于钉钉飞书高延迟接口。

  3. 百万数据超强算力

    同等百万行全量业务数据,WPS 分页遍历全流程仅10 分 35 秒;钉钉单表行数硬性上限极低,飞书大数据处理耗时翻倍、极易接口崩溃。

  4. 影刀深度适配

    完美匹配影刀 RPA 全流程自动化,6 大标准化 flow 模块开箱即用,不用二次开发即可落地企业全场景数据自动化。

WPS 多维表格 AirScript × 影刀 RPA 全套 Python 调用使用说明

每个自定义指令都要两个参数

webhook地址

key密钥地址

WPS 多维表格查询数据.flow

查询条件就是数据表筛选格式如下

[("字段1","条件","条件内容"),("字段2","条件2","条件2内容")]

条件包含一下

等于

不等于

大于

大于等于

小于

小于等于

介于取等

介于不等

开头是

结尾是

包含

不包含

指定值

为空

不为空

案例展示:

[("日期","等于","2026/04/23"),("平台","等于","淘宝"),("工厂名字","等于","领航")]

获取数据表名称 日期字段为2026/04/23和平台为淘宝,工厂名称为领行的数据

WPS 多维表格更新数据.flow

先要获取修改数据的id可以通过查询筛选拿到

数据字典:{"字段名称":"字段内容","字段名称1":"字段内容1"}

图文字典: {"文件字段名称":"文件路径地址","文件字段名称1":"文件路径地址1"}

WPS 多维表格获取图片或文件.flow

uploaid 可从查询数据源获取图片获取

WPS 多维表格批量删除.flow

通过查询数据拿到要删除的id列表实现批量删除

WPS 多维表格批量新增.flow

数据列表 批量上传数据字典即可

WPS 多维表格新增数据.flow

数据字典和图文字典和更新一致,数据字典为必填项,图文字典需要上传图片填写不上传图片不填写

对比钉钉、飞书多维表格接口,WPS AirScript接口永久免费无额度限制、响应速度更快、百万级海量数据处理实测仅需 10 分 35 秒,零成本即可实现企业全流程数据自动化对接。

js代码

function main(Context) {
    const { 
        operateType,
        sheetName,    // 用表名,不是 ID
        sheetId = 0,
        records = [],
        recordIds = [],
        imgbas64 = "",
        recordId = "",
        imgname = "",
        pagefy = "",
        filter = null,
        upId = "",
        maxRecords = undefined
    } = Context.argv;

    // 根据表名获取正确 sheetId
    let realSheetId = sheetId;
    if (sheetName) {
        const allSheets = Application.Sheet.GetSheets();
        for (let i = 0; i < allSheets.length; i++) {
            const s = allSheets[i];
            if (s.name.trim() === sheetName.trim()) {
                realSheetId = s.id;
                break;
            }
        }
    }

    // 只新增:根据表名获取索引
    function getSheetIndexByName(name){
        const sheets = Application.Sheet.GetSheets();
        for(let i=0;i<sheets.length;i++){
            if(sheets[i].name.trim() === name.trim()){
                return i+1; // 索引从1开始
            }
        }
        return 1;
    }

    if (operateType === "query") {
    function fetchAllRecords() {
        let allRecords = [];
        let offset = pagefy ? pagefy : null;
        let pageCount = 0; // 计数,控制每次只查3页
        
        // 最多查2页就停止,防止超时
        while ((allRecords.length === 0 || offset) && pageCount < 2) {
            const queryParams = {
                SheetId: realSheetId,
                Offset: offset,
                PageSize: 1000,    // 严格遵守最大1000,不动!
                MaxRecords: maxRecords
            };
            if (filter) queryParams.Filter = filter;
            if (!maxRecords) delete queryParams.MaxRecords;

            const recordsResult = Application.Record.GetRecords(queryParams);
            offset = recordsResult.offset;
            allRecords = allRecords.concat(recordsResult.records);
            
            pageCount++; // 页数+1
            if (!offset) break;
        }
        
        // 返回当前查到的所有数据 + 下一页offset,给Python继续拉
        return [offset, allRecords];
    }
    return fetchAllRecords();
}

    // 上传图片(只修改这里:使用索引)其他完全不动
    if (operateType === "ym") {
        return Application.Sheets(getSheetIndexByName(sheetName)).Views(1).RecordRange(recordId, `@${imgname}`).Value = Application.DBCellValue([{
            fileData: imgbas64,
            fileName: `${recordId}.png`
        }]);
    }

    // 获取图片下载地址
    if (operateType === "xz") {
        return Application.Record.GetAttachmentURL({
            UploadId: upId,
            Source: "upload_ks3"
        });
    }

    // 新建
    if (operateType === "create") {
        return Application.Record.CreateRecords({
            SheetId: realSheetId,
            Records: records
        });
    }

    // 删除
    if (operateType === "delete") {
        return Application.Record.DeleteRecords({
            SheetId: realSheetId,
            RecordIds: recordIds
        });
    }

    // 更新
    if (operateType === "update") {
        return Application.Record.UpdateRecords({
            SheetId: realSheetId,
            Records: [{
                id: recordId,
                fields: records[0].fields
            }]
        });
    }

    throw new Error("不支持的操作类型:" + operateType);
}

return main(Context);

python代码

import xbot
from xbot import print, sleep
from . import package
from .package import variables as glv
import requests
import base64
import asyncio
import aiohttp

# ======================== 中文条件映射(官方标准) ========================
CONDITION_MAP = {
    "等于": "Equals",
    "不等于": "NotEqu",
    "大于": "Greater",
    "大于等于": "GreaterEqu",
    "小于": "Less",
    "小于等于": "LessEqu",
    "介于取等": "GreaterEquAndLessEqu",
    "介于不等": "LessOrGreater",
    "开头是": "BeginWith",
    "结尾是": "EndWith",
    "包含": "Contains",
    "不包含": "NotContains",
    "指定值": "Intersected",
    "为空": "Empty",
    "不为空": "NotEmpty"
}

# ======================== 同步统一请求 ========================
def request_api(api_url, api_token, op_type, argv_data):
    HEADERS = {
        "Content-Type": "application/json",
        "AirScript-Token": api_token
    }
    res = requests.post(
        api_url,
        json={"Context": {"argv": {"operateType": op_type, **argv_data}}},
        headers=HEADERS
    )
    result = res.json()
    data = result.get("data", {})
    err = result.get("error", "").strip()
    res_data = data.get("result")
    if res_data is None or err:
        if err != "运行超时":
            if err:
                raise Exception(err)
    return res_data

# ======================== 异步高性能请求(大数据提速专用) ========================
async def _async_request_api(session, api_url, api_token, op_type, argv_data):
    HEADERS = {
        "Content-Type": "application/json",
        "AirScript-Token": api_token
    }
    async with session.post(
        api_url,
        json={"Context": {"argv": {"operateType": op_type, **argv_data}}},
        headers=HEADERS
    ) as resp:
        result = await resp.json()
        data = result.get("data", {})
        err = result.get("error", "").strip()
        res_data = data.get("result")
        if res_data is None or err:
            if err != "运行超时":
                if err:
                    raise Exception(err)
        return res_data

# ======================== 全表数据查询(自动分页循环) ========================
def query_all(api_url, api_token, sheet_name):
    async def run():
        all_data = []
        page = ""
        async with aiohttp.ClientSession() as session:
            while True:
                res = await _async_request_api(
                    session, api_url, api_token, "query", 
                    {"sheetName": sheet_name, "pagefy": page}
                )
                offset, records = res
                all_data += records
                if not offset:
                    break
                page = offset
        return all_data
    return asyncio.run(run())

# ======================== 单条件筛选查询 ========================
def query_filter(api_url, api_token, sheet_name, field, cn_op, value):
    op_en = CONDITION_MAP[cn_op]
    filters = {
        "mode": "AND",
        "criteria": [{"field": field, "op": op_en, "values": [value]}]
    }

    async def run():
        all_data = []
        page = ""
        async with aiohttp.ClientSession() as session:
            while True:
                res = await _async_request_api(
                    session, api_url, api_token, "query", {
                        "sheetName": sheet_name,
                        "pagefy": page,
                        "filter": filters
                    }
                )
                offset, records = res
                all_data += records
                if not offset:
                    break
                page = offset
        return all_data
    return asyncio.run(run())

# ======================== 多条件组合筛选查询 ========================
def query_filter_multi(api_url, api_token, sheet_name, conditions, mode="AND"):
    criteria = []
    filters = {}
    if conditions:
        if len(conditions) != 0:
            for field, cn_op, value in conditions:
                op_en = CONDITION_MAP[cn_op]
                criteria.append({"field": field, "op": op_en, "values": [value]})

            filters = {"mode": mode, "criteria": criteria}

    async def run():
        all_data = []
        page = ""
        async with aiohttp.ClientSession() as session:
            while True:
                res = await _async_request_api(
                    session, api_url, api_token, "query", {
                        "sheetName": sheet_name,
                        "pagefy": page,
                        "filter": filters
                    }
                )
                offset, records = res
                all_data += records
                if not offset:
                    break
                page = offset
        return all_data
    return asyncio.run(run())

# 单条新增数据
def create_data(api_url, api_token, sheet_name, fields):
    return request_api(api_url, api_token, "create", {
        "sheetName": sheet_name,
        "records": [{"fields": fields}]
    })

# 批量新增海量数据
def batch_create(api_url, api_token, sheet_name, records_list):
    formatted_records = [{"fields": item} for item in records_list]
    return request_api(api_url, api_token, "create", {
        "sheetName": sheet_name,
        "records": formatted_records
    })

# 更新单行数据
def update_data(api_url, api_token, sheet_name, record_id, fields):
    return request_api(api_url, api_token, "update", {
        "sheetName": sheet_name,
        "recordId": record_id,
        "records": [{"fields": fields}]
    })

# 单条删除
def delete_data(api_url, api_token, sheet_name, record_id):
    return request_api(api_url, api_token, "delete", {"sheetName": sheet_name, "recordIds": [record_id]})

# 批量删除多行数据
def batch_delete(api_url, api_token, sheet_name, record_ids):
    return request_api(api_url, api_token, "delete", {
        "sheetName": sheet_name,
        "recordIds": record_ids
    })

# 上传本地图片至多维表格
def upload_img(api_url, api_token, sheet_name, record_id, field_name, img_path):
    with open(img_path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode()
    return request_api(api_url, api_token, "ym", {
        "sheetName": sheet_name,
        "recordId": record_id,
        "imgname": field_name,
        "imgbas64": f"data:image/png;base64,{b64}"
    })

# 获取图片文件下载地址
def get_image_url(api_url, api_token, sheet_name, upload_id, source="upload_ks3"):
    return request_api(api_url, api_token, "xz", {
        "sheetName": sheet_name,
        "upId": upload_id,
        "source": source
    })

# 新增数据同时附带多张图片
def create_with_imgs(api_url, api_token, sheet_name, fields, img_dict):
    rid = create_data(api_url, api_token, sheet_name, fields)[0]["id"]
    for field, path in img_dict.items():
        for i in range(3):
            try:
                upload_img(api_url, api_token, sheet_name, rid, field, path)
                break
            except Exception as e:
                pass

# 更新数据同时替换多张附件图片
def update_with_imgs(api_url, api_token, sheet_name, rid, fields, img_dict):
    update_data(api_url, api_token, sheet_name, rid, fields)
    for field, path in img_dict.items():
        for i in range(3):
            try:
                upload_img(api_url, api_token, sheet_name, rid, field, path)
                break
            except Exception as e:
                pass

# ===================== 实战调用示例 =====================
def main(args):
    # 读取影刀全局变量配置
    API_URL = glv["hook"]
    API_TOKEN = glv["密钥"]

    # 多条件筛选:查询所有带有图片附件的数据
    conditions = [
        ("图片和附件", "不等于", ""),
    ]
    data = query_filter_multi(API_URL, API_TOKEN, "测试", conditions, mode="AND")
    print([i["fields"]["图片和附件"][0]["uploadId"] for i in data])
    print(get_image_url(API_URL, API_TOKEN, "测试", [i["fields"]["图片和附件"][0]["uploadId"] for i in data][1]))

开发者可自行封装免费开源,自定义指令已手搓完成需要找DC哥获取!

猜你喜欢

DCpage:基于 DrissionPage 打造的影刀专属自定义指令库

收藏3
全部评论1
最新
发布评论
评论