From 3086b4aaf27d79e4339f2935e88ee5bf9f4cb865 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Tue, 1 Sep 2020 15:17:17 +0200 Subject: [PATCH] openvidu-server: mediasoup refactoring --- openvidu-browser/src/OpenVidu/Stream.ts | 10 +- .../client/internal/ProtocolElements.java | 5 +- .../server/core/SessionEventsHandler.java | 14 +- .../openvidu/server/core/SessionManager.java | 4 +- .../kurento/core/KurentoParticipant.java | 75 +++++++--- .../kurento/core/KurentoSessionManager.java | 86 +++++++---- .../kurento/endpoint/MediaEndpoint.java | 18 +++ .../kurento/endpoint/PublisherEndpoint.java | 134 +++++++++--------- .../kurento/endpoint/SubscriberEndpoint.java | 22 +-- .../server/recording/CompositeWrapper.java | 2 +- .../service/SingleStreamRecordingService.java | 8 +- .../io/openvidu/server/rpc/RpcHandler.java | 56 +++++--- 12 files changed, 277 insertions(+), 157 deletions(-) diff --git a/openvidu-browser/src/OpenVidu/Stream.ts b/openvidu-browser/src/OpenVidu/Stream.ts index b3386c5f..9178f3af 100644 --- a/openvidu-browser/src/OpenVidu/Stream.ts +++ b/openvidu-browser/src/OpenVidu/Stream.ts @@ -830,7 +830,8 @@ export class Stream extends EventDispatcher { let params; if (reconnect) { params = { - stream: this.streamId + stream: this.streamId, + sdpString: sdpOfferParam } } else { let typeOfVideo = ''; @@ -846,10 +847,10 @@ export class Stream extends EventDispatcher { typeOfVideo, frameRate: !!this.frameRate ? this.frameRate : -1, videoDimensions: JSON.stringify(this.videoDimensions), - filter: this.outboundStreamOpts.publisherProperties.filter + filter: this.outboundStreamOpts.publisherProperties.filter, + sdpOffer: sdpOfferParam } } - params['sdpOffer'] = sdpOfferParam; this.session.openvidu.sendRequest(method, params, (error, response) => { if (error) { @@ -955,8 +956,9 @@ export class Stream extends EventDispatcher { + this.streamId, sdpAnswer); const method = reconnect ? 'reconnectStream' : 'receiveVideoFrom'; - const params = { sdpAnswer }; + const params = {}; params[reconnect ? 'stream' : 'sender'] = this.streamId; + params[reconnect ? 'sdpString' : 'sdpAnswer'] = sdpAnswer; this.session.openvidu.sendRequest(method, params, (error, response) => { if (error) { diff --git a/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java b/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java index 8f033494..5e4d62c9 100644 --- a/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java +++ b/openvidu-client/src/main/java/io/openvidu/client/internal/ProtocolElements.java @@ -70,6 +70,9 @@ public class ProtocolElements { public static final String UNPUBLISHVIDEO_METHOD = "unpublishVideo"; + public static final String PREPARERECEIVEVIDEO_METHOD = "prepareReceiveVideFrom"; + public static final String PREPARERECEIVEVIDEO_SDPOFFER_PARAM = "sdpOffer"; + public static final String RECEIVEVIDEO_METHOD = "receiveVideoFrom"; public static final String RECEIVEVIDEO_SDPOFFER_PARAM = "sdpOffer"; public static final String RECEIVEVIDEO_SENDER_PARAM = "sender"; @@ -120,7 +123,7 @@ public class ProtocolElements { public static final String RECONNECTSTREAM_METHOD = "reconnectStream"; public static final String RECONNECTSTREAM_STREAM_PARAM = "stream"; - public static final String RECONNECTSTREAM_SDPOFFER_PARAM = "sdpOffer"; + public static final String RECONNECTSTREAM_SDPSTRING_PARAM = "sdpString"; // ---------------------------- SERVER RESPONSES & EVENTS ----------------- diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index e217cff4..34609036 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -279,14 +279,24 @@ public class SessionEventsHandler { } } - public void onSubscribe(Participant participant, Session session, String sdpAnswer, Integer transactionId, + public void onPrepareSubscription(Participant participant, Session session, String sdpOffer, Integer transactionId, + OpenViduException error) { + if (error != null) { + rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); + return; + } + JsonObject result = new JsonObject(); + result.addProperty(ProtocolElements.PREPARERECEIVEVIDEO_SDPOFFER_PARAM, sdpOffer); + rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result); + } + + public void onSubscribe(Participant participant, Session session, Integer transactionId, OpenViduException error) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); return; } JsonObject result = new JsonObject(); - result.addProperty(ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM, sdpAnswer); rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result); if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index 91dd8fd6..af55da6e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -106,7 +106,9 @@ public abstract class SessionManager { public abstract void unpublishVideo(Participant participant, Participant moderator, Integer transactionId, EndReason reason); - public abstract void subscribe(Participant participant, String senderName, String sdpOffer, Integer transactionId); + public abstract void prepareSubscription(Participant participant, String senderPublicId, Integer id); + + public abstract void subscribe(Participant participant, String senderName, String sdpAnwser, Integer transactionId); public abstract void unsubscribe(Participant participant, String senderName, Integer transactionId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index 05c795b3..45017756 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -54,7 +54,6 @@ import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; import io.openvidu.server.kurento.endpoint.MediaEndpoint; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; -import io.openvidu.server.kurento.endpoint.SdpType; import io.openvidu.server.kurento.endpoint.SubscriberEndpoint; import io.openvidu.server.recording.service.RecordingManager; @@ -169,15 +168,15 @@ public class KurentoParticipant extends Participant { return session; } - public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback, boolean silent) { - log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(), - this.session.getSessionId(), sdpType); - log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpString); + public String publishToRoom(String sdpOffer, boolean doLoopback, boolean silent) { + log.info("PARTICIPANT {}: Request to publish video in room {})", this.getParticipantPublicId(), + this.session.getSessionId()); + log.trace("PARTICIPANT {}: Publishing Sdp Offer is {}", this.getParticipantPublicId(), sdpOffer); - String sdpResponse = this.getPublisher().publish(sdpType, sdpString, doLoopback); + String sdpAnswer = this.getPublisher().publish(sdpOffer, doLoopback); this.streaming = true; - log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpResponse); + log.trace("PARTICIPANT {}: Publishing Sdp Answer is {}", this.getParticipantPublicId(), sdpAnswer); log.info("PARTICIPANT {}: Is now publishing video in room {}", this.getParticipantPublicId(), this.session.getSessionId()); @@ -191,7 +190,7 @@ public class KurentoParticipant extends Participant { publisher.getMediaOptions(), publisher.createdAt()); } - return sdpResponse; + return sdpAnswer; } public void unpublishMedia(EndReason reason, long kmsDisconnectionTime) { @@ -204,12 +203,11 @@ public class KurentoParticipant extends Participant { this.getParticipantPublicId()); } - public String receiveMediaFrom(Participant sender, String sdpOffer, boolean silent) { + public String prepareReceiveMediaFrom(Participant sender) { final String senderName = sender.getParticipantPublicId(); - log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(), + log.info("PARTICIPANT {}: Request to prepare receive media from {} in room {}", this.getParticipantPublicId(), senderName, this.session.getSessionId()); - log.trace("PARTICIPANT {}: SdpOffer for {} is {}", this.getParticipantPublicId(), senderName, sdpOffer); if (senderName.equals(this.getParticipantPublicId())) { log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId()); @@ -269,8 +267,56 @@ public class KurentoParticipant extends Participant { log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), senderName); try { - String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher()); - log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer); + String sdpOffer = subscriber.prepareSubscription(kSender.getPublisher()); + log.trace("PARTICIPANT {}: Subscribing SdpOffer is {}", this.getParticipantPublicId(), sdpOffer); + log.info("PARTICIPANT {}: offer prepared to receive media from {} in room {}", + this.getParticipantPublicId(), senderName, this.session.getSessionId()); + return sdpOffer; + } catch (KurentoServerException e) { + log.error("Exception preparing subscriber endpoint for user {}: {}", this.getParticipantPublicId(), + e.getMessage()); + this.subscribers.remove(senderName); + releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false); + return null; + } + } finally { + kSender.getPublisher().closingLock.readLock().unlock(); + } + } else { + log.error( + "PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ", + senderName, sender.getSessionId(), this.participantPublicId); + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, + "Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName + + "is closed"); + } + } + + public void receiveMediaFrom(Participant sender, String sdpAnswer, boolean silent) { + final String senderName = sender.getParticipantPublicId(); + + log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(), + senderName, this.session.getSessionId()); + log.trace("PARTICIPANT {}: SdpAnswer for {} is {}", this.getParticipantPublicId(), senderName, sdpAnswer); + + if (senderName.equals(this.getParticipantPublicId())) { + log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId()); + throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media"); + } + + KurentoParticipant kSender = (KurentoParticipant) sender; + + if (kSender.streaming && kSender.getPublisher() != null + && kSender.getPublisher().closingLock.readLock().tryLock()) { + + try { + final SubscriberEndpoint subscriber = getSubscriber(senderName); + if (subscriber.getEndpoint() == null) { + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); + } + + try { + subscriber.subscribe(sdpAnswer, kSender.getPublisher()); log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(), senderName, this.session.getSessionId()); @@ -279,8 +325,6 @@ public class KurentoParticipant extends Participant { endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(), sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt()); } - - return sdpAnswer; } catch (KurentoServerException e) { // TODO Check object status when KurentoClient sets this info in the object if (e.getCode() == 40101) { @@ -292,7 +336,6 @@ public class KurentoParticipant extends Participant { } this.subscribers.remove(senderName); releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false); - return null; } } finally { kSender.getPublisher().closingLock.readLock().unlock(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index cad0d229..681941ea 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; -import io.openvidu.java.client.*; import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.GenericMediaElement; import org.kurento.client.IceCandidate; @@ -48,6 +47,12 @@ import com.google.gson.JsonObject; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; +import io.openvidu.java.client.MediaMode; +import io.openvidu.java.client.Recording; +import io.openvidu.java.client.RecordingLayout; +import io.openvidu.java.client.RecordingMode; +import io.openvidu.java.client.RecordingProperties; +import io.openvidu.java.client.SessionProperties; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.FinalUser; import io.openvidu.server.core.IdentifierPrefixes; @@ -58,7 +63,6 @@ import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.Token; import io.openvidu.server.kurento.endpoint.KurentoFilter; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; -import io.openvidu.server.kurento.endpoint.SdpType; import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.rpc.RpcHandler; @@ -368,7 +372,6 @@ public class KurentoSessionManager extends SessionManager { kurentoOptions.isOffer, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, kurentoOptions.rtspUri, participant.getParticipantPublicId()); - SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER; KurentoSession kSession = kParticipant.getSession(); kParticipant.createPublishingEndpoint(mediaOptions, null); @@ -395,7 +398,7 @@ public class KurentoSessionManager extends SessionManager { } } - sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, false); + sdpAnswer = kParticipant.publishToRoom(kurentoOptions.sdpOffer, kurentoOptions.doLoopback, false); if (sdpAnswer == null) { OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, @@ -502,11 +505,54 @@ public class KurentoSessionManager extends SessionManager { } @Override - public void subscribe(Participant participant, String senderName, String sdpOffer, Integer transactionId) { - String sdpAnswer = null; + public void prepareSubscription(Participant participant, String senderPublicId, Integer transactionId) { + String sdpOffer = null; Session session = null; try { - log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderName, sdpOffer, + log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderPublicId, sdpOffer, + participant.getParticipantPublicId()); + + KurentoParticipant kParticipant = (KurentoParticipant) participant; + session = ((KurentoParticipant) participant).getSession(); + Participant senderParticipant = session.getParticipantByPublicId(senderPublicId); + + if (senderParticipant == null) { + log.warn( + "PARTICIPANT {}: Requesting to recv media from user {} " + + "in session {} but user could not be found", + participant.getParticipantPublicId(), senderPublicId, session.getSessionId()); + throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, + "User '" + senderPublicId + " not found in session '" + session.getSessionId() + "'"); + } + if (!senderParticipant.isStreaming()) { + log.warn( + "PARTICIPANT {}: Requesting to recv media from user {} " + + "in session {} but user is not streaming media", + participant.getParticipantPublicId(), senderPublicId, session.getSessionId()); + throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, + "User '" + senderPublicId + " not streaming media in session '" + session.getSessionId() + "'"); + } + + sdpOffer = kParticipant.prepareReceiveMediaFrom(senderParticipant); + if (sdpOffer == null) { + throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP offer when subscribing '" + + participant.getParticipantPublicId() + "' to '" + senderPublicId + "'"); + } + } catch (OpenViduException e) { + log.error("PARTICIPANT {}: Error preparing subscription to {}", participant.getParticipantPublicId(), + senderPublicId, e); + sessionEventsHandler.onPrepareSubscription(participant, session, null, transactionId, e); + } + if (sdpOffer != null) { + sessionEventsHandler.onPrepareSubscription(participant, session, sdpOffer, transactionId, null); + } + } + + @Override + public void subscribe(Participant participant, String senderName, String sdpAnswer, Integer transactionId) { + Session session = null; + try { + log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpAnswer={} ({})", senderName, sdpAnswer, participant.getParticipantPublicId()); KurentoParticipant kParticipant = (KurentoParticipant) participant; @@ -530,18 +576,11 @@ public class KurentoSessionManager extends SessionManager { "User '" + senderName + " not streaming media in session '" + session.getSessionId() + "'"); } - sdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpOffer, false); - if (sdpAnswer == null) { - throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, - "Unable to generate SDP answer when subscribing '" + participant.getParticipantPublicId() - + "' to '" + senderName + "'"); - } + kParticipant.receiveMediaFrom(senderParticipant, sdpAnswer, false); + sessionEventsHandler.onSubscribe(participant, session, transactionId, null); } catch (OpenViduException e) { log.error("PARTICIPANT {}: Error subscribing to {}", participant.getParticipantPublicId(), senderName, e); - sessionEventsHandler.onSubscribe(participant, session, null, transactionId, e); - } - if (sdpAnswer != null) { - sessionEventsHandler.onSubscribe(participant, session, sdpAnswer, transactionId, null); + sessionEventsHandler.onSubscribe(participant, session, transactionId, e); } } @@ -1046,7 +1085,7 @@ public class KurentoSessionManager extends SessionManager { } @Override - public void reconnectStream(Participant participant, String streamId, String sdpOffer, Integer transactionId) { + public void reconnectStream(Participant participant, String streamId, String sdpString, Integer transactionId) { KurentoParticipant kParticipant = (KurentoParticipant) participant; KurentoSession kSession = kParticipant.getSession(); @@ -1067,8 +1106,7 @@ public class KurentoSessionManager extends SessionManager { // 3) Create a new PublisherEndpoint connecting it to the previous PassThrough kParticipant.resetPublisherEndpoint(kurentoOptions, passThru); kParticipant.createPublishingEndpoint(kurentoOptions, streamId); - SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER; - String sdpAnswer = kParticipant.publishToRoom(sdpType, sdpOffer, kurentoOptions.doLoopback, true); + String sdpAnswer = kParticipant.publishToRoom(sdpString, kurentoOptions.doLoopback, true); sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(), kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer, @@ -1081,12 +1119,8 @@ public class KurentoSessionManager extends SessionManager { if (senderPrivateId != null) { KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId); kParticipant.cancelReceivingMedia(sender, null, true); - String sdpAnswer = kParticipant.receiveMediaFrom(sender, sdpOffer, true); - if (sdpAnswer == null) { - throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, - "Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'"); - } - sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null); + kParticipant.receiveMediaFrom(sender, sdpString, true); + sessionEventsHandler.onSubscribe(participant, kSession, transactionId, null); } else { throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'"); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java index f4159ed3..d03f42e5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java @@ -516,6 +516,24 @@ public abstract class MediaEndpoint { } } + protected String generateOffer() throws OpenViduException { + if (this.isWeb()) { + if (webEndpoint == null) { + throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, + "Can't generate offer when WebRtcEndpoint is null (ep: " + endpointName + ")"); + } + return webEndpoint.generateOffer(); + } else if (this.isPlayerEndpoint()) { + return ""; + } else { + if (endpoint == null) { + throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, + "Can't generate offer when RtpEndpoint is null (ep: " + endpointName + ")"); + } + return endpoint.generateOffer(); + } + } + /** * If supported, it registers a listener for when a new {@link IceCandidate} is * gathered by the internal endpoint ({@link WebRtcEndpoint}) and sends it to diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index e35de370..b652cad2 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -175,57 +175,42 @@ public class PublisherEndpoint extends MediaEndpoint { /** * Initializes this media endpoint for publishing media and processes the SDP - * offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it - * first registers an event listener for the ICE candidates and instructs the - * endpoint to start gathering the candidates. If required, it connects to - * itself (after applying the intermediate media elements and the - * {@link PassThrough}) to allow loopback of the media stream. + * offer. If the internal endpoint is an {@link WebRtcEndpoint}, it first + * registers an event listener for the ICE candidates and instructs the endpoint + * to start gathering the candidates. If required, it connects to itself (after + * applying the intermediate media elements and the {@link PassThrough}) to + * allow loopback of the media stream. * - * @param sdpType indicates the type of the sdpString (offer or - * answer) - * @param sdpString offer or answer from the remote peer - * @param doLoopback loopback flag - * @param loopbackAlternativeSrc alternative loopback source - * @param loopbackConnectionType how to connect the loopback source - * @return the SDP response (the answer if processing an offer SDP, otherwise is - * the updated offer generated previously by this endpoint) + * @param sdpOffer SDP offer from the remote peer + * @param doLoopback loopback flag + * @return the SDP answer */ - public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback) { + public synchronized String publish(String sdpOffer, boolean doLoopback) { + String sdpResponse = processOffer(sdpOffer); registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId()); if (doLoopback) { - connect(this.getEndpoint()); + connect(this.getEndpoint(), false); } else { - innerConnect(); + innerConnect(false); } this.createdAt = System.currentTimeMillis(); - String sdpResponse = null; - switch (sdpType) { - case ANSWER: - sdpResponse = processAnswer(sdpString); - break; - case OFFER: - sdpResponse = processOffer(sdpString); - break; - default: - throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Sdp type not supported: " + sdpType); - } gatherCandidates(); return sdpResponse; } - public synchronized void connect(MediaElement sink) { + public synchronized void connect(MediaElement sink, boolean blocking) { if (!connected) { - innerConnect(); + innerConnect(blocking); } - internalSinkConnect(passThru, sink); + internalSinkConnect(passThru, sink, blocking); this.enableIpCameraIfNecessary(); } - public synchronized void connect(MediaElement sink, MediaType type) { + public synchronized void connect(MediaElement sink, MediaType type, boolean blocking) { if (!connected) { - innerConnect(); + innerConnect(blocking); } - internalSinkConnect(passThru, sink, type); + internalSinkConnect(passThru, sink, type, blocking); this.enableIpCameraIfNecessary(); } @@ -289,11 +274,11 @@ public class PublisherEndpoint extends MediaEndpoint { } if (connected) { if (first != null) { - internalSinkConnect(first, shaper, type); + internalSinkConnect(first, shaper, type, false); } else { - internalSinkConnect(this.getEndpoint(), shaper, type); + internalSinkConnect(this.getEndpoint(), shaper, type, false); } - internalSinkConnect(shaper, passThru, type); + internalSinkConnect(shaper, passThru, type, false); } elementIds.addFirst(id); elements.put(id, shaper); @@ -343,7 +328,7 @@ public class PublisherEndpoint extends MediaEndpoint { } else { prev = passThru; } - internalSinkConnect(next, prev); + internalSinkConnect(next, prev, false); } elementIds.remove(elementId); if (releaseElement) { @@ -408,13 +393,13 @@ public class PublisherEndpoint extends MediaEndpoint { } switch (muteType) { case ALL: - internalSinkConnect(this.getEndpoint(), sink); + internalSinkConnect(this.getEndpoint(), sink, false); break; case AUDIO: - internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO); + internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO, false); break; case VIDEO: - internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO); + internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO, false); break; } } @@ -440,7 +425,7 @@ public class PublisherEndpoint extends MediaEndpoint { return elementIds.get(idx - 1); } - private void innerConnect() { + private void innerConnect(boolean blocking) { if (this.getEndpoint() == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Can't connect null endpoint (ep: " + getEndpointName() + ")"); @@ -453,28 +438,32 @@ public class PublisherEndpoint extends MediaEndpoint { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "No media element with id " + prevId + " (ep: " + getEndpointName() + ")"); } - internalSinkConnect(current, prev); + internalSinkConnect(current, prev, blocking); current = prev; prevId = getPrevious(prevId); } - internalSinkConnect(current, passThru); + internalSinkConnect(current, passThru, blocking); connected = true; } - private void internalSinkConnect(final MediaElement source, final MediaElement sink) { - source.connect(sink, new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.debug("EP {}: Elements have been connected (source {} -> sink {})", getEndpointName(), - source.getId(), sink.getId()); - } + private void internalSinkConnect(final MediaElement source, final MediaElement sink, boolean blocking) { + if (blocking) { + source.connect(sink); + } else { + source.connect(sink, new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.debug("EP {}: Elements have been connected (source {} -> sink {})", getEndpointName(), + source.getId(), sink.getId()); + } - @Override - public void onError(Throwable cause) throws Exception { - log.warn("EP {}: Failed to connect media elements (source {} -> sink {})", getEndpointName(), - source.getId(), sink.getId(), cause); - } - }); + @Override + public void onError(Throwable cause) throws Exception { + log.warn("EP {}: Failed to connect media elements (source {} -> sink {})", getEndpointName(), + source.getId(), sink.getId(), cause); + } + }); + } } /** @@ -488,23 +477,28 @@ public class PublisherEndpoint extends MediaEndpoint { * be used instead * @see #internalSinkConnect(MediaElement, MediaElement) */ - private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type) { + private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type, + boolean blocking) { if (type == null) { - internalSinkConnect(source, sink); + internalSinkConnect(source, sink, blocking); } else { - source.connect(sink, type, new Continuation() { - @Override - public void onSuccess(Void result) throws Exception { - log.debug("EP {}: {} media elements have been connected (source {} -> sink {})", getEndpointName(), - type, source.getId(), sink.getId()); - } + if (blocking) { + source.connect(sink, type); + } else { + source.connect(sink, type, new Continuation() { + @Override + public void onSuccess(Void result) throws Exception { + log.debug("EP {}: {} media elements have been connected (source {} -> sink {})", + getEndpointName(), type, source.getId(), sink.getId()); + } - @Override - public void onError(Throwable cause) throws Exception { - log.warn("EP {}: Failed to connect {} media elements (source {} -> sink {})", getEndpointName(), - type, source.getId(), sink.getId(), cause); - } - }); + @Override + public void onError(Throwable cause) throws Exception { + log.warn("EP {}: Failed to connect {} media elements (source {} -> sink {})", getEndpointName(), + type, source.getId(), sink.getId(), cause); + } + }); + } } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java index db6309e4..ba6eeaca 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java @@ -18,7 +18,6 @@ package io.openvidu.server.kurento.endpoint; import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicBoolean; import org.kurento.client.MediaPipeline; import org.slf4j.Logger; @@ -38,8 +37,6 @@ import io.openvidu.server.kurento.core.KurentoParticipant; public class SubscriberEndpoint extends MediaEndpoint { private final static Logger log = LoggerFactory.getLogger(SubscriberEndpoint.class); - private AtomicBoolean connectedToPublisher = new AtomicBoolean(false); - private String publisherStreamId; public SubscriberEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName, @@ -47,23 +44,18 @@ public class SubscriberEndpoint extends MediaEndpoint { super(endpointType, owner, endpointName, pipeline, openviduConfig, log); } - public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) { + public synchronized String prepareSubscription(PublisherEndpoint publisher) { registerOnIceCandidateEventListener(publisher.getOwner().getParticipantPublicId()); + publisher.connect(this.getEndpoint(), true); this.createdAt = System.currentTimeMillis(); - String sdpAnswer = processOffer(sdpOffer); - gatherCandidates(); - publisher.connect(this.getEndpoint()); - setConnectedToPublisher(true); this.publisherStreamId = publisher.getStreamId(); - return sdpAnswer; + String sdpOffer = generateOffer(); + gatherCandidates(); + return sdpOffer; } - public boolean isConnectedToPublisher() { - return connectedToPublisher.get(); - } - - public void setConnectedToPublisher(boolean connectedToPublisher) { - this.connectedToPublisher.set(connectedToPublisher); + public synchronized void subscribe(String sdpAnswer, PublisherEndpoint publisher) { + processAnswer(sdpAnswer); } @Override diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java index 7b0b550a..d8946d86 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java @@ -111,7 +111,7 @@ public class CompositeWrapper { public void connectPublisherEndpoint(PublisherEndpoint endpoint) throws OpenViduException { HubPort hubPort = new HubPort.Builder(composite).build(); - endpoint.connect(hubPort); + endpoint.connect(hubPort, false); String streamId = endpoint.getOwner().getPublisherStreamId(); this.hubPorts.put(streamId, hubPort); this.publisherEndpoints.put(streamId, endpoint); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 0e246e34..1691d833 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -391,14 +391,14 @@ public class SingleStreamRecordingService extends RecordingService { MediaProfileSpecType profile) { switch (profile) { case WEBM: - publisherEndpoint.connect(recorder, MediaType.AUDIO); - publisherEndpoint.connect(recorder, MediaType.VIDEO); + publisherEndpoint.connect(recorder, MediaType.AUDIO, false); + publisherEndpoint.connect(recorder, MediaType.VIDEO, false); break; case WEBM_AUDIO_ONLY: - publisherEndpoint.connect(recorder, MediaType.AUDIO); + publisherEndpoint.connect(recorder, MediaType.AUDIO, false); break; case WEBM_VIDEO_ONLY: - publisherEndpoint.connect(recorder, MediaType.VIDEO); + publisherEndpoint.connect(recorder, MediaType.VIDEO, false); break; default: throw new UnsupportedOperationException("Unsupported profile when single stream recording: " + profile); diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index bece2d68..4e742517 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -126,6 +126,9 @@ public class RpcHandler extends DefaultJsonRpcHandler { case ProtocolElements.ONICECANDIDATE_METHOD: onIceCandidate(rpcConnection, request); break; + case ProtocolElements.PREPARERECEIVEVIDEO_METHOD: + prepareReceiveVideoFrom(rpcConnection, request); + break; case ProtocolElements.RECEIVEVIDEO_METHOD: receiveVideoFrom(rpcConnection, request); break; @@ -333,6 +336,20 @@ public class RpcHandler extends DefaultJsonRpcHandler { } } + private void prepareReceiveVideoFrom(RpcConnection rpcConnection, Request request) { + Participant participant; + try { + participant = sanityCheckOfSession(rpcConnection, "subscribe"); + } catch (OpenViduException e) { + return; + } + + String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM); + String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId); + + sessionManager.prepareSubscription(participant, senderPublicId, request.getId()); + } + private void receiveVideoFrom(RpcConnection rpcConnection, Request request) { Participant participant; try { @@ -341,23 +358,12 @@ public class RpcHandler extends DefaultJsonRpcHandler { return; } - String senderPublicId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM); + String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM); + String sdpAnswer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM); - // Parse sender public id from stream id - if (senderPublicId.startsWith(IdentifierPrefixes.STREAM_ID + "IPC_") - && senderPublicId.contains(IdentifierPrefixes.IPCAM_ID)) { - // If IPCAM - senderPublicId = senderPublicId.substring(senderPublicId.indexOf("_" + IdentifierPrefixes.IPCAM_ID) + 1, - senderPublicId.length()); - } else { - // Not IPCAM - senderPublicId = senderPublicId.substring( - senderPublicId.lastIndexOf(IdentifierPrefixes.PARTICIPANT_PUBLIC_ID), senderPublicId.length()); - } + String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId); - String sdpOffer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM); - - sessionManager.subscribe(participant, senderPublicId, sdpOffer, request.getId()); + sessionManager.subscribe(participant, senderPublicId, sdpAnswer, request.getId()); } private void unsubscribeFromVideo(RpcConnection rpcConnection, Request request) { @@ -622,9 +628,9 @@ public class RpcHandler extends DefaultJsonRpcHandler { return; } String streamId = getStringParam(request, ProtocolElements.RECONNECTSTREAM_STREAM_PARAM); - String sdpOffer = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM); + String sdpString = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM); try { - sessionManager.reconnectStream(participant, streamId, sdpOffer, request.getId()); + sessionManager.reconnectStream(participant, streamId, sdpString, request.getId()); } catch (OpenViduException e) { this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), new JsonObject(), e); @@ -800,4 +806,20 @@ public class RpcHandler extends DefaultJsonRpcHandler { .equals(this.sessionManager.getParticipantPrivateIdFromStreamId(sessionId, streamId)); } + private String parseSenderPublicIdFromStreamId(String streamId) { + String senderPublicId; + // Parse sender public id from stream id + if (streamId.startsWith(IdentifierPrefixes.STREAM_ID + "IPC_") + && streamId.contains(IdentifierPrefixes.IPCAM_ID)) { + // If IPCAM + senderPublicId = streamId.substring(streamId.indexOf("_" + IdentifierPrefixes.IPCAM_ID) + 1, + streamId.length()); + } else { + // Not IPCAM + senderPublicId = streamId.substring(streamId.lastIndexOf(IdentifierPrefixes.PARTICIPANT_PUBLIC_ID), + streamId.length()); + } + return senderPublicId; + } + }