前言
参考文章: https://www.52pojie.cn/thread-1737925-1-1.html
视频地址: https://www.bilibili.com/video/BV1hM411w7L3
参考文章对应的 github 代码地址:https://github.com/Prince-cool/dy_protobuf
本文章中所有内容仅供学习交流,若有侵权,请联系我立即删除
由于我在学习上面的这篇文章时,发现抖音的代码与这篇文章中的代码不一样了(但是流程仍然是一样的)。正好我也借此机会,自己按照这篇文章的思路分析了一下,在这个过程中学到了很多,特此记录一下。
目标地址:某音随便一个直播间
websocket
爬取弹幕之前,我们首先要知道工作流程。直播弹幕是实时通信,实时通信中,目前主流的协议是 websocket,具体可以参考 https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket 。我们需要知道的是,它有四个事件和两个方法,两个方法是 send 和 close,四个事件分别是 open, message, error, close,其中,最关键的是 open 和 message。open 事件就是初始化事件,当 websocket 连接刚刚建立后,将会进入到 open 事件中,在 open 事件中完成一些初始化的操作。由于 websocket 是实时通信,因此会涉及到客户端和服务器之间的双向通信,也就是客户端向服务器发数据,服务器也可以向客户端发数据。其中,客户端向服务器发数据是通过 websocket 的 send 方法实现的,服务器向客户端发数据,其实就是响应信息,当服务器响应数据时,会触发 message 事件,然后我们就可以在 message 事件中,处理服务器响应的数据。
该平台直播弹幕就是采用的 websocket 协议。根据上面的分析,不难知道,其工作流程就是:
- 建立 websocket 连接
- 我们向服务器发送一些初始化的信息,在 open 事件中发送
- 服务器响应数据,我们在 message 事件中,得到响应数据
- 对响应数据进行解析,得到弹幕等信息
其中,我们需要尤其关注:
- 在 open 事件中发出了什么信息,经过了什么处理
- 发送数据和响应数据的格式是什么
- 响应数据是如何解析的,中间有没有什么加密等
这也是本次爬虫的关键。
onOpen
打开一个直播间,打开开发者工具,开始抓包。

在上方选择 WS,就可以显示 websocket 的包了。在出现的第一个包中,打开 Initiator,可以看到调用堆栈,我们点击第一个。
进来之后,打下断点,刷新页面,可以发现程序断了下来。

通过查询 websocket 的介绍,我们知道 new WebSocket(n)
就是建立一个 ws 的连接,url 便是 n。然后,我们可以在下面看到 onOpen, onMessage 等方法。我们往下找,找到 onOpen 所在位置,打下断点

