Skip to content

Commit 6c00d8b

Browse files
feat(fp): add fp operators
generates the new operators using the existing prototype operators, and a script to generate the resulting fp types.
1 parent 7276d7e commit 6c00d8b

16 files changed

+359
-112
lines changed

package.json

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@
5656
"commitmsg": "validate-commit-msg",
5757
"info": "npm-scripts-info",
5858
"build_all": "npm-run-all build_cjs build_global generate_packages",
59-
"build_cjs": "npm-run-all clean_dist_cjs copy_src_cjs compile_dist_cjs",
60-
"build_es6": "npm-run-all clean_dist_es6 copy_src_es6 compile_module_es6",
61-
"build_es6_for_docs": "npm-run-all clean_dist_es6 copy_src_es6 compile_dist_es6_for_docs",
59+
"build_cjs": "npm-run-all clean_dist_cjs copy_src_cjs generate_fp_cjs compile_dist_cjs",
60+
"build_es6": "npm-run-all clean_dist_es6 copy_src_es6 generate_fp_es6 compile_module_es6",
61+
"build_es6_for_docs": "npm-run-all clean_dist_es6 copy_src_es6 generate_fp_es6 compile_dist_es6_for_docs",
6262
"build_closure_core": "node ./tools/make-closure-core.js",
6363
"build_global": "npm-run-all clean_dist_global build_es6 && mkdirp ./dist/global && node ./tools/make-umd-bundle.js && npm-run-all build_closure_core clean_dist_es6",
6464
"build_perf": "webdriver-manager update && npm-run-all build_cjs build_global perf",
@@ -75,13 +75,15 @@
7575
"copy_src_cjs": "mkdirp ./dist/cjs/src && shx cp -r ./src/* ./dist/cjs/src",
7676
"copy_src_es6": "mkdirp ./dist/es6/src && shx cp -r ./src/* ./dist/es6/src",
7777
"commit": "git-cz",
78-
"compile_dist_cjs": "tsc ./dist/cjs/src/Rx.ts ./dist/cjs/src/add/observable/of.ts -m commonjs --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom --sourceMap --outDir ./dist/cjs --target ES5 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node",
79-
"compile_module_es6": "tsc ./dist/es6/src/Rx.ts ./dist/es6/src/add/observable/of.ts -m es2015 --sourceMap --outDir ./dist/es6 --target ES5 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node --noEmitHelpers --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom ",
80-
"compile_dist_es6_for_docs": "tsc ./dist/es6/src/Rx.ts ./dist/es6/src/add/observable/of.ts ./dist/es6/src/MiscJSDoc.ts -m es2015 --sourceMap --outDir ./dist/es6 --target ES6 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node",
78+
"compile_dist_cjs": "tsc ./dist/cjs/src/Rx.ts ./dist/cjs/src/fp.ts ./dist/cjs/src/pipe.ts ./dist/cjs/src/add/observable/of.ts -m commonjs --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom --sourceMap --outDir ./dist/cjs --target ES5 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node",
79+
"compile_module_es6": "tsc ./dist/es6/src/Rx.ts ./dist/es6/src/fp.ts ./dist/es6/src/pipe.ts ./dist/es6/src/add/observable/of.ts -m es2015 --sourceMap --outDir ./dist/es6 --target ES5 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node --noEmitHelpers --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom ",
80+
"compile_dist_es6_for_docs": "tsc ./dist/es6/src/Rx.ts ./dist/es6/src/fp.ts ./dist/es6/src/pipe.ts ./dist/es6/src/add/observable/of.ts ./dist/es6/src/MiscJSDoc.ts -m es2015 --sourceMap --outDir ./dist/es6 --target ES6 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node",
8181
"cover": "shx rm -rf dist/cjs && tsc src/Rx.ts src/add/observable/of.ts -m commonjs --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom --outDir dist/cjs --sourceMap --target ES5 -d && nyc --reporter=lcov --reporter=html --exclude=spec/support/**/* --exclude=spec-js/**/* --exclude=node_modules mocha --opts spec/support/default.opts spec-js",
8282
"decision_tree_widget": "cd doc/decision-tree-widget && npm run build && cd ../..",
8383
"doctoc": "doctoc CONTRIBUTING.md",
8484
"generate_packages": "node .make-packages.js",
85+
"generate_fp_cjs": "node tools/make-fp.js ./dist/cjs",
86+
"generate_fp_es6": "node tools/make-fp.js ./dist/es6",
8587
"lint_perf": "eslint perf/",
8688
"lint_spec": "tslint -c tslint.json \"spec/**/*.ts\"",
8789
"lint_src": "tslint -c tslint.json \"src/**/*.ts\"",
@@ -204,4 +206,4 @@
204206
"dependencies": {
205207
"symbol-observable": "^1.0.1"
206208
}
207-
}
209+
}

