/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */

"use strict";

// Gate for debug(). The value is taken from the services.push.debug pref
// further down, so change the pref rather than this initializer.
let gDebuggingEnabled = false;

/**
 * Hand |s| to dump() with the module prefix and a trailing newline.
 * Does nothing while gDebuggingEnabled is falsy.
 *
 * @param s
 *        The text to log; it is concatenated into the output string.
 */
function debug(s) {
  if (!gDebuggingEnabled) {
    return;
  }
  dump("-*- PushService.jsm: " + s + "\n");
}

// Conventional short aliases for the Components sub-objects.
const { classes: Cc, interfaces: Ci, utils: Cu, results: Cr } = Components;

Cu.import("resource://gre/modules/XPCOMUtils.jsm");
Cu.import("resource://gre/modules/Services.jsm");
Cu.import("resource://gre/modules/IndexedDBHelper.jsm");
Cu.import("resource://gre/modules/Timer.jsm");
Cu.import("resource://gre/modules/Preferences.jsm");
Cu.import("resource://gre/modules/Promise.jsm");
Cu.importGlobalProperties(["indexedDB"]);

// AlarmService is only loaded the first time it is referenced.
XPCOMUtils.defineLazyModuleGetter(
  this,
  "AlarmService",
  "resource://gre/modules/AlarmService.jsm"
);

this.EXPORTED_SYMBOLS = ["PushService"];

// All prefs read through this object live under the "services.push." branch.
const prefs = new Preferences("services.push.");
// Pick up the debug pref before anything else so that every later debug()
// call is actually emitted.
gDebuggingEnabled = prefs.get("debug");

// IndexedDB database name, schema version and object store name.
// Bump kPUSHDB_DB_VERSION whenever the IndexedDB format changes.
const kPUSHDB_DB_NAME = "push";
const kPUSHDB_DB_VERSION = 1;
const kPUSHDB_STORE_NAME = "push";

// WebSocket Close status code sent by the server to signal that it can wake
// the client up using UDP.
const kUDP_WAKEUP_WS_STATUS_CODE = 4774;
// Message names for which PushService.init() registers a listener on the
// parent process message manager.
const kCHILD_PROCESS_MESSAGES = ["Push:Register", "Push:Unregister",
                                 "Push:Registrations"];

/**
 * Persistent store for push channel records, layered on IndexedDBHelper.
 * This is a singleton.
 */
this.PushDB = function PushDB() {
  debug("PushDB()");

  // set the indexeddb database
  this.initDBHelper(kPUSHDB_DB_NAME, kPUSHDB_DB_VERSION,
                    [kPUSHDB_STORE_NAME]);
};

this.PushDB.prototype = {
  __proto__: IndexedDBHelper.prototype,

  /**
   * Creates the object store (keyed on "channelID") and its two indexes.
   *
   * @param aTransaction
   *        Unused here.
   * @param aDb
   *        The database on which the object store is created.
   * @param aOldVersion
   *        Unused here.
   * @param aNewVersion
   *        Unused here.
   */
  upgradeSchema: function(aTransaction, aDb, aOldVersion, aNewVersion) {
    debug("PushDB.upgradeSchema()");

    let objectStore = aDb.createObjectStore(kPUSHDB_STORE_NAME,
                                            { keyPath: "channelID" });

    // index to fetch records based on endpoints. used by unregister
    objectStore.createIndex("pushEndpoint", "pushEndpoint", { unique: true });

    // index to fetch records per manifest, so we can identify endpoints
    // associated with an app. Since an app can have multiple endpoints
    // uniqueness cannot be enforced
    objectStore.createIndex("manifestURL", "manifestURL", { unique: false });
  },

  /**
   * Adds or updates a channel record.
   *
   * @param aChannelRecord
   *        The record to be added.
   * @param aSuccessCb
   *        Callback function handed to newTxn as the success callback.
   *        NOTE(review): this method never sets aTxn.result, so whether the
   *        callback receives a record ID depends on IndexedDBHelper - confirm.
   * @param aErrorCb [optional]
   *        Callback function to invoke when there was an error.
   */
  put: function(aChannelRecord, aSuccessCb, aErrorCb) {
    debug("put()");

    this.newTxn(
      "readwrite",
      kPUSHDB_STORE_NAME,
      function txnCb(aTxn, aStore) {
        debug("Going to put " + aChannelRecord.channelID);
        aStore.put(aChannelRecord).onsuccess = function setTxnResult(aEvent) {
          debug("Request successful. Updated record ID: " +
                aEvent.target.result);
        };
      },
      aSuccessCb,
      aErrorCb
    );
  },

  /**
   * Deletes the record stored under a channel ID.
   *
   * @param aChannelID
   *        The ID of record to be deleted.
   * @param aSuccessCb
   *        Callback function handed to newTxn as the success callback.
   * @param aErrorCb [optional]
   *        Callback function to invoke when there was an error.
   */
  delete: function(aChannelID, aSuccessCb, aErrorCb) {
    debug("delete()");

    this.newTxn(
      "readwrite",
      kPUSHDB_STORE_NAME,
      function txnCb(aTxn, aStore) {
        debug("Going to delete " + aChannelID);
        aStore.delete(aChannelID);
      },
      aSuccessCb,
      aErrorCb
    );
  },

  /**
   * Looks a record up through the "pushEndpoint" index. The transaction
   * result is the matching record, or undefined when the lookup yields none.
   *
   * @param aPushEndpoint
   *        The endpoint value to search the index for.
   * @param aSuccessCb
   *        Callback function handed to newTxn as the success callback.
   * @param aErrorCb [optional]
   *        Callback function to invoke when there was an error.
   */
  getByPushEndpoint: function(aPushEndpoint, aSuccessCb, aErrorCb) {
    debug("getByPushEndpoint()");

    this.newTxn(
      "readonly",
      kPUSHDB_STORE_NAME,
      function txnCb(aTxn, aStore) {
        aTxn.result = undefined;

        let index = aStore.index("pushEndpoint");
        index.get(aPushEndpoint).onsuccess = function setTxnResult(aEvent) {
          aTxn.result = aEvent.target.result;
          debug("Fetch successful " + aEvent.target.result);
        };
      },
      aSuccessCb,
      aErrorCb
    );
  },

  /**
   * Looks a record up by its primary key. The transaction result is the
   * matching record, or undefined when the lookup yields none.
   *
   * @param aChannelID
   *        The channel ID to fetch.
   * @param aSuccessCb
   *        Callback function handed to newTxn as the success callback.
   * @param aErrorCb [optional]
   *        Callback function to invoke when there was an error.
   */
  getByChannelID: function(aChannelID, aSuccessCb, aErrorCb) {
    debug("getByChannelID()");

    this.newTxn(
      "readonly",
      kPUSHDB_STORE_NAME,
      function txnCb(aTxn, aStore) {
        aTxn.result = undefined;

        aStore.get(aChannelID).onsuccess = function setTxnResult(aEvent) {
          aTxn.result = aEvent.target.result;
          debug("Fetch successful " + aEvent.target.result);
        };
      },
      aSuccessCb,
      aErrorCb
    );
  },

  /**
   * Collects every record whose "manifestURL" index key equals aManifestURL.
   * The transaction result is an array of records (possibly empty).
   *
   * @param aManifestURL
   *        The manifest URL to match. A falsy value is rejected up front:
   *        aErrorCb (if it is a function) is called with a message and no
   *        transaction is started.
   * @param aSuccessCb
   *        Callback function handed to newTxn as the success callback.
   * @param aErrorCb [optional]
   *        Callback function to invoke when there was an error.
   */
  getAllByManifestURL: function(aManifestURL, aSuccessCb, aErrorCb) {
    debug("getAllByManifestURL()");
    if (!aManifestURL) {
      if (typeof aErrorCb == "function") {
        aErrorCb("PushDB.getAllByManifestURL: Got undefined aManifestURL");
      }
      return;
    }

    this.newTxn(
      "readonly",
      kPUSHDB_STORE_NAME,
      function txnCb(aTxn, aStore) {
        let index = aStore.index("manifestURL");
        let range = IDBKeyRange.only(aManifestURL);
        aTxn.result = [];
        // onsuccess fires once per cursor position; a null cursor ends the walk.
        index.openCursor(range).onsuccess = function(event) {
          let cursor = event.target.result;
          if (cursor) {
            debug(cursor.value.manifestURL + " " + cursor.value.channelID);
            aTxn.result.push(cursor.value);
            cursor.continue();
          }
        };
      },
      aSuccessCb,
      aErrorCb
    );
  },

  /**
   * Fetches everything in the store via mozGetAll(). Despite the name, the
   * transaction result is whatever mozGetAll() yields, not a list of IDs
   * extracted by this method.
   *
   * @param aSuccessCb
   *        Callback function handed to newTxn as the success callback.
   * @param aErrorCb [optional]
   *        Callback function to invoke when there was an error.
   */
  getAllChannelIDs: function(aSuccessCb, aErrorCb) {
    debug("getAllChannelIDs()");

    this.newTxn(
      "readonly",
      kPUSHDB_STORE_NAME,
      function txnCb(aTxn, aStore) {
        aStore.mozGetAll().onsuccess = function(event) {
          aTxn.result = event.target.result;
        };
      },
      aSuccessCb,
      aErrorCb
    );
  },

  /**
   * Removes every record from the store.
   *
   * @param aSuccessCb
   *        Callback function handed to newTxn as the success callback.
   * @param aErrorCb [optional]
   *        Callback function to invoke when there was an error.
   */
  drop: function(aSuccessCb, aErrorCb) {
    debug("drop()");
    this.newTxn(
      "readwrite",
      kPUSHDB_STORE_NAME,
      function txnCb(aTxn, aStore) {
        aStore.clear();
      },
      // Pass the callbacks themselves, as every other method here does.
      // Calling them at this point would run both the success and the error
      // handler before the transaction starts, and would throw when either
      // one is omitted.
      aSuccessCb,
      aErrorCb
    );
  }
};

