分类目录归档:message

MQTT 3.1.1,值得升级的6个新特性

前言

以前看英文文章或资料,看完之后,摘要或者忘记。这一次选择感兴趣的MQTT 3.1.1介绍文章资料,引文见文末,作为练手;非完全翻译,去除掉一些广告性描述,若侵权,请告知。

在沉寂了四年之后,QTT 3.1.1规范于2014年10月30号正式发布,与此同时MQTT 3.1.1已成为OASIS(结构化信息标准促进组织)开放物联网消息传递协议标准(连接1 连接2),换种说法就是MQTT 3.1.1已升级为国际物联网标准。

正如HTTP为人们通过万维网分享信息铺平了道路一样,MQTT能将几十亿低成本、嵌入式数据采集遥测设备连接到网络。

与MQTT 3.1(还不是国际物联网标准呢)规范相比,MQTT 3.1.1目标在于消除歧义,尽可能的向后兼容,事实上一些大众所需的新特性被包含在这个版本(更多的是物联网标准推动),因此不仅是一个维护版本,也是一种巨大的进步呢。除了概念的澄清和陈旧规范重写外,有一些很有趣的变化是值得注意的。

会话表示标志(Session Present Flag)

如果一个终端与服务器之间建立一个持久会话连接(假设这个终端没有使用到一个“clean session”标记清除已有回话标志), 一个新增的“Session Present”标志(会话表示标志,逻辑值为true或false)会在CONNACK中出现,表明MQTT服务器已经拥有当前客户端上次连接会话信息,比如订阅的主题,排队信息和其它信息等。

会话表示标志若为true,客户端可减少了一次发送订阅SUBSCRIBLE交互步骤,有助于更有效的数据通信;为false,客户端需要再次发送订阅SUBSCRIBLE消息,不可略过。

新增订阅失败代码反馈

MQTT 3.1.1之前,终端连接之后无法知道其发送的订阅主题是否被MQTT服务器接受与否。此新特性较适用于细粒度权限MQTT主题管理;若无授权,服务器会把错误代码(0x80)附加在SUBACK中,客户端就可以知道订阅失败。

MQTT匿名客户端

需要支持临时或匿名?客户端仅仅需要在发送CONNECT时把客户端标识符( client identifier )置空(零长度)即可,MQTT服务器会为此类请求生成一个随机、唯一客户端标记符。但这要求客户端必须设置Clean Session标记为1,否则服务器端会直接返回包含0x02 (Identifier rejected)代码的CONNACK,同时关闭连接。

可用于后端程序(不需要维护回话状态)向终端发送消息的客户端,MQTT服务器程序可区别对待。

快速发布无等待

这是一个新增的特别有用的特性,客户端可以在发送CONNECT之后,可无须等待MQTT服务器返回的CONNACK,根据需要即刻发送PUBLISH、SUBSCRIBLE、DISCONNEECT等消息,可避免客户端资源等待。此特性也适用于突发模式(burst-mode)客户端需求,只关心数据要尽快的发送出去,而不是去担心是否需要维护一个长连接。

这需要MQTT服务器实现在分发消息之前检查客户端是否有权限发布到这些主题上。

客户端标识符可以变长一些

MQTT 3.1针对客户端标识符( client identifier)限制是23个字节,实际环境下会有所不便,已有遗留系统可能使用UUID作为客户端的标识符,这样服务器端需要做一些彼此之间的MAP映射。 MQTT 3.1.1中上限为65535个字节,毕竟成为业界标准,需要兼容大量的遗留设备和基础设施。

其它小改变

  1. CONNECT消息可变头部协议名称MQIsdp被改为MQTT,语义更准确
  2. 所有字符串明确规定使用UTF-8编码,包括客户端标识符(Client Identifier)
  3. CONNECT消息可变头部协议版本号,由0x03变成了0x04 QoS 0类型PUBLISH消息DUP标记必须被设置为0
  4. MQTT Over WebSocket 被定义,互联网地址编码分配机构(Internet Assigned Numbers Authority)分配标识符为mqtt。虽在MQTT 3.1规范通篇没有提到WebSocket,但因其二进制属性可以很容易的在WebSocket通道传输。