spec/Observable-spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ describe('Observable.create', () => {
656656

657657
describe('pipe', () => {
658658
it('should exist', () => {
659-
expect(Observable.prototype.pipe).to.exist;
659+
expect(Observable.prototype.let).to.exist;
660660
});
661661

662662
it('should pipe through multiple functions', () => {
@@ -666,7 +666,7 @@ describe('Observable.create', () => {
666666
const ob4 = Observable.of(4);
667667
const results = [];
668668

669-
const piped = ob1.pipe(
669+
const piped = ob1.let(
670670
s => {
671671
expect(s).to.equal(ob1);
672672
return ob2;

spec/fp/map-spec.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
2+
import * as fp from '../../dist/cjs/fp';
3+
import { Observable } from '../../dist/cjs/Observable';
4+
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports
5+
6+
declare const { asDiagram };
7+
declare const hot: typeof marbleTestingSignature.hot;
8+
declare const cold: typeof marbleTestingSignature.cold;
9+
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
10+
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;
11+
12+
describe('map-let-fp', () => {
13+
it('should let through the map operator', () => {
14+
const s1 = cold( '---a--b--c--|');
15+
const project = x => x + x;
16+
expectObservable(fp.map(s1, project)).toBe(s1.map(project));
17+
});
18+
19+
it('should work with Observable.prototype.let', () => {
20+
const values = {
21+
a: 1,
22+
b: 2,
23+
c: 3,
24+
x: 1,
25+
y: 1.5,
26+
z: 2
27+
};
28+
29+
const s1: Observable<number> = cold( '---a---b---c---|', values);
30+
const expected = '---x---y---z---|';
31+
const { map } = fp;
32+
33+
const t1 = s1.let(
34+
o => map(o, x => x + 1),
35+
o => map(o, x => x / 2)
36+
);
37+
38+
expectObservable(t1).toBe(expected, values);
39+
});
40+
});

spec/operators/groupBy-spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {expect} from 'chai';
22
import * as Rx from '../../dist/cjs/Rx';
3-
import {GroupedObservable} from '../../dist/cjs/operator/groupBy';
3+
import {GroupedObservable} from '../../dist/cjs/observable/GroupedObservable';
44
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports
55

66
declare const { asDiagram };

spec/pipe/map-spec.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11

2-
import * as Rx from '../../dist/cjs/Rx';
2+
import * as pipe from '../../dist/cjs/pipe';
3+
import { Observable } from '../../dist/cjs/Observable';
34
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports
45

56
declare const { asDiagram };
@@ -8,14 +9,14 @@ declare const cold: typeof marbleTestingSignature.cold;
89
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
910
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;
1011

11-
describe('map-pipe', () => {
12-
it('should pipe through the map operator', () => {
12+
describe('map-let-pipe', () => {
13+
it('should let through the map operator', () => {
1314
const s1 = cold( '---a--b--c--|');
1415
const project = x => x + x;
15-
expectObservable(Rx.Pipe.map(project)(s1)).toBe(s1.map(project));
16+
expectObservable(pipe.map(project)(s1)).toBe(s1.map(project));
1617
});
1718

18-
it('should work with Observable.prototype.pipe', () => {
19+
it('should work with Observable.prototype.let', () => {
1920
const values = {
2021
a: 1,
2122
b: 2,
@@ -27,9 +28,9 @@ describe('map-pipe', () => {
2728

2829
const s1 = cold( '---a---b---c---|', values);
2930
const expected = '---x---y---z---|';
30-
const { map } = Rx.Pipe;
31+
const { map } = pipe;
3132

32-
const t1 = s1.pipe(
33+
const t1 = s1.let(
3334
map((x: number) => x + 1),
3435
map((x: number) => x / 2)
3536
);

src/Observable.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import { toSubscriber } from './util/toSubscriber';
77
import { IfObservable } from './observable/IfObservable';
88
import { ErrorObservable } from './observable/ErrorObservable';
99
import { observable as Symbol_observable } from './symbol/observable';
10-
import { compose } from './util/compose';
1110
import { letProto } from './operator/let';
1211

1312
export interface Subscribable<T> {
@@ -171,11 +170,6 @@ export class Observable<T> implements Subscribable<T> {
171170
});
172171
}
173172

174-
pipe(...fns: ((x: Observable<T>) => Observable<T>)[]): Observable<T> {
175-
const composed = compose.apply(this, fns);
176-
return composed(this);
177-
}
178-
179173
protected _subscribe(subscriber: Subscriber<any>): TeardownLogic {
180174
return this.source.subscribe(subscriber);
181175
}

src/Rx.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -176,16 +176,6 @@ import { observable } from './symbol/observable';
176176
/* tslint:enable:no-unused-variable */
177177
export { compose } from './util/compose';
178178

179-
// pull in pipe definitions
180-
import { map as mapPipe } from './pipe/map';
181-
182-
/**
183-
* @typedef {Object} Rx.Pipe
184-
*/
185-
let Pipe = {
186-
map: mapPipe
187-
};
188-
189179
/**
190180
* @typedef {Object} Rx.Scheduler
191181
* @property {Scheduler} queue Schedules on a queue in the current event frame
@@ -228,6 +218,5 @@ let Symbol = {
228218
export {
229219
Scheduler,
230220
Symbol,
231-
Pipe,
232221
Observable
233222
};

src/observable/GroupedObservable.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { Subject } from '../Subject';
2+
import { Observable } from '../Observable';
3+
import { Subscriber } from '../Subscriber';
4+
import { Subscription } from '../Subscription';
5+
6+
export interface RefCountSubscription {
7+
count: number;
8+
unsubscribe: () => void;
9+
closed: boolean;
10+
attemptedToUnsubscribe: boolean;
11+
}
12+
13+
/**
14+
* An Observable representing values belonging to the same group represented by
15+
* a common key. The values emitted by a GroupedObservable come from the source
16+
* Observable. The common key is available as the field `key` on a
17+
* GroupedObservable instance.
18+
*
19+
* @class GroupedObservable<K, T>
20+
*/
21+
export class GroupedObservable<K, T> extends Observable<T> {
22+
constructor(public key: K,
23+
private groupSubject: Subject<T>,
24+
private refCountSubscription?: RefCountSubscription) {
25+
super();
26+
}
27+
28+
protected _subscribe(subscriber: Subscriber<T>) {
29+
const subscription = new Subscription();
30+
const {refCountSubscription, groupSubject} = this;
31+
if (refCountSubscription && !refCountSubscription.closed) {
32+
subscription.add(new InnerRefCountSubscription(refCountSubscription));
33+
}
34+
subscription.add(groupSubject.subscribe(subscriber));
35+
return subscription;
36+
}
37+
}
38+
39+
/**
40+
* We need this JSDoc comment for affecting ESDoc.
41+
* @ignore
42+
* @extends {Ignored}
43+
*/
44+
class InnerRefCountSubscription extends Subscription {
45+
constructor(private parent: RefCountSubscription) {
46+
super();
47+
parent.count++;
48+
}
49+
50+
unsubscribe() {
51+
const parent = this.parent;
52+
if (!parent.closed && !this.closed) {
53+
super.unsubscribe();
54+
parent.count -= 1;
55+
if (parent.count === 0 && parent.attemptedToUnsubscribe) {
56+
parent.unsubscribe();
57+
}
58+
}
59+
}
60+
}

src/operator/groupBy.ts

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { Subscriber } from '../Subscriber';
2-
import { Subscription } from '../Subscription';
32
import { Observable } from '../Observable';
43
import { Operator } from '../Operator';
54
import { Subject } from '../Subject';
65
import { Map } from '../util/Map';
76
import { FastMap } from '../util/FastMap';
7+
import { GroupedObservable, RefCountSubscription } from '../observable/GroupedObservable';
88

99
/* tslint:disable:max-line-length */
1010
export function groupBy<T, K>(this: Observable<T>, keySelector: (value: T) => K): Observable<GroupedObservable<K, T>>;
@@ -87,13 +87,6 @@ export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) =>
8787
return this.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
8888
}
8989

90-
export interface RefCountSubscription {
91-
count: number;
92-
unsubscribe: () => void;
93-
closed: boolean;
94-
attemptedToUnsubscribe: boolean;
95-
}
96-
9790
class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
9891
constructor(private keySelector: (value: T) => K,
9992
private elementSelector?: ((value: T) => R) | void,
@@ -250,52 +243,3 @@ class GroupDurationSubscriber<K, T> extends Subscriber<T> {
250243
this.parent.removeGroup(this.key);
251244
}
252245
}
253-
254-
/**
255-
* An Observable representing values belonging to the same group represented by
256-
* a common key. The values emitted by a GroupedObservable come from the source
257-
* Observable. The common key is available as the field `key` on a
258-
* GroupedObservable instance.
259-
*
260-
* @class GroupedObservable<K, T>
261-
*/
262-
export class GroupedObservable<K, T> extends Observable<T> {
263-
constructor(public key: K,
264-
private groupSubject: Subject<T>,
265-
private refCountSubscription?: RefCountSubscription) {
266-
super();
267-
}
268-
269-
protected _subscribe(subscriber: Subscriber<T>) {
270-
const subscription = new Subscription();
271-
const {refCountSubscription, groupSubject} = this;
272-
if (refCountSubscription && !refCountSubscription.closed) {
273-
subscription.add(new InnerRefCountSubscription(refCountSubscription));
274-
}
275-
subscription.add(groupSubject.subscribe(subscriber));
276-
return subscription;
277-
}
278-
}
279-
280-
/**
281-
* We need this JSDoc comment for affecting ESDoc.
282-
* @ignore
283-
* @extends {Ignored}
284-
*/
285-
class InnerRefCountSubscription extends Subscription {
286-
constructor(private parent: RefCountSubscription) {
287-
super();
288-
parent.count++;
289-
}
290-
291-
unsubscribe() {
292-
const parent = this.parent;
293-
if (!parent.closed && !this.closed) {
294-
super.unsubscribe();
295-
parent.count -= 1;
296-
if (parent.count === 0 && parent.attemptedToUnsubscribe) {
297-
parent.unsubscribe();
298-
}
299-
}
300-
}
301-
}

src/operator/multicast.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ import { Subscriber } from '../Subscriber';
44
import { Observable } from '../Observable';
55
import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable';
66

7+
export type factoryOrValue<T> = T | (() => T);
8+
export type selector<T> = (source: Observable<T>) => Observable<T>;
9+
710
/* tslint:disable:max-line-length */
811
export function multicast<T>(this: Observable<T>, subjectOrSubjectFactory: factoryOrValue<Subject<T>>): ConnectableObservable<T>;
9-
export function multicast<T>(SubjectFactory: (this: Observable<T>) => Subject<T>, selector?: selector<T>): Observable<T>;
12+
export function multicast<T>(SubjectFactory: () => Subject<T>, selector?: selector<T>): Observable<T>;
1013
/* tslint:enable:max-line-length */
1114

1215
/**
@@ -50,9 +53,6 @@ export function multicast<T>(this: Observable<T>, subjectOrSubjectFactory: Subje
5053
return <ConnectableObservable<T>> connectable;
5154
}
5255

53-
export type factoryOrValue<T> = T | (() => T);
54-
export type selector<T> = (source: Observable<T>) => Observable<T>;
55-
5656
export class MulticastOperator<T> implements Operator<T, T> {
5757
constructor(private subjectFactory: () => Subject<T>,
5858
private selector: (source: Observable<T>) => Observable<T>) {

src/operator/publish.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { Observable } from '../Observable';
33
import { multicast } from './multicast';
44
import { ConnectableObservable } from '../observable/ConnectableObservable';
55

6+
export type selector<T> = (source: Observable<T>) => Observable<T>;
7+
68
/* tslint:disable:max-line-length */
79
export function publish<T>(this: Observable<T>): ConnectableObservable<T>;
810
export function publish<T>(this: Observable<T>, selector: selector<T>): Observable<T>;
@@ -25,5 +27,3 @@ export function publish<T>(this: Observable<T>, selector?: (source: Observable<T
2527
return selector ? multicast.call(this, () => new Subject<T>(), selector) :
2628
multicast.call(this, new Subject<T>());
2729
}
28-
29-
export type selector<T> = (source: Observable<T>) => Observable<T>;

src/operator/timeInterval.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@ import { Subscriber } from '../Subscriber';
44
import { IScheduler } from '../Scheduler';
55
import { async } from '../scheduler/async';
66

7+
export interface ITimeInterval<T> {
8+
value: T;
9+
interval: number;
10+
}
11+
712
/**
813
* @param scheduler
914
* @return {Observable<TimeInterval<any>>|WebSocketSubject<T>|Observable<T>}
1015
* @method timeInterval
1116
* @owner Observable
1217
*/
13-
export function timeInterval<T>(this: Observable<T>, scheduler: IScheduler = async): Observable<TimeInterval<T>> {
18+
export function timeInterval<T>(this: Observable<T>, scheduler: IScheduler = async): Observable<ITimeInterval<T>> {
1419
return this.lift(new TimeIntervalOperator(scheduler));
1520
}
1621

17-
export class TimeInterval<T> {
22+
export class TimeInterval<T> implements ITimeInterval<T> {
1823
constructor(public value: T, public interval: number) {
1924

2025
}

0 commit comments

Comments
 (0)