

WPS 多维表格 AirScript 作为国产轻量化高效自动化接口,完美适配影刀 RPA 自动化流程,全套覆盖数据查询、新增、批量新增、数据更新、批量删除、图片文件获取六大业务场景,对应影刀标准化流程:
AirScript 是 WPS 多维表格专属在线脚本引擎,兼容标准 JS 语法,配套影刀 Python 封装 SDK,一站式搞定多维表全流程自动化。
记录全量 CRUD:单条 / 批量新增、多条件筛选查询、更新、删除多维表数据
智能分页拉取:超大数据集自动分段循环查询,避免接口超时、页面卡顿
跨字段关联联动:多表数据自动匹配、汇总计算、实时同步
云文档互通:跨多维表格、跨文档数据调用、文件归档管理
HTTP 网络请求:对接第三方系统、Webhook 双向数据流转
自动化触发:定时任务、事件触发脚本,无人值守批量处理数据
附件图片管理:Base64 上传图片、获取文件永久下载链接
WPS 多维表格:AirScript 开放接口全免费使用,不限调用次数、不限数据量级,个人 / 企业均可零成本对接集成
飞书多维表格:高级 API、批量接口、Webhook 联动必须开通 VIP 付费,免费版严格限制接口频次与数据权限
钉钉多维表格:接口权限单独付费申请,免费版仅基础读写功能,大数据批量接口全面收费
WPS 自研分布式高性能引擎,日常接口响应极低,千级并发、多人协同依旧流畅不锁表。
钉钉、飞书同等数据规模下,接口响应延迟更高,频繁批量调用极易卡顿、请求超时。
同等百万行海量业务数据,WPS AirScript 获取百万数据仅耗时 10 分 35 秒
钉钉单表数据行数硬性上限极低,大数据处理严重受限;飞书百万数据处理耗时远高于 WPS,成本更高、稳定性更差。
零成本永久免费
WPS 全量 AirScript 接口无调用次数限制、无数据量级门槛、无需开通付费 VIP;钉钉 / 飞书批量大数据、高级筛选、Webhook 联动全部单独收费,免费版严重限流限数据。
接口响应极速低延迟
原生分布式引擎,日常接口毫秒级响应,高并发调用不卡顿、不超时,远优于钉钉飞书高延迟接口。
百万数据超强算力
同等百万行全量业务数据,WPS 分页遍历全流程仅10 分 35 秒;钉钉单表行数硬性上限极低,飞书大数据处理耗时翻倍、极易接口崩溃。
影刀深度适配
完美匹配影刀 RPA 全流程自动化,6 大标准化 flow 模块开箱即用,不用二次开发即可落地企业全场景数据自动化。

每个自定义指令都要两个参数






查询条件就是数据表筛选格式如下
[("字段1","条件","条件内容"),("字段2","条件2","条件2内容")]
条件包含一下
等于
不等于
大于
大于等于
小于
小于等于
介于取等
介于不等
开头是
结尾是
包含
不包含
指定值
为空
不为空
案例展示:

[("日期","等于","2026/04/23"),("平台","等于","淘宝"),("工厂名字","等于","领航")]
获取数据表名称 日期字段为2026/04/23和平台为淘宝,工厂名称为领行的数据
先要获取修改数据的id可以通过查询筛选拿到

数据字典:{"字段名称":"字段内容","字段名称1":"字段内容1"}
图文字典: {"文件字段名称":"文件路径地址","文件字段名称1":"文件路径地址1"}

uploaid 可从查询数据源获取图片获取

通过查询数据拿到要删除的id列表实现批量删除

数据列表 批量上传数据字典即可

