From 21615755f991bc72cdace7a651778c75cb982e13 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Mon, 31 May 2021 15:20:54 +0200 Subject: [PATCH] Support subscription with different client-server negotiation order --- openvidu-browser/src/OpenVidu/Session.ts | 6 +- openvidu-browser/src/OpenVidu/Stream.ts | 125 +++++-- .../src/OpenViduInternal/Events/Event.ts | 4 +- .../openvidu/server/core/SessionManager.java | 19 +- .../kurento/core/KurentoParticipant.java | 328 +++++++----------- .../kurento/core/KurentoSessionManager.java | 265 ++++++-------- .../kurento/endpoint/SubscriberEndpoint.java | 18 +- .../io/openvidu/server/rpc/RpcHandler.java | 74 +--- 8 files changed, 352 insertions(+), 487 deletions(-) diff --git a/openvidu-browser/src/OpenVidu/Session.ts b/openvidu-browser/src/OpenVidu/Session.ts index bcafc362..6a0d2a7f 100644 --- a/openvidu-browser/src/OpenVidu/Session.ts +++ b/openvidu-browser/src/OpenVidu/Session.ts @@ -197,7 +197,7 @@ export class Session extends EventDispatcher { * #### Events dispatched * * The [[Session]] object of the local participant will dispatch a `sessionDisconnected` event. - * This event will automatically unsubscribe the leaving participant from every Subscriber object of the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) + * This event will automatically unsubscribe the leaving participant from every Subscriber object of the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks) * and also deletes any HTML video element associated to each Subscriber (only those [created by OpenVidu Browser](/en/stable/cheatsheet/manage-videos/#let-openvidu-take-care-of-the-video-players)). * For every video removed, each Subscriber object will dispatch a `videoElementDestroyed` event. * Call `event.preventDefault()` upon event `sessionDisconnected` to avoid this behavior and take care of disposing and cleaning all the Subscriber objects yourself. @@ -210,7 +210,7 @@ export class Session extends EventDispatcher { * or/and `Session.disconnect()` in the previous session). See [[StreamEvent]] and [[VideoElementEvent]] to learn more. * * The [[Session]] object of every other participant connected to the session will dispatch a `streamDestroyed` event if the disconnected participant was publishing. - * This event will automatically unsubscribe the Subscriber object from the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) + * This event will automatically unsubscribe the Subscriber object from the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks) * and also deletes any HTML video element associated to that Subscriber (only those [created by OpenVidu Browser](/en/stable/cheatsheet/manage-videos/#let-openvidu-take-care-of-the-video-players)). * For every video removed, the Subscriber object will dispatch a `videoElementDestroyed` event. * Call `event.preventDefault()` upon event `streamDestroyed` to avoid this default behavior and take care of disposing and cleaning the Subscriber object yourself. @@ -437,7 +437,7 @@ export class Session extends EventDispatcher { * Call `event.preventDefault()` upon event `streamDestroyed` if you want to clean the Publisher object on your own or re-publish it in a different Session. * * The [[Session]] object of every other participant connected to the session will dispatch a `streamDestroyed` event. - * This event will automatically unsubscribe the Subscriber object from the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) and + * This event will automatically unsubscribe the Subscriber object from the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks) and * delete any HTML video element associated to it (only those [created by OpenVidu Browser](/en/stable/cheatsheet/manage-videos/#let-openvidu-take-care-of-the-video-players)). * For every video removed, the Subscriber object will dispatch a `videoElementDestroyed` event. * Call `event.preventDefault()` upon event `streamDestroyed` to avoid this default behavior and take care of disposing and cleaning the Subscriber object on your own. diff --git a/openvidu-browser/src/OpenVidu/Stream.ts b/openvidu-browser/src/OpenVidu/Stream.ts index 2e4cfb80..cece1ecf 100644 --- a/openvidu-browser/src/OpenVidu/Stream.ts +++ b/openvidu-browser/src/OpenVidu/Stream.ts @@ -281,6 +281,19 @@ export class Stream { } + /** + * Recreates the media connection with the server. This entails the disposal of the previous RTCPeerConnection and the re-negotiation + * of a new one, that will apply the same properties. + * + * This method can be useful in those situations were there the media connection breaks and OpenVidu is not able to recover on its own + * for any kind of unanticipated reason (see [Automatic reconnection](/en/latest/advanced-features/automatic-reconnection/)). + * + * @returns A Promise (to which you can optionally subscribe to) that is resolved if the reconnection operation was successful and rejected with an Error object if not + */ + public reconnect(): Promise { + return this.reconnectStream('API'); + } + /** * Applies an audio/video filter to the stream. * @@ -465,11 +478,12 @@ export class Stream { * @hidden */ disposeWebRtcPeer(): void { + const webrtcId: string = this.webRtcPeer.id; if (!!this.webRtcPeer) { this.webRtcPeer.dispose(); this.stopWebRtcStats(); } - logger.info((!!this.outboundStreamOpts ? 'Outbound ' : 'Inbound ') + "WebRTCPeer from 'Stream' with id [" + this.streamId + '] is now closed'); + logger.info((!!this.outboundStreamOpts ? 'Outbound ' : 'Inbound ') + "RTCPeerConnection with id [" + webrtcId + "] from 'Stream' with id [" + this.streamId + '] is now closed'); } /** @@ -967,6 +981,9 @@ export class Stream { } const finalResolve = () => { + logger.info("'Subscriber' (" + this.streamId + ") successfully " + (reconnect ? "reconnected" : "subscribed")); + this.remotePeerSuccessfullyEstablished(reconnect); + this.initWebRtcStats(); if (reconnect) { this.reconnectionEventEmitter?.emitEvent('success'); delete this.reconnectionEventEmitter; @@ -982,20 +999,50 @@ export class Stream { reject(error); } + if (this.session.openvidu.mediaServer === 'mediasoup') { + + // Server initiates negotiation + + this.initWebRtcPeerReceiveFromServer(reconnect) + .then(() => finalResolve()) + .catch(error => finalReject(error)); + + } else { + + // Client initiates negotiation + + this.initWebRtcPeerReceiveFromClient(reconnect) + .then(() => finalResolve()) + .catch(error => finalReject(error)); + + } + }); + } + + /** + * @hidden + */ + initWebRtcPeerReceiveFromClient(reconnect: boolean): Promise { + return new Promise((resolve, reject) => { + this.completeWebRtcPeerReceive(reconnect).then(response => { + this.webRtcPeer.processRemoteAnswer(response.sdpAnswer) + .then(() => resolve()).catch(error => reject(error)); + }).catch(error => reject(error)); + }); + } + + /** + * @hidden + */ + initWebRtcPeerReceiveFromServer(reconnect: boolean): Promise { + return new Promise((resolve, reject) => { + // Server initiates negotiation this.session.openvidu.sendRequest('prepareReceiveVideoFrom', { sender: this.streamId, reconnect }, (error, response) => { if (error) { - finalReject(new Error('Error on prepareReceiveVideoFrom: ' + JSON.stringify(error))); + reject(new Error('Error on prepareReceiveVideoFrom: ' + JSON.stringify(error))); } else { - this.completeWebRtcPeerReceive(response.sdpOffer, reconnect) - .then(() => { - logger.info("'Subscriber' (" + this.streamId + ") successfully " + (reconnect ? "reconnected" : "subscribed")); - this.remotePeerSuccessfullyEstablished(reconnect); - this.initWebRtcStats(); - finalResolve(); - }) - .catch(error => { - finalReject(error); - }); + this.completeWebRtcPeerReceive(reconnect, response.sdpOffer) + .then(() => resolve()).catch(error => reject(error)); } }); }); @@ -1004,7 +1051,7 @@ export class Stream { /** * @hidden */ - completeWebRtcPeerReceive(sdpOffer: string, reconnect: boolean): Promise { + completeWebRtcPeerReceive(reconnect: boolean, sdpOfferByServer?: string): Promise { return new Promise((resolve, reject) => { const offerConstraints = { @@ -1021,20 +1068,24 @@ export class Stream { simulcast: false }; - const successAnswerCallback = (sdpAnswer) => { - logger.debug('Sending SDP answer to subscribe to ' - + this.streamId, sdpAnswer); + const sendSdpToServer = (sdpString: string) => { + + logger.debug(`Sending local SDP ${(!!sdpOfferByServer ? 'answer' : 'offer')} to subscribe to ${this.streamId}`, sdpString); const method = reconnect ? 'reconnectStream' : 'receiveVideoFrom'; const params = {}; params[reconnect ? 'stream' : 'sender'] = this.streamId; - params[reconnect ? 'sdpString' : 'sdpAnswer'] = sdpAnswer; + if (!!sdpOfferByServer) { + params[reconnect ? 'sdpString' : 'sdpAnswer'] = sdpString; + } else { + params['sdpOffer'] = sdpString; + } this.session.openvidu.sendRequest(method, params, (error, response) => { if (error) { reject(new Error('Error on ' + method + ' : ' + JSON.stringify(error))); } else { - resolve(); + resolve(response); } }); }; @@ -1044,22 +1095,36 @@ export class Stream { } this.webRtcPeer = new WebRtcPeerRecvonly(options); this.webRtcPeer.addIceConnectionStateChangeListener(this.streamId); - this.webRtcPeer.processRemoteOffer(sdpOffer) - .then(() => { + + if (!!sdpOfferByServer) { + + this.webRtcPeer.processRemoteOffer(sdpOfferByServer).then(() => { this.webRtcPeer.createAnswer().then(sdpAnswer => { - this.webRtcPeer.processLocalAnswer(sdpAnswer) - .then(() => { - successAnswerCallback(sdpAnswer.sdp); - }).catch(error => { - reject(new Error('(subscribe) SDP process local answer error: ' + JSON.stringify(error))); - }); + this.webRtcPeer.processLocalAnswer(sdpAnswer).then(() => { + sendSdpToServer(sdpAnswer.sdp!); + }).catch(error => { + reject(new Error('(subscribe) SDP process local answer error: ' + JSON.stringify(error))); + }); }).catch(error => { reject(new Error('(subscribe) SDP create answer error: ' + JSON.stringify(error))); }); - }) - .catch(error => { + }).catch(error => { reject(new Error('(subscribe) SDP process remote offer error: ' + JSON.stringify(error))); }); + + } else { + + this.webRtcPeer.createOffer().then(sdpOffer => { + this.webRtcPeer.processLocalOffer(sdpOffer).then(() => { + sendSdpToServer(sdpOffer.sdp!); + }).catch(error => { + reject(new Error('(subscribe) SDP process local offer error: ' + JSON.stringify(error))); + }); + }).catch(error => { + reject(new Error('(subscribe) SDP create offer error: ' + JSON.stringify(error))); + }); + + } }); } @@ -1198,7 +1263,7 @@ export class Stream { } } - public async reconnectStream(event: string): Promise { + private async reconnectStream(event: string) { const isWsConnected = await this.isWebsocketConnected(event, 3000); if (isWsConnected) { // There is connection to openvidu-server. The RTCPeerConnection is the only one broken @@ -1241,7 +1306,7 @@ export class Stream { }); } - async awaitWebRtcPeerConnectionState(timeout: number): Promise { + private async awaitWebRtcPeerConnectionState(timeout: number): Promise { let state = this.getRTCPeerConnection().iceConnectionState; const interval = 150; const intervals = Math.ceil(timeout / interval); diff --git a/openvidu-browser/src/OpenViduInternal/Events/Event.ts b/openvidu-browser/src/OpenViduInternal/Events/Event.ts index 5d87dfd6..61c743a3 100644 --- a/openvidu-browser/src/OpenViduInternal/Events/Event.ts +++ b/openvidu-browser/src/OpenViduInternal/Events/Event.ts @@ -60,14 +60,14 @@ export abstract class Event { /** * Prevents the default behavior of the event. The following events have a default behavior: * - * - `sessionDisconnected`: dispatched by [[Session]] object, automatically unsubscribes the leaving participant from every Subscriber object of the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) + * - `sessionDisconnected`: dispatched by [[Session]] object, automatically unsubscribes the leaving participant from every Subscriber object of the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks) * and also deletes any HTML video element associated to each Subscriber (only those created by OpenVidu Browser, either by passing a valid parameter as `targetElement` in method [[Session.subscribe]] or * by calling [[Subscriber.createVideoElement]]). For every video removed, each Subscriber object will also dispatch a `videoElementDestroyed` event. * * - `streamDestroyed`: * - If dispatched by a [[Publisher]] (*you* have unpublished): automatically stops all media tracks and deletes any HTML video element associated to it (only those created by OpenVidu Browser, either by passing a valid parameter as `targetElement` * in method [[OpenVidu.initPublisher]] or by calling [[Publisher.createVideoElement]]). For every video removed, the Publisher object will also dispatch a `videoElementDestroyed` event. - * - If dispatched by [[Session]] (*other user* has unpublished): automatically unsubscribes the proper Subscriber object from the session (this includes closing the WebRTCPeer connection and disposing all MediaStreamTracks) + * - If dispatched by [[Session]] (*other user* has unpublished): automatically unsubscribes the proper Subscriber object from the session (this includes closing the RTCPeerConnection and disposing all MediaStreamTracks) * and also deletes any HTML video element associated to that Subscriber (only those created by OpenVidu Browser, either by passing a valid parameter as `targetElement` in method [[Session.subscribe]] or * by calling [[Subscriber.createVideoElement]]). For every video removed, the Subscriber object will also dispatch a `videoElementDestroyed` event. */ diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index 27d692d0..c986678f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -116,15 +116,8 @@ public abstract class SessionManager { public abstract void prepareSubscription(Participant participant, String senderPublicId, boolean reconnect, Integer id); - // TODO: REMOVE ON 2.18.0 - public abstract void subscribe(Participant participant, String senderName, String sdpAnwser, Integer transactionId, - boolean is2180); - // END TODO - - // TODO: UNCOMMENT ON 2.18.0 - // public abstract void subscribe(Participant participant, String senderName, - // String sdpAnwser, Integer transactionId); - // END TODO + public abstract void subscribe(Participant participant, String senderName, String sdpString, Integer transactionId, + boolean initByServer); public abstract void unsubscribe(Participant participant, String senderName, Integer transactionId); @@ -180,13 +173,11 @@ public abstract class SessionManager { public abstract Participant publishIpcam(Session session, MediaOptions mediaOptions, ConnectionProperties connectionProperties) throws Exception; - public abstract void reconnectStream(Participant participant, String streamId, String sdpOffer, + public abstract void reconnectPublisher(Participant participant, String streamId, String sdpOffer, Integer transactionId); - // TODO: REMOVE ON 2.18.0 - public abstract void reconnectStream2170(Participant participant, String streamId, String sdpOffer, - Integer transactionId); - // END TODO + public abstract void reconnectSubscriber(Participant participant, String streamId, String sdpString, + Integer transactionId, boolean initByServer); public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) throws OpenViduException; diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index 6f68b914..79ed1b39 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.Lock; import java.util.function.Function; import org.apache.commons.lang3.RandomStringUtils; @@ -220,240 +220,100 @@ public class KurentoParticipant extends Participant { } KurentoParticipant kSender = (KurentoParticipant) sender; + if (kSender.streaming && kSender.getPublisher() != null) { - if (kSender.streaming && kSender.getPublisher() != null - && kSender.getPublisher().closingLock.readLock().tryLock()) { - - try { - log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(), - senderName); - - SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName); - + final Lock closingReadLock = kSender.getPublisher().closingLock.readLock(); + if (closingReadLock.tryLock()) { try { - CountDownLatch subscriberLatch = new CountDownLatch(1); - Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); + + SubscriberEndpoint subscriber = initializeSubscriberEndpoint(kSender); try { - if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Timeout reached when creating subscriber endpoint"); - } - } catch (InterruptedException e) { - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Interrupted when creating subscriber endpoint: " + e.getMessage()); - } - if (oldMediaEndpoint != null) { - log.warn( - "PARTICIPANT {}: Two threads are trying to create at " - + "the same time a subscriber endpoint for user {}", - this.getParticipantPublicId(), senderName); + String sdpOffer = subscriber.prepareSubscription(kSender.getPublisher()); + log.trace("PARTICIPANT {}: Subscribing SdpOffer is {}", this.getParticipantPublicId(), + sdpOffer); + log.info("PARTICIPANT {}: offer prepared to receive media from {} in room {}", + this.getParticipantPublicId(), senderName, this.session.getSessionId()); + return sdpOffer; + } catch (KurentoServerException e) { + log.error("Exception preparing subscriber endpoint for user {}: {}", + this.getParticipantPublicId(), e.getMessage()); + this.subscribers.remove(senderName); + releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false); return null; } - if (subscriber.getEndpoint() == null) { - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Unable to create subscriber endpoint"); - } - - String subscriberEndpointName = calculateSubscriberEndpointName(kSender); - - subscriber.setEndpointName(subscriberEndpointName); - subscriber.getEndpoint().setName(subscriberEndpointName); - subscriber.setStreamId(kSender.getPublisherStreamId()); - - endpointConfig.addEndpointListeners(subscriber, "subscriber"); - - } catch (OpenViduException e) { - this.subscribers.remove(senderName); - throw e; + } finally { + closingReadLock.unlock(); } - - log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), - senderName); - try { - String sdpOffer = subscriber.prepareSubscription(kSender.getPublisher()); - log.trace("PARTICIPANT {}: Subscribing SdpOffer is {}", this.getParticipantPublicId(), sdpOffer); - log.info("PARTICIPANT {}: offer prepared to receive media from {} in room {}", - this.getParticipantPublicId(), senderName, this.session.getSessionId()); - return sdpOffer; - } catch (KurentoServerException e) { - log.error("Exception preparing subscriber endpoint for user {}: {}", this.getParticipantPublicId(), - e.getMessage()); - this.subscribers.remove(senderName); - releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false); - return null; - } - } finally { - kSender.getPublisher().closingLock.readLock().unlock(); } - } else { - log.error( - "PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ", - senderName, sender.getSessionId(), this.participantPublicId); - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName - + "is closed"); } + log.error( + "PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ", + senderName, sender.getSessionId(), this.participantPublicId); + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, + "Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName + "is closed"); } - public void receiveMediaFrom2180(Participant sender, String sdpAnswer, boolean silent) { + public String receiveMedia(Participant sender, String sdpString, boolean silent, boolean initByServer) { final String senderName = sender.getParticipantPublicId(); - log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(), senderName, this.session.getSessionId()); - log.trace("PARTICIPANT {}: SdpAnswer for {} is {}", this.getParticipantPublicId(), senderName, sdpAnswer); + log.trace("PARTICIPANT {}: Sdp string for {} is {}", this.getParticipantPublicId(), senderName, sdpString); if (senderName.equals(this.getParticipantPublicId())) { log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media"); } - KurentoParticipant kSender = (KurentoParticipant) sender; + if (kSender.streaming && kSender.getPublisher() != null) { - if (kSender.streaming && kSender.getPublisher() != null - && kSender.getPublisher().closingLock.readLock().tryLock()) { - - try { - final SubscriberEndpoint subscriber = getSubscriber(senderName); - if (subscriber.getEndpoint() == null) { - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); - } - + final Lock closingReadLock = kSender.getPublisher().closingLock.readLock(); + if (closingReadLock.tryLock()) { try { - subscriber.subscribe(sdpAnswer, kSender.getPublisher()); - log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(), - senderName, this.session.getSessionId()); - if (!silent - && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { - endpointConfig.getCdr().recordNewSubscriber(this, sender.getPublisherStreamId(), - sender.getParticipantPublicId(), subscriber.createdAt()); - } - } catch (KurentoServerException e) { - // TODO Check object status when KurentoClient sets this info in the object - if (e.getCode() == 40101) { - log.warn( - "Publisher endpoint was already released when trying to connect a subscriber endpoint to it", - e); - } else { - log.error("Exception connecting subscriber endpoint to publisher endpoint", e); - } - this.subscribers.remove(senderName); - releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false); - } - } finally { - kSender.getPublisher().closingLock.readLock().unlock(); - } - } else { - log.error( - "PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ", - senderName, sender.getSessionId(), this.participantPublicId); - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName - + "is closed"); - } - } + // If initialized by server SubscriberEndpoint was created on + // prepareReceiveMediaFrom. If initialized by client must be created now + final SubscriberEndpoint subscriber = initByServer ? getSubscriber(senderName) + : initializeSubscriberEndpoint(kSender); - public String receiveMediaFrom2170(Participant sender, String sdpOffer, boolean silent) { - final String senderName = sender.getParticipantPublicId(); - - log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(), - senderName, this.session.getSessionId()); - log.trace("PARTICIPANT {}: SdpOffer for {} is {}", this.getParticipantPublicId(), senderName, sdpOffer); - - if (senderName.equals(this.getParticipantPublicId())) { - log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId()); - throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media"); - } - - KurentoParticipant kSender = (KurentoParticipant) sender; - - if (kSender.streaming && kSender.getPublisher() != null - && kSender.getPublisher().closingLock.readLock().tryLock()) { - - try { - log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(), - senderName); - - SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName); - - try { - CountDownLatch subscriberLatch = new CountDownLatch(1); - Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); - - try { - if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Timeout reached when creating subscriber endpoint"); - } - } catch (InterruptedException e) { - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Interrupted when creating subscriber endpoint: " + e.getMessage()); - } - if (oldMediaEndpoint != null) { - log.warn( - "PARTICIPANT {}: Two threads are trying to create at " - + "the same time a subscriber endpoint for user {}", - this.getParticipantPublicId(), senderName); - return null; - } if (subscriber.getEndpoint() == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); } - - String subscriberEndpointName = calculateSubscriberEndpointName(kSender); - - subscriber.setEndpointName(subscriberEndpointName); - subscriber.getEndpoint().setName(subscriberEndpointName); - subscriber.setStreamId(kSender.getPublisherStreamId()); - - endpointConfig.addEndpointListeners(subscriber, "subscriber"); - - } catch (OpenViduException e) { - this.subscribers.remove(senderName); - throw e; + try { + String sdpAnswer = subscriber.subscribe(sdpString, kSender.getPublisher()); + log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", + this.getParticipantPublicId(), senderName, this.session.getSessionId()); + if (!silent && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID + .equals(this.getParticipantPublicId())) { + endpointConfig.getCdr().recordNewSubscriber(this, sender.getPublisherStreamId(), + sender.getParticipantPublicId(), subscriber.createdAt()); + } + return sdpAnswer; + } catch (KurentoServerException e) { + // TODO Check object status when KurentoClient sets this info in the object + if (e.getCode() == 40101) { + log.warn( + "Publisher endpoint was already released when trying to connect a subscriber endpoint to it", + e); + } else { + log.error("Exception connecting subscriber endpoint to publisher endpoint", e); + } + this.subscribers.remove(senderName); + releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false); + return null; + } + } finally { + closingReadLock.unlock(); } - log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), - senderName); - try { - String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher()); - log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer); - log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(), - senderName, this.session.getSessionId()); - - if (!silent - && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { - endpointConfig.getCdr().recordNewSubscriber(this, sender.getPublisherStreamId(), - sender.getParticipantPublicId(), subscriber.createdAt()); - } - - return sdpAnswer; - } catch (KurentoServerException e) { - // TODO Check object status when KurentoClient sets this info in the object - if (e.getCode() == 40101) { - log.warn( - "Publisher endpoint was already released when trying to connect a subscriber endpoint to it", - e); - } else { - log.error("Exception connecting subscriber endpoint to publisher endpoint", e); - } - this.subscribers.remove(senderName); - releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false); - return null; - } - } finally { - kSender.getPublisher().closingLock.readLock().unlock(); } - } else { - log.error( - "PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ", - senderName, sender.getSessionId(), this.participantPublicId); - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName - + "is closed"); } + log.error( + "PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ", + senderName, sender.getSessionId(), this.participantPublicId); + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, + "Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName + "is closed"); } public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) { @@ -461,7 +321,8 @@ public class KurentoParticipant extends Participant { final PublisherEndpoint pub = senderKurentoParticipant.publisher; if (pub != null) { try { - if (pub.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + final Lock closingWriteLock = pub.closingLock.writeLock(); + if (closingWriteLock.tryLock(15, TimeUnit.SECONDS)) { try { log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(), senderName); @@ -478,7 +339,7 @@ public class KurentoParticipant extends Participant { this.getParticipantPublicId(), senderName, this.session.getSessionId()); } } finally { - pub.closingLock.writeLock().unlock(); + closingWriteLock.unlock(); } } else { log.error( @@ -585,15 +446,66 @@ public class KurentoParticipant extends Participant { return this.getParticipantPublicId() + "_" + senderParticipant.getPublisherStreamId(); } + private SubscriberEndpoint initializeSubscriberEndpoint(Participant kSender) { + + String senderName = kSender.getParticipantPublicId(); + + log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(), + senderName); + + SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName); + + try { + CountDownLatch subscriberLatch = new CountDownLatch(1); + Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); + + try { + if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, + "Timeout reached when creating subscriber endpoint"); + } + } catch (InterruptedException e) { + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, + "Interrupted when creating subscriber endpoint: " + e.getMessage()); + } + if (oldMediaEndpoint != null) { + log.warn( + "PARTICIPANT {}: Two threads are trying to create at " + + "the same time a subscriber endpoint for user {}", + this.getParticipantPublicId(), senderName); + return null; + } + if (subscriber.getEndpoint() == null) { + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); + } + + String subscriberEndpointName = calculateSubscriberEndpointName(kSender); + + subscriber.setEndpointName(subscriberEndpointName); + subscriber.getEndpoint().setName(subscriberEndpointName); + subscriber.setStreamId(kSender.getPublisherStreamId()); + + endpointConfig.addEndpointListeners(subscriber, "subscriber"); + + } catch (OpenViduException e) { + this.subscribers.remove(senderName); + throw e; + } + + log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), senderName); + + return subscriber; + } + private void releasePublisherEndpoint(EndReason reason, Long kmsDisconnectionTime) { if (publisher != null && publisher.getEndpoint() != null) { - final ReadWriteLock closingLock = publisher.closingLock; try { - if (closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + final Lock closingWriteLock = publisher.closingLock.writeLock(); + if (closingWriteLock.tryLock(15, TimeUnit.SECONDS)) { try { this.releasePublisherEndpointAux(reason, kmsDisconnectionTime); } finally { - closingLock.writeLock().unlock(); + closingWriteLock.unlock(); } } } catch (InterruptedException e) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 87cf7182..714b46bf 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -596,12 +596,12 @@ public class KurentoSessionManager extends SessionManager { } @Override - public void subscribe(Participant participant, String senderName, String sdpAnswer, Integer transactionId, - boolean is2180) { + public void subscribe(Participant participant, String senderName, String sdpString, Integer transactionId, + boolean initByServer) { Session session = null; try { - log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpAnswer={} ({})", senderName, sdpAnswer, + log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpString={} ({})", senderName, sdpString, participant.getParticipantPublicId()); KurentoParticipant kParticipant = (KurentoParticipant) participant; @@ -627,48 +627,44 @@ public class KurentoSessionManager extends SessionManager { String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(senderParticipant); - // TODO: REMOVE ON 2.18.0 - if (is2180) { + if (initByServer) { + + // Server initiated negotiation. sdpString is the SDP Answer of the client - // Client's SDPAnswer to the server's SDPOffer CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); + WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpString)); - kParticipant.receiveMediaFrom2180(senderParticipant, sdpAnswer, false); + kParticipant.receiveMedia(senderParticipant, sdpString, false, true); sessionEventsHandler.onSubscribe(participant, session, transactionId, null); + } else { + // Client initiated negotiation. sdpString is the SDP Offer of the client + boolean isTranscodingAllowed = session.getSessionProperties().isTranscodingAllowed(); VideoCodec forcedVideoCodec = session.getSessionProperties().forcedVideoCodec(); + String sdpOffer = sdpString; // Modify sdp if forced codec is defined if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { - sdpAnswer = sdpMunging.forceCodec(sdpAnswer, participant, false, false, isTranscodingAllowed, + sdpOffer = sdpMunging.forceCodec(sdpString, participant, false, false, isTranscodingAllowed, forcedVideoCodec, false); CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOfferMunged, sdpAnswer)); + WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpOfferMunged, sdpOffer)); } - String finalSdpAnswer = kParticipant.receiveMediaFrom2170(senderParticipant, sdpAnswer, false); - if (finalSdpAnswer == null) { + String sdpAnswer = kParticipant.receiveMedia(senderParticipant, sdpOffer, false, false); + if (sdpAnswer == null) { throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP answer when subscribing '" + participant.getParticipantPublicId() + "' to '" + senderName + "'"); } CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, - WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, finalSdpAnswer)); - sessionEventsHandler.onSubscribe(participant, session, finalSdpAnswer, transactionId, null); + WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); + sessionEventsHandler.onSubscribe(participant, session, sdpAnswer, transactionId, null); } - // END TODO - - // TODO: UNCOMMENT ON 2.18.0 -// CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client, -// WebrtcDebugEventOperation.subscribe, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); -// String remoteSdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpAnswer, false); -// sessionEventsHandler.onSubscribe(participant, session, transactionId, null); - // END TODO } catch (OpenViduException e) { log.error("PARTICIPANT {}: Error subscribing to {}", participant.getParticipantPublicId(), senderName, e); @@ -1178,184 +1174,127 @@ public class KurentoSessionManager extends SessionManager { return kParticipant; } - // TODO: REMOVE ON 2.18.0 @Override - public void reconnectStream2170(Participant participant, String streamId, String sdpOffer, Integer transactionId) { + public void reconnectPublisher(Participant participant, String streamId, String sdpString, Integer transactionId) { KurentoParticipant kParticipant = (KurentoParticipant) participant; KurentoSession kSession = kParticipant.getSession(); - boolean isPublisher = streamId.equals(participant.getPublisherStreamId()); + reconnectPublisher(kSession, kParticipant, streamId, sdpString, transactionId); + } + + @Override + public void reconnectSubscriber(Participant participant, String streamId, String sdpString, Integer transactionId, + boolean initByServer) { + KurentoParticipant kParticipant = (KurentoParticipant) participant; + KurentoSession kSession = kParticipant.getSession(); + reconnectSubscriber(kSession, kParticipant, streamId, sdpString, transactionId, initByServer); + } + + private String mungeSdpOffer(Session kSession, Participant participant, String sdpOffer, boolean isPublisher) { boolean isTranscodingAllowed = kSession.getSessionProperties().isTranscodingAllowed(); VideoCodec forcedVideoCodec = kSession.getSessionProperties().forcedVideoCodec(); - - boolean sdpOfferHasBeenMunged = false; - String originalSdpOffer = sdpOffer; - // Modify sdp if forced codec is defined if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { - sdpOfferHasBeenMunged = true; - sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, isPublisher, true, isTranscodingAllowed, + return sdpMunging.forceCodec(sdpOffer, participant, isPublisher, true, isTranscodingAllowed, forcedVideoCodec, false); } + return null; + } - if (isPublisher) { + private void reconnectPublisher(KurentoSession kSession, KurentoParticipant kParticipant, String streamId, + String sdpOffer, Integer transactionId) { - CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOffer, originalSdpOffer)); - if (sdpOfferHasBeenMunged) { - CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOfferMunged, sdpOffer)); - } + String sdpOfferMunged = mungeSdpOffer(kSession, kParticipant, sdpOffer, true); - // Reconnect publisher - final KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) kParticipant.getPublisher() - .getMediaOptions(); + CDR.log(new WebrtcDebugEvent(kParticipant, streamId, WebrtcDebugEventIssuer.client, + WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOffer, sdpOffer)); + if (sdpOfferMunged != null) { + sdpOffer = sdpOfferMunged; + CDR.log(new WebrtcDebugEvent(kParticipant, streamId, WebrtcDebugEventIssuer.client, + WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOfferMunged, sdpOffer)); + } + // Reconnect publisher + final KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) kParticipant.getPublisher().getMediaOptions(); + // 1) Disconnect broken PublisherEndpoint from its PassThrough + PublisherEndpoint publisher = kParticipant.getPublisher(); + final PassThrough passThru = publisher.disconnectFromPassThrough(); + // 2) Destroy the broken PublisherEndpoint and nothing else + publisher.cancelStatsLoop.set(true); + kParticipant.releaseElement(kParticipant.getParticipantPublicId(), publisher.getEndpoint()); + // 3) Create a new PublisherEndpoint connecting it to the previous PassThrough + kParticipant.resetPublisherEndpoint(kurentoOptions, passThru); + kParticipant.createPublishingEndpoint(kurentoOptions, streamId); + String sdpAnswer = kParticipant.publishToRoom(sdpOffer, kurentoOptions.doLoopback, true); + log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", kParticipant.getParticipantPublicId(), + sdpAnswer); + CDR.log(new WebrtcDebugEvent(kParticipant, streamId, WebrtcDebugEventIssuer.server, + WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); + sessionEventsHandler.onPublishMedia(kParticipant, kParticipant.getPublisherStreamId(), + kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer, + new HashSet(), transactionId, null); + } - // 1) Disconnect broken PublisherEndpoint from its PassThrough - PublisherEndpoint publisher = kParticipant.getPublisher(); - final PassThrough passThru = publisher.disconnectFromPassThrough(); + private void reconnectSubscriber(KurentoSession kSession, KurentoParticipant kParticipant, String streamId, + String sdpString, Integer transactionId, boolean initByServer) { - // 2) Destroy the broken PublisherEndpoint and nothing else - publisher.cancelStatsLoop.set(true); - kParticipant.releaseElement(participant.getParticipantPublicId(), publisher.getEndpoint()); + String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId); + if (senderPrivateId != null) { - // 3) Create a new PublisherEndpoint connecting it to the previous PassThrough - kParticipant.resetPublisherEndpoint(kurentoOptions, passThru); - kParticipant.createPublishingEndpoint(kurentoOptions, streamId); - String sdpAnswer = kParticipant.publishToRoom(sdpOffer, kurentoOptions.doLoopback, true); - log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(), - sdpAnswer); + KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId); + String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(sender); - CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.server, - WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); + if (initByServer) { - sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(), - kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer, - new HashSet(), transactionId, null); + // Server initiated negotiation - } else { + final String sdpAnswer = sdpString; - // Reconnect subscriber - String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId); - if (senderPrivateId != null) { + CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client, + WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); - KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId); - String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(sender); + kParticipant.receiveMedia(sender, sdpAnswer, true, true); - CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOffer, - originalSdpOffer)); - if (sdpOfferHasBeenMunged) { - CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client, + log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}", + kParticipant.getParticipantPublicId(), sdpAnswer); + + sessionEventsHandler.onSubscribe(kParticipant, kSession, sdpAnswer, transactionId, null); + + } else { + + // Client initiated negotiation + + String sdpOffer = sdpString; + + CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client, + WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOffer, sdpOffer)); + + String sdpOfferMunged = mungeSdpOffer(kSession, kParticipant, sdpOffer, false); + if (sdpOfferMunged != null) { + sdpOffer = sdpOfferMunged; + CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.client, WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpOfferMunged, sdpOffer)); } - String sdpAnswer = kParticipant.receiveMediaFrom2170(sender, sdpOffer, true); + kParticipant.cancelReceivingMedia(sender, null, true); + String sdpAnswer = kParticipant.receiveMedia(sender, sdpOffer, true, false); if (sdpAnswer == null) { throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'"); } log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}", - participant.getParticipantPublicId(), sdpAnswer); + kParticipant.getParticipantPublicId(), sdpAnswer); - CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.server, + CDR.log(new WebrtcDebugEvent(kParticipant, subscriberEndpointName, WebrtcDebugEventIssuer.server, WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); - sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null); - } else { - throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, - "Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'"); + sessionEventsHandler.onSubscribe(kParticipant, kSession, sdpAnswer, transactionId, null); + } - } - } - // END TODO - - @Override - public void reconnectStream(Participant participant, String streamId, String sdpOfferOrAnswer, - Integer transactionId) { - KurentoParticipant kParticipant = (KurentoParticipant) participant; - KurentoSession kSession = kParticipant.getSession(); - boolean isPublisher = streamId.equals(participant.getPublisherStreamId()); - - if (isPublisher) { - - // Reconnect publisher - - String sdpOffer = sdpOfferOrAnswer; - - boolean isTranscodingAllowed = kSession.getSessionProperties().isTranscodingAllowed(); - VideoCodec forcedVideoCodec = kSession.getSessionProperties().forcedVideoCodec(); - - boolean sdpOfferHasBeenMunged = false; - final String originalSdpOffer = sdpOffer; - - // Modify sdp if forced codec is defined - if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { - sdpOfferHasBeenMunged = true; - sdpOffer = sdpMunging.forceCodec(sdpOffer, participant, isPublisher, true, isTranscodingAllowed, - forcedVideoCodec, false); - } - - CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOffer, originalSdpOffer)); - if (sdpOfferHasBeenMunged) { - CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpOfferMunged, sdpOffer)); - } - - // Reconnect publisher - final KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) kParticipant.getPublisher() - .getMediaOptions(); - - // 1) Disconnect broken PublisherEndpoint from its PassThrough - PublisherEndpoint publisher = kParticipant.getPublisher(); - final PassThrough passThru = publisher.disconnectFromPassThrough(); - - // 2) Destroy the broken PublisherEndpoint and nothing else - publisher.cancelStatsLoop.set(true); - kParticipant.releaseElement(participant.getParticipantPublicId(), publisher.getEndpoint()); - - // 3) Create a new PublisherEndpoint connecting it to the previous PassThrough - kParticipant.resetPublisherEndpoint(kurentoOptions, passThru); - kParticipant.createPublishingEndpoint(kurentoOptions, streamId); - String sdpAnswer = kParticipant.publishToRoom(sdpOffer, kurentoOptions.doLoopback, true); - log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(), - sdpAnswer); - - CDR.log(new WebrtcDebugEvent(participant, streamId, WebrtcDebugEventIssuer.server, - WebrtcDebugEventOperation.reconnectPublisher, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); - - sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(), - kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer, - new HashSet(), transactionId, null); } else { - - // Reconnect subscriber - - final String sdpAnswer = sdpOfferOrAnswer; - - String senderPrivateId = kSession.getParticipantPrivateIdFromStreamId(streamId); - if (senderPrivateId != null) { - - KurentoParticipant sender = (KurentoParticipant) kSession.getParticipantByPrivateId(senderPrivateId); - String subscriberEndpointName = kParticipant.calculateSubscriberEndpointName(sender); - - CDR.log(new WebrtcDebugEvent(participant, subscriberEndpointName, WebrtcDebugEventIssuer.client, - WebrtcDebugEventOperation.reconnectSubscriber, WebrtcDebugEventType.sdpAnswer, sdpAnswer)); - - kParticipant.receiveMediaFrom2180(sender, sdpAnswer, true); - - log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}", - participant.getParticipantPublicId(), sdpAnswer); - - sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null); - - } else { - throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, - "Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'"); - } + throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, + "Stream '" + streamId + "' does not exist in Session '" + kSession.getSessionId() + "'"); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java index 1a159ada..7a0b9f61 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java @@ -62,26 +62,20 @@ public class SubscriberEndpoint extends MediaEndpoint { return sdpOffer; } - public synchronized String subscribe(String sdpAnswer, PublisherEndpoint publisher) { - // TODO: REMOVE ON 2.18.0 - if (this.createdAt == null) { - // 2.17.0 + public synchronized String subscribe(String sdpString, PublisherEndpoint publisher) { + if (this.publisherStreamId == null) { + // Client initiated negotiation registerOnIceCandidateEventListener(publisher.getOwner().getParticipantPublicId()); this.createdAt = System.currentTimeMillis(); - String realSdpAnswer = processOffer(sdpAnswer); + String realSdpAnswer = processOffer(sdpString); gatherCandidates(); publisher.connect(this.getEndpoint(), false); this.publisherStreamId = publisher.getStreamId(); return realSdpAnswer; } else { - // 2.18.0 - return processAnswer(sdpAnswer); + // Server initiated negotiation + return processAnswer(sdpString); } - // END TODO - - // TODO: UNCOMMENT ON 2.18.0 - // processAnswer(sdpAnswer); - // END TODO } @Override diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index 09f143dc..faa3a81d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -358,18 +358,7 @@ public class RpcHandler extends DefaultJsonRpcHandler { String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM); String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId); - boolean reconnect = false; - - // TODO: REMOVE ON 2.18.0 - if (request.getParams().has(ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM)) { - reconnect = getBooleanParam(request, ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM); - } - // END TODO - - // TODO: UNCOMMENT ON 2.18.0 - // boolean reconnect = getBooleanParam(request, - // ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM); - // END TODO + boolean reconnect = getBooleanParam(request, ProtocolElements.PREPARERECEIVEVIDEO_RECONNECT_PARAM); sessionManager.prepareSubscription(participant, senderPublicId, reconnect, request.getId()); } @@ -385,28 +374,15 @@ public class RpcHandler extends DefaultJsonRpcHandler { String senderStreamId = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SENDER_PARAM); String senderPublicId = parseSenderPublicIdFromStreamId(senderStreamId); - // TODO: REMOVE ON 2.18.0 if (request.getParams().has(ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM)) { - // 2.17.0: initiative held by browser when subscribing - // The request comes with an SDPOffer + // Client initiated negotiation (comes with SDP Offer) String sdpOffer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPOFFER_PARAM); sessionManager.subscribe(participant, senderPublicId, sdpOffer, request.getId(), false); } else if (request.getParams().has(ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM)) { - // 2.18.0: initiative held by server when subscribing - // This is the final call after prepareReceiveVidoFrom, comes with SDPAnswer + // Server initiated negotiation (comes with SDP Answer) String sdpAnswer = getStringParam(request, ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM); sessionManager.subscribe(participant, senderPublicId, sdpAnswer, request.getId(), true); } - // END TODO - - // TODO: UNCOMMENT ON 2.18.0 - /* - * String sdpAnswer = getStringParam(request, - * ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM); - * sessionManager.subscribe(participant, senderPublicId, sdpAnswer, - * request.getId()); - */ - // END TODO } private void unsubscribeFromVideo(RpcConnection rpcConnection, Request request) { @@ -670,40 +646,28 @@ public class RpcHandler extends DefaultJsonRpcHandler { } catch (OpenViduException e) { return; } + String streamId = getStringParam(request, ProtocolElements.RECONNECTSTREAM_STREAM_PARAM); + boolean isPublisher = streamId.equals(participant.getPublisherStreamId()); - // TODO: REMOVE ON 2.18.0 + String sdpString = null; if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM)) { - // 2.17.0 - try { - String sdpOffer = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM); - sessionManager.reconnectStream(participant, streamId, sdpOffer, request.getId()); - } catch (OpenViduException e) { - this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), - new JsonObject(), e); - } + sdpString = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPOFFER_PARAM); } else if (request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM)) { - // 2.18.0 - String sdpString = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM); - try { - sessionManager.reconnectStream(participant, streamId, sdpString, request.getId()); - } catch (OpenViduException e) { - this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), - new JsonObject(), e); - } + sdpString = getStringParam(request, ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM); } - // END TODO - // TODO: UNCOMMENT ON 2.18.0 - /* - * String sdpString = getStringParam(request, - * ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM); try { - * sessionManager.reconnectStream(participant, streamId, sdpString, - * request.getId()); } catch (OpenViduException e) { - * this.notificationService.sendErrorResponse(participant. - * getParticipantPrivateId(), request.getId(), new JsonObject(), e); } - */ - // END TODO + try { + if (isPublisher) { + sessionManager.reconnectPublisher(participant, streamId, sdpString, request.getId()); + } else { + boolean initByServer = request.getParams().has(ProtocolElements.RECONNECTSTREAM_SDPSTRING_PARAM); + sessionManager.reconnectSubscriber(participant, streamId, sdpString, request.getId(), initByServer); + } + } catch (OpenViduException e) { + this.notificationService.sendErrorResponse(participant.getParticipantPrivateId(), request.getId(), + new JsonObject(), e); + } } private void updateVideoData(RpcConnection rpcConnection, Request request) {