这里就有着 onOpen 和 onMessage 两个函数,这两个也是我们后续要重点分析的。我们运行程序,会发现进入到了 onOpen 中。代码如下:
this.client.onOpen(()=>{
ef("socket established"),
this.emitter.emit("start-driver", "websocket"),
t(),
this.pingStarted = !0,
this.ping()
}
显然,上面四行都是传递一些信号是否建立等相关的信息,并不是我们关心的地方,我们关心的是发送的数据是什么,要找到发送的数据。所以,重点是这个 ping 函数。我们进入到 ping 函数里面。

这时,我们看到了很关键的函数,就是 send。因此,我们推测,后面的 this.transport.ping()
函数的返回值就是客户端向服务器发送的数据。我们跟进来

发现调用了 encode
函数,传递了两个参数,分别是 {payload_type:"hb"}
和 "PushFrame"
。继续跟进去,

发现调用了 _encode
,好家伙,一直套娃 hhh。继续跟

观察代码,猜测 this.getType(t)
是获取了数据类型,而 n.encode(e)
猜测是对 e
进行编码,返回的便是编码后的数据。这时,就基本上有了结论。"PushFrame"
应该是 e
的编码类型,我们跟进去

这时,就可以看到很多信息了。首先,"PushFrame"
决定了用 e.PushFrame
这个对象进行编码,然后我们可以看到 encode
和 decode
两个方法,对应着编码和解码。我们先来看解码,因为解码的结果形式是利于我们观看的。

我们可以发现,出来了从 1 开始的连续整数,并且类型有 string, int32, bytes 等类型,可以联想到 protobuf。protobuf 的知识可以参考开篇文章中的大佬的另一篇文章: https://www.52pojie.cn/thread-1735975-1-1.html ,我就简单说明一下,因为我也是刚了解几天。关于 protobuf,其实就可以把它当成一种数据结构,我们可以类比于字典,按照它定义的语法,定义一些变量,然后 protobuf 会提供一些序列化和反序列的方法,将其转换成一个字节序列,这样的话,就方便我们传输。而当我们收到了一个字节序列,如果知道它对应的 protobuf 的格式,那么直接调用其反序列化的方法,就可以直接得到原始的形式,方便我们查看。
我们打开 Network 面板,选择 WS,可以看到有一个包,打开 Messages,发现什么都没有,因为这个时候还在编码阶段,并没有发送 send 出去。

到这里,我们就把这个 ping 函数分析的差不多了。这里总结一下:
- 原始数据:
{payload_type:"hb"}
- 方式:protobuf
- 对象:e.PushFrame
接下来,我们就可以验证一下是否正确。再点击一下运行,

断在了 send 方法这里,而上面的 e 便是 {payload_type:"hb"}
编码后的数据。我们再运行一下,让其发送出去,打开 Network,便可以看到有了消息。

其中,蓝色向上的箭头表示客户端发送给服务器的数据,红色向下的箭头表示服务器发送给客户端的数据,第一条便是刚才我们发送的数据,并且服务器也做出了响应,这个响应数据后续再说,我们先来验证一下这个发送的数据。
根据 protobuf 的知识,我们先根据 decode 中部分,还原出 proto 代码,如下:
syntax = "proto3";
message pushproto_PushHeader{
string key=1;
string value=2;
}
message pushproto_PushFrame{
uint64 seqid=1;
uint64 logid=2;
uint64 service=3;
uint64 method=4;
repeated pushproto_PushHeader headers=5;
string payloadencoding=6;
string payload_type=7;
bytes payload=8;
}
然后我们编译成 python 文件,接下来实例化之后,我们将 payload_type 的值设为"hb",然后将其序列化,与 Network 中响应的数据对比。代码如下:
import test_pb2 as pb
pushframe = pb.pushproto_PushFrame()
pushframe.payload_type = "hb"
res = pushframe.SerializeToString()
print(res.hex())
为了便于对比,我们将结果转为 hex 类型,输出如下:

浏览器结果为:

发现一致,说明之前的分析没问题。至此,onOpen 已经解决了。接下来便是处理响应数据了。
onMessage
数据会在 onMessage 处收到,我们到对应的函数处下断点,如下:

我们跟进去

这个便是核心代码了。我们仍旧关心关键函数,接下来逐渐分析。
Transport.decode
首先,前三行只是一些复制操作,再往下就看到了一个关键函数,我们跟到这里,可以猜到 e.data 就是响应的数据。我们可以去 Network 里,可以看到有一条响应的数据,查看后可以发现是一样的。

显然这个 decode 函数,就是对这个响应数据进行解析还原。我们跟进去

首先,我们一直往下跟,发现运行到了 _decodeFrameOrResponse
,并没有进入到上面的 return 那里。可以猜测,上面这个是判断是否是响应数据,如果是的话,就用 _decodeFrameOrResponse
解析,否则就用上面的函数解析。

t = this._decode(e, "PushFrame")
与之前分析的一样,将响应数据用 PushFrame 对象进行解析。这时我们可以从 Network 那里,复制响应数据,然后我们自己解析一下,对比结果。这里,我推荐复制 base64 的形式,方便操作。

代码如下(为了避免代码太长,这里 data 里我就没把数据放上去):
import base64
data = 'xxxxxxxxx'
data_bytes = base64.b64decode(data)
res2 = pb.pushproto_PushFrame()
res2.ParseFromString(data_bytes)
print(res2)
输出结果为:

对比

可以发现是一致的,说明没问题。
然后是 n = yield this._extractResponse(t);
,t 是解析后的数据,从这个名字上看,可以猜测这个是从解析的数据中提取处一部分。跟进去,代码如下:
let r = null == (n = t.headers) ? void 0 : n.some(e=>"compress_type" === e.key && "gzip" === e.value)
, i = r ? yield e.unGzip(t.payload) : t.payload;
return i
首先看这个条件语句,大致意思就是当 e 的 compress_type 为 gzip 时,会执行 e.unGzip(t.payload)
,否则则执行 t.payload
。从这个名字就可以看出,是执行解压操作。t.payload
显然就是取出 payload 对应的数据。接下来分析这个 unGzip
函数。跟进去

可以看到调用了 ungzip
,继续跟进去,来到了这个文件


这是一个 webpack 打包,但是这个比较简单,大括号里只有一个序号 29507,所以只需要把开头结尾去掉,就可以。开头只需要删掉 a.
开头的内容即可,后面还需要添加 window=global
。然后我们在 Python 中调用 e7 函数,会发现报错,这主要是编码问题。这里提供一种处理方法,我们在 python 中,将数据转换成 base64 后传入到 js 文件中,然后在 js 中,将其转换为 bytes 类型,然后再转成 base64 传回 python,代码如下:
function arrayBufferToBase64(array) {
array = new Uint8Array(array);
var length = array.byteLength;
var table = [
"A",
"B",
"C",
"D",
"E",
"F",
"G",
"H",
"I",
"J",
"K",
"L",
"M",
"N",
"O",
"P",
"Q",
"R",
"S",
"T",
"U",
"V",
"W",
"X",
"Y",
"Z",
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h",
"i",
"j",
"k",
"l",
"m",
"n",
"o",
"p",
"q",
"r",
"s",
"t",
"u",
"v",
"w",
"x",
"y",
"z",
"0",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"+",
"/",
];
var base64Str = "";
for (var i = 0; length - i >= 3; i += 3) {
var num1 = array[i];
var num2 = array[i + 1];
var num3 = array[i + 2];
base64Str +=
table[num1 >>> 2] +
table[((num1 & 0b11) << 4) | (num2 >>> 4)] +
table[((num2 & 0b1111) << 2) | (num3 >>> 6)] +
table[num3 & 0b111111];
}
var lastByte = length - i;
if (lastByte === 1) {
var lastNum1 = array[i];
base64Str +=
table[lastNum1 >>> 2] + table[(lastNum1 & 0b11) << 4] + "==";
} else if (lastByte === 2) {
var lastNum1 = array[i];
var lastNum2 = array[i + 1];
base64Str +=
table[lastNum1 >>> 2] +
table[((lastNum1 & 0b11) << 4) | (lastNum2 >>> 4)] +
table[(lastNum2 & 0b1111) << 2] +
"=";
}
return base64Str;
}
function base64ToUint8Array(base64String) {
let padding = "=".repeat((4 - (base64String.length % 4)) % 4);
let base64 = (base64String + padding)
.replace(/\-/g, "+")
.replace(/_/g, "/");
let rawData = window.atob(base64);
let outputArray = new Uint8Array(rawData.length);
for (var i = 0; i < rawData.length; ++i) {
outputArray[i] = rawData.charCodeAt(i);
}
return outputArray;
}
function get_data(in_data) {
in_data = base64ToUint8Array(in_data);
data = e7(in_data);
data = arrayBufferToBase64(data);
return data;
}
然后将之前的 payload 数据,用这个函数处理一下,得到解压后的数据。代码如下:
# 响应数据测试
import base64
data = 'xxxxxxxx'
data_bytes = base64.b64decode(data)
res2 = pb.pushproto_PushFrame()
res2.ParseFromString(data_bytes)
# 取出payload
payload = res2.payload
# 解压
import execjs
js = execjs.compile(open('1.js', 'r', encoding='utf-8').read())
payload_unzip = js.call('get_data', base64.b64encode(payload).decode())
payload_unzip = base64.b64decode(payload_unzip)
print(payload_unzip)
接下来就是 return [this._decode(n, "Response"), t]
返回了两个值,t 就是之前解析的 pushframe 的数据,n 就是上面解压后的数据,根据前面的分析,这个"response"对应的就是 e.Respond。我们打断点跟进去

转成 proto 文件
message webcast_im_Response{
repeated webcast_im_Message messages=1;
string cursor=2;
int64 fetchinterval=3;
int64 now=4;
string internalext=5;
int32 fetchtype=6;
int64 heartbeatduration=8;
bool needack=9;
string pushserver=10;
string livecursor=11;
bool historynomore=12;
}
message webcast_im_Message{
string method=1;
bytes payload=2;
int64 msgid=3;
int32 msgtype=4;
int64 offset=5;
bool needwrdsstore=6;
int64 wrdsversion=7;
string wrdssubkey=8;
}
然后我们将解压后的数据用这个解析,得到了正确结果。代码示例:
res3 = pb.webcast_im_Response()
res3.ParseFromString(payload_unzip)
print(res3)

至此,这部分分析就完成了。总结一下
- 对响应数据解析成 pushframe
- 取出 payload,执行 unGzip 函数解压
- 对解压后的数据解析成 response
ack 处理
let {response: u, frame: l, cursor: c, needAck: h, internalExt: f} = s;
u 就是上面解析的数据,l 是原始响应的数据,h 和 f 分别就是 response 中的两个字段,取出对应字段的数据。
if (h) {
let e = yield this.transport.ack(l, u);
this.cursor.set(c, f),
ef("sending ack");
let t = null == (n = this.client) ? void 0 : n.send(e);
t || (this.emitter.emit("error", "socket-driver", Error(`socket already close [logid: ${null != (r = null == l ? void 0 : l.LogID) ? r : ""}]`)),
this.pingStarted = !1,
this.emitter.emit("downgrade", "polling"))
}
判断语句,当 needack 等于 True 的时候,将会执行里面的语句。其中,关键函数就是 this.transport.ack(l,u)
。然后下面通过 send 函数,将其发送出去。跟进去

这个也是 PushFrame。其中,payload 是通过一个函数加载出来的,u 是上面解析数据中 internalExt 字段对应的值。我们跟进 s 函数

这个没什么特殊的,我们直接抠出来,再加一个编码
function s(e) {
let t = [];
for (let n of e) {
let e = n.charCodeAt(0);
e < 128
? t.push(e)
: e < 2048
? (t.push(192 + (e >> 6)), t.push(128 + (63 & e)))
: e < 65536 &&
(t.push(224 + (e >> 12)),
t.push(128 + ((e >> 6) & 63)),
t.push(128 + (63 & e)));
}
return new Uint8Array(t);
}
function get_ack_payload(e) {
data = s(e);
return arrayBufferToBase64(data);
}
然后我们构造出数据,用 PushFrame 序列化即可。
至此,ack 的处理也完成了,前面的懂了,这个就很容易理解了。
emit
if ((null == l ? void 0 : l.payload_type) === a.AG.Msg && (this.emitter.emit("message", u),
this.emitter.emit("header", "socket", L(null != (i = l.headers) ? i : []))),
(null == l ? void 0 : l.payload_type) === a.AG.Close)
return this.emitter.emit("stop-driver", "websocket"),
null == (o = this.client) || o.socket.close(),
t(Error("close by payloadtype"))
前面是一些判断语句,核心函数是 this.emitter.emit("message", u)
。我们猜测在这个函数里,完成了对上面 Transport.decode
得到的数据解析,得到了弹幕数据。

在上面解析数据中,可以看到有很多个 messages。而这里就有一个 forEach 函数,推测是分别对每一个 message 进行解析。我们跟进去,发现确实是这样
函数是 e(...t)
,跟进去

继续跟

这里没有明显的那种函数,这个函数上面也有关键词 Promise,是异步程序,因为我对这个了解不多,所以就采取了两个比较笨的方法,方法一就是直接多次单步,当执行到解析的程序时,自然会跟进去。在跟了好几次之后,终于跟到了关键位置。另一个方法就是运行几次,然后查看堆栈,如下:

然后我们把断点打下,运行到这里

到这里是不是就很熟悉了,payload 就是一串字节数据,method 就是对应的 proto 的类,传入了 decode 方法。

到了我们熟悉的 decode 函数。仿照之前的方法,进入到了这里

到了这里,就是转 proto 文件的事情。
如果一个个写的话,其实是很麻烦的,因为写的时候会发现嵌套的内容还是比较多的。在开头提到的那篇大佬的文章中,他是使用的 AST 从代码中提取的,但由于现在的代码结构与他的不一样,所以没法直接用他的代码。我现在也在学用 AST 提取内容,只不过 protobuf 文件的内容并没变,所以我就直接拿了他的文件用了。到此,本文内容就结束了。接下来再简单梳理总结一下流程,然后按照这个流程写代码就可以了
总结
- 建立 websocket 连接
- 将
{payload_type:"hb"}
序列化后,通过 send 方法发送 - 服务器会在 onMessage 方法响应数据,我们对响应数据用 PushFrame 解析。
- 取出 payload 和 compress_type,如果 compress_type 为 gzip,就对 payload 进行解压
- 用 Response 解析解压后的数据
- 取出 needack, internalext, logid, messages 等数据
- 如果 needack 为 true,说明需要发送一个 ack 的包,我们将
{payload_type: "ack", 'payload':s(internalext), LogID:logid}
用 PushFrame 序列化后发送出去,其中,这个中间涉及到了一个 s 函数,我们需要将其 js 源码抠出来,对 internalext 进行处理 - 处理 messages 数据,将其解析为弹幕等数据。messages 对应的 protobuf 类型为解压后的数据的 method 的值,然后我们用相应的 protobuf 对象进行解析即可。
最终代码可以去看那位大佬的,流程是一样的,我只是重新分析了一遍,我就不提供了。
你这个博客加了什么特效,都卡成dog了兄弟。不过文章写的不错,哈哈,很有价值。
刚换了个服务器,不知道为啥变得现在这么卡了,最近我也在看看有没有啥加速的方法