From 13e3b52ff678ae68691f164662458478f20853e1 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Fri, 14 Feb 2020 22:32:57 +0100 Subject: [PATCH] openvidu-server: RPC reconnect method --- .../openvidu/server/core/SessionManager.java | 3 + .../kurento/core/KurentoParticipant.java | 104 ++++++++++-------- .../server/kurento/core/KurentoSession.java | 9 +- .../kurento/core/KurentoSessionManager.java | 60 +++++++++- .../kurento/endpoint/MediaEndpoint.java | 13 ++- .../kurento/endpoint/PublisherEndpoint.java | 14 ++- .../io/openvidu/server/rpc/RpcHandler.java | 47 ++++++-- 7 files changed, 182 insertions(+), 68 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index f34722ff..4717e833 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -155,6 +155,9 @@ public abstract class SessionManager { public abstract Participant publishIpcam(Session session, MediaOptions mediaOptions, String serverMetadata) throws Exception; + public abstract void reconnectStream(Participant participant, String streamId, String sdpOffer, + Integer transactionId); + public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) throws OpenViduException; diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index e91d3637..d6b23cb1 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -34,6 +34,7 @@ import org.kurento.client.Filter; import org.kurento.client.IceCandidate; import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; +import org.kurento.client.PassThrough; import org.kurento.client.internal.server.KurentoServerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,7 +88,7 @@ public class KurentoParticipant extends Participant { if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) { // Initialize a PublisherEndpoint this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(), - this.session.getPipeline(), this.openviduConfig); + this.session.getPipeline(), this.openviduConfig, null); } for (Participant other : session.getParticipants()) { @@ -100,24 +101,25 @@ public class KurentoParticipant extends Participant { } } - public void createPublishingEndpoint(MediaOptions mediaOptions) { + public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) { String type = mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO"; - final String publisherStreamId = IdentifierPrefixes.STREAM_ID + type.substring(0, Math.min(type.length(), 3)) - + "_" + RandomStringUtils.randomAlphabetic(1).toUpperCase() + RandomStringUtils.randomAlphanumeric(3) - + "_" + this.getParticipantPublicId(); - publisher.setStreamId(publisherStreamId); - publisher.setEndpointName(publisherStreamId); + if (streamId == null) { + streamId = IdentifierPrefixes.STREAM_ID + type.substring(0, Math.min(type.length(), 3)) + "_" + + RandomStringUtils.randomAlphabetic(1).toUpperCase() + RandomStringUtils.randomAlphanumeric(3) + + "_" + this.getParticipantPublicId(); + } + publisher.setStreamId(streamId); + publisher.setEndpointName(streamId); publisher.setMediaOptions(mediaOptions); publisher.createEndpoint(publisherLatch); if (getPublisher().getEndpoint() == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint"); } - - this.publisher.getEndpoint().setName(publisherStreamId); + publisher.getEndpoint().setName(streamId); endpointConfig.addEndpointListeners(this.publisher, "publisher"); - // Remove streamId from publisher's map + // Put streamId in publisher's map this.session.publishedStreamIds.putIfAbsent(this.getPublisherStreamId(), this.getParticipantPrivateId()); } @@ -155,6 +157,10 @@ public class KurentoParticipant extends Participant { return this.publisher; } + public SubscriberEndpoint getSubscriber(String senderPublicId) { + return this.subscribers.get(senderPublicId); + } + public Collection getSubscribers() { return this.subscribers.values(); } @@ -171,7 +177,7 @@ public class KurentoParticipant extends Participant { return session; } - public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback) { + public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback, boolean silent) { log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(), this.session.getSessionId(), sdpType); log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpString); @@ -188,8 +194,10 @@ public class KurentoParticipant extends Participant { this.recordingManager.startOneIndividualStreamRecording(session, null, null, this); } - endpointConfig.getCdr().recordNewPublisher(this, session.getSessionId(), publisher.getStreamId(), - publisher.getMediaOptions(), publisher.createdAt()); + if (!silent) { + endpointConfig.getCdr().recordNewPublisher(this, session.getSessionId(), publisher.getStreamId(), + publisher.getMediaOptions(), publisher.createdAt()); + } return sdpResponse; } @@ -199,12 +207,12 @@ public class KurentoParticipant extends Participant { this.session.getSessionId()); final MediaOptions mediaOptions = this.getPublisher().getMediaOptions(); releasePublisherEndpoint(reason, kmsDisconnectionTime); - resetPublisherEndpoint(mediaOptions); + resetPublisherEndpoint(mediaOptions, null); log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)", this.getParticipantPublicId()); } - public String receiveMediaFrom(Participant sender, String sdpOffer) { + public String receiveMediaFrom(Participant sender, String sdpOffer, boolean silent) { final String senderName = sender.getParticipantPublicId(); log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(), @@ -272,7 +280,7 @@ public class KurentoParticipant extends Participant { log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(), senderName, this.session.getSessionId()); - if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { + if (!silent && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(), sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt()); } @@ -287,12 +295,12 @@ public class KurentoParticipant extends Participant { log.error("Exception connecting subscriber endpoint " + "to publisher endpoint", e); } this.subscribers.remove(senderName); - releaseSubscriberEndpoint((KurentoParticipant) sender, subscriber, null); + releaseSubscriberEndpoint((KurentoParticipant) sender, subscriber, null, false); } return null; } - public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason) { + public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) { final String senderName = senderKurentoParticipant.getParticipantPublicId(); log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(), senderName); @@ -301,7 +309,7 @@ public class KurentoParticipant extends Participant { log.warn("PARTICIPANT {}: Trying to cancel receiving video from user {}. " + "But there is no such subscriber endpoint.", this.getParticipantPublicId(), senderName); } else { - releaseSubscriberEndpoint(senderKurentoParticipant, subscriberEndpoint, reason); + releaseSubscriberEndpoint(senderKurentoParticipant, subscriberEndpoint, reason, silent); log.info("PARTICIPANT {}: stopped receiving media from {} in room {}", this.getParticipantPublicId(), senderName, this.session.getSessionId()); } @@ -321,9 +329,10 @@ public class KurentoParticipant extends Participant { final SubscriberEndpoint subscriber = entry.getValue(); it.remove(); if (subscriber != null && subscriber.getEndpoint() != null) { + releaseSubscriberEndpoint( (KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), subscriber, - reason); + reason, false); log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(), remoteParticipantName); } else { @@ -413,7 +422,7 @@ public class KurentoParticipant extends Participant { } private void releaseSubscriberEndpoint(KurentoParticipant senderKurentoParticipant, SubscriberEndpoint subscriber, - EndReason reason) { + EndReason reason, boolean silent) { final String senderName = senderKurentoParticipant.getParticipantPublicId(); if (subscriber != null) { @@ -424,40 +433,44 @@ public class KurentoParticipant extends Participant { releaseElement(senderName, subscriber.getEndpoint()); - // Stop PlayerEndpoint of IP CAM if last subscriber disconnected - final PublisherEndpoint senderPublisher = senderKurentoParticipant.publisher; - if (senderPublisher != null) { - // If no PublisherEndpoint, then it means that the publisher already closed it - final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions(); - if (options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) { - synchronized (senderPublisher) { - senderPublisher.numberOfSubscribers--; - if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) { - try { - senderPublisher.getPlayerEndpoint().stop(); - log.info("IP Camera stream {} feed is now disabled because there are no subscribers", - senderPublisher.getStreamId()); - } catch (Exception e) { - log.info("Error while disabling feed for IP camera {}: {}", - senderPublisher.getStreamId(), e.getMessage()); + if (!silent) { + + // Stop PlayerEndpoint of IP CAM if last subscriber disconnected + final PublisherEndpoint senderPublisher = senderKurentoParticipant.publisher; + if (senderPublisher != null) { + // If no PublisherEndpoint, then it means that the publisher already closed it + final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions(); + if (options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) { + synchronized (senderPublisher) { + senderPublisher.numberOfSubscribers--; + if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) { + try { + senderPublisher.getPlayerEndpoint().stop(); + log.info( + "IP Camera stream {} feed is now disabled because there are no subscribers", + senderPublisher.getStreamId()); + } catch (Exception e) { + log.info("Error while disabling feed for IP camera {}: {}", + senderPublisher.getStreamId(), e.getMessage()); + } } } } } - } - if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { - endpointConfig.getCdr().stopSubscriber(this.getParticipantPublicId(), senderName, - subscriber.getStreamId(), reason); - } + if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { + endpointConfig.getCdr().stopSubscriber(this.getParticipantPublicId(), senderName, + subscriber.getStreamId(), reason); + } + } } else { log.warn("PARTICIPANT {}: Trying to release subscriber endpoint for '{}' but is null", this.getParticipantPublicId(), senderName); } } - private void releaseElement(final String senderName, final MediaElement element) { + void releaseElement(final String senderName, final MediaElement element) { final String eid = element.getId(); try { element.release(new Continuation() { @@ -488,11 +501,12 @@ public class KurentoParticipant extends Participant { return this.publisher.getStreamId(); } - public void resetPublisherEndpoint(MediaOptions mediaOptions) { + public void resetPublisherEndpoint(MediaOptions mediaOptions, PassThrough passThru) { log.info("Resetting publisher endpoint for participant {}", this.getParticipantPublicId()); this.publisher = new PublisherEndpoint(endpointType, this, this.getParticipantPublicId(), - this.session.getPipeline(), this.openviduConfig); + this.session.getPipeline(), this.openviduConfig, passThru); this.publisher.setMediaOptions(mediaOptions); + this.publisherLatch = new CountDownLatch(1); } @Override diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index c4ab3991..6c1aeaf3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -108,7 +108,7 @@ public class KurentoSession extends Session { if (participant.equals(subscriber)) { continue; } - ((KurentoParticipant) subscriber).cancelReceivingMedia((KurentoParticipant) participant, reason); + ((KurentoParticipant) subscriber).cancelReceivingMedia((KurentoParticipant) participant, reason, false); } log.debug("SESSION {}: Unsubscribed other participants {} from the publisher {}", sessionId, @@ -187,7 +187,7 @@ public class KurentoSession extends Session { log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId, participant.getParticipantPublicId()); for (Participant other : participants.values()) { - ((KurentoParticipant) other).cancelReceivingMedia(removedParticipant, reason); + ((KurentoParticipant) other).cancelReceivingMedia(removedParticipant, reason, false); } } @@ -329,8 +329,9 @@ public class KurentoSession extends Session { } getParticipants().forEach(p -> { if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) { - ((KurentoParticipant) p) - .resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId())); + + ((KurentoParticipant) p).resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId()), + null); } }); log.info( diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 682e68a8..ee620c67 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -33,6 +33,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.GenericMediaElement; import org.kurento.client.IceCandidate; import org.kurento.client.ListenerSubscription; +import org.kurento.client.PassThrough; import org.kurento.jsonrpc.Props; import org.kurento.jsonrpc.message.Request; import org.slf4j.Logger; @@ -281,7 +282,7 @@ public class KurentoSessionManager extends SessionManager { SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER; KurentoSession kSession = kParticipant.getSession(); - kParticipant.createPublishingEndpoint(mediaOptions); + kParticipant.createPublishingEndpoint(mediaOptions, null); /* * for (MediaElement elem : kurentoOptions.mediaElements) { @@ -305,7 +306,7 @@ public class KurentoSessionManager extends SessionManager { } } - sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback); + sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, false); if (sdpAnswer == null) { OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, @@ -416,7 +417,7 @@ public class KurentoSessionManager extends SessionManager { "User '" + senderName + " not streaming media in session '" + session.getSessionId() + "'"); } - sdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpOffer); + sdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpOffer, false); if (sdpAnswer == null) { throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP answer when subscribing '" + participant.getParticipantPublicId() @@ -448,7 +449,7 @@ public class KurentoSessionManager extends SessionManager { "User " + senderName + " not found in session " + session.getSessionId()); } - kParticipant.cancelReceivingMedia((KurentoParticipant) sender, EndReason.unsubscribe); + kParticipant.cancelReceivingMedia((KurentoParticipant) sender, EndReason.unsubscribe, false); sessionEventsHandler.onUnsubscribe(participant, transactionId, null); } @@ -924,6 +925,57 @@ public class KurentoSessionManager extends SessionManager { return kParticipant; } + @Override + public void reconnectStream(Participant participant, String streamId, String sdpOffer, Integer transactionId) { + KurentoParticipant kParticipant = (KurentoParticipant) participant; + KurentoSession kSession = kParticipant.getSession(); + + if (streamId.equals(participant.getPublisherStreamId())) { + + // Reconnect publisher + final KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) kParticipant.getPublisher() + .getMediaOptions(); + + // 1) Disconnect broken PublisherEndpoint from its PassThrough + PublisherEndpoint publisher = kParticipant.getPublisher(); + final PassThrough passThru = publisher.disconnectFromPassThrough(); + + // 2) Destroy the broken PublisherEndpoint and nothing else + if (publisher.kmsWebrtcStatsThread != null) { + publisher.kmsWebrtcStatsThread.cancel(true); + } + kParticipant.releaseElement(participant.getParticipantPublicId(), publisher.getEndpoint()); + + // 3) Create a new PublisherEndpoint connecting it to the previous PassThrough + kParticipant.resetPublisherEndpoint(kurentoOptions, passThru); + kParticipant.createPublishingEndpoint(kurentoOptions, streamId); + SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER; + String sdpAnswer = kParticipant.publishToRoom(sdpType, sdpOffer, kurentoOptions.doLoopback, true); + + sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(), + kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer, + new HashSet(), transactionId, null); + + } else { + + // Reconnect subscriber + String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId); + if (senderPrivateId != null) { + KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId); + kParticipant.cancelReceivingMedia(sender, null, true); + String sdpAnswer = kParticipant.receiveMediaFrom(sender, sdpOffer, true); + if (sdpAnswer == null) { + throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, + "Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'"); + } + sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null); + } else { + throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, + "Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'"); + } + } + } + @Override public String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) { Session session = this.getSession(sessionId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java index ad29e525..5fa5254f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import org.kurento.client.BaseRtpEndpoint; import org.kurento.client.Continuation; import org.kurento.client.Endpoint; import org.kurento.client.ErrorEvent; @@ -32,7 +33,7 @@ import org.kurento.client.IceCandidate; import org.kurento.client.ListenerSubscription; import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; -import org.kurento.client.OnIceCandidateEvent; +import org.kurento.client.PassThrough; import org.kurento.client.PlayerEndpoint; import org.kurento.client.RtpEndpoint; import org.kurento.client.SdpEndpoint; @@ -155,7 +156,8 @@ public abstract class MediaEndpoint { } /** - * @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) + * @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint} + * or {@link PlayerEndpoint}) */ public Endpoint getEndpoint() { if (this.isWeb()) { @@ -167,6 +169,13 @@ public abstract class MediaEndpoint { } } + public BaseRtpEndpoint getBaseRtpEndpoint() { + if (this.isWeb()) { + return this.webEndpoint; + } + return this.endpoint; + } + public long createdAt() { return this.createdAt; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index d1aebb55..1dd07dc7 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -77,15 +77,18 @@ public class PublisherEndpoint extends MediaEndpoint { public int numberOfSubscribers = 0; public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName, - MediaPipeline pipeline, OpenviduConfig openviduConfig) { + MediaPipeline pipeline, OpenviduConfig openviduConfig, PassThrough passThru) { super(endpointType, owner, endpointName, pipeline, openviduConfig, log); + this.passThru = passThru; } @Override protected void internalEndpointInitialization(final CountDownLatch endpointLatch) { super.internalEndpointInitialization(endpointLatch); - passThru = new PassThrough.Builder(getPipeline()).build(); - passThruSubscription = registerElemErrListener(passThru); + if (this.passThru == null) { + passThru = new PassThrough.Builder(getPipeline()).build(); + passThruSubscription = registerElemErrListener(passThru); + } } @Override @@ -404,6 +407,11 @@ public class PublisherEndpoint extends MediaEndpoint { } } + public synchronized PassThrough disconnectFromPassThrough() { + this.internalSinkDisconnect(this.getWebEndpoint(), this.passThru); + return this.passThru; + } + private String getNext(String uid) { int idx = elementIds.indexOf(uid); if (idx < 0 || idx + 1 == elementIds.size()) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index eeb8d017..bf882235 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -162,6 +162,9 @@ public class RpcHandler extends DefaultJsonRpcHandler { case ProtocolElements.REMOVEFILTEREVENTLISTENER_METHOD: removeFilterEventListener(rpcConnection, request); break; + case ProtocolElements.RECONNECTSTREAM_METHOD: + reconnectStream(rpcConnection, request); + break; default: log.error("Unrecognized request {}", request); break; @@ -602,6 +605,23 @@ public class RpcHandler extends DefaultJsonRpcHandler { } } + private void reconnectStream(RpcConnection rpcConnection, Request request) { + Participant participant; + try { + participant = sanityCheckOfSession(rpcConnection, "reconnectStream"); + } catch (OpenViduException e) { + return; + } + String streamId = getStringParam(request, ProtocolElements.RECONNECTSTREAM_STREAM_PARAM); + String sdpOffer = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM); + try { + sessionManager.reconnectStream(participant, streamId, sdpOffer, request.getId()); + } catch (OpenViduException e) { + this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject(), e); + } + } + public void leaveRoomAfterConnClosed(String participantPrivateId, EndReason reason) { try { sessionManager.evictParticipant(this.sessionManager.getParticipant(participantPrivateId), null, null, @@ -670,18 +690,25 @@ public class RpcHandler extends DefaultJsonRpcHandler { } } + @Override + public void afterReconnection(Session rpcSession) throws Exception { + log.info("After reconnection for WebSocket session: {}", rpcSession.getSessionId()); + } + @Override public void handleTransportError(Session rpcSession, Throwable exception) throws Exception { - log.error("Transport exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(), - exception.getMessage()); - if ("IOException".equals(exception.getClass().getSimpleName()) - && "Broken pipe".equals(exception.getCause().getMessage())) { - log.warn("Parcipant with private id {} unexpectedly closed the websocket", rpcSession.getSessionId()); - } - if ("EOFException".equals(exception.getClass().getSimpleName())) { - // Store WebSocket connection interrupted exception for this web socket to - // automatically evict the participant on "afterConnectionClosed" event - this.webSocketEOFTransportError.put(rpcSession.getSessionId(), true); + if (rpcSession != null) { + log.error("Transport exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(), + exception.getMessage()); + if ("IOException".equals(exception.getClass().getSimpleName()) && exception.getCause() != null + && "Broken pipe".equals(exception.getCause().getMessage())) { + log.warn("Parcipant with private id {} unexpectedly closed the websocket", rpcSession.getSessionId()); + } + if ("EOFException".equals(exception.getClass().getSimpleName())) { + // Store WebSocket connection interrupted exception for this web socket to + // automatically evict the participant on "afterConnectionClosed" event + this.webSocketEOFTransportError.put(rpcSession.getSessionId(), true); + } } }