1
1
package com .acuity .iot .dsa .dslink .protocol .requester ;
2
2
3
3
import org .iot .dsa .node .DSElement ;
4
+ import org .iot .dsa .node .DSNull ;
4
5
import org .iot .dsa .node .DSStatus ;
5
6
import org .iot .dsa .time .DSDateTime ;
6
7
9
10
*
10
11
* @author Daniel Shapiro, Aaron Hansen
11
12
*/
12
- class DSOutboundSubscribeStubs {
13
+ class DSOutboundSubscription {
13
14
14
15
///////////////////////////////////////////////////////////////////////////
15
16
// Instance Fields
@@ -21,7 +22,7 @@ class DSOutboundSubscribeStubs {
21
22
private DSDateTime lastTs ;
22
23
private DSElement lastValue ;
23
24
private String path ;
24
- private int qos = 0 ;
25
+ private int qos = - 1 ;
25
26
private Integer sid ;
26
27
private int size ;
27
28
private State state = State .PENDING_SUBSCRIBE ;
@@ -31,7 +32,7 @@ class DSOutboundSubscribeStubs {
31
32
// Constructors
32
33
///////////////////////////////////////////////////////////////////////////
33
34
34
- public DSOutboundSubscribeStubs (
35
+ public DSOutboundSubscription (
35
36
String path ,
36
37
DSOutboundSubscriptions subscriptions ) {
37
38
this .path = path ;
@@ -74,7 +75,7 @@ public void onDisconnect() {
74
75
}
75
76
}
76
77
77
- public DSOutboundSubscribeStubs setSid (Integer sid ) {
78
+ public DSOutboundSubscription setSid (Integer sid ) {
78
79
this .sid = sid ;
79
80
return this ;
80
81
}
@@ -87,30 +88,36 @@ public DSOutboundSubscribeStubs setSid(Integer sid) {
87
88
* If already subscribed, will pass the last update to the new subscriber.
88
89
*/
89
90
void add (DSOutboundSubscribeStub stub ) {
91
+ int prevQos = qos ;
90
92
if (stub .getQos () > qos ) {
91
93
qos = stub .getQos ();
92
94
}
93
95
if (contains (stub )) {
96
+ if (qos > prevQos ) {
97
+ getSubscriptions ().sendSubscribe (this );
98
+ }
94
99
return ;
95
100
}
96
- stub .setStubs (this );
101
+ stub .setSub (this );
97
102
if (last == null ) {
98
103
first = stub ;
99
104
last = stub ;
100
105
} else {
101
106
last .setNext (stub );
102
107
last = stub ;
103
108
}
104
- //Send the last update to the new subscription
105
109
if (++size > 1 ) {
106
110
if (lastValue != null ) {
107
111
try {
108
- stub .process (lastTs , lastValue , lastStatus );
112
+ stub .update (lastTs , lastValue , lastStatus );
109
113
} catch (Exception x ) {
110
114
subscriptions .error (path , x );
111
115
}
112
116
}
113
117
}
118
+ if (qos > prevQos ) { //need to resubscribe for new qos
119
+ getSubscriptions ().sendSubscribe (this );
120
+ }
114
121
}
115
122
116
123
private boolean contains (DSOutboundSubscribeStub stub ) {
@@ -140,21 +147,6 @@ private DSOutboundSubscribeStub predecessor(DSOutboundSubscribeStub stub) {
140
147
return cur ;
141
148
}
142
149
143
- void process (DSDateTime ts , DSElement value , DSStatus status ) {
144
- DSOutboundSubscribeStub stub = first ;
145
- while (stub != null ) {
146
- try {
147
- stub .process (ts , value , status );
148
- } catch (Exception x ) {
149
- subscriptions .error (path , x );
150
- }
151
- stub = stub .getNext ();
152
- }
153
- lastTs = ts ;
154
- lastValue = value ;
155
- lastStatus = status ;
156
- }
157
-
158
150
void remove (DSOutboundSubscribeStub stub ) {
159
151
DSOutboundSubscribeStub pred = predecessor (stub );
160
152
if (pred == last ) { //not contained
@@ -173,6 +165,19 @@ void remove(DSOutboundSubscribeStub stub) {
173
165
}
174
166
if (--size == 0 ) {
175
167
subscriptions .unsubscribe (this );
168
+ } else {
169
+ stub = first ;
170
+ int max = 0 ;
171
+ while (stub != null ) {
172
+ if (stub .getQos () > max ) {
173
+ max = stub .getQos ();
174
+ }
175
+ stub = stub .getNext ();
176
+ }
177
+ if (max != qos ) {
178
+ qos = max ;
179
+ getSubscriptions ().sendSubscribe (this );
180
+ }
176
181
}
177
182
}
178
183
@@ -184,6 +189,37 @@ int size() {
184
189
return size ;
185
190
}
186
191
192
+ void update (DSDateTime ts , DSElement value , DSStatus status ) {
193
+ DSOutboundSubscribeStub stub = first ;
194
+ while (stub != null ) {
195
+ try {
196
+ stub .update (ts , value , status );
197
+ } catch (Exception x ) {
198
+ subscriptions .error (path , x );
199
+ }
200
+ stub = stub .getNext ();
201
+ }
202
+ lastTs = ts ;
203
+ lastValue = value ;
204
+ lastStatus = status ;
205
+ }
206
+
207
+ void updateDisconnected () {
208
+ if (lastStatus == DSStatus .unknown ) {
209
+ return ;
210
+ }
211
+ lastStatus = DSStatus .unknown ;
212
+ lastTs = DSDateTime .currentTime ();
213
+ if (lastValue == null ) {
214
+ lastValue = DSNull .NULL ;
215
+ }
216
+ DSOutboundSubscribeStub cur = first ;
217
+ while (cur .getNext () != null ) {
218
+ cur .update (lastTs , lastValue , lastStatus );
219
+ cur = cur .getNext ();
220
+ }
221
+ }
222
+
187
223
///////////////////////////////////////////////////////////////////////////
188
224
// Inner Classes
189
225
///////////////////////////////////////////////////////////////////////////
0 commit comments