【影刀RPA POST交互 。 DEMO静默登录并获取缺货订单号 --聚水潭ERP】
评论
收藏

【影刀RPA POST交互 。 DEMO静默登录并获取缺货订单号 --聚水潭ERP】

经验分享
没有逻辑的张先生
2026-03-24 12:12·浏览量:181
没有逻辑的张先生
影刀专家
影刀认证工程师
发布于 2026-03-24 12:12 · 181浏览
# 使用提醒:
# 1. xbot包提供软件自动化、数据表格、Excel、日志、AI等功能
# 2. package包提供访问当前应用数据的功能,如获取元素、访问全局变量、获取资源文件等功能
# 3. 当此模块作为流程独立运行时执行main函数
# 4. 可视化流程中可以通过"调用模块"的指令使用此模块

import xbot
from xbot import print, sleep
from .import package
from .package import variables as glv
import requests
import json
from urllib.parse import urlencode

import requests
import json
import time
import os
import re

class ERPAPI:
    """Session-based client for the erp321.com ERP web endpoints.

    The client logs in through the passport API, stores the session cookies
    in ``erp_cookies.json``, and queries the order-list and stock pages by
    posting the hidden ``__VIEWSTATE`` tokens together with a
    ``__CALLBACKID`` / ``__CALLBACKPARAM`` form.

    Expected failures (network errors, malformed responses) are reported
    through return values (``None`` / ``False`` / ``[]``) rather than raised.
    """

    # Patterns for the hidden form tokens embedded in the page HTML.
    _VIEWSTATE_RE = re.compile(r'id="__VIEWSTATE" value="([^"]+)"')
    _VIEWSTATEGENERATOR_RE = re.compile(r'id="__VIEWSTATEGENERATOR" value="([^"]+)"')

    # Per-request headers for the form-encoded callback POSTs; they override
    # the session-wide JSON content type set in __init__.
    _AJAX_FORM_HEADERS = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "X-Requested-With": "XMLHttpRequest",
    }

    def __init__(self, account=None, password=None):
        """Create the HTTP session and remember optional credentials.

        Args:
            account: Login account. Used by ``login`` when no explicit
                account is passed, and for the automatic re-login on retry.
            password: Password paired with ``account``.
        """
        self.session = requests.Session()
        # Relative path: the cookie cache lives in the current working directory.
        self.cookie_file = "erp_cookies.json"
        self.login_url = "https://api.erp321.com/erp/webapi/UserApi/WebLogin/Passport"
        self.order_url = "https://www.erp321.com/app/order/order/list.aspx"
        self.stock_url = "https://www.erp321.com/app/item/SkuStock/WmsSkuStock.aspx"
        self.account = account
        self.password = password
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
            "Accept": "application/json, text/plain, */*",
            "Content-Type": "application/json",
        }
        # Per-URL cache of the hidden form tokens scraped by get_page_vie.
        self.page_vie = {self.login_url: None, self.order_url: None, self.stock_url: None}
        self.session.headers.update(self.headers)

    def get_page_vie(self, url):
        """Fetch ``url`` and scrape its hidden form tokens.

        Args:
            url: Page to GET with the current session.

        Returns:
            A dict with keys ``"__VIEWSTATE"`` and ``"__VIEWSTATEGENERATOR"``
            (each value is ``None`` when the field is not found in the HTML),
            or ``None`` when the HTTP request itself fails.
        """
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
        except requests.RequestException:
            return None

        html = response.text
        viewstate = self._VIEWSTATE_RE.search(html)
        generator = self._VIEWSTATEGENERATOR_RE.search(html)
        return {
            "__VIEWSTATE": viewstate.group(1) if viewstate else None,
            "__VIEWSTATEGENERATOR": generator.group(1) if generator else None,
        }

    def load_cookies(self):
        """Load cookies saved by ``save_cookies`` into the session.

        Returns:
            ``True`` when a non-empty name-to-value mapping was loaded,
            ``False`` when the file is missing, unreadable, not valid JSON,
            or does not contain usable cookies.
        """
        if not os.path.exists(self.cookie_file):
            return False
        try:
            with open(self.cookie_file, "r", encoding="utf-8") as f:
                cookies = json.load(f)
        except (OSError, ValueError):
            return False

        # An empty or malformed cache must not count as a restored session,
        # otherwise the caller skips the login it actually needs.
        if not isinstance(cookies, dict) or not cookies:
            return False
        if not all(isinstance(value, str) for value in cookies.values()):
            return False

        self.session.cookies.update(cookies)
        return True

    def save_cookies(self):
        """Write the session cookies to ``self.cookie_file`` as JSON.

        Only cookie names and values are stored. Write failures are ignored
        because the cache is an optimisation, not a requirement.
        """
        cookies = {cookie.name: cookie.value for cookie in self.session.cookies}
        try:
            with open(self.cookie_file, "w", encoding="utf-8") as f:
                json.dump(cookies, f, ensure_ascii=False, indent=2)
        except (OSError, TypeError, ValueError):
            pass

    def login(self, account=None, password=None):
        """Log in through the passport endpoint and cache the cookies.

        Args:
            account: Account to use; falls back to ``self.account``.
            password: Password to use; falls back to ``self.password``.

        Returns:
            ``True`` when the response JSON has ``code == 0``. ``False`` when
            credentials are missing, the request fails, or the server
            reports any other code.
        """
        account = self.account if account is None else account
        password = self.password if password is None else password
        if not account or not password:
            return False

        payload = {"data": {"account": account, "password": password, "isApp": False}}
        try:
            response = self.session.post(self.login_url, json=payload, timeout=30)
            response.raise_for_status()
            result = response.json()
        except (requests.RequestException, ValueError):
            return False

        if isinstance(result, dict) and result.get("code") == 0:
            self.save_cookies()
            return True
        return False

    def _make_request_with_retry(self, request_func, max_retries=2):
        """Call ``request_func`` until it returns a non-``None`` result.

        Between attempts the cached form tokens are dropped and, when
        credentials are configured, a fresh login is attempted.

        Args:
            request_func: Zero-argument callable; ``None`` or an exception
                counts as a failed attempt.
            max_retries: Total number of attempts.

        Returns:
            The first non-``None`` result, or ``None`` when every attempt fails.
        """
        for attempt in range(max_retries):
            try:
                result = request_func()
                if result is not None:
                    return result
            except Exception:
                # Retry boundary: request_func is arbitrary, so any failure
                # is treated as a miss rather than propagated.
                pass

            if attempt < max_retries - 1:
                # Tokens scraped under the failed session may be stale or may
                # come from a login redirect; force the next attempt to
                # re-fetch them.
                self.page_vie = dict.fromkeys(self.page_vie)
                if self.account and self.password:
                    # The retry proceeds whether or not the re-login succeeds.
                    self.login()
        return None

    def _get_viewstate(self, url):
        """Return the form tokens for ``url``, fetching and caching on a miss.

        A scrape that yields no ``__VIEWSTATE`` is treated as a failure and
        is not cached, so a later attempt can fetch the page again.

        Returns:
            The token dict from ``get_page_vie``, or ``None`` on failure.
        """
        cached = self.page_vie.get(url)
        if cached is not None:
            return cached

        tokens = self.get_page_vie(url)
        if not tokens or not tokens.get("__VIEWSTATE"):
            return None
        self.page_vie[url] = tokens
        return tokens

    def _parse_response(self, response):
        """Decode the JSON that follows the first ``"0|"`` in the response text.

        Returns:
            The decoded JSON value, or ``None`` when the marker is absent or
            the remainder is not valid JSON.
        """
        try:
            content = response.text
            marker = content.find("0|")
            if marker == -1:
                return None
            return json.loads(content[marker + 2:])
        except (AttributeError, TypeError, ValueError):
            return None

    def _post_callback(self, url, params, form_fields):
        """POST a callback form to ``url`` and parse the reply.

        Args:
            url: Page URL; also the key for the cached form tokens.
            params: Query-string parameters.
            form_fields: Form fields sent after the two hidden tokens.

        Returns:
            The parsed payload, or ``None`` when tokens are unavailable or
            the reply cannot be parsed.

        Raises:
            requests.RequestException: on transport or HTTP-status errors;
                ``_make_request_with_retry`` treats this as a failed attempt.
        """
        viewstate = self._get_viewstate(url)
        if viewstate is None:
            return None

        data = {
            "__VIEWSTATE": viewstate["__VIEWSTATE"],
            "__VIEWSTATEGENERATOR": viewstate["__VIEWSTATEGENERATOR"],
        }
        data.update(form_fields)

        response = self.session.post(
            url,
            params=params,
            headers=self._AJAX_FORM_HEADERS,
            data=data,
            timeout=30,
        )
        response.raise_for_status()
        return self._parse_response(response)

    def get_orders(self):
        """Query the order list for orders with status ``question`` and
        question type ``缺货``.

        A single request with ``_jt_page_size`` 500 is made; further pages
        are not fetched.

        Returns:
            The parsed callback payload, or ``None`` when all attempts fail.
        """
        def _get_orders_request():
            params = {
                "_c": "jst-epaas",
                # Millisecond timestamp, regenerated for every attempt.
                "ts___": str(int(time.time() * 1000)),
                "am___": "LoadDataToJSON",
            }
            form_fields = {
                "_jt_page_increament_enabled": "true",
                "_jt_page_increament_key_name": "o_id",
                "_jt_page_size": "500",
                "_jt_page_action": "1",
                "fe_remark_type": "single",
                "__CALLBACKID": "JTable1",
                "__CALLBACKPARAM": '{"Method":"LoadDataToJSON","Args":["1","[{\\"k\\":\\"status\\",\\"v\\":\\"question\\",\\"c\\":\\"=\\"},{\\"k\\":\\"question_type\\",\\"v\\":\\"缺货\\",\\"c\\":\\"=\\"}]","{}"]}',
            }
            return self._post_callback(self.order_url, params, form_fields)

        return self._make_request_with_retry(_get_orders_request)

    def get_stock(self, owner_co_id="13700425", authorize_co_id="15473350"):
        """Query the stock page for SKUs whose
        ``qty-order_lock+ isnull(virtual_qty,0)`` is ``<= -1``.

        A single request with ``_jt_page_size`` 500 is made; further pages
        are not fetched.

        Args:
            owner_co_id: Value sent as ``owner_co_id`` in both the query
                string and the form. Defaults to the previously hard-coded id.
            authorize_co_id: Value sent as ``authorize_co_id`` in both
                places. Defaults to the previously hard-coded id.

        Returns:
            The parsed callback payload, or ``None`` when all attempts fail.
        """
        def _get_stock_request():
            params = {
                "owner_co_id": owner_co_id,
                "authorize_co_id": authorize_co_id,
                # Millisecond timestamp, regenerated for every attempt.
                "ts___": str(int(time.time() * 1000)),
                "am___": "LoadDataToJSON",
            }
            form_fields = {
                "owner_co_id": owner_co_id,
                "authorize_co_id": authorize_co_id,
                # requests percent-encodes form keys itself, so the key is
                # given decoded here; a pre-encoded key ("%2B%20...%2C") would
                # be encoded a second time on the wire. The list value sends
                # the field twice.
                "qty-order_lock+ isnull(virtual_qty,0)": ["", "-1"],
                "_jt_page_size": "500",
                "__CALLBACKID": "JTable1",
                "__CALLBACKPARAM": '{"Method":"LoadDataToJSON","Args":["1","[{\\"k\\":\\"qty-order_lock+ isnull(virtual_qty,0)\\",\\"v\\":\\"-1\\",\\"c\\":\\"<=\\"}]","{}"]}',
            }
            return self._post_callback(self.stock_url, params, form_fields)

        return self._make_request_with_retry(_get_stock_request)

    @staticmethod
    def _extract_rows(payload):
        """Return the ``datas`` list from a callback payload.

        ``payload["ReturnValue"]`` is itself a JSON string; its ``datas``
        entry holds the table rows. Any structural problem yields ``[]``.
        """
        if not isinstance(payload, dict):
            return []
        raw = payload.get("ReturnValue")
        if not raw:
            return []
        try:
            decoded = json.loads(raw)
        except (TypeError, ValueError):
            return []
        if not isinstance(decoded, dict):
            return []
        rows = decoded.get("datas")
        if not isinstance(rows, list):
            return []
        # Skip malformed rows instead of discarding the whole result.
        return [row for row in rows if isinstance(row, dict)]

    def parse_out_of_stock(self, order_data):
        """Extract the out-of-stock orders from a ``get_orders`` payload.

        Args:
            order_data: Payload returned by ``get_orders`` (may be ``None``).

        Returns:
            A list of dicts with keys ``outer_so_id``, ``so_id``,
            ``order_date`` and ``question_desc``, one per row whose
            ``question_type`` equals ``"缺货"``. Empty on malformed input.
        """
        return [
            {
                "outer_so_id": row.get("outer_so_id"),
                "so_id": row.get("so_id"),
                "order_date": row.get("order_date"),
                "question_desc": row.get("question_desc"),
            }
            for row in self._extract_rows(order_data)
            if row.get("question_type") == "缺货"
        ]

    def parse_stock_items(self, stock_data):
        """Extract SKUs whose locked quantity exceeds the stock quantity.

        Args:
            stock_data: Payload returned by ``get_stock`` (may be ``None``).

        Returns:
            A list of dicts with keys ``sku_id``, ``name``, ``shortage_qty``
            (``order_lock - qty``), ``qty`` and ``order_lock`` for every row
            where ``qty - order_lock`` is negative. Empty on malformed input.
        """
        # NOTE(review): the server-side filter in get_stock also adds
        # virtual_qty; this local check does not - confirm that is intended.
        stock_items = []
        for row in self._extract_rows(stock_data):
            # A missing key and an explicit JSON null both count as zero.
            qty = row.get("qty") or 0
            order_lock = row.get("order_lock") or 0
            try:
                available_qty = qty - order_lock
            except TypeError:
                # Non-numeric quantity: skip this row only.
                continue

            if available_qty < 0:
                stock_items.append(
                    {
                        "sku_id": row.get("sku_id"),
                        "name": row.get("name"),
                        "shortage_qty": abs(available_qty),
                        "qty": qty,
                        "order_lock": order_lock,
                    }
                )
        return stock_items

def main(args):
    """Log in (or reuse cached cookies) and collect out-of-stock order numbers.

    Runs when this module is executed as a standalone flow. The placeholder
    credentials below must be replaced with a real account and password.

    Args:
        args: Flow arguments supplied by the caller; not used here.

    Returns:
        The list of internal order numbers (``so_id``) flagged as out of
        stock; it is also printed. When no session can be established the
        original ``(None, None)`` tuple is returned unchanged for
        compatibility with existing callers.
    """
    erp = ERPAPI(account="你的账号", password="你的密码")

    # Prefer the cached session; fall back to a fresh login.
    if not erp.load_cookies() and not erp.login():
        return None, None

    order_data = erp.get_orders()
    # Each parsed order also carries outer_so_id, order_date and
    # question_desc; only the internal order number is collected here.
    shortage_order_ids = (
        [order["so_id"] for order in erp.parse_out_of_stock(order_data)]
        if order_data
        else []
    )
    print(shortage_order_ids)
    return shortage_order_ids

本 demo 演示后台静默登录并管理 cookie,以及查询缺货订单号。
其余操作皆可照搬。🤖

收藏1
全部评论1
最新
发布评论
评论