addon-sdk/source/lib/sdk/event/utils.js

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 /* This Source Code Form is subject to the terms of the Mozilla Public
     2  * License, v. 2.0. If a copy of the MPL was not distributed with this
     3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     4 "use strict";
     6 module.metadata = {
     7   "stability": "unstable"
     8 };
    10 let { emit, on, once, off, EVENT_TYPE_PATTERN } = require("./core");
    12 // This module provides set of high order function for working with event
    13 // streams (streams in a NodeJS style that dispatch data, end and error
    14 // events).
    16 // Function takes a `target` object and returns set of implicit references
    17 // (non property references) it keeps. This basically allows defining
    18 // references between objects without storing the explicitly. See transform for
    19 // more details.
    20 let refs = (function() {
    21   let refSets = new WeakMap();
    22   return function refs(target) {
    23     if (!refSets.has(target)) refSets.set(target, new Set());
    24     return refSets.get(target);
    25   };
    26 })();
    28 function transform(input, f) {
    29   let output = {};
    31   // Since event listeners don't prevent `input` to be GC-ed we wanna presrve
    32   // it until `output` can be GC-ed. There for we add implicit reference which
    33   // is removed once `input` ends.
    34   refs(output).add(input);
    36   const next = data => receive(output, data);
    37   once(output, "start", () => start(input));
    38   on(input, "error", error => emit(output, "error", error));
    39   on(input, "end", function() {
    40     refs(output).delete(input);
    41     end(output);
    42   });
    43   on(input, "data", data => f(data, next));
    44   return output;
    45 }
    47 // High order event transformation function that takes `input` event channel
    48 // and returns transformation containing only events on which `p` predicate
    49 // returns `true`.
    50 function filter(input, predicate) {
    51   return transform(input, function(data, next) {
    52     if (predicate(data))
    53       next(data);
    54   });
    55 }
    56 exports.filter = filter;
    58 // High order function that takes `input` and returns input of it's values
    59 // mapped via given `f` function.
    60 const map = (input, f) => transform(input, (data, next) => next(f(data)));
    61 exports.map = map;
    63 // High order function that takes `input` stream of streams and merges them
    64 // into single event stream. Like flatten but time based rather than order
    65 // based.
    66 function merge(inputs) {
    67   let output = {};
    68   let open = 1;
    69   let state = [];
    70   output.state = state;
    71   refs(output).add(inputs);
    73   function end(input) {
    74     open = open - 1;
    75     refs(output).delete(input);
    76     if (open === 0) emit(output, "end");
    77   }
    78   const error = e => emit(output, "error", e);
    79   function forward(input) {
    80     state.push(input);
    81     open = open + 1;
    82     on(input, "end", () => end(input));
    83     on(input, "error", error);
    84     on(input, "data", data => emit(output, "data", data));
    85   }
    87   // If `inputs` is an array treat it as a stream.
    88   if (Array.isArray(inputs)) {
    89     inputs.forEach(forward);
    90     end(inputs);
    91   }
    92   else {
    93     on(inputs, "end", () => end(inputs));
    94     on(inputs, "error", error);
    95     on(inputs, "data", forward);
    96   }
    98   return output;
    99 }
   100 exports.merge = merge;
   102 const expand = (inputs, f) => merge(map(inputs, f));
   103 exports.expand = expand;
   105 const pipe = (from, to) => on(from, "*", emit.bind(emit, to));
   106 exports.pipe = pipe;
   109 // Shim signal APIs so other modules can be used as is.
   111 const receive = (input, message) => {
   112   if (input[receive])
   113     input[receive](input, message);
   114   else
   115     emit(input, "data", message);
   117   input.value = message;
   118 };
   119 receive.toString = () => "@@receive";
   120 exports.receive = receive;
   121 exports.send = receive;
   123 const end = input => {
   124   if (input[end])
   125     input[end](input);
   126   else
   127     emit(input, "end", input);
   128 };
   129 end.toString = () => "@@end";
   130 exports.end = end;
   132 const stop = input => {
   133   if (input[stop])
   134     input[stop](input);
   135   else
   136     emit(input, "stop", input);
   137 };
   138 stop.toString = () => "@@stop";
   139 exports.stop = stop;
   141 const start = input => {
   142   if (input[start])
   143     input[start](input);
   144   else
   145     emit(input, "start", input);
   146 };
   147 start.toString = () => "@@start";
   148 exports.start = start;
   150 const lift = (step, ...inputs) => {
   151   let args = null;
   152   let opened = inputs.length;
   153   let started = false;
   154   const output = {};
   155   const init = () => {
   156     args = [...inputs.map(input => input.value)];
   157     output.value = step(...args);
   158   };
   160   inputs.forEach((input, index) => {
   161     on(input, "data", data => {
   162       args[index] = data;
   163       receive(output, step(...args));
   164     });
   165     on(input, "end", () => {
   166       opened = opened - 1;
   167       if (opened <= 0)
   168         end(output);
   169     });
   170   });
   172   once(output, "start", () => {
   173     inputs.forEach(start);
   174     init();
   175   });
   177   init();
   179   return output;
   180 };
   181 exports.lift = lift;
   183 const merges = inputs => {
   184   let opened = inputs.length;
   185   let output = { value: inputs[0].value };
   186   inputs.forEach((input, index) => {
   187     on(input, "data", data => receive(output, data));
   188     on(input, "end", () => {
   189       opened = opened - 1;
   190       if (opened <= 0)
   191         end(output);
   192     });
   193   });
   195   once(output, "start", () => {
   196     inputs.forEach(start);
   197     output.value = inputs[0].value;
   198   });
   200   return output;
   201 };
   202 exports.merges = merges;
   204 const foldp = (step, initial, input) => {
   205   let output = map(input, x => step(output.value, x));
   206   output.value = initial;
   207   return output;
   208 };
   209 exports.foldp = foldp;
   211 const keepIf = (p, base, input) => {
   212   let output = filter(input, p);
   213   output.value = base;
   214   return output;
   215 };
   216 exports.keepIf = keepIf;
   218 function Input() {}
   219 Input.start = input => emit(input, "start", input);
   220 Input.prototype.start = Input.start;
   222 Input.end = input => {
   223   emit(input, "end", input);
   224   stop(input);
   225 };
   226 Input.prototype[end] = Input.end;
   228 exports.Input = Input;
   230 const $source = "@@source";
   231 const $outputs = "@@outputs";
   232 exports.outputs = $outputs;
   234 function Reactor(options={}) {
   235   const {onStep, onStart, onEnd} = options;
   236   if (onStep)
   237     this.onStep = onStep;
   238   if (onStart)
   239     this.onStart = onStart;
   240   if (onEnd)
   241     this.onEnd = onEnd;
   242 }
   243 Reactor.prototype.onStep = _ => void(0);
   244 Reactor.prototype.onStart = _ => void(0);
   245 Reactor.prototype.onEnd = _ => void(0);
   246 Reactor.prototype.onNext = function(present, past) {
   247   this.value = present;
   248   this.onStep(present, past);
   249 };
   250 Reactor.prototype.run = function(input) {
   251   on(input, "data", message => this.onNext(message, input.value));
   252   on(input, "end", () => this.onEnd(input.value));
   253   start(input);
   254   this.value = input.value;
   255   this.onStart(input.value);
   256 };
   257 exports.Reactor = Reactor;
   259 /**
   260  * Takes an object used as options with potential keys like 'onMessage',
   261  * used to be called `require('sdk/event/core').setListeners` on.
   262  * This strips all keys that would trigger a listener to be set.
   263  * 
   264  * @params {Object} object
   265  * @return {Object}
   266  */
   268 function stripListeners (object) {
   269   return Object.keys(object || {}).reduce((agg, key) => {
   270     if (!EVENT_TYPE_PATTERN.test(key))
   271       agg[key] = object[key];
   272     return agg;
   273   }, {});
   274 }
   275 exports.stripListeners = stripListeners;

mercurial