各大平台直播弹幕抓取 ---- by.广州组
评论
收藏

各大平台直播弹幕抓取 ---- by.广州组

经验分享
驿站
2023-08-07 20:17·浏览量:3094
驿站
发布于 2023-08-07 20:14 · 更新于 2023-08-07 20:17 · 3094 浏览

今天来抓一下直播弹幕吧~


〇、实现效果

这里用抖音直播演示,其他平台的直播也是可以的



一、实现原理

1.直播弹幕的新增会带动网页元素变化,利用 JavaScript 的 MutationObserver 来监听页面元素的变化,就可以获取到每次新增的弹幕。下面是 MutationObserver 的简单示例代码:

// Report every node that gets inserted below the observed element.
var observer = new MutationObserver((mutations) => {
    for (const mutation of mutations) {
        // Print the text of each newly added node
        for (const addedNode of mutation.addedNodes) {
            console.log(addedNode.innerText)
        }
        // Other handling logic...
    }
});

// Mutation Observer configuration: capturing live-stream comments only needs
// child-list changes on the target and on its descendants.
var options = {
    childList: true, // watch child nodes being added or removed
    subtree: true, // extend the watch to all descendants
    attributes: false, // attribute changes are not watched
    attributeOldValue: false, // previous attribute values are not recorded
    characterData: false, // text-content changes are not watched
    characterDataOldValue: false // previous text content is not recorded
};

// Start the Mutation Observer on the target element with the configuration above
observer.observe(targetElement, options);

2.将每次抓取的数据用链表存储,然后在影刀中无限循环读取这个链表的头部数据即可

3.然后再处理一点点细节就可以愉快地抓弹幕啦~

二、快速实现

1.创建代码模块,命名为 js.py


# JavaScript injected into the page to start capturing comments.
#
# The script defines ``main(element, input)``:
#   * ``element`` - DOM element to observe; when null, the common ancestor of
#     the nodes matched by ``input`` is observed instead (the <html> element
#     when nothing matches).
#   * ``input``   - optional XPath; when given, only the text of the first
#     matched node that lies inside (or is) an added node is queued, otherwise
#     the ``innerText`` of every added node is queued.
#
# Two page-level globals are created on purpose: ``dmLinked`` (the FIFO queue
# that ``get_data_js`` drains) and ``observer`` (kept so that a later
# initialisation can disconnect the previous observer).
#
# NOTE: the script must not contain backslashes or triple single quotes,
# because it lives inside a regular (non-raw) Python string.
init_code_js = '''
function main(element, input) {
    // Debug logger. It is a no-op by default; use console.log to trace activity.
    const log = () => {}

    // Evaluate an XPath expression and return the matching nodes as an array.
    const $x = (xpath, parent_node) => {
        if (parent_node == null) {
            parent_node = document
        }
        let xpathResult = document.evaluate(xpath, parent_node, null, XPathResult.ORDERED_NODE_ITERATOR_TYPE, null);
        let nodes = []
        let node = null
        while ((node = xpathResult.iterateNext())) {
            nodes.push(node)
        }
        return nodes
    }

    // Disconnect the observer left behind by an earlier initialisation so the
    // same mutation is not captured twice. The global does not exist on the
    // first run, hence the best-effort try/catch.
    try {
        observer.disconnect();
    } catch (e) {
        // No earlier observer exists yet; nothing to disconnect.
    }

    /**
     * Find an ancestor that contains every given element.
     * @param elements array of DOM nodes
     * @returns the html element when the array is empty, otherwise a common ancestor
     */
    function findCommonParent(elements) {
        if (elements.length === 0) {
            return document.querySelector('html');
        }
        // A node without a parent (for example the document) is its own root.
        let commonParent = elements[0].parentNode || elements[0];
        for (let i = 1; i < elements.length; i++) {
            let currentElement = elements[i];
            // Climb until the candidate contains the current element. The guard
            // checks the candidate itself so it can never become null.
            while (!commonParent.contains(currentElement) && commonParent.parentNode !== null) {
                commonParent = commonParent.parentNode;
            }
        }

        return commonParent;
    }

    // One queue entry. It is not called Node so the DOM Node interface is not shadowed.
    class QueueItem {
        constructor(data) {
            this.data = data;
            this.next = null;
        }
    }

    // Singly linked FIFO queue holding the captured comment texts.
    class LinkedList {
        constructor() {
            this.head = null;
            this.tail = null;
        }

        // Append data at the tail
        append(data) {
            log(data)
            let newNode = new QueueItem(data);
            if (!this.head) {
                this.head = newNode;
                this.tail = newNode;
            } else {
                this.tail.next = newNode;
                this.tail = newNode;
            }
        }

        // Return the data at the head and remove that entry; null when empty
        getAndRemoveHead() {
            if (!this.head) {
                return null;
            }

            let data = this.head.data;
            this.head = this.head.next;

            if (!this.head) {
                this.tail = null;
            }
            return data;
        }
    }

    /**
     * Start observing
     * @param root node whose subtree is watched for added nodes
     */
    function run(root) {
        let options = {
            attributes: false,
            childList: true,
            subtree: true,
            characterData: false,
            characterDataOldValue: false
        };

        // Queue a captured text. null and undefined (for example the innerText
        // of a text or comment node) are dropped: the reader polls until it gets
        // a non-null value, so such an entry would look like an empty queue.
        function enqueue(text) {
            if (text != null) {
                dmLinked.append(text);
            }
        }

        function getData(mutationsList, observer) {
            // XPath matches, evaluated lazily and at most once per callback;
            // the DOM cannot change while this synchronous callback runs.
            let candidates = null
            for (let item of mutationsList) {
                log(item)
                let nodes = item.addedNodes
                for (let i = 0; i < nodes.length; i++) {
                    let node = nodes[i]
                    if (input == null) {
                        enqueue(node.innerText);
                        continue
                    }
                    if (candidates === null) {
                        candidates = $x(input, document)
                    }
                    // First XPath match that is the added node or lies inside it
                    let hit = candidates.find(function (candidate) {
                        return candidate === node || node.contains(candidate)
                    });
                    if (hit !== undefined) {
                        enqueue(hit.innerText);
                    }
                }
            }
        }

        // Intentional global, see the disconnect step above
        observer = new MutationObserver(getData);
        observer.observe(root, options);
    }

    // Intentional global: the polling script reads from this queue
    dmLinked = new LinkedList();
    if (element != null) {
        run(element)
    } else {
        // Without an XPath there is nothing to match, so the html element is observed
        let matched = input == null ? [] : $x(input, document)
        run(findCommonParent(matched))
    }
}

'''

# JavaScript executed on every poll: it returns the value of
# ``dmLinked.getAndRemoveHead()``, i.e. it reads from the page-level
# ``dmLinked`` object that the initialisation script creates.
get_data_js = '''
function main(element, input) {
return dmLinked.getAndRemoveHead()
}
'''



2.创建代码模块,命名为 _core.py


import xbot
from xbot import print, sleep
from . import package
from .package import variables as glv
from .js import *

# Keep the real ``print`` only when this module runs inside the 'xbot_robot'
# package; anywhere else it is replaced by a function that ignores its
# arguments and returns None.
print = print if __package__ == 'xbot_robot' else (lambda *_, **__: None)

class DanmuStorage:
    """Install the comment-capturing script in a page and read its queue.

    Args:
        web_page: Browser page object in which the scripts are executed.
        dm_selector: Selector of the comment container. Only an
            ``xbot.selector.Selector`` instance is resolved to an element;
            any other value (e.g. ``None``) makes the page itself the
            script executor.
        dm_xpath: Optional XPath passed to the initialisation script as its
            ``input`` argument.
    """

    def __init__(self, web_page: xbot.web.WebBrowser, dm_selector, dm_xpath=None):
        self.web_page = web_page
        self.dm_selector = dm_selector
        self.dm_xpath = dm_xpath
        self.element = None
        if isinstance(dm_selector, xbot.selector.Selector):
            self.element = web_page.find(dm_selector, timeout=5)

        # Run scripts on the located element when there is one, else on the page.
        self.executor = self.web_page if self.element is None else self.element
        self.run()

    def run(self):
        """Execute the initialisation script, passing ``dm_xpath`` as its argument."""
        self.executor.execute_javascript(init_code_js, self.dm_xpath)

    def get_one_dm(self, is_wait):
        """Return the next queued comment.

        Args:
            is_wait: When equal to ``0`` the result of a single poll is
                returned, which may be ``None``. Otherwise the queue is polled
                once per second until a non-``None`` value is obtained.

        Returns:
            The value produced by the polling script.
        """
        while True:
            dm = self.executor.execute_javascript(get_data_js)
            if is_wait == 0 or dm is not None:
                return dm
            sleep(1)



3.创建代码模块,命名为 api.py。根据api中的函数,就可以自己封装成指令,分别调用这两个函数即可


from ._core import *

# Registry of initialised DanmuStorage objects, keyed by the ``bid`` of the
# web page they were created for.
danmu_storage_map = {}


def init_danmu_storage(web_page, dm_selector=None, dm_xpath=None):
    """Create a DanmuStorage for ``web_page`` and register it under its ``bid``.

    Args:
        web_page: Page object; must expose a ``bid`` attribute.
        dm_selector: Selector of the comment container. Optional so that the
            XPath-only form ``init_danmu_storage(page, dm_xpath=...)`` works.
        dm_xpath: Optional XPath forwarded to DanmuStorage.

    Returns:
        The newly created DanmuStorage (an earlier entry for the same page is
        replaced).
    """
    danmu_storage = DanmuStorage(web_page=web_page, dm_selector=dm_selector, dm_xpath=dm_xpath)
    danmu_storage_map[web_page.bid] = danmu_storage
    # Kept from the original; presumably gives the injected script a moment
    # to settle before the first read - TODO confirm.
    sleep(1)
    return danmu_storage


def get_one_dm(web_page, is_wait):
    """Read one comment from the storage registered for ``web_page``.

    Args:
        web_page: Page object previously passed to ``init_danmu_storage``.
        is_wait: Converted with ``int()`` and forwarded to
            ``DanmuStorage.get_one_dm``.

    Returns:
        Whatever the storage's ``get_one_dm`` returns.

    Raises:
        AssertionError: If no storage has been initialised for the page.
            Raised explicitly (not via ``assert``) so the check also runs
            when Python is started with ``-O``.
    """
    if web_page.bid not in danmu_storage_map:
        raise AssertionError("弹幕未初始化~")
    return danmu_storage_map[web_page.bid].get_one_dm(int(is_wait))






4.示例
代码调用


from api import init_danmu_storage, get_one_dm


def main(args):
    """Example entry point: print every live-stream comment as it arrives.

    Attaches to an already open Edge page, installs the comment observer on
    the element selected by '直播弹幕块', then prints comments forever.
    """
    # Get the web page object
    web_page = xbot.web.get(url='*', mode='edge', use_wildcard=True)
    dm_selector = package.selector('直播弹幕块')
    init_danmu_storage(web_page, dm_selector=dm_selector)

    # XPath alternative:
    # init_danmu_storage(web_page, dm_selector=None, dm_xpath=dm_xpath)
    while True:
        # is_wait=1 polls until a comment is available, so dm is never None
        # here and no extra sleep/retry is needed in this loop.
        dm = get_one_dm(web_page, is_wait=1)
        print(dm)


封装成指令调用


收藏9
全部评论1
最新
发布评论
评论