diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index c8e66519..7fedf892 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; import java.util.function.Function; import org.apache.commons.lang3.RandomStringUtils; @@ -90,15 +91,6 @@ public class KurentoParticipant extends Participant { this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(), this.session.getPipeline(), this.openviduConfig, null); } - - for (Participant other : session.getParticipants()) { - if (!other.getParticipantPublicId().equals(this.getParticipantPublicId()) - && !OpenViduRole.SUBSCRIBER.equals(other.getToken().getRole())) { - // Initialize a SubscriberEndpoint for each other user connected with PUBLISHER - // or MODERATOR role - getNewOrExistingSubscriber(other.getParticipantPublicId()); - } - } } public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) { @@ -226,92 +218,126 @@ public class KurentoParticipant extends Participant { KurentoParticipant kSender = (KurentoParticipant) sender; - if (kSender.getPublisher() == null) { - log.warn("PARTICIPANT {}: Trying to connect to a user without a publishing endpoint", - this.getParticipantPublicId()); - return null; - } + if (kSender.streaming && kSender.getPublisher() != null + && kSender.getPublisher().closingLock.readLock().tryLock()) { - log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(), - senderName); - - SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName); - - try { - CountDownLatch subscriberLatch = new CountDownLatch(1); - Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); try { - if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Timeout reached when creating subscriber endpoint"); + log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(), + senderName); + + SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName); + + try { + CountDownLatch subscriberLatch = new CountDownLatch(1); + Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); + + try { + if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, + "Timeout reached when creating subscriber endpoint"); + } + } catch (InterruptedException e) { + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, + "Interrupted when creating subscriber endpoint: " + e.getMessage()); + } + if (oldMediaEndpoint != null) { + log.warn( + "PARTICIPANT {}: Two threads are trying to create at " + + "the same time a subscriber endpoint for user {}", + this.getParticipantPublicId(), senderName); + return null; + } + if (subscriber.getEndpoint() == null) { + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, + "Unable to create subscriber endpoint"); + } + + String subscriberEndpointName = this.getParticipantPublicId() + "_" + + kSender.getPublisherStreamId(); + + subscriber.setEndpointName(subscriberEndpointName); + subscriber.getEndpoint().setName(subscriberEndpointName); + subscriber.setStreamId(kSender.getPublisherStreamId()); + + endpointConfig.addEndpointListeners(subscriber, "subscriber"); + + } catch (OpenViduException e) { + this.subscribers.remove(senderName); + throw e; } - } catch (InterruptedException e) { - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, - "Interrupted when creating subscriber endpoint: " + e.getMessage()); + + log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), + senderName); + try { + String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher()); + log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer); + log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(), + senderName, this.session.getSessionId()); + + if (!silent + && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { + endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(), + sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt()); + } + + return sdpAnswer; + } catch (KurentoServerException e) { + // TODO Check object status when KurentoClient sets this info in the object + if (e.getCode() == 40101) { + log.warn( + "Publisher endpoint was already released when trying to connect a subscriber endpoint to it", + e); + } else { + log.error("Exception connecting subscriber endpoint to publisher endpoint", e); + } + this.subscribers.remove(senderName); + releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false); + return null; + } + } finally { + kSender.getPublisher().closingLock.readLock().unlock(); } - if (oldMediaEndpoint != null) { - log.warn( - "PARTICIPANT {}: Two threads are trying to create at " - + "the same time a subscriber endpoint for user {}", - this.getParticipantPublicId(), senderName); - return null; - } - if (subscriber.getEndpoint() == null) { - throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); - } - - String subscriberEndpointName = this.getParticipantPublicId() + "_" + kSender.getPublisherStreamId(); - - subscriber.setEndpointName(subscriberEndpointName); - subscriber.getEndpoint().setName(subscriberEndpointName); - subscriber.setStreamId(kSender.getPublisherStreamId()); - - endpointConfig.addEndpointListeners(subscriber, "subscriber"); - - } catch (OpenViduException e) { - this.subscribers.remove(senderName); - throw e; + } else { + log.error( + "PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ", + senderName, sender.getSessionId(), this.participantPublicId); + throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, + "Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName + + "is closed"); } - - log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), senderName); - try { - String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher()); - log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer); - log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(), - senderName, this.session.getSessionId()); - - if (!silent && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { - endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(), - sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt()); - } - - return sdpAnswer; - } catch (KurentoServerException e) { - // TODO Check object status when KurentoClient sets this info in the object - if (e.getCode() == 40101) { - log.warn("Publisher endpoint was already released when trying " - + "to connect a subscriber endpoint to it", e); - } else { - log.error("Exception connecting subscriber endpoint " + "to publisher endpoint", e); - } - this.subscribers.remove(senderName); - releaseSubscriberEndpoint((KurentoParticipant) sender, subscriber, null, false); - } - return null; } public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) { final String senderName = senderKurentoParticipant.getParticipantPublicId(); - - log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(), senderName); - SubscriberEndpoint subscriberEndpoint = subscribers.remove(senderName); - if (subscriberEndpoint == null || subscriberEndpoint.getEndpoint() == null) { - log.warn("PARTICIPANT {}: Trying to cancel receiving video from user {}. " - + "But there is no such subscriber endpoint.", this.getParticipantPublicId(), senderName); - } else { - releaseSubscriberEndpoint(senderKurentoParticipant, subscriberEndpoint, reason, silent); - log.info("PARTICIPANT {}: stopped receiving media from {} in room {}", this.getParticipantPublicId(), - senderName, this.session.getSessionId()); + final PublisherEndpoint pub = senderKurentoParticipant.publisher; + if (pub != null) { + try { + if (pub.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(), + senderName); + SubscriberEndpoint subscriberEndpoint = subscribers.remove(senderName); + if (subscriberEndpoint == null) { + log.warn( + "PARTICIPANT {}: Trying to cancel receiving video from user {}. " + + "But there is no such subscriber endpoint.", + this.getParticipantPublicId(), senderName); + } else { + releaseSubscriberEndpoint(senderName, senderKurentoParticipant, subscriberEndpoint, reason, + silent); + log.info("PARTICIPANT {}: stopped receiving media from {} in room {}", + this.getParticipantPublicId(), senderName, this.session.getSessionId()); + } + } finally { + pub.closingLock.writeLock().unlock(); + } + } + } catch (InterruptedException e) { + subscribers.remove(senderName); + log.error( + "Timeout wating for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia", + senderName, this.getParticipantPublicId()); + } } } @@ -330,7 +356,7 @@ public class KurentoParticipant extends Participant { it.remove(); if (subscriber != null && subscriber.getEndpoint() != null) { - releaseSubscriberEndpoint( + releaseSubscriberEndpoint(remoteParticipantName, (KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), subscriber, reason, false); log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(), @@ -367,7 +393,6 @@ public class KurentoParticipant extends Participant { log.debug("PARTICIPANT {}: New subscriber endpoint to user {}", this.getParticipantPublicId(), senderPublicId); } - return subscriberEndpoint; } @@ -391,52 +416,71 @@ public class KurentoParticipant extends Participant { private void releasePublisherEndpoint(EndReason reason, long kmsDisconnectionTime) { if (publisher != null && publisher.getEndpoint() != null) { - - // Remove streamId from publisher's map - this.session.publishedStreamIds.remove(this.getPublisherStreamId()); - - if (this.openviduConfig.isRecordingModuleEnabled() - && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { - this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(), - kmsDisconnectionTime); + final ReadWriteLock closingLock = publisher.closingLock; + try { + if (closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + this.releasePublisherEndpointAux(reason, kmsDisconnectionTime); + } finally { + closingLock.writeLock().unlock(); + } + } + } catch (InterruptedException e) { + log.error( + "Timeout wating for PublisherEndpoint closing lock of participant {} to be available to call releasePublisherEndpoint", + this.participantPublicId, this.getParticipantPublicId()); + log.error("Forcing PublisherEndpoint release. Possibly some session event will be incomplete"); + this.releasePublisherEndpointAux(reason, kmsDisconnectionTime); } - - publisher.unregisterErrorListeners(); - publisher.cancelStatsLoop.set(true); - - for (MediaElement el : publisher.getMediaElements()) { - releaseElement(getParticipantPublicId(), el); - } - releaseElement(getParticipantPublicId(), publisher.getEndpoint()); - this.streaming = false; - this.session.deregisterPublisher(); - - endpointConfig.getCdr().stopPublisher(this.getParticipantPublicId(), publisher.getStreamId(), reason); - publisher = null; - } else { log.warn("PARTICIPANT {}: Trying to release publisher endpoint but is null", getParticipantPublicId()); } } - private void releaseSubscriberEndpoint(KurentoParticipant senderKurentoParticipant, SubscriberEndpoint subscriber, - EndReason reason, boolean silent) { - final String senderName = senderKurentoParticipant.getParticipantPublicId(); + private void releasePublisherEndpointAux(EndReason reason, long kmsDisconnectionTime) { + // Remove streamId from publisher's map + this.session.publishedStreamIds.remove(this.getPublisherStreamId()); + + if (this.openviduConfig.isRecordingModuleEnabled() + && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { + this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(), + kmsDisconnectionTime); + } + + publisher.unregisterErrorListeners(); + publisher.cancelStatsLoop.set(true); + + for (MediaElement el : publisher.getMediaElements()) { + releaseElement(getParticipantPublicId(), el); + } + releaseElement(getParticipantPublicId(), publisher.getEndpoint()); + this.streaming = false; + this.session.deregisterPublisher(); + + endpointConfig.getCdr().stopPublisher(this.getParticipantPublicId(), publisher.getStreamId(), reason); + publisher = null; + } + + private void releaseSubscriberEndpoint(String senderName, KurentoParticipant publisherParticipant, + SubscriberEndpoint subscriber, EndReason reason, boolean silent) { + if (subscriber != null) { subscriber.unregisterErrorListeners(); subscriber.cancelStatsLoop.set(true); - releaseElement(senderName, subscriber.getEndpoint()); + if (subscriber.getEndpoint() != null) { + releaseElement(senderName, subscriber.getEndpoint()); + } if (!silent) { // Stop PlayerEndpoint of IP CAM if last subscriber disconnected - final PublisherEndpoint senderPublisher = senderKurentoParticipant.publisher; - if (senderPublisher != null) { + if (publisherParticipant != null && publisherParticipant.publisher != null) { + final PublisherEndpoint senderPublisher = publisherParticipant.publisher; // If no PublisherEndpoint, then it means that the publisher already closed it final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions(); - if (options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) { + if (options != null && options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) { synchronized (senderPublisher) { senderPublisher.numberOfSubscribers--; if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 22845b1b..e0ad8cb4 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -89,15 +89,6 @@ public class KurentoSession extends Session { public void newPublisher(Participant participant) { registerPublisher(); - - // pre-load endpoints to recv video from the new publisher - for (Participant p : participants.values()) { - if (participant.equals(p)) { - continue; - } - ((KurentoParticipant) p).getNewOrExistingSubscriber(participant.getParticipantPublicId()); - } - log.debug("SESSION {}: Virtually subscribed other participants {} to new publisher {}", sessionId, participants.values(), participant.getParticipantPublicId()); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index 1dd07dc7..d2b0d748 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -26,6 +26,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.kurento.client.Continuation; import org.kurento.client.GenericMediaElement; @@ -76,6 +78,16 @@ public class PublisherEndpoint extends MediaEndpoint { public int numberOfSubscribers = 0; + /** + * This lock protects the following method with read lock: + * KurentoParticipant#receiveMediaFrom. It uses tryLock, immediately failing if + * written locked + * + * Lock is written-locked upon KurentoParticipant#releasePublisherEndpoint and + * KurentoParticipant#cancelReceivingMedia + */ + public ReadWriteLock closingLock = new ReentrantReadWriteLock(); + public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, OpenviduConfig openviduConfig, PassThrough passThru) { super(endpointType, owner, endpointName, pipeline, openviduConfig, log);