1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/addon-sdk/source/lib/sdk/event/utils.js Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,275 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 +"use strict"; 1.8 + 1.9 +module.metadata = { 1.10 + "stability": "unstable" 1.11 +}; 1.12 + 1.13 +let { emit, on, once, off, EVENT_TYPE_PATTERN } = require("./core"); 1.14 + 1.15 +// This module provides set of high order function for working with event 1.16 +// streams (streams in a NodeJS style that dispatch data, end and error 1.17 +// events). 1.18 + 1.19 +// Function takes a `target` object and returns set of implicit references 1.20 +// (non property references) it keeps. This basically allows defining 1.21 +// references between objects without storing the explicitly. See transform for 1.22 +// more details. 1.23 +let refs = (function() { 1.24 + let refSets = new WeakMap(); 1.25 + return function refs(target) { 1.26 + if (!refSets.has(target)) refSets.set(target, new Set()); 1.27 + return refSets.get(target); 1.28 + }; 1.29 +})(); 1.30 + 1.31 +function transform(input, f) { 1.32 + let output = {}; 1.33 + 1.34 + // Since event listeners don't prevent `input` to be GC-ed we wanna presrve 1.35 + // it until `output` can be GC-ed. There for we add implicit reference which 1.36 + // is removed once `input` ends. 1.37 + refs(output).add(input); 1.38 + 1.39 + const next = data => receive(output, data); 1.40 + once(output, "start", () => start(input)); 1.41 + on(input, "error", error => emit(output, "error", error)); 1.42 + on(input, "end", function() { 1.43 + refs(output).delete(input); 1.44 + end(output); 1.45 + }); 1.46 + on(input, "data", data => f(data, next)); 1.47 + return output; 1.48 +} 1.49 + 1.50 +// High order event transformation function that takes `input` event channel 1.51 +// and returns transformation containing only events on which `p` predicate 1.52 +// returns `true`. 1.53 +function filter(input, predicate) { 1.54 + return transform(input, function(data, next) { 1.55 + if (predicate(data)) 1.56 + next(data); 1.57 + }); 1.58 +} 1.59 +exports.filter = filter; 1.60 + 1.61 +// High order function that takes `input` and returns input of it's values 1.62 +// mapped via given `f` function. 1.63 +const map = (input, f) => transform(input, (data, next) => next(f(data))); 1.64 +exports.map = map; 1.65 + 1.66 +// High order function that takes `input` stream of streams and merges them 1.67 +// into single event stream. Like flatten but time based rather than order 1.68 +// based. 1.69 +function merge(inputs) { 1.70 + let output = {}; 1.71 + let open = 1; 1.72 + let state = []; 1.73 + output.state = state; 1.74 + refs(output).add(inputs); 1.75 + 1.76 + function end(input) { 1.77 + open = open - 1; 1.78 + refs(output).delete(input); 1.79 + if (open === 0) emit(output, "end"); 1.80 + } 1.81 + const error = e => emit(output, "error", e); 1.82 + function forward(input) { 1.83 + state.push(input); 1.84 + open = open + 1; 1.85 + on(input, "end", () => end(input)); 1.86 + on(input, "error", error); 1.87 + on(input, "data", data => emit(output, "data", data)); 1.88 + } 1.89 + 1.90 + // If `inputs` is an array treat it as a stream. 1.91 + if (Array.isArray(inputs)) { 1.92 + inputs.forEach(forward); 1.93 + end(inputs); 1.94 + } 1.95 + else { 1.96 + on(inputs, "end", () => end(inputs)); 1.97 + on(inputs, "error", error); 1.98 + on(inputs, "data", forward); 1.99 + } 1.100 + 1.101 + return output; 1.102 +} 1.103 +exports.merge = merge; 1.104 + 1.105 +const expand = (inputs, f) => merge(map(inputs, f)); 1.106 +exports.expand = expand; 1.107 + 1.108 +const pipe = (from, to) => on(from, "*", emit.bind(emit, to)); 1.109 +exports.pipe = pipe; 1.110 + 1.111 + 1.112 +// Shim signal APIs so other modules can be used as is. 1.113 + 1.114 +const receive = (input, message) => { 1.115 + if (input[receive]) 1.116 + input[receive](input, message); 1.117 + else 1.118 + emit(input, "data", message); 1.119 + 1.120 + input.value = message; 1.121 +}; 1.122 +receive.toString = () => "@@receive"; 1.123 +exports.receive = receive; 1.124 +exports.send = receive; 1.125 + 1.126 +const end = input => { 1.127 + if (input[end]) 1.128 + input[end](input); 1.129 + else 1.130 + emit(input, "end", input); 1.131 +}; 1.132 +end.toString = () => "@@end"; 1.133 +exports.end = end; 1.134 + 1.135 +const stop = input => { 1.136 + if (input[stop]) 1.137 + input[stop](input); 1.138 + else 1.139 + emit(input, "stop", input); 1.140 +}; 1.141 +stop.toString = () => "@@stop"; 1.142 +exports.stop = stop; 1.143 + 1.144 +const start = input => { 1.145 + if (input[start]) 1.146 + input[start](input); 1.147 + else 1.148 + emit(input, "start", input); 1.149 +}; 1.150 +start.toString = () => "@@start"; 1.151 +exports.start = start; 1.152 + 1.153 +const lift = (step, ...inputs) => { 1.154 + let args = null; 1.155 + let opened = inputs.length; 1.156 + let started = false; 1.157 + const output = {}; 1.158 + const init = () => { 1.159 + args = [...inputs.map(input => input.value)]; 1.160 + output.value = step(...args); 1.161 + }; 1.162 + 1.163 + inputs.forEach((input, index) => { 1.164 + on(input, "data", data => { 1.165 + args[index] = data; 1.166 + receive(output, step(...args)); 1.167 + }); 1.168 + on(input, "end", () => { 1.169 + opened = opened - 1; 1.170 + if (opened <= 0) 1.171 + end(output); 1.172 + }); 1.173 + }); 1.174 + 1.175 + once(output, "start", () => { 1.176 + inputs.forEach(start); 1.177 + init(); 1.178 + }); 1.179 + 1.180 + init(); 1.181 + 1.182 + return output; 1.183 +}; 1.184 +exports.lift = lift; 1.185 + 1.186 +const merges = inputs => { 1.187 + let opened = inputs.length; 1.188 + let output = { value: inputs[0].value }; 1.189 + inputs.forEach((input, index) => { 1.190 + on(input, "data", data => receive(output, data)); 1.191 + on(input, "end", () => { 1.192 + opened = opened - 1; 1.193 + if (opened <= 0) 1.194 + end(output); 1.195 + }); 1.196 + }); 1.197 + 1.198 + once(output, "start", () => { 1.199 + inputs.forEach(start); 1.200 + output.value = inputs[0].value; 1.201 + }); 1.202 + 1.203 + return output; 1.204 +}; 1.205 +exports.merges = merges; 1.206 + 1.207 +const foldp = (step, initial, input) => { 1.208 + let output = map(input, x => step(output.value, x)); 1.209 + output.value = initial; 1.210 + return output; 1.211 +}; 1.212 +exports.foldp = foldp; 1.213 + 1.214 +const keepIf = (p, base, input) => { 1.215 + let output = filter(input, p); 1.216 + output.value = base; 1.217 + return output; 1.218 +}; 1.219 +exports.keepIf = keepIf; 1.220 + 1.221 +function Input() {} 1.222 +Input.start = input => emit(input, "start", input); 1.223 +Input.prototype.start = Input.start; 1.224 + 1.225 +Input.end = input => { 1.226 + emit(input, "end", input); 1.227 + stop(input); 1.228 +}; 1.229 +Input.prototype[end] = Input.end; 1.230 + 1.231 +exports.Input = Input; 1.232 + 1.233 +const $source = "@@source"; 1.234 +const $outputs = "@@outputs"; 1.235 +exports.outputs = $outputs; 1.236 + 1.237 +function Reactor(options={}) { 1.238 + const {onStep, onStart, onEnd} = options; 1.239 + if (onStep) 1.240 + this.onStep = onStep; 1.241 + if (onStart) 1.242 + this.onStart = onStart; 1.243 + if (onEnd) 1.244 + this.onEnd = onEnd; 1.245 +} 1.246 +Reactor.prototype.onStep = _ => void(0); 1.247 +Reactor.prototype.onStart = _ => void(0); 1.248 +Reactor.prototype.onEnd = _ => void(0); 1.249 +Reactor.prototype.onNext = function(present, past) { 1.250 + this.value = present; 1.251 + this.onStep(present, past); 1.252 +}; 1.253 +Reactor.prototype.run = function(input) { 1.254 + on(input, "data", message => this.onNext(message, input.value)); 1.255 + on(input, "end", () => this.onEnd(input.value)); 1.256 + start(input); 1.257 + this.value = input.value; 1.258 + this.onStart(input.value); 1.259 +}; 1.260 +exports.Reactor = Reactor; 1.261 + 1.262 +/** 1.263 + * Takes an object used as options with potential keys like 'onMessage', 1.264 + * used to be called `require('sdk/event/core').setListeners` on. 1.265 + * This strips all keys that would trigger a listener to be set. 1.266 + * 1.267 + * @params {Object} object 1.268 + * @return {Object} 1.269 + */ 1.270 + 1.271 +function stripListeners (object) { 1.272 + return Object.keys(object || {}).reduce((agg, key) => { 1.273 + if (!EVENT_TYPE_PATTERN.test(key)) 1.274 + agg[key] = object[key]; 1.275 + return agg; 1.276 + }, {}); 1.277 +} 1.278 +exports.stripListeners = stripListeners;