【爬虫分享】某音直播弹幕获取
本文最后更新于162 天前,其中的信息可能已经过时,如有错误请发送邮件到lysun26@163.com

前言

参考文章: https://www.52pojie.cn/thread-1737925-1-1.html

视频地址: https://www.bilibili.com/video/BV1hM411w7L3

参考文章对应的 github 代码地址:https://github.com/Prince-cool/dy_protobuf

本文章中所有内容仅供学习交流,若有侵权,请联系我立即删除

由于我在学习上面的这篇文章时,发现抖音的代码与这篇文章中的代码不一样了(但是流程仍然是一样的)。正好我也借此机会,自己按照这篇文章的思路分析了一下,在这个过程中学到了很多,特此记录一下。

目标地址:某音随便一个直播间

websocket

爬取弹幕之前,我们首先要知道工作流程。直播弹幕是实时通信,实时通信中,目前主流的协议是 websocket,具体可以参考 https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket 。我们需要知道的是,它有四个事件和两个方法,两个方法是 send 和 close,四个事件分别是 open, message, error, close,其中,最关键的是 open 和 message。open 事件就是初始化事件,当 websocket 连接刚刚建立后,将会进入到 open 事件中,在 open 事件中完成一些初始化的操作。由于 websocket 是实时通信,因此会涉及到客户端和服务器之间的双向通信,也就是客户端向服务器发数据,服务器也可以向客户端发数据。其中,客户端向服务器发数据是通过 websocket 的 send 方法实现的,服务器向客户端发数据,其实就是响应信息,当服务器响应数据时,会触发 message 事件,然后我们就可以在 message 事件中,处理服务器响应的数据。

该平台直播弹幕就是采用的 websocket 协议。根据上面的分析,不难知道,其工作流程就是:

  1. 建立 websocket 连接
  2. 我们向服务器发送一些初始化的信息,在 open 事件中发送
  3. 服务器响应数据,我们在 message 事件中,得到响应数据
  4. 对响应数据进行解析,得到弹幕等信息

其中,我们需要尤其关注:

  1. 在 open 事件中发出了什么信息,经过了什么处理
  2. 发送数据和响应数据的格式是什么
  3. 响应数据是如何解析的,中间有没有什么加密等

这也是本次爬虫的关键。

onOpen

打开一个直播间,打开开发者工具,开始抓包。

在上方选择 WS,就可以显示 websocket 的包了。在出现的第一个包中,打开 Initiator,可以看到调用堆栈,我们点击第一个。

进来之后,打下断点,刷新页面,可以发现程序断了下来。

通过查询 websocket 的介绍,我们知道 new WebSocket(n) 就是建立一个 ws 的连接,url 便是 n。然后,我们可以在下面看到 onOpen, onMessage 等方法。我们往下找,找到 onOpen 所在位置,打下断点

这里就有着 onOpen 和 onMessage 两个函数,这两个也是我们后续要重点分析的。我们运行程序,会发现进入到了 onOpen 中。代码如下:

this.client.onOpen(()=>{
    ef("socket established"),
    this.emitter.emit("start-driver", "websocket"),
    t(),
    this.pingStarted = !0,
    this.ping()
}

显然,上面四行都是传递一些信号是否建立等相关的信息,并不是我们关心的地方,我们关心的是发送的数据是什么,要找到发送的数据。所以,重点是这个 ping 函数。我们进入到 ping 函数里面。

这时,我们看到了很关键的函数,就是 send。因此,我们推测,后面的 this.transport.ping() 函数的返回值就是客户端向服务器发送的数据。我们跟进来

发现调用了 encode 函数,传递了两个参数,分别是 {payload_type:"hb"}"PushFrame"。继续跟进去,

发现调用了 _encode,好家伙,一直套娃 hhh。继续跟

观察代码,猜测 this.getType(t) 是获取了数据类型,而 n.encode(e) 猜测是对 e 进行编码,返回的便是编码后的数据。这时,就基本上有了结论。"PushFrame" 应该是 e 的编码类型,我们跟进去

这时,就可以看到很多信息了。首先,"PushFrame" 决定了用 e.PushFrame 这个对象进行编码,然后我们可以看到 encodedecode 两个方法,对应着编码和解码。我们先来看解码,因为解码的结果形式是利于我们观看的。

我们可以发现,出来了从 1 开始的连续整数,并且类型有 string, int32, bytes 等类型,可以联想到 protobuf。protobuf 的知识可以参考开篇文章中的大佬的另一篇文章: https://www.52pojie.cn/thread-1735975-1-1.html ,我就简单说明一下,因为我也是刚了解几天。关于 protobuf,其实就可以把它当成一种数据结构,我们可以类比于字典,按照它定义的语法,定义一些变量,然后 protobuf 会提供一些序列化和反序列的方法,将其转换成一个字节序列,这样的话,就方便我们传输。而当我们收到了一个字节序列,如果知道它对应的 protobuf 的格式,那么直接调用其反序列化的方法,就可以直接得到原始的形式,方便我们查看。

我们打开 Network 面板,选择 WS,可以看到有一个包,打开 Messages,发现什么都没有,因为这个时候还在编码阶段,并没有发送 send 出去。

到这里,我们就把这个 ping 函数分析的差不多了。这里总结一下:

  1. 原始数据: {payload_type:"hb"}
  2. 方式:protobuf
  3. 对象:e.PushFrame

接下来,我们就可以验证一下是否正确。再点击一下运行,

断在了 send 方法这里,而上面的 e 便是 {payload_type:"hb"} 编码后的数据。我们再运行一下,让其发送出去,打开 Network,便可以看到有了消息。

其中,蓝色向上的箭头表示客户端发送给服务器的数据,红色向下的箭头表示服务器发送给客户端的数据,第一条便是刚才我们发送的数据,并且服务器也做出了响应,这个响应数据后续再说,我们先来验证一下这个发送的数据。

根据 protobuf 的知识,我们先根据 decode 中部分,还原出 proto 代码,如下:

syntax = "proto3";  
message pushproto_PushHeader{  
    string key=1;  
    string value=2;  
}  
message pushproto_PushFrame{  
    uint64 seqid=1;  
    uint64 logid=2;  
    uint64 service=3;  
    uint64 method=4;  
    repeated pushproto_PushHeader headers=5;  
    string payloadencoding=6;  
    string payload_type=7;  
    bytes payload=8;  
}

然后我们编译成 python 文件,接下来实例化之后,我们将 payload_type 的值设为"hb",然后将其序列化,与 Network 中响应的数据对比。代码如下:

import test_pb2 as pb  

pushframe = pb.pushproto_PushFrame()  
pushframe.payload_type = "hb"  
res = pushframe.SerializeToString()  
print(res.hex())

为了便于对比,我们将结果转为 hex 类型,输出如下:

浏览器结果为:

发现一致,说明之前的分析没问题。至此,onOpen 已经解决了。接下来便是处理响应数据了。

onMessage

数据会在 onMessage 处收到,我们到对应的函数处下断点,如下:

我们跟进去

这个便是核心代码了。我们仍旧关心关键函数,接下来逐渐分析。

Transport.decode

首先,前三行只是一些复制操作,再往下就看到了一个关键函数,我们跟到这里,可以猜到 e.data 就是响应的数据。我们可以去 Network 里,可以看到有一条响应的数据,查看后可以发现是一样的。

显然这个 decode 函数,就是对这个响应数据进行解析还原。我们跟进去

首先,我们一直往下跟,发现运行到了 _decodeFrameOrResponse,并没有进入到上面的 return 那里。可以猜测,上面这个是判断是否是响应数据,如果是的话,就用 _decodeFrameOrResponse 解析,否则就用上面的函数解析。

t = this._decode(e, "PushFrame") 与之前分析的一样,将响应数据用 PushFrame 对象进行解析。这时我们可以从 Network 那里,复制响应数据,然后我们自己解析一下,对比结果。这里,我推荐复制 base64 的形式,方便操作。

代码如下(为了避免代码太长,这里 data 里我就没把数据放上去):

import base64  
data = 'xxxxxxxxx'  
data_bytes = base64.b64decode(data)  
res2 = pb.pushproto_PushFrame()  
res2.ParseFromString(data_bytes)  
print(res2)

输出结果为:

对比

可以发现是一致的,说明没问题。

然后是 n = yield this._extractResponse(t); ,t 是解析后的数据,从这个名字上看,可以猜测这个是从解析的数据中提取处一部分。跟进去,代码如下:

let r = null == (n = t.headers) ? void 0 :  n.some(e=>"compress_type" === e.key && "gzip" === e.value)
      , i = r ? yield e.unGzip(t.payload) : t.payload;
    return i

首先看这个条件语句,大致意思就是当 e 的 compress_type 为 gzip 时,会执行 e.unGzip(t.payload),否则则执行 t.payload。从这个名字就可以看出,是执行解压操作。t.payload 显然就是取出 payload 对应的数据。接下来分析这个 unGzip 函数。跟进去

可以看到调用了 ungzip,继续跟进去,来到了这个文件

这是一个 webpack 打包,但是这个比较简单,大括号里只有一个序号 29507,所以只需要把开头结尾去掉,就可以。开头只需要删掉 a. 开头的内容即可,后面还需要添加 window=global。然后我们在 Python 中调用 e7 函数,会发现报错,这主要是编码问题。这里提供一种处理方法,我们在 python 中,将数据转换成 base64 后传入到 js 文件中,然后在 js 中,将其转换为 bytes 类型,然后再转成 base64 传回 python,代码如下:

function arrayBufferToBase64(array) {  
    array = new Uint8Array(array);  
    var length = array.byteLength;  
    var table = [  
        "A",  
        "B",  
        "C",  
        "D",  
        "E",  
        "F",  
        "G",  
        "H",  
        "I",  
        "J",  
        "K",  
        "L",  
        "M",  
        "N",  
        "O",  
        "P",  
        "Q",  
        "R",  
        "S",  
        "T",  
        "U",  
        "V",  
        "W",  
        "X",  
        "Y",  
        "Z",  
        "a",  
        "b",  
        "c",  
        "d",  
        "e",  
        "f",  
        "g",  
        "h",  
        "i",  
        "j",  
        "k",  
        "l",  
        "m",  
        "n",  
        "o",  
        "p",  
        "q",  
        "r",  
        "s",  
        "t",  
        "u",  
        "v",  
        "w",  
        "x",  
        "y",  
        "z",  
        "0",  
        "1",  
        "2",  
        "3",  
        "4",  
        "5",  
        "6",  
        "7",  
        "8",  
        "9",  
        "+",  
        "/",  
    ];  
    var base64Str = "";  
    for (var i = 0; length - i >= 3; i += 3) {  
        var num1 = array[i];  
        var num2 = array[i + 1];  
        var num3 = array[i + 2];  
        base64Str +=  
            table[num1 >>> 2] +  
            table[((num1 & 0b11) << 4) | (num2 >>> 4)] +  
            table[((num2 & 0b1111) << 2) | (num3 >>> 6)] +  
            table[num3 & 0b111111];  
    }  
    var lastByte = length - i;  
    if (lastByte === 1) {  
        var lastNum1 = array[i];  
        base64Str +=  
            table[lastNum1 >>> 2] + table[(lastNum1 & 0b11) << 4] + "==";  
    } else if (lastByte === 2) {  
        var lastNum1 = array[i];  
        var lastNum2 = array[i + 1];  
        base64Str +=  
            table[lastNum1 >>> 2] +  
            table[((lastNum1 & 0b11) << 4) | (lastNum2 >>> 4)] +  
            table[(lastNum2 & 0b1111) << 2] +  
            "=";  
    }  
    return base64Str;  
}  

function base64ToUint8Array(base64String) {  
    let padding = "=".repeat((4 - (base64String.length % 4)) % 4);  
    let base64 = (base64String + padding)  
        .replace(/\-/g, "+")  
        .replace(/_/g, "/");  

    let rawData = window.atob(base64);  
    let outputArray = new Uint8Array(rawData.length);  

    for (var i = 0; i < rawData.length; ++i) {  
        outputArray[i] = rawData.charCodeAt(i);  
    }  
    return outputArray;  
}  

function get_data(in_data) {  
    in_data = base64ToUint8Array(in_data);  
    data = e7(in_data);  
    data = arrayBufferToBase64(data);  
    return data;  
}

然后将之前的 payload 数据,用这个函数处理一下,得到解压后的数据。代码如下:

# 响应数据测试  
import base64  
data = 'xxxxxxxx'  
data_bytes = base64.b64decode(data)  
res2 = pb.pushproto_PushFrame()  
res2.ParseFromString(data_bytes)  

# 取出payload  
payload = res2.payload  

# 解压  
import execjs  
js = execjs.compile(open('1.js', 'r', encoding='utf-8').read())  
payload_unzip = js.call('get_data', base64.b64encode(payload).decode())  
payload_unzip = base64.b64decode(payload_unzip)  
print(payload_unzip)

接下来就是 return [this._decode(n, "Response"), t]

返回了两个值,t 就是之前解析的 pushframe 的数据,n 就是上面解压后的数据,根据前面的分析,这个"response"对应的就是 e.Respond。我们打断点跟进去

转成 proto 文件

message webcast_im_Response{  
    repeated webcast_im_Message messages=1;  
    string cursor=2;  
    int64 fetchinterval=3;  
    int64 now=4;  
    string internalext=5;  
    int32 fetchtype=6;  
    int64 heartbeatduration=8;  
    bool needack=9;  
    string pushserver=10;  
    string livecursor=11;  
    bool historynomore=12;  
}  
message webcast_im_Message{  
     string method=1;  
     bytes payload=2;  
     int64 msgid=3;  
     int32 msgtype=4;  
     int64 offset=5;  
     bool needwrdsstore=6;  
     int64 wrdsversion=7;  
     string wrdssubkey=8;  
}

然后我们将解压后的数据用这个解析,得到了正确结果。代码示例:

res3 = pb.webcast_im_Response()  
res3.ParseFromString(payload_unzip)  
print(res3)

至此,这部分分析就完成了。总结一下

  1. 对响应数据解析成 pushframe
  2. 取出 payload,执行 unGzip 函数解压
  3. 对解压后的数据解析成 response

ack 处理

let {response: u, frame: l, cursor: c, needAck: h, internalExt: f} = s;

u 就是上面解析的数据,l 是原始响应的数据,h 和 f 分别就是 response 中的两个字段,取出对应字段的数据。

if (h) {
    let e = yield this.transport.ack(l, u);
    this.cursor.set(c, f),
    ef("sending ack");
    let t = null == (n = this.client) ? void 0 : n.send(e);
    t || (this.emitter.emit("error", "socket-driver", Error(`socket already close [logid: ${null != (r = null == l ? void 0 : l.LogID) ? r : ""}]`)),
    this.pingStarted = !1,
    this.emitter.emit("downgrade", "polling"))
}

判断语句,当 needack 等于 True 的时候,将会执行里面的语句。其中,关键函数就是 this.transport.ack(l,u)。然后下面通过 send 函数,将其发送出去。跟进去

这个也是 PushFrame。其中,payload 是通过一个函数加载出来的,u 是上面解析数据中 internalExt 字段对应的值。我们跟进 s 函数

这个没什么特殊的,我们直接抠出来,再加一个编码

function s(e) {  
    let t = [];  
    for (let n of e) {  
        let e = n.charCodeAt(0);  
        e < 128  
            ? t.push(e)  
            : e < 2048  
            ? (t.push(192 + (e >> 6)), t.push(128 + (63 & e)))  
            : e < 65536 &&  
              (t.push(224 + (e >> 12)),  
              t.push(128 + ((e >> 6) & 63)),  
              t.push(128 + (63 & e)));  
    }  
    return new Uint8Array(t);  
}  

function get_ack_payload(e) {  
    data = s(e);  
    return arrayBufferToBase64(data);  
}

然后我们构造出数据,用 PushFrame 序列化即可。

至此,ack 的处理也完成了,前面的懂了,这个就很容易理解了。

emit

if ((null == l ? void 0 : l.payload_type) === a.AG.Msg && (this.emitter.emit("message", u),
this.emitter.emit("header", "socket", L(null != (i = l.headers) ? i : []))),
(null == l ? void 0 : l.payload_type) === a.AG.Close)
    return this.emitter.emit("stop-driver", "websocket"),
    null == (o = this.client) || o.socket.close(),
    t(Error("close by payloadtype"))

前面是一些判断语句,核心函数是 this.emitter.emit("message", u) 。我们猜测在这个函数里,完成了对上面 Transport.decode 得到的数据解析,得到了弹幕数据。

在上面解析数据中,可以看到有很多个 messages。而这里就有一个 forEach 函数,推测是分别对每一个 message 进行解析。我们跟进去,发现确实是这样

函数是 e(...t),跟进去

继续跟

这里没有明显的那种函数,这个函数上面也有关键词 Promise,是异步程序,因为我对这个了解不多,所以就采取了两个比较笨的方法,方法一就是直接多次单步,当执行到解析的程序时,自然会跟进去。在跟了好几次之后,终于跟到了关键位置。另一个方法就是运行几次,然后查看堆栈,如下:

然后我们把断点打下,运行到这里

到这里是不是就很熟悉了,payload 就是一串字节数据,method 就是对应的 proto 的类,传入了 decode 方法。

到了我们熟悉的 decode 函数。仿照之前的方法,进入到了这里

到了这里,就是转 proto 文件的事情。

如果一个个写的话,其实是很麻烦的,因为写的时候会发现嵌套的内容还是比较多的。在开头提到的那篇大佬的文章中,他是使用的 AST 从代码中提取的,但由于现在的代码结构与他的不一样,所以没法直接用他的代码。我现在也在学用 AST 提取内容,只不过 protobuf 文件的内容并没变,所以我就直接拿了他的文件用了。到此,本文内容就结束了。接下来再简单梳理总结一下流程,然后按照这个流程写代码就可以了

总结

  1. 建立 websocket 连接
  2. {payload_type:"hb"} 序列化后,通过 send 方法发送
  3. 服务器会在 onMessage 方法响应数据,我们对响应数据用 PushFrame 解析
  4. 取出 payload 和 compress_type,如果 compress_type 为 gzip,就对 payload 进行解压
  5. 用 Response 解析解压后的数据
  6. 取出 needack, internalext, logid, messages 等数据
  7. 如果 needack 为 true,说明需要发送一个 ack 的包,我们将 {payload_type: "ack", 'payload':s(internalext), LogID:logid} 用 PushFrame 序列化后发送出去,其中,这个中间涉及到了一个 s 函数,我们需要将其 js 源码抠出来,对 internalext 进行处理
  8. 处理 messages 数据,将其解析为弹幕等数据。messages 对应的 protobuf 类型为解压后的数据的 method 的值,然后我们用相应的 protobuf 对象进行解析即可。

最终代码可以去看那位大佬的,流程是一样的,我只是重新分析了一遍,我就不提供了。

有问题可以留言哦~ 觉得有帮助也可以投喂一下博主,感谢~
文章链接:https://www.corrain.top/mouyin-live-danmu/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明文章地址及作者

评论

  1. 我是作者
    5 月前
    2023-11-19 13:24:07

    你这个博客加了什么特效,都卡成dog了兄弟。不过文章写的不错,哈哈,很有价值。

    • 博主
      我是作者
      5 月前
      2023-11-19 13:44:55

      刚换了个服务器,不知道为啥变得现在这么卡了,最近我也在看看有没有啥加速的方法

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
( ゜- ゜)つロ
_(:з」∠)_
(⌒▽⌒)
( ̄▽ ̄)
(=・ω・=)
(*°▽°*)八(*°▽°*)♪
✿ヽ(°▽°)ノ✿
(¦3【▓▓】
눈_눈
(ಡωಡ)
_(≧∇≦」∠)_
━━━∑(゚□゚*川━
(`・ω・´)
( ̄3 ̄)
✧(≖ ◡ ≖✿)
(・∀・)
(〜 ̄△ ̄)〜
→_→
(°∀°)ノ
╮( ̄▽ ̄)╭
( ´_ゝ`)
←_←
(;¬_¬)
(゚Д゚≡゚д゚)!?
( ´・・)ノ(._.`)
Σ(゚д゚;)
Σ(  ̄□ ̄||)<
(´;ω;`)
(/TДT)/
(^・ω・^)
(。・ω・。)
(● ̄(エ) ̄●)
ε=ε=(ノ≧∇≦)ノ
(´・_・`)
(-_-#)
( ̄へ ̄)
( ̄ε(# ̄) Σ
(╯°口°)╯(┴—┴
ヽ(`Д´)ノ
("▔□▔)/
(º﹃º )
(๑>؂<๑)
。゚(゚´Д`)゚。
(∂ω∂)
(┯_┯)
(・ω< )★
( ๑ˊ•̥▵•)੭₎₎
¥ㄟ(´・ᴗ・`)ノ¥
Σ_(꒪ཀ꒪」∠)_
٩(๛ ˘ ³˘)۶❤
(๑‾᷅^‾᷅๑)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
小黄脸
热词系列一
tv_小电视
上一篇
下一篇