爬虫基础篇之斗鱼弹幕

篇幅有限

完整内容及源码关注公众号:ReverseCode,发送

Socket

asyncore 模块为异步socket的服务器客户端通信提供简单的接口。该模块提供了异步socket服务客户端和服务器的基础架构。

相比python原生的socket api,asyncore具备有很大的优势,asyncore对原生的socket进行封装,提供非常简洁优秀的接口,利用asyncore覆写相关需要处理的接口方法,就可以完成一个socket的网络编程,从而不需要处理复杂的socket网络状况以及多线程处理等等。

asyncore实现流程

demo

服务端

1
netcat-win32-1.12>nc -l -p 9000  启动服务端

客户端

telnet 127.0.0.1 9000 客户端连接到服务器 发送信息

  1. 定义类继承自asyncore.dispatcher

    1
    class SocketClient(asyncore.dispatcher):
  2. 实现类中的回调代码

    • 实现构造函数

      • 调用父类方法

        1
        asyncore.dispatcher.__init__(self)
      • 创建 Socket 对象

        1
        self.create_socket()
      • 连接服务器

        1
        2
        address = (host,port)
        self.connect(address)
    • 实现 handle_connect 回调函数

      Socket 连接服务器成功时回调该函数

      1
      2
      def handle_connect(self):
      print("连接成功")
    • 实现 writable 回调函数

      描述是否有数据需要被发送到服务器。返回值为 True 表示可写,False 表示不可写,如果不实现默认返回为 True,当返回 True 时,回调函数 handle_write 将被触发

      1
      2
      def writable(self):
      return True
    • 实现 handle_write 回调函数

      当有数据需要发送时(writable 回调函数返回True时),该函数被触发,通常情况下在该函数中编写 send 方法发送数据

      1
      2
      3
      4
      def handle_write(self):
      # 内部实现对服务器发送数据的代码
      # 调用 send 方法发送数据,参数是字节数据
      self.send('hello world\n'.encode('utf-8'))
    • 实现 readable 回调函数

      描述是否有数据从服务端读取。返回 True 表示有数据需要读取,False 表示没有数据需要被读取,当不实现默认返回为 True,当返回 True 时,回调函数 handle_read 将被触发

      1
      2
      def readable(self):
      return True
    • 实现 handle_read 回调函数

      当有数据需要读取时触发(readable 回调函数返回 True 时),该函数被触发,通常情况下在该函数中编写 recv 方法接收数据

      1
      2
      3
      4
      5
      def handle_read(self):
      # 主动接收数据,参数是需要接收数据的长度
      # 返回的数据是字节数据
      result = self.recv(1024)
      print(result)
    • 实现 handle_error 回调函数

      当程序运行过程发生异常时回调

      1
      2
      3
      4
      def handle_error(self):
      # 编写处理错误方法
      t,e,trace = sys.exc_info()
      self.close()
    • 实现 handle_close 回调函数

      当连接被关闭时触发

      1
      2
      3
      def handle_close(self):
      print("连接关闭")
      self.close()
  3. 创建对象并且执行 asyncore.loop 进入运行循环

    • timeout 表示一次循环所需要的时长

      1
      2
      3
      client = SocketClient('127.0.0.1',9000)
      # 开始启动运行循环
      asyncore.loop(timeout=5)

抓取弹幕

开发流程

根据斗鱼弹幕服务器第三方接入协议v1.6.2.pdf 官方提供协议文档建立弹幕客户端开发流程。

  • 连接初始化
    • 使用TCP连接服务器
      • IP地址:openbarrage.douyutv.com
      • 端口:8601
    • 客户端向弹幕服务器发送登录请求,登录弹幕服务器
    • 弹幕服务器收到客户端登录请求并完成登录后,返回登录成功消息给客户端
    • 客户端收到登录成功消息后发送进入弹幕分组请求给弹幕服务器
    • 弹幕服务器接受到客户端弹幕分组请求后将客户端添加到请求指定的弹幕分组中
  • 服务过程
    • 客户端每隔 45 秒发送心跳给弹幕服务器,弹幕服务器回复心跳信息给客户端
    • 弹幕服务器如有广播信息,则推送给客户端,服务器消息协议
  • 断开连接
    • 客户端发送登出消息
    • 客户端关闭 TCP 连接

数据包发送和接收流程

数据包结构

