diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 5e8f2c5d61..f82251b694 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -653,6 +653,41 @@ describe('Observable.create', () => { } }); }); + + describe('pipe', () => { + it('should exist', () => { + expect(Observable.prototype.pipe).to.exist; + }); + + it('should pipe through multiple functions', () => { + const ob1 = Observable.of(1); + const ob2 = Observable.of(2); + const ob3 = Observable.of(3); + const ob4 = Observable.of(4); + const results = []; + + const piped = ob1.pipe( + s => { + expect(s).to.equal(ob1); + return ob2; + }, + s => { + expect(s).to.equal(ob2); + return ob3; + }, + s => { + expect(s).to.equal(ob3); + return ob4; + }, + ); + + expect(piped).to.equal(ob4); + + piped.subscribe(x => results.push(x), null, () => results.push('done')); + + expect(results).to.deep.equal([4, 'done']); + }); + }); }); /** @test {Observable} */ diff --git a/spec/Subject-spec.ts b/spec/Subject-spec.ts index 0a4b57776d..1b2cbbbace 100644 --- a/spec/Subject-spec.ts +++ b/spec/Subject-spec.ts @@ -487,7 +487,7 @@ describe('Subject', () => { it('should handle subject never emits', () => { const observable = hot('-').asObservable(); - expectObservable(observable).toBe([]); + expectObservable(observable).toBe('-'); }); it('should handle subject completes without emits', () => { diff --git a/spec/pipe/map-spec.ts b/spec/pipe/map-spec.ts new file mode 100644 index 0000000000..a354602204 --- /dev/null +++ b/spec/pipe/map-spec.ts @@ -0,0 +1,39 @@ + +import * as Rx from '../../dist/cjs/Rx'; +import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports + +declare const { asDiagram }; +declare const hot: typeof marbleTestingSignature.hot; +declare const cold: typeof marbleTestingSignature.cold; +declare const expectObservable: typeof marbleTestingSignature.expectObservable; +declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; + +describe('map-pipe', () => { + it('should pipe through the map operator', () => { + const s1 = cold( '---a--b--c--|'); + const project = x => x + x; + expectObservable(Rx.Pipe.map(project)(s1)).toBe(s1.map(project)); + }); + + it('should work with Observable.prototype.pipe', () => { + const values = { + a: 1, + b: 2, + c: 3, + x: 1, + y: 1.5, + z: 2 + }; + + const s1 = cold( '---a---b---c---|', values); + const expected = '---x---y---z---|'; + const { map } = Rx.Pipe; + + const t1 = s1.pipe( + map((x: number) => x + 1), + map((x: number) => x / 2) + ); + + expectObservable(t1).toBe(expected, values); + }); +}); diff --git a/spec/util/compose-spec.ts b/spec/util/compose-spec.ts new file mode 100644 index 0000000000..9a5f512f9c --- /dev/null +++ b/spec/util/compose-spec.ts @@ -0,0 +1,40 @@ +import { expect } from 'chai'; +import { compose } from '../../dist/cjs/util/compose'; + +describe('compose', () => { + it('should exist', () => { + expect(compose).to.be.a('function'); + }); + + it('should compose functions together', () => { + const addOne = x => x + 1; + const double = x => x + x; + const square = x => x * x; + + const composed = compose( + addOne, + double, + square + ); + + const result = composed(2); + + expect(result).to.equal(square(double(addOne(2)))); + }); + + it('should handle type switching', () => { + const toString = (x: number) => '' + x; + const toInt = (x: string) => parseInt(x); + const toObject = (x: number) => ({ value: x }); + + const composed = compose( + toString, + toInt, + toObject + ); + + const result = composed(2.0); + + expect(result).to.deep.equal({ value: 2 }); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 73dc054351..53c337778c 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -6,6 +6,7 @@ import { root } from './util/root'; import { toSubscriber } from './util/toSubscriber'; import { IfObservable } from './observable/IfObservable'; import { ErrorObservable } from './observable/ErrorObservable'; +import { compose } from './util/compose'; import { observable as Symbol_observable } from './symbol/observable'; export interface Subscribable { @@ -167,6 +168,11 @@ export class Observable implements Subscribable { }); } + pipe(...fns: ((x: Observable) => Observable)[]): Observable { + const composed = compose.apply(this, fns); + return composed(this); + } + protected _subscribe(subscriber: Subscriber): TeardownLogic { return this.source.subscribe(subscriber); } diff --git a/src/Rx.ts b/src/Rx.ts index 3bebef958c..fc255d3b5e 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -4,7 +4,7 @@ // definition export {Subject, AnonymousSubject} from './Subject'; /* tslint:enable:no-unused-variable */ -export {Observable} from './Observable'; +import {Observable} from './Observable'; // statics /* tslint:disable:no-use-before-declare */ @@ -174,6 +174,17 @@ import { iterator } from './symbol/iterator'; import { observable } from './symbol/observable'; /* tslint:enable:no-unused-variable */ +export { compose } from './util/compose'; + +// pull in pipe definitions +import { map as mapPipe } from './pipe/map'; + +/** + * @typedef {Object} Rx.Pipe + */ +let Pipe = { + map: mapPipe +}; /** * @typedef {Object} Rx.Scheduler @@ -216,5 +227,7 @@ let Symbol = { export { Scheduler, - Symbol + Symbol, + Pipe, + Observable }; diff --git a/src/pipe/map.ts b/src/pipe/map.ts new file mode 100644 index 0000000000..e44aa5b39f --- /dev/null +++ b/src/pipe/map.ts @@ -0,0 +1,9 @@ +import { map as mapProto } from '../operator/map'; +import { Observable } from '../Observable'; + +export function map( + project: (value: T, index: number) => R, + thisArg?: any +): (source: Observable) => Observable { + return (source: Observable) => mapProto.call(source, project); +} \ No newline at end of file diff --git a/src/testing/TestScheduler.ts b/src/testing/TestScheduler.ts index 8c7985529d..4265c86916 100644 --- a/src/testing/TestScheduler.ts +++ b/src/testing/TestScheduler.ts @@ -15,7 +15,7 @@ interface FlushableTest { expected?: any[]; } -export type observableToBeFn = (marbles: string, values?: any, errorValue?: any) => void; +export type observableToBeFn = (marbles: string|Observable, values?: any, errorValue?: any) => void; export type subscriptionLogsToBeFn = (marbles: string | string[]) => void; export class TestScheduler extends VirtualTimeScheduler { @@ -100,10 +100,23 @@ export class TestScheduler extends VirtualTimeScheduler { this.flushTests.push(flushTest); + const compareMarbles = (marbles: string, values?: any, errorValue?: any) => { + flushTest.ready = true; + flushTest.expected = TestScheduler.parseMarbles(marbles, values, errorValue, true); + }; + + const compareObservables = (expectedObservable: Observable) => { + flushTest.ready = true; + flushTest.expected = this.materializeInnerObservable(expectedObservable, 0); + }; + return { - toBe(marbles: string, values?: any, errorValue?: any) { - flushTest.ready = true; - flushTest.expected = TestScheduler.parseMarbles(marbles, values, errorValue, true); + toBe(marbles: string|Observable, values?: any, errorValue?: any) { + if (typeof marbles === 'string') { + compareMarbles(marbles, values, errorValue); + } else { + compareObservables(marbles); + } } }; } diff --git a/src/util/compose.ts b/src/util/compose.ts new file mode 100644 index 0000000000..8cbcd9c18e --- /dev/null +++ b/src/util/compose.ts @@ -0,0 +1,31 @@ +export function compose( + fnA: (a: A) => B, fnB: (b: B) => C): (a: A) => C; +export function compose( + fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D): (a: A) => D; +export function compose( + fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D): (a: A) => D; +export function compose( + fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D, + fnD: (d: D) => E): (a: A) => E; +export function compose( + fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D, + fnD: (d: D) => E, fnE: (e: E) => F): (a: A) => F; +export function compose( + fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D, + fnD: (d: D) => E, fnE: (e: E) => F, fnF: (f: F) => G): (a: A) => G; +export function compose( + fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D, + fnD: (d: D) => E, fnE: (e: E) => F, fnF: (f: F) => G, + fnG: (g: G) => H): (a: A) => H; +export function compose( + fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D, + fnD: (d: D) => E, fnE: (e: E) => F, fnF: (f: F) => G, + fnG: (g: G) => H, fnH: (h: H) => I): (a: A) => I; +export function compose( + fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D, + fnD: (d: D) => E, fnE: (e: E) => F, fnF: (f: F) => G, + fnG: (g: G) => H, fnH: (h: H) => I, fnJ: (i: I) => J): (a: A) => J; + +export function compose(...fns: any[]): any { + return (t: any) => fns.reduce((prev, fn) => fn(prev), t); +} \ No newline at end of file