/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";

module.metadata = {
  "stability": "unstable"
};

const { emit, on, once, off, EVENT_TYPE_PATTERN } = require("./core");

// This module provides a set of higher-order functions for working with event
// streams (streams in a NodeJS style that dispatch data, end and error
// events).

// Takes a `target` object and returns the set of implicit references
// (non-property references) it keeps. This allows defining references between
// objects without storing them explicitly. See `transform` for more details.
const refs = (function() {
  // Keys are held weakly, so an entry does not by itself keep `target` alive.
  const refSets = new WeakMap();
  return function refs(target) {
    // Lazily create the reference set the first time a target is seen.
    if (!refSets.has(target))
      refSets.set(target, new Set());
    return refSets.get(target);
  };
})();

/**
 * Creates a new `output` stream object driven by `input`.
 *
 * - every "data" event of `input` calls `f(data, next)`, where `next(value)`
 *   delivers `value` to `output` through `receive`;
 * - every "error" event of `input` is re-emitted on `output`;
 * - the "end" event of `input` drops the implicit reference and ends `output`;
 * - the first "start" event of `output` starts `input`.
 *
 * `receive`, `start` and `end` are the signal shims defined further down in
 * this module; they are only invoked from listeners, after the module has
 * finished loading.
 *
 * @param {Object} input  source event stream
 * @param {Function} f    called as `f(data, next)` for each "data" event
 * @return {Object} the derived output stream
 */
function transform(input, f) {
  const output = {};

  // Since event listeners don't prevent `input` from being GC-ed we want to
  // preserve it until `output` can be GC-ed. Therefore we add an implicit
  // reference which is removed once `input` ends.
  refs(output).add(input);

  const next = data => receive(output, data);
  once(output, "start", () => start(input));
  on(input, "error", error => emit(output, "error", error));
  on(input, "end", function() {
    refs(output).delete(input);
    end(output);
  });
  on(input, "data", data => f(data, next));
  return output;
}

// Higher-order event transformation function that takes an `input` event
// channel and returns a transformation containing only the events for which
// the `predicate` returns a truthy value.
function filter(input, predicate) {
  return transform(input, function(data, next) {
    if (predicate(data))
      next(data);
  });
}
exports.filter = filter;

// Higher-order function that takes an `input` and returns a stream of its
// values mapped via the given `f` function.
const map = (input, f) => transform(input, (data, next) => next(f(data)));
exports.map = map;

// Higher-order function that takes an `input` stream of streams and merges
// them into a single event stream. Like flatten, but time based rather than
// order based.
/**
 * Merges several event streams into one `output` stream.
 *
 * `inputs` is either an array of streams or a stream whose "data" events are
 * themselves streams. Every "data" and "error" event of a forwarded stream is
 * re-emitted on `output`. `output` emits "end" once `inputs` itself and every
 * forwarded stream have ended. For an array, `inputs` counts as ended right
 * after all elements are forwarded, so an empty array emits "end"
 * synchronously, before `output` is returned.
 *
 * @param {Array|Object} inputs  array of streams, or a stream of streams
 * @return {Object} merged stream; `output.state` lists every forwarded stream
 */
function merge(inputs) {
  const output = {};
  // Starts at 1 to account for `inputs` itself, which also has to end.
  let open = 1;
  const state = [];
  output.state = state;
  refs(output).add(inputs);

  // Marks one source as finished and ends `output` when none remain open.
  // Named `close` so it does not shadow the module-level `end` shim.
  function close(input) {
    open = open - 1;
    // NOTE(review): only `inputs` is ever added to `refs(output)`; forwarded
    // streams are kept in `state` instead, so for them this delete looks like
    // a no-op and they remain listed in `output.state` - confirm intent.
    refs(output).delete(input);
    if (open === 0) emit(output, "end");
  }
  const forwardError = e => emit(output, "error", e);
  function forward(input) {
    state.push(input);
    open = open + 1;
    on(input, "end", () => close(input));
    on(input, "error", forwardError);
    on(input, "data", data => emit(output, "data", data));
  }

  // If `inputs` is an array treat it as a stream.
  if (Array.isArray(inputs)) {
    inputs.forEach(forward);
    close(inputs);
  }
  else {
    on(inputs, "end", () => close(inputs));
    on(inputs, "error", forwardError);
    on(inputs, "data", forward);
  }

  return output;
}
exports.merge = merge;

// Maps every value of `inputs` through `f` and merges the streams that `f`
// returns into a single stream.
const expand = (inputs, f) => merge(map(inputs, f));
exports.expand = expand;

// Registers a "*" listener on `from` that calls `emit(to, ...listenerArgs)`.
// NOTE(review): presumably "*" is the wildcard event type of "./core", whose
// listeners receive the event type first - confirm against that module.
const pipe = (from, to) => on(from, "*", emit.bind(emit, to));
exports.pipe = pipe;


// Shim signal APIs so other modules can be used as is.

// Each shim below is used both as a function and as a property key: a
// function used in `input[fn]` is coerced to a string through the `toString`
// override assigned right after it ("@@receive", "@@end", ...). An object can
// therefore customize a signal by defining a method under that key; otherwise
// the shim falls back to emitting the matching event.

/**
 * Delivers `message` to `input`: calls `input["@@receive"](input, message)`
 * when defined, otherwise emits a "data" event. Afterwards `message` is
 * stored as `input.value`, so listeners still see the previous value while
 * the event is being dispatched.
 */
const receive = (input, message) => {
  if (input[receive])
    input[receive](input, message);
  else
    emit(input, "data", message);

  input.value = message;
};
receive.toString = () => "@@receive";
exports.receive = receive;
exports.send = receive;

/**
 * Ends `input`: calls `input["@@end"](input)` when defined, otherwise emits
 * an "end" event carrying `input`.
 */
const end = input => {
  if (input[end])
    input[end](input);
  else
    emit(input, "end", input);
};
end.toString = () => "@@end";
exports.end = end;

/**
 * Stops `input`: calls `input["@@stop"](input)` when defined, otherwise emits
 * a "stop" event carrying `input`.
 */
const stop = input => {
  if (input[stop])
    input[stop](input);
  else
    emit(input, "stop", input);
};
stop.toString = () => "@@stop";
exports.stop = stop;

/**
 * Starts `input`: calls `input["@@start"](input)` when defined, otherwise
 * emits a "start" event carrying `input`.
 */
const start = input => {
  if (input[start])
    input[start](input);
  else
    emit(input, "start", input);
};
start.toString = () => "@@start";
exports.start = start;

/**
 * Combines the current values of several inputs through `step`.
 *
 * `output.value` is initialized to `step(...)` applied to the `value` of each
 * input. Whenever an input emits "data", its slot in the argument list is
 * replaced and `step` is re-applied, with the result delivered to `output`
 * through `receive`. `output` ends once every input has ended. The first
 * "start" event on `output` starts all inputs and recomputes the value.
 *
 * @param {Function} step     called with one argument per input
 * @param {...Object} inputs  streams exposing a `value` property
 * @return {Object} the combined stream
 */
const lift = (step, ...inputs) => {
  let args = null;
  let opened = inputs.length;
  const output = {};
  // Snapshots the current input values and recomputes the output value.
  const init = () => {
    args = inputs.map(input => input.value);
    output.value = step(...args);
  };

  inputs.forEach((input, index) => {
    on(input, "data", data => {
      args[index] = data;
      receive(output, step(...args));
    });
    on(input, "end", () => {
      opened = opened - 1;
      if (opened <= 0)
        end(output);
    });
  });

  once(output, "start", () => {
    inputs.forEach(start);
    // Inputs may have changed `value` while starting, so take a new snapshot.
    init();
  });

  // Populate `args` and `output.value` before any "data" listener can run.
  init();

  return output;
};
exports.lift = lift;

