addon-sdk/source/lib/sdk/event/utils.js

branch
TOR_BUG_3246
changeset 7
129ffea94266
equal deleted inserted replaced
-1:000000000000 0:e64d0c0bf6ec
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 "use strict";
5
6 module.metadata = {
7 "stability": "unstable"
8 };
9
10 let { emit, on, once, off, EVENT_TYPE_PATTERN } = require("./core");
11
12 // This module provides set of high order function for working with event
13 // streams (streams in a NodeJS style that dispatch data, end and error
14 // events).
15
16 // Function takes a `target` object and returns set of implicit references
17 // (non property references) it keeps. This basically allows defining
18 // references between objects without storing the explicitly. See transform for
19 // more details.
20 let refs = (function() {
21 let refSets = new WeakMap();
22 return function refs(target) {
23 if (!refSets.has(target)) refSets.set(target, new Set());
24 return refSets.get(target);
25 };
26 })();
27
28 function transform(input, f) {
29 let output = {};
30
31 // Since event listeners don't prevent `input` to be GC-ed we wanna presrve
32 // it until `output` can be GC-ed. There for we add implicit reference which
33 // is removed once `input` ends.
34 refs(output).add(input);
35
36 const next = data => receive(output, data);
37 once(output, "start", () => start(input));
38 on(input, "error", error => emit(output, "error", error));
39 on(input, "end", function() {
40 refs(output).delete(input);
41 end(output);
42 });
43 on(input, "data", data => f(data, next));
44 return output;
45 }
46
47 // High order event transformation function that takes `input` event channel
48 // and returns transformation containing only events on which `p` predicate
49 // returns `true`.
50 function filter(input, predicate) {
51 return transform(input, function(data, next) {
52 if (predicate(data))
53 next(data);
54 });
55 }
56 exports.filter = filter;
57
58 // High order function that takes `input` and returns input of it's values
59 // mapped via given `f` function.
60 const map = (input, f) => transform(input, (data, next) => next(f(data)));
61 exports.map = map;
62
63 // High order function that takes `input` stream of streams and merges them
64 // into single event stream. Like flatten but time based rather than order
65 // based.
66 function merge(inputs) {
67 let output = {};
68 let open = 1;
69 let state = [];
70 output.state = state;
71 refs(output).add(inputs);
72
73 function end(input) {
74 open = open - 1;
75 refs(output).delete(input);
76 if (open === 0) emit(output, "end");
77 }
78 const error = e => emit(output, "error", e);
79 function forward(input) {
80 state.push(input);
81 open = open + 1;
82 on(input, "end", () => end(input));
83 on(input, "error", error);
84 on(input, "data", data => emit(output, "data", data));
85 }
86
87 // If `inputs` is an array treat it as a stream.
88 if (Array.isArray(inputs)) {
89 inputs.forEach(forward);
90 end(inputs);
91 }
92 else {
93 on(inputs, "end", () => end(inputs));
94 on(inputs, "error", error);
95 on(inputs, "data", forward);
96 }
97
98 return output;
99 }
100 exports.merge = merge;
101
102 const expand = (inputs, f) => merge(map(inputs, f));
103 exports.expand = expand;
104
105 const pipe = (from, to) => on(from, "*", emit.bind(emit, to));
106 exports.pipe = pipe;
107
108
109 // Shim signal APIs so other modules can be used as is.
110
111 const receive = (input, message) => {
112 if (input[receive])
113 input[receive](input, message);
114 else
115 emit(input, "data", message);
116
117 input.value = message;
118 };
119 receive.toString = () => "@@receive";
120 exports.receive = receive;
121 exports.send = receive;
122
123 const end = input => {
124 if (input[end])
125 input[end](input);
126 else
127 emit(input, "end", input);
128 };
129 end.toString = () => "@@end";
130 exports.end = end;
131
132 const stop = input => {
133 if (input[stop])
134 input[stop](input);
135 else
136 emit(input, "stop", input);
137 };
138 stop.toString = () => "@@stop";
139 exports.stop = stop;
140
141 const start = input => {
142 if (input[start])
143 input[start](input);
144 else
145 emit(input, "start", input);
146 };
147 start.toString = () => "@@start";
148 exports.start = start;
149
150 const lift = (step, ...inputs) => {
151 let args = null;
152 let opened = inputs.length;
153 let started = false;
154 const output = {};
155 const init = () => {
156 args = [...inputs.map(input => input.value)];
157 output.value = step(...args);
158 };
159
160 inputs.forEach((input, index) => {
161 on(input, "data", data => {
162 args[index] = data;
163 receive(output, step(...args));
164 });
165 on(input, "end", () => {
166 opened = opened - 1;
167 if (opened <= 0)
168 end(output);
169 });
170 });
171
172 once(output, "start", () => {
173 inputs.forEach(start);
174 init();
175 });
176
177 init();
178
179 return output;
180 };
181 exports.lift = lift;
182
183 const merges = inputs => {
184 let opened = inputs.length;
185 let output = { value: inputs[0].value };
186 inputs.forEach((input, index) => {
187 on(input, "data", data => receive(output, data));
188 on(input, "end", () => {
189 opened = opened - 1;
190 if (opened <= 0)
191 end(output);
192 });
193 });
194
195 once(output, "start", () => {
196 inputs.forEach(start);
197 output.value = inputs[0].value;
198 });
199
200 return output;
201 };
202 exports.merges = merges;
203
204 const foldp = (step, initial, input) => {
205 let output = map(input, x => step(output.value, x));
206 output.value = initial;
207 return output;
208 };
209 exports.foldp = foldp;
210
211 const keepIf = (p, base, input) => {
212 let output = filter(input, p);
213 output.value = base;
214 return output;
215 };
216 exports.keepIf = keepIf;
217
218 function Input() {}
219 Input.start = input => emit(input, "start", input);
220 Input.prototype.start = Input.start;
221
222 Input.end = input => {
223 emit(input, "end", input);
224 stop(input);
225 };
226 Input.prototype[end] = Input.end;
227
228 exports.Input = Input;
229
230 const $source = "@@source";
231 const $outputs = "@@outputs";
232 exports.outputs = $outputs;
233
234 function Reactor(options={}) {
235 const {onStep, onStart, onEnd} = options;
236 if (onStep)
237 this.onStep = onStep;
238 if (onStart)
239 this.onStart = onStart;
240 if (onEnd)
241 this.onEnd = onEnd;
242 }
243 Reactor.prototype.onStep = _ => void(0);
244 Reactor.prototype.onStart = _ => void(0);
245 Reactor.prototype.onEnd = _ => void(0);
246 Reactor.prototype.onNext = function(present, past) {
247 this.value = present;
248 this.onStep(present, past);
249 };
250 Reactor.prototype.run = function(input) {
251 on(input, "data", message => this.onNext(message, input.value));
252 on(input, "end", () => this.onEnd(input.value));
253 start(input);
254 this.value = input.value;
255 this.onStart(input.value);
256 };
257 exports.Reactor = Reactor;
258
259 /**
260 * Takes an object used as options with potential keys like 'onMessage',
261 * used to be called `require('sdk/event/core').setListeners` on.
262 * This strips all keys that would trigger a listener to be set.
263 *
264 * @params {Object} object
265 * @return {Object}
266 */
267
268 function stripListeners (object) {
269 return Object.keys(object || {}).reduce((agg, key) => {
270 if (!EVENT_TYPE_PATTERN.test(key))
271 agg[key] = object[key];
272 return agg;
273 }, {});
274 }
275 exports.stripListeners = stripListeners;

mercurial