Components: Source

Tim Ermilov edited this page Feb 18, 2016 · 1 revision

Source components are responsible for generating events and/or data that should be processed.
Events and data generated by sources will trigger processor components asynchronously.
Source components are constructed using the Rx.Observable.create function from RxJS.
The default exported user function will be passed to the constructor.

"Hello world!" source example

In the simplest case, a "Hello world!" source component looks like this:

export default (observable) => {
    observable.onNext('Hello world!');
    observable.onCompleted();
};

This source dispatches a string "Hello world!" and immediately notifies that it is complete.
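
To see the order of notifications, the source can be run locally against a hand-written observer. The sketch below is an illustration only: createRecorder is a hypothetical helper, not part of RxJS or the platform, and it implements just the three observer methods used on this page (onNext, onError, onCompleted).

```javascript
// Same function as the "Hello world!" source above, without the export.
const helloSource = (observable) => {
    observable.onNext('Hello world!');
    observable.onCompleted();
};

// Hypothetical stand-in for the observer the platform would supply:
// it records every notification in the order it arrives.
const createRecorder = () => {
    const events = [];
    return {
        events,
        onNext: (value) => events.push(['next', value]),
        onError: (err) => events.push(['error', err]),
        onCompleted: () => events.push(['completed']),
    };
};

const recorder = createRecorder();
helloSource(recorder);
console.log(recorder.events); // one 'next' entry followed by one 'completed' entry
```

Running a source this way is only a local check of what it emits; it says nothing about how the platform schedules processors.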

Endless source example

It is possible to create endless source components.
This is useful when you need to receive data continuously (e.g. for a Slack bot).
The simplest case is an endless counter that dispatches a value at a preset interval:

export default (observable) => {
    let i = 0;
    setInterval(() => observable.onNext(i++), 1000);
};

This source will dispatch the counter value every second until the user manually shuts it down.
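
The counter above never clears its timer. In RxJS, the function passed to Rx.Observable.create may return a function that is run when the subscription is disposed. Whether the platform triggers that disposal on shutdown is an assumption here, not something this page states. A sketch of the same counter with cleanup (counterSource is a name chosen for this example):

```javascript
// Endless counter that also returns a cleanup function.
const counterSource = (observable) => {
    let i = 0;
    const timer = setInterval(() => observable.onNext(i++), 1000);
    // Assumption: the caller invokes this function when the source is shut down.
    return () => clearInterval(timer);
};

// Local usage with a throwaway observer: start the counter, then stop it.
const stop = counterSource({ onNext: (value) => console.log(value) });
stop(); // the timer is cleared before the first tick, so nothing is printed
```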

Providing configuration options

Sometimes a source needs to accept configuration options.
This can be done by adding more parameters to the function.
Here's an example of a source component that fetches the text at a given URL:

import request from 'superagent';

export default (url, observable) => {
    request.get(url).end((err, res) => {
        if (err) return observable.onError(err);
        observable.onNext(res.text);
    });
};

Note that the observable is always the last parameter of the function.
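
The parameter order can be illustrated without any network access. In the sketch below, repeatSource and runSource are hypothetical names. runSource only mimics the idea that configured values come first and the observable comes last; it is not the platform's actual loader.

```javascript
// Hypothetical source with two configuration options.
const repeatSource = (text, times, observable) => {
    for (let i = 0; i < times; i++) {
        observable.onNext(text);
    }
    observable.onCompleted();
};

// Hypothetical runner: spreads the configured values, then appends the observer.
const runSource = (source, options, observer) => source(...options, observer);

const received = [];
runSource(repeatSource, ['ping', 3], {
    onNext: (value) => received.push(value),
    onError: (err) => console.error(err),
    onCompleted: () => console.log(received.join(',')), // prints "ping,ping,ping"
});
```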

HTTP and WebSocket input to source components

It is also possible to use HTTP requests and WebSocket connections to pass additional data to sources that are already running.
The input endpoint is the URI of the pipeline with /input appended, e.g. http://alpha.exynize.com/pipeline/user/pipeline/input.
There are two ways to handle this.

Simple way

The simple way is to define a new exported function called routeHandler that receives incoming requests from all transports (POST, PUT and GET requests, and WebSocket).
This can be done like so:

// import RxJS 4 (npm package "rx"), which provides Rx.Subject
import Rx from 'rx';

// create a subject to pass requests to the main source function
const incoming = new Rx.Subject();
// define function that will handle incoming requests
export const routeHandler = (req) => incoming.onNext(req);
// main source function
export default (observable) => {
    incoming.subscribe(observable);
    observable.onNext('Waiting for input!');
};

This example uses Rx.Subject to pass each incoming request directly to the source output.
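
The data flow can be traced locally with a stripped-down subject. TinySubject below is a hypothetical stand-in that supports only onNext and subscribe. It is not Rx.Subject, which also handles errors, completion and unsubscription. The request object passed at the end is made up for this sketch.

```javascript
// Hypothetical minimal subject: forwards each value to every subscriber.
class TinySubject {
    constructor() {
        this.observers = [];
    }
    subscribe(observer) {
        this.observers.push(observer);
    }
    onNext(value) {
        this.observers.forEach((observer) => observer.onNext(value));
    }
}

const incoming = new TinySubject();
// same shape as the routeHandler above
const routeHandler = (req) => incoming.onNext(req);
// same shape as the main source function above
const source = (observable) => {
    incoming.subscribe(observable);
    observable.onNext('Waiting for input!');
};

const output = [];
source({ onNext: (value) => output.push(value) });
// simulate one incoming request
routeHandler({ body: 'hi' });
console.log(output); // the greeting first, then the simulated request
```

Because the source subscribes before any request arrives, nothing sent through routeHandler is lost once the source is running.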

Controlled way

A more controlled way is to define a new exported object called routeHandler that processes incoming requests only from the transports it defines handlers for.
This can be done like so:

// import RxJS 4 (npm package "rx"), which provides Rx.Subject
import Rx from 'rx';

// create a subject to pass requests to the main source function
const incoming = new Rx.Subject();
// define object that will handle only incoming GET requests
export const routeHandler = {
    get: (req) => incoming.onNext(req.query)
    // other possible methods:
    // "post" for POST requests
    // "put" for PUT requests
    // "ws" for WebSocket connection
};
// main source function
export default (observable) => {
    incoming.subscribe(observable);
    observable.onNext('Waiting for input!');
};

This example handles only GET requests and passes the query of each incoming request to the source function.
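
How a request reaches the right handler can be pictured with a small dispatcher. This is a guess at the behaviour described above, not the platform's code: dispatch is a hypothetical function, and the transport names follow the comments in the example (get, post, put, ws).

```javascript
// Hypothetical dispatcher covering both forms of routeHandler.
const dispatch = (routeHandler, transport, req) => {
    if (typeof routeHandler === 'function') {
        // simple way: one function receives requests from every transport
        routeHandler(req);
        return true;
    }
    const handler = routeHandler && routeHandler[transport];
    if (typeof handler === 'function') {
        // controlled way: only transports with a handler are processed
        handler(req);
        return true;
    }
    return false; // no handler defined for this transport
};

const seen = [];
const getOnly = { get: (req) => seen.push(req.query) };
console.log(dispatch(getOnly, 'get', { query: { q: 'rx' } })); // true
console.log(dispatch(getOnly, 'post', { body: 'ignored' })); // false
```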

Note that the source must be running and its observable must not have completed for requests to be accepted.
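
On the client side, the input address is the pipeline URI with /input appended. A sketch using the standard URL API, where buildInputUrl is a hypothetical helper and the text query parameter is invented for illustration:

```javascript
// Build the input address for a running pipeline, with optional query values.
const buildInputUrl = (pipelineUri, query = {}) => {
    // drop trailing slashes so the result never contains "//input"
    const url = new URL(pipelineUri.replace(/\/+$/, '') + '/input');
    Object.entries(query).forEach(([key, value]) => url.searchParams.set(key, value));
    return url.toString();
};

console.log(buildInputUrl('http://alpha.exynize.com/pipeline/user/pipeline', { text: 'hello' }));
// prints "http://alpha.exynize.com/pipeline/user/pipeline/input?text=hello"
```

A GET request to such an address would be the kind of request the get handler above receives, with the query values available on req.query.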

Examples

You can find real-world examples of source components in the usecases folder. Currently the following ones are available: