【案例分享】封装一个支持Iframe定位的Xpath查找方法(彻底解决跨域问题!!!)
回答
收藏

【案例分享】封装一个支持Iframe定位的Xpath查找方法(彻底解决跨域问题!!!)

欧拉
2024-02-22 14:13·浏览量:2298
欧拉
影刀见习开发者
发布于 2023-12-01 16:06更新于 2024-02-22 14:132298浏览

1. 实现原理:

  1. 对Xpath语句按照Iframe进行拆分,逐级查找直到完成;
  2. 跨域下查找,需要通过Iframe的父元素结合Selector拿到Iframe的子元素再进行查找才可以;
  3. 如果遇到Iframe元素,生成动态选择器,切换到Iframe下面的HTML标签;


2. 注意事项:

如需要定位的元素处于Iframe下面,在编写Xpath时需要指明Iframe节点的定位方式


3. 示例代码:

背景:以天猫超市登录为例,需要获取淘宝登录的页面(跨域),下面的登录滑块(同域);

网址: https://web.txcs.tmall.com/login


import xbot
from xbot import print, sleep
from .import package
from .package import variables as glv
import re, json, time

def check_timeout(wait_errors=[], ignore_errors=[], default_res=None):
    """
    判断函数是否超时(修饰类方法)
    :param wait_errors: 需要等待的异常列表
    :param ignore_errors: 超时情况下,忽略的异常列表
    :param default_res: 超时情况下,默认的返回值
    :return:
    """

    def wrapper_1(func):

        def wrapper_2(self, *args, **kwargs):
            if kwargs.get('timeout') == -1: return func(self, *args, **kwargs)

            max_ts, st_ts = int(kwargs.get('timeout') or 10), time.time()
            while True:
                try:
                    # 执行被装饰的函数
                    return func(self, *args, **kwargs)
                except Exception as e:
                    err_cate = str(e.args[0]).split(':')[0]
                    if err_cate in wait_errors:
                        if (time.time() - st_ts) < max_ts: continue     # 判断是否超时
                        if err_cate in ignore_errors: return default_res   # 判断是否返回预期
                    raise

        return wrapper_2

    return wrapper_1

class SuperXpath:

    def __init__(self, elem, debug=False):
        """
        初始化Xpath查找
        :param elem: 起始元素
        :param debug: 是否调试输出
        :return: 
        """
        self.debug, self.elem = debug, elem

    # 切换Iframe域
    def _switch_iframe(self, curr_elem, timeout):
        """
        切换Iframe域
        :param curr_elem: 当前元素
        :return: 切换后的对象
        """
        # 判断是否是Iframe节点,通过父节点中转到Iframe内部
        iframe_type = re.match(r'^<(i??frame)', curr_elem.get_html())
        if iframe_type: 
            pre_elem = curr_elem.parent()
            selector_option = {
                "name": 'xelem', "type": "simple", "path": [
                    {"name": iframe_type[1], "type": "web", "attributes": []},
                    {"name": "html", "type": "web", "attributes": []}
                ]
            }
            try:
                # 加入对Iframe Src属性的判断
                iframe_src = curr_elem.get_attribute('src')
                if iframe_src: selector_option['path'][0]['attributes'].append({'name': 'src', 'value': iframe_src, 'required': True, 'operator': 'Equal'})
                # 使用新的元素选择器定位Iframe元素
                curr_elem = pre_elem.find(xbot.selector.Selector(selector_option), timeout=timeout)
                if self.debug: print(f"跨域Iframe节点,已自动切换至Iframe域...")
            except Exception as ex:
                if self.debug: print(f"非跨域Iframe节点,跳过切换...")
        return curr_elem
    
    # 通过Xpath查找网页元素
    def _find_by_xpath(self, xpath_str, curr_elem=None, timeout=1):
        """
        通过Xpath查找网页元素
        :param xpath_str: Xpath文本
        :param curr_elem: 当前查找的元素
        :return: 元素集合
        """
        # 执行Xpath,递归Xpath,当前元素
        exec_xpath, inner_xpath, curr_elem = xpath_str, '', (curr_elem or self.elem)
        # 对Xpath按照Iframe进行拆分
        iframe_matchs = (re.findall(r'(\/+iframe|\/\/.*?::iframe)(\[.*?\])?', xpath_str) or [('', '')])[0]
        iframe_match_text = ''.join(_ or '' for _ in iframe_matchs)
        if iframe_match_text:
            split_index = xpath_str.find(iframe_match_text) + len(iframe_match_text)
            exec_xpath, inner_xpath = xpath_str[:split_index], xpath_str[split_index:]
        # 调试输出
        if self.debug: print("当前执行:", exec_xpath, "\t递归Xpath:", inner_xpath or '无')
        # 处理跨域Iframe查询的问题
        iframe_elem = self._switch_iframe(curr_elem, timeout)
        
        elements = iframe_elem.find_all_by_xpath(exec_xpath, timeout=timeout)
        
        res_elems = []
        for element in elements:
            if self.debug: print(f"查找成功:{element.get_html()}")
            if inner_xpath:
                res_elems += self._find_by_xpath(inner_xpath, element)
            else:
                res_elems.append(element)
        return res_elems
    
    # 通过Xpath查找多个元素
    @check_timeout(wait_errors=['未找到元素'], ignore_errors=['未找到元素'], default_res=[])
    def find_all(self, xpath_str, timeout=10):
        """
        通过Xpath查找多个元素
        :param xpath_str: Xpath文本
        :param timeout: 超时时间
        :return: 元素集合
        """
        res_elems = self._find_by_xpath(xpath_str)   # 查找元素
        assert res_elems, f"未找到元素:{xpath_str}"
        return res_elems

    # 通过Xpath查找单个元素
    @check_timeout(wait_errors=['未找到元素'])
    def find(self, xpath_str, timeout=10):
        """
        通过Xpath查找单个元素
        :param xpath_str: Xpath文本
        :param timeout: 超时时间
        :return: 元素对象
        """
        res_elems = self._find_by_xpath(xpath_str)   # 查找元素
        assert res_elems, f"未找到元素:{xpath_str}"
        assert len(res_elems) == 1, f"匹配到多个元素:{xpath_str}"
        return res_elems[0]

def main(args):
    # 以获取天猫超市登录页的淘宝登录滑块为例
    web_page = xbot.web.get(url='https://web.txcs.tmall.com*', mode='chrome', use_wildcard=True)
    # 分段查找
    login_iframe = SuperXpath(web_page).find('//iframe[contains(@src,"https://login.taobao.com/member/login.jhtml")]', timeout=3)
    print("登录Iframe匹配成功:", login_iframe.get_html())
    slide_iframe = SuperXpath(login_iframe).find('//iframe[@id="baxia-dialog-content"]', timeout=3)
    print(f"滑块Iframe匹配成功:", slide_iframe.get_html())
    slide_btn = SuperXpath(slide_iframe).find('//span[@id="nc_1_n1z"]', timeout=3)
    print(f"滑块元素匹配成功:", slide_btn.get_html())
    # 组合查找
    super_slide_btn = SuperXpath(web_page).find('//iframe[contains(@src,"https://login.taobao.com/member/login.jhtml")]//iframe[@id="baxia-dialog-content"]//*[@id="nc_1_n1z"]', timeout=3)
    print("直接匹配滑块:", super_slide_btn.get_html())


收藏9
全部回答1
最新
发布回答
回答