michael@0: /** michael@0: * Thin wrapper around a ChromeWorker that wraps postMessage/onmessage/onerror michael@0: * as promises. michael@0: * michael@0: * Not for public use yet. michael@0: */ michael@0: michael@0: "use strict"; michael@0: michael@0: this.EXPORTED_SYMBOLS = ["PromiseWorker"]; michael@0: michael@0: // The library of promises. michael@0: Components.utils.import("resource://gre/modules/Promise.jsm", this); michael@0: michael@0: /** michael@0: * An implementation of queues (FIFO). michael@0: * michael@0: * The current implementation uses one array, runs in O(n ^ 2), and is optimized michael@0: * for the case in which queues are generally short. michael@0: */ michael@0: let Queue = function Queue() { michael@0: this._array = []; michael@0: }; michael@0: Queue.prototype = { michael@0: pop: function pop() { michael@0: return this._array.shift(); michael@0: }, michael@0: push: function push(x) { michael@0: return this._array.push(x); michael@0: }, michael@0: isEmpty: function isEmpty() { michael@0: return this._array.length == 0; michael@0: } michael@0: }; michael@0: michael@0: /** michael@0: * An object responsible for dispatching messages to michael@0: * a chrome worker and routing the responses. michael@0: * michael@0: * @param {string} url The url containing the source code for this worker, michael@0: * as in constructor ChromeWorker. michael@0: * @param {Function} log A logging function. michael@0: * michael@0: * @constructor michael@0: */ michael@0: function PromiseWorker(url, log) { michael@0: if (typeof url != "string") { michael@0: throw new TypeError("Expecting a string"); michael@0: } michael@0: if (typeof log !== "function") { michael@0: throw new TypeError("log is expected to be a function"); michael@0: } michael@0: this._log = log; michael@0: this._url = url; michael@0: michael@0: /** michael@0: * The queue of deferred, waiting for the completion of their michael@0: * respective job by the worker. michael@0: * michael@0: * Each item in the list may contain an additional field |closure|, michael@0: * used to store strong references to value that must not be michael@0: * garbage-collected before the reply has been received (e.g. michael@0: * arrays). michael@0: * michael@0: * @type {Queue<{deferred:deferred, closure:*=}>} michael@0: */ michael@0: this._queue = new Queue(); michael@0: michael@0: /** michael@0: * The number of the current message. michael@0: * michael@0: * Used for debugging purposes. michael@0: */ michael@0: this._id = 0; michael@0: michael@0: /** michael@0: * The instant at which the worker was launched. michael@0: */ michael@0: this.launchTimeStamp = null; michael@0: michael@0: /** michael@0: * Timestamps provided by the worker for statistics purposes. michael@0: */ michael@0: this.workerTimeStamps = null; michael@0: } michael@0: PromiseWorker.prototype = { michael@0: /** michael@0: * Instantiate the worker lazily. michael@0: */ michael@0: get _worker() { michael@0: delete this._worker; michael@0: let worker = new ChromeWorker(this._url); michael@0: let self = this; michael@0: Object.defineProperty(this, "_worker", {value: michael@0: worker michael@0: }); michael@0: michael@0: // We assume that we call to _worker for the purpose of calling michael@0: // postMessage(). michael@0: this.launchTimeStamp = Date.now(); michael@0: michael@0: /** michael@0: * Receive errors that are not instances of OS.File.Error, propagate michael@0: * them to the listeners. michael@0: * michael@0: * The worker knows how to serialize errors that are instances michael@0: * of |OS.File.Error|. These are treated by |worker.onmessage|. michael@0: * However, for other errors, we rely on DOM's mechanism for michael@0: * serializing errors, which transmits these errors through michael@0: * |worker.onerror|. michael@0: * michael@0: * @param {Error} error Some JS error. michael@0: */ michael@0: worker.onerror = function onerror(error) { michael@0: self._log("Received uncaught error from worker", error.message, error.filename, error.lineno); michael@0: error.preventDefault(); michael@0: let {deferred} = self._queue.pop(); michael@0: deferred.reject(error); michael@0: }; michael@0: michael@0: /** michael@0: * Receive messages from the worker, propagate them to the listeners. michael@0: * michael@0: * Messages must have one of the following shapes: michael@0: * - {ok: some_value} in case of success michael@0: * - {fail: some_error} in case of error, where michael@0: * some_error is an instance of |PromiseWorker.WorkerError| michael@0: * michael@0: * Messages may also contain a field |id| to help michael@0: * with debugging. michael@0: * michael@0: * Messages may also optionally contain a field |durationMs|, holding michael@0: * the duration of the function call in milliseconds. michael@0: * michael@0: * @param {*} msg The message received from the worker. michael@0: */ michael@0: worker.onmessage = function onmessage(msg) { michael@0: self._log("Received message from worker", msg.data); michael@0: let handler = self._queue.pop(); michael@0: let deferred = handler.deferred; michael@0: let data = msg.data; michael@0: if (data.id != handler.id) { michael@0: throw new Error("Internal error: expecting msg " + handler.id + ", " + michael@0: " got " + data.id + ": " + JSON.stringify(msg.data)); michael@0: } michael@0: if ("timeStamps" in data) { michael@0: self.workerTimeStamps = data.timeStamps; michael@0: } michael@0: if ("ok" in data) { michael@0: // Pass the data to the listeners. michael@0: deferred.resolve(data); michael@0: } else if ("StopIteration" in data) { michael@0: // We have received a StopIteration error michael@0: deferred.reject(StopIteration); michael@0: } if ("fail" in data) { michael@0: // We have received an error that was serialized by the michael@0: // worker. michael@0: deferred.reject(new PromiseWorker.WorkerError(data.fail)); michael@0: } michael@0: }; michael@0: return worker; michael@0: }, michael@0: michael@0: /** michael@0: * Post a message to a worker. michael@0: * michael@0: * @param {string} fun The name of the function to call. michael@0: * @param {Array} array The contents of the message. michael@0: * @param {*=} closure An object holding references that should not be michael@0: * garbage-collected before the message treatment is complete. michael@0: * michael@0: * @return {promise} michael@0: */ michael@0: post: function post(fun, array, closure) { michael@0: let deferred = Promise.defer(); michael@0: let id = ++this._id; michael@0: let message = {fun: fun, args: array, id: id}; michael@0: this._log("Posting message", message); michael@0: try { michael@0: this._worker.postMessage(message); michael@0: } catch (ex if typeof ex == "number") { michael@0: this._log("Could not post message", message, "due to xpcom error", ex); michael@0: // handle raw xpcom errors (see eg bug 961317) michael@0: return Promise.reject(new Components.Exception("Error in postMessage", ex)); michael@0: } catch (ex) { michael@0: this._log("Could not post message", message, "due to error", ex); michael@0: return Promise.reject(ex); michael@0: } michael@0: michael@0: this._queue.push({deferred:deferred, closure: closure, id: id}); michael@0: this._log("Message posted"); michael@0: return deferred.promise; michael@0: } michael@0: }; michael@0: michael@0: /** michael@0: * An error that has been serialized by the worker. michael@0: * michael@0: * @constructor michael@0: */ michael@0: PromiseWorker.WorkerError = function WorkerError(data) { michael@0: this.data = data; michael@0: }; michael@0: michael@0: this.PromiseWorker = PromiseWorker;