术语变化

  • MQTT代理 -> MQTT服务器(MQTT Broker is now MQTT Server)
  • 消息ID -> 包ID(Message ID is now Packet ID)
  • 消息类型 -> 包类型(Message Type is now Packet Type)
  • 主题路径 -> 主题名称(Subscribe and Unsubscribe take Topic Paths, rather than Topic names)
  • 以前在固定头部,现在在包类型中( Flags in the fixed header are now specific to the packet type
  • 0字节保留信息需要清除 (A zero byte retained message MUST NOT be stored as a retained message on the Server )

小结

当前MQTT 3.1.1已经在很多活跃开源项目/商业产品得到支持。比如Eclipse Paho,Mosquitto,JBoss A-MQ 6.1, Apache ActiveMQ 5.10-SNAPSHOT,Apache Camel 2.13.0,HiveMQ等。

关于Eclipse Paho:

  1. Eclipse Paho 1.0支持MQTT 3.1.1和MQTT 3.1规范
  2. Eclipse Paho 0.9仅支持 MQTT 3.1规范

包含MQTT 3.1.1和MQTT 3.1的客户端可以混合使用,彼此可以共存于同一个MQTT服务器下,在基本消息传输层面没有多大修改,同样的PUBLISH消息可以在MQTT客户端中自由流转,这个需要服务器端编码支持。

已有的MQTT 3.1客户端可以用着急升级,但升级之后可以从新增特性中收益良多。

转自:http://www.blogjava.net/yongboy/archive/2014/12/16/421460.html

五.MQTT协议笔记之订阅

前言

记忆不太好的时候,只能翻看以前的文章/笔记重新温习一遍,但找不到MQTT协议有关订阅部分的描述,好不容易从Evernote中找到贴出来,这样整个MQTT协议笔记,就比较齐全了。

SUBSCRIBE

一般来讲,客户端在成功建立TCP连接之后,发送CONNECT消息,在得到服务器端授权允许建立彼此连接的CONNACK消息之后,客户端会发送SUBSCRIBE消息,订阅感兴趣的Topic主题列表(至少一个主题),一个完整示范如下:


Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message Type(8) DUP flag QoS level RETAIN
    1 0 0 0 0 0 1 x
byte 2 Remaining Length
Variable header/可变头部
Message Identifier
byte 1 Message ID MSB (0) 0 0 0 0 0 0 0 0
byte 2 Message ID LSB (10) 0 0 0 0 1 0 1 0
Playload/消息体
Topic name
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (3) 0 0 0 0 0 0 1 1
byte 3 'a' (0x61) 0 1 1 0 0 0 0 1
byte 4 '/' (0x2F) 0 0 1 0 1 1 1 1
byte 5 'b' (0x62) 0 1 1 0 0 0 1 0
Requested QoS
byte 6 Requested QoS (1) x x x x x x 0 1
Topic Name
byte 7 Length MSB (0) 0 0 0 0 0 0 0 0
byte 8 Length LSB (3) 0 0 0 0 0 0 1 1
byte 9 'c' (0x63) 0 1 1 0 0 0 1 1
byte 10 '/' (0x2F) 0 0 1 0 1 1 1 1
byte 11 'd' (0x64) 0 1 1 0 0 1 0 0
Requested QoS
byte 12 Requested QoS (2) x x x x x x 1 0

固定头部

Qos Level,可根据实际情况进行调整为0/1/2等。一般设为0表示最多一次。客户端可设置OoS Level值。 DUP flag,值为0表示第一次发送。

可变头部

因为上面示范QoS level值为1,因此需要客户端传递消息ID,16位,无符号的short类型。

消息体

订阅的主题名称采用修改版UTF-8编码,然后紧跟着对应的QoS值。下面的次序,可能更为形象:

Topic name "a/b"
Requested QoS 1
Topic name "c/d"
Requested QoS 2

订阅者的Topic name支持通配符#和+ :

  1. #支持一个主题内任意级别话题
  2. +只匹配一个主题级别的通配符

eg:

finance/stock/#
finance/sotkc/ibm/+

都是有效,更具体规则,请参阅协议附加部分。

在服务器接收处理时,按照顺序读取即可:

String topicName = readUTF();
int qosVal = read();

服务器可以发送QoS不大于客户端设置OoS的消息,尤其是服务器不提供良好的持久化机制的时候。

SUBACK

服务器会对发出SUBSCRIBE的消息返回一个确认消息。


Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (9) DUP flag QoS flags RETAIN
    1 0 0 1 x x x x
byte 2   Remaining Length
Variable header/可变头部
Message Identifier
byte 1 Message ID MSB (0) 0 0 0 0 0 0 0 0
byte 2 Message ID LSB (10) 0 0 0 0 1 0 1 0
Playload/消息体
byte 1 Granted QoS (0) x x x x x x 0 0
byte 1 Granted QoS (2) x x x x x x 1 0

可变头部

Message Identifier,服务器需要附加,客户端需要处理。

消息体

QoS,为服务器根据实际情况授予的QoS级别列表,和客户端发送的SUBSCRIBE的订阅Topic Name顺序完全一致。

客户端订阅几个TOPIC,服务器端一一给出各个TOPIC的QoS具体值。

UNSUBSCRIBE

服务器需要支持客户端取消订阅功能,UNSUBSCRIBE消息格式和SUBSCRIBE消息格式差不多,除了消息类型不同,消息体中没有了QoS字节,其它没有区别。


Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message Type(10) DUP flag QoS level RETAIN
    1 0 1 0 0 0 1 x
byte 2 Remaining Length
Variable header/可变头部
Message Identifier
byte 1 Message ID MSB (0) 0 0 0 0 0 0 0 0
byte 2 Message ID LSB (10) 0 0 0 0 1 0 1 0
Playload/消息体
Topic name
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (3) 0 0 0 0 0 0 1 1
byte 3 'a' (0x61) 0 1 1 0 0 0 0 1
byte 4 '/' (0x2F) 0 0 1 0 1 1 1 1
byte 5 'b' (0x62) 0 1 1 0 0 0 1 0
Topic Name
byte 6 Length MSB (0) 0 0 0 0 0 0 0 0
byte 7 Length LSB (3) 0 0 0 0 0 0 1 1
byte 8 'c' (0x63) 0 1 1 0 0 0 1 1
byte 9 '/' (0x2F) 0 0 1 0 1 1 1 1
byte 10 'd' (0x64) 0 1 1 0 0 1 0 0

可变头部的消息ID的出现还是由固定头部的QoS Level(1)决定是否存在。

一般来讲,客户端发布退订,服务器端需要返回退订确认。

MQTT没讲是否允许客户端退订所有TOPIC。

UNSUBACK

服务器返回的UNSUBSCRIBE消息UNSUBACK相应很简单,没有消息体。


Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (9) DUP flag QoS flags RETAIN
    1 0 1 1 x x x x
byte 2   Remaining length (2)
    0 0 0 0 0 0 1 0
Variable header/可变头部
Message Identifier
byte 1 Message ID MSB (0) 0 0 0 0 0 0 0 0
byte 2 Message ID LSB (10) 0 0 0 0 1 0 1 0

小结

订阅部分,共有四个消息,分别一一对应。

命令 响应 备注 建议
SUBSCRIBE SUBACK 协议没有涉及最多运行订阅TOPIC数目,隐藏的隐患 建议至多10个
UNSUBSCRIBE UNSUBACK 是否可以退订所有订阅,不详 建议保留至少一个Topic

 

转自:http://www.blogjava.net/yongboy/archive/2014/04/12/412351.html

四.MQTT协议笔记之消息流

前言

前面的笔记已把所有消息类型都过了一遍,这里从消息流的角度尝试解读一下。

网络故障

在任何网络环境下,都会出现一方连接失败,比如离开公司大门那一刻没有了WIFI信号。但持续连接的另一端-服务器可能不能立即知道对方已断开。类似网络异常情况,都有可能在消息发送的过程中出现,消息发送出去,就丢失了。

MQTT协议假定客户端和服务器端稳定情况一般,彼此之通信管道不可靠,一旦客户端网络断开,情况就会很严重,很难恢复原状。

但别忘记,很多客户端会有永久性存储设备支持,比如闪存ROM、存储卡等,在通信出现异常的情况下可以用于保存关键数据或状态信息等。

总之,异常网络情况很复杂,只能小心处理之。

消息重发策略

QoS > 0情况下,PUBLISH、PUBREL、SUBSCRIBE、UNSUBSCRIBE等类型消息在发送者发送完之后,需要等待一个响应消息,若在一个指定时间段内没有收到,发送者可能需要重试。重发的消息,要求DUP标记要设置为1.

等待响应的超时应该在消息成功发送之后开始算起,并且等待超时应该是可以配置选项,以便在下一次重试的时候,适当加大。比如第一次重试超时10秒,下一次可能为20秒,再一次重试可能为60秒呢。当然,还要有一个重试次数限制的。

还有一种情况,客户端重新连接,但未在可变头部中设置clean session标记,但双方(客户端和服务器端)都应该重试先前未发送的动态消息(in-flight messages)。客户端不被强制要求发送未被确认的消息,但服务器端就得需要重发那些未被去确认的消息。

QoS level决定的消息流

QoS level为Quality of Service level的缩写,翻译成中文,服务质量等级。

MQTT 3.1协议在"4.1 Quality of Service levels and flows"章节中,仅仅讨论了客户端到服务器的发布流程,不太完整。因为决定消息到达率,能够提升发送质量的,应该是服务器发布PUBLISH消息到订阅者这一消息流方向。

QoS level 0

至多发送一次,发送即丢弃。没有确认消息,也不知道对方是否收到。

Client Message and direction Server
QoS = 0 PUBLISH 

---------->
Action: Publish message to subscribers then Forget
Reception: <=1

针对的消息不重要,丢失也无所谓。

网络层面,传输压力小。

QoS level 1

所有QoS level 1都要在可变头部中附加一个16位的消息ID。

SUBSCRIBE和UNSUBSCRIBE消息使用QoS level 1。

针对消息的发布,Qos level 1,意味着消息至少被传输一次。

发送者若在一段时间内接收不到PUBACK消息,发送者需要打开DUB标记为1,然后重新发送PUBLISH消息。因此会导致接收方可能会收到两次PUBLISH消息。针对客户端发布消息到服务器的消息流:

Client Message and direction Server
QoS = 1

DUP = 0
Message ID = x

Action: Store message

PUBLISH 

---------->
Actions:

  • Store message

  • Publish message to subscribers
  • Delete message

Reception: >=1
Action: Discard message PUBACK 

<----------
Message ID = x

针对服务器发布到订阅者的消息流:

Server Message and direction Subscriber
QoS = 1

DUP = 0

Message ID = x
PUBLISH 

---------->
Actions:

  • Store message

  • Make message available                       
Reception: >=1


PUBACK 

<----------
Message ID = x

发布者(客户端/服务器)若因种种异常接收不到PUBACK消息,会再次重新发送PUBLISH消息,同时设置DUP标记为1。接收者以服务器为例,这可能会导致服务器收到重复消息,按照流程,broker(服务器)发布消息到订阅者(会导致订阅者接收到重复消息),然后发送一条PUBACK确认消息到发布者。

在业务层面,或许可以弥补MQTT协议的不足之处:重试的消息ID一定要一致接收方一定判断当前接收的消息ID是否已经接受过

但一样不能够完全确保,消息一定到达了。

QoS level 2

仅仅在PUBLISH类型消息中出现,要求在可变头部中要附加消息ID。

级别高,通信压力稍大些,但确保了仅仅传输接收一次。

先看协议中流程图,Client -> Server方向,会有一个总体印象:

Client Message and direction Server
QoS = 2

DUP = 0
Message ID = x

Action: Store message

PUBLISH 

---------->
Action(a) Store message

or

Actions(b):

  • Store message ID
  • Publish message to subscribers
  PUBREC 

<----------
Message ID = x
Message ID = x PUBREL 

---------->
Actions(a):

  • Publish message to subscribers
  • Delete message

or

Action(b): Delete message ID

Action: Discard message PUBCOMP 

<----------
Message ID = x

Server -> Subscriber

Server Message and direction Subscriber
QoS = 2

DUP = 0

Message ID = x
PUBLISH 

---------->
Action: Store message
  PUBREC 

<----------
Message ID = x
Message ID = x PUBREL 

---------->
Actions:

  • Make message available                       


PUBCOMP 

<----------
Message ID = x

Server端采取的方案a和b,都包含了何时消息有效,何时处理消息。两个方案二选一,Server端自己决定。但无论死采取哪一种方式,都是在QoS level 2协议范畴下,不受影响。若一方没有接收到对应的确认消息,会从最近一次需要确认的消息重试,以便整个(QoS level 2)流程打通。

消息顺序

消息顺序会受许多因素的影响,但对于服务器程序,必须保证消息传递流程的每个阶段要和开始的顺序一致。例如,在QoS level 2定义的消息流中,PUBREL流必须和PUBLISH流具有相同的顺序发送:

Client Message and direction Server
  PUBLISH 1

---------->
PUBLISH 2
---------->
PUBLISH 3
---------->
 
  PUBREC 1

<----------
PUBREC 2
<----------
 
  PUBREL 1

---------->
 
  PUBREC 3

<----------
 
  PUBREL 2

---------->
 
  PUBCOMP 1

<----------
 
  PUBREL 3

---------->
 
  PUBCOMP 2

<----------
PUBCOMP 3
<----------
 

流动消息(in-flight messages)数量允许有一个可保证的效果:

  • 在流动消息(in-flight)窗口1中,每个传递流在下一个流开始之前完成。这保证消息以提交的顺序传递
  • 在流动消息(in-flight)大于1的窗口,只能在QoS level内被保证消息的顺序

消息的持久化

在MQTT协议中,PUBLISH消息固定头部RETAIN标记,只有为1才要求服务器需要持久保存此消息,除非新的PUBLISH覆盖。

对于持久的、最新一条PUBLISH消息,服务器不但要发送给当前的订阅者,并且新的订阅者(new subscriber,同样需要订阅了此消息对应的Topic name)会马上得到推送。

Tip:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送,不是所有。

消息流的编码/解码

MQTT协议中,由目前定义的14种类型消息在客户端和服务器端之间数据进行交互。若以JAVA语言构建MQTT服务器,可选择Netty作为基础。

在Netty中,数据的进入和流出,代表了一次完整的交互。无论是要进入的还是要流出的数据(单独以服务器为例),都可看做字节流。若把每种类型消息抽象为一个具体对象,那么处理起来就不难了。

客户端->服务器,进入的字节流,逐个字节/单位读取,可还原成一个具体的消息对象(解码的过程)。

要发送到客户端的消息对象,转换(编码)成字节流,然后由TCP通道流转到接收者。

小结

断断续续记录了MQTT 3.1协议的若干阅读笔记,总之是把协议个人认为不够清晰,或者我不好理解的地方,着重进行了分析。也便于自己以后回过来头来翻阅,不是那么快的忘却。

转自:http://www.blogjava.net/yongboy/archive/2014/02/15/409893.html

三.MQTT协议笔记之发布流程

前言

这次要讲到客户端/服务器的发布消息行为,与PUBLISH相关的消息类型,会在这里看到。

PUBLISH

客户端发布消息经由服务器分发到所有对应的订阅者那里。一个订阅者可以订阅若干个主题(Topic name),但一个PUBLISH消息只能拥有一个主题。

消息架构一览:

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message Type(3) DUP flag QoS level RETAIN
    0 0 1 1 0 0 1 0
byte 2 Remaining Length
Variable header/可变头部
Topic name
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (3) 0 0 0 0 0 0 1 1
byte 3 'a' (0x61) 0 1 1 0 0 0 0 1
byte 4 '/' (0x2F) 0 0 1 0 1 1 1 1
byte 5 'b' (0x62) 0 1 1 0 0 0 1 0
Message Identifier
byte 6 Message ID MSB (0) 0 0 0 0 0 0 0 0
byte 7 Message ID LSB (10) 0 0 0 0 1 0 1 0
Playload/消息体
BLOB,二进制对象形式。二进制具体包含的内容和格式,可有应用程序自身定义。若消息体为空(0长度)也是可能的。

固定头部

DUP flag,设为0,表示当前为第一次发送。

RETAIN flag,只有在PUBLISH消息中才有效。

  • 1:表示发送的消息需要一直持久保存,不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。 备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送,不是所有。
  • 0:仅仅为当前订阅者推送此消息。

可变头部

Topic name,UTF-8编码字符串形式,不支持通配符!

消息体

一般作为UTF-8编码写入接口,但不排除自定义的消息格式。

空的消息体(zero-length)的PUBLISH消息也可以是合法的。

当服务器接收到空消息体(zero-length payload)、retain = 1、具有topic name的一个PUBLISH特殊消息,表示同时满足retain = 1、相同topic name的这两个特征的被持久化PUBLISH消息,可被删除。

Response/响应

固定头部QoS level决定了消息中间件针对发布者具体需要响应的内容:

QoS Level Expected response
QoS 0 None
QoS 1 PUBACK
QoS 2 PUBREC

备注:仅仅针对发布PUBLISH消息的发布者。

Actions:

无论是订阅者还是服务器接收到PUBLISH消息之后,需要根据QoS level执行不同动作。

QoS Level Expected Action
QoS 0 发送到所有感兴趣者
QoS 1 持久化记录下来,发送到所有感兴趣的参与者,返回一个PUBACK消息给发送者
QoS 2 持久化记录下来,暂时不发送所有感兴趣的参与者,返回一个PUBREC消息给发送者

如果服务器收到PUBLISH消息,参与者指的是订阅者。如果订阅者收到PUBLISH消息,参与者就是服务器。 需要注意:

  1. 发布者发布的PUBLISH消息发送到服务器,在payload/消息体处可能夹带有私货,可能含有自定义的数据 格式
  2. 若兼容MQTT客户端,经由服务器分发到所有对应订阅者处只能是规规矩矩的PUBLISH消息,并且固定头部的RETAIN标志不能被设置成有效值1

授权

未经授权的发布者提交的PUBLISH消息,服务器会忽略掉,客户端不会被通知。

PUBACK

作为订阅者/服务器接收(QoS level = 1)PUBLISH消息之后对发送者的响应,整个消息不复杂。

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (4) DUP flag QoS flags RETAIN
    0 1 0 0 x x x x
byte 2   Remaining Length (2)
    0 0 0 0 0 0 1 0
Variable header/可变头部
Message Identifier
byte 1 Message ID MSB (0) 0 0 0 0 0 0 0 0
byte 2 Message ID LSB (10) 0 0 0 0 1 0 1 0

虽没有消息体,但可变头部附加一个16位的无符号short类型。

PUBREC

字面意思为Assured publish received,作为订阅者/服务器对QoS level = 2的发布PUBLISH消息的发送方的响应,确认已经收到,为QoS level = 2消息流的第二个消息。 和PUBACK相比,除了消息类型不同外,其它都是一样。

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (5) DUP flag QoS flags RETAIN
    0 1 0 1 x x x x
byte 2   Remaining Length (2)
    0 0 0 0 0 0 1 0
Variable header/可变头部
Message Identifier
byte 1 Message ID MSB (0) 0 0 0 0 0 0 0 0
byte 2 Message ID LSB (10) 0 0 0 0 1 0 1 0

无论是订阅者还是服务器,在消费PUBREC消息之后需要发送一个PUBREL消息给发送者(和PUBREC具有同样的消息ID),确认已收到。

PUBREL

Qos level = 2的协议流的第三个消息,有PUBLISH消息的发布者发送,参与方接收。完整示范如下:

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (6) DUP flag QoS flags RETAIN
    0 1 1 0 0 0 1 x
byte 2   Remaining Length (2)
    0 0 0 0 0 0 1 0
Variable header/可变头部
Message Identifier
byte 1 Message ID MSB (0) 0 0 0 0 0 0 0 0
byte 2 Message ID LSB (10) 0 0 0 0 1 0 1 0

QoS level 1,PUBREL消息要求如此。

DUP flag 为0,表示消息第一次被发送。

毫无疑问,剩余长度为2个byte长度。

可变头部中,消息ID和发布者接收到的PUBREC所包含的消息ID是一致的。

动作:

  1. 服务器接收到发布者(a)的PUBREL消息,此时服务器让发布者(a)刚才发布PUBLISH消息可用,发送此PUBLISH消息给所有订阅此主题的订阅者,然后发送PUBCOMP消息给发布者(a)
  2. 可变头部包含消息ID和服务器接收的PUBREL消息ID是一致的。 一个订阅者接收到PUBREL消息,订阅者使PUBLISH消息可用,然后反馈一个PUBCOMP消息给服务器

PUBCOMP

作为QoS level = 2消息流第四个,也是最后一个消息,由收到PUBREL的一方向另一方做出的响应消息。

完整的消息一览,和PUBREL一致,除了消息类型。

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (7) DUP flag QoS flags RETAIN
    0 1 1 1 x x x x
byte 2   Remaining Length (2)
    0 0 0 0 0 0 1 0
Variable header/可变头部
Message Identifier
byte 1 Message ID MSB (0) 0 0 0 0 0 0 0 0
byte 2 Message ID LSB (10) 0 0 0 0 1 0 1 0

当客户端接收一个PUBCOMP消息时,客户端摒弃原来的消息,因为它已经成功发送消息到服务器。

小结

消息的发布和确认等一些流程,主要是跟消息发布者所设定的QoS level有关;稍加整理,绘制了下面一张图,理解起来可能会更清晰些:

点击查看原图

上图针对的是客户端发布消息到服务器端的方向。

为了确保消息已经成功传递过去,只有收到了确认,才会让人特别放心。

在QoS level = 2时,通信双方都需要知道各自的确认流程以及所处阶段等,交互很多,数据量大的情况下,可能会造成数据线路传递拥塞。服务器选择QoS = 0/1,大部分情况都是可以应对的。 比如重要消息,就要确保对方都要收到,然后彼此确认,OK,这个消息是真实、有效的。

无论Qos level为0、1,还是2,服务器(具备所有条件都满足之后)总要把收到的具体内容和topic组装成一个新的PUBLISH Message(也不一定要重新构造,但要求推送的PUBLISH消息,一定要具有明确的主题和内容,RETAIN标志不能设置为1)推送到所有感兴趣的订阅者。

嗯,消息的发布来源别忘记还有可能来自CONNECT消息中的Will Topic和Will Message,若是设置了Will flag标记的话。

  

转自:http://www.blogjava.net/yongboy/archive/2014/02/10/409689.html

二.MQTT协议笔记之连接和心跳

前言

本篇会把连接(CONNECT)、心跳(PINGREQ/PINGRESP)、确认(CONNACK)、断开连接(DISCONNECT)和在一起。

CONNECT

像前面所说,MQTT有关字符串部分采用的修改版的UTF-8编码,CONNECT可变头部中协议名称、消息体都是采用修改版的UTF-8编码。前面基本上可变头部内容不多,下面是一个较为完整的CONNECT消息结构:

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部

  Message Type(1) DUP flag QoS level RETAIN
byte 1
  0 0 0 1 x x x x
byte 2 Remaining Length
Variable header/可变头部
Protocol Name
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (6) 0 0 0 0 0 1 1 0
byte 3 'M' 0 1 0 0 1 1 0 1
byte 4 'Q' 0 1 0 1 0 0 0 1
byte 5 'I' 0 1 0 0 1 0 0 1
byte 6 's' 0 1 1 1 0 0 1 1
byte 7 'd' 0 1 1 0 0 1 0 0
byte 8 'p' 0 1 1 1 0 0 0 0
Protocol Version Number
byte 9 Version (3) 0 0 0 0 0 0 1 1
Connect Flags

User Name Flag Password Flag Will Retain Will QoS Will Flag Clean Session Reserved
byte 10
1 1 0 0 1 1 1 x
Keep Alive timer
byte 11 Keep Alive MSB (0) 0 0 0 0 0 0 0 0
byte 12 Keep Alive LSB (10) 0 0 0 0 1 0 1 0
Payload/消息体

Client Identifier(客户端ID)

1-23个字符长度,客户端到服务器的全局唯一标志,如果客户端ID超出23个字符长度,服务器需要返回码为2,标识符被拒绝响应的CONNACK消息。

处理QoS级别1和2的消息ID中,可以使用到。

必填项。

Will Topic

Will Flag值为1,这里便是Will Topic的内容。QoS级别通过Will QoS字段定义,RETAIN值通过Will RETAIN标识,都定义在可变头里面。

Will Message

Will Flag若设为1,这里便是Will Message定义消息的内容,对应的主题为Will Topic。如果客户端意外的断开触发服务器PUBLISH此消息。
长度有可能为0。

在CONNECT消息中的Will Message是UTF-8编码的,当被服务器发布时则作为二进制的消息体。

User Name

如果设置User Name标识,可以在此读取用户名称。一般可用于身份验证。协议建议用户名为不多于12个字符,不是必须。

Password

如果设置Password标识,便可读取用户密码。建议密码为12个字符或者更少,但不是必须。

可变头部

协议名称和协议版本都是固定的。

连接标志(Connect Flags)

一个字节表示,除了第1位是保留未使用,其它7位都具有不同含义。

业务上很重要,对消息总体流程影响很大,需要牢记。

Clean Session

0,表示如果订阅的客户机断线了,要保存为其要推送的消息(QoS为1和QoS为2),若其重新连接时,需将这些消息推送(若客户端长时间不连接,需要设置一个过期值)。 1,断线服务器即清理相关信息,重新连接上来之后,会再次订阅。

Will Flag

定义了客户端(没有主动发送DISCONNECT消息)出现网络异常导致连接中断的情况下,服务器需要做的一些措施。

简而言之,就是客户端预先定义好,在自己异常断开的情况下,所留下的最后遗愿(Last Will),也称之为遗嘱(Testament)。 这个遗嘱就是一个由客户端预先定义好的主题和对应消息,附加在CONNECT的可变头部中,在客户端连接出现异常的情况下,由服务器主动发布此消息。

只有在Will Flag位为1时,Will Qos和Will Retain才会被读取,此时消息体payload中要出现Will Topic和Will Message具体内容,否则,Will QoS和Will Retain值会被忽略掉。

Will Qos

两位表示,和PUBLISH消息固定头部的QoS level含义一样。这里先掠过,到PUBLISH消息再回过头来看看,会更明白些。

若标识了Will Flag值为1,那么Will QoS就会生效,否则会被忽略掉。

Will RETAIN

如果设置Will Flag,Will Retain标志就是有效的,否则它将被忽略。

当客户端意外断开服务器发布其Will Message之后,服务器是否应该继续保存。这个属性和PUBLISH固定头部的RETAIN标志含义一样,这里先掠过。

User name 和 password Flag:

用于授权,两者要么为0要么为1,否则都是无效。都为0,表示客户端可自由连接/订阅,都为1,表示连接/订阅需要授权。

Payload/消息体

消息体定义的消息顺序(如上表所示),约定俗成,不得更改,否则将可能引起混乱。

若Will Flag值为0,那么在payload中,Client Identifer后面就不会存在Will Topic和Will Message内容。

若User Name和Password都为0,意味着Payload/消息体中,找不到User Name和password的值,就算有,也是无效。标志决定着是否读取与否。

心跳时间(Keep Alive timer)

以秒为单位,定义服务器端从客户端接收消息的最大时间间隔。一般应用服务会在业务层次检测客户端网络是否连接,不是TCP/IP协议层面的心跳机制(比如开启SOCKET的SO_KEEPALIVE选项)。 一般来讲,在一个心跳间隔内,客户端发送一个PINGREQ消息到服务器,服务器返回PINGRESP消息,完成一次心跳交互,继而等待下一轮。若客户端没有收到心跳反馈,会关闭掉TCP/IP端口连接,离线。 16位两个字节,可看做一个无符号的short类型值。最大值,2^16-1 = 65535秒 = 18小时。最小值可以为0,表示客户端不断开。一般设为几分钟,比如微信心跳周期为300秒。

Will Message编码

Will Message在CONNECT Payload/息体中,使用UTF-8编码。假设内容为“abcd”,大概如下:

  Description 7 6 5 4 3 2 1 0
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (4) 0 0 0 0 0 1 0 0
byte 3 'a' (0x61) 0 1 1 0 0 0 0 1
byte 4 'b' (0x62) 0 1 1 0 0 0 1 0
byte 5 'c' (0x63) 0 1 1 0 0 0 1 1
byte 6 'd' (0x64) 0 1 1 0 0 1 0 0

有一点需要记住,PUBLISH的Payload/消息体中以二进制编码保存。

某刻客户端异常关闭触发服务器会PUBLISH此消息。那么服务器会直接把byte3-byte6之间字符取出,保存为二进制,附加到PUBLISH消息体中,大概存储如下:

  Description 7 6 5 4 3 2 1 0
byte 1 'a' (0x61) 0 1 1 0 0 0 0 1
byte 2 'b' (0x62) 0 1 1 0 0 0 1 0
byte 3 'c' (0x63) 0 1 1 0 0 0 1 1
byte 4 'd' (0x64) 0 1 1 0 0 1 0 0

另外,MQTT 3.1协议对Will message的说明很容易引起误解,3.1.1草案已经得到修正。

相关说明:

http://mqtt.org/wiki/doku.php/willmessageutf8_support

https://tools.oasis-open.org/issues/browse/MQTT-2

连接异常中断通知机制

CONNECT消息一旦设置在可变头部设置了Will flag标记,那就启用了Last-Will-And-Testament特性,此特性很赞。

一旦客户端出现异常中断,便会触发服务器发布Will Message消息到Will Topic主题上去,通知Will Topic订阅者,对方因异常退出。

接收CONNECT后的响应动作

接收到CONNECT消息之后,服务器应该返回一个CONNACK消息作为响应:

  1. 若客户端绕过CONNECT消息直接发送其它类型消息,服务器应关闭此非法连接 若客户端发送CONNECT之后未收到CONNACT,需要关闭当前连接,然后重新连接
  2. 相同Client ID客户端已连接到服务器,先前客户端必须断开连接后,服务器才能完成新的客户端CONNECT连接 客户端发送无效非法CONNECT消息,服务器需要关闭

CONNACK

一个完整的CONNACK消息大致如下:

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (2) DUP flag QoS flags RETAIN
    0 0 1 0 x x x x
byte 2   Remaining Length (2)
    0 0 0 0 0 0 1 0
Variable header/可变头部
Topic Name Compression Response
byte 1 Reserved values. Not used. x x x x x x x x
Connect Return Code
byte 2 Return Code                

可变头部第一个字节为保留,无甚用处。第二个字节为连接握手返回码:

返回值 16进制 含义
0 0x00 Connection Accepted
1 0x01 Connection Refused: unacceptable protocol version
2 0x02 Connection Refused: identifier rejected
3 0x03 Connection Refused: server unavailable
4 0x04 Connection Refused: bad user name or password
5 0x05 Connection Refused: not authorized
6-255   Reserved for future use

只有0-5目前被使用到,其他值有待日后使用。一般返回值为0x00,表示连接建立。非法的请求,需要返回相应的数值。

从上面看出,一个CONNACT,四个字节表示。一个正常的CONNACT消息实际内容可能如下: 0x20 0x02 0x00 0x00

若是在私有协议中,两个字节就足够了。

很多时候,客户端和服务器端在没有消息传递时,会一直保持着连接。虽然不能依靠TCP心跳机制(比如SO_KEEPALIVE选项),业务层面定义心跳机制,会让连接状态检测、控制更为直观。

PINGREQ

由客户端发送到服务器端,证明自己还在一直连接着呢。两个字节,固定值。

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (12) DUP flag QoS flags RETAIN
    1 1 0 0 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。

心跳频率在CONNECT可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。

PINGRESP

服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (13) DUP flag QoS flags RETAIN
    1 1 0 1 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

服务器一般若在1.5倍的心跳周期内接收不到客户端发送的PINGREQ,可考虑关闭客户端的连接描述符。此时的关闭连接的行为和接收到客户端发送DISCONNECT消息的处理行为一致,但对客户端的订阅不会产生影响(不会清除客户端订阅数据),这个需要牢记。

若客户端发送PINGREQ之后的一个心跳周期内接收不到PINGRESP消息,可考虑关闭TCP/IP套接字连接。

DISCONNECT

客户端主动发送到服务器端,表明即将关闭TCP/IP连接。此时要求服务器要完整、干净的进行断开处理,不能仅仅类似于关闭连接描述符类似草草处理之。 需要两个字节,值固定:

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (14) DUP flag QoS flags RETAIN
    1 1 1 0 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

服务器要根据先前此客户端在发送CONNECT消息可变头部Connect flag中的“Clean session flag”所设置值,再次复习一下:

  1. 值为0,服务器必须在客户端断开之后继续存储/保持客户端的订阅状态。这些状态包括:

    • 存储订阅的消息QoS1和QoS2消息
    • 正在发送消息期间连接丢失导致发送失败的消息
    • 以便当客户端重新连接时以上消息可以被重新传递。
  2. 值为1,服务器需要立刻清理连接状态数据。

有一点需要牢记,服务器在接收到客户端发送的DISCONNECT消息之后,需要主动关闭TCP/IP连接。

转自:http://www.blogjava.net/yongboy/archive/2014/02/09/409630.html

一.MQTT协议笔记之头部信息

前言

MQTT(Message Queue Telemetry Transport),遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:

1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2. 对负载内容屏蔽的消息传输。
3. 使用 TCP/IP 提供网络连接。
4. 有三种消息发布服务质量:
    “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    “至少一次”,确保消息到达,但消息重复可能会发生。
    “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

MQTT 3.1协议在线版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html

官方下载地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf

PDF版本,42页,不算多。

另外,目前MQTT大家都用在了手机推送,可能还有很多的使用方式,有待进一步的探索。

协议方面,以前曾简单实现过一点HTTP协议,基于HTTP上构建若干种通信管道的socket.io协议,不过socket.io 0.9版本的协议才两三页而已。面对领域不同,自然解决的方式也不一样。

阅读完毕MQTT协议,有一个想法,其实可以基于MQTT协议,打造更加私有、精简(协议一些地方,略显多余)的传输协议,比如一个字节的传输开销。有时间,会详细说一下。

固定头部

固定头部,使用两个字节,共16位:

bit 7 6 5 4 3 2 1 0
byte 1 Message Type DUP flag QoS level RETAIN
byte 2 Remaining Length

第一个字节(byte 1)

消息类型(4-7),使用4位二进制表示,可代表16种消息类型:

Mnemonic Enumeration Description
Reserved 0 Reserved
CONNECT 1 Client request to connect to Server
CONNACK 2 Connect Acknowledgment
PUBLISH 3 Publish message
PUBACK 4 Publish Acknowledgment
PUBREC 5 Publish Received (assured delivery part 1)
PUBREL 6 Publish Release (assured delivery part 2)
PUBCOMP 7 Publish Complete (assured delivery part 3)
SUBSCRIBE 8 Client Subscribe request
SUBACK 9 Subscribe Acknowledgment
UNSUBSCRIBE 10 Client Unsubscribe request
UNSUBACK 11 Unsubscribe Acknowledgment
PINGREQ 12 PING Request
PINGRESP 13 PING Response
DISCONNECT 14 Client is Disconnecting
Reserved 15 Reserved

除去0和15位置属于保留待用,共14种消息事件类型。

DUP flag(打开标志)

保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:

 当QoS > 0
 消息需要回复确认

此时,在可变头部需要包含消息ID。当值为1时,表示当前消息先前已经被传送过。

QoS(Quality of Service,服务质量)

使用两个二进制表示PUBLISH类型消息:

QoS value bit 2 bit 1 Description
0 0 0 至多一次 发完即丢弃 <=1
1 0 1 至少一次 需要确认回复 >=1
2 1 0 只有一次 需要确认回复 =1
3 1 1 待用,保留位置

RETAIN(保持)

仅针对PUBLISH消息。不同值,不同含义:

1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。

备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。

0:仅仅为当前订阅者推送此消息。

假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。

如何解析

因为java使用有符号(最高位为符号位)数据表示,byte范围:-128-127。该字节的最高位(左边第一位),可能为1。若直接转换为byte类型,会出现负数,这是一个雷区。DataInputStream提供了int readUnsignedByte()读取方式,请注意。下面演示了,如何从一个字节中,获取到所有定义的信息,同时绕过雷区:

public static void main(String[] args) {
    byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0

    doGetBit(publishFixHeader);
    int ori = 224;//1110000,DISCONNECT ,Message Type (14)
    byte flag = (byte) ori; //有符号byte       
    doGetBit(flag);
    doGetBit_v2(ori);
}


public static void doGetBit(byte flags) {
    boolean retain = (flags & 1) > 0;
    int qosLevel = (flags & 0x06) >> 1;
    boolean dupFlag = (flags & 8) > 0;
    int messageType = (flags >> 4) & 0x0f;

    System.out.format(
            "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
            messageType, dupFlag, qosLevel, retain);
}

public static void doGetBit_v2(int flags) {
    boolean retain = (flags & 1) > 0;
    int qosLevel = (flags & 0x06) >> 1;
    boolean dupFlag = (flags & 8) > 0;
    int messageType = flags >> 4;

    System.out.format(
            "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
            messageType, dupFlag, qosLevel, retain);
}

处理Remaining Length(剩余长度)

在当前消息中剩余的byte(字节)数,包含可变头部和负荷(称之为内容/body,更为合适)。单个字节最大值:01111111,16进制:0x7F,10进制为127。单个字节为什么不能是11111111(0xFF)呢?因为MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。同时MQTT协议最多允许4个字节表示剩余长度。那么最大长度为:0xFF,0xFF,0xFF,0x7F,二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:

Digits From To
1 0 (0x00) 127 (0x7F)
2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

如何换算成十进制呢 ? 使用java语言表示如下:

public static void main(String[] args) throws IOException {
    // 模拟客户端写入
   ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
   DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
   dataOutputStream.write(0xff);
   dataOutputStream.write(0xff);
   dataOutputStream.write(0xff);
   dataOutputStream.write(0x7f);

   InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());

    // 模拟服务器/客户端解析
   System. out.println( "result is " + bytes2Length(arrayInputStream));
}

