openvidu-browser: onForciblyReconnectSubscriber

pull/648/head
pabloFuente 2021-06-30 15:33:05 +02:00
parent ffcb56cc0d
commit 00d64ded9b
4 changed files with 155 additions and 88 deletions

View File

@ -117,12 +117,14 @@ export class OpenVidu {
* @hidden * @hidden
*/ */
webrtcStatsInterval: number = -1; webrtcStatsInterval: number = -1;
/** /**
* @hidden * @hidden
*/ */
sendBrowserLogs: OpenViduLoggerConfiguration = OpenViduLoggerConfiguration.disabled; sendBrowserLogs: OpenViduLoggerConfiguration = OpenViduLoggerConfiguration.disabled;
/**
* @hidden
*/
isPro: boolean = false;
/** /**
* @hidden * @hidden
*/ */
@ -768,7 +770,8 @@ export class OpenVidu {
filterEventDispatched: this.session.onFilterEventDispatched.bind(this.session), filterEventDispatched: this.session.onFilterEventDispatched.bind(this.session),
iceCandidate: this.session.recvIceCandidate.bind(this.session), iceCandidate: this.session.recvIceCandidate.bind(this.session),
mediaError: this.session.onMediaError.bind(this.session), mediaError: this.session.onMediaError.bind(this.session),
masterNodeCrashedNotification: this.onMasterNodeCrashedNotification.bind(this) masterNodeCrashedNotification: this.onMasterNodeCrashedNotification.bind(this),
forciblyReconnectSubscriber: this.session.onForciblyReconnectSubscriber.bind(this.session)
} }
}; };
this.jsonRpcClient = new RpcBuilder.clients.JsonRpcClient(config); this.jsonRpcClient = new RpcBuilder.clients.JsonRpcClient(config);

View File

