@jetjs/streams
class Operator
recives values as an observer, manipulates the values and act as an observable for the manipulated values.
Example:
function filter(transform(val): boolean) {
let so;
return new ObservableObserver(
(subscriptionObserver) => { so = subscriptionObserver; // return function to get notified on termination},
(value) => {
let res = transform(value);
if (!res) {
so.next(value); // add to stream if transform returned false
}
}
);
}
Observable.of("val-1", "val-2")
.pipe(filter(str => str.endsWith("-2")))
.subscribe((str) => console.log("Received: " + str));
// prints:
// Received: val-1
Methods
Method | Return | Description |
---|---|---|
addObservable(observable) | void | |
complete() | void | |
createSimpleOperator(fn) | Operator | |
error(err) | void | |
next(value) | void | |
start(subscription) | void | |
subscribe(observerOrNext, error, complete) | Subscription | |
subscribeUpstream() | void |
Method Details
-
addObservable(observable) Method
Parent Observable is added, if a Operator is added to a stream via pipe. This is done to implement the laziness. So this Operator can call #subscribe after it has received a subscription.
Signature:addObservable(observable: Observable<I>): void;
Returns:void
Parameter Type Description observable Observable
The observable to get values from
-
complete() Method
Signature:complete(): void;
Returns:void
-
createSimpleOperator(fn) Method
Helper to implement a simple Operators.
see Operator.pipe
Signature:static createSimpleOperator<T, O>(fn: (val: T) => O | undefined): Operator<T, O>;
Returns:Operator
Operator instance
Parameter Type Description fn (val: T) => O | undefined
function to transform the value. If undefined is returned, value will be removed from the stream. if an error occur, it will be passed to the #error function
-
error(err) Method
Signature:error(err: Error): void;
Returns:void
Parameter Type Description err Error
-
next(value) Method
Signature:next(value: I): void;
Returns:void
Parameter Type Description value I
-
start(subscription) Method
Signature:start(subscription: Subscription): void;
Returns:void
Parameter Type Description subscription Subscription
-
subscribe(observerOrNext, error, complete) Method
Signature:subscribe(observerOrNext: Observer<O> | NextFn<O>, error?: ErrorFn, complete?: CompleteFn): Subscription;
Returns:Subscription
Parameter Type Description observerOrNext Observer
| NextFn error ErrorFn
complete CompleteFn
-
subscribeUpstream() Method
Signature:protected subscribeUpstream(): void;
Returns:void