diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java b/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java index 9aadb95..0e9366f 100644 --- a/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java +++ b/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java @@ -51,10 +51,7 @@ import java.net.InetSocketAddress; import java.net.ProtocolException; import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -800,6 +797,30 @@ private void completeRequest(short id, byte originalType, Object arg) { } } + private void completeRequestSubAck(short id, byte[] grantedQos) throws ProtocolException { + Request request = requests.remove(id); + if( request!=null ) { + assert SUBSCRIBE.TYPE==request.frame.messageType(); + if(request.cb!=null) { + ArrayList rejectedTopics = new ArrayList(grantedQos.length); + for (int i = 0; i < grantedQos.length; i++) { + if(SUBACK.isFailureQos(grantedQos[i])) { + rejectedTopics.add(new SUBSCRIBE().decode(request.frame).topics()[i].toString()); + } + } + + if(!rejectedTopics.isEmpty()) { + ((Callback)request.cb).onFailure(new ProtocolException("Server rejected subscribe to: " + Arrays.toString(rejectedTopics.toArray()))); + } + if(rejectedTopics.size() < grantedQos.length) { + ((Callback) request.cb).onSuccess(grantedQos); + } + } + } else { + handleFatalFailure(new ProtocolException("Command from server contained an invalid message id: " + id)); + } + } + private void processFrame(MQTTFrame frame) { try { switch(frame.messageType()) { @@ -838,7 +859,7 @@ private void processFrame(MQTTFrame frame) { } case SUBACK.TYPE: { SUBACK ack = new SUBACK().decode(frame); - completeRequest(ack.messageId(), SUBSCRIBE.TYPE, ack.grantedQos()); + completeRequestSubAck(ack.messageId(), ack.grantedQos()); break; } case UNSUBACK.TYPE: { diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/SUBACK.java b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/SUBACK.java index ca10e0f..9c7471f 100644 --- a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/SUBACK.java +++ b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/SUBACK.java @@ -37,6 +37,8 @@ public class SUBACK implements Message { public static final byte[] NO_GRANTED_QOS = new byte[0]; public static final byte TYPE = 9; + private static final byte FAILURE_QOS = (byte)0x80; + private short messageId; private byte[] grantedQos = NO_GRANTED_QOS; @@ -70,6 +72,10 @@ public byte[] grantedQos() { return grantedQos; } + public static boolean isFailureQos(byte grantedQos) { + return grantedQos == FAILURE_QOS; + } + public SUBACK grantedQos(byte[] grantedQos) { this.grantedQos = grantedQos; return this;