Skip to content

Commit a03db15

Browse files
author
于家鹏
committed
support mqtt3.1
-)remain length support >1 byte -)swicth protocol from 3 to 4
1 parent 1c95cd9 commit a03db15

File tree

2 files changed

+48
-21
lines changed

2 files changed

+48
-21
lines changed

MQTTClient_AS3/src/com/godpaper/mqtt/as3/core/MQTT_Protocol.as

+36-7
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ package com.godpaper.mqtt.as3.core
9292
protected var qos:uint;
9393
protected var retain:uint;
9494
protected var remainingLength:uint;
95+
protected var remainPosition:uint;
9596

9697
///* stores the will of the client {willFlag,willQos,willRetainFlag} */
9798
public static var WILL:Array;
@@ -198,7 +199,20 @@ package com.godpaper.mqtt.as3.core
198199

199200
this.position = 0;
200201
this.writeByte(value);
201-
this.writeByte(remainingLength);
202+
var le:uint = remainingLength;
203+
var digit:uint = 0;
204+
do {
205+
digit = le % 128;
206+
le = le / 128;
207+
if(le > 0 ){
208+
digit = digit| 0x80;
209+
}
210+
this.writeByte(digit);
211+
212+
}while(le > 0);
213+
this.remainPosition = this.position;
214+
215+
//this.writeByte(remainingLength);
202216
this.readBytes(fixHead);
203217

204218
type = value & 0xF0;
@@ -207,6 +221,7 @@ package com.godpaper.mqtt.as3.core
207221
retain = value & 0x01;
208222
}
209223

224+
210225
public function writeMessageValue(value:*):void//Variable Head
211226
{
212227
this.position = 2;
@@ -220,11 +235,25 @@ package com.godpaper.mqtt.as3.core
220235
this.position = 0;
221236
this.writeType(input.readUnsignedByte());
222237
//get VarHead and Payload use RemainingLength
223-
remainingLength = input.readUnsignedByte();
238+
239+
//remainingLength = input.readUnsignedByte();
224240

225-
input.readBytes(this, 2, remainingLength);
226-
serialize();
241+
//input.readBytes(this, 2, remainingLength);
242+
243+
var multiplier :uint = 1;
244+
var remainLength:uint = 0;
245+
do
246+
{
247+
var le:uint = input.readUnsignedByte();
248+
remainLength += (le & 127) * multiplier;
249+
multiplier *= 128;
250+
251+
}
252+
while ((le & 128) != 0);
253+
remainingLength = remainLength;
227254
writeMessageType( type + (dup << 3) + (qos << 1) + retain );
255+
input.readBytes(this, this.remainPosition, remainingLength);
256+
serialize();
228257
}
229258

230259
public function readMessageType():ByteArray
@@ -256,9 +285,9 @@ package com.godpaper.mqtt.as3.core
256285
payLoad = new ByteArray();
257286

258287
this.position = 0;
259-
this.readBytes(fixHead, 0, 2);
288+
this.readBytes(fixHead, 0, this.remainPosition);
260289

261-
this.position = 2;
290+
this.position = this.remainPosition;
262291
switch( type ){
263292
case CONNECT://Remaining Length is the length of the variable header (12 bytes) and the length of the Payload
264293
this.readBytes(varHead, 0 , 12);
@@ -268,7 +297,7 @@ package com.godpaper.mqtt.as3.core
268297
break;
269298
case PUBLISH://Remaining Length is the length of the variable header plus the length of the payload
270299
var index:int = (this.readUnsignedByte() << 8) + this.readUnsignedByte();//the length of variable header
271-
this.position = 2;
300+
this.position = this.remainPosition;
272301
this.readBytes(varHead, 0 , index + (qos?4:2));
273302
this.readBytes(payLoad);
274303

MQTTClient_AS3/src/com/godpaper/mqtt/as3/impl/MQTTSocket.as

+12-14
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ package com.godpaper.mqtt.as3.impl
217217
*
218218
*/
219219
// public function MQTTSocket(host:String=null, port:int=1883, topicname:String=null, clientid:String=null, username:String=null, password:String=null,willRetain:Boolean=true,willQos:Boolean=true,willFlag:Boolean=true,cleanSession:Boolean=true)
220-
public function MQTTSocket(host:String=null, port:int=1883,username:String=null, password:String=null, topicname:String=null, clientid:String=null, will:Boolean=true,cleanSession:Boolean=true)
220+
public function MQTTSocket(host:String=null, port:int=1883,username:String=null, password:String=null, topicname:String=null, clientid:String=null, will:Boolean=false,cleanSession:Boolean=false)
221221
{
222222
//parameters store
223223
if (host)
@@ -483,14 +483,12 @@ package com.godpaper.mqtt.as3.impl
483483
this.connectMessage=new MQTT_Protocol();
484484
var bytes:ByteArray=new ByteArray();
485485
bytes.writeByte(0x00); //0
486-
bytes.writeByte(0x06); //6
486+
bytes.writeByte(0x04); //6
487487
bytes.writeByte(0x4d); //M
488488
bytes.writeByte(0x51); //Q
489-
bytes.writeByte(0x49); //I
490-
bytes.writeByte(0x73); //S
491-
bytes.writeByte(0x64); //D
492-
bytes.writeByte(0x70); //P
493-
bytes.writeByte(0x03); //Protocol version = 3
489+
bytes.writeByte(0x54); //T
490+
bytes.writeByte(0x54); //T
491+
bytes.writeByte(0x04); //Protocol version = 3
494492
//Connect flags
495493
var type:int=0;
496494
if (cleanSession)
@@ -707,20 +705,20 @@ package com.godpaper.mqtt.as3.impl
707705
//Variable header
708706
//Payload
709707
//Actions
710-
var varHead:ByteArray=packet.readMessageValue();
708+
var varHead:ByteArray = packet.readMessageValue();
711709
var length:uint = (varHead.readUnsignedByte() << 8) + varHead.readUnsignedByte();
712710
var topicName:String = varHead.readMultiByte(length, "utf");
713711
if( packet.readQoS() ){
714712
var messageId:uint = (varHead.readUnsignedByte() << 8) + varHead.readUnsignedByte();
715713
LOG.info("Publish Message ID {0}", messageId);
716714
}
717715
var payLoad:ByteArray = packet.readPayLoad();
718-
length = (payLoad.readUnsignedByte() << 8) + payLoad.readUnsignedByte();
719-
if( length > payLoad.length ){
720-
length = payLoad.length;
721-
payLoad.position = 0;
722-
}
723-
var topicContent:String = payLoad.readMultiByte(length, "utf");
716+
//length = (payLoad.readUnsignedByte() << 8) + payLoad.readUnsignedByte();
717+
//if( length > payLoad.length ){
718+
// length = payLoad.length;
719+
// payLoad.position = 0;
720+
//}
721+
var topicContent:String = payLoad.readMultiByte(payLoad.length, "utf-8");
724722

725723
LOG.info("Publish TopicName {0}", topicName);
726724
LOG.info("Publish TopicContent {0}", topicContent);

0 commit comments

Comments
 (0)