Skip to content

STRAWMAN: pipe method #2489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,41 @@ describe('Observable.create', () => {
}
});
});

describe('pipe', () => {
it('should exist', () => {
expect(Observable.prototype.pipe).to.exist;
});

it('should pipe through multiple functions', () => {
const ob1 = Observable.of(1);
const ob2 = Observable.of(2);
const ob3 = Observable.of(3);
const ob4 = Observable.of(4);
const results = [];

const piped = ob1.pipe(
s => {
expect(s).to.equal(ob1);
return ob2;
},
s => {
expect(s).to.equal(ob2);
return ob3;
},
s => {
expect(s).to.equal(ob3);
return ob4;
},
);

expect(piped).to.equal(ob4);

piped.subscribe(x => results.push(x), null, () => results.push('done'));

expect(results).to.deep.equal([4, 'done']);
});
});
});

/** @test {Observable} */
Expand Down
2 changes: 1 addition & 1 deletion spec/Subject-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ describe('Subject', () => {
it('should handle subject never emits', () => {
const observable = hot('-').asObservable();

expectObservable(observable).toBe(<any>[]);
expectObservable(observable).toBe('-');
});

it('should handle subject completes without emits', () => {
Expand Down
39 changes: 39 additions & 0 deletions spec/pipe/map-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const { asDiagram };
declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;

describe('map-pipe', () => {
it('should pipe through the map operator', () => {
const s1 = cold( '---a--b--c--|');
const project = x => x + x;
expectObservable(Rx.Pipe.map(project)(s1)).toBe(s1.map(project));
});

it('should work with Observable.prototype.pipe', () => {
const values = {
a: 1,
b: 2,
c: 3,
x: 1,
y: 1.5,
z: 2
};

const s1 = cold( '---a---b---c---|', values);
const expected = '---x---y---z---|';
const { map } = Rx.Pipe;

const t1 = s1.pipe(
map((x: number) => x + 1),
map((x: number) => x / 2)
);

expectObservable(t1).toBe(expected, values);
});
});
40 changes: 40 additions & 0 deletions spec/util/compose-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { expect } from 'chai';
import { compose } from '../../dist/cjs/util/compose';

describe('compose', () => {
it('should exist', () => {
expect(compose).to.be.a('function');
});

it('should compose functions together', () => {
const addOne = x => x + 1;
const double = x => x + x;
const square = x => x * x;

const composed = compose(
addOne,
double,
square
);

const result = composed(2);

expect(result).to.equal(square(double(addOne(2))));
});

it('should handle type switching', () => {
const toString = (x: number) => '' + x;
const toInt = (x: string) => parseInt(x);
const toObject = (x: number) => ({ value: x });

const composed = compose(
toString,
toInt,
toObject
);

const result = composed(2.0);

expect(result).to.deep.equal({ value: 2 });
});
});
6 changes: 6 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { root } from './util/root';
import { toSubscriber } from './util/toSubscriber';
import { IfObservable } from './observable/IfObservable';
import { ErrorObservable } from './observable/ErrorObservable';
import { compose } from './util/compose';
import { observable as Symbol_observable } from './symbol/observable';

export interface Subscribable<T> {
Expand Down Expand Up @@ -167,6 +168,11 @@ export class Observable<T> implements Subscribable<T> {
});
}

pipe(...fns: ((x: Observable<T>) => Observable<T>)[]): Observable<T> {
const composed = compose.apply(this, fns);
return composed(this);
}

protected _subscribe(subscriber: Subscriber<any>): TeardownLogic {
return this.source.subscribe(subscriber);
}
Expand Down
17 changes: 15 additions & 2 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// definition
export {Subject, AnonymousSubject} from './Subject';
/* tslint:enable:no-unused-variable */
export {Observable} from './Observable';
import {Observable} from './Observable';

// statics
/* tslint:disable:no-use-before-declare */
Expand Down Expand Up @@ -174,6 +174,17 @@ import { iterator } from './symbol/iterator';
import { observable } from './symbol/observable';

/* tslint:enable:no-unused-variable */
export { compose } from './util/compose';

// pull in pipe definitions
import { map as mapPipe } from './pipe/map';

/**
* @typedef {Object} Rx.Pipe
*/
let Pipe = {
map: mapPipe
};

/**
* @typedef {Object} Rx.Scheduler
Expand Down Expand Up @@ -216,5 +227,7 @@ let Symbol = {

export {
Scheduler,
Symbol
Symbol,
Pipe,
Observable
};
9 changes: 9 additions & 0 deletions src/pipe/map.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { map as mapProto } from '../operator/map';
import { Observable } from '../Observable';

export function map<T, R>(
project: (value: T, index: number) => R,
thisArg?: any
): (source: Observable<T>) => Observable<R> {
return (source: Observable<T>) => mapProto.call(source, project);
}
21 changes: 17 additions & 4 deletions src/testing/TestScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ interface FlushableTest {
expected?: any[];
}

export type observableToBeFn = (marbles: string, values?: any, errorValue?: any) => void;
export type observableToBeFn = (marbles: string|Observable<any>, values?: any, errorValue?: any) => void;
export type subscriptionLogsToBeFn = (marbles: string | string[]) => void;

export class TestScheduler extends VirtualTimeScheduler {
Expand Down Expand Up @@ -100,10 +100,23 @@ export class TestScheduler extends VirtualTimeScheduler {

this.flushTests.push(flushTest);

const compareMarbles = (marbles: string, values?: any, errorValue?: any) => {
flushTest.ready = true;
flushTest.expected = TestScheduler.parseMarbles(marbles, values, errorValue, true);
};

const compareObservables = (expectedObservable: Observable<any>) => {
flushTest.ready = true;
flushTest.expected = this.materializeInnerObservable(expectedObservable, 0);
};

return {
toBe(marbles: string, values?: any, errorValue?: any) {
flushTest.ready = true;
flushTest.expected = TestScheduler.parseMarbles(marbles, values, errorValue, true);
toBe(marbles: string|Observable<any>, values?: any, errorValue?: any) {
if (typeof marbles === 'string') {
compareMarbles(marbles, values, errorValue);
} else {
compareObservables(marbles);
}
}
};
}
Expand Down
31 changes: 31 additions & 0 deletions src/util/compose.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
export function compose<A, B, C>(
fnA: (a: A) => B, fnB: (b: B) => C): (a: A) => C;
export function compose<A, B, C, D>(
fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D): (a: A) => D;
export function compose<A, B, C, D>(
fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D): (a: A) => D;
export function compose<A, B, C, D, E>(
fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D,
fnD: (d: D) => E): (a: A) => E;
export function compose<A, B, C, D, E, F>(
fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D,
fnD: (d: D) => E, fnE: (e: E) => F): (a: A) => F;
export function compose<A, B, C, D, E, F, G>(
fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D,
fnD: (d: D) => E, fnE: (e: E) => F, fnF: (f: F) => G): (a: A) => G;
export function compose<A, B, C, D, E, F, G, H>(
fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D,
fnD: (d: D) => E, fnE: (e: E) => F, fnF: (f: F) => G,
fnG: (g: G) => H): (a: A) => H;
export function compose<A, B, C, D, E, F, G, H, I>(
fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D,
fnD: (d: D) => E, fnE: (e: E) => F, fnF: (f: F) => G,
fnG: (g: G) => H, fnH: (h: H) => I): (a: A) => I;
export function compose<A, B, C, D, E, F, G, H, I, J>(
fnA: (a: A) => B, fnB: (b: B) => C, fnC: (c: C) => D,
fnD: (d: D) => E, fnE: (e: E) => F, fnF: (f: F) => G,
fnG: (g: G) => H, fnH: (h: H) => I, fnJ: (i: I) => J): (a: A) => J;

export function compose(...fns: any[]): any {
return (t: any) => fns.reduce((prev, fn) => fn(prev), t);
}