/**
* 转化字节为 int类型长度
* @param in
* @return
* @throws IOException
*/
private static int bytes2Length(InputStream in) throws IOException {
    int multiplier = 1;
    int length = 0;
    int digit = 0;
    do {
        digit = in.read(); //一个字节的有符号或者无符号,转换转换为四个字节有符号 int类型
        length += (digit & 0x7f) * multiplier;
        multiplier *= 128;
   } while ((digit & 0x80) != 0);

    return length;
}

一般最后一个字节小于127(01111111),和0x80(10000000)进行&操作,最终结果都为0,因此计算会终止。代理中间件和请求者,中间传递的是字节流Stream,自然要从流中读取,逐一解析出来。

那么如何将int类型长度解析为不确定的字节值呢?

public static void main(String[] args) throws IOException {
    // 模拟服务器/客户端写入
   ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
   DataOutputStream dataOutputStream = new DataOutputStream(
             arrayOutputStream);

    // 模拟服务器/客户端解析
    length2Bytes(dataOutputStream, 128);
}

/**
* int类型长度解析为1-4个字节
* @param out
* @param length
* @throws IOException
*/
private static void length2Bytes(OutputStream out, int length)
         throws IOException {
    int val = length;
    do {
         int digit = val % 128;
        val = val / 128;
         if (val > 0)
             digit = digit | 0x80;

        out.write(digit);
   } while (val > 0);
}