/**
 * Merges an array of inputs into one stream that receives every "data" value
 * of every input. `output.value` starts as the `value` of the first input and
 * is re-read from it after the first "start" event on `output`, which also
 * starts all inputs. `output` ends once every input has ended.
 *
 * @param {Array} inputs  streams; the first element is read unconditionally,
 *                        so an empty array throws
 * @return {Object} the merged stream
 */
const merges = inputs => {
  let opened = inputs.length;
  const output = { value: inputs[0].value };
  inputs.forEach(input => {
    on(input, "data", data => receive(output, data));
    on(input, "end", () => {
      opened = opened - 1;
      if (opened <= 0)
        end(output);
    });
  });

  once(output, "start", () => {
    inputs.forEach(start);
    output.value = inputs[0].value;
  });

  return output;
};
exports.merges = merges;

/**
 * Folds `input` from the past: each value `x` of `input` is mapped to
 * `step(output.value, x)`, where `output.value` is the previously produced
 * value (`initial` before the first one).
 */
const foldp = (step, initial, input) => {
  // `output` is only read inside the callback, i.e. after it is assigned.
  const output = map(input, x => step(output.value, x));
  output.value = initial;
  return output;
};
exports.foldp = foldp;

/**
 * Keeps only the values of `input` for which predicate `p` is truthy; `base`
 * is used as the initial `value` of the resulting stream.
 */
const keepIf = (p, base, input) => {
  const output = filter(input, p);
  output.value = base;
  return output;
};
exports.keepIf = keepIf;

// Base type for inputs: provides default start and end behaviour.
function Input() {}
Input.start = input => emit(input, "start", input);
// NOTE(review): this is registered under the plain "start" key, whereas the
// `start` shim looks up "@@start" (and `Input.end` below is registered under
// the `end` shim key). `Input.start` matches the shim's fallback, so
// `start(input)` behaves the same either way - confirm whether `[start]` was
// intended.
Input.prototype.start = Input.start;

// Ending an input emits "end" and then stops it.
Input.end = input => {
  emit(input, "end", input);
  stop(input);
};
Input.prototype[end] = Input.end;

exports.Input = Input;

const $source = "@@source";
const $outputs = "@@outputs";
exports.outputs = $outputs;

/**
 * Consumer of an input stream. Any of `options.onStep`, `options.onStart`
 * and `options.onEnd` that is truthy overrides the corresponding no-op hook
 * inherited from the prototype.
 *
 * @param {Object} [options]
 */
function Reactor(options={}) {
  const {onStep, onStart, onEnd} = options;
  if (onStep)
    this.onStep = onStep;
  if (onStart)
    this.onStart = onStart;
  if (onEnd)
    this.onEnd = onEnd;
}
Reactor.prototype.onStep = _ => void(0);
Reactor.prototype.onStart = _ => void(0);
Reactor.prototype.onEnd = _ => void(0);
// Records the new value, then reports the transition to `onStep`.
Reactor.prototype.onNext = function(present, past) {
  this.value = present;
  this.onStep(present, past);
};
/**
 * Subscribes to `input`, starts it and calls `onStart` with its value.
 * For every "data" event `onNext` is called with the message and
 * `input.value` as read at that moment; "end" calls `onEnd` with
 * `input.value`.
 */
Reactor.prototype.run = function(input) {
  on(input, "data", message => this.onNext(message, input.value));
  on(input, "end", () => this.onEnd(input.value));
  start(input);
  this.value = input.value;
  this.onStart(input.value);
};
exports.Reactor = Reactor;

/**
 * Takes an object used as options with potential keys like 'onMessage',
 * of the kind `require('sdk/event/core').setListeners` used to be called on.
 * Returns a new object without the keys that match `EVENT_TYPE_PATTERN`,
 * i.e. without the keys that would trigger a listener to be set.
 *
 * @param {Object} object  options object; a falsy value yields `{}`
 * @return {Object}
 */
function stripListeners (object) {
  return Object.keys(object || {}).reduce((agg, key) => {
    if (!EVENT_TYPE_PATTERN.test(key))
      agg[key] = object[key];
    return agg;
  }, {});
}
exports.stripListeners = stripListeners;