协议格式

  • 消息长度:4 字节小端整数,表示整条消息(包括自身)长度(字节数)。 消息长度出现两遍,二者相同。
  • 消息类型:2 字节小端整数,表示消息类型。取值如下:
    • 689 客户端发送给弹幕服务器的文本格式数据
    • 690 弹幕服务器发送给客户端的文本格式数据。
  • 加密字段:1字节,暂时未用,默认为 0。
  • 保留字段:1字节,暂时未用,默认为 0。
  • 数据部分:n字节+1字节,斗鱼独创序列化文本数据,结尾必须为‘\0’。(所有协议内容均为 UTF-8 编码)

数据包封装

对数据包进行对象化封装,对数据的封装方便以后使用,实现对象和二进制数据之间的转换

  • 通过参数构建数据包对象
  • 实现获取数据包长度的方法
  • 实现获取二进制数据的方法

实现发送数据包

发送数据流程图

  • 构建发送数据包的队列容器

    1
    self.send_queue = Queue()
  • 实现回调函数,判断容器中有数据就发送没有数据不发送

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    def writable(self):
    return self.send_queue.qsize() > 0

    def handle_write(self):

    # 从发送数据包队列中获取数据包对象
    dp = self.send_queue.get()

    # 获取数据包的长度,并且发送给服务器
    dp_length = dp.get_length()
    dp_length_data = dp_length.to_bytes(4,byteorder='little',signed=False)
    self.send(dp_length_data)

    # 发送数据包二进制数据
    self.send(dp.get_bytes())
    self.send_queue.task_done()
    pass
  • 实现登录函数

    • 构建登录数据包

      1
      2
      content = "type@=loginreq/roomid@={}/".format(room_id)
      login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content=content)
    • 把数据包添加到发送数据包容器中

      1
      2
      # 把数据包添加到发送数据包容器中
      self.send_queue.put(login_dp)

实现接收数据

接收数据流程图

  • 构建接收数据包队列

    1
    2
    # 存放接收的数据包对象
    self.recv_queue = Queue()
  • 读取回调函数中读取数据

    • 读取长度

      1
      2
      3
      4
      # 读取长度,二进制数据
      data_length_data = self.recv(4)
      # 通过二进制获取length 具体数据
      data_length = int.from_bytes(data_length_data,byteorder='little',signed=False)
    • 读取内容

      1
      2
      # 通过数据包的长度获取数据
      data = self.recv(data_length)
    • 构建数据包对象

      • 数据包构造函数中解析二进制来构建数据包对象

        1
        2
        3
        4
        5
        self.type = int.from_bytes(data_bytes[4:6],byteorder='little',signed=False)
        self.encrypt_flag = int.from_bytes(data_bytes[6:7],byteorder='little',signed=False)
        self.preserve_flag = int.from_bytes(data_bytes[7:8],byteorder='little',signed=False)
        # 构建数据部分
        self.content = str(data_bytes[8:-1],encoding='utf-8')
      • 通过二进制数据构建数据包对象

        1
        2
        # 通过二进制数据构建数据包对象
        dp = DataPacket(data_bytes=data)
    • 把数据包放入接收数据包容器中

      1
      2
      # 把数据包放入接收数据包容器中
      self.recv_queue.put(dp)
  • 构建处理线程专门处理接收数据包容器中数据

    • 构建线程

      1
      2
      3
      4
      # 构建一个专门处理接收数据包容器中的数据包的线程
      self.callback_thread = threading.Thread(target=self.do_callback)
      self.callback_thread.setDaemon(True)
      self.callback_thread.start()
    • 实现回调函数处理接收的数据包

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      def do_callback(self):
      '''
      专门负责处理接收数据包容器中的数据
      :return:
      '''
      while True:
      # 从接收数据包容器中获取数据包
      dp = self.recv_queue.get()
      # 对数据进行处理
      print(dp.content)

      pass

实现外部传入回调函数

通过外部指定回调函数实现自定义数据处理

  • 添加参数 callback

    • 构造函数中添加参数

      1
      2
      3
      def __init__(self,host,port,callback=None):
      # 定义外部传入的自定义回调函数
      self.callback = callback
    • 外部传入自定义回调函数

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      def data_callback(dp):
      '''
      自定义回调函数
      :param dp: 数据包对象
      :return:
      '''
      print("data_callback:",dp.content)
      pass

      if __name__ == '__main__':
      client = DouyuClient('openbarrage.douyutv.com',8601,callback=data_callback)

      client.login_room_id(4494106)

      asyncore.loop(timeout=10)
  • 在处理接收数据包的线程中调用回调函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def do_callback(self):
    '''
    专门负责处理接收数据包容器中的数据
    :return:
    '''
    while True:
    # 从接收数据包容器中获取数据包
    dp = self.recv_queue.get()
    # 对数据进行处理
    if self.callback is not None:
    self.callback(dp)
    self.recv_queue.task_done()

