MQTT协议实现Eclipse Paho学习总结
一、概述
遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。例如,但不仅限于此:
- 网络代价昂贵,带宽低、不可靠。
- 在嵌入设备中运行,处理器和内存资源有限。
该协议的特点有:
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
- 对负载内容屏蔽的消息传输。
- 使用 TCP/IP 提供网络连接。
- 有三种消息发布服务质量:
- “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- “至少一次”,确保消息到达,但消息重复可能会发生。
- “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
- 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
- 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
二、MQTT协议实现Eclipse Paho
org.eclipse.paho.client.mqttv3.internal:看看单词internal你可能就猜到了,没错,这就是第一个包的主要功能实现,这个包有承上启下的功能,首先对第一包提供功能的实现,其次调用剩下包中的类以实现MQTT协议的规定。
三、MQTT协议的报文类别
3.1 MQTT协议规定报文
当一个从客户端到服务器的TCP/IP套接字连接被建立时,必须用一个连接流来创建一个协议级别的会话。
2.连接请求确认(CONNECTACK)
连接请求确认报文(CONNECTACK)是服务器发给客户端,用以确认客户端的连接请求
3.发布报文(PUBLISH)
客户端发布报文到服务器端,用来提供给有着不同需求的订阅者们。每个发布的报文都有一个主题,这是一个分层的命名空间,他定义了报文来源分类,方便订阅者订阅他们需要的主题。订阅者们可以注册自己的需要的报文类别。
4.发布确认报文(PUBACK)
发布确认报文(PUBACK)是对服务质量级别为1的发布报文的应答。他可以是服务器对发布报文的客户端的报文确认,也可以是报文订阅者对发布报文的服务器的应答。
5.发布确认报文(PUBREC)
PUBREC报文是对服务质量级别为2的发布报文的应答。这是服务质量级别为2的协议流的第二个报文。PUBREC是由服务器端对发布报文的客户端的应答,或者是报文订阅者对发布报文的服务器的应答。
6.发布确认报文(PUBREL)
PUBREL是报文发布者对来自服务器的PUBREC报文的确认,或者是服务器对来自报文订阅者的PUBREC报文的确认。它是服务质量级别为2的协议流的第三个报文。
7.确定发布完成(PUBCOMP)
PUBCOMP报文是服务器对报文发布者的PUBREL报文的应答,或者是报文订阅者对服务器的PUBREL报文的应答。它是服务质量级别为2的协议流的第四个也是最后一个报文。
8.订阅命名的主题(SUBSCRIBE)
订阅报文(SUBSCRIBE)允许一个客户端在服务器上注册一个或多个感兴趣的主题名字。发布给这些主题的报文作为发布报文从服务器端交付给客户端。订阅报文也描述了订阅者想要收到的发布报文的服务质量等级。
9. 订阅报文确认(SUBACK)
当服务器收到客户端发来的订阅报文时,将发送订阅报文的确认报文给客户端。一个这样的确认报文包含一列被授予的服务质量等级。被授予的服务质量等级次序和对应的订阅报文中的主题名称的次序相符。
10. 退订命名的主题(UNSUBSCRIBE)
退订主题的报文是从客户端发往服务器端,用以退订命名的主题。
11. 退订确认(UNSUBACK)
退订确认报文是从服务器发往客户端,用以确认客户端发来的退订请求报文。
12. Ping请求(PINGREQ)
Ping请求报文是从连接的客户端发往服务器端,用来询问服务器端是否还存在。
13. Ping应答(PINGRESP)
Ping应答报文是从服务器端发往Ping请求的客户端,对客户端的Ping请求进行确认。
14. 断开通知(DISCONNECT)
断开通知报文是从客户端发往服务器端用来指明将要关闭它的TCP/IP连接,他允许彻底地断开,而非只是下线。如果客户端已经和干净会话标志集联系,那么所有先前关于客户端维护的信息将被丢弃。一个服务器在收到断开报文之后,不能依赖客户端关闭TCP/IP连接。
3.2 Eclipse Paho的对报文的实现
3.3 心跳包
- public class MqttPingReq extends MqttWireMessage {
- public MqttPingReq() {
- super(MqttWireMessage.MESSAGE_TYPE_PINGREQ);
- }
- /**
- * Returns <code>false</code> as message IDs are not required for MQTT
- * PINGREQ messages.
- */
- public boolean isMessageIdRequired() {
- return false;
- }
- protected byte[] getVariableHeader() throws MqttException {
- return new byte[0];
- }
- protected byte getMessageInfo() {
- return 0;
- }
- public String getKey() {
- return new String("Ping");
- }
- }
- /**
- * Writes an <code>MqttWireMessage</code> to the stream.
- */
- public void write(MqttWireMessage message) throws IOException, MqttException {
- byte[] bytes = message.getHeader();
- byte[] pl = message.getPayload();
- // out.write(message.getHeader());
- // out.write(message.getPayload());
- out.write(bytes,0,bytes.length);
- out.write(pl,0,pl.length);
- }
- protected byte[] getVariableHeader() throws MqttException {
- return new byte[0];
- }
- public byte[] getHeader() throws MqttException {
- if (encodedHeader == null) {
- try {
- int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);
- byte[] varHeader = getVariableHeader();
- int remLen = varHeader.length + getPayload().length;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- dos.writeByte(first);//1个字节
- dos.write(encodeMBI(remLen));//1个字节
- dos.write(varHeader);//0个字节
- dos.flush();
- encodedHeader = baos.toByteArray();
- } catch(IOException ioe) {
- throw new MqttException(ioe);
- }
- }
- return encodedHeader;
- }
- /**
- * Sub-classes should override this method to supply the payload bytes.
- */
- public byte[] getPayload() throws MqttException {
- return new byte[0];//0个字节
- }
也就是说MQTT的心跳包只有2个字节!
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。