digit对val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 请注意:剩余长度,只在固定头部中,无论是一个字节,还是四个字节,不能被算作可变头部中。

可变头部

固定头部仅定义了消息类型和一些标志位,一些消息的元数据,需要放入可变头部中。可变头部内容字节长度 + Playload/负荷字节长度 = 剩余长度,这个是需要牢记的。可变头部,包含了协议名称,版本号,连接标志,用户授权,心跳时间等内容,这部分和后面要讲到的CONNECT消息类型,有重复,暂时略过。

Playload/消息体/负荷

消息体主要是为配合固定/可变头部命令(比如CONNECT可变头部User name标记若为1则需要在消息体中附加用户名称字符串)而存在。

CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息体。PUBLISH的消息体以二进制形式对待。

请记住,MQTT协议只允许在PUBLISH类型消息体中使用自定义特性,在固定/可变头部想加入自定义私有特性,就免了吧。这也是为了协议免于流于形式,变得很分裂也为了兼顾现有客户端等。比如支持压缩等,那就可以在Playload中定义数据支持,在应用中进行读取处理。

这部分会在后面详细论述。

消息标识符/消息ID

固定头中的QoS level标志值为1或2时才会在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可变头中出现。

一个16位无符号位的short类型值(值不能为 0,0做保留作为无效的消息ID),仅仅要求在一个特定方向(服务器发往客户端为一个方向,客户端发送到服务器端为另一个方向)的通信消息中必须唯一。比如客户端发往服务器,有可能存在服务器发往客户端会同时存在重复,但不碍事。

可变头部中,需要两个字节的顺序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻译成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左边/上面,表示这是一个大端字节/网络字节序,符合人的阅读习惯,高位在最左边。

bit 7 6 5 4 3 2 1 0
  Message Identifier MSB
  Message Identifier LSB

但凡如此表示的,都可以视为一个16位无符号short类型整数,两个字节表示。在JAVA中处理比较简单:

DataInputStream.readUnsignedShort

或者

in.read() * 0xFF + in.read();

最大长度可为: 65535

UTF-8编码

有关字符串,MQTT采用的是修改版的UTF-8编码,一般形式为如下,需要牢记:

bit 7 6 5 4 3 2 1 0
byte 1 String Length MSB
byte 2 String Length LSB
bytes 3 ... Encoded Character Data


比如AVA,使用writeUTF()方法写入一串文字“OTWP”,头两个字节为一个完整的无符号数字,代表字符串字节长度,后面四个字节才是字符串真正的长度,共六个字节:

bit 7 6 5 4 3 2 1 0
byte 1 Message Length MSB (0x00)
  0 0 0 0 0 0 0 0
byte 2 Message Length LSB (0x04)
  0 0 0 0 0 1 0 0
byte 3 'O' (0x4F)
  0 1 0 0 1 1 1 1
byte 4 'T' (0x54)
  0 1 0 1 0 1 0 0
byte 5 'W' (0x57)
  0 1 0 1 0 1 1 1
byte 6 'P' (0x50)
  0 1 0 1 0 0 0 0

这点,在程序中,可不用单独处理默认,直接使用readUTF()方法,可自动省去了处理字符串长度的麻烦。当然,可以手动读取字符串:

// 模拟写入
dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
......
// 模拟读取 
int decodedLength = dataInputStream.readUnsignedShort();//2 byte
byte[] decodedString = new byte[decodedLength]; // 4 bytes
dataInputStream.read(decodedString);
String target = new String(decodedString, "UTF-8");

等同于:

String target = dataInputStream.readUTF();

MQTT无论是可变头部还是消息体中,只要是字符串部分,都是采用了修改版的UTF-8编码,读取和写入,借助DataInputStream/DataOutputStream的帮助,一行语句,略去了手动处理的麻烦。

小结

总之,掌握固定头部的QoS level、RETAIN标记、可变头部的Connect flags作用和意义,对总体理解MQTT作用很大。

 

转自:http://www.blogjava.net/yongboy/archive/2014/02/07/409587.html

 

mqtt笔记

MQTT(Message Queue Telemetry Transport),遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:

1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2. 对负载内容屏蔽的消息传输。
3. 使用 TCP/IP 提供网络连接。
4. 有三种消息发布服务质量:
    “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    “至少一次”,确保消息到达,但消息重复可能会发生。
    “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

MQTT 3.1协议在线版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html

官方下载地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf

PDF版本,42页,不算多。

目前MQTT大家都用在了手机推送,可能还有很多的使用方式,有待进一步的探索。

 

固定头部

固定头部,使用两个字节,共16位:

bit 7 6 5 4 3 2 1 0
byte 1 Message Type DUP flag QoS level RETAIN
byte 2 Remaining Length

第一个字节(byte 1)

消息类型(4-7),使用4位二进制表示,可代表16种消息类型:

Mnemonic Enumeration Description
Reserved 0 Reserved
CONNECT 1 Client request to connect to Server
CONNACK 2 Connect Acknowledgment
PUBLISH 3 Publish message
PUBACK 4 Publish Acknowledgment
PUBREC 5 Publish Received (assured delivery part 1)
PUBREL 6 Publish Release (assured delivery part 2)
PUBCOMP 7 Publish Complete (assured delivery part 3)
SUBSCRIBE 8 Client Subscribe request
SUBACK 9 Subscribe Acknowledgment
UNSUBSCRIBE 10 Client Unsubscribe request
UNSUBACK 11 Unsubscribe Acknowledgment
PINGREQ 12 PING Request
PINGRESP 13 PING Response
DISCONNECT 14 Client is Disconnecting
Reserved 15 Reserved

除去0和15位置属于保留待用,共14种消息事件类型。

DUP flag(打开标志)

保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:

 当QoS > 0
 消息需要回复确认

此时,在可变头部需要包含消息ID。当值为1时,表示当前消息先前已经被传送过。

QoS(Quality of Service,服务质量)

使用两个二进制表示PUBLISH类型消息:

QoS value bit 2 bit 1 Description
0 0 0 至多一次 发完即丢弃 <=1
1 0 1 至少一次 需要确认回复 >=1
2 1 0 只有一次 需要确认回复 =1
3 1 1 待用,保留位置

RETAIN(保持)

仅针对PUBLISH消息。不同值,不同含义:

1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。

备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。

0:仅仅为当前订阅者推送此消息。

假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。

Remaining Length(剩余长度)

在当前消息中剩余的byte(字节)数,包含可变头部和负荷(内容)。

单个字节最大值:01111111,16进制:0x7F,10进制为127。

MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。

MQTT协议最多允许4个字节表示剩余长度。最大长度为:0xFF,0xFF,0xFF,0x7F,二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:

Digits From To
1 0 (0x00) 127 (0x7F)
2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

其实换个方式理解:第1字节的基数是1,而第2字节的基数:128,以此类推,第三字节的基数是:128*128=2的14次方,第四字节是:128*128*128=2的21次方;

例如,需要表达321=2*128+65.(2字节):10100001 0000 0011.

(和我们理解的低位运算放置顺序不一样,第一个字节是低位,后续字节是高位,但字节内部本身是低位右边,高位左边)。

可变头部

固定头部仅定义了消息类型和一些标志位,一些消息的元数据,需要放入可变头部中。可变头部内容字节长度 + Playload/负荷字节长度 = 剩余长度。

可变头部,包含了协议名称,版本号,连接标志,用户授权,心跳时间等内容。

可变头部居于固定头部和payload中间。

可变剩余长度(remaing length)不是可变头部的一部分,当然该长度值也是从可变头部开始计算,包含可变头部的长度+payload的长度。

可变头部的字段如下:

协议名称: MQTT CONNECT message. UTF编码:如 MQIsdp, capitalized.

协议版本:8位无符号,当前使用:3 (0x03),如下:

bit 7 6 5 4 3 2 1 0
  Protocol Version
  0 0 0 0 0 0 1 1

Connect flags

Clean session, Will, Will QoS, Retain flags 该字段的设置

一个字节表示,除了第1位是保留未使用,其它7位都具有不同含义。

业务上很重要,对消息总体流程影响很大,需要牢记。

 

Clean session flag

Position: bit 1 ,连接标志.

0:server需要存储client的订阅。包括存储Qos 1和2的订阅主题(当client重连时能将消息发送);当连接丢失的时候 服务器必须维护正在发送的消息的状态直到客户端重新连接到服务器。

1:server MUST忽略之前维护关于client的信息,并且将该connection当成clean的。server MUST 忽略任何client断开的状态。

 

原文翻译不能,所以参考了下一位大牛的表达:

0,表示如果订阅的客户机断线了,要保存为其要推送的消息(QoS为1和QoS为2),若其重新连接时,需将这些消息推送(若客户端长时间不连接,需要设置一个过期值)。 
1,断线服务器即清理相关信息,重新连接上来之后,会再次订阅。

Will Flag

定义了客户端(没有主动发送DISCONNECT消息)出现网络异常导致连接中断的情况下,服务器需要做的一些措施。

简而言之,就是客户端预先定义好,在自己异常断开的情况下,所留下的最后遗愿(Last Will),也称之为遗嘱(Testament)。 这个遗嘱就是一个由客户端预先定义好的主题和对应消息,附加在CONNECT的可变头部中,在客户端连接出现异常的情况下,由服务器主动发布此消息。

只有在Will Flag位为1时,Will Qos和Will Retain才会被读取,此时消息体Playload中要出现Will Topic和Will Message具体内容,否则,Will QoS和Will Retain值会被忽略掉。

Will Qos

两位表示,和PUBLISH消息固定头部的QoS level含义一样。

若标识了Will Flag值为1,那么Will QoS就会生效,否则会被忽略掉。

Will RETAIN

如果设置Will Flag,Will Retain标志就是有效的,否则它将被忽略。

当客户端意外断开服务器发布其Will Message之后,服务器是否应该继续保存。这个属性和PUBLISH固定头部的RETAIN标志含义一样,这里先掠过。

User name 和 password Flag:

用于授权,两者要么为0要么为1,否则都是无效。都为0,表示客户端可自由连接/订阅,都为1,表示连接/订阅需要授权。

 

bit 7 6 5 4 3 2 1 0
  User Name Flag Password Flag Will Retain Will QoS Will Flag Clean Session Reserved
  x x x x x x   x

 

Playload/消息体/负荷

消息体主要是为配合固定/可变头部命令(比如CONNECT可变头部User name标记若为1则需要在消息体中附加用户名称字符串)而存在。

CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息体。PUBLISH的消息体以二进制形式对待。

MQTT协议只允许在PUBLISH类型消息体中使用自定义特性,在固定/可变头部想加入自定义私有特性是不允许的。

这也是为了协议免于流于形式,变得很分裂也为了兼顾现有客户端等。比如支持压缩等,那就可以在Playload中定义数据支持,在应用中进行读取处理。

这部分会在后面详细论述。

消息标识符/消息ID

固定头中的QoS level标志值为1或2时才会在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可变头中出现。

一个16位无符号位的short类型值(值不能为 0,0做保留作为无效的消息ID),仅仅要求在一个特定方向(服务器发往客户端为一个方向,客户端发送到服务器端为另一个方向)的通信消息中必须唯一。比如客户端发往服务器,有可能存在服务器发往客户端会同时存在重复,但不碍事。

可变头部中,需要两个字节的顺序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻译成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左边/上面,表示这是一个大端字节/网络字节序,符合人的阅读习惯,高位在最左边。

bit 7 6 5 4 3 2 1 0
  Message Identifier MSB
  Message Identifier LSB

最大长度可为: 65535

UTF-8编码

有关字符串,MQTT采用的是修改版的UTF-8编码,一般形式为如下:

bit 7 6 5 4 3 2 1 0
byte 1 String Length MSB
byte 2 String Length LSB
bytes 3 ... Encoded Character Data

 

 最后的结构如下:

 

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
    Message Type(1) DUP flag QoS level RETAIN
byte 1
  0 0 0 1 x x x x
byte 2 Remaining Length
Variable header/可变头部
Protocol Name
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (6) 0 0 0 0 0 1 1 0
byte 3 'M' 0 1 0 0 1 1 0 1
byte 4 'Q' 0 1 0 1 0 0 0 1
byte 5 'I' 0 1 0 0 1 0 0 1
byte 6 's' 0 1 1 1 0 0 1 1
byte 7 'd' 0 1 1 0 0 1 0 0
byte 8 'p' 0 1 1 1 0 0 0 0
Protocol Version Number
byte 9 Version (3) 0 0 0 0 0 0 1 1
Connect Flags
  User Name Flag Password Flag Will Retain Will QoS Will Flag Clean Session Reserved
byte 10
1 1 0 0 1 1 1 x
Keep Alive timer
byte 11 Keep Alive MSB (0) 0 0 0 0 0 0 0 0
byte 12 Keep Alive LSB (10) 0 0 0 0 1 0 1 0
Playload/消息体

Client Identifier(客户端ID)

1-23个字符长度,客户端到服务器的全局唯一标志,如果客户端ID超出23个字符长度,服务器需要返回码为2,标识符被拒绝响应的CONNACK消息。

处理QoS级别1和2的消息ID中,可以使用到。

必填项。

Will Topic

Will Flag值为1,这里便是Will Topic的内容。QoS级别通过Will QoS字段定义,RETAIN值通过Will RETAIN标识,都定义在可变头里面。

Will Message

Will Flag若设为1,这里便是Will Message定义消息的内容,对应的主题为Will Topic。如果客户端意外的断开触发服务器PUBLISH此消息。
长度有可能为0。
在CONNECT消息中的Will Message是UTF-8编码的,当被服务器发布时则作为二进制的消息体。

User Name

如果设置User Name标识,可以在此读取用户名称。一般可用于身份验证。协议建议用户名为不多于12个字符,不是必须。

Password

如果设置Password标识,便可读取用户密码。建议密码为12个字符或者更少,但不是必须。

 

心跳时间(Keep Alive timer)

以秒为单位,定义服务器端从客户端接收消息的最大时间间隔。一般应用服务会在业务层次检测客户端网络是否连接,不是TCP/IP协议层面的心跳机制(比如开启SOCKET的SO_KEEPALIVE选项)。 一般来讲,在一个心跳间隔内,客户端发送一个PINGREQ消息到服务器,服务器返回PINGRESP消息,完成一次心跳交互,继而等待下一轮。若客户端没有收到心跳反馈,会关闭掉TCP/IP端口连接,离线。 16位两个字节,可看做一个无符号的short类型值。最大值,2^16-1 = 65535秒 = 18小时。最小值可以为0,表示客户端不断开。一般设为几分钟,比如微信心跳周期为300秒。

Will Message编码

Will Message在CONNECT Payload/消息体中,使用UTF-8编码。假设内容为“abcd”,大概如下:

  Description 7 6 5 4 3 2 1 0
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (4) 0 0 0 0 0 1 0 0
byte 3 'a' (0x61) 0 1 1 0 0 0 0 1
byte 4 'b' (0x62) 0 1 1 0 0 0 1 0
byte 5 'c' (0x63) 0 1 1 0 0 0 1 1
byte 6 'd' (0x64) 0 1 1 0 0 1 0 0

有一点需要记住,PUBLISH的Payload/消息体中以二进制编码保存。

