/* * (C) Copyright 2014 Kurento (http://kurento.org/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ var defineProperty_IE8 = false if(Object.defineProperty) { try { Object.defineProperty({}, "x", {}); } catch(e) { defineProperty_IE8 = true } } // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function/bind if (!Function.prototype.bind) { Function.prototype.bind = function(oThis) { if (typeof this !== 'function') { // closest thing possible to the ECMAScript 5 // internal IsCallable function throw new TypeError('Function.prototype.bind - what is trying to be bound is not callable'); } var aArgs = Array.prototype.slice.call(arguments, 1), fToBind = this, fNOP = function() {}, fBound = function() { return fToBind.apply(this instanceof fNOP && oThis ? this : oThis, aArgs.concat(Array.prototype.slice.call(arguments))); }; fNOP.prototype = this.prototype; fBound.prototype = new fNOP(); return fBound; }; } var EventEmitter = require('events').EventEmitter; var inherits = require('inherits'); var packers = require('./packers'); var Mapper = require('./Mapper'); var BASE_TIMEOUT = 5000; function unifyResponseMethods(responseMethods) { if(!responseMethods) return {}; for(var key in responseMethods) { var value = responseMethods[key]; if(typeof value == 'string') responseMethods[key] = { response: value } }; return responseMethods; }; function unifyTransport(transport) { if(!transport) return; // Transport as a function if(transport instanceof Function) return {send: transport}; // WebSocket & DataChannel if(transport.send instanceof Function) return transport; // Message API (Inter-window & WebWorker) if(transport.postMessage instanceof Function) { transport.send = transport.postMessage; return transport; } // Stream API if(transport.write instanceof Function) { transport.send = transport.write; return transport; } // Transports that only can receive messages, but not send if(transport.onmessage !== undefined) return; if(transport.pause instanceof Function) return; throw new SyntaxError("Transport is not a function nor a valid object"); }; /** * Representation of a RPC notification * * @class * * @constructor * * @param {String} method -method of the notification * @param params - parameters of the notification */ function RpcNotification(method, params) { if(defineProperty_IE8) { this.method = method this.params = params } else { Object.defineProperty(this, 'method', {value: method, enumerable: true}); Object.defineProperty(this, 'params', {value: params, enumerable: true}); } }; /** * @class * * @constructor * * @param {object} packer * * @param {object} [options] * * @param {object} [transport] * * @param {Function} [onRequest] */ function RpcBuilder(packer, options, transport, onRequest) { var self = this; if(!packer) throw new SyntaxError('Packer is not defined'); if(!packer.pack || !packer.unpack) throw new SyntaxError('Packer is invalid'); var responseMethods = unifyResponseMethods(packer.responseMethods); if(options instanceof Function) { if(transport != undefined) throw new SyntaxError("There can't be parameters after onRequest"); onRequest = options; transport = undefined; options = undefined; }; if(options && options.send instanceof Function) { if(transport && !(transport instanceof Function)) throw new SyntaxError("Only a function can be after transport"); onRequest = transport; transport = options; options = undefined; }; if(transport instanceof Function) { if(onRequest != undefined) throw new SyntaxError("There can't be parameters after onRequest"); onRequest = transport; transport = undefined; }; if(transport && transport.send instanceof Function) if(onRequest && !(onRequest instanceof Function)) throw new SyntaxError("Only a function can be after transport"); options = options || {}; EventEmitter.call(this); if(onRequest) this.on('request', onRequest); if(defineProperty_IE8) this.peerID = options.peerID else Object.defineProperty(this, 'peerID', {value: options.peerID}); var max_retries = options.max_retries || 0; function transportMessage(event) { self.decode(event.data || event); }; this.getTransport = function() { return transport; } this.setTransport = function(value) { // Remove listener from old transport if(transport) { // W3C transports if(transport.removeEventListener) transport.removeEventListener('message', transportMessage); // Node.js Streams API else if(transport.removeListener) transport.removeListener('data', transportMessage); }; // Set listener on new transport if(value) { // W3C transports if(value.addEventListener) value.addEventListener('message', transportMessage); // Node.js Streams API else if(value.addListener) value.addListener('data', transportMessage); }; transport = unifyTransport(value); } if(!defineProperty_IE8) Object.defineProperty(this, 'transport', { get: this.getTransport.bind(this), set: this.setTransport.bind(this) }) this.setTransport(transport); var request_timeout = options.request_timeout || BASE_TIMEOUT; var ping_request_timeout = options.ping_request_timeout || request_timeout; var response_timeout = options.response_timeout || BASE_TIMEOUT; var duplicates_timeout = options.duplicates_timeout || BASE_TIMEOUT; var requestID = 0; var requests = new Mapper(); var responses = new Mapper(); var processedResponses = new Mapper(); var message2Key = {}; /** * Store the response to prevent to process duplicate request later */ function storeResponse(message, id, dest) { var response = { message: message, /** Timeout to auto-clean old responses */ timeout: setTimeout(function() { responses.remove(id, dest); }, response_timeout) }; responses.set(response, id, dest); }; /** * Store the response to ignore duplicated messages later */ function storeProcessedResponse(ack, from) { var timeout = setTimeout(function() { processedResponses.remove(ack, from); }, duplicates_timeout); processedResponses.set(timeout, ack, from); }; /** * Representation of a RPC request * * @class * @extends RpcNotification * * @constructor * * @param {String} method -method of the notification * @param params - parameters of the notification * @param {Integer} id - identifier of the request * @param [from] - source of the notification */ function RpcRequest(method, params, id, from, transport) { RpcNotification.call(this, method, params); this.getTransport = function() { return transport; } this.setTransport = function(value) { transport = unifyTransport(value); } if(!defineProperty_IE8) Object.defineProperty(this, 'transport', { get: this.getTransport.bind(this), set: this.setTransport.bind(this) }) var response = responses.get(id, from); /** * @constant {Boolean} duplicated */ if(!(transport || self.getTransport())) { if(defineProperty_IE8) this.duplicated = Boolean(response) else Object.defineProperty(this, 'duplicated', { value: Boolean(response) }); } var responseMethod = responseMethods[method]; this.pack = packer.pack.bind(packer, this, id) /** * Generate a response to this request * * @param {Error} [error] * @param {*} [result] * * @returns {string} */ this.reply = function(error, result, transport) { // Fix optional parameters if(error instanceof Function || error && error.send instanceof Function) { if(result != undefined) throw new SyntaxError("There can't be parameters after callback"); transport = error; result = null; error = undefined; } else if(result instanceof Function || result && result.send instanceof Function) { if(transport != undefined) throw new SyntaxError("There can't be parameters after callback"); transport = result; result = null; }; transport = unifyTransport(transport); // Duplicated request, remove old response timeout if(response) clearTimeout(response.timeout); if(from != undefined) { if(error) error.dest = from; if(result) result.dest = from; }; var message; // New request or overriden one, create new response with provided data if(error || result != undefined) { if(self.peerID != undefined) { if(error) error.from = self.peerID; else result.from = self.peerID; } // Protocol indicates that responses has own request methods if(responseMethod) { if(responseMethod.error == undefined && error) message = { error: error }; else { var method = error ? responseMethod.error : responseMethod.response; message = { method: method, params: error || result }; } } else message = { error: error, result: result }; message = packer.pack(message, id); } // Duplicate & not-overriden request, re-send old response else if(response) message = response.message; // New empty reply, response null value else message = packer.pack({result: null}, id); // Store the response to prevent to process a duplicated request later storeResponse(message, id, from); // Return the stored response so it can be directly send back transport = transport || this.getTransport() || self.getTransport(); if(transport) return transport.send(message); return message; } }; inherits(RpcRequest, RpcNotification); function cancel(message) { var key = message2Key[message]; if(!key) return; delete message2Key[message]; var request = requests.pop(key.id, key.dest); if(!request) return; clearTimeout(request.timeout); // Start duplicated responses timeout storeProcessedResponse(key.id, key.dest); }; /** * Allow to cancel a request and don't wait for a response * * If `message` is not given, cancel all the request */ this.cancel = function(message) { if(message) return cancel(message); for(var message in message2Key) cancel(message); }; this.close = function() { // Prevent to receive new messages var transport = this.getTransport(); if(transport && transport.close) transport.close(4003, "Cancel request"); // Request & processed responses this.cancel(); processedResponses.forEach(clearTimeout); // Responses responses.forEach(function(response) { clearTimeout(response.timeout); }); }; /** * Generates and encode a JsonRPC 2.0 message * * @param {String} method -method of the notification * @param params - parameters of the notification * @param [dest] - destination of the notification * @param {object} [transport] - transport where to send the message * @param [callback] - function called when a response to this request is * received. If not defined, a notification will be send instead * * @returns {string} A raw JsonRPC 2.0 request or notification string */ this.encode = function(method, params, dest, transport, callback) { // Fix optional parameters if(params instanceof Function) { if(dest != undefined) throw new SyntaxError("There can't be parameters after callback"); callback = params; transport = undefined; dest = undefined; params = undefined; } else if(dest instanceof Function) { if(transport != undefined) throw new SyntaxError("There can't be parameters after callback"); callback = dest; transport = undefined; dest = undefined; } else if(transport instanceof Function) { if(callback != undefined) throw new SyntaxError("There can't be parameters after callback"); callback = transport; transport = undefined; }; if(self.peerID != undefined) { params = params || {}; params.from = self.peerID; }; if(dest != undefined) { params = params || {}; params.dest = dest; }; // Encode message var message = { method: method, params: params }; if(callback) { var id = requestID++; var retried = 0; message = packer.pack(message, id); function dispatchCallback(error, result) { self.cancel(message); callback(error, result); }; var request = { message: message, callback: dispatchCallback, responseMethods: responseMethods[method] || {} }; var encode_transport = unifyTransport(transport); function sendRequest(transport) { var rt = (method === 'ping' ? ping_request_timeout : request_timeout); request.timeout = setTimeout(timeout, rt*Math.pow(2, retried++)); message2Key[message] = {id: id, dest: dest}; requests.set(request, id, dest); transport = transport || encode_transport || self.getTransport(); if(transport) return transport.send(message); return message; }; function retry(transport) { transport = unifyTransport(transport); console.warn(retried+' retry for request message:',message); var timeout = processedResponses.pop(id, dest); clearTimeout(timeout); return sendRequest(transport); }; function timeout() { if(retried < max_retries) return retry(transport); var error = new Error('Request has timed out'); error.request = message; error.retry = retry; dispatchCallback(error) }; return sendRequest(transport); }; // Return the packed message message = packer.pack(message); transport = transport || this.getTransport(); if(transport) return transport.send(message); return message; }; /** * Decode and process a JsonRPC 2.0 message * * @param {string} message - string with the content of the message * * @returns {RpcNotification|RpcRequest|undefined} - the representation of the * notification or the request. If a response was processed, it will return * `undefined` to notify that it was processed * * @throws {TypeError} - Message is not defined */ this.decode = function(message, transport) { if(!message) throw new TypeError("Message is not defined"); try { message = packer.unpack(message); } catch(e) { // Ignore invalid messages return console.debug(e, message); }; var id = message.id; var ack = message.ack; var method = message.method; var params = message.params || {}; var from = params.from; var dest = params.dest; // Ignore messages send by us if(self.peerID != undefined && from == self.peerID) return; // Notification if(id == undefined && ack == undefined) { var notification = new RpcNotification(method, params); if(self.emit('request', notification)) return; return notification; }; function processRequest() { // If we have a transport and it's a duplicated request, reply inmediatly transport = unifyTransport(transport) || self.getTransport(); if(transport) { var response = responses.get(id, from); if(response) return transport.send(response.message); }; var idAck = (id != undefined) ? id : ack; var request = new RpcRequest(method, params, idAck, from, transport); if(self.emit('request', request)) return; return request; }; function processResponse(request, error, result) { request.callback(error, result); }; function duplicatedResponse(timeout) { console.warn("Response already processed", message); // Update duplicated responses timeout clearTimeout(timeout); storeProcessedResponse(ack, from); }; // Request, or response with own method if(method) { // Check if it's a response with own method if(dest == undefined || dest == self.peerID) { var request = requests.get(ack, from); if(request) { var responseMethods = request.responseMethods; if(method == responseMethods.error) return processResponse(request, params); if(method == responseMethods.response) return processResponse(request, null, params); return processRequest(); } var processed = processedResponses.get(ack, from); if(processed) return duplicatedResponse(processed); } // Request return processRequest(); }; var error = message.error; var result = message.result; // Ignore responses not send to us if(error && error.dest && error.dest != self.peerID) return; if(result && result.dest && result.dest != self.peerID) return; // Response var request = requests.get(ack, from); if(!request) { var processed = processedResponses.get(ack, from); if(processed) return duplicatedResponse(processed); return console.warn("No callback was defined for this message", message); }; // Process response processResponse(request, error, result); }; }; inherits(RpcBuilder, EventEmitter); RpcBuilder.RpcNotification = RpcNotification; module.exports = RpcBuilder; var clients = require('./clients'); var transports = require('./clients/transports'); RpcBuilder.clients = clients; RpcBuilder.clients.transports = transports; RpcBuilder.packers = packers;