openvidu-server: hidden NullPointer bug fix when closing Subscriber endpoints after KMS reconnection (associated PublisherEndpoint possibly null)

pull/370/head
pabloFuente 2019-11-15 16:28:12 +01:00
parent 1f7f33bd65
commit 4d5a023b5c
2 changed files with 38 additions and 32 deletions

View File

@ -103,9 +103,8 @@ public class KurentoParticipant extends Participant {
final String publisherStreamId = this.getParticipantPublicId() + "_" final String publisherStreamId = this.getParticipantPublicId() + "_"
+ (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_" + (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_"
+ RandomStringUtils.random(5, true, false).toUpperCase(); + RandomStringUtils.random(5, true, false).toUpperCase();
this.publisher.setStreamId(publisherStreamId); publisher.setStreamId(publisherStreamId);
this.publisher.setEndpointName(publisherStreamId); publisher.setEndpointName(publisherStreamId);
publisher.setMediaOptions(mediaOptions); publisher.setMediaOptions(mediaOptions);
publisher.createEndpoint(publisherLatch); publisher.createEndpoint(publisherLatch);
if (getPublisher().getEndpoint() == null) { if (getPublisher().getEndpoint() == null) {
@ -218,7 +217,7 @@ public class KurentoParticipant extends Participant {
KurentoParticipant kSender = (KurentoParticipant) sender; KurentoParticipant kSender = (KurentoParticipant) sender;
if (kSender.getPublisher() == null) { if (kSender.getPublisher() == null) {
log.warn("PARTICIPANT {}: Trying to connect to a user without " + "a publishing endpoint", log.warn("PARTICIPANT {}: Trying to connect to a user without a publishing endpoint",
this.getParticipantPublicId()); this.getParticipantPublicId());
return null; return null;
} }
@ -423,18 +422,21 @@ public class KurentoParticipant extends Participant {
// Stop PlayerEndpoint of IP CAM if last subscriber disconnected // Stop PlayerEndpoint of IP CAM if last subscriber disconnected
final PublisherEndpoint senderPublisher = senderKurentoParticipant.publisher; final PublisherEndpoint senderPublisher = senderKurentoParticipant.publisher;
final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions(); if (senderPublisher != null) {
if (options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) { // If no PublisherEndpoint, then it means that the publisher already closed it
synchronized (senderPublisher) { final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions();
senderPublisher.numberOfSubscribers--; if (options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) {
if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) { synchronized (senderPublisher) {
try { senderPublisher.numberOfSubscribers--;
senderPublisher.getPlayerEndpoint().stop(); if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) {
log.info("IP Camera stream {} feed is now disabled because there are no subscribers", try {
senderPublisher.getStreamId()); senderPublisher.getPlayerEndpoint().stop();
} catch (Exception e) { log.info("IP Camera stream {} feed is now disabled because there are no subscribers",
log.info("Error while disabling feed for IP camera {}: {}", senderPublisher.getStreamId(), senderPublisher.getStreamId());
e.getMessage()); } catch (Exception e) {
log.info("Error while disabling feed for IP camera {}: {}",
senderPublisher.getStreamId(), e.getMessage());
}
} }
} }
} }
@ -483,20 +485,12 @@ public class KurentoParticipant extends Participant {
} }
public void resetPublisherEndpoint(MediaOptions mediaOptions) { public void resetPublisherEndpoint(MediaOptions mediaOptions) {
log.info("Reseting publisher endpoint for participant {}", this.getParticipantPublicId()); log.info("Resetting publisher endpoint for participant {}", this.getParticipantPublicId());
this.publisher = new PublisherEndpoint(endpointType, this, this.getParticipantPublicId(), this.publisher = new PublisherEndpoint(endpointType, this, this.getParticipantPublicId(),
this.session.getPipeline(), this.openviduConfig); this.session.getPipeline(), this.openviduConfig);
this.publisher.setMediaOptions(mediaOptions); this.publisher.setMediaOptions(mediaOptions);
} }
public void resetPublisherEndpoint() {
MediaOptions mediaOptions = null;
if (this.getPublisher() != null) {
mediaOptions = this.getPublisher().getMediaOptions();
}
this.resetPublisherEndpoint(mediaOptions);
}
@Override @Override
public JsonObject toJson() { public JsonObject toJson() {
return this.sharedJson(MediaEndpoint::toJson); return this.sharedJson(MediaEndpoint::toJson);

View File

@ -17,6 +17,8 @@
package io.openvidu.server.kurento.core; package io.openvidu.server.kurento.core;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -34,6 +36,7 @@ import io.openvidu.client.OpenViduException.Code;
import io.openvidu.client.internal.ProtocolElements; import io.openvidu.client.internal.ProtocolElements;
import io.openvidu.java.client.OpenViduRole; import io.openvidu.java.client.OpenViduRole;
import io.openvidu.server.core.EndReason; import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant; import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session; import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.Kms;
@ -176,7 +179,8 @@ public class KurentoSession extends Session {
checkClosed(); checkClosed();
KurentoParticipant removedParticipant = (KurentoParticipant) participants.remove(participant.getParticipantPrivateId()); KurentoParticipant removedParticipant = (KurentoParticipant) participants
.remove(participant.getParticipantPrivateId());
log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId, log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId,
participant.getParticipantPublicId()); participant.getParticipantPublicId());
@ -285,17 +289,24 @@ public class KurentoSession extends Session {
public void restartStatusInKurento(long kmsDisconnectionTime) { public void restartStatusInKurento(long kmsDisconnectionTime) {
log.info("Reseting process: reseting remote media objects for active session {}", this.sessionId); log.info("Resetting process: resetting remote media objects for active session {}", this.sessionId);
// Stop recording if session is being recorded // Stop recording if session is being recorded
if (recordingManager.sessionIsBeingRecorded(this.sessionId)) { if (recordingManager.sessionIsBeingRecorded(this.sessionId)) {
this.recordingManager.forceStopRecording(this, EndReason.mediaServerDisconnect, kmsDisconnectionTime); this.recordingManager.forceStopRecording(this, EndReason.mediaServerDisconnect, kmsDisconnectionTime);
} }
// Store MediaOptions for resetting PublisherEndpoints later
Map<String, MediaOptions> mediaOptionsMap = new HashMap<>();
// Close all MediaEndpoints of participants // Close all MediaEndpoints of participants
this.getParticipants().forEach(p -> { this.getParticipants().forEach(p -> {
KurentoParticipant kParticipant = (KurentoParticipant) p; KurentoParticipant kParticipant = (KurentoParticipant) p;
final boolean wasStreaming = kParticipant.isStreaming(); final boolean wasStreaming = kParticipant.isStreaming();
if (wasStreaming) {
mediaOptionsMap.put(kParticipant.getParticipantPublicId(),
kParticipant.getPublisher().getMediaOptions());
}
kParticipant.releaseAllFilters(); kParticipant.releaseAllFilters();
kParticipant.close(EndReason.mediaServerDisconnect, false, kmsDisconnectionTime); kParticipant.close(EndReason.mediaServerDisconnect, false, kmsDisconnectionTime);
if (wasStreaming) { if (wasStreaming) {
@ -306,21 +317,22 @@ public class KurentoSession extends Session {
// Release pipeline, create a new one and prepare new PublisherEndpoints for // Release pipeline, create a new one and prepare new PublisherEndpoints for
// allowed users // allowed users
log.info("Reseting process: closing media pipeline for active session {}", this.sessionId); log.info("Resetting process: closing media pipeline for active session {}", this.sessionId);
this.closePipeline(() -> { this.closePipeline(() -> {
log.info("Reseting process: media pipeline closed for active session {}", this.sessionId); log.info("Resetting process: media pipeline closed for active session {}", this.sessionId);
createPipeline(); createPipeline();
try { try {
if (!pipelineLatch.await(20, TimeUnit.SECONDS)) { if (!pipelineLatch.await(20, TimeUnit.SECONDS)) {
throw new Exception("MediaPipleine was not created in 20 seconds"); throw new Exception("MediaPipeline was not created in 20 seconds");
} }
getParticipants().forEach(p -> { getParticipants().forEach(p -> {
if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) { if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) {
((KurentoParticipant) p).resetPublisherEndpoint(); ((KurentoParticipant) p)
.resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId()));
} }
}); });
log.info( log.info(
"Reseting process: media pipeline created and publisher endpoints reseted for active session {}", "Resetting process: media pipeline created and publisher endpoints reseted for active session {}",
this.sessionId); this.sessionId);
} catch (Exception e) { } catch (Exception e) {
log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage()); log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage());