From 883d686542f2c72c86b8365978cadf2b13d315e4 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Mon, 10 Jun 2019 17:03:41 +0200 Subject: [PATCH] openvidu-server: refactoring for multiple KMS support --- .../io/openvidu/server/OpenViduServer.java | 32 ++- .../io/openvidu/server/core/MediaOptions.java | 2 +- .../server/core/SessionEventsHandler.java | 2 +- .../AutodiscoveryKurentoClientProvider.java | 40 ---- .../server/kurento/KurentoClientProvider.java | 46 ----- .../OpenViduKurentoClientSessionInfo.java | 54 ----- .../kurento/core/KurentoMediaOptions.java | 2 +- .../kurento/core/KurentoParticipant.java | 3 +- .../server/kurento/core/KurentoSession.java | 19 +- .../kurento/core/KurentoSessionManager.java | 67 +++---- .../kurento/{ => endpoint}/KurentoFilter.java | 2 +- .../kurento/endpoint/MediaEndpoint.java | 1 - .../kurento/endpoint/PublisherEndpoint.java | 1 - .../kurento/{ => endpoint}/TrackType.java | 4 +- .../DummyLoadManager.java} | 25 ++- .../kurento/kms/FixedOneKmsManager.java | 98 +++++----- .../io/openvidu/server/kurento/kms/Kms.java | 75 ++++--- .../server/kurento/kms/KmsManager.java | 184 ++++++++++-------- .../server/kurento/kms/LoadManager.java | 3 + .../kurento/kms/MaxWebRtcLoadManager.java | 55 ------ .../server/recording/CompositeWrapper.java | 7 +- .../service/ComposedRecordingService.java | 13 +- .../recording/service/RecordingManager.java | 36 ++-- .../service/SingleStreamRecordingService.java | 19 +- .../server/test/core/RoomManagerTest.java | 48 +---- 25 files changed, 308 insertions(+), 530 deletions(-) delete mode 100644 openvidu-server/src/main/java/io/openvidu/server/kurento/AutodiscoveryKurentoClientProvider.java delete mode 100644 openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientProvider.java delete mode 100644 openvidu-server/src/main/java/io/openvidu/server/kurento/OpenViduKurentoClientSessionInfo.java rename openvidu-server/src/main/java/io/openvidu/server/kurento/{ => endpoint}/KurentoFilter.java (97%) rename openvidu-server/src/main/java/io/openvidu/server/kurento/{ => endpoint}/TrackType.java (90%) rename openvidu-server/src/main/java/io/openvidu/server/kurento/{KurentoClientSessionInfo.java => kms/DummyLoadManager.java} (60%) delete mode 100644 openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java diff --git a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java index db64f4a2..7af2e42e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java +++ b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java @@ -56,12 +56,13 @@ import io.openvidu.server.core.TokenGenerator; import io.openvidu.server.core.TokenGeneratorDefault; import io.openvidu.server.coturn.CoturnCredentialsService; import io.openvidu.server.coturn.CoturnCredentialsServiceFactory; -import io.openvidu.server.kurento.AutodiscoveryKurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientProvider; import io.openvidu.server.kurento.core.KurentoParticipantEndpointConfig; import io.openvidu.server.kurento.core.KurentoSessionEventsHandler; import io.openvidu.server.kurento.core.KurentoSessionManager; +import io.openvidu.server.kurento.kms.DummyLoadManager; import io.openvidu.server.kurento.kms.FixedOneKmsManager; +import io.openvidu.server.kurento.kms.KmsManager; +import io.openvidu.server.kurento.kms.LoadManager; import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.rpc.RpcNotificationService; @@ -91,8 +92,7 @@ public class OpenViduServer implements JsonRpcConfigurer { @Bean @ConditionalOnMissingBean - public KurentoClientProvider kmsManager() { - + public KmsManager kmsManager() { JsonParser parser = new JsonParser(); String uris = env.getProperty(KMSS_URIS_PROPERTY); JsonElement elem = parser.parse(uris); @@ -104,14 +104,8 @@ public class OpenViduServer implements JsonRpcConfigurer { } String firstKmsWsUri = kmsWsUris.get(0); - - if (firstKmsWsUri.equals("autodiscovery")) { - log.info("Using autodiscovery rules to locate KMS on every pipeline"); - return new AutodiscoveryKurentoClientProvider(); - } else { - log.info("Configuring OpenVidu Server to use first of the following kmss: " + kmsWsUris); - return new FixedOneKmsManager(firstKmsWsUri); - } + log.info("OpenVidu Server using one KMS: {}", kmsWsUris); + return new FixedOneKmsManager(firstKmsWsUri); } @Bean @@ -156,6 +150,12 @@ public class OpenViduServer implements JsonRpcConfigurer { return new TokenGeneratorDefault(); } + @Bean + @ConditionalOnMissingBean + public LoadManager loadManager() { + return new DummyLoadManager(); + } + @Bean @ConditionalOnMissingBean public OpenviduConfig openviduConfig() { @@ -275,12 +275,8 @@ public class OpenViduServer implements JsonRpcConfigurer { @EventListener(ApplicationReadyEvent.class) public void whenReady() { final String NEW_LINE = System.lineSeparator(); - String str = NEW_LINE + - NEW_LINE + " ACCESS IP " + - NEW_LINE + "-------------------------" + - NEW_LINE + httpUrl + - NEW_LINE + "-------------------------" + - NEW_LINE; + String str = NEW_LINE + NEW_LINE + " ACCESS IP " + NEW_LINE + "-------------------------" + + NEW_LINE + httpUrl + NEW_LINE + "-------------------------" + NEW_LINE; log.info(str); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java b/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java index 57bf2e9a..551a9c2d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java @@ -19,7 +19,7 @@ package io.openvidu.server.core; import com.google.gson.JsonObject; -import io.openvidu.server.kurento.KurentoFilter; +import io.openvidu.server.kurento.endpoint.KurentoFilter; public class MediaOptions { diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index 890063b1..704806b3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -40,8 +40,8 @@ import io.openvidu.java.client.OpenViduRole; import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.config.InfoHandler; import io.openvidu.server.config.OpenviduConfig; -import io.openvidu.server.kurento.KurentoFilter; import io.openvidu.server.kurento.core.KurentoParticipant; +import io.openvidu.server.kurento.endpoint.KurentoFilter; import io.openvidu.server.recording.Recording; import io.openvidu.server.rpc.RpcNotificationService; diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/AutodiscoveryKurentoClientProvider.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/AutodiscoveryKurentoClientProvider.java deleted file mode 100644 index ee38a314..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/AutodiscoveryKurentoClientProvider.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.openvidu.server.kurento; - -import org.kurento.client.KurentoClient; -import org.kurento.client.Properties; - -import io.openvidu.client.OpenViduException; - -public class AutodiscoveryKurentoClientProvider implements KurentoClientProvider { - - private static final int ROOM_PIPELINE_LOAD_POINTS = 50; - - @Override - public KurentoClient getKurentoClient(KurentoClientSessionInfo sessionInfo) throws OpenViduException { - - return KurentoClient.create(Properties.of("loadPoints", ROOM_PIPELINE_LOAD_POINTS)); - - } - - @Override - public boolean destroyWhenUnused() { - return true; - } -} diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientProvider.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientProvider.java deleted file mode 100644 index 2172fe87..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientProvider.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.openvidu.server.kurento; - -import org.kurento.client.KurentoClient; - -import io.openvidu.client.OpenViduException; - -/** - * This service interface was designed so that the room manager could obtain a {@link KurentoClient} - * instance at any time, without requiring knowledge about the placement of the media server - * instances. It is left for the developer to provide an implementation for this API. - * - * @author Pablo Fuente (pablofuenteperez@gmail.com) - */ -public interface KurentoClientProvider { - - /** - * Obtains a {@link KurentoClient} instance given the custom session bean. Normally, it'd be - * called during a room's instantiation. - * - * @param sessionInfo - * custom information object required by the implementors of this interface - * @return the {@link KurentoClient} instance - * @throws OpenViduException - * in case there is an error obtaining a {@link KurentoClient} instance - */ - KurentoClient getKurentoClient(KurentoClientSessionInfo sessionInfo) throws OpenViduException; - - boolean destroyWhenUnused(); -} diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/OpenViduKurentoClientSessionInfo.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/OpenViduKurentoClientSessionInfo.java deleted file mode 100644 index a5ca9b66..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/OpenViduKurentoClientSessionInfo.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.openvidu.server.kurento; - -/** - * Implementation of the session info interface, contains a participant's - * private id and the session's id. - * - * @author Pablo Fuente (pablofuenteperez@gmail.com) - * - */ -public class OpenViduKurentoClientSessionInfo implements KurentoClientSessionInfo { - - private String participantPrivateId; - private String sessionId; - - public OpenViduKurentoClientSessionInfo(String participantPrivateId, String roomName) { - super(); - this.participantPrivateId = participantPrivateId; - this.sessionId = roomName; - } - - public String getParticipantPrivateId() { - return participantPrivateId; - } - - public void setParticipantPrivateId(String participantPrivateId) { - this.participantPrivateId = participantPrivateId; - } - - @Override - public String getRoomName() { - return sessionId; - } - - public void setSessionId(String sessionId) { - this.sessionId = sessionId; - } -} diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java index 13abb43f..cb3d57b5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java @@ -21,7 +21,7 @@ import org.kurento.client.MediaElement; import org.kurento.client.MediaType; import io.openvidu.server.core.MediaOptions; -import io.openvidu.server.kurento.KurentoFilter; +import io.openvidu.server.kurento.endpoint.KurentoFilter; public class KurentoMediaOptions extends MediaOptions { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index e4c7c013..fdc7692b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -372,8 +372,7 @@ public class KurentoParticipant extends Participant { if (this.openviduConfig.isRecordingModuleEnabled() && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { - this.recordingManager.stopOneIndividualStreamRecording(session.getSessionId(), - this.getPublisherStreamId(), false); + this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId()); } publisher.unregisterErrorListeners(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 87cff7bb..8871619b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -25,7 +25,6 @@ import org.kurento.client.Continuation; import org.kurento.client.ErrorEvent; import org.kurento.client.EventListener; import org.kurento.client.IceCandidate; -import org.kurento.client.KurentoClient; import org.kurento.client.MediaPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +37,7 @@ import io.openvidu.java.client.Recording.OutputMode; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; +import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.recording.Recording; /** @@ -51,7 +51,7 @@ public class KurentoSession extends Session { private MediaPipeline pipeline; private CountDownLatch pipelineLatch = new CountDownLatch(1); - private KurentoClient kurentoClient; + private Kms kms; private KurentoSessionEventsHandler kurentoSessionHandler; private KurentoParticipantEndpointConfig kurentoEndpointConfig; @@ -63,11 +63,10 @@ public class KurentoSession extends Session { public final ConcurrentHashMap publishedStreamIds = new ConcurrentHashMap<>(); - public KurentoSession(Session sessionNotActive, KurentoClient kurentoClient, - KurentoSessionEventsHandler kurentoSessionHandler, KurentoParticipantEndpointConfig kurentoEndpointConfig, - boolean destroyKurentoClient) { + public KurentoSession(Session sessionNotActive, Kms kms, KurentoSessionEventsHandler kurentoSessionHandler, + KurentoParticipantEndpointConfig kurentoEndpointConfig, boolean destroyKurentoClient) { super(sessionNotActive); - this.kurentoClient = kurentoClient; + this.kms = kms; this.destroyKurentoClient = destroyKurentoClient; this.kurentoSessionHandler = kurentoSessionHandler; this.kurentoEndpointConfig = kurentoEndpointConfig; @@ -158,7 +157,7 @@ public class KurentoSession extends Session { log.debug("Session {} closed", this.sessionId); if (destroyKurentoClient) { - kurentoClient.destroy(); + kms.getKurentoClient().destroy(); } this.closed = true; @@ -192,6 +191,10 @@ public class KurentoSession extends Session { } } + public Kms getKms() { + return this.kms; + } + public MediaPipeline getPipeline() { try { pipelineLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS); @@ -208,7 +211,7 @@ public class KurentoSession extends Session { } log.info("SESSION {}: Creating MediaPipeline", sessionId); try { - kurentoClient.createMediaPipeline(new Continuation() { + kms.getKurentoClient().createMediaPipeline(new Continuation() { @Override public void onSuccess(MediaPipeline result) throws Exception { pipeline = result; diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 044c4f1f..02feebf5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -24,7 +24,6 @@ import java.util.Set; import org.kurento.client.GenericMediaElement; import org.kurento.client.IceCandidate; -import org.kurento.client.KurentoClient; import org.kurento.client.ListenerSubscription; import org.kurento.jsonrpc.Props; import org.kurento.jsonrpc.message.Request; @@ -50,12 +49,11 @@ import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionManager; -import io.openvidu.server.kurento.KurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientSessionInfo; -import io.openvidu.server.kurento.KurentoFilter; -import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo; +import io.openvidu.server.kurento.endpoint.KurentoFilter; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.SdpType; +import io.openvidu.server.kurento.kms.Kms; +import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.utils.JsonUtils; @@ -64,7 +62,7 @@ public class KurentoSessionManager extends SessionManager { private static final Logger log = LoggerFactory.getLogger(KurentoSessionManager.class); @Autowired - private KurentoClientProvider kcProvider; + private KmsManager kmsManager; @Autowired private KurentoSessionEventsHandler kurentoSessionEventsHandler; @@ -72,18 +70,14 @@ public class KurentoSessionManager extends SessionManager { @Autowired private KurentoParticipantEndpointConfig kurentoEndpointConfig; - private KurentoClient kurentoClient; - @Override public synchronized void joinRoom(Participant participant, String sessionId, Integer transactionId) { Set existingParticipants = null; try { - KurentoClientSessionInfo kcSessionInfo = new OpenViduKurentoClientSessionInfo( - participant.getParticipantPrivateId(), sessionId); KurentoSession kSession = (KurentoSession) sessions.get(sessionId); - if (kSession == null && kcSessionInfo != null) { + if (kSession == null) { // First user connecting to the session Session sessionNotActive = sessionsNotActive.remove(sessionId); @@ -96,20 +90,18 @@ public class KurentoSessionManager extends SessionManager { openviduConfig, recordingManager); } - createSession(sessionNotActive, kcSessionInfo); - } - kSession = (KurentoSession) sessions.get(sessionId); - if (kSession == null) { - log.warn("Session '{}' not found"); - throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId - + "' was not found, must be created before '" + sessionId + "' can join"); + Kms lessLoadedKms = this.kmsManager.getLessLoadedKms(); + log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(), lessLoadedKms.getLoad()); + kSession = createSession(sessionNotActive, lessLoadedKms); } + if (kSession.isClosed()) { log.warn("'{}' is trying to join session '{}' but it is closing", participant.getParticipantPublicId(), sessionId); throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "'" + participant.getParticipantPublicId() + "' is trying to join session '" + sessionId + "' but it is closing"); } + existingParticipants = getParticipants(sessionId); kSession.join(participant); } catch (OpenViduException e) { @@ -492,38 +484,29 @@ public class KurentoSessionManager extends SessionManager { } /** - * Creates a session if it doesn't already exist. The session's id will be - * indicated by the session info bean. - * - * @param kcSessionInfo bean that will be passed to the - * {@link KurentoClientProvider} in order to obtain the - * {@link KurentoClient} that will be used by the room + * Creates a session with the already existing not-active session in the + * indicated KMS, if it doesn't already exist + * * @throws OpenViduException in case of error while creating the session */ - public void createSession(Session sessionNotActive, KurentoClientSessionInfo kcSessionInfo) - throws OpenViduException { - String sessionId = kcSessionInfo.getRoomName(); - KurentoSession session = (KurentoSession) sessions.get(sessionId); + public KurentoSession createSession(Session sessionNotActive, Kms kms) throws OpenViduException { + KurentoSession session = (KurentoSession) sessions.get(sessionNotActive.getSessionId()); if (session != null) { throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE, - "Session '" + sessionId + "' already exists"); + "Session '" + session.getSessionId() + "' already exists"); } - this.kurentoClient = kcProvider.getKurentoClient(kcSessionInfo); - session = new KurentoSession(sessionNotActive, kurentoClient, kurentoSessionEventsHandler, - kurentoEndpointConfig, kcProvider.destroyWhenUnused()); + session = new KurentoSession(sessionNotActive, kms, kurentoSessionEventsHandler, kurentoEndpointConfig, + kmsManager.destroyWhenUnused()); - KurentoSession oldSession = (KurentoSession) sessions.putIfAbsent(sessionId, session); + KurentoSession oldSession = (KurentoSession) sessions.putIfAbsent(session.getSessionId(), session); if (oldSession != null) { - log.warn("Session '{}' has just been created by another thread", sessionId); - return; + log.warn("Session '{}' has just been created by another thread", session.getSessionId()); + return oldSession; } - String kcName = "[NAME NOT AVAILABLE]"; - if (kurentoClient.getServerManager() != null) { - kcName = kurentoClient.getServerManager().getName(); - } - log.warn("No session '{}' exists yet. Created one using KurentoClient '{}'.", sessionId, kcName); + log.warn("No session '{}' exists yet. Created one on KMS '{}'", session.getSessionId(), kms.getUri()); sessionEventsHandler.onSessionCreated(session); + return session; } @Override @@ -852,6 +835,10 @@ public class KurentoSessionManager extends SessionManager { return ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); } + public KmsManager getKmsManager() { + return this.kmsManager; + } + private void applyFilterInPublisher(KurentoParticipant kParticipant, KurentoFilter filter) throws OpenViduException { GenericMediaElement.Builder builder = new GenericMediaElement.Builder(kParticipant.getPipeline(), diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoFilter.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/KurentoFilter.java similarity index 97% rename from openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoFilter.java rename to openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/KurentoFilter.java index 6370bcc5..58623bd0 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoFilter.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/KurentoFilter.java @@ -15,7 +15,7 @@ * */ -package io.openvidu.server.kurento; +package io.openvidu.server.kurento.endpoint; import com.google.gson.JsonElement; import com.google.gson.JsonObject; diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java index fd25534e..855e5898 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java @@ -38,7 +38,6 @@ import org.kurento.client.WebRtcEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonObject; diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index 9ea7d0c9..b1c8f7c2 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -46,7 +46,6 @@ import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.MediaOptions; -import io.openvidu.server.kurento.TrackType; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.utils.JsonUtils; diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/TrackType.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/TrackType.java similarity index 90% rename from openvidu-server/src/main/java/io/openvidu/server/kurento/TrackType.java rename to openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/TrackType.java index 38be72bd..eec03479 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/TrackType.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/TrackType.java @@ -15,8 +15,8 @@ * */ -package io.openvidu.server.kurento; +package io.openvidu.server.kurento.endpoint; public enum TrackType { - ALL, VIDEO, AUDIO; + ALL, VIDEO, AUDIO; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientSessionInfo.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java similarity index 60% rename from openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientSessionInfo.java rename to openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java index d818041c..fcb8e3ea 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientSessionInfo.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java @@ -15,19 +15,18 @@ * */ -package io.openvidu.server.kurento; +package io.openvidu.server.kurento.kms; -import org.kurento.client.KurentoClient; +public class DummyLoadManager implements LoadManager { + + @Override + public double calculateLoad(Kms kms) { + return 1; + } + + @Override + public boolean allowMoreElements(Kms kms) { + return true; + } -/** - * Interface for beans holding information required to obtain a {@link KurentoClient}. - * - * @author Radu Tom Vlad - * - */ -public interface KurentoClientSessionInfo { - /** - * @return the room's name (or id) for whom a {@link KurentoClient} will be needed - */ - public String getRoomName(); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index a467bca7..fd708aee 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -17,72 +17,62 @@ package io.openvidu.server.kurento.kms; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import org.kurento.client.KurentoClient; import org.kurento.client.KurentoConnectionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import io.openvidu.server.core.SessionManager; import io.openvidu.server.kurento.core.KurentoSession; +import io.openvidu.server.kurento.core.KurentoSessionManager; public class FixedOneKmsManager extends KmsManager { private static final Logger log = LoggerFactory.getLogger(FixedOneKmsManager.class); - @Autowired - SessionManager sessionManager; - - public static final AtomicBoolean CONNECTED_TO_KMS = new AtomicBoolean(false); - public static final AtomicLong TIME_OF_DISCONNECTION = new AtomicLong(0); - public FixedOneKmsManager(String kmsWsUri) { - this(kmsWsUri, 1); + KurentoClient kClient = KurentoClient.create(kmsWsUri, new KurentoConnectionListener() { + + @Override + public void reconnected(boolean isReconnected) { + ((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, true); + if (!isReconnected) { + // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) + log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri); + log.warn("Updating all webrtc endpoints for active sessions"); + sessionManager.getSessions().forEach(s -> { + ((KurentoSession) s).restartStatusInKurento(); + }); + } else { + // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but + // not the clients/KMS connections + log.warn("Kurento Client reconnected to same KMS with uri {}", kmsWsUri); + } + } + + @Override + public void disconnected() { + ((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, + false); + ((KurentoSessionManager) sessionManager).getKmsManager().setTimeOfKurentoClientDisconnection(kmsWsUri, + System.currentTimeMillis()); + log.warn("Kurento Client disconnected from KMS with uri {}", kmsWsUri); + } + + @Override + public void connectionFailed() { + ((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, + false); + log.warn("Kurento Client failed connecting to KMS with uri {}", kmsWsUri); + } + + @Override + public void connected() { + ((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, true); + log.warn("Kurento Client is now connected to KMS with uri {}", kmsWsUri); + } + }); + + this.addKms(new Kms(kmsWsUri, kClient, loadManager)); } - public FixedOneKmsManager(String kmsWsUri, int numKmss) { - for (int i = 0; i < numKmss; i++) { - this.addKms(new Kms(KurentoClient.create(kmsWsUri, new KurentoConnectionListener() { - - @Override - public void reconnected(boolean isReconnected) { - CONNECTED_TO_KMS.compareAndSet(false, true); - if (!isReconnected) { - // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) - log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri); - log.warn("Updating all webrtc endpoints for active sessions"); - sessionManager.getSessions().forEach(s -> { - ((KurentoSession) s).restartStatusInKurento(); - }); - } else { - // Same KMS. We can infer that openvidu-server/KMS connection has been lost, but - // not the clients/KMS connections - log.warn("Kurento Client reconnected to same KMS with uri {}", kmsWsUri); - } - } - - @Override - public void disconnected() { - CONNECTED_TO_KMS.compareAndSet(true, false); - TIME_OF_DISCONNECTION.set(System.currentTimeMillis()); - log.warn("Kurento Client disconnected from KMS with uri {}", kmsWsUri); - } - - @Override - public void connectionFailed() { - CONNECTED_TO_KMS.set(false); - log.warn("Kurento Client failed connecting to KMS with uri {}", kmsWsUri); - } - - @Override - public void connected() { - CONNECTED_TO_KMS.compareAndSet(false, true); - log.warn("Kurento Client is now connected to KMS with uri {}", kmsWsUri); - } - }), kmsWsUri)); - } - } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index 77363477..1c33f282 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -17,36 +17,67 @@ package io.openvidu.server.kurento.kms; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.kurento.client.KurentoClient; +/** + * Abstraction of a KMS instance: an object of this class corresponds to a KMS + * process running somewhere. + * + * It is uniquely identified by the KMS ws URI endpoint. It encapsulates the + * WebSocket client to communicate openvidu-server Java process with it and has + * a specific LoadManager service to calculate the load of this KMS based on + * different measures + * + * @author Pablo Fuente (pablofuenteperez@gmail.com) + */ public class Kms { - private LoadManager loadManager = new MaxWebRtcLoadManager(10000); - private KurentoClient client; - private String kmsUri; + private String kmsUri; + private KurentoClient client; + private LoadManager loadManager; - public Kms(KurentoClient client, String kmsUri) { - this.client = client; - this.kmsUri = kmsUri; - } + private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); + private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0); - public void setLoadManager(LoadManager loadManager) { - this.loadManager = loadManager; - } + public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) { + this.kmsUri = kmsUri; + this.client = client; + this.loadManager = loadManager; + } - public double getLoad() { - return loadManager.calculateLoad(this); - } + public String getUri() { + return kmsUri; + } - public boolean allowMoreElements() { - return loadManager.allowMoreElements(this); - } + public KurentoClient getKurentoClient() { + return this.client; + } - public String getUri() { - return kmsUri; - } + public double getLoad() { + return loadManager.calculateLoad(this); + } + + public boolean allowMoreElements() { + return loadManager.allowMoreElements(this); + } + + public boolean isKurentoClientConnected() { + return this.isKurentoClientConnected.get(); + } + + public void setKurentoClientConnected(boolean isConnected) { + this.isKurentoClientConnected.set(isConnected); + } + + public long getTimeOfKurentoClientDisconnection() { + return this.timeOfKurentoClientDisconnection.get(); + } + + public void setTimeOfKurentoClientDisconnection(long time) { + this.timeOfKurentoClientDisconnection.set(time); + } - public KurentoClient getKurentoClient() { - return this.client; - } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index f2a79dbf..b7407074 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -21,105 +21,129 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -import org.kurento.client.KurentoClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; -import io.openvidu.client.OpenViduException; -import io.openvidu.client.OpenViduException.Code; -import io.openvidu.server.kurento.KurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientSessionInfo; -import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo; +import io.openvidu.server.core.SessionManager; -public abstract class KmsManager implements KurentoClientProvider { +@Service +public abstract class KmsManager { - public static class KmsLoad implements Comparable { + public class KmsLoad implements Comparable { - private Kms kms; - private double load; + private Kms kms; + private double load; - public KmsLoad(Kms kms, double load) { - this.kms = kms; - this.load = load; - } + public KmsLoad(Kms kms, double load) { + this.kms = kms; + this.load = load; + } - public Kms getKms() { - return kms; - } + public Kms getKms() { + return kms; + } - public double getLoad() { - return load; - } + public double getLoad() { + return load; + } - @Override - public int compareTo(KmsLoad o) { - return Double.compare(this.load, o.load); - } - } + @Override + public int compareTo(KmsLoad o) { + return Double.compare(this.load, o.load); + } + } - private final Logger log = LoggerFactory.getLogger(KmsManager.class); + @Autowired + protected SessionManager sessionManager; - private List kmss = new ArrayList(); - private Iterator usageIterator = null; + @Autowired + protected LoadManager loadManager; - @Override - public KurentoClient getKurentoClient(KurentoClientSessionInfo sessionInfo) throws OpenViduException { - if (!(sessionInfo instanceof OpenViduKurentoClientSessionInfo)) { - throw new OpenViduException(Code.GENERIC_ERROR_CODE, "Unkown session info bean type (expected " - + OpenViduKurentoClientSessionInfo.class.getName() + ")"); - } - return getKms((OpenViduKurentoClientSessionInfo) sessionInfo).getKurentoClient(); - } + private final Logger log = LoggerFactory.getLogger(KmsManager.class); - /** - * Returns a {@link Kms} using a round-robin strategy. - * - * @param sessionInfo - * session's id - */ - public synchronized Kms getKms(OpenViduKurentoClientSessionInfo sessionInfo) { - if (usageIterator == null || !usageIterator.hasNext()) { - usageIterator = kmss.iterator(); - } - return usageIterator.next(); - } + // Using KMS websocket uris as unique identifiers + private Map kmss = new ConcurrentHashMap<>(); - public synchronized void addKms(Kms kms) { - this.kmss.add(kms); - } + private Iterator usageIterator = null; - public synchronized Kms getLessLoadedKms() { - return Collections.min(getKmsLoads()).kms; - } + public synchronized void addKms(Kms kms) { + this.kmss.put(kms.getUri(), kms); + } - public synchronized Kms getNextLessLoadedKms() { - List sortedLoads = getKmssSortedByLoad(); - if (sortedLoads.size() > 1) { - return sortedLoads.get(1).kms; - } else { - return sortedLoads.get(0).kms; - } - } + public synchronized void removeKms(Kms kms) { + this.kmss.remove(kms.getUri()); + } - public synchronized List getKmssSortedByLoad() { - List kmsLoads = getKmsLoads(); - Collections.sort(kmsLoads); - return kmsLoads; - } + public synchronized Kms getKms(String sessionId) { + if (usageIterator == null || !usageIterator.hasNext()) { + usageIterator = kmss.values().iterator(); + } + return usageIterator.next(); + } - private List getKmsLoads() { - ArrayList kmsLoads = new ArrayList<>(); - for (Kms kms : kmss) { - double load = kms.getLoad(); - kmsLoads.add(new KmsLoad(kms, load)); - log.trace("Calc load {} for kms: {}", load, kms.getUri()); - } - return kmsLoads; - } + /** + * Returns a {@link Kms} using a round-robin strategy. + * + * @param sessionInfo session's id + */ + public synchronized Kms getKmsRoundRobin() { + if (usageIterator == null || !usageIterator.hasNext()) { + usageIterator = kmss.values().iterator(); + } + return usageIterator.next(); + } - @Override - public boolean destroyWhenUnused() { - return false; - } + public synchronized Kms getLessLoadedKms() { + return Collections.min(getKmsLoads()).kms; + } + + public synchronized Kms getNextLessLoadedKms() { + List sortedLoads = getKmssSortedByLoad(); + if (sortedLoads.size() > 1) { + return sortedLoads.get(1).kms; + } else { + return sortedLoads.get(0).kms; + } + } + + public synchronized List getKmssSortedByLoad() { + List kmsLoads = getKmsLoads(); + Collections.sort(kmsLoads); + return kmsLoads; + } + + public boolean isKurentoClientConnectedToKms(Kms kms) { + return this.kmss.get(kms.getUri()).isKurentoClientConnected(); + } + + public long getTimeOfKurentoClientDisconnection(Kms kms) { + return this.kmss.get(kms.getUri()).getTimeOfKurentoClientDisconnection(); + } + + public void setKurentoClientConnectedToKms(String kmsUri, boolean isConnected) { + this.kmss.get(kmsUri).setKurentoClientConnected(isConnected); + } + + public void setTimeOfKurentoClientDisconnection(String kmsUri, long time) { + this.kmss.get(kmsUri).setTimeOfKurentoClientDisconnection(time); + } + + private List getKmsLoads() { + ArrayList kmsLoads = new ArrayList<>(); + for (Kms kms : kmss.values()) { + double load = kms.getLoad(); + kmsLoads.add(new KmsLoad(kms, load)); + log.trace("Calc load {} for kms: {}", load, kms.getUri()); + } + return kmsLoads; + } + + public boolean destroyWhenUnused() { + return false; + } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java index 570499cd..a7754ba3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java @@ -17,6 +17,9 @@ package io.openvidu.server.kurento.kms; +import org.springframework.stereotype.Service; + +@Service public interface LoadManager { double calculateLoad(Kms kms); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java deleted file mode 100644 index b6623f28..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.openvidu.server.kurento.kms; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MaxWebRtcLoadManager implements LoadManager { - private static final Logger log = LoggerFactory.getLogger(MaxWebRtcLoadManager.class); - - private int maxWebRtcPerKms; - - public MaxWebRtcLoadManager(int maxWebRtcPerKms) { - this.maxWebRtcPerKms = maxWebRtcPerKms; - } - - @Override - public double calculateLoad(Kms kms) { - int numWebRtcs = countWebRtcEndpoints(kms); - if (numWebRtcs > maxWebRtcPerKms) { - return 1; - } else { - return numWebRtcs / (double) maxWebRtcPerKms; - } - } - - @Override - public boolean allowMoreElements(Kms kms) { - return countWebRtcEndpoints(kms) < maxWebRtcPerKms; - } - - private synchronized int countWebRtcEndpoints(Kms kms) { - try { - return kms.getKurentoClient().getServerManager().getPipelines().size(); - } catch (Throwable e) { - log.warn("Error counting KurentoClient pipelines", e); - return 0; - } - } -} diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java index 2109cccb..b55462a4 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java @@ -38,7 +38,6 @@ import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; -import io.openvidu.server.kurento.kms.FixedOneKmsManager; public class CompositeWrapper { @@ -87,8 +86,8 @@ public class CompositeWrapper { this.recorderEndpoint.record(); } - public synchronized void stopCompositeRecording(CountDownLatch stopLatch, boolean forceAfterKmsRestart) { - if (!forceAfterKmsRestart) { + public synchronized void stopCompositeRecording(CountDownLatch stopLatch, Long timeOfKmsDisconnection) { + if (timeOfKmsDisconnection == 0) { this.recorderEndpoint.addStoppedListener(new EventListener() { @Override public void onEvent(StoppedEvent event) { @@ -102,7 +101,7 @@ public class CompositeWrapper { }); this.recorderEndpoint.stop(); } else { - endTime = FixedOneKmsManager.TIME_OF_DISCONNECTION.get(); + endTime = timeOfKmsDisconnection; stopLatch.countDown(); log.warn("Forcing composed audio-only recording stop after KMS restart in session {}", this.session.getSessionId()); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java index a64ae656..337adbe9 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java @@ -93,15 +93,10 @@ public class ComposedRecordingService extends RecordingService { @Override public Recording stopRecording(Session session, Recording recording, EndReason reason) { - return this.stopRecording(session, recording, reason, false); - } - - public Recording stopRecording(Session session, Recording recording, EndReason reason, - boolean forceAfterKmsRestart) { if (recording.hasVideo()) { return this.stopRecordingWithVideo(session, recording, reason); } else { - return this.stopRecordingAudioOnly(session, recording, reason, forceAfterKmsRestart); + return this.stopRecordingAudioOnly(session, recording, reason); } } @@ -331,8 +326,7 @@ public class ComposedRecordingService extends RecordingService { return recording; } - private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason, - boolean forceAfterKmsRestart) { + private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason) { log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(), recording.getSessionId(), reason); @@ -349,10 +343,9 @@ public class ComposedRecordingService extends RecordingService { } CompositeWrapper compositeWrapper = this.composites.remove(sessionId); - final CountDownLatch stoppedCountDown = new CountDownLatch(1); + compositeWrapper.stopCompositeRecording(stoppedCountDown, ((KurentoSession)session).getKms().getTimeOfKurentoClientDisconnection()); - compositeWrapper.stopCompositeRecording(stoppedCountDown, forceAfterKmsRestart); try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index 123c1cc5..61394602 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -62,9 +62,8 @@ import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionManager; -import io.openvidu.server.kurento.KurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientSessionInfo; -import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo; +import io.openvidu.server.kurento.core.KurentoSession; +import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.recording.Recording; import io.openvidu.server.utils.CustomFileManager; import io.openvidu.server.utils.DockerManager; @@ -89,7 +88,7 @@ public class RecordingManager { protected OpenviduConfig openviduConfig; @Autowired - private KurentoClientProvider kcProvider; + private KmsManager kmsManager; protected Map startingRecordings = new ConcurrentHashMap<>(); protected Map startedRecordings = new ConcurrentHashMap<>(); @@ -221,10 +220,10 @@ public class RecordingManager { recording = this.sessionsRecordings.get(session.getSessionId()); switch (recording.getOutputMode()) { case COMPOSED: - recording = this.composedRecordingService.stopRecording(session, recording, reason, true); + recording = this.composedRecordingService.stopRecording(session, recording, reason); break; case INDIVIDUAL: - recording = this.singleStreamRecordingService.stopRecording(session, recording, reason, true); + recording = this.singleStreamRecordingService.stopRecording(session, recording, reason); break; } this.abortAutomaticRecordingStopThread(session); @@ -254,22 +253,23 @@ public class RecordingManager { } } - public void stopOneIndividualStreamRecording(String sessionId, String streamId, boolean forceAfterKmsRestart) { - Recording recording = this.sessionsRecordings.get(sessionId); + public void stopOneIndividualStreamRecording(KurentoSession session, String streamId) { + Recording recording = this.sessionsRecordings.get(session.getSessionId()); if (recording == null) { log.error("Cannot stop recording of existing stream {}. Session {} is not being recorded", streamId, - sessionId); + session.getSessionId()); } if (io.openvidu.java.client.Recording.OutputMode.INDIVIDUAL.equals(recording.getOutputMode())) { // Stop specific RecorderEndpoint for this stream - log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", sessionId, streamId); + log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", session.getSessionId(), + streamId); final CountDownLatch stoppedCountDown = new CountDownLatch(1); - this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(sessionId, streamId, - stoppedCountDown, forceAfterKmsRestart); + this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(session.getSessionId(), streamId, + stoppedCountDown, session.getKms().getTimeOfKurentoClientDisconnection()); try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { log.error("Error waiting for recorder endpoint of stream {} to stop in session {}", streamId, - sessionId); + session.getSessionId()); } } catch (InterruptedException e) { log.error("Exception while waiting for state change", e); @@ -277,9 +277,9 @@ public class RecordingManager { } else if (io.openvidu.java.client.Recording.OutputMode.COMPOSED.equals(recording.getOutputMode()) && !recording.hasVideo()) { // Disconnect this stream from existing Composite recorder - log.info("Removing PublisherEndpoint from Composite in session {} for stream of participant {}", sessionId, - streamId); - this.composedRecordingService.removePublisherEndpointFromComposite(sessionId, streamId); + log.info("Removing PublisherEndpoint from Composite in session {} for stream of participant {}", + session.getSessionId(), streamId); + this.composedRecordingService.removePublisherEndpointFromComposite(session.getSessionId(), streamId); } } @@ -536,9 +536,7 @@ public class RecordingManager { final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH.webm"; // Check Kurento Media Server write permissions in recording path - KurentoClientSessionInfo kcSessionInfo = new OpenViduKurentoClientSessionInfo("TEST_RECORDING_PATH", - "TEST_RECORDING_PATH"); - MediaPipeline pipeline = this.kcProvider.getKurentoClient(kcSessionInfo).createMediaPipeline(); + MediaPipeline pipeline = this.kmsManager.getLessLoadedKms().getKurentoClient().createMediaPipeline(); RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, "file://" + testFilePath).build(); final AtomicBoolean kurentoRecorderError = new AtomicBoolean(false); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 3b7a6887..b6f14958 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -56,8 +56,8 @@ import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.core.KurentoParticipant; +import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; -import io.openvidu.server.kurento.kms.FixedOneKmsManager; import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.Recording; @@ -129,11 +129,6 @@ public class SingleStreamRecordingService extends RecordingService { @Override public Recording stopRecording(Session session, Recording recording, EndReason reason) { - return this.stopRecording(session, recording, reason, false); - } - - public Recording stopRecording(Session session, Recording recording, EndReason reason, - boolean forceAfterKmsRestart) { log.info("Stopping individual ({}) recording {} of session {}. Reason: {}", recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.getId(), recording.getSessionId(), reason); @@ -141,9 +136,11 @@ public class SingleStreamRecordingService extends RecordingService { final int numberOfActiveRecorders = recorders.get(recording.getSessionId()).size(); final CountDownLatch stoppedCountDown = new CountDownLatch(numberOfActiveRecorders); + final long timeOfKurentoClientDisconnection = ((KurentoSession) session).getKms() + .getTimeOfKurentoClientDisconnection(); for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) { this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), - stoppedCountDown, forceAfterKmsRestart); + stoppedCountDown, timeOfKurentoClientDisconnection); } try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { @@ -225,10 +222,10 @@ public class SingleStreamRecordingService extends RecordingService { } public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId, - CountDownLatch globalStopLatch, boolean forceAfterKmsRestart) { + CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId); final RecorderEndpointWrapper finalWrapper = this.recorders.get(sessionId).remove(streamId); - if (finalWrapper != null && !forceAfterKmsRestart) { + if (finalWrapper != null && kmsDisconnectionTime == 0) { finalWrapper.getRecorder().addStoppedListener(new EventListener() { @Override public void onEvent(StoppedEvent event) { @@ -241,8 +238,8 @@ public class SingleStreamRecordingService extends RecordingService { }); finalWrapper.getRecorder().stop(); } else { - if (forceAfterKmsRestart) { - finalWrapper.setEndTime(FixedOneKmsManager.TIME_OF_DISCONNECTION.get()); + if (kmsDisconnectionTime != 0) { + finalWrapper.setEndTime(kmsDisconnectionTime); generateIndividualMetadataFile(finalWrapper); log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId, sessionId); diff --git a/openvidu-server/src/test/java/io/openvidu/server/test/core/RoomManagerTest.java b/openvidu-server/src/test/java/io/openvidu/server/test/core/RoomManagerTest.java index f9644622..63c2fdea 100644 --- a/openvidu-server/src/test/java/io/openvidu/server/test/core/RoomManagerTest.java +++ b/openvidu-server/src/test/java/io/openvidu/server/test/core/RoomManagerTest.java @@ -16,41 +16,9 @@ package io.openvidu.server.test.core; -import static org.junit.matchers.JUnitMatchers.containsString; -import static org.junit.matchers.JUnitMatchers.hasItem; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; - -import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -62,9 +30,7 @@ import org.kurento.client.ErrorEvent; import org.kurento.client.EventListener; import org.kurento.client.FaceOverlayFilter; import org.kurento.client.HubPort; -import org.kurento.client.IceCandidate; import org.kurento.client.KurentoClient; -import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; import org.kurento.client.MediaType; import org.kurento.client.Mixer; @@ -75,25 +41,15 @@ import org.kurento.client.ServerManager; import org.kurento.client.WebRtcEndpoint; import org.mockito.ArgumentCaptor; import org.mockito.Captor; -import org.mockito.Matchers; import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import org.springframework.context.ConfigurableApplicationContext; -import io.openvidu.client.OpenViduException; -import io.openvidu.client.OpenViduException.Code; -import io.openvidu.server.OpenViduServer; import io.openvidu.server.core.Participant; import io.openvidu.server.core.SessionManager; -import io.openvidu.server.core.Token; -import io.openvidu.server.kurento.KurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientSessionInfo; import io.openvidu.server.kurento.core.KurentoSessionEventsHandler; -import io.openvidu.server.kurento.core.KurentoSessionManager; +import io.openvidu.server.kurento.kms.KmsManager; /** * Tests for {@link RoomManager} when using mocked {@link KurentoClient} resources. @@ -124,7 +80,7 @@ public class RoomManagerTest { private SessionManager manager; @Mock - private KurentoClientProvider kcProvider; + private KmsManager kcProvider; @Mock private KurentoSessionEventsHandler roomHandler;