/**
 * A proxy between the PushService and the WebSocket. The listener is used so
 * that the PushService can silence messages from the WebSocket by setting
 * PushWebSocketListener._pushService to null. This is required because
 * a WebSocket can continue to send messages or errors after it has been
 * closed but the PushService may not be interested in these. It's easier to
 * stop listening than to have checks at specific points.
 *
 * @param pushService
 *        The object whose _wsOn* handlers receive the forwarded events.
 */
this.PushWebSocketListener = function(pushService) {
  this._pushService = pushService;
};

this.PushWebSocketListener.prototype = {
  // Each forwarding handler is a no-op once _pushService has been cleared.

  onStart: function(context) {
    if (!this._pushService) {
      return;
    }
    this._pushService._wsOnStart(context);
  },

  onStop: function(context, statusCode) {
    if (!this._pushService) {
      return;
    }
    this._pushService._wsOnStop(context, statusCode);
  },

  onAcknowledge: function(context, size) {
    // EMPTY
  },

  onBinaryMessageAvailable: function(context, message) {
    // EMPTY
  },

  onMessageAvailable: function(context, message) {
    if (!this._pushService) {
      return;
    }
    this._pushService._wsOnMessageAvailable(context, message);
  },

  onServerClose: function(context, aStatusCode, aReason) {
    if (!this._pushService) {
      return;
    }
    this._pushService._wsOnServerClose(context, aStatusCode, aReason);
  }
};

// websocket states
// websocket is off
const STATE_SHUT_DOWN = 0;
// Websocket has been opened on client side, waiting for successful open.
michael@0: // (_wsOnStart) michael@0: const STATE_WAITING_FOR_WS_START = 1; michael@0: // Websocket opened, hello sent, waiting for server reply (_handleHelloReply). michael@0: const STATE_WAITING_FOR_HELLO = 2; michael@0: // Websocket operational, handshake completed, begin protocol messaging. michael@0: const STATE_READY = 3; michael@0: michael@0: /** michael@0: * The implementation of the SimplePush system. This runs in the B2G parent michael@0: * process and is started on boot. It uses WebSockets to communicate with the michael@0: * server and PushDB (IndexedDB) for persistence. michael@0: */ michael@0: this.PushService = { michael@0: observe: function observe(aSubject, aTopic, aData) { michael@0: switch (aTopic) { michael@0: /* michael@0: * We need to call uninit() on shutdown to clean up things that modules aren't very good michael@0: * at automatically cleaning up, so we don't get shutdown leaks on browser shutdown. michael@0: */ michael@0: case "xpcom-shutdown": michael@0: this.uninit(); michael@0: case "network-active-changed": /* On B2G. */ michael@0: case "network:offline-status-changed": /* On desktop. */ michael@0: // In case of network-active-changed, always disconnect existing michael@0: // connections. In case of offline-status changing from offline to michael@0: // online, it is likely that these statements will be no-ops. michael@0: if (this._udpServer) { michael@0: this._udpServer.close(); michael@0: // Set to null since this is checked in _listenForUDPWakeup() michael@0: this._udpServer = null; michael@0: } michael@0: michael@0: this._shutdownWS(); michael@0: michael@0: // Try to connect if network-active-changed or the offline-status michael@0: // changed to online. 
michael@0: if (aTopic === "network-active-changed" || aData === "online") { michael@0: this._startListeningIfChannelsPresent(); michael@0: } michael@0: break; michael@0: case "nsPref:changed": michael@0: if (aData == "services.push.serverURL") { michael@0: debug("services.push.serverURL changed! websocket. new value " + michael@0: prefs.get("serverURL")); michael@0: this._shutdownWS(); michael@0: } else if (aData == "services.push.connection.enabled") { michael@0: if (prefs.get("connection.enabled")) { michael@0: this._startListeningIfChannelsPresent(); michael@0: } else { michael@0: this._shutdownWS(); michael@0: } michael@0: } else if (aData == "services.push.debug") { michael@0: gDebuggingEnabled = prefs.get("debug"); michael@0: } michael@0: break; michael@0: case "timer-callback": michael@0: if (aSubject == this._requestTimeoutTimer) { michael@0: if (Object.keys(this._pendingRequests).length == 0) { michael@0: this._requestTimeoutTimer.cancel(); michael@0: } michael@0: michael@0: // Set to true if at least one request timed out. michael@0: let requestTimedOut = false; michael@0: for (let channelID in this._pendingRequests) { michael@0: let duration = Date.now() - this._pendingRequests[channelID].ctime; michael@0: michael@0: // If any of the registration requests time out, all the ones after it michael@0: // also made to fail, since we are going to be disconnecting the socket. 
michael@0: if (requestTimedOut || duration > this._requestTimeout) { michael@0: debug("Request timeout: Removing " + channelID); michael@0: requestTimedOut = true; michael@0: this._pendingRequests[channelID] michael@0: .deferred.reject({status: 0, error: "TimeoutError"}); michael@0: michael@0: delete this._pendingRequests[channelID]; michael@0: for (let i = this._requestQueue.length - 1; i >= 0; --i) michael@0: if (this._requestQueue[i].channelID == channelID) michael@0: this._requestQueue.splice(i, 1); michael@0: } michael@0: } michael@0: michael@0: // The most likely reason for a registration request timing out is michael@0: // that the socket has disconnected. Best to reconnect. michael@0: if (requestTimedOut) { michael@0: this._shutdownWS(); michael@0: this._reconnectAfterBackoff(); michael@0: } michael@0: } michael@0: break; michael@0: case "webapps-clear-data": michael@0: debug("webapps-clear-data"); michael@0: michael@0: let data = aSubject.QueryInterface(Ci.mozIApplicationClearPrivateDataParams); michael@0: if (!data) { michael@0: debug("webapps-clear-data: Failed to get information about application"); michael@0: return; michael@0: } michael@0: michael@0: // Only remove push registrations for apps. 
michael@0: if (data.browserOnly) { michael@0: return; michael@0: } michael@0: michael@0: let appsService = Cc["@mozilla.org/AppsService;1"] michael@0: .getService(Ci.nsIAppsService); michael@0: let manifestURL = appsService.getManifestURLByLocalId(data.appId); michael@0: if (!manifestURL) { michael@0: debug("webapps-clear-data: No manifest URL found for " + data.appId); michael@0: return; michael@0: } michael@0: michael@0: this._db.getAllByManifestURL(manifestURL, function(records) { michael@0: debug("Got " + records.length); michael@0: for (let i = 0; i < records.length; i++) { michael@0: this._db.delete(records[i].channelID, null, function() { michael@0: debug("webapps-clear-data: " + manifestURL + michael@0: " Could not delete entry " + records[i].channelID); michael@0: }); michael@0: // courtesy, but don't establish a connection michael@0: // just for it michael@0: if (this._ws) { michael@0: debug("Had a connection, so telling the server"); michael@0: this._send("unregister", {channelID: records[i].channelID}); michael@0: } michael@0: } michael@0: }.bind(this), function() { michael@0: debug("webapps-clear-data: Error in getAllByManifestURL(" + manifestURL + ")"); michael@0: }); michael@0: michael@0: break; michael@0: } michael@0: }, michael@0: michael@0: get _UAID() { michael@0: return prefs.get("userAgentID"); michael@0: }, michael@0: michael@0: set _UAID(newID) { michael@0: if (typeof(newID) !== "string") { michael@0: debug("Got invalid, non-string UAID " + newID + michael@0: ". 
Not updating userAgentID"); michael@0: return; michael@0: } michael@0: debug("New _UAID: " + newID); michael@0: prefs.set("userAgentID", newID); michael@0: }, michael@0: michael@0: // keeps requests buffered if the websocket disconnects or is not connected michael@0: _requestQueue: [], michael@0: _ws: null, michael@0: _pendingRequests: {}, michael@0: _currentState: STATE_SHUT_DOWN, michael@0: _requestTimeout: 0, michael@0: _requestTimeoutTimer: null, michael@0: _retryFailCount: 0, michael@0: michael@0: /** michael@0: * According to the WS spec, servers should immediately close the underlying michael@0: * TCP connection after they close a WebSocket. This causes wsOnStop to be michael@0: * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the michael@0: * WebSocket up, it should try to reconnect. But if the server closes the michael@0: * WebSocket because it will wake up the client via UDP, then the client michael@0: * shouldn't re-establish the connection. If the server says that it will michael@0: * wake up the client over UDP, this is set to true in wsOnServerClose. It is michael@0: * checked in wsOnStop. michael@0: */ michael@0: _willBeWokenUpByUDP: false, michael@0: michael@0: /** michael@0: * Sends a message to the Push Server through an open websocket. michael@0: * typeof(msg) shall be an object michael@0: */ michael@0: _wsSendMessage: function(msg) { michael@0: if (!this._ws) { michael@0: debug("No WebSocket initialized. 
Cannot send a message."); michael@0: return; michael@0: } michael@0: msg = JSON.stringify(msg); michael@0: debug("Sending message: " + msg); michael@0: this._ws.sendMsg(msg); michael@0: }, michael@0: michael@0: init: function() { michael@0: debug("init()"); michael@0: if (!prefs.get("enabled")) michael@0: return null; michael@0: michael@0: this._db = new PushDB(); michael@0: michael@0: let ppmm = Cc["@mozilla.org/parentprocessmessagemanager;1"] michael@0: .getService(Ci.nsIMessageBroadcaster); michael@0: michael@0: kCHILD_PROCESS_MESSAGES.forEach(function addMessage(msgName) { michael@0: ppmm.addMessageListener(msgName, this); michael@0: }.bind(this)); michael@0: michael@0: this._alarmID = null; michael@0: michael@0: this._requestTimeout = prefs.get("requestTimeout"); michael@0: michael@0: this._startListeningIfChannelsPresent(); michael@0: michael@0: Services.obs.addObserver(this, "xpcom-shutdown", false); michael@0: Services.obs.addObserver(this, "webapps-clear-data", false); michael@0: michael@0: // On B2G the NetworkManager interface fires a network-active-changed michael@0: // event. michael@0: // michael@0: // The "active network" is based on priority - i.e. Wi-Fi has higher michael@0: // priority than data. The PushService should just use the preferred michael@0: // network, and not care about all interface changes. michael@0: // network-active-changed is not fired when the network goes offline, but michael@0: // socket connections time out. The check for Services.io.offline in michael@0: // _beginWSSetup() prevents unnecessary retries. When the network comes michael@0: // back online, network-active-changed is fired. michael@0: // michael@0: // On non-B2G platforms, the offline-status-changed event is used to know michael@0: // when to (dis)connect. It may not fire if the underlying OS changes michael@0: // networks; in such a case we rely on timeout. 
michael@0: // michael@0: // On B2G both events fire, one after the other, when the network goes michael@0: // online, so we explicitly check for the presence of NetworkManager and michael@0: // don't add an observer for offline-status-changed on B2G. michael@0: Services.obs.addObserver(this, this._getNetworkStateChangeEventName(), false); michael@0: michael@0: // This is only used for testing. Different tests require connecting to michael@0: // slightly different URLs. michael@0: prefs.observe("serverURL", this); michael@0: // Used to monitor if the user wishes to disable Push. michael@0: prefs.observe("connection.enabled", this); michael@0: // Debugging michael@0: prefs.observe("debug", this); michael@0: michael@0: this._started = true; michael@0: }, michael@0: michael@0: _shutdownWS: function() { michael@0: debug("shutdownWS()"); michael@0: this._currentState = STATE_SHUT_DOWN; michael@0: this._willBeWokenUpByUDP = false; michael@0: michael@0: if (this._wsListener) michael@0: this._wsListener._pushService = null; michael@0: try { michael@0: this._ws.close(0, null); michael@0: } catch (e) {} michael@0: this._ws = null; michael@0: michael@0: this._waitingForPong = false; michael@0: this._stopAlarm(); michael@0: }, michael@0: michael@0: uninit: function() { michael@0: if (!this._started) michael@0: return; michael@0: michael@0: debug("uninit()"); michael@0: michael@0: prefs.ignore("debug", this); michael@0: prefs.ignore("connection.enabled", this); michael@0: prefs.ignore("serverURL", this); michael@0: Services.obs.removeObserver(this, this._getNetworkStateChangeEventName()); michael@0: Services.obs.removeObserver(this, "webapps-clear-data", false); michael@0: Services.obs.removeObserver(this, "xpcom-shutdown", false); michael@0: michael@0: if (this._db) { michael@0: this._db.close(); michael@0: this._db = null; michael@0: } michael@0: michael@0: if (this._udpServer) { michael@0: this._udpServer.close(); michael@0: this._udpServer = null; michael@0: } michael@0: 
michael@0: // All pending requests (ideally none) are dropped at this point. We michael@0: // shouldn't have any applications performing registration/unregistration michael@0: // or receiving notifications. michael@0: this._shutdownWS(); michael@0: michael@0: // At this point, profile-change-net-teardown has already fired, so the michael@0: // WebSocket has been closed with NS_ERROR_ABORT (if it was up) and will michael@0: // try to reconnect. Stop the timer. michael@0: this._stopAlarm(); michael@0: michael@0: if (this._requestTimeoutTimer) { michael@0: this._requestTimeoutTimer.cancel(); michael@0: } michael@0: michael@0: debug("shutdown complete!"); michael@0: }, michael@0: michael@0: /** michael@0: * How retries work: The goal is to ensure websocket is always up on michael@0: * networks not supporting UDP. So the websocket should only be shutdown if michael@0: * onServerClose indicates UDP wakeup. If WS is closed due to socket error, michael@0: * _reconnectAfterBackoff() is called. The retry alarm is started and when michael@0: * it times out, beginWSSetup() is called again. michael@0: * michael@0: * On a successful connection, the alarm is cancelled in michael@0: * wsOnMessageAvailable() when the ping alarm is started. michael@0: * michael@0: * If we are in the middle of a timeout (i.e. waiting), but michael@0: * a register/unregister is called, we don't want to wait around anymore. michael@0: * _sendRequest will automatically call beginWSSetup(), which will cancel the michael@0: * timer. In addition since the state will have changed, even if a pending michael@0: * timer event comes in (because the timer fired the event before it was michael@0: * cancelled), so the connection won't be reset. michael@0: */ michael@0: _reconnectAfterBackoff: function() { michael@0: debug("reconnectAfterBackoff()"); michael@0: michael@0: // Calculate new timeout, but cap it to pingInterval. 
michael@0: let retryTimeout = prefs.get("retryBaseInterval") * michael@0: Math.pow(2, this._retryFailCount); michael@0: retryTimeout = Math.min(retryTimeout, prefs.get("pingInterval")); michael@0: michael@0: this._retryFailCount++; michael@0: michael@0: debug("Retry in " + retryTimeout + " Try number " + this._retryFailCount); michael@0: this._setAlarm(retryTimeout); michael@0: }, michael@0: michael@0: _beginWSSetup: function() { michael@0: debug("beginWSSetup()"); michael@0: if (this._currentState != STATE_SHUT_DOWN) { michael@0: debug("_beginWSSetup: Not in shutdown state! Current state " + michael@0: this._currentState); michael@0: return; michael@0: } michael@0: michael@0: if (!prefs.get("connection.enabled")) { michael@0: debug("_beginWSSetup: connection.enabled is not set to true. Aborting."); michael@0: return; michael@0: } michael@0: michael@0: // Stop any pending reconnects scheduled for the near future. michael@0: this._stopAlarm(); michael@0: michael@0: if (Services.io.offline) { michael@0: debug("Network is offline."); michael@0: return; michael@0: } michael@0: michael@0: let serverURL = prefs.get("serverURL"); michael@0: if (!serverURL) { michael@0: debug("No services.push.serverURL found!"); michael@0: return; michael@0: } michael@0: michael@0: let uri; michael@0: try { michael@0: uri = Services.io.newURI(serverURL, null, null); michael@0: } catch(e) { michael@0: debug("Error creating valid URI from services.push.serverURL (" + michael@0: serverURL + ")"); michael@0: return; michael@0: } michael@0: michael@0: if (uri.scheme === "wss") { michael@0: this._ws = Cc["@mozilla.org/network/protocol;1?name=wss"] michael@0: .createInstance(Ci.nsIWebSocketChannel); michael@0: } michael@0: else if (uri.scheme === "ws") { michael@0: debug("Push over an insecure connection (ws://) is not allowed!"); michael@0: return; michael@0: } michael@0: else { michael@0: debug("Unsupported websocket scheme " + uri.scheme); michael@0: return; michael@0: } michael@0: michael@0: 
michael@0: debug("serverURL: " + uri.spec); michael@0: this._wsListener = new PushWebSocketListener(this); michael@0: this._ws.protocol = "push-notification"; michael@0: this._ws.asyncOpen(uri, serverURL, this._wsListener, null); michael@0: this._currentState = STATE_WAITING_FOR_WS_START; michael@0: }, michael@0: michael@0: _startListeningIfChannelsPresent: function() { michael@0: // Check to see if we need to do anything. michael@0: this._db.getAllChannelIDs(function(channelIDs) { michael@0: if (channelIDs.length > 0) { michael@0: this._beginWSSetup(); michael@0: } michael@0: }.bind(this)); michael@0: }, michael@0: michael@0: /** |delay| should be in milliseconds. */ michael@0: _setAlarm: function(delay) { michael@0: // Bug 909270: Since calls to AlarmService.add() are async, calls must be michael@0: // 'queued' to ensure only one alarm is ever active. michael@0: if (this._settingAlarm) { michael@0: // onSuccess will handle the set. Overwriting the variable enforces the michael@0: // last-writer-wins semantics. michael@0: this._queuedAlarmDelay = delay; michael@0: this._waitingForAlarmSet = true; michael@0: return; michael@0: } michael@0: michael@0: // Stop any existing alarm. 
michael@0: this._stopAlarm(); michael@0: michael@0: this._settingAlarm = true; michael@0: AlarmService.add( michael@0: { michael@0: date: new Date(Date.now() + delay), michael@0: ignoreTimezone: true michael@0: }, michael@0: this._onAlarmFired.bind(this), michael@0: function onSuccess(alarmID) { michael@0: this._alarmID = alarmID; michael@0: debug("Set alarm " + delay + " in the future " + this._alarmID); michael@0: this._settingAlarm = false; michael@0: michael@0: if (this._waitingForAlarmSet) { michael@0: this._waitingForAlarmSet = false; michael@0: this._setAlarm(this._queuedAlarmDelay); michael@0: } michael@0: }.bind(this) michael@0: ) michael@0: }, michael@0: michael@0: _stopAlarm: function() { michael@0: if (this._alarmID !== null) { michael@0: debug("Stopped existing alarm " + this._alarmID); michael@0: AlarmService.remove(this._alarmID); michael@0: this._alarmID = null; michael@0: } michael@0: }, michael@0: michael@0: /** michael@0: * There is only one alarm active at any time. This alarm has 3 intervals michael@0: * corresponding to 3 tasks. michael@0: * michael@0: * 1) Reconnect on ping timeout. michael@0: * If we haven't received any messages from the server by the time this michael@0: * alarm fires, the connection is closed and PushService tries to michael@0: * reconnect, repurposing the alarm for (3). michael@0: * michael@0: * 2) Send a ping. michael@0: * The protocol sends a ping ({}) on the wire every pingInterval ms. Once michael@0: * it sends the ping, the alarm goes to task (1) which is waiting for michael@0: * a pong. If data is received after the ping is sent, michael@0: * _wsOnMessageAvailable() will reset the ping alarm (which cancels michael@0: * waiting for the pong). So as long as the connection is fine, pong alarm michael@0: * never fires. michael@0: * michael@0: * 3) Reconnect after backoff. michael@0: * The alarm is set by _reconnectAfterBackoff() and increases in duration michael@0: * every time we try and fail to connect. 
When it triggers, websocket michael@0: * setup begins again. On successful socket setup, the socket starts michael@0: * receiving messages. The alarm now goes to (2) where it monitors the michael@0: * WebSocket by sending a ping. Since incoming data is a sign of the michael@0: * connection being up, the ping alarm is reset every time data is michael@0: * received. michael@0: */ michael@0: _onAlarmFired: function() { michael@0: // Conditions are arranged in decreasing specificity. michael@0: // i.e. when _waitingForPong is true, other conditions are also true. michael@0: if (this._waitingForPong) { michael@0: debug("Did not receive pong in time. Reconnecting WebSocket."); michael@0: this._shutdownWS(); michael@0: this._reconnectAfterBackoff(); michael@0: } michael@0: else if (this._currentState == STATE_READY) { michael@0: // Send a ping. michael@0: // Bypass the queue; we don't want this to be kept pending. michael@0: // Watch out for exception in case the socket has disconnected. michael@0: // When this happens, we pretend the ping was sent and don't specially michael@0: // handle the exception, as the lack of a pong will lead to the socket michael@0: // being reset. michael@0: try { michael@0: this._wsSendMessage({}); michael@0: } catch (e) { michael@0: } michael@0: michael@0: this._waitingForPong = true; michael@0: this._setAlarm(prefs.get("requestTimeout")); michael@0: } michael@0: else if (this._alarmID !== null) { michael@0: debug("reconnect alarm fired."); michael@0: // Reconnect after back-off. michael@0: // The check for a non-null _alarmID prevents a situation where the alarm michael@0: // fires, but _shutdownWS() is called from another code-path (e.g. michael@0: // network state change) and we don't want to reconnect. michael@0: // michael@0: // It also handles the case where _beginWSSetup() is called from another michael@0: // code-path. 
michael@0: // michael@0: // alarmID will be non-null only when no shutdown/connect is michael@0: // called between _reconnectAfterBackoff() setting the alarm and the michael@0: // alarm firing. michael@0: michael@0: // Websocket is shut down. Backoff interval expired, try to connect. michael@0: this._beginWSSetup(); michael@0: } michael@0: }, michael@0: michael@0: /** michael@0: * Protocol handler invoked by server message. michael@0: */ michael@0: _handleHelloReply: function(reply) { michael@0: debug("handleHelloReply()"); michael@0: if (this._currentState != STATE_WAITING_FOR_HELLO) { michael@0: debug("Unexpected state " + this._currentState + michael@0: "(expected STATE_WAITING_FOR_HELLO)"); michael@0: this._shutdownWS(); michael@0: return; michael@0: } michael@0: michael@0: if (typeof reply.uaid !== "string") { michael@0: debug("No UAID received or non string UAID received"); michael@0: this._shutdownWS(); michael@0: return; michael@0: } michael@0: michael@0: if (reply.uaid === "") { michael@0: debug("Empty UAID received!"); michael@0: this._shutdownWS(); michael@0: return; michael@0: } michael@0: michael@0: // To avoid sticking extra large values sent by an evil server into prefs. michael@0: if (reply.uaid.length > 128) { michael@0: debug("UAID received from server was too long: " + michael@0: reply.uaid); michael@0: this._shutdownWS(); michael@0: return; michael@0: } michael@0: michael@0: function finishHandshake() { michael@0: this._UAID = reply.uaid; michael@0: this._currentState = STATE_READY; michael@0: this._processNextRequestInQueue(); michael@0: } michael@0: michael@0: // By this point we've got a UAID from the server that we are ready to michael@0: // accept. michael@0: // michael@0: // If we already had a valid UAID before, we have to ask apps to michael@0: // re-register. 
michael@0: if (this._UAID && this._UAID != reply.uaid) { michael@0: debug("got new UAID: all re-register"); michael@0: michael@0: this._notifyAllAppsRegister() michael@0: .then(this._dropRegistrations.bind(this)) michael@0: .then(finishHandshake.bind(this)); michael@0: michael@0: return; michael@0: } michael@0: michael@0: // otherwise we are good to go michael@0: finishHandshake.bind(this)(); michael@0: }, michael@0: michael@0: /** michael@0: * Protocol handler invoked by server message. michael@0: */ michael@0: _handleRegisterReply: function(reply) { michael@0: debug("handleRegisterReply()"); michael@0: if (typeof reply.channelID !== "string" || michael@0: typeof this._pendingRequests[reply.channelID] !== "object") michael@0: return; michael@0: michael@0: let tmp = this._pendingRequests[reply.channelID]; michael@0: delete this._pendingRequests[reply.channelID]; michael@0: if (Object.keys(this._pendingRequests).length == 0 && michael@0: this._requestTimeoutTimer) michael@0: this._requestTimeoutTimer.cancel(); michael@0: michael@0: if (reply.status == 200) { michael@0: tmp.deferred.resolve(reply); michael@0: } else { michael@0: tmp.deferred.reject(reply); michael@0: } michael@0: }, michael@0: michael@0: /** michael@0: * Protocol handler invoked by server message. michael@0: */ michael@0: _handleNotificationReply: function(reply) { michael@0: debug("handleNotificationReply()"); michael@0: if (typeof reply.updates !== 'object') { michael@0: debug("No 'updates' field in response. 
Type = " + typeof reply.updates); michael@0: return; michael@0: } michael@0: michael@0: debug("Reply updates: " + reply.updates.length); michael@0: for (let i = 0; i < reply.updates.length; i++) { michael@0: let update = reply.updates[i]; michael@0: debug("Update: " + update.channelID + ": " + update.version); michael@0: if (typeof update.channelID !== "string") { michael@0: debug("Invalid update literal at index " + i); michael@0: continue; michael@0: } michael@0: michael@0: if (update.version === undefined) { michael@0: debug("update.version does not exist"); michael@0: continue; michael@0: } michael@0: michael@0: let version = update.version; michael@0: michael@0: if (typeof version === "string") { michael@0: version = parseInt(version, 10); michael@0: } michael@0: michael@0: if (typeof version === "number" && version >= 0) { michael@0: // FIXME(nsm): this relies on app update notification being infallible! michael@0: // eventually fix this michael@0: this._receivedUpdate(update.channelID, version); michael@0: this._sendAck(update.channelID, version); michael@0: } michael@0: } michael@0: }, michael@0: michael@0: // FIXME(nsm): batch acks for efficiency reasons. michael@0: _sendAck: function(channelID, version) { michael@0: debug("sendAck()"); michael@0: this._send('ack', { michael@0: updates: [{channelID: channelID, version: version}] michael@0: }); michael@0: }, michael@0: michael@0: /* michael@0: * Must be used only by request/response style calls over the websocket. 
michael@0: */ michael@0: _sendRequest: function(action, data) { michael@0: debug("sendRequest() " + action); michael@0: if (typeof data.channelID !== "string") { michael@0: debug("Received non-string channelID"); michael@0: return Promise.reject("Received non-string channelID"); michael@0: } michael@0: michael@0: let deferred = Promise.defer(); michael@0: michael@0: if (Object.keys(this._pendingRequests).length == 0) { michael@0: // start the timer since we now have at least one request michael@0: if (!this._requestTimeoutTimer) michael@0: this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"] michael@0: .createInstance(Ci.nsITimer); michael@0: this._requestTimeoutTimer.init(this, michael@0: this._requestTimeout, michael@0: Ci.nsITimer.TYPE_REPEATING_SLACK); michael@0: } michael@0: michael@0: this._pendingRequests[data.channelID] = { deferred: deferred, michael@0: ctime: Date.now() }; michael@0: michael@0: this._send(action, data); michael@0: return deferred.promise; michael@0: }, michael@0: michael@0: _send: function(action, data) { michael@0: debug("send()"); michael@0: this._requestQueue.push([action, data]); michael@0: debug("Queued " + action); michael@0: this._processNextRequestInQueue(); michael@0: }, michael@0: michael@0: _processNextRequestInQueue: function() { michael@0: debug("_processNextRequestInQueue()"); michael@0: michael@0: if (this._requestQueue.length == 0) { michael@0: debug("Request queue empty"); michael@0: return; michael@0: } michael@0: michael@0: if (this._currentState != STATE_READY) { michael@0: if (!this._ws) { michael@0: // This will end up calling processNextRequestInQueue(). michael@0: this._beginWSSetup(); michael@0: } michael@0: else { michael@0: // We have a socket open so we are just waiting for hello to finish. michael@0: // That will call processNextRequestInQueue(). 
michael@0: } michael@0: return; michael@0: } michael@0: michael@0: let [action, data] = this._requestQueue.shift(); michael@0: data.messageType = action; michael@0: if (!this._ws) { michael@0: // If our websocket is not ready and our state is STATE_READY we may as michael@0: // well give up all assumptions about the world and start from scratch michael@0: // again. Discard the message itself, let the timeout notify error to michael@0: // the app. michael@0: debug("This should never happen!"); michael@0: this._shutdownWS(); michael@0: } michael@0: michael@0: this._wsSendMessage(data); michael@0: // Process the next one as soon as possible. michael@0: setTimeout(this._processNextRequestInQueue.bind(this), 0); michael@0: }, michael@0: michael@0: _receivedUpdate: function(aChannelID, aLatestVersion) { michael@0: debug("Updating: " + aChannelID + " -> " + aLatestVersion); michael@0: michael@0: let compareRecordVersionAndNotify = function(aPushRecord) { michael@0: debug("compareRecordVersionAndNotify()"); michael@0: if (!aPushRecord) { michael@0: debug("No record for channel ID " + aChannelID); michael@0: return; michael@0: } michael@0: michael@0: if (aPushRecord.version == null || michael@0: aPushRecord.version < aLatestVersion) { michael@0: debug("Version changed, notifying app and updating DB"); michael@0: aPushRecord.version = aLatestVersion; michael@0: this._notifyApp(aPushRecord); michael@0: this._updatePushRecord(aPushRecord) michael@0: .then( michael@0: null, michael@0: function(e) { michael@0: debug("Error updating push record"); michael@0: } michael@0: ); michael@0: } michael@0: else { michael@0: debug("No significant version change: " + aLatestVersion); michael@0: } michael@0: } michael@0: michael@0: let recoverNoSuchChannelID = function(aChannelIDFromServer) { michael@0: debug("Could not get channelID " + aChannelIDFromServer + " from DB"); michael@0: } michael@0: michael@0: this._db.getByChannelID(aChannelID, michael@0: 
compareRecordVersionAndNotify.bind(this), michael@0: recoverNoSuchChannelID.bind(this)); michael@0: }, michael@0: michael@0: // Fires a push-register system message to all applications that have michael@0: // registrations. michael@0: _notifyAllAppsRegister: function() { michael@0: debug("notifyAllAppsRegister()"); michael@0: let deferred = Promise.defer(); michael@0: michael@0: // records are objects describing the registrations as stored in IndexedDB. michael@0: function wakeupRegisteredApps(records) { michael@0: // Pages to be notified. michael@0: // wakeupTable[manifestURL] -> [ pageURL ] michael@0: let wakeupTable = {}; michael@0: for (let i = 0; i < records.length; i++) { michael@0: let record = records[i]; michael@0: if (!(record.manifestURL in wakeupTable)) michael@0: wakeupTable[record.manifestURL] = []; michael@0: michael@0: wakeupTable[record.manifestURL].push(record.pageURL); michael@0: } michael@0: michael@0: let messenger = Cc["@mozilla.org/system-message-internal;1"] michael@0: .getService(Ci.nsISystemMessagesInternal); michael@0: michael@0: for (let manifestURL in wakeupTable) { michael@0: wakeupTable[manifestURL].forEach(function(pageURL) { michael@0: messenger.sendMessage('push-register', {}, michael@0: Services.io.newURI(pageURL, null, null), michael@0: Services.io.newURI(manifestURL, null, null)); michael@0: }); michael@0: } michael@0: michael@0: deferred.resolve(); michael@0: } michael@0: michael@0: this._db.getAllChannelIDs(wakeupRegisteredApps, deferred.reject); michael@0: michael@0: return deferred.promise; michael@0: }, michael@0: michael@0: _notifyApp: function(aPushRecord) { michael@0: if (!aPushRecord || !aPushRecord.pageURL || !aPushRecord.manifestURL) { michael@0: debug("notifyApp() something is undefined. 
Dropping notification"); michael@0: return; michael@0: } michael@0: michael@0: debug("notifyApp() " + aPushRecord.pageURL + michael@0: " " + aPushRecord.manifestURL); michael@0: let pageURI = Services.io.newURI(aPushRecord.pageURL, null, null); michael@0: let manifestURI = Services.io.newURI(aPushRecord.manifestURL, null, null); michael@0: let message = { michael@0: pushEndpoint: aPushRecord.pushEndpoint, michael@0: version: aPushRecord.version michael@0: }; michael@0: let messenger = Cc["@mozilla.org/system-message-internal;1"] michael@0: .getService(Ci.nsISystemMessagesInternal); michael@0: messenger.sendMessage('push', message, pageURI, manifestURI); michael@0: }, michael@0: michael@0: _updatePushRecord: function(aPushRecord) { michael@0: debug("updatePushRecord()"); michael@0: let deferred = Promise.defer(); michael@0: this._db.put(aPushRecord, deferred.resolve, deferred.reject); michael@0: return deferred.promise; michael@0: }, michael@0: michael@0: _dropRegistrations: function() { michael@0: let deferred = Promise.defer(); michael@0: this._db.drop(deferred.resolve, deferred.reject); michael@0: return deferred.promise; michael@0: }, michael@0: michael@0: receiveMessage: function(aMessage) { michael@0: debug("receiveMessage(): " + aMessage.name); michael@0: michael@0: if (kCHILD_PROCESS_MESSAGES.indexOf(aMessage.name) == -1) { michael@0: debug("Invalid message from child " + aMessage.name); michael@0: return; michael@0: } michael@0: michael@0: let mm = aMessage.target.QueryInterface(Ci.nsIMessageSender); michael@0: let json = aMessage.data; michael@0: this[aMessage.name.slice("Push:".length).toLowerCase()](json, mm); michael@0: }, michael@0: michael@0: /** michael@0: * Called on message from the child process. aPageRecord is an object sent by michael@0: * navigator.push, identifying the sending page and other fields. 
michael@0: */ michael@0: register: function(aPageRecord, aMessageManager) { michael@0: debug("register()"); michael@0: michael@0: let uuidGenerator = Cc["@mozilla.org/uuid-generator;1"] michael@0: .getService(Ci.nsIUUIDGenerator); michael@0: // generateUUID() gives a UUID surrounded by {...}, slice them off. michael@0: let channelID = uuidGenerator.generateUUID().toString().slice(1, -1); michael@0: michael@0: this._sendRequest("register", {channelID: channelID}) michael@0: .then( michael@0: this._onRegisterSuccess.bind(this, aPageRecord, channelID), michael@0: this._onRegisterError.bind(this, aPageRecord, aMessageManager) michael@0: ) michael@0: .then( michael@0: function(message) { michael@0: aMessageManager.sendAsyncMessage("PushService:Register:OK", message); michael@0: }, michael@0: function(message) { michael@0: aMessageManager.sendAsyncMessage("PushService:Register:KO", message); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Exceptions thrown in _onRegisterSuccess are caught by the promise obtained michael@0: * from _sendRequest, causing the promise to be rejected instead. 
michael@0: */ michael@0: _onRegisterSuccess: function(aPageRecord, generatedChannelID, data) { michael@0: debug("_onRegisterSuccess()"); michael@0: let deferred = Promise.defer(); michael@0: let message = { requestID: aPageRecord.requestID }; michael@0: michael@0: if (typeof data.channelID !== "string") { michael@0: debug("Invalid channelID " + message); michael@0: message["error"] = "Invalid channelID received"; michael@0: throw message; michael@0: } michael@0: else if (data.channelID != generatedChannelID) { michael@0: debug("Server replied with different channelID " + data.channelID + michael@0: " than what UA generated " + generatedChannelID); michael@0: message["error"] = "Server sent 200 status code but different channelID"; michael@0: throw message; michael@0: } michael@0: michael@0: try { michael@0: Services.io.newURI(data.pushEndpoint, null, null); michael@0: } michael@0: catch (e) { michael@0: debug("Invalid pushEndpoint " + data.pushEndpoint); michael@0: message["error"] = "Invalid pushEndpoint " + data.pushEndpoint; michael@0: throw message; michael@0: } michael@0: michael@0: let record = { michael@0: channelID: data.channelID, michael@0: pushEndpoint: data.pushEndpoint, michael@0: pageURL: aPageRecord.pageURL, michael@0: manifestURL: aPageRecord.manifestURL, michael@0: version: null michael@0: }; michael@0: michael@0: this._updatePushRecord(record) michael@0: .then( michael@0: function() { michael@0: message["pushEndpoint"] = data.pushEndpoint; michael@0: deferred.resolve(message); michael@0: }, michael@0: function(error) { michael@0: // Unable to save. 
michael@0: this._send("unregister", {channelID: record.channelID}); michael@0: message["error"] = error; michael@0: deferred.reject(message); michael@0: } michael@0: ); michael@0: michael@0: return deferred.promise; michael@0: }, michael@0: michael@0: /** michael@0: * Exceptions thrown in _onRegisterError are caught by the promise obtained michael@0: * from _sendRequest, causing the promise to be rejected instead. michael@0: */ michael@0: _onRegisterError: function(aPageRecord, aMessageManager, reply) { michael@0: debug("_onRegisterError()"); michael@0: if (!reply.error) { michael@0: debug("Called without valid error message!"); michael@0: } michael@0: throw { requestID: aPageRecord.requestID, error: reply.error }; michael@0: }, michael@0: michael@0: /** michael@0: * Called on message from the child process. michael@0: * michael@0: * Why is the record being deleted from the local database before the server michael@0: * is told? michael@0: * michael@0: * Unregistration is for the benefit of the app and the AppServer michael@0: * so that the AppServer does not keep pinging a channel the UserAgent isn't michael@0: * watching The important part of the transaction in this case is left to the michael@0: * app, to tell its server of the unregistration. Even if the request to the michael@0: * PushServer were to fail, it would not affect correctness of the protocol, michael@0: * and the server GC would just clean up the channelID eventually. Since the michael@0: * appserver doesn't ping it, no data is lost. michael@0: * michael@0: * If rather we were to unregister at the server and update the database only michael@0: * on success: If the server receives the unregister, and deletes the michael@0: * channelID, but the response is lost because of network failure, the michael@0: * application is never informed. 
In addition the application may retry the michael@0: * unregister when it fails due to timeout at which point the server will say michael@0: * it does not know of this unregistration. We'll have to make the michael@0: * registration/unregistration phases have retries and attempts to resend michael@0: * messages from the server, and have the client acknowledge. On a server, michael@0: * data is cheap, reliable notification is not. michael@0: */ michael@0: unregister: function(aPageRecord, aMessageManager) { michael@0: debug("unregister()"); michael@0: michael@0: let fail = function(error) { michael@0: debug("unregister() fail() error " + error); michael@0: let message = {requestID: aPageRecord.requestID, error: error}; michael@0: aMessageManager.sendAsyncMessage("PushService:Unregister:KO", message); michael@0: } michael@0: michael@0: this._db.getByPushEndpoint(aPageRecord.pushEndpoint, function(record) { michael@0: // If the endpoint didn't exist, let's just fail. michael@0: if (record === undefined) { michael@0: fail("NotFoundError"); michael@0: return; michael@0: } michael@0: michael@0: // Non-owner tried to unregister, say success, but don't do anything. michael@0: if (record.manifestURL !== aPageRecord.manifestURL) { michael@0: aMessageManager.sendAsyncMessage("PushService:Unregister:OK", { michael@0: requestID: aPageRecord.requestID, michael@0: pushEndpoint: aPageRecord.pushEndpoint michael@0: }); michael@0: return; michael@0: } michael@0: michael@0: this._db.delete(record.channelID, function() { michael@0: // Let's be nice to the server and try to inform it, but we don't care michael@0: // about the reply. 
michael@0: this._send("unregister", {channelID: record.channelID}); michael@0: aMessageManager.sendAsyncMessage("PushService:Unregister:OK", { michael@0: requestID: aPageRecord.requestID, michael@0: pushEndpoint: aPageRecord.pushEndpoint michael@0: }); michael@0: }.bind(this), fail); michael@0: }.bind(this), fail); michael@0: }, michael@0: michael@0: /** michael@0: * Called on message from the child process michael@0: */ michael@0: registrations: function(aPageRecord, aMessageManager) { michael@0: debug("registrations()"); michael@0: michael@0: if (aPageRecord.manifestURL) { michael@0: this._db.getAllByManifestURL(aPageRecord.manifestURL, michael@0: this._onRegistrationsSuccess.bind(this, aPageRecord, aMessageManager), michael@0: this._onRegistrationsError.bind(this, aPageRecord, aMessageManager)); michael@0: } michael@0: else { michael@0: this._onRegistrationsError(aPageRecord, aMessageManager); michael@0: } michael@0: }, michael@0: michael@0: _onRegistrationsSuccess: function(aPageRecord, michael@0: aMessageManager, michael@0: pushRecords) { michael@0: let registrations = []; michael@0: pushRecords.forEach(function(pushRecord) { michael@0: registrations.push({ michael@0: __exposedProps__: { pushEndpoint: 'r', version: 'r' }, michael@0: pushEndpoint: pushRecord.pushEndpoint, michael@0: version: pushRecord.version michael@0: }); michael@0: }); michael@0: aMessageManager.sendAsyncMessage("PushService:Registrations:OK", { michael@0: requestID: aPageRecord.requestID, michael@0: registrations: registrations michael@0: }); michael@0: }, michael@0: michael@0: _onRegistrationsError: function(aPageRecord, aMessageManager) { michael@0: aMessageManager.sendAsyncMessage("PushService:Registrations:KO", { michael@0: requestID: aPageRecord.requestID, michael@0: error: "Database error" michael@0: }); michael@0: }, michael@0: michael@0: // begin Push protocol handshake michael@0: _wsOnStart: function(context) { michael@0: debug("wsOnStart()"); michael@0: if (this._currentState != 
STATE_WAITING_FOR_WS_START) { michael@0: debug("NOT in STATE_WAITING_FOR_WS_START. Current state " + michael@0: this._currentState + ". Skipping"); michael@0: return; michael@0: } michael@0: michael@0: // Since we've had a successful connection reset the retry fail count. michael@0: this._retryFailCount = 0; michael@0: michael@0: // Openning an available UDP port. michael@0: this._listenForUDPWakeup(); michael@0: michael@0: let data = { michael@0: messageType: "hello", michael@0: } michael@0: michael@0: if (this._UAID) michael@0: data["uaid"] = this._UAID; michael@0: michael@0: let networkState = this._getNetworkState(); michael@0: if (networkState.ip) { michael@0: // Hostport is apparently a thing. michael@0: data["wakeup_hostport"] = { michael@0: ip: networkState.ip, michael@0: port: this._udpServer && this._udpServer.port michael@0: }; michael@0: michael@0: data["mobilenetwork"] = { michael@0: mcc: networkState.mcc, michael@0: mnc: networkState.mnc michael@0: }; michael@0: } michael@0: michael@0: function sendHelloMessage(ids) { michael@0: // On success, ids is an array, on error its not. michael@0: data["channelIDs"] = ids.map ? michael@0: ids.map(function(el) { return el.channelID; }) : []; michael@0: this._wsSendMessage(data); michael@0: this._currentState = STATE_WAITING_FOR_HELLO; michael@0: } michael@0: michael@0: this._db.getAllChannelIDs(sendHelloMessage.bind(this), michael@0: sendHelloMessage.bind(this)); michael@0: }, michael@0: michael@0: /** michael@0: * This statusCode is not the websocket protocol status code, but the TCP michael@0: * connection close status code. michael@0: * michael@0: * If we do not explicitly call ws.close() then statusCode is always michael@0: * NS_BASE_STREAM_CLOSED, even on a successful close. 
michael@0: */ michael@0: _wsOnStop: function(context, statusCode) { michael@0: debug("wsOnStop()"); michael@0: michael@0: if (statusCode != Cr.NS_OK && michael@0: !(statusCode == Cr.NS_BASE_STREAM_CLOSED && this._willBeWokenUpByUDP)) { michael@0: debug("Socket error " + statusCode); michael@0: this._reconnectAfterBackoff(); michael@0: } michael@0: michael@0: // Bug 896919. We always shutdown the WebSocket, even if we need to michael@0: // reconnect. This works because _reconnectAfterBackoff() is "async" michael@0: // (there is a minimum delay of the pref retryBaseInterval, which by default michael@0: // is 5000ms), so that function will open the WebSocket again. michael@0: this._shutdownWS(); michael@0: }, michael@0: michael@0: _wsOnMessageAvailable: function(context, message) { michael@0: debug("wsOnMessageAvailable() " + message); michael@0: michael@0: this._waitingForPong = false; michael@0: michael@0: // Reset the ping timer. Note: This path is executed at every step of the michael@0: // handshake, so this alarm does not need to be set explicitly at startup. michael@0: this._setAlarm(prefs.get("pingInterval")); michael@0: michael@0: let reply = undefined; michael@0: try { michael@0: reply = JSON.parse(message); michael@0: } catch(e) { michael@0: debug("Parsing JSON failed. text : " + message); michael@0: return; michael@0: } michael@0: michael@0: if (typeof reply.messageType != "string") { michael@0: debug("messageType not a string " + reply.messageType); michael@0: return; michael@0: } michael@0: michael@0: // A whitelist of protocol handlers. Add to these if new messages are added michael@0: // in the protocol. michael@0: let handlers = ["Hello", "Register", "Notification"]; michael@0: michael@0: // Build up the handler name to call from messageType. michael@0: // e.g. messageType == "register" -> _handleRegisterReply. 
michael@0: let handlerName = reply.messageType[0].toUpperCase() + michael@0: reply.messageType.slice(1).toLowerCase(); michael@0: michael@0: if (handlers.indexOf(handlerName) == -1) { michael@0: debug("No whitelisted handler " + handlerName + ". messageType: " + michael@0: reply.messageType); michael@0: return; michael@0: } michael@0: michael@0: let handler = "_handle" + handlerName + "Reply"; michael@0: michael@0: if (typeof this[handler] !== "function") { michael@0: debug("Handler whitelisted but not implemented! " + handler); michael@0: return; michael@0: } michael@0: michael@0: this[handler](reply); michael@0: }, michael@0: michael@0: /** michael@0: * The websocket should never be closed. Since we don't call ws.close(), michael@0: * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that michael@0: * function), which calls reconnect and re-establishes the WebSocket michael@0: * connection. michael@0: * michael@0: * If the server said it'll use UDP for wakeup, we set _willBeWokenUpByUDP michael@0: * and stop reconnecting in _wsOnStop(). michael@0: */ michael@0: _wsOnServerClose: function(context, aStatusCode, aReason) { michael@0: debug("wsOnServerClose() " + aStatusCode + " " + aReason); michael@0: michael@0: // Switch over to UDP. 
michael@0: if (aStatusCode == kUDP_WAKEUP_WS_STATUS_CODE) { michael@0: debug("Server closed with promise to wake up"); michael@0: this._willBeWokenUpByUDP = true; michael@0: // TODO: there should be no pending requests michael@0: } michael@0: }, michael@0: michael@0: _listenForUDPWakeup: function() { michael@0: debug("listenForUDPWakeup()"); michael@0: michael@0: if (this._udpServer) { michael@0: debug("UDP Server already running"); michael@0: return; michael@0: } michael@0: michael@0: if (!this._getNetworkState().ip) { michael@0: debug("No IP"); michael@0: return; michael@0: } michael@0: michael@0: if (!prefs.get("udp.wakeupEnabled")) { michael@0: debug("UDP support disabled"); michael@0: return; michael@0: } michael@0: michael@0: this._udpServer = Cc["@mozilla.org/network/udp-socket;1"] michael@0: .createInstance(Ci.nsIUDPSocket); michael@0: this._udpServer.init(-1, false); michael@0: this._udpServer.asyncListen(this); michael@0: debug("listenForUDPWakeup listening on " + this._udpServer.port); michael@0: michael@0: return this._udpServer.port; michael@0: }, michael@0: michael@0: /** michael@0: * Called by UDP Server Socket. As soon as a ping is recieved via UDP, michael@0: * reconnect the WebSocket and get the actual data. michael@0: */ michael@0: onPacketReceived: function(aServ, aMessage) { michael@0: debug("Recv UDP datagram on port: " + this._udpServer.port); michael@0: this._beginWSSetup(); michael@0: }, michael@0: michael@0: /** michael@0: * Called by UDP Server Socket if the socket was closed for some reason. michael@0: * michael@0: * If this happens, we reconnect the WebSocket to not miss out on michael@0: * notifications. michael@0: */ michael@0: onStopListening: function(aServ, aStatus) { michael@0: debug("UDP Server socket was shutdown. 
Status: " + aStatus); michael@0: this._udpServer = undefined; michael@0: this._beginWSSetup(); michael@0: }, michael@0: michael@0: /** michael@0: * Get mobile network information to decide if the client is capable of being michael@0: * woken up by UDP (which currently just means having an mcc and mnc along michael@0: * with an IP). michael@0: */ michael@0: _getNetworkState: function() { michael@0: debug("getNetworkState()"); michael@0: try { michael@0: if (!prefs.get("udp.wakeupEnabled")) { michael@0: debug("UDP support disabled, we do not send any carrier info"); michael@0: throw "UDP disabled"; michael@0: } michael@0: michael@0: let nm = Cc["@mozilla.org/network/manager;1"].getService(Ci.nsINetworkManager); michael@0: if (nm.active && nm.active.type == Ci.nsINetworkInterface.NETWORK_TYPE_MOBILE) { michael@0: let icc = Cc["@mozilla.org/ril/content-helper;1"].getService(Ci.nsIIccProvider); michael@0: // TODO: Bug 927721 - PushService for multi-sim michael@0: // In Multi-sim, there is more than one client in iccProvider. Each michael@0: // client represents a icc service. To maintain backward compatibility michael@0: // with single sim, we always use client 0 for now. Adding support michael@0: // for multiple sim will be addressed in bug 927721, if needed. 
michael@0: let clientId = 0; michael@0: let iccInfo = icc.getIccInfo(clientId); michael@0: if (iccInfo) { michael@0: debug("Running on mobile data"); michael@0: return { michael@0: mcc: iccInfo.mcc, michael@0: mnc: iccInfo.mnc, michael@0: ip: nm.active.ip michael@0: } michael@0: } michael@0: } michael@0: } catch (e) {} michael@0: michael@0: debug("Running on wifi"); michael@0: michael@0: return { michael@0: mcc: 0, michael@0: mnc: 0, michael@0: ip: undefined michael@0: }; michael@0: }, michael@0: michael@0: // utility function used to add/remove observers in init() and shutdown() michael@0: _getNetworkStateChangeEventName: function() { michael@0: try { michael@0: Cc["@mozilla.org/network/manager;1"].getService(Ci.nsINetworkManager); michael@0: return "network-active-changed"; michael@0: } catch (e) { michael@0: return "network:offline-status-changed"; michael@0: } michael@0: } michael@0: }