@@ -3,6 +3,7 @@ import { distinct, mergeMap, take } from 'rxjs/operators';
3
3
import { TestScheduler } from 'rxjs/testing' ;
4
4
import { of , Observable } from 'rxjs' ;
5
5
import { observableMatcher } from '../helpers/observableMatcher' ;
6
+ import { asInteropObservable } from '../helpers/interop-helper' ;
6
7
7
8
/** @test {distinct} */
8
9
describe ( 'distinct' , ( ) => {
@@ -218,6 +219,20 @@ describe('distinct', () => {
218
219
} ) ;
219
220
} ) ;
220
221
222
+ it ( 'should support a flushing an interop observable stream' , ( ) => {
223
+ testScheduler . run ( ( { hot, expectObservable, expectSubscriptions } ) => {
224
+ const e1 = hot ( ' --a--b--a--b--a--b--|' ) ;
225
+ const e1subs = ' ^-------------------!' ;
226
+ const e2 = hot ( ' -----------x--------|' ) ;
227
+ const e2subs = ' ^-------------------!' ;
228
+ const expected = '--a--b--------a--b--|' ;
229
+
230
+ expectObservable ( e1 . pipe ( distinct ( undefined , asInteropObservable ( e2 ) ) ) ) . toBe ( expected ) ;
231
+ expectSubscriptions ( e1 . subscriptions ) . toBe ( e1subs ) ;
232
+ expectSubscriptions ( e2 . subscriptions ) . toBe ( e2subs ) ;
233
+ } ) ;
234
+ } ) ;
235
+
221
236
it ( 'should raise error if flush raises error' , ( ) => {
222
237
testScheduler . run ( ( { hot, expectObservable, expectSubscriptions } ) => {
223
238
const e1 = hot ( ' --a--b--a--b--a--b--|' ) ;
@@ -232,6 +247,20 @@ describe('distinct', () => {
232
247
} ) ;
233
248
} ) ;
234
249
250
+ it ( 'should raise error if interop flush raises error' , ( ) => {
251
+ testScheduler . run ( ( { hot, expectObservable, expectSubscriptions } ) => {
252
+ const e1 = hot ( ' --a--b--a--b--a--b--|' ) ;
253
+ const e1subs = ' ^------------! ' ;
254
+ const e2 = hot ( ' -----------x-# ' ) ;
255
+ const e2subs = ' ^------------! ' ;
256
+ const expected = '--a--b-------# ' ;
257
+
258
+ expectObservable ( e1 . pipe ( distinct ( undefined , asInteropObservable ( e2 ) ) ) ) . toBe ( expected ) ;
259
+ expectSubscriptions ( e1 . subscriptions ) . toBe ( e1subs ) ;
260
+ expectSubscriptions ( e2 . subscriptions ) . toBe ( e2subs ) ;
261
+ } ) ;
262
+ } ) ;
263
+
235
264
it ( 'should unsubscribe from the flushing stream when the main stream is unsubbed' , ( ) => {
236
265
testScheduler . run ( ( { hot, expectObservable, expectSubscriptions } ) => {
237
266
const e1 = hot ( ' --a--b--a--b--a--b--|' ) ;
@@ -247,6 +276,21 @@ describe('distinct', () => {
247
276
} ) ;
248
277
} ) ;
249
278
279
+ it ( 'should unsubscribe from the flushing interop stream when the main stream is unsubbed' , ( ) => {
280
+ testScheduler . run ( ( { hot, expectObservable, expectSubscriptions } ) => {
281
+ const e1 = hot ( ' --a--b--a--b--a--b--|' ) ;
282
+ const e1subs = ' ^----------! ' ;
283
+ const e2 = hot ( ' -----------x--------|' ) ;
284
+ const e2subs = ' ^----------! ' ;
285
+ const unsub = ' -----------! ' ;
286
+ const expected = '--a--b------ ' ;
287
+
288
+ expectObservable ( e1 . pipe ( distinct ( undefined , asInteropObservable ( e2 ) ) ) , unsub ) . toBe ( expected ) ;
289
+ expectSubscriptions ( e1 . subscriptions ) . toBe ( e1subs ) ;
290
+ expectSubscriptions ( e2 . subscriptions ) . toBe ( e2subs ) ;
291
+ } ) ;
292
+ } ) ;
293
+
250
294
it ( 'should allow opting in to default comparator with flush' , ( ) => {
251
295
testScheduler . run ( ( { hot, expectObservable, expectSubscriptions } ) => {
252
296
const e1 = hot ( ' --a--b--a--b--a--b--|' ) ;
@@ -261,6 +305,20 @@ describe('distinct', () => {
261
305
} ) ;
262
306
} ) ;
263
307
308
+ it ( 'should allow opting in to default comparator with interop flush' , ( ) => {
309
+ testScheduler . run ( ( { hot, expectObservable, expectSubscriptions } ) => {
310
+ const e1 = hot ( ' --a--b--a--b--a--b--|' ) ;
311
+ const e1subs = ' ^-------------------!' ;
312
+ const e2 = hot ( ' -----------x--------|' ) ;
313
+ const e2subs = ' ^-------------------!' ;
314
+ const expected = '--a--b--------a--b--|' ;
315
+
316
+ expectObservable ( e1 . pipe ( distinct ( undefined , asInteropObservable ( e2 ) ) ) ) . toBe ( expected ) ;
317
+ expectSubscriptions ( e1 . subscriptions ) . toBe ( e1subs ) ;
318
+ expectSubscriptions ( e2 . subscriptions ) . toBe ( e2subs ) ;
319
+ } ) ;
320
+ } ) ;
321
+
264
322
it ( 'should stop listening to a synchronous observable when unsubscribed' , ( ) => {
265
323
const sideEffects : number [ ] = [ ] ;
266
324
const synchronousObservable = new Observable < number > ( ( subscriber ) => {
0 commit comments