某刻客户端异常关闭触发服务器会PUBLISH此消息。那么服务器会直接把byte3-byte6之间字符取出,保存为二进制,附加到PUBLISH消息体中,大概存储如下:

  Description 7 6 5 4 3 2 1 0
byte 1 'a' (0x61) 0 1 1 0 0 0 0 1
byte 2 'b' (0x62) 0 1 1 0 0 0 1 0
byte 3 'c' (0x63) 0 1 1 0 0 0 1 1
byte 4 'd' (0x64) 0 1 1 0 0 1 0 0

另外,MQTT 3.1协议对Will message的说明很容易引起误解,3.1.1草案已经得到修正。

相关说明:

http://mqtt.org/wiki/doku.php/willmessageutf8_support

https://tools.oasis-open.org/issues/browse/MQTT-2

连接异常中断通知机制

CONNECT消息一旦设置在可变头部设置了Will flag标记,那就启用了Last-Will-And-Testament特性,此特性很赞。

一旦客户端出现异常中断,便会触发服务器发布Will Message消息到Will Topic主题上去,通知Will Topic订阅者,对方因异常退出。

接收CONNECT后的响应动作

接收到CONNECT消息之后,服务器应该返回一个CONNACK消息作为响应:

  1. 若客户端绕过CONNECT消息直接发送其它类型消息,服务器应关闭此非法连接 若客户端发送CONNECT之后未收到CONNACT,需要关闭当前连接,然后重新连接
  2. 相同Client ID客户端已连接到服务器,先前客户端必须断开连接后,服务器才能完成新的客户端CONNECT连接 客户端发送无效非法CONNECT消息,服务器需要关闭

CONNACK

一个完整的CONNACK消息大致如下:

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (2) DUP flag QoS flags RETAIN
    0 0 1 0 x x x x
byte 2   Remaining Length (2)
    0 0 0 0 0 0 1 0
Variable header/可变头部
Topic Name Compression Response
byte 1 Reserved values. Not used. x x x x x x x x
Connect Return Code
byte 2 Return Code                

可变头部第一个字节为保留,无甚用处。第二个字节为连接握手返回码:

返回值 16进制 含义
0 0x00 Connection Accepted
1 0x01 Connection Refused: unacceptable protocol version
2 0x02 Connection Refused: identifier rejected
3 0x03 Connection Refused: server unavailable
4 0x04 Connection Refused: bad user name or password
5 0x05 Connection Refused: not authorized
6-255   Reserved for future use

只有0-5目前被使用到,其他值有待日后使用。一般返回值为0x00,表示连接建立。非法的请求,需要返回相应的数值。

从上面看出,一个CONNACT,四个字节表示。一个正常的CONNACT消息实际内容可能如下: 0x20 0x02 0x00 0x00

若是在私有协议中,两个字节就足够了。

很多时候,客户端和服务器端在没有消息传递时,会一直保持着连接。虽然不能依靠TCP心跳机制(比如SO_KEEPALIVE选项),业务层面定义心跳机制,会让连接状态检测、控制更为直观。

 

PINGREQ

由客户端发送到服务器端,证明自己还在一直连接着呢。两个字节,固定值。

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (12) DUP flag QoS flags RETAIN
    1 1 0 0 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。

心跳频率在CONNECT可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。

PINGRESP

服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (13) DUP flag QoS flags RETAIN
    1 1 0 1 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

服务器一般若在1.5倍的心跳周期内接收不到客户端发送的PINGREQ,可考虑关闭客户端的连接描述符。此时的关闭连接的行为和接收到客户端发送DISCONNECT消息的处理行为一致,但对客户端的订阅不会产生影响(不会清除客户端订阅数据),这个需要牢记。

若客户端发送PINGREQ之后的一个心跳周期内接收不到PINGRESP消息,可考虑关闭TCP/IP套接字连接。

DISCONNECT

客户端主动发送到服务器端,表明即将关闭TCP/IP连接。此时要求服务器要完整、干净的进行断开处理,不能仅仅类似于关闭连接描述符类似草草处理之。 需要两个字节,值固定:

  Description 7 6 5 4 3 2 1 0
Fixed header/固定头部
byte 1   Message type (14) DUP flag QoS flags RETAIN
    1 1 1 0 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

服务器要根据先前此客户端在发送CONNECT消息可变头部Connect flag中的“Clean session flag”所设置值,再次复习一下:

  1. 值为0,服务器必须在客户端断开之后继续存储/保持客户端的订阅状态。这些状态包括:

    • 存储订阅的消息QoS1和QoS2消息
    • 正在发送消息期间连接丢失导致发送失败的消息
    • 以便当客户端重新连接时以上消息可以被重新传递。
  2. 值为1,服务器需要立刻清理连接状态数据。

有一点需要牢记,服务器在接收到客户端发送的DISCONNECT消息之后,需要主动关闭TCP/IP连接。

转自: http://www.cnblogs.com/leeying/p/3791077.html

MQTT协议

MQTT - MQ Telemetry Transport

  • 轻量级的 machine-to-machine 通信协议。
  • publish/subscribe模式。
  • 基于TCP/IP。
  • 支持QoS。
  • 适合于低带宽、不可靠连接、嵌入式设备、CPU内存资源紧张。
  • 是一种比较不错的Android消息推送方案。
  • FacebookMessenger采用了MQTT。
  • MQTT有可能成为物联网的重要协议。

消息体

 
点击查看原图

MessageType

点击查看原图
CONNECT
TCP连接建立完毕后,Client向Server发出一个Request。
如果一段时间内接收不到Server的Response,则关闭socket,重新建立一个session连接。
如果一个ClientID已经与服务器连接,则持有同样ClientID的旧有连接必须由服务器关闭后,新建立才能建立。
CONNACK
Server发出Response响应。
0x00 Connection Accepted

0x01 Connection Refused: unacceptable protocol version

0x02 Connection Refused: identifier rejected

0x03 Connection Refused: server unavailable

0x04 Connection Refused: bad user name or password

0x05 Connection Refused: not authorized
PUBLISH 发布消息
Client/Servier均可以进行PUBLISH。
publish message 应该包含一个TopicName(Subject/Channel),即订阅关键词。
关于Topic通配符
/:用来表示层次,比如a/b,a/b/c。
#:表示匹配>=0个层次,比如a/#就匹配a/,a/b,a/b/c。
单独的一个#表示匹配所有。
不允许 a#和a/#/c。
+:表示匹配一个层次,例如a/+匹配a/b,a/c,不匹配a/b/c。
单独的一个+是允许的,a+不允许,a/+/b不允许
PUBACK 发布消息后的确认
QoS=1时,Server向Client发布该确认(Client收到确认后删除),订阅者向Server发布确认。
PUBREC / PUBREL / PUBCOMP
QoS=2时
1. Server->Client发布PUBREC(已收到);
2. Client->Server发布PUBREL(已释放);
3. Server->Client发布PUBCOMP(已完成),Client删除msg;
订阅者也会向Server发布类似过程确认。
PINGREQ / PINGRES 心跳
Client有责任发送KeepAliveTime时长告诉给Server。在一个时长内,发送PINGREQ,Server发送PINGRES确认。
Server在1.5个时长内未收到PINGREQ,就断开连接。
Client在1个时长内未收到PINGRES,断开连接。
一般来说,时长设置为几个分钟。最大18hours,0表示一直未断开。

QoS

点击查看原图
QoS=0:最多一次,有可能重复或丢失。
QoS=1:至少一次,有可能重复。
Client[Qos=1,DUP=0/*重复次数*/,MessageId=x] --->PUBLISH--> Server收到后,存储Message,发布,删除,向Client回发PUBACK
Client收到PUBACK后,删除Message;如果未收到PUBACK,设置DUP++,重新发送,Server端重新发布,所以有可能重复发送消息。
QoS=2:只有一次,确保消息只到达一次(用于比较严格的计费系统)。

Clean Session

如果为false(flag=0),Client断开连接后,Server应该保存Client的订阅信息。
如果为true(flag=1),表示Server应该立刻丢弃任何会话状态信息。

QoS图例:

QoS = 0: 至多一次,不保证消息到达,可能会丢失或重复。server没有response。

QoS = 1: 至少一次,确保消息到达,但是可能会有重复。server向client发送PUBACK(Publish Acknowledgement)


QoS = 2: 只有一次,确保消息到达并只有一次。消息类型有PUBREC(Publish Received 已收到)、PUBREL(Publish Released 已释放)、PUBCOMP(Publish Completed 已完成)。