From 4d5a023b5c399a55a59d9ec6a0b7f0710a60dfbd Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Fri, 15 Nov 2019 16:28:12 +0100 Subject: [PATCH] openvidu-server: hidden NullPointer bug fix when closing Subscriber endpoints after KMS reconnection (associated PublisherEndpoint possibly null) --- .../kurento/core/KurentoParticipant.java | 44 ++++++++----------- .../server/kurento/core/KurentoSession.java | 26 ++++++++--- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index b671b759..6d018242 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -103,9 +103,8 @@ public class KurentoParticipant extends Participant { final String publisherStreamId = this.getParticipantPublicId() + "_" + (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_" + RandomStringUtils.random(5, true, false).toUpperCase(); - this.publisher.setStreamId(publisherStreamId); - this.publisher.setEndpointName(publisherStreamId); - + publisher.setStreamId(publisherStreamId); + publisher.setEndpointName(publisherStreamId); publisher.setMediaOptions(mediaOptions); publisher.createEndpoint(publisherLatch); if (getPublisher().getEndpoint() == null) { @@ -218,7 +217,7 @@ public class KurentoParticipant extends Participant { KurentoParticipant kSender = (KurentoParticipant) sender; if (kSender.getPublisher() == null) { - log.warn("PARTICIPANT {}: Trying to connect to a user without " + "a publishing endpoint", + log.warn("PARTICIPANT {}: Trying to connect to a user without a publishing endpoint", this.getParticipantPublicId()); return null; } @@ -423,18 +422,21 @@ public class KurentoParticipant extends Participant { // Stop PlayerEndpoint of IP CAM if last subscriber disconnected final PublisherEndpoint senderPublisher = senderKurentoParticipant.publisher; - final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions(); - if (options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) { - synchronized (senderPublisher) { - senderPublisher.numberOfSubscribers--; - if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) { - try { - senderPublisher.getPlayerEndpoint().stop(); - log.info("IP Camera stream {} feed is now disabled because there are no subscribers", - senderPublisher.getStreamId()); - } catch (Exception e) { - log.info("Error while disabling feed for IP camera {}: {}", senderPublisher.getStreamId(), - e.getMessage()); + if (senderPublisher != null) { + // If no PublisherEndpoint, then it means that the publisher already closed it + final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions(); + if (options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) { + synchronized (senderPublisher) { + senderPublisher.numberOfSubscribers--; + if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) { + try { + senderPublisher.getPlayerEndpoint().stop(); + log.info("IP Camera stream {} feed is now disabled because there are no subscribers", + senderPublisher.getStreamId()); + } catch (Exception e) { + log.info("Error while disabling feed for IP camera {}: {}", + senderPublisher.getStreamId(), e.getMessage()); + } } } } @@ -483,20 +485,12 @@ public class KurentoParticipant extends Participant { } public void resetPublisherEndpoint(MediaOptions mediaOptions) { - log.info("Reseting publisher endpoint for participant {}", this.getParticipantPublicId()); + log.info("Resetting publisher endpoint for participant {}", this.getParticipantPublicId()); this.publisher = new PublisherEndpoint(endpointType, this, this.getParticipantPublicId(), this.session.getPipeline(), this.openviduConfig); this.publisher.setMediaOptions(mediaOptions); } - public void resetPublisherEndpoint() { - MediaOptions mediaOptions = null; - if (this.getPublisher() != null) { - mediaOptions = this.getPublisher().getMediaOptions(); - } - this.resetPublisherEndpoint(mediaOptions); - } - @Override public JsonObject toJson() { return this.sharedJson(MediaEndpoint::toJson); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index c349d451..a825952d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -17,6 +17,8 @@ package io.openvidu.server.kurento.core; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -34,6 +36,7 @@ import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; import io.openvidu.java.client.OpenViduRole; import io.openvidu.server.core.EndReason; +import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.kms.Kms; @@ -176,7 +179,8 @@ public class KurentoSession extends Session { checkClosed(); - KurentoParticipant removedParticipant = (KurentoParticipant) participants.remove(participant.getParticipantPrivateId()); + KurentoParticipant removedParticipant = (KurentoParticipant) participants + .remove(participant.getParticipantPrivateId()); log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId, participant.getParticipantPublicId()); @@ -285,17 +289,24 @@ public class KurentoSession extends Session { public void restartStatusInKurento(long kmsDisconnectionTime) { - log.info("Reseting process: reseting remote media objects for active session {}", this.sessionId); + log.info("Resetting process: resetting remote media objects for active session {}", this.sessionId); // Stop recording if session is being recorded if (recordingManager.sessionIsBeingRecorded(this.sessionId)) { this.recordingManager.forceStopRecording(this, EndReason.mediaServerDisconnect, kmsDisconnectionTime); } + // Store MediaOptions for resetting PublisherEndpoints later + Map mediaOptionsMap = new HashMap<>(); + // Close all MediaEndpoints of participants this.getParticipants().forEach(p -> { KurentoParticipant kParticipant = (KurentoParticipant) p; final boolean wasStreaming = kParticipant.isStreaming(); + if (wasStreaming) { + mediaOptionsMap.put(kParticipant.getParticipantPublicId(), + kParticipant.getPublisher().getMediaOptions()); + } kParticipant.releaseAllFilters(); kParticipant.close(EndReason.mediaServerDisconnect, false, kmsDisconnectionTime); if (wasStreaming) { @@ -306,21 +317,22 @@ public class KurentoSession extends Session { // Release pipeline, create a new one and prepare new PublisherEndpoints for // allowed users - log.info("Reseting process: closing media pipeline for active session {}", this.sessionId); + log.info("Resetting process: closing media pipeline for active session {}", this.sessionId); this.closePipeline(() -> { - log.info("Reseting process: media pipeline closed for active session {}", this.sessionId); + log.info("Resetting process: media pipeline closed for active session {}", this.sessionId); createPipeline(); try { if (!pipelineLatch.await(20, TimeUnit.SECONDS)) { - throw new Exception("MediaPipleine was not created in 20 seconds"); + throw new Exception("MediaPipeline was not created in 20 seconds"); } getParticipants().forEach(p -> { if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) { - ((KurentoParticipant) p).resetPublisherEndpoint(); + ((KurentoParticipant) p) + .resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId())); } }); log.info( - "Reseting process: media pipeline created and publisher endpoints reseted for active session {}", + "Resetting process: media pipeline created and publisher endpoints reseted for active session {}", this.sessionId); } catch (Exception e) { log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage());