/*
 * (C) Copyright 2014 Kurento (http://kurento.org/)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Set to `true` when `Object.defineProperty` exists but throws on a plain
// object. In that case properties are assigned directly instead of being
// defined through descriptors.
var defineProperty_IE8 = false;
if (Object.defineProperty) {
  try {
    Object.defineProperty({}, 'x', {});
  } catch (e) {
    defineProperty_IE8 = true;
  }
}

// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function/bind
if (!Function.prototype.bind) {
  Function.prototype.bind = function (oThis) {
    if (typeof this !== 'function') {
      // closest thing possible to the ECMAScript 5
      // internal IsCallable function
      throw new TypeError('Function.prototype.bind - what is trying to be bound is not callable');
    }

    var aArgs = Array.prototype.slice.call(arguments, 1),
      fToBind = this,
      fNOP = function () {},
      fBound = function () {
        return fToBind.apply(
          this instanceof fNOP && oThis ? this : oThis,
          aArgs.concat(Array.prototype.slice.call(arguments))
        );
      };

    fNOP.prototype = this.prototype;
    fBound.prototype = new fNOP();

    return fBound;
  };
}

var EventEmitter = require('events').EventEmitter;

var inherits = require('inherits');

var packers = require('./packers');
var Mapper = require('./Mapper');

// Default value, in milliseconds, for the request, response and duplicates
// timeouts when the corresponding option is not given
var BASE_TIMEOUT = 5000;

/**
 * Normalize the `responseMethods` table of a packer
 *
 * Entries given as a plain string are replaced, in place, by an object of the
 * form `{response: <string>}`. Other entries are left untouched.
 *
 * @param {object} [responseMethods]
 *
 * @returns {object} the same (now normalized) object, or a new empty object
 *  when `responseMethods` is falsy
 */
function unifyResponseMethods(responseMethods) {
  if (!responseMethods) return {};

  for (var key in responseMethods) {
    var value = responseMethods[key];

    if (typeof value == 'string')
      responseMethods[key] = {
        response: value
      };
  }

  return responseMethods;
}

/**
 * Normalize a transport so it exposes a `send()` function
 *
 * Objects exposing `postMessage()` or `write()` get a `send` property
 * pointing to that function (the object itself is modified).
 *
 * @param {Function|object} [transport]
 *
 * @returns {object|undefined} an object with a `send()` function, or
 *  `undefined` if no transport was given or it can only receive messages
 *
 * @throws {SyntaxError} `transport` is neither a function nor an object
 *  exposing any of the supported APIs
 */
function unifyTransport(transport) {
  if (!transport) return;

  // Transport as a function
  if (transport instanceof Function)
    return {
      send: transport
    };

  // WebSocket & DataChannel
  if (transport.send instanceof Function) return transport;

  // Message API (Inter-window & WebWorker)
  if (transport.postMessage instanceof Function) {
    transport.send = transport.postMessage;
    return transport;
  }

  // Stream API
  if (transport.write instanceof Function) {
    transport.send = transport.write;
    return transport;
  }

  // Transports that can only receive messages, but not send them
  if (transport.onmessage !== undefined) return;
  if (transport.pause instanceof Function) return;

  throw new SyntaxError('Transport is not a function nor a valid object');
}

/**
 * Representation of a RPC notification
 *
 * @class
 *
 * @constructor
 *
 * @param {String} method -method of the notification
 * @param params - parameters of the notification
 */
function RpcNotification(method, params) {
  if (defineProperty_IE8) {
    this.method = method;
    this.params = params;
  } else {
    Object.defineProperty(this, 'method', {
      value: method,
      enumerable: true
    });
    Object.defineProperty(this, 'params', {
      value: params,
      enumerable: true
    });
  }
}

/**
 * @class
 *
 * @constructor
 *
 * @param {object} packer
 *
 * @param {object} [options]
 *
 * @param {object} [transport]
 *
 * @param {Function} [onRequest]
 */
function RpcBuilder(packer, options, transport, onRequest) {
  var self = this;

  if (!packer) throw new SyntaxError('Packer is not defined');

  if (!packer.pack || !packer.unpack)
    throw new SyntaxError('Packer is invalid');

  var responseMethods = unifyResponseMethods(packer.responseMethods);

  // Fix optional parameters: (packer, onRequest)
  if (options instanceof Function) {
    if (transport != undefined)
      throw new SyntaxError("There can't be parameters after onRequest");

    onRequest = options;
    transport = undefined;
    options = undefined;
  }

  // Fix optional parameters: (packer, transport, [onRequest])
  if (options && options.send instanceof Function) {
    if (transport && !(transport instanceof Function))
      throw new SyntaxError('Only a function can be after transport');

    onRequest = transport;
    transport = options;
    options = undefined;
  }

  // Fix optional parameters: (packer, options, onRequest)
  if (transport instanceof Function) {
    if (onRequest != undefined)
      throw new SyntaxError("There can't be parameters after onRequest");

    onRequest = transport;
    transport = undefined;
  }

  if (transport && transport.send instanceof Function)
    if (onRequest && !(onRequest instanceof Function))
      throw new SyntaxError('Only a function can be after transport');

  options = options || {};

  EventEmitter.call(this);

  if (onRequest) this.on('request', onRequest);

  if (defineProperty_IE8) this.peerID = options.peerID;
  else
    Object.defineProperty(this, 'peerID', {
      value: options.peerID
    });

  var max_retries = options.max_retries || 0;

  // Listener attached to the transport: decode whatever it delivers
  function transportMessage(event) {
    self.decode(event.data || event);
  }

  this.getTransport = function () {
    return transport;
  };

  // Object the listener is currently attached to. It is tracked apart from
  // `transport` because `unifyTransport()` returns `undefined` for
  // receive-only transports, whose listener must still be removed later.
  var listenedTransport;

  this.setTransport = function (value) {
    // Validate before touching any listener, so that an invalid value
    // (`unifyTransport()` throws) leaves the current wiring untouched
    var unified = unifyTransport(value);

    // Remove listener from old transport
    if (listenedTransport) {
      // W3C transports
      if (listenedTransport.removeEventListener)
        listenedTransport.removeEventListener('message', transportMessage);
      // Node.js Streams API
      else if (listenedTransport.removeListener)
        listenedTransport.removeListener('data', transportMessage);
    }

    listenedTransport = undefined;

    // Set listener on new transport
    if (value) {
      // W3C transports
      if (value.addEventListener)
        value.addEventListener('message', transportMessage);
      // Node.js Streams API
      else if (value.addListener) value.addListener('data', transportMessage);

      listenedTransport = value;
    }

    transport = unified;
  };

  if (!defineProperty_IE8)
    Object.defineProperty(this, 'transport', {
      get: this.getTransport.bind(this),
      set: this.setTransport.bind(this)
    });

  this.setTransport(transport);

  var request_timeout = options.request_timeout || BASE_TIMEOUT;
  var ping_request_timeout = options.ping_request_timeout || request_timeout;
  var response_timeout = options.response_timeout || BASE_TIMEOUT;
  var duplicates_timeout = options.duplicates_timeout || BASE_TIMEOUT;

  var requestID = 0;

  // Requests sent and still waiting for their response
  var requests = new Mapper();
  // Responses already sent, kept to answer duplicated requests
  var responses = new Mapper();
  // Timeouts of the responses already processed, kept to detect duplicates
  var processedResponses = new Mapper();

  // Packed request message -> {id, dest} of the pending request
  var message2Key = {};

  /**
   * Store the response to prevent processing a duplicated request later
   */
  function storeResponse(message, id, dest) {
    var response = {
      message: message,
      /** Timeout to auto-clean old responses */
      timeout: setTimeout(function () {
        responses.remove(id, dest);
      }, response_timeout)
    };

    responses.set(response, id, dest);
  }

  /**
   * Store the response to ignore duplicated messages later
   */
  function storeProcessedResponse(ack, from) {
    var timeout = setTimeout(function () {
      processedResponses.remove(ack, from);
    }, duplicates_timeout);

    processedResponses.set(timeout, ack, from);
  }

  /**
   * Representation of a RPC request
   *
   * @class
   * @extends RpcNotification
   *
   * @constructor
   *
   * @param {String} method -method of the notification
   * @param params - parameters of the notification
   * @param {Integer} id - identifier of the request
   * @param [from] - source of the notification
   * @param {object} [transport] - transport used by default to send the reply
   */
  function RpcRequest(method, params, id, from, transport) {
    RpcNotification.call(this, method, params);

    this.getTransport = function () {
      return transport;
    };

    this.setTransport = function (value) {
      transport = unifyTransport(value);
    };

    if (!defineProperty_IE8)
      Object.defineProperty(this, 'transport', {
        get: this.getTransport.bind(this),
        set: this.setTransport.bind(this)
      });

    // Response previously stored for this same request, if any
    var response = responses.get(id, from);

    /**
     * @constant {Boolean} duplicated
     */
    // Only defined when neither the request nor the builder has a transport
    if (!(transport || self.getTransport())) {
      if (defineProperty_IE8) this.duplicated = Boolean(response);
      else
        Object.defineProperty(this, 'duplicated', {
          value: Boolean(response)
        });
    }

    var responseMethod = responseMethods[method];

    this.pack = packer.pack.bind(packer, this, id);

    /**
     * Generate a response to this request
     *
     * @param {Error} [error]
     * @param {*} [result]
     * @param {Function|object} [transport] - where to send the response. If
     *  not given, the transport of the request or the one of the builder is
     *  used
     *
     * @returns {string} the packed response when there is no transport to
     *  send it, otherwise whatever `send()` of the transport returns
     */
    this.reply = function (error, result, transport) {
      // Fix optional parameters
      if (
        error instanceof Function ||
        (error && error.send instanceof Function)
      ) {
        if (result != undefined)
          throw new SyntaxError("There can't be parameters after callback");

        transport = error;
        result = null;
        error = undefined;
      } else if (
        result instanceof Function ||
        (result && result.send instanceof Function)
      ) {
        if (transport != undefined)
          throw new SyntaxError("There can't be parameters after callback");

        transport = result;
        result = null;
      }

      transport = unifyTransport(transport);

      // Duplicated request, remove old response timeout
      if (response) clearTimeout(response.timeout);

      if (from != undefined) {
        if (error) error.dest = from;

        if (result) result.dest = from;
      }

      var message;

      // New request or overridden one, create new response with provided data
      if (error || result != undefined) {
        if (self.peerID != undefined) {
          if (error) error.from = self.peerID;
          else result.from = self.peerID;
        }

        // Protocol indicates that responses have their own request methods
        if (responseMethod) {
          if (responseMethod.error == undefined && error)
            message = {
              error: error
            };
          else {
            var replyMethod = error
              ? responseMethod.error
              : responseMethod.response;

            message = {
              method: replyMethod,
              params: error || result
            };
          }
        } else
          message = {
            error: error,
            result: result
          };

        message = packer.pack(message, id);
      }

      // Duplicated & not-overridden request, re-send old response
      else if (response) message = response.message;
      // New empty reply, respond with a null value
      else
        message = packer.pack(
          {
            result: null
          },
          id
        );

      // Store the response to prevent processing a duplicated request later
      storeResponse(message, id, from);

      // Return the stored response so it can be directly sent back
      transport = transport || this.getTransport() || self.getTransport();

      if (transport) return transport.send(message);

      return message;
    };
  }
  inherits(RpcRequest, RpcNotification);

  // Stop waiting for the response of the request sent as `message`
  function cancel(message) {
    var key = message2Key[message];
    if (!key) return;

    delete message2Key[message];

    var request = requests.pop(key.id, key.dest);
    if (!request) return;

    clearTimeout(request.timeout);

    // Start duplicated responses timeout
    storeProcessedResponse(key.id, key.dest);
  }

  /**
   * Allow to cancel a request and don't wait for a response
   *
   * If `message` is not given, cancel all the requests
   */
  this.cancel = function (message) {
    if (message) return cancel(message);

    for (var message in message2Key) cancel(message);
  };

  /**
   * Close the transport (if it can be closed), cancel all the pending
   * requests and clear the timeouts of the stored responses
   */
  this.close = function () {
    // Prevent receiving new messages
    var transport = this.getTransport();
    if (transport && transport.close) transport.close(4003, 'Cancel request');

    // Request & processed responses
    this.cancel();

    processedResponses.forEach(clearTimeout);

    // Responses
    responses.forEach(function (response) {
      clearTimeout(response.timeout);
    });
  };

  /**
   * Generates and encode a JsonRPC 2.0 message
   *
   * @param {String} method -method of the notification
   * @param params - parameters of the notification
   * @param [dest] - destination of the notification
   * @param {object} [transport] - transport where to send the message
   * @param [callback] - function called when a response to this request is
   *   received. If not defined, a notification will be sent instead
   *
   * @returns {string} A raw JsonRPC 2.0 request or notification string when
   *  there is no transport to send it, otherwise whatever `send()` of the
   *  transport returns
   */
  this.encode = function (method, params, dest, transport, callback) {
    // Fix optional parameters
    if (params instanceof Function) {
      if (dest != undefined)
        throw new SyntaxError("There can't be parameters after callback");

      callback = params;
      transport = undefined;
      dest = undefined;
      params = undefined;
    } else if (dest instanceof Function) {
      if (transport != undefined)
        throw new SyntaxError("There can't be parameters after callback");

      callback = dest;
      transport = undefined;
      dest = undefined;
    } else if (transport instanceof Function) {
      if (callback != undefined)
        throw new SyntaxError("There can't be parameters after callback");

      callback = transport;
      transport = undefined;
    }

    if (self.peerID != undefined) {
      params = params || {};

      params.from = self.peerID;
    }

    if (dest != undefined) {
      params = params || {};

      params.dest = dest;
    }

    // Encode message
    var message = {
      method: method,
      params: params
    };

    if (callback) {
      var id = requestID++;
      var retried = 0;

      message = packer.pack(message, id);

      function dispatchCallback(error, result) {
        self.cancel(message);

        callback(error, result);
      }

      var request = {
        message: message,
        callback: dispatchCallback,
        responseMethods: responseMethods[method] || {}
      };

      var encode_transport = unifyTransport(transport);

      function sendRequest(transport) {
        var rt = method === 'ping' ? ping_request_timeout : request_timeout;

        // The timeout is doubled on each retry
        request.timeout = setTimeout(timeout, rt * Math.pow(2, retried++));

        message2Key[message] = {
          id: id,
          dest: dest
        };
        requests.set(request, id, dest);

        transport = transport || encode_transport || self.getTransport();
        if (transport) return transport.send(message);

        return message;
      }

      function retry(transport) {
        transport = unifyTransport(transport);

        console.warn(retried + ' retry for request message:', message);

        var timeout = processedResponses.pop(id, dest);
        clearTimeout(timeout);

        return sendRequest(transport);
      }

      function timeout() {
        if (retried < max_retries) return retry(transport);

        var error = new Error('Request has timed out');
        error.request = message;

        error.retry = retry;

        dispatchCallback(error);
      }

      return sendRequest(transport);
    }

    // Return the packed message
    message = packer.pack(message);

    // Normalize the transport the same way the request path does, so
    // `postMessage()`/`write()` based transports can send notifications too
    transport = unifyTransport(transport) || self.getTransport();
    if (transport) return transport.send(message);

    return message;
  };

  /**
   * Decode and process a JsonRPC 2.0 message
   *
   * @param {string} message - string with the content of the message
   * @param {Function|object} [transport] - transport used to reply to the
   *  requests. If not given, the one of the builder is used
   *
   * @returns {RpcNotification|RpcRequest|undefined} - the representation of the
   *   notification or the request. If a response was processed, it will return
   *   `undefined` to notify that it was processed
   *
   * @throws {TypeError} - Message is not defined
   */
  this.decode = function (message, transport) {
    if (!message) throw new TypeError('Message is not defined');

    try {
      message = packer.unpack(message);
    } catch (e) {
      // Ignore invalid messages
      return console.debug(e, message);
    }

    var id = message.id;
    var ack = message.ack;
    var method = message.method;
    var params = message.params || {};

    var from = params.from;
    var dest = params.dest;

    // Ignore messages sent by us
    if (self.peerID != undefined && from == self.peerID) return;

    // Notification
    if (id == undefined && ack == undefined) {
      var notification = new RpcNotification(method, params);

      if (self.emit('request', notification)) return;
      return notification;
    }

    function processRequest() {
      // If we have a transport and it's a duplicated request, reply immediately
      transport = unifyTransport(transport) || self.getTransport();
      if (transport) {
        var response = responses.get(id, from);
        if (response) return transport.send(response.message);
      }

      var idAck = id != undefined ? id : ack;
      var request = new RpcRequest(method, params, idAck, from, transport);

      if (self.emit('request', request)) return;
      return request;
    }

    function processResponse(request, error, result) {
      request.callback(error, result);
    }

    function duplicatedResponse(timeout) {
      console.warn('Response already processed', message);

      // Update duplicated responses timeout
      clearTimeout(timeout);
      storeProcessedResponse(ack, from);
    }

    // Request, or response with own method
    if (method) {
      // Check if it's a response with own method
      if (dest == undefined || dest == self.peerID) {
        var request = requests.get(ack, from);
        if (request) {
          var requestResponseMethods = request.responseMethods;

          if (method == requestResponseMethods.error)
            return processResponse(request, params);

          if (method == requestResponseMethods.response)
            return processResponse(request, null, params);

          return processRequest();
        }

        var processed = processedResponses.get(ack, from);
        if (processed) return duplicatedResponse(processed);
      }

      // Request
      return processRequest();
    }

    var error = message.error;
    var result = message.result;

    // Ignore responses not sent to us
    if (error && error.dest && error.dest != self.peerID) return;
    if (result && result.dest && result.dest != self.peerID) return;

    // Response
    var request = requests.get(ack, from);
    if (!request) {
      var processed = processedResponses.get(ack, from);
      if (processed) return duplicatedResponse(processed);

      return console.warn('No callback was defined for this message', message);
    }

    // Process response
    processResponse(request, error, result);
  };
}
inherits(RpcBuilder, EventEmitter);

RpcBuilder.RpcNotification = RpcNotification;

module.exports = RpcBuilder;

// Required after `module.exports` has been assigned
var clients = require('./clients');
var transports = require('./clients/transports');

RpcBuilder.clients = clients;
RpcBuilder.clients.transports = transports;
RpcBuilder.packers = packers;