diff --git a/MQTTnet.Core/Serializer/ByteReader.cs b/MQTTnet.Core/Serializer/ByteReader.cs index 935c777a9..ce7f709d5 100644 --- a/MQTTnet.Core/Serializer/ByteReader.cs +++ b/MQTTnet.Core/Serializer/ByteReader.cs @@ -24,7 +24,7 @@ public bool Read() return result; } - public byte Read(int count) + public int Read(int count) { if (_index + count > 8) { @@ -42,7 +42,7 @@ public byte Read(int count) _index++; } - return (byte)result; + return result; } } } diff --git a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs index ffac41b91..4100b6cb5 100644 --- a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs +++ b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs @@ -16,86 +16,72 @@ public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel dest if (packet == null) throw new ArgumentNullException(nameof(packet)); if (destination == null) throw new ArgumentNullException(nameof(destination)); - var connectPacket = packet as MqttConnectPacket; - if (connectPacket != null) + if (packet is MqttConnectPacket connectPacket) { return SerializeAsync(connectPacket, destination); } - var connAckPacket = packet as MqttConnAckPacket; - if (connAckPacket != null) + if (packet is MqttConnAckPacket connAckPacket) { return SerializeAsync(connAckPacket, destination); } - var disconnectPacket = packet as MqttDisconnectPacket; - if (disconnectPacket != null) + if (packet is MqttDisconnectPacket disconnectPacket) { return SerializeAsync(disconnectPacket, destination); } - var pingReqPacket = packet as MqttPingReqPacket; - if (pingReqPacket != null) + if (packet is MqttPingReqPacket pingReqPacket) { return SerializeAsync(pingReqPacket, destination); } - var pingRespPacket = packet as MqttPingRespPacket; - if (pingRespPacket != null) + if (packet is MqttPingRespPacket pingRespPacket) { return SerializeAsync(pingRespPacket, destination); } - var publishPacket = packet as MqttPublishPacket; - if (publishPacket != null) + if (packet is MqttPublishPacket publishPacket) { return SerializeAsync(publishPacket, destination); } - var pubAckPacket = packet as MqttPubAckPacket; - if (pubAckPacket != null) + if (packet is MqttPubAckPacket pubAckPacket) { return SerializeAsync(pubAckPacket, destination); } - var pubRecPacket = packet as MqttPubRecPacket; - if (pubRecPacket != null) + if (packet is MqttPubRecPacket pubRecPacket) { return SerializeAsync(pubRecPacket, destination); } - var pubRelPacket = packet as MqttPubRelPacket; - if (pubRelPacket != null) + if (packet is MqttPubRelPacket pubRelPacket) { return SerializeAsync(pubRelPacket, destination); } - var pubCompPacket = packet as MqttPubCompPacket; - if (pubCompPacket != null) + if (packet is MqttPubCompPacket pubCompPacket) { return SerializeAsync(pubCompPacket, destination); } - var subscribePacket = packet as MqttSubscribePacket; - if (subscribePacket != null) + if (packet is MqttSubscribePacket subscribePacket) { return SerializeAsync(subscribePacket, destination); } - var subAckPacket = packet as MqttSubAckPacket; - if (subAckPacket != null) + if (packet is MqttSubAckPacket subAckPacket) { return SerializeAsync(subAckPacket, destination); } - var unsubscribePacket = packet as MqttUnsubscribePacket; - if (unsubscribePacket != null) + if (packet is MqttUnsubscribePacket unsubscribePacket) { return SerializeAsync(unsubscribePacket, destination); } - var unsubAckPacket = packet as MqttUnsubAckPacket; - if (unsubAckPacket != null) + if (packet is MqttUnsubAckPacket unsubAckPacket) { return SerializeAsync(unsubAckPacket, destination); } @@ -206,7 +192,7 @@ public async Task DeserializeAsync(IMqttCommunicationChannel sou } } - private async Task DeserializeUnsubscribeAsync(MqttPacketReader reader) + private static async Task DeserializeUnsubscribeAsync(MqttPacketReader reader) { var packet = new MqttUnsubscribePacket { @@ -221,7 +207,7 @@ private async Task DeserializeUnsubscribeAsync(MqttPacketReader return packet; } - private async Task DeserializeSubscribeAsync(MqttPacketReader reader) + private static async Task DeserializeSubscribeAsync(MqttPacketReader reader) { var packet = new MqttSubscribePacket { @@ -238,7 +224,7 @@ await reader.ReadRemainingDataStringWithLengthPrefixAsync(), return packet; } - private async Task DeserializePublishAsync(MqttPacketReader reader) + private static async Task DeserializePublishAsync(MqttPacketReader reader) { var fixedHeader = new ByteReader(reader.FixedHeader); var retain = fixedHeader.Read(); @@ -266,13 +252,10 @@ private async Task DeserializePublishAsync(MqttPacketReader read return packet; } - private async Task DeserializeConnectAsync(MqttPacketReader reader) + private static async Task DeserializeConnectAsync(MqttPacketReader reader) { - var packet = new MqttConnectPacket(); - - await reader.ReadRemainingDataByteAsync(); - await reader.ReadRemainingDataByteAsync(); - + await reader.ReadRemainingDataAsync(2); // Skip 2 bytes + var protocolName = await reader.ReadRemainingDataAsync(4); if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT") @@ -285,7 +268,12 @@ private async Task DeserializeConnectAsync(MqttPacketReader read var connectFlagsReader = new ByteReader(connectFlags); connectFlagsReader.Read(); // Reserved. - packet.CleanSession = connectFlagsReader.Read(); + + var packet = new MqttConnectPacket + { + CleanSession = connectFlagsReader.Read() + }; + var willFlag = connectFlagsReader.Read(); var willQoS = connectFlagsReader.Read(2); var willRetain = connectFlagsReader.Read(); @@ -318,7 +306,7 @@ await reader.ReadRemainingDataWithLengthPrefixAsync(), return packet; } - private async Task DeserializeSubAck(MqttPacketReader reader) + private static async Task DeserializeSubAck(MqttPacketReader reader) { var packet = new MqttSubAckPacket { @@ -333,7 +321,7 @@ private async Task DeserializeSubAck(MqttPacketReader reader) return packet; } - private async Task DeserializeConnAck(MqttPacketReader reader) + private static async Task DeserializeConnAck(MqttPacketReader reader) { var variableHeader1 = await reader.ReadRemainingDataByteAsync(); var variableHeader2 = await reader.ReadRemainingDataByteAsync(); @@ -347,7 +335,7 @@ private async Task DeserializeConnAck(MqttPacketReader reader) return packet; } - private void ValidateConnectPacket(MqttConnectPacket packet) + private static void ValidateConnectPacket(MqttConnectPacket packet) { if (string.IsNullOrEmpty(packet.ClientId) && !packet.CleanSession) { @@ -355,7 +343,7 @@ private void ValidateConnectPacket(MqttConnectPacket packet) } } - private void ValidatePublishPacket(MqttPublishPacket packet) + private static void ValidatePublishPacket(MqttPublishPacket packet) { if (packet.QualityOfServiceLevel == 0 && packet.Dup) { @@ -365,7 +353,7 @@ private void ValidatePublishPacket(MqttPublishPacket packet) private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT"); - private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) { ValidateConnectPacket(packet); @@ -420,7 +408,7 @@ private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel } } - private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -435,22 +423,22 @@ private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel } } - private Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) { return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination); } - private Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination) { return SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination); } - private Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination) { return SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination); } - private Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination) { ValidatePublishPacket(packet); @@ -485,7 +473,7 @@ private Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel } } - private Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -496,7 +484,7 @@ private Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel d } } - private Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -507,7 +495,7 @@ private Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel d } } - private async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination) + private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -518,7 +506,7 @@ private async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationCha } } - private Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -529,7 +517,7 @@ private Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel } } - private Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -549,7 +537,7 @@ private Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChanne } } - private Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -568,7 +556,7 @@ private Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel d } } - private Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -587,7 +575,7 @@ private Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChan } } - private Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination) + private static Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -598,7 +586,7 @@ private Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel } } - private Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination) + private static Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { diff --git a/MQTTnet.Core/Serializer/MqttPacketWriter.cs b/MQTTnet.Core/Serializer/MqttPacketWriter.cs index eb9bfcf07..4e2a3cb7f 100644 --- a/MQTTnet.Core/Serializer/MqttPacketWriter.cs +++ b/MQTTnet.Core/Serializer/MqttPacketWriter.cs @@ -11,39 +11,6 @@ public sealed class MqttPacketWriter : IDisposable { private readonly MemoryStream _buffer = new MemoryStream(512); - public void InjectFixedHeader(byte fixedHeader) - { - if (_buffer.Length == 0) - { - Write(fixedHeader); - Write(0); - return; - } - - var backupBuffer = _buffer.ToArray(); - var remainingLength = (int)_buffer.Length; - - _buffer.SetLength(0); - - _buffer.WriteByte(fixedHeader); - - // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. - var x = remainingLength; - do - { - var encodedByte = x % 128; - x = x / 128; - if (x > 0) - { - encodedByte = encodedByte | 128; - } - - _buffer.WriteByte((byte)encodedByte); - } while (x > 0); - - _buffer.Write(backupBuffer, 0, backupBuffer.Length); - } - public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0) { var fixedHeader = (byte)((byte)packetType << 4); @@ -101,5 +68,38 @@ public void Dispose() { _buffer?.Dispose(); } + + private void InjectFixedHeader(byte fixedHeader) + { + if (_buffer.Length == 0) + { + Write(fixedHeader); + Write(0); + return; + } + + var backupBuffer = _buffer.ToArray(); + var remainingLength = (int)_buffer.Length; + + _buffer.SetLength(0); + + _buffer.WriteByte(fixedHeader); + + // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. + var x = remainingLength; + do + { + var encodedByte = x % 128; + x = x / 128; + if (x > 0) + { + encodedByte = encodedByte | 128; + } + + _buffer.WriteByte((byte)encodedByte); + } while (x > 0); + + _buffer.Write(backupBuffer, 0, backupBuffer.Length); + } } }