数据内容序列化与反序列化

  1. 键 key 和值 value 直接采用‘@=’分割
  2. 数组采用‘/’分割
  3. 如果 key 或者 value 中含有字符‘/’,则使用‘@S’转义
  4. 如果 key 或者 value 中含有字符‘@’,使用‘@A’转义

例子

多个键值对数据:key1@=value1/key2@=value2/key3@=value3/

数组数据:value1/value2/value3/

登录

登录.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def login_room_id(self, room_id):
# 2.客户端向弹幕服务器发送登录请求,登录弹幕服务器
self.room_id = room_id

send_data = {
"type": "loginreq",
"roomid": str(room_id)
}

# 构建登录数据包
content = encode_content(send_data)
login_dp = DataPacket(DATA_PACKET_TYPE_SEND, content=content)

# 把数据包添加到发送数据包容器中
self.send_queue.put(login_dp)

加入弹幕分组

弹幕分组.png

参看斗鱼弹幕文档,-9999 为海量弹幕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def join_room_group(self):
'''
4.客户端收到登录成功消息后发送进入弹幕分组请求给弹幕服务器
:return:
'''
send_data = {
"type": "joingroup",
"rid": str(self.room_id),
"gid": '-9999'
}
content = encode_content(send_data)

dp = DataPacket(type=DATA_PACKET_TYPE_SEND, content=content)
self.send_queue.put(dp)

pass

心跳机制

作用是让服务器解决假死连接问题,客户端必须每隔45秒发送一次请求,否则就会被主动断开。

  • 实现发送心跳函数
    • 构建心跳数据包
    • 把数据包添加到发送数据包容器队列中
  • 构建心跳线程
    • 构建心跳线程
    • 添加触发机制
    • 添加暂停机制

心跳机制.png

1
2
3
4
5
6
7
8
def send_heart_data_packet(self):
# 6.客户端每隔 45 秒发送心跳给弹幕服务器,弹幕服务器回复心跳信息给客户端
send_data = {
"type": "mrkl"
}
content = encode_content(send_data)
dp = DataPacket(type=DATA_PACKET_TYPE_SEND, content=content)
self.send_queue.put(dp)

WebSocket

以上方案已经失效,斗鱼弹幕已使用websocket实现,不过都是协议而已,基础逻辑没有变化。

通过抓包获取wss的地址及端口

websocket1

查看抓包中的Messages中的二进制信息,数据格式和1.6.2保持一致。

websocket2

run_forever是核心逻辑通过while循环实现自动重连,建立_job_open时调用_one_hello登录加入群组,通过_job_heartbeat持续接受心跳机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def run_forever(self) -> None:
"""
重写
@return:
"""
self._waiting_end = self._loop.create_future()
while not self._closed:
self._logger_info(f'正在启动 {self._area_id} 号数据连接')
if self._waiting_pause is not None:
self._logger_info(f'暂停启动 {self._area_id} 号数据连接,等待 RESUME 指令')
await self._waiting_pause

async with self._opening_lock:
if self._closed:
self._logger_info(f'{self._area_id} 号数据连接确认收到关闭信号,正在处理')
break
# 未成功建立数据连接,循环重试
if await self._prepare_client() and await self._job_open():
tasks = [self._loop.create_task(i()) for i in self._funcs_task]

self._task_main = self._loop.create_task(self._job_main())
tasks.append(self._task_main)

task_heartbeat = self._loop.create_task(self._job_heartbeat())
tasks.append(task_heartbeat)
else:
continue

_, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
self._logger_info(f'{self._area_id} 号数据连接异常或主动断开,正在处理剩余信息')
for i in pending:
if i != self._task_main:
i.cancel()
await self._job_close()
if pending:
await asyncio.wait(pending)
self._logger_info(f'{self._area_id} 号数据连接退出,剩余任务处理完毕')
await self._conn.clean()
self._waiting_end.set_result(True)

websocket抓取斗鱼弹幕

完整源码请关注微信公众号:ReverseCode,回复:爬虫基础

文章作者: J
文章链接: http://onejane.github.io/2021/04/08/爬虫基础篇之斗鱼弹幕/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 万物皆可逆向
支付宝打赏
微信打赏