D 的个人博客

但行好事莫问前程

  menu
417 文章
3446695 浏览
15 当前访客
ღゝ◡╹)ノ❤️

基于 Pushlets 的消息推送设计

基于 Pushlets 的消息推送设计

Pushlets 是通过长连接方式实现“推”消息的。推送模式分为:Poll(轮询)、Pull(拉)。本文围绕 Pull 模式进行设计。

原理

客户端发起请求,服务端接收到请求后根据 Pushlets 协议进行处理。推数据通过 HTTP 响应返回。

客户端在接收到响应后根据 Pushlets 协议进行处理,重新发起请求。Pull 模式时序:

Pushlets Pull

  1. join:join 请求,服务器端建立 Pushlet 会话
  2. join-ack:join 应答,返回会话 id
  3. listen:订阅并监听主题
  4. listen-ack:监听应答,返回会话 id,订阅 id
  5. subscribe(可选):订阅主题
  6. subscribe-ack(可选):订阅主题应答,返回会话 id,订阅
  7. refresh:长连接请求,实参会话 id
  8. refresh-ack:长连接响应,包括下一次 refresh 请求间隔
    hb:心跳响应
    data:推数据
  9. leave:清空订阅
  10. leave-ack:清空订阅应答

服务器端

服务器端主要负责维护会话,根据请求处理应答。使用内存队列维护每个会话的主题事件。

事件产生后通过分发器(Dispatcher)将事件发布到指定订阅者的事件队列里。Pull 模式使用阻塞队列,读超时(没有事件)后返回 hb 与 refresh 指令的应答。

事件发布

  • 广播:将事件发布给所有订阅者
  • 多播:将事件发布给匹配的订阅者
  • 单播:将事件发给某个订阅者

关键参数

订阅者的事件队列配置:

  • queue.size=24
    队列大小为 24。如果队列满了新发布到该队列的事件将被丢弃。
  • queue.read.timeout.millis=20000
    队列读超时 20 秒。读超时后返回 hb 与 refresh 指令的应答。该项配置即请求线程最长 hold 时间。
  • queue.write.timeout.millis=20
    队列写超时 20 毫秒。如果队列是满的,等待 20 毫秒后如果还满,则销毁该订阅者。

刷新时长配置:

  • pull.refresh.timeout.millis=45000
    服务器端刷新超时 45 秒。如果服务器端某订阅者超过 45 秒没有收到客户端的 listen 或 refresh 请求,则销毁该订阅者。该超时判断发生在发布事件时。
  • pull.refresh.wait.min.millis=2000
    pull.refresh.wait.max.millis=6000
    refresh 指令中指定客户端下次请求的等待时间区间,值取该区间内的随机值。

客户端

Pushlets 支持多种客户端,例如浏览器客户端、Java 客户端。浏览器客户端又分为 iframe 和 AJAX 两种。

初始化客户端后,客户端发起监听、订阅请求,并根据服务器返回指令发送 refresh 请求。当有 data 应答时,回调客户端 onData(event) 函数实现消息处理。

技术设计

对 Pushlets 做接口封装以屏蔽其特性细节,也便于以后兼容其他服务器消息推送技术(例如 WebSocket)做好铺垫。

封装的服务器推机制定义为 Channel 服务,提供服务器到浏览器客户端的消息推送。

服务器端

通过 Channel API 在 JS 客户端与服务器端建立长连接,使服务器端可以实时地发送消息给客户端。

Channel 类图

JS 客户端

oaweb.Channel() 类:

  • init()
    初始化服务器调用 URL、客户端状态。
  • open()
    发送 join 请求,服务端创建会话。
  • subscribe(listeners : {topic, onmessage, onerror})
    发送 subscribe 请求,服务端创建订阅者,添加订阅主题。
  • unsubscribe(topic)
    发送 unsubscribe 请求,服务端移除订阅。
  • close()
    发送 leave 请求,服务端销毁会话。

时序

Channel 时序图

集群

均衡器通过源地址保持策略保证同一 IP 的请求均会分发到固定服务节点。

当服务节点进行业务逻辑处理后,发送消息到消息服务系统;

服务节点订阅消息主题,当监听到新消息时调用 Channel 服务发布消息到具体的推送实现组件(Pushlets)。

Channel 集群

参考