

今天来抓一下直播弹幕吧~
这里用抖音直播演示,其他平台的直播也是可以的

// Log the rendered text of every node inserted beneath the observed element.
var observer = new MutationObserver(function (mutations) {
    for (const mutation of mutations) {
        // Print each newly added node.
        for (const addedNode of mutation.addedNodes) {
            console.log(addedNode.innerText);
        }
        // Other handling logic goes here...
    }
});
// Observer configuration: capturing live-stream danmu only needs child and
// descendant insertions, so every other kind of record is switched off.
var options = {
    attributes: false, // attribute changes: not observed
    childList: true, // direct child additions/removals: observed
    characterData: false, // text content changes: not observed
    subtree: true, // extend observation to all descendants
    attributeOldValue: false, // do not record previous attribute values
    characterDataOldValue: false // do not record previous text values
};
// Start observing with the configuration above.
// NOTE(review): targetElement is not defined in this snippet; it must already
// refer to the DOM node that contains the danmu list.
observer.observe(targetElement, options);
2. 将每次抓取的数据用链表存储，然后在影刀中无限循环读取这个链表的头部数据即可。
3. 然后再处理一点点细节，就可以愉快地抓弹幕啦~
init_code_js = '''
/**
 * Starts collecting the text of newly inserted DOM nodes into a FIFO queue.
 *
 * The queue is published as window.dmLinked so that a later, separate script
 * can pop entries from it, and the active MutationObserver is published as
 * window.observer so that a later call to this function can stop it.
 *
 * @param {Element|null} element - Root node to observe. When null, the root is
 *     the closest common ancestor of the nodes matched by `input`.
 * @param {string|null} input - XPath selecting the nodes whose text should be
 *     queued. When null, the text of every added node is queued.
 */
function main(element, input) {
    // Debug logger, disabled by default; point it at console.log to trace
    // every mutation record and queued message.
    const log = () => {};

    /**
     * Evaluates an XPath expression and returns the matches in document order.
     * @param {string} xpath - XPath expression to evaluate.
     * @param {Node} [parentNode=document] - Context node for the evaluation.
     * @returns {Node[]} Matching nodes (empty when nothing matches).
     */
    const $x = (xpath, parentNode) => {
        const contextNode = parentNode == null ? document : parentNode;
        const xpathResult = document.evaluate(
            xpath, contextNode, null, XPathResult.ORDERED_NODE_ITERATOR_TYPE, null);
        const nodes = [];
        let node = xpathResult.iterateNext();
        while (node) {
            nodes.push(node);
            node = xpathResult.iterateNext();
        }
        return nodes;
    };

    // A previous call leaves its observer on window; stop it so it does not
    // keep running alongside the one created below.
    const previousObserver = window.observer;
    if (previousObserver && typeof previousObserver.disconnect === 'function') {
        previousObserver.disconnect();
    }

    /**
     * Finds the closest node that contains every given element.
     * @param {Node[]} elements - Nodes that must share the returned ancestor.
     * @returns {Node} The common ancestor; the <html> element when `elements`
     *     is empty or no ancestor could be determined.
     */
    function findCommonParent(elements) {
        if (elements.length === 0) {
            return document.querySelector('html');
        }
        let commonParent = elements[0].parentNode;
        for (let i = 1; i < elements.length; i++) {
            // Climb until the candidate also contains this element. The null
            // check is on the candidate itself, so walking past the top of the
            // tree ends the loop instead of dereferencing null.
            while (commonParent !== null && !commonParent.contains(elements[i])) {
                commonParent = commonParent.parentNode;
            }
        }
        return commonParent !== null ? commonParent : document.querySelector('html');
    }

    // Singly linked list node. Not called "Node" so that the DOM Node
    // interface is not shadowed inside this function.
    class ListNode {
        constructor(data) {
            this.data = data;
            this.next = null;
        }
    }

    // FIFO queue: append at the tail, pop from the head.
    class LinkedList {
        constructor() {
            this.head = null;
            this.tail = null;
        }

        // Adds `data` at the end of the list.
        append(data) {
            log(data);
            const newNode = new ListNode(data);
            if (!this.head) {
                this.head = newNode;
                this.tail = newNode;
            } else {
                this.tail.next = newNode;
                this.tail = newNode;
            }
        }

        // Removes the first entry and returns its data; null when empty.
        getAndRemoveHead() {
            if (!this.head) {
                return null;
            }
            const data = this.head.data;
            this.head = this.head.next;
            if (!this.head) {
                this.tail = null;
            }
            return data;
        }
    }

    const queue = new LinkedList();
    // The script that reads messages runs separately and looks this up as a
    // global, so the queue has to live on window rather than in this closure.
    window.dmLinked = queue;

    // Queues a node's rendered text. Nodes without a string innerText (text
    // and comment nodes, for instance) are skipped: queuing undefined would be
    // indistinguishable from an empty queue for the reader.
    function enqueueText(node) {
        const text = node.innerText;
        if (typeof text === 'string') {
            queue.append(text);
        }
    }

    /**
     * Starts observing `root` for inserted nodes.
     * @param {Node} root - Node whose subtree is observed.
     */
    function run(root) {
        const options = {
            attributes: false,
            childList: true,
            subtree: true,
            characterData: false,
            characterDataOldValue: false
        };

        function getData(mutationsList) {
            // Evaluated lazily and at most once per callback: the DOM does not
            // change while the callback runs, so every added node in this
            // batch would see the same XPath result.
            let xpathMatches = null;
            for (const mutation of mutationsList) {
                log(mutation);
                for (const node of mutation.addedNodes) {
                    if (input == null) {
                        enqueueText(node);
                        continue;
                    }
                    if (xpathMatches === null) {
                        xpathMatches = $x(input, document);
                    }
                    // Only the first match (document order) inside the added
                    // node is queued.
                    const hit = xpathMatches.find(
                        (candidate) => candidate === node || node.contains(candidate));
                    if (hit !== undefined) {
                        enqueueText(hit);
                    }
                }
            }
        }

        // Kept on window so the next call to main can disconnect it.
        window.observer = new MutationObserver(getData);
        window.observer.observe(root, options);
    }

    const root = element != null ? element : findCommonParent($x(input, document));
    run(root);
}
'''
get_data_js = '''
/**
 * Pops the oldest queued message.
 * Both parameters are unused; the queue is read from the global dmLinked.
 * @returns {*} Whatever dmLinked.getAndRemoveHead() returns.
 */
function main(element, input) {
    const oldest = dmLinked.getAndRemoveHead();
    return oldest;
}
'''
import xbot
from xbot import print, sleep
from . import package
from .package import variables as glv
from .js import *
# Keep the real print only when running inside the 'xbot_robot' package;
# everywhere else replace it with a function that accepts anything and does nothing.
print = print if __package__ == 'xbot_robot' else (lambda *_, **__: None)
class DanmuStorage:
    """Injects the danmu-collecting script into a page and reads queued messages back.

    The script in ``init_code_js`` is executed on the element found through
    ``dm_selector`` when that is an ``xbot.selector.Selector``; otherwise it is
    executed on the page itself, with ``dm_xpath`` passed along as its argument.
    """

    def __init__(self, web_page: xbot.web.WebBrowser, dm_selector, dm_xpath=None):
        self.web_page = web_page
        self.dm_selector = dm_selector
        # Only a real Selector is looked up; anything else leaves no element.
        if isinstance(dm_selector, xbot.selector.Selector):
            self.element = web_page.find(dm_selector, timeout=5)
        else:
            self.element = None
        self.dm_xpath = dm_xpath
        # Run scripts on the element when one was found, else on the page.
        self.executor = self.element if self.element is not None else self.web_page
        self.run()

    def run(self):
        """Execute the initialisation script, passing the XPath as its argument."""
        self.executor.execute_javascript(init_code_js, self.dm_xpath)

    def get_one_dm(self, is_wait):
        """Return the next queued message.

        When ``is_wait`` equals 0 the first result is returned as-is, even if it
        is None. Otherwise the queue is polled once per second until a result
        other than None arrives.
        """
        def fetch():
            return self.executor.execute_javascript(get_data_js)

        dm = fetch()
        if is_wait != 0:
            while dm is None:
                sleep(1)
                dm = fetch()
        return dm