数据字典和图文字典和更新一致,数据字典为必填项,图文字典需要上传图片填写不上传图片不填写
对比钉钉、飞书多维表格接口,WPS AirScript接口永久免费无额度限制、响应速度更快、百万级海量数据处理实测仅需 10 分 35 秒,零成本即可实现企业全流程数据自动化对接。
function main(Context) {
const {
operateType,
sheetName, // 用表名,不是 ID
sheetId = 0,
records = [],
recordIds = [],
imgbas64 = "",
recordId = "",
imgname = "",
pagefy = "",
filter = null,
upId = "",
maxRecords = undefined
} = Context.argv;
// 根据表名获取正确 sheetId
let realSheetId = sheetId;
if (sheetName) {
const allSheets = Application.Sheet.GetSheets();
for (let i = 0; i < allSheets.length; i++) {
const s = allSheets[i];
if (s.name.trim() === sheetName.trim()) {
realSheetId = s.id;
break;
}
}
}
// 只新增:根据表名获取索引
function getSheetIndexByName(name){
const sheets = Application.Sheet.GetSheets();
for(let i=0;i<sheets.length;i++){
if(sheets[i].name.trim() === name.trim()){
return i+1; // 索引从1开始
}
}
return 1;
}
if (operateType === "query") {
function fetchAllRecords() {
let allRecords = [];
let offset = pagefy ? pagefy : null;
let pageCount = 0; // 计数,控制每次只查3页
// 最多查2页就停止,防止超时
while ((allRecords.length === 0 || offset) && pageCount < 2) {
const queryParams = {
SheetId: realSheetId,
Offset: offset,
PageSize: 1000, // 严格遵守最大1000,不动!
MaxRecords: maxRecords
};
if (filter) queryParams.Filter = filter;
if (!maxRecords) delete queryParams.MaxRecords;
const recordsResult = Application.Record.GetRecords(queryParams);
offset = recordsResult.offset;
allRecords = allRecords.concat(recordsResult.records);
pageCount++; // 页数+1
if (!offset) break;
}
// 返回当前查到的所有数据 + 下一页offset,给Python继续拉
return [offset, allRecords];
}
return fetchAllRecords();
}
// 上传图片(只修改这里:使用索引)其他完全不动
if (operateType === "ym") {
return Application.Sheets(getSheetIndexByName(sheetName)).Views(1).RecordRange(recordId, `@${imgname}`).Value = Application.DBCellValue([{
fileData: imgbas64,
fileName: `${recordId}.png`
}]);
}
// 获取图片下载地址
if (operateType === "xz") {
return Application.Record.GetAttachmentURL({
UploadId: upId,
Source: "upload_ks3"
});
}
// 新建
if (operateType === "create") {
return Application.Record.CreateRecords({
SheetId: realSheetId,
Records: records
});
}
// 删除
if (operateType === "delete") {
return Application.Record.DeleteRecords({
SheetId: realSheetId,
RecordIds: recordIds
});
}
// 更新
if (operateType === "update") {
return Application.Record.UpdateRecords({
SheetId: realSheetId,
Records: [{
id: recordId,
fields: records[0].fields
}]
});
}
throw new Error("不支持的操作类型:" + operateType);
}
return main(Context);import xbot
from xbot import print, sleep
from . import package
from .package import variables as glv
import requests
import base64
import asyncio
import aiohttp
# ======================== 中文条件映射(官方标准) ========================
CONDITION_MAP = {
"等于": "Equals",
"不等于": "NotEqu",
"大于": "Greater",
"大于等于": "GreaterEqu",
"小于": "Less",
"小于等于": "LessEqu",
"介于取等": "GreaterEquAndLessEqu",
"介于不等": "LessOrGreater",
"开头是": "BeginWith",
"结尾是": "EndWith",
"包含": "Contains",
"不包含": "NotContains",
"指定值": "Intersected",
"为空": "Empty",
"不为空": "NotEmpty"
}
# ======================== 同步统一请求 ========================
def request_api(api_url, api_token, op_type, argv_data):
HEADERS = {
"Content-Type": "application/json",
"AirScript-Token": api_token
}
res = requests.post(
api_url,
json={"Context": {"argv": {"operateType": op_type, **argv_data}}},
headers=HEADERS
)
result = res.json()
data = result.get("data", {})
err = result.get("error", "").strip()
res_data = data.get("result")
if res_data is None or err:
if err != "运行超时":
if err:
raise Exception(err)
return res_data
# ======================== 异步高性能请求(大数据提速专用) ========================
async def _async_request_api(session, api_url, api_token, op_type, argv_data):
HEADERS = {
"Content-Type": "application/json",
"AirScript-Token": api_token
}
async with session.post(
api_url,
json={"Context": {"argv": {"operateType": op_type, **argv_data}}},
headers=HEADERS
) as resp:
result = await resp.json()
data = result.get("data", {})
err = result.get("error", "").strip()
res_data = data.get("result")
if res_data is None or err:
if err != "运行超时":
if err:
raise Exception(err)
return res_data
# ======================== 全表数据查询(自动分页循环) ========================
def query_all(api_url, api_token, sheet_name):
async def run():
all_data = []
page = ""
async with aiohttp.ClientSession() as session:
while True:
res = await _async_request_api(
session, api_url, api_token, "query",
{"sheetName": sheet_name, "pagefy": page}
)
offset, records = res
all_data += records
if not offset:
break
page = offset
return all_data
return asyncio.run(run())
# ======================== 单条件筛选查询 ========================
def query_filter(api_url, api_token, sheet_name, field, cn_op, value):
op_en = CONDITION_MAP[cn_op]
filters = {
"mode": "AND",
"criteria": [{"field": field, "op": op_en, "values": [value]}]
}
async def run():
all_data = []
page = ""
async with aiohttp.ClientSession() as session:
while True:
res = await _async_request_api(
session, api_url, api_token, "query", {
"sheetName": sheet_name,
"pagefy": page,
"filter": filters
}
)
offset, records = res
all_data += records
if not offset:
break
page = offset
return all_data
return asyncio.run(run())
# ======================== 多条件组合筛选查询 ========================
def query_filter_multi(api_url, api_token, sheet_name, conditions, mode="AND"):
criteria = []
filters = {}
if conditions:
if len(conditions) != 0:
for field, cn_op, value in conditions:
op_en = CONDITION_MAP[cn_op]
criteria.append({"field": field, "op": op_en, "values": [value]})
filters = {"mode": mode, "criteria": criteria}
async def run():
all_data = []
page = ""
async with aiohttp.ClientSession() as session:
while True:
res = await _async_request_api(
session, api_url, api_token, "query", {
"sheetName": sheet_name,
"pagefy": page,
"filter": filters
}
)
offset, records = res
all_data += records
if not offset:
break
page = offset
return all_data
return asyncio.run(run())
# 单条新增数据
def create_data(api_url, api_token, sheet_name, fields):
return request_api(api_url, api_token, "create", {
"sheetName": sheet_name,
"records": [{"fields": fields}]
})
# 批量新增海量数据
def batch_create(api_url, api_token, sheet_name, records_list):
formatted_records = [{"fields": item} for item in records_list]
return request_api(api_url, api_token, "create", {
"sheetName": sheet_name,
"records": formatted_records
})
# 更新单行数据
def update_data(api_url, api_token, sheet_name, record_id, fields):
return request_api(api_url, api_token, "update", {
"sheetName": sheet_name,
"recordId": record_id,
"records": [{"fields": fields}]
})
# 单条删除
def delete_data(api_url, api_token, sheet_name, record_id):
return request_api(api_url, api_token, "delete", {"sheetName": sheet_name, "recordIds": [record_id]})
# 批量删除多行数据
def batch_delete(api_url, api_token, sheet_name, record_ids):
return request_api(api_url, api_token, "delete", {
"sheetName": sheet_name,
"recordIds": record_ids
})
# 上传本地图片至多维表格
def upload_img(api_url, api_token, sheet_name, record_id, field_name, img_path):
with open(img_path, "rb") as f:
b64 = base64.b64encode(f.read()).decode()
return request_api(api_url, api_token, "ym", {
"sheetName": sheet_name,
"recordId": record_id,
"imgname": field_name,
"imgbas64": f"data:image/png;base64,{b64}"
})
# 获取图片文件下载地址
def get_image_url(api_url, api_token, sheet_name, upload_id, source="upload_ks3"):
return request_api(api_url, api_token, "xz", {
"sheetName": sheet_name,
"upId": upload_id,
"source": source
})
# 新增数据同时附带多张图片
def create_with_imgs(api_url, api_token, sheet_name, fields, img_dict):
rid = create_data(api_url, api_token, sheet_name, fields)[0]["id"]
for field, path in img_dict.items():
for i in range(3):
try:
upload_img(api_url, api_token, sheet_name, rid, field, path)
break
except Exception as e:
pass
# 更新数据同时替换多张附件图片
def update_with_imgs(api_url, api_token, sheet_name, rid, fields, img_dict):
update_data(api_url, api_token, sheet_name, rid, fields)
for field, path in img_dict.items():
for i in range(3):
try:
upload_img(api_url, api_token, sheet_name, rid, field, path)
break
except Exception as e:
pass
# ===================== 实战调用示例 =====================
def main(args):
# 读取影刀全局变量配置
API_URL = glv["hook"]
API_TOKEN = glv["密钥"]
# 多条件筛选:查询所有带有图片附件的数据
conditions = [
("图片和附件", "不等于", ""),
]
data = query_filter_multi(API_URL, API_TOKEN, "测试", conditions, mode="AND")
print([i["fields"]["图片和附件"][0]["uploadId"] for i in data])
print(get_image_url(API_URL, API_TOKEN, "测试", [i["fields"]["图片和附件"][0]["uploadId"] for i in data][1]))