openvidu-server: concurrent protection at PublisherEndpoint level for methods KurentoParticipant#cancelReceivingMedia, KurentoParticipant#releasePublisherEndpoint, KurentoParticipant#receiveMediaFrom

pull/431/head
pabloFuente 2020-04-08 15:38:07 +02:00
parent 0d2fcfbff7
commit 4a77cb2650
3 changed files with 173 additions and 126 deletions

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
@ -90,15 +91,6 @@ public class KurentoParticipant extends Participant {
this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(), this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(),
this.session.getPipeline(), this.openviduConfig, null); this.session.getPipeline(), this.openviduConfig, null);
} }
for (Participant other : session.getParticipants()) {
if (!other.getParticipantPublicId().equals(this.getParticipantPublicId())
&& !OpenViduRole.SUBSCRIBER.equals(other.getToken().getRole())) {
// Initialize a SubscriberEndpoint for each other user connected with PUBLISHER
// or MODERATOR role
getNewOrExistingSubscriber(other.getParticipantPublicId());
}
}
} }
public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) { public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) {
@ -226,12 +218,10 @@ public class KurentoParticipant extends Participant {
KurentoParticipant kSender = (KurentoParticipant) sender; KurentoParticipant kSender = (KurentoParticipant) sender;
if (kSender.getPublisher() == null) { if (kSender.streaming && kSender.getPublisher() != null
log.warn("PARTICIPANT {}: Trying to connect to a user without a publishing endpoint", && kSender.getPublisher().closingLock.readLock().tryLock()) {
this.getParticipantPublicId());
return null;
}
try {
log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(), log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(),
senderName); senderName);
@ -240,6 +230,7 @@ public class KurentoParticipant extends Participant {
try { try {
CountDownLatch subscriberLatch = new CountDownLatch(1); CountDownLatch subscriberLatch = new CountDownLatch(1);
Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch);
try { try {
if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
@ -257,10 +248,12 @@ public class KurentoParticipant extends Participant {
return null; return null;
} }
if (subscriber.getEndpoint() == null) { if (subscriber.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint");
} }
String subscriberEndpointName = this.getParticipantPublicId() + "_" + kSender.getPublisherStreamId(); String subscriberEndpointName = this.getParticipantPublicId() + "_"
+ kSender.getPublisherStreamId();
subscriber.setEndpointName(subscriberEndpointName); subscriber.setEndpointName(subscriberEndpointName);
subscriber.getEndpoint().setName(subscriberEndpointName); subscriber.getEndpoint().setName(subscriberEndpointName);
@ -273,14 +266,16 @@ public class KurentoParticipant extends Participant {
throw e; throw e;
} }
log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), senderName); log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(),
senderName);
try { try {
String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher()); String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher());
log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer); log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer);
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(), log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(),
senderName, this.session.getSessionId()); senderName, this.session.getSessionId());
if (!silent && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { if (!silent
&& !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) {
endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(), endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(),
sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt()); sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt());
} }
@ -289,29 +284,60 @@ public class KurentoParticipant extends Participant {
} catch (KurentoServerException e) { } catch (KurentoServerException e) {
// TODO Check object status when KurentoClient sets this info in the object // TODO Check object status when KurentoClient sets this info in the object
if (e.getCode() == 40101) { if (e.getCode() == 40101) {
log.warn("Publisher endpoint was already released when trying " log.warn(
+ "to connect a subscriber endpoint to it", e); "Publisher endpoint was already released when trying to connect a subscriber endpoint to it",
e);
} else { } else {
log.error("Exception connecting subscriber endpoint " + "to publisher endpoint", e); log.error("Exception connecting subscriber endpoint to publisher endpoint", e);
} }
this.subscribers.remove(senderName); this.subscribers.remove(senderName);
releaseSubscriberEndpoint((KurentoParticipant) sender, subscriber, null, false); releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
}
return null; return null;
} }
} finally {
kSender.getPublisher().closingLock.readLock().unlock();
}
} else {
log.error(
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
senderName, sender.getSessionId(), this.participantPublicId);
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName
+ "is closed");
}
}
public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) { public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) {
final String senderName = senderKurentoParticipant.getParticipantPublicId(); final String senderName = senderKurentoParticipant.getParticipantPublicId();
final PublisherEndpoint pub = senderKurentoParticipant.publisher;
log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(), senderName); if (pub != null) {
try {
if (pub.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(),
senderName);
SubscriberEndpoint subscriberEndpoint = subscribers.remove(senderName); SubscriberEndpoint subscriberEndpoint = subscribers.remove(senderName);
if (subscriberEndpoint == null || subscriberEndpoint.getEndpoint() == null) { if (subscriberEndpoint == null) {
log.warn("PARTICIPANT {}: Trying to cancel receiving video from user {}. " log.warn(
+ "But there is no such subscriber endpoint.", this.getParticipantPublicId(), senderName); "PARTICIPANT {}: Trying to cancel receiving video from user {}. "
+ "But there is no such subscriber endpoint.",
this.getParticipantPublicId(), senderName);
} else { } else {
releaseSubscriberEndpoint(senderKurentoParticipant, subscriberEndpoint, reason, silent); releaseSubscriberEndpoint(senderName, senderKurentoParticipant, subscriberEndpoint, reason,
log.info("PARTICIPANT {}: stopped receiving media from {} in room {}", this.getParticipantPublicId(), silent);
senderName, this.session.getSessionId()); log.info("PARTICIPANT {}: stopped receiving media from {} in room {}",
this.getParticipantPublicId(), senderName, this.session.getSessionId());
}
} finally {
pub.closingLock.writeLock().unlock();
}
}
} catch (InterruptedException e) {
subscribers.remove(senderName);
log.error(
"Timeout wating for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia",
senderName, this.getParticipantPublicId());
}
} }
} }
@ -330,7 +356,7 @@ public class KurentoParticipant extends Participant {
it.remove(); it.remove();
if (subscriber != null && subscriber.getEndpoint() != null) { if (subscriber != null && subscriber.getEndpoint() != null) {
releaseSubscriberEndpoint( releaseSubscriberEndpoint(remoteParticipantName,
(KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), subscriber, (KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), subscriber,
reason, false); reason, false);
log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(), log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(),
@ -367,7 +393,6 @@ public class KurentoParticipant extends Participant {
log.debug("PARTICIPANT {}: New subscriber endpoint to user {}", this.getParticipantPublicId(), log.debug("PARTICIPANT {}: New subscriber endpoint to user {}", this.getParticipantPublicId(),
senderPublicId); senderPublicId);
} }
return subscriberEndpoint; return subscriberEndpoint;
} }
@ -391,7 +416,28 @@ public class KurentoParticipant extends Participant {
private void releasePublisherEndpoint(EndReason reason, long kmsDisconnectionTime) { private void releasePublisherEndpoint(EndReason reason, long kmsDisconnectionTime) {
if (publisher != null && publisher.getEndpoint() != null) { if (publisher != null && publisher.getEndpoint() != null) {
final ReadWriteLock closingLock = publisher.closingLock;
try {
if (closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
this.releasePublisherEndpointAux(reason, kmsDisconnectionTime);
} finally {
closingLock.writeLock().unlock();
}
}
} catch (InterruptedException e) {
log.error(
"Timeout wating for PublisherEndpoint closing lock of participant {} to be available to call releasePublisherEndpoint",
this.participantPublicId, this.getParticipantPublicId());
log.error("Forcing PublisherEndpoint release. Possibly some session event will be incomplete");
this.releasePublisherEndpointAux(reason, kmsDisconnectionTime);
}
} else {
log.warn("PARTICIPANT {}: Trying to release publisher endpoint but is null", getParticipantPublicId());
}
}
private void releasePublisherEndpointAux(EndReason reason, long kmsDisconnectionTime) {
// Remove streamId from publisher's map // Remove streamId from publisher's map
this.session.publishedStreamIds.remove(this.getPublisherStreamId()); this.session.publishedStreamIds.remove(this.getPublisherStreamId());
@ -413,30 +459,28 @@ public class KurentoParticipant extends Participant {
endpointConfig.getCdr().stopPublisher(this.getParticipantPublicId(), publisher.getStreamId(), reason); endpointConfig.getCdr().stopPublisher(this.getParticipantPublicId(), publisher.getStreamId(), reason);
publisher = null; publisher = null;
} else {
log.warn("PARTICIPANT {}: Trying to release publisher endpoint but is null", getParticipantPublicId());
}
} }
private void releaseSubscriberEndpoint(KurentoParticipant senderKurentoParticipant, SubscriberEndpoint subscriber, private void releaseSubscriberEndpoint(String senderName, KurentoParticipant publisherParticipant,
EndReason reason, boolean silent) { SubscriberEndpoint subscriber, EndReason reason, boolean silent) {
final String senderName = senderKurentoParticipant.getParticipantPublicId();
if (subscriber != null) { if (subscriber != null) {
subscriber.unregisterErrorListeners(); subscriber.unregisterErrorListeners();
subscriber.cancelStatsLoop.set(true); subscriber.cancelStatsLoop.set(true);
if (subscriber.getEndpoint() != null) {
releaseElement(senderName, subscriber.getEndpoint()); releaseElement(senderName, subscriber.getEndpoint());
}
if (!silent) { if (!silent) {
// Stop PlayerEndpoint of IP CAM if last subscriber disconnected // Stop PlayerEndpoint of IP CAM if last subscriber disconnected
final PublisherEndpoint senderPublisher = senderKurentoParticipant.publisher; if (publisherParticipant != null && publisherParticipant.publisher != null) {
if (senderPublisher != null) { final PublisherEndpoint senderPublisher = publisherParticipant.publisher;
// If no PublisherEndpoint, then it means that the publisher already closed it // If no PublisherEndpoint, then it means that the publisher already closed it
final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions(); final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions();
if (options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) { if (options != null && options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) {
synchronized (senderPublisher) { synchronized (senderPublisher) {
senderPublisher.numberOfSubscribers--; senderPublisher.numberOfSubscribers--;
if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) { if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) {

View File

@ -89,15 +89,6 @@ public class KurentoSession extends Session {
public void newPublisher(Participant participant) { public void newPublisher(Participant participant) {
registerPublisher(); registerPublisher();
// pre-load endpoints to recv video from the new publisher
for (Participant p : participants.values()) {
if (participant.equals(p)) {
continue;
}
((KurentoParticipant) p).getNewOrExistingSubscriber(participant.getParticipantPublicId());
}
log.debug("SESSION {}: Virtually subscribed other participants {} to new publisher {}", sessionId, log.debug("SESSION {}: Virtually subscribed other participants {} to new publisher {}", sessionId,
participants.values(), participant.getParticipantPublicId()); participants.values(), participant.getParticipantPublicId());
} }

View File

@ -26,6 +26,8 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.kurento.client.Continuation; import org.kurento.client.Continuation;
import org.kurento.client.GenericMediaElement; import org.kurento.client.GenericMediaElement;
@ -76,6 +78,16 @@ public class PublisherEndpoint extends MediaEndpoint {
public int numberOfSubscribers = 0; public int numberOfSubscribers = 0;
/**
* This lock protects the following method with read lock:
* KurentoParticipant#receiveMediaFrom. It uses tryLock, immediately failing if
* written locked
*
* Lock is written-locked upon KurentoParticipant#releasePublisherEndpoint and
* KurentoParticipant#cancelReceivingMedia
*/
public ReadWriteLock closingLock = new ReentrantReadWriteLock();
public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName, public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName,
MediaPipeline pipeline, OpenviduConfig openviduConfig, PassThrough passThru) { MediaPipeline pipeline, OpenviduConfig openviduConfig, PassThrough passThru) {
super(endpointType, owner, endpointName, pipeline, openviduConfig, log); super(endpointType, owner, endpointName, pipeline, openviduConfig, log);