@ -778,25 +778,22 @@ export class Session extends EventDispatcher {
* @hidden * @hidden
*/ */
onParticipantLeft(event: { connectionId: string, reason: string }): void { onParticipantLeft(event: { connectionId: string, reason: string }): void {
this.getRemoteConnection(event.connectionId, 'onParticipantLeft').then(connection => {
if (!!connection.stream) {
const stream = connection.stream;
if (this.remoteConnections.size > 0) { const streamEvent = new StreamEvent(true, this, 'streamDestroyed', stream, event.reason);
this.getRemoteConnection(event.connectionId, 'onParticipantLeft').then(connection => { this.ee.emitEvent('streamDestroyed', [streamEvent]);
if (!!connection.stream) { streamEvent.callDefaultBehavior();
const stream = connection.stream;
const streamEvent = new StreamEvent(true, this, 'streamDestroyed', stream, event.reason); this.remoteStreamsCreated.delete(stream.streamId);
this.ee.emitEvent('streamDestroyed', [streamEvent]); }
streamEvent.callDefaultBehavior(); this.remoteConnections.delete(connection.connectionId);
this.ee.emitEvent('connectionDestroyed', [new ConnectionEvent(false, this, 'connectionDestroyed', connection, event.reason)]);
this.remoteStreamsCreated.delete(stream.streamId); })
} .catch(openViduError => {
this.remoteConnections.delete(connection.connectionId); logger.error(openViduError);
this.ee.emitEvent('connectionDestroyed', [new ConnectionEvent(false, this, 'connectionDestroyed', connection, event.reason)]); });
})
.catch(openViduError => {
logger.error(openViduError);
});
}
} }
/** /**
@ -1123,6 +1120,61 @@ export class Session extends EventDispatcher {
}); });
} }
/**
* @hidden
*/
onForciblyReconnectSubscriber(event: { connectionId: string, streamId: string, sdpOffer: string }): Promise<void> {
return new Promise((resolve, reject) => {
this.getRemoteConnection(event.connectionId, 'onForciblyReconnectSubscriber')
.then(connection => {
if (!!connection.stream && connection.stream.streamId === event.streamId) {
const stream = connection.stream;
if (stream.setupReconnectionEventEmitter(resolve, reject)) {
// Ongoing reconnection
// Wait for the event emitter to be free (with success or error) and call the method again
if (stream.reconnectionEventEmitter!['onForciblyReconnectSubscriberLastEvent'] != null) {
// Two or more onForciblyReconnectSubscriber events were received while a reconnection process
// of the subscriber was already taking place. Always use the last one to retry the re-subscription
// process, as that SDP offer will be the only one available at the server side. Ignore previous ones
stream.reconnectionEventEmitter!['onForciblyReconnectSubscriberLastEvent'] = event;
reject('Ongoing forced subscriber reconnection');
} else {
// One onForciblyReconnectSubscriber even has been received while a reconnection process
// of the subscriber was already taking place. Set up a listener to wait for it to retry the
// forced reconnection process
stream.reconnectionEventEmitter!['onForciblyReconnectSubscriberLastEvent'] = event;
const callback = () => {
const eventAux = stream.reconnectionEventEmitter!['onForciblyReconnectSubscriberLastEvent'];
delete stream.reconnectionEventEmitter!['onForciblyReconnectSubscriberLastEvent'];
this.onForciblyReconnectSubscriber(eventAux);
}
stream.reconnectionEventEmitter!.once('success', () => {
callback();
});
stream.reconnectionEventEmitter!.once('error', () => {
callback();
});
}
return;
}
stream.completeWebRtcPeerReceive(true, event.sdpOffer)
.then(() => stream.finalResolveForSubscription(true, resolve))
.catch(error => stream.finalRejectForSubscription(true, `Error while forcibly reconnecting remote stream ${event.streamId}: ${error.toString()}`, reject));
} else {
const errMsg = "No stream with streamId '" + event.streamId + "' found for connection '" + event.connectionId + "' on 'streamPropertyChanged' event";
logger.error(errMsg);
reject(errMsg);
}
})
.catch(openViduError => {
logger.error(openViduError);
reject(openViduError);
});
});
}
/** /**
* @hidden * @hidden
*/ */
@ -1463,6 +1515,7 @@ export class Session extends EventDispatcher {
if (!!sendBrowserLogs) { if (!!sendBrowserLogs) {
this.openvidu.sendBrowserLogs = sendBrowserLogs; this.openvidu.sendBrowserLogs = sendBrowserLogs;
} }
this.openvidu.isPro = !!webrtcStatsInterval && !!sendBrowserLogs;
this.openvidu.wsUri = 'wss://' + url.host + '/openvidu'; this.openvidu.wsUri = 'wss://' + url.host + '/openvidu';
this.openvidu.httpUri = 'https://' + url.host; this.openvidu.httpUri = 'https://' + url.host;

View File

@ -815,6 +815,27 @@ export class Stream {
return false; return false;
} }
/**
* @hidden
*/
setupReconnectionEventEmitter(resolve: (value: void | PromiseLike<void>) => void, reject: (reason?: any) => void): boolean {
if (this.reconnectionEventEmitter == undefined) {
// There is no ongoing reconnection
this.reconnectionEventEmitter = new EventEmitter();
return false;
} else {
// Ongoing reconnection
console.warn(`Trying to reconnect stream ${this.streamId} (${this.isLocal() ? 'Publisher' : 'Subscriber'}) but an ongoing reconnection process is active. Waiting for response...`);
this.reconnectionEventEmitter.once('success', () => {
resolve();
});
this.reconnectionEventEmitter.once('error', error => {
reject(error);
});
return true;
}
}
/** /**
* @hidden * @hidden
*/ */
@ -822,18 +843,8 @@ export class Stream {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (reconnect) { if (reconnect) {
if (this.reconnectionEventEmitter == undefined) { if (this.setupReconnectionEventEmitter(resolve, reject)) {
// There is no ongoing reconnection
this.reconnectionEventEmitter = new EventEmitter();
} else {
// Ongoing reconnection // Ongoing reconnection
console.warn(`Trying to reconnect stream ${this.streamId} (Publisher) but an ongoing reconnection process is active. Waiting for response...`);
this.reconnectionEventEmitter.once('success', () => {
resolve();
});
this.reconnectionEventEmitter.once('error', error => {
reject(error);
});
return; return;
} }
} else { } else {
@ -956,6 +967,32 @@ export class Stream {
}); });
} }
/**
* @hidden
*/
finalResolveForSubscription(reconnect: boolean, resolve: (value: void | PromiseLike<void>) => void) {
logger.info("'Subscriber' (" + this.streamId + ") successfully " + (reconnect ? "reconnected" : "subscribed"));
this.remotePeerSuccessfullyEstablished(reconnect);
this.initWebRtcStats();
if (reconnect) {
this.reconnectionEventEmitter?.emitEvent('success');
delete this.reconnectionEventEmitter;
}
resolve();
}
/**
* @hidden
*/
finalRejectForSubscription(reconnect: boolean, error: any, reject: (reason?: any) => void) {
logger.error("Error for 'Subscriber' (" + this.streamId + ") while trying to " + (reconnect ? "reconnect" : "subscribe") + ": " + error.toString());
if (reconnect) {
this.reconnectionEventEmitter?.emitEvent('error', [error]);
delete this.reconnectionEventEmitter;
}
reject(error);
}
/** /**
* @hidden * @hidden
*/ */
@ -963,56 +1000,27 @@ export class Stream {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (reconnect) { if (reconnect) {
if (this.reconnectionEventEmitter == undefined) { if (this.setupReconnectionEventEmitter(resolve, reject)) {
// There is no ongoing reconnection
this.reconnectionEventEmitter = new EventEmitter();
} else {
// Ongoing reconnection // Ongoing reconnection
console.warn(`Trying to reconnect stream ${this.streamId} (Subscriber) but an ongoing reconnection process is active. Waiting for response...`);
this.reconnectionEventEmitter.once('success', () => {
resolve();
});
this.reconnectionEventEmitter.once('error', error => {
reject(error);
});
return; return;
} }
} }
const finalResolve = () => {
logger.info("'Subscriber' (" + this.streamId + ") successfully " + (reconnect ? "reconnected" : "subscribed"));
this.remotePeerSuccessfullyEstablished(reconnect);
this.initWebRtcStats();
if (reconnect) {
this.reconnectionEventEmitter?.emitEvent('success');
delete this.reconnectionEventEmitter;
}
resolve();
}
const finalReject = error => {
if (reconnect) {
this.reconnectionEventEmitter?.emitEvent('error', [error]);
delete this.reconnectionEventEmitter;
}
reject(error);
}
if (this.session.openvidu.mediaServer === 'mediasoup') { if (this.session.openvidu.mediaServer === 'mediasoup') {
// Server initiates negotiation // Server initiates negotiation
this.initWebRtcPeerReceiveFromServer(reconnect) this.initWebRtcPeerReceiveFromServer(reconnect)
.then(() => finalResolve()) .then(() => this.finalResolveForSubscription(reconnect, resolve))
.catch(error => finalReject(error)); .catch(error => this.finalRejectForSubscription(reconnect, error, reject));
} else { } else {
// Client initiates negotiation // Client initiates negotiation
this.initWebRtcPeerReceiveFromClient(reconnect) this.initWebRtcPeerReceiveFromClient(reconnect)
.then(() => finalResolve()) .then(() => this.finalResolveForSubscription(reconnect, resolve))
.catch(error => finalReject(error)); .catch(error => this.finalRejectForSubscription(reconnect, error, reject));
} }
}); });
@ -1320,7 +1328,10 @@ export class Stream {
return state; return state;
} }
private initWebRtcStats(): void { /**
* @hidden
*/
initWebRtcStats(): void {
this.webRtcStats = new WebRtcStats(this); this.webRtcStats = new WebRtcStats(this);
this.webRtcStats.initWebRtcStats(); this.webRtcStats.initWebRtcStats();

View File

@ -24,14 +24,14 @@ export class OpenViduLogger {
private loggingSessionId: string | undefined; private loggingSessionId: string | undefined;
/** /**
* @hidden * @hidden
*/ */
static configureJSNLog(openVidu: OpenVidu, token: string) { static configureJSNLog(openVidu: OpenVidu, token: string) {
try { try {
// If dev mode // If dev mode or...
if ((window['LOG_JSNLOG_RESULTS']) || if ((window['LOG_JSNLOG_RESULTS']) ||
// If instance is created and it is OpenVidu Pro // If instance is created and it is OpenVidu Pro
(this.instance && openVidu.webrtcStatsInterval > -1 (this.instance && openVidu.isPro
// If logs are enabled // If logs are enabled
&& this.instance.isOpenViduBrowserLogsDebugActive(openVidu) && this.instance.isOpenViduBrowserLogsDebugActive(openVidu)
// Only reconfigure it if session or finalUserId has changed // Only reconfigure it if session or finalUserId has changed
@ -135,8 +135,8 @@ export class OpenViduLogger {
} }
/** /**
* @hidden * @hidden
*/ */
static getInstance(): OpenViduLogger { static getInstance(): OpenViduLogger {
if (!OpenViduLogger.instance) { if (!OpenViduLogger.instance) {
OpenViduLogger.instance = new OpenViduLogger(); OpenViduLogger.instance = new OpenViduLogger();
@ -159,9 +159,9 @@ export class OpenViduLogger {
// Return console functions with jsnlog integration // Return console functions with jsnlog integration
private getConsoleWithJSNLog() { private getConsoleWithJSNLog() {
return function(openViduLogger: OpenViduLogger){ return function (openViduLogger: OpenViduLogger) {
return { return {
log: function(...args){ log: function (...args) {
openViduLogger.defaultConsoleLogger.log.apply(openViduLogger.defaultConsoleLogger.logger, arguments); openViduLogger.defaultConsoleLogger.log.apply(openViduLogger.defaultConsoleLogger.logger, arguments);
if (openViduLogger.isJSNLogSetup) { if (openViduLogger.isJSNLogSetup) {
JL().info(arguments); JL().info(arguments);
@ -173,7 +173,7 @@ export class OpenViduLogger {
JL().info(arguments); JL().info(arguments);
} }
}, },
debug: function(...args) { debug: function (...args) {
openViduLogger.defaultConsoleLogger.debug.apply(openViduLogger.defaultConsoleLogger.logger, arguments); openViduLogger.defaultConsoleLogger.debug.apply(openViduLogger.defaultConsoleLogger.logger, arguments);
}, },
warn: function (...args) { warn: function (...args) {
@ -202,7 +202,7 @@ export class OpenViduLogger {
} }
private disableLogger() { private disableLogger() {
JL.setOptions({enabled: false}); JL.setOptions({ enabled: false });
this.isJSNLogSetup = false; this.isJSNLogSetup = false;
this.loggingSessionId = undefined; this.loggingSessionId = undefined;
this.currentAppender = undefined; this.currentAppender = undefined;
@ -215,8 +215,8 @@ export class OpenViduLogger {
} }
/** /**
* @hidden * @hidden
*/ */
log(...args: any[]) { log(...args: any[]) {
if (!this.isProdMode) { if (!this.isProdMode) {
this.defaultConsoleLogger.log.apply(this.defaultConsoleLogger.logger, arguments); this.defaultConsoleLogger.log.apply(this.defaultConsoleLogger.logger, arguments);
@ -227,8 +227,8 @@ export class OpenViduLogger {
} }
/** /**
* @hidden * @hidden
*/ */
debug(...args: any[]) { debug(...args: any[]) {
if (!this.isProdMode) { if (!this.isProdMode) {
this.defaultConsoleLogger.debug.apply(this.defaultConsoleLogger.logger, arguments); this.defaultConsoleLogger.debug.apply(this.defaultConsoleLogger.logger, arguments);
@ -236,8 +236,8 @@ export class OpenViduLogger {
} }
/** /**
* @hidden * @hidden
*/ */
info(...args: any[]) { info(...args: any[]) {
if (!this.isProdMode) { if (!this.isProdMode) {
this.defaultConsoleLogger.info.apply(this.defaultConsoleLogger.logger, arguments); this.defaultConsoleLogger.info.apply(this.defaultConsoleLogger.logger, arguments);
@ -248,8 +248,8 @@ export class OpenViduLogger {
} }
/** /**
* @hidden * @hidden
*/ */
warn(...args: any[]) { warn(...args: any[]) {
if (!this.isProdMode) { if (!this.isProdMode) {
this.defaultConsoleLogger.warn.apply(this.defaultConsoleLogger.logger, arguments); this.defaultConsoleLogger.warn.apply(this.defaultConsoleLogger.logger, arguments);
@ -260,8 +260,8 @@ export class OpenViduLogger {
} }
/** /**
* @hidden * @hidden
*/ */
error(...args: any[]) { error(...args: any[]) {
this.defaultConsoleLogger.error.apply(this.defaultConsoleLogger.logger, arguments); this.defaultConsoleLogger.error.apply(this.defaultConsoleLogger.logger, arguments);
if (this.isJSNLogSetup) { if (this.isJSNLogSetup) {
@ -270,8 +270,8 @@ export class OpenViduLogger {
} }
/** /**
* @hidden * @hidden
*/ */
flush() { flush() {
if (this.isJSNLogSetup && this.currentAppender != null) { if (this.isJSNLogSetup && this.currentAppender != null) {
this.currentAppender.sendBatch(); this.currentAppender.sendBatch();