from ._core import *
# Maps a page's ``bid`` to the DanmuStorage created for that page.
danmu_storage_map = {}


def init_danmu_storage(web_page, dm_selector=None, dm_xpath=None):
    """Create a DanmuStorage for ``web_page`` and register it under ``web_page.bid``.

    Args:
        web_page: Page object; its ``bid`` attribute is used as the registry key.
        dm_selector: Optional selector for the danmu container. Defaults to None
            so that the function can be called with ``dm_xpath`` alone.
        dm_xpath: Optional XPath forwarded to DanmuStorage.

    Returns:
        The newly created DanmuStorage. Calling again for the same page
        replaces the previously registered storage.
    """
    danmu_storage = DanmuStorage(web_page=web_page, dm_selector=dm_selector, dm_xpath=dm_xpath)
    danmu_storage_map[web_page.bid] = danmu_storage
    # Fixed one-second pause after setup; presumably gives the injected
    # observer time to start before the first read -- confirm if it matters.
    sleep(1)
    return danmu_storage
def get_one_dm(web_page, is_wait):
    """Return the next message from the storage registered for ``web_page``.

    ``is_wait`` is converted with ``int()`` before being handed to the storage.
    Fails an assertion when no storage has been registered for the page's ``bid``.
    """
    page_id = web_page.bid
    assert page_id in danmu_storage_map, "弹幕未初始化~"
    storage = danmu_storage_map[page_id]
    return storage.get_one_dm(int(is_wait))
4.示例
代码调用
from api import init_danmu_storage, get_one_dm
def main(args):
    """Example entry point: print every danmu captured from the matching page."""
    # Attach to an already open Edge page (wildcard URL match).
    web_page = xbot.web.get(url='*', mode='edge', use_wildcard=True)
    dm_selector = package.selector('直播弹幕块')
    danmu_storage = init_danmu_storage(web_page, dm_selector=dm_selector)
    # danmu_storage = init_danmu_storage(web_page, dm_xpath=dm_xpath)  # XPath variant
    while True:
        dm = get_one_dm(web_page, is_wait=1)
        if dm is not None:
            print(dm)
            continue
        # Nothing available yet: back off briefly before asking again.
        sleep(0.1)
