1
- import { Observable } from '../Observable' ;
2
- import { MonoTypeOperatorFunction } from '../types' ;
1
+ import { InteropObservable , MonoTypeOperatorFunction , ObservableInput } from '../types' ;
3
2
import { operate } from '../util/lift' ;
4
3
import { createOperatorSubscriber } from './OperatorSubscriber' ;
5
4
import { noop } from '../util/noop' ;
5
+ import { isInteropObservable } from '../util/isInteropObservable' ;
6
+ import { fromInteropObservable } from '../observable/innerFrom' ;
6
7
7
8
/**
8
9
* Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
@@ -57,11 +58,11 @@ import { noop } from '../util/noop';
57
58
* @see {@link distinctUntilKeyChanged }
58
59
*
59
60
* @param {function } [keySelector] Optional function to select which value you want to check as distinct.
60
- * @param {Observable } [flushes] Optional Observable for flushing the internal HashSet of the operator.
61
+ * @param {ObservableInput } [flushes] Optional ObservableInput for flushing the internal HashSet of the operator.
61
62
* @return A function that returns an Observable that emits items from the
62
63
* source Observable with distinct values.
63
64
*/
64
- export function distinct < T , K > ( keySelector ?: ( value : T ) => K , flushes ?: Observable < any > ) : MonoTypeOperatorFunction < T > {
65
+ export function distinct < T , K > ( keySelector ?: ( value : T ) => K , flushes ?: ObservableInput < any > ) : MonoTypeOperatorFunction < T > {
65
66
return operate ( ( source , subscriber ) => {
66
67
const distinctKeys = new Set ( ) ;
67
68
source . subscribe (
@@ -74,6 +75,11 @@ export function distinct<T, K>(keySelector?: (value: T) => K, flushes?: Observab
74
75
} )
75
76
) ;
76
77
77
- flushes ?. subscribe ( createOperatorSubscriber ( subscriber , ( ) => distinctKeys . clear ( ) , noop ) ) ;
78
+ let flush$ = flushes as InteropObservable < any > | ObservableInput < any > | undefined | any ;
79
+ if ( flushes && isInteropObservable ( flushes ) ) {
80
+ flush$ = fromInteropObservable ( flushes ) ;
81
+ }
82
+
83
+ flush$ ?. subscribe ( createOperatorSubscriber ( subscriber , ( ) => distinctKeys . clear ( ) , noop ) ) ;
78
84
} ) ;
79
85
}
0 commit comments