From 6d93fc34055ba3324dda9ff63e24e244eca110eb Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Wed, 30 Jun 2021 16:00:13 +0200 Subject: [PATCH] Forcibly reconnect subscribers upon publisher reconnection if necessary --- openvidu-browser/src/OpenVidu/Session.ts | 2 +- openvidu-browser/src/OpenVidu/Stream.ts | 9 +- .../client/internal/ProtocolElements.java | 8 +- .../openvidu/server/cdr/WebrtcDebugEvent.java | 2 +- .../server/core/SessionEventsHandler.java | 2 +- .../openvidu/server/core/SessionManager.java | 4 +- .../server/kurento/core/KurentoSession.java | 8 + .../core/KurentoSessionEventsHandler.java | 1 - .../kurento/core/KurentoSessionManager.java | 143 ++++++++++-------- .../io/openvidu/server/rpc/RpcHandler.java | 8 +- 10 files changed, 113 insertions(+), 74 deletions(-) diff --git a/openvidu-browser/src/OpenVidu/Session.ts b/openvidu-browser/src/OpenVidu/Session.ts index e984c527..e95c1dfe 100644 --- a/openvidu-browser/src/OpenVidu/Session.ts +++ b/openvidu-browser/src/OpenVidu/Session.ts @@ -1159,7 +1159,7 @@ export class Session extends EventDispatcher { return; } - stream.completeWebRtcPeerReceive(true, event.sdpOffer) + stream.completeWebRtcPeerReceive(true, true, event.sdpOffer) .then(() => stream.finalResolveForSubscription(true, resolve)) .catch(error => stream.finalRejectForSubscription(true, `Error while forcibly reconnecting remote stream ${event.streamId}: ${error.toString()}`, reject)); } else { diff --git a/openvidu-browser/src/OpenVidu/Stream.ts b/openvidu-browser/src/OpenVidu/Stream.ts index fe13ffd7..b270967c 100644 --- a/openvidu-browser/src/OpenVidu/Stream.ts +++ b/openvidu-browser/src/OpenVidu/Stream.ts @@ -1031,7 +1031,7 @@ export class Stream { */ initWebRtcPeerReceiveFromClient(reconnect: boolean): Promise { return new Promise((resolve, reject) => { - this.completeWebRtcPeerReceive(reconnect).then(response => { + this.completeWebRtcPeerReceive(reconnect, false).then(response => { this.webRtcPeer.processRemoteAnswer(response.sdpAnswer) .then(() => resolve()).catch(error => reject(error)); }).catch(error => reject(error)); @@ -1048,7 +1048,7 @@ export class Stream { if (error) { reject(new Error('Error on prepareReceiveVideoFrom: ' + JSON.stringify(error))); } else { - this.completeWebRtcPeerReceive(reconnect, response.sdpOffer) + this.completeWebRtcPeerReceive(reconnect, false, response.sdpOffer) .then(() => resolve()).catch(error => reject(error)); } }); @@ -1058,7 +1058,7 @@ export class Stream { /** * @hidden */ - completeWebRtcPeerReceive(reconnect: boolean, sdpOfferByServer?: string): Promise { + completeWebRtcPeerReceive(reconnect: boolean, forciblyReconnect: boolean, sdpOfferByServer?: string): Promise { return new Promise((resolve, reject) => { logger.debug("'Session.subscribe(Stream)' called"); @@ -1075,6 +1075,9 @@ export class Stream { } else { params['sdpOffer'] = sdpString; } + if (reconnect) { + params['forciblyReconnect'] = forciblyReconnect; + } this.session.openvidu.sendRequest(method, params, (error, response) => { if (error) { diff --git a/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java b/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java index 46929012..a33a54db 100644 --- a/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java +++ b/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java @@ -134,14 +134,20 @@ public class ProtocolElements { public static final String RECONNECTSTREAM_METHOD = "reconnectStream"; public static final String RECONNECTSTREAM_STREAM_PARAM = "stream"; public static final String RECONNECTSTREAM_SDPSTRING_PARAM = "sdpString"; + public static final String RECONNECTSTREAM_FORCIBLYRECONNECT_PARAM = "forciblyReconnect"; // TODO: REMOVE ON 2.18.0 public static final String RECONNECTSTREAM_SDPOFFER_PARAM = "sdpOffer"; // ENDTODO public static final String VIDEODATA_METHOD = "videoData"; - + public static final String ECHO_METHOD = "echo"; + public static final String FORCIBLYRECONNECTSUBSCRIBER_METHOD = "forciblyReconnectSubscriber"; + public static final String FORCIBLYRECONNECTSUBSCRIBER_CONNECTIONID_PARAM = "connectionId"; + public static final String FORCIBLYRECONNECTSUBSCRIBER_STREAMID_PARAM = "streamId"; + public static final String FORCIBLYRECONNECTSUBSCRIBER_SDPOFFER_PARAM = "sdpOffer"; + // ---------------------------- SERVER RESPONSES & EVENTS ----------------- public static final String PARTICIPANTJOINED_METHOD = "participantJoined"; diff --git a/openvidu-server/src/main/java/io/openvidu/server/cdr/WebrtcDebugEvent.java b/openvidu-server/src/main/java/io/openvidu/server/cdr/WebrtcDebugEvent.java index 8a0aeb25..8d5b9a54 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/cdr/WebrtcDebugEvent.java +++ b/openvidu-server/src/main/java/io/openvidu/server/cdr/WebrtcDebugEvent.java @@ -11,7 +11,7 @@ public class WebrtcDebugEvent { } public enum WebrtcDebugEventOperation { - publish, subscribe, reconnectPublisher, reconnectSubscriber + publish, subscribe, reconnectPublisher, reconnectSubscriber, forciblyReconnectSubscriber } public enum WebrtcDebugEventType { diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index 3a856145..e9bd2b69 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -308,7 +308,7 @@ public class SessionEventsHandler { } } - public void onPrepareSubscription(Participant participant, Session session, String sdpOffer, Integer transactionId, + public void onPrepareSubscription(Participant participant, String sdpOffer, Integer transactionId, OpenViduException error) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index eb33cb8c..f54d64b1 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -116,6 +116,8 @@ public abstract class SessionManager { public abstract void prepareSubscription(Participant participant, String senderPublicId, boolean reconnect, Integer id); + public abstract String prepareForcedSubscription(Participant participant, String senderPublicId); + public abstract void subscribe(Participant participant, String senderName, String sdpString, Integer transactionId, boolean initByServer); @@ -177,7 +179,7 @@ public abstract class SessionManager { Integer transactionId); public abstract void reconnectSubscriber(Participant participant, String streamId, String sdpString, - Integer transactionId, boolean initByServer); + Integer transactionId, boolean initByServer, boolean forciblyReconnect); public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) throws OpenViduException; diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 732ed1e5..ca849e6d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -19,9 +19,11 @@ package io.openvidu.server.kurento.core; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.kurento.client.Continuation; import org.kurento.client.ErrorEvent; @@ -285,6 +287,12 @@ public class KurentoSession extends Session { return this.publishedStreamIds.get(streamId); } + public Set getParticipantsSubscribedToParticipant(String senderPublicId) { + return this.participants.values().stream() + .filter(p -> ((KurentoParticipant) p).getSubscriber(senderPublicId) != null) + .collect(Collectors.toSet()); + } + public void restartStatusInKurentoAfterReconnection(Long kmsDisconnectionTime) { log.info("Resetting process: resetting remote media objects for active session {}", this.sessionId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionEventsHandler.java index de3253f6..3c134868 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionEventsHandler.java @@ -32,7 +32,6 @@ public class KurentoSessionEventsHandler extends SessionEventsHandler { public void onIceCandidate(String roomName, String participantPrivateId, String senderPublicId, String endpointName, IceCandidate candidate) { JsonObject params = new JsonObject(); - params.addProperty(ProtocolElements.ICECANDIDATE_SENDERCONNECTIONID_PARAM, senderPublicId); params.addProperty(ProtocolElements.ICECANDIDATE_EPNAME_PARAM, endpointName); params.addProperty(ProtocolElements.ICECANDIDATE_SDPMLINEINDEX_PARAM, candidate.getSdpMLineIndex()); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index dc8acb6d..8417bcfc 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -538,67 +538,80 @@ public class KurentoSessionManager extends SessionManager { @Override public void prepareSubscription(Participant participant, String senderPublicId, boolean reconnect, Integer transactionId) { - String sdpOffer = null; - Session session = null; try { - log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderPublicId, sdpOffer, - participant.getParticipantPublicId()); - - KurentoParticipant kParticipant = (KurentoParticipant) participant; - session = ((KurentoParticipant) participant).getSession(); - Participant senderParticipant = session.getParticipantByPublicId(senderPublicId); - - if (senderParticipant == null) { - log.warn( - "PARTICIPANT {}: Requesting to recv media from user {} " - + "in session {} but user could not be found", - participant.getParticipantPublicId(), senderPublicId, session.getSessionId()); - throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, - "User '" + senderPublicId + " not found in session '" + session.getSessionId() + "'"); - } - if (!senderParticipant.isStreaming()) { - log.warn( - "PARTICIPANT {}: Requesting to recv media from user {} " - + "in session {} but user is not streaming media", - participant.getParticipantPublicId(), senderPublicId, session.getSessionId()); - throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, - "User '" + senderPublicId + " not streaming media in session '" + session.getSessionId() + "'"); - } - - if (reconnect) { - kParticipant.cancelReceivingMedia(((KurentoParticipant) senderParticipant), null, true); - } - - sdpOffer = kParticipant.prepareReceiveMediaFrom(senderParticipant); - final String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(senderParticipant); - - CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, - WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOffer, sdpOffer)); - - boolean isTranscodingAllowed = session.getSessionProperties().isTranscodingAllowed(); - VideoCodec forcedVideoCodec = session.getSessionProperties().forcedVideoCodec(); - - // Modify server's SDPOffer if forced codec is defined - if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { - sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, false, false, isTranscodingAllowed, - forcedVideoCodec); - - CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, - WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOfferMunged, sdpOffer)); - } - - if (sdpOffer == null) { - throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP offer when subscribing '" - + participant.getParticipantPublicId() + "' to '" + senderPublicId + "'"); - } + String sdpOfferByServer = this.commonPrepareSubscription(participant, senderPublicId, reconnect, + WebrtcDebugEventOperation.subscribe); + sessionEventsHandler.onPrepareSubscription(participant, sdpOfferByServer, transactionId, null); } catch (OpenViduException e) { log.error("PARTICIPANT {}: Error preparing subscription to {}", participant.getParticipantPublicId(), senderPublicId, e); - sessionEventsHandler.onPrepareSubscription(participant, session, null, transactionId, e); + sessionEventsHandler.onPrepareSubscription(participant, null, transactionId, e); } - if (sdpOffer != null) { - sessionEventsHandler.onPrepareSubscription(participant, session, sdpOffer, transactionId, null); + } + + @Override + public String prepareForcedSubscription(Participant participant, String senderPublicId) { + return this.commonPrepareSubscription(participant, senderPublicId, true, + WebrtcDebugEventOperation.forciblyReconnectSubscriber); + } + + private String commonPrepareSubscription(Participant participant, String senderPublicId, boolean reconnect, + WebrtcDebugEventOperation operation) { + String sdpOffer = null; + Session session = null; + + log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderPublicId, sdpOffer, + participant.getParticipantPublicId()); + + KurentoParticipant kParticipant = (KurentoParticipant) participant; + session = ((KurentoParticipant) participant).getSession(); + Participant senderParticipant = session.getParticipantByPublicId(senderPublicId); + + if (senderParticipant == null) { + log.warn( + "PARTICIPANT {}: Requesting to recv media from user {} " + + "in session {} but user could not be found", + participant.getParticipantPublicId(), senderPublicId, session.getSessionId()); + throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, + "User '" + senderPublicId + " not found in session '" + session.getSessionId() + "'"); } + if (!senderParticipant.isStreaming()) { + log.warn( + "PARTICIPANT {}: Requesting to recv media from user {} " + + "in session {} but user is not streaming media", + participant.getParticipantPublicId(), senderPublicId, session.getSessionId()); + throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, + "User '" + senderPublicId + " not streaming media in session '" + session.getSessionId() + "'"); + } + + if (reconnect) { + kParticipant.cancelReceivingMedia(((KurentoParticipant) senderParticipant), null, true); + } + + sdpOffer = kParticipant.prepareReceiveMediaFrom(senderParticipant); + final String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(senderParticipant); + + CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, operation, + WebrtcDebugEventType.sdpOffer, sdpOffer)); + + boolean isTranscodingAllowed = session.getSessionProperties().isTranscodingAllowed(); + VideoCodec forcedVideoCodec = session.getSessionProperties().forcedVideoCodec(); + + // Modify server's SDPOffer if forced codec is defined + if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { + sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, false, false, isTranscodingAllowed, + forcedVideoCodec); + + CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, operation, + WebrtcDebugEventType.sdpOfferMunged, sdpOffer)); + } + + if (sdpOffer == null) { + throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP offer when subscribing '" + + participant.getParticipantPublicId() + "' to '" + senderPublicId + "'"); + } + + return sdpOffer; } @Override @@ -1188,10 +1201,10 @@ public class KurentoSessionManager extends SessionManager { @Override public void reconnectSubscriber(Participant participant, String streamId, String sdpString, Integer transactionId, - boolean initByServer) { + boolean initByServer, boolean forciblyReconnect) { KurentoParticipant kParticipant = (KurentoParticipant) participant; KurentoSession kSession = kParticipant.getSession(); - reconnectSubscriber(kSession, kParticipant, streamId, sdpString, transactionId, initByServer); + reconnectSubscriber(kSession, kParticipant, streamId, sdpString, transactionId, initByServer, forciblyReconnect); } private String mungeSdpOffer(Session kSession, Participant participant, String sdpOffer, boolean isPublisher) { @@ -1205,7 +1218,7 @@ public class KurentoSessionManager extends SessionManager { return null; } - private void reconnectPublisher(KurentoSession kSession, KurentoParticipant kParticipant, String streamId, + protected void reconnectPublisher(KurentoSession kSession, KurentoParticipant kParticipant, String streamId, String sdpOffer, Integer transactionId) { String sdpOfferMunged = mungeSdpOffer(kSession, kParticipant, sdpOffer, true); @@ -1239,13 +1252,16 @@ public class KurentoSessionManager extends SessionManager { } private void reconnectSubscriber(KurentoSession kSession, KurentoParticipant kParticipant, String streamId, - String sdpString, Integer transactionId, boolean initByServer) { + String sdpString, Integer transactionId, boolean initByServer, boolean forciblyReconnect) { String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId); if (senderPrivateId != null) { KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId); String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(sender); + WebrtcDebugEventOperation operation = forciblyReconnect + ? WebrtcDebugEventOperation.forciblyReconnectSubscriber + : WebrtcDebugEventOperation.reconnectSubscriber; if (initByServer) { @@ -1254,7 +1270,7 @@ public class KurentoSessionManager extends SessionManager { final String sdpAnswer = sdpString; CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); + operation, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); kParticipant.receiveMedia(sender, sdpAnswer, true, true); @@ -1270,14 +1286,13 @@ public class KurentoSessionManager extends SessionManager { String sdpOffer = sdpString; CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOffer, sdpOffer)); + operation, WebrtcDebugEventType.sdpOffer, sdpOffer)); String sdpOfferMunged = mungeSdpOffer(kSession, kParticipant, sdpOffer, false); if (sdpOfferMunged != null) { sdpOffer = sdpOfferMunged; CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOfferMunged, - sdpOffer)); + operation, WebrtcDebugEventType.sdpOfferMunged, sdpOffer)); } kParticipant.cancelReceivingMedia(sender, null, true); @@ -1291,7 +1306,7 @@ public class KurentoSessionManager extends SessionManager { kParticipant.getParticipantPublicId(), sdpAnswer); CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.server, - WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); + operation, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); sessionEventsHandler.onSubscribe(kParticipant, kSession, sdpAnswer, transactionId, null); diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index faa3a81d..de08d696 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -662,7 +662,13 @@ public class RpcHandler extends DefaultJsonRpcHandler { sessionManager.reconnectPublisher(participant, streamId, sdpString, request.getId()); } else { boolean initByServer = request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM); - sessionManager.reconnectSubscriber(participant, streamId, sdpString, request.getId(), initByServer); + boolean forciblyReconnectSubscriber = false; + if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_FORCIBLYRECONNECT_PARAM)) { + forciblyReconnectSubscriber = getBooleanParam(request, + ProtocolElements.RECONNECTSTREAM_FORCIBLYRECONNECT_PARAM); + } + sessionManager.reconnectSubscriber(participant, streamId, sdpString, request.getId(), initByServer, + forciblyReconnectSubscriber); } } catch (OpenViduException e) { this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(),