乐于分享
好东西不私藏

开源免费用!WPS 多维表格 AirScript 影刀指令合集,百万数据查询仅 10 分 35 秒

开源免费用!WPS 多维表格 AirScript 影刀指令合集,百万数据查询仅 10 分 35 秒

一、前言

WPS 多维表格 AirScript 作为国产轻量化高效自动化接口,完美适配影刀 RPA 自动化流程,全套覆盖数据查询、新增、批量新增、数据更新、批量删除、图片文件获取六大业务场景,对应影刀标准化流程:

二、WPS AirScript 多维表格核心指令

AirScript 是 WPS 多维表格专属在线脚本引擎,兼容标准 JS 语法,配套影刀 Python 封装 SDK,一站式搞定多维表全流程自动化。

1. 多维表格核心业务指令

  • 记录全量 CRUD:单条 / 批量新增、多条件筛选查询、更新、删除多维表数据

  • 智能分页拉取:超大数据集自动分段循环查询,避免接口超时、页面卡顿

  • 跨字段关联联动:多表数据自动匹配、汇总计算、实时同步

2. 高级拓展指令

  • 云文档互通:跨多维表格、跨文档数据调用、文件归档管理

  • HTTP 网络请求:对接第三方系统、Webhook 双向数据流转

  • 自动化触发:定时任务、事件触发脚本,无人值守批量处理数据

  • 附件图片管理:Base64 上传图片、获取文件永久下载链接

三、WPS VS 钉钉 & 飞书:三大碾压级核心优势

1. 接口永久免费,无额度、无会员门槛

  • WPS 多维表格:AirScript 开放接口全免费使用,不限调用次数、不限数据量级,个人 / 企业均可零成本对接集成

  • 飞书多维表格:高级 API、批量接口、Webhook 联动必须开通 VIP 付费,免费版严格限制接口频次与数据权限

  • 钉钉多维表格:接口权限单独付费申请,免费版仅基础读写功能,大数据批量接口全面收费

2. 毫秒级极速响应,低延迟高并发不卡顿

WPS 自研分布式高性能引擎,日常接口响应极低,千级并发、多人协同依旧流畅不锁表。

钉钉、飞书同等数据规模下,接口响应延迟更高,频繁批量调用极易卡顿、请求超时。

3. 百万级大数据超强算力,实测仅 10 分 35 秒

同等百万行海量业务数据,WPS AirScript 获取百万数据仅耗时 10 分 35

已关注

关注

重播 分享

 秒钉钉单表数据行数硬性上限极低,大数据处理严重受限;飞书百万数据处理耗时远高于 WPS,成本更高、稳定性更差。

WPS AirScript VS 钉钉 / 飞书多维表格核心优势

  1. 零成本永久免费

    WPS 全量 AirScript 接口无调用次数限制、无数据量级门槛、无需开通付费 VIP;钉钉 / 飞书批量大数据、高级筛选、Webhook 联动全部单独收费,免费版严重限流限数据。

  2. 接口响应极速低延迟

    原生分布式引擎,日常接口毫秒级响应,高并发调用不卡顿、不超时,远优于钉钉飞书高延迟接口。

  3. 百万数据超强算力

    同等百万行全量业务数据,WPS 分页遍历全流程仅10 分 35 秒;钉钉单表行数硬性上限极低,飞书大数据处理耗时翻倍、极易接口崩溃。

  4. 影刀深度适配

    完美匹配影刀 RPA 全流程自动化,6 大标准化 flow 模块开箱即用,不用二次开发即可落地企业全场景数据自动化。

WPS 多维表格 AirScript × 影刀 RPA 全套 Python 调用使用说明

每个自定义指令都要两个参数

webhook地址

key密钥地址

WPS 多维表格查询数据.flow

查询条件就是数据表筛选格式如下

[(“字段1″,”条件”,”条件内容”),(“字段2″,”条件2″,”条件2内容”)]

条件包含一下

等于

不等于

大于

大于等于

小于

小于等于

介于取等

介于不等

开头是

结尾是

包含

不包含

指定值

为空

不为空

案例展示:

[(“日期”,”等于”,”2026/04/23″),(“平台”,”等于”,”淘宝”),(“工厂名字”,”等于”,”领航”)]

获取数据表名称   日期字段为2026/04/23和平台为淘宝,工厂名称为领行的数据

WPS 多维表格更新数据.flow

先要获取修改数据的id可以通过查询筛选拿到

数据字典:{“字段名称”:”字段内容”,”字段名称1″:”字段内容1″}

图文字典: {“文件字段名称”:”文件路径地址”,”文件字段名称1″:”文件路径地址1″}

WPS 多维表格获取图片或文件.flow

uploaid 可从查询数据源获取图片获取

WPS 多维表格批量删除.flow

通过查询数据拿到要删除的id列表实现批量删除

WPS 多维表格批量新增.flow

数据列表 批量上传数据字典即可

WPS 多维表格新增数据.flow

数据字典和图文字典和更新一致,数据字典为必填项,图文字典需要上传图片填写不上传图片不填写

对比钉钉、飞书多维表格接口,WPS AirScript接口永久免费无额度限制、响应速度更快、百万级海量数据处理实测仅需 10 分 35 秒,零成本即可实现企业全流程数据自动化对接。

js代码

function main(Context) {    const {         operateType,        sheetName,    // 用表名,不是 ID        sheetId = 0,        records = [],        recordIds = [],        imgbas64 = "",        recordId = "",        imgname = "",        pagefy = "",        filter = null,        upId = "",        maxRecords = undefined    } = Context.argv;    // 根据表名获取正确 sheetId    let realSheetId = sheetId;    if (sheetName) {        const allSheets = Application.Sheet.GetSheets();        for (let i = 0; i < allSheets.length; i++) {            const s = allSheets[i];            if (s.name.trim() === sheetName.trim()) {                realSheetId = s.id;                break;            }        }    }    // 只新增:根据表名获取索引    function getSheetIndexByName(name){        const sheets = Application.Sheet.GetSheets();        for(let i=0;i<sheets.length;i++){            if(sheets[i].name.trim() === name.trim()){                return i+1// 索引从1开始            }        }        return 1;    }    if (operateType === "query") {    function fetchAllRecords() {        let allRecords = [];        let offset = pagefy ? pagefy : null;        let pageCount = 0// 计数,控制每次只查3页        // 最多查2页就停止,防止超时        while ((allRecords.length === 0 || offset) && pageCount < 2) {            const queryParams = {                SheetId: realSheetId,                Offset: offset,                PageSize1000,    // 严格遵守最大1000,不动!                MaxRecords: maxRecords            };            if (filter) queryParams.Filter = filter;            if (!maxRecords) delete queryParams.MaxRecords;            const recordsResult = Application.Record.GetRecords(queryParams);            offset = recordsResult.offset;            allRecords = allRecords.concat(recordsResult.records);            pageCount++; // 页数+1            if (!offset) break;        }        // 返回当前查到的所有数据 + 下一页offset,给Python继续拉        return [offset, allRecords];    }    return fetchAllRecords();}    // 上传图片(只修改这里:使用索引)其他完全不动    if (operateType === "ym") {        return Application.Sheets(getSheetIndexByName(sheetName)).Views(1).RecordRange(recordId, `@${imgname}`).Value = Application.DBCellValue([{            fileData: imgbas64,            fileName`${recordId}.png`        }]);    }    // 获取图片下载地址    if (operateType === "xz") {        return Application.Record.GetAttachmentURL({            UploadId: upId,            Source"upload_ks3"        });    }    // 新建    if (operateType === "create") {        return Application.Record.CreateRecords({            SheetId: realSheetId,            Records: records        });    }    // 删除    if (operateType === "delete") {        return Application.Record.DeleteRecords({            SheetId: realSheetId,            RecordIds: recordIds        });    }    // 更新    if (operateType === "update") {        return Application.Record.UpdateRecords({            SheetId: realSheetId,            Records: [{                id: recordId,                fields: records[0].fields            }]        });    }    throw new Error("不支持的操作类型:" + operateType);}return main(Context);
python
import xbotfrom xbot import print, sleepfrom . import packagefrom .package import variables as glvimport requestsimport base64import asyncioimport aiohttp# ======================== 中文条件映射(官方标准) ========================CONDITION_MAP = {    "等于""Equals",    "不等于""NotEqu",    "大于""Greater",    "大于等于""GreaterEqu",    "小于""Less",    "小于等于""LessEqu",    "介于取等""GreaterEquAndLessEqu",    "介于不等""LessOrGreater",    "开头是""BeginWith",    "结尾是""EndWith",    "包含""Contains",    "不包含""NotContains",    "指定值""Intersected",    "为空""Empty",    "不为空""NotEmpty"}# ======================== 同步统一请求 ========================def request_api(api_url, api_token, op_type, argv_data):    HEADERS = {        "Content-Type""application/json",        "AirScript-Token": api_token    }    res = requests.post(        api_url,        json={"Context": {"argv": {"operateType": op_type, **argv_data}}},        headers=HEADERS    )    result = res.json()    data = result.get("data", {})    err = result.get("error""").strip()    res_data = data.get("result")    if res_data is None or err:        if err != "运行超时":            if err:                raise Exception(err)    return res_data# ======================== 异步高性能请求(大数据提速专用) ========================async def _async_request_api(session, api_url, api_token, op_type, argv_data):    HEADERS = {        "Content-Type""application/json",        "AirScript-Token": api_token    }    async with session.post(        api_url,        json={"Context": {"argv": {"operateType": op_type, **argv_data}}},        headers=HEADERS    ) as resp:        result = await resp.json()        data = result.get("data", {})        err = result.get("error""").strip()        res_data = data.get("result")        if res_data is None or err:            if err != "运行超时":                if err:                    raise Exception(err)        return res_data# ======================== 全表数据查询(自动分页循环) ========================def query_all(api_url, api_token, sheet_name):    async def run():        all_data = []        page = ""        async with aiohttp.ClientSession() as session:            while True:                res = await _async_request_api(                    session, api_url, api_token, "query"                    {"sheetName": sheet_name, "pagefy": page}                )                offset, records = res                all_data += records                if not offset:                    break                page = offset        return all_data    return asyncio.run(run())# ======================== 单条件筛选查询 ========================def query_filter(api_url, api_token, sheet_name, field, cn_op, value):    op_en = CONDITION_MAP[cn_op]    filters = {        "mode""AND",        "criteria": [{"field": field, "op": op_en, "values": [value]}]    }    async def run():        all_data = []        page = ""        async with aiohttp.ClientSession() as session:            while True:                res = await _async_request_api(                    session, api_url, api_token, "query", {                        "sheetName": sheet_name,                        "pagefy": page,                        "filter": filters                    }                )                offset, records = res                all_data += records                if not offset:                    break                page = offset        return all_data    return asyncio.run(run())# ======================== 多条件组合筛选查询 ========================def query_filter_multi(api_url, api_token, sheet_name, conditions, mode="AND"):    criteria = []    filters = {}    if conditions:        if len(conditions) != 0:            for field, cn_op, value in conditions:                op_en = CONDITION_MAP[cn_op]                criteria.append({"field": field, "op": op_en, "values": [value]})            filters = {"mode": mode, "criteria": criteria}    async def run():        all_data = []        page = ""        async with aiohttp.ClientSession() as session:            while True:                res = await _async_request_api(                    session, api_url, api_token, "query", {                        "sheetName": sheet_name,                        "pagefy": page,                        "filter": filters                    }                )                offset, records = res                all_data += records                if not offset:                    break                page = offset        return all_data    return asyncio.run(run())# 单条新增数据def create_data(api_url, api_token, sheet_name, fields):    return request_api(api_url, api_token, "create", {        "sheetName": sheet_name,        "records": [{"fields": fields}]    })# 批量新增海量数据def batch_create(api_url, api_token, sheet_name, records_list):    formatted_records = [{"fields": item} for item in records_list]    return request_api(api_url, api_token, "create", {        "sheetName": sheet_name,        "records": formatted_records    })# 更新单行数据def update_data(api_url, api_token, sheet_name, record_id, fields):    return request_api(api_url, api_token, "update", {        "sheetName": sheet_name,        "recordId": record_id,        "records": [{"fields": fields}]    })# 单条删除def delete_data(api_url, api_token, sheet_name, record_id):    return request_api(api_url, api_token, "delete", {"sheetName": sheet_name, "recordIds": [record_id]})# 批量删除多行数据def batch_delete(api_url, api_token, sheet_name, record_ids):    return request_api(api_url, api_token, "delete", {        "sheetName": sheet_name,        "recordIds": record_ids    })# 上传本地图片至多维表格def upload_img(api_url, api_token, sheet_name, record_id, field_name, img_path):    with open(img_path, "rb"as f:        b64 = base64.b64encode(f.read()).decode()    return request_api(api_url, api_token, "ym", {        "sheetName": sheet_name,        "recordId": record_id,        "imgname": field_name,        "imgbas64"f"data:image/png;base64,{b64}"    })# 获取图片文件下载地址def get_image_url(api_url, api_token, sheet_name, upload_id, source="upload_ks3"):    return request_api(api_url, api_token, "xz", {        "sheetName": sheet_name,        "upId": upload_id,        "source": source    })# 新增数据同时附带多张图片def create_with_imgs(api_url, api_token, sheet_name, fields, img_dict):    rid = create_data(api_url, api_token, sheet_name, fields)[0]["id"]    for field, path in img_dict.items():        for i in range(3):            try:                upload_img(api_url, api_token, sheet_name, rid, field, path)                break            except Exception as e:                pass# 更新数据同时替换多张附件图片def update_with_imgs(api_url, api_token, sheet_name, rid, fields, img_dict):    update_data(api_url, api_token, sheet_name, rid, fields)    for field, path in img_dict.items():        for i in range(3):            try:                upload_img(api_url, api_token, sheet_name, rid, field, path)                break            except Exception as e:                pass# ===================== 实战调用示例 =====================def main(args):    # 读取影刀全局变量配置    API_URL = glv["hook"]    API_TOKEN = glv["密钥"]    # 多条件筛选:查询所有带有图片附件的数据    conditions = [        ("图片和附件""不等于"""),    ]    data = query_filter_multi(API_URL, API_TOKEN, "测试", conditions, mode="AND")    print([i["fields"]["图片和附件"][0]["uploadId"for i in data])    print(get_image_url(API_URL, API_TOKEN, "测试", [i["fields"]["图片和附件"][0]["uploadId"for i in data][1]))

开发者可自行封装免费开源,自定义指令已手搓完成一键三连找DC哥获取!

猜你喜欢

DCpage:基于 DrissionPage 打造的影刀专属自定义指令库