From c111ed20af7c042e297904d3947471124d405b57 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Fri, 27 Mar 2020 20:55:25 +0100 Subject: [PATCH] openvidu-server: garbage collector for non active sessions --- openvidu-server/pom.xml | 29 +++- .../server/config/OpenviduConfig.java | 33 ++++- .../openvidu/server/core/SessionManager.java | 61 ++++++++- .../server/kurento/core/KurentoSession.java | 3 +- .../kurento/core/KurentoSessionManager.java | 2 + ...itional-spring-configuration-metadata.json | 12 ++ .../src/main/resources/application.properties | 3 + ...essionGarbageCollectorIntegrationTest.java | 127 ++++++++++++++++++ .../config/IntegrationTestConfiguration.java | 65 +++++++++ .../resources/integration-test.properties | 45 +++++++ openvidu-test-e2e/jenkins/Jenkinsfile | 3 + 11 files changed, 372 insertions(+), 11 deletions(-) create mode 100644 openvidu-server/src/test/java/io/openvidu/server/test/integration/SessionGarbageCollectorIntegrationTest.java create mode 100644 openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java create mode 100644 openvidu-server/src/test/resources/integration-test.properties diff --git a/openvidu-server/pom.xml b/openvidu-server/pom.xml index 4b5c1de9..fa70e94e 100644 --- a/openvidu-server/pom.xml +++ b/openvidu-server/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -105,6 +107,13 @@ maven-enforcer-plugin ${version.enforcer.plugin} + + + org.apache.maven.plugins + maven-surefire-plugin + ${version.surefire.plugin} + + @@ -314,6 +323,24 @@ + + org.springframework.boot + spring-boot-starter-test + ${version.spring-boot} + test + + + org.skyscreamer + jsonassert + + + + + org.powermock + powermock-module-junit4 + ${version.powermock} + test + org.hamcrest hamcrest-core diff --git a/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java b/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java index fc19b9c3..da3e2794 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java +++ b/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java @@ -42,11 +42,6 @@ import java.util.stream.Stream; import javax.annotation.PostConstruct; -import com.google.gson.Gson; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; - import org.apache.http.Header; import org.apache.http.message.BasicHeader; import org.kurento.jsonrpc.JsonUtils; @@ -63,6 +58,11 @@ import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.support.PropertiesLoaderUtils; import org.springframework.stereotype.Component; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + import io.openvidu.java.client.OpenViduRole; import io.openvidu.server.OpenViduServer; import io.openvidu.server.cdr.CDREventName; @@ -81,7 +81,8 @@ public class OpenviduConfig { public static final Set OPENVIDU_INTEGER_PROPERTIES = new HashSet<>( Arrays.asList("openvidu.recording.autostop-timeout", "openvidu.streams.video.max-recv-bandwidth", "openvidu.streams.video.min-recv-bandwidth", "openvidu.streams.video.max-send-bandwidth", - "openvidu.streams.video.min-send-bandwidth")); + "openvidu.streams.video.min-send-bandwidth", "openvidu.sessions.garbage.interval", + "openvidu.sessions.garbage.threshold")); public static final Set OPENVIDU_BOOLEAN_PROPERTIES = new HashSet<>(Arrays.asList("openvidu.cdr", "openvidu.recording", "openvidu.recording.public-access", "openvidu.webhook")); @@ -168,6 +169,12 @@ public class OpenviduConfig { @Value("${openvidu.streams.video.min-send-bandwidth}") protected int openviduStreamsVideoMinSendBandwidth; + @Value("${openvidu.sessions.garbage.interval}") + protected int openviduSessionsGarbageInterval; + + @Value("${openvidu.sessions.garbage.threshold}") + protected int openviduSessionsGarbageThreshold; + @Value("${coturn.redis.ip}") protected String coturnRedisIp; @@ -286,6 +293,14 @@ public class OpenviduConfig { return this.openviduStreamsVideoMinSendBandwidth; } + public int getSessionGarbageInterval() { + return this.openviduSessionsGarbageInterval; + } + + public int getSessionGarbageThreshold() { + return this.openviduSessionsGarbageThreshold; + } + public String getCoturnIp() { return this.coturnIp; } @@ -481,6 +496,12 @@ public class OpenviduConfig { case "openvidu.streams.video.min-send-bandwidth": checkIntegerNonNegative(parameters, parameter, admitStringified); break; + case "openvidu.sessions.garbage.interval": + checkIntegerNonNegative(parameters, parameter, admitStringified); + break; + case "openvidu.sessions.garbage.threshold": + checkIntegerNonNegative(parameters, parameter, admitStringified); + break; case "kms.uris": String kmsUris; try { diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index 38fc1b56..6b950d79 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -19,13 +19,18 @@ package io.openvidu.server.core; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; +import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.commons.lang3.RandomStringUtils; @@ -82,8 +87,8 @@ public abstract class SessionManager { public FormatChecker formatChecker = new FormatChecker(); - protected ConcurrentMap sessions = new ConcurrentHashMap<>(); - protected ConcurrentMap sessionsNotActive = new ConcurrentHashMap<>(); + final protected ConcurrentMap sessions = new ConcurrentHashMap<>(); + final protected ConcurrentMap sessionsNotActive = new ConcurrentHashMap<>(); protected ConcurrentMap> sessionidParticipantpublicidParticipant = new ConcurrentHashMap<>(); protected ConcurrentMap> sessionidFinalUsers = new ConcurrentHashMap<>(); protected ConcurrentMap> sessionidAccumulatedRecordings = new ConcurrentHashMap<>(); @@ -417,6 +422,58 @@ public abstract class SessionManager { } } + @PostConstruct + private void startSessionGarbageCollector() { + if (openviduConfig.getSessionGarbageInterval() == 0) { + log.info( + "Garbage collector for non active sessions is disabled (property 'openvidu.sessions.garbage.interval' is 0)"); + return; + } + TimerTask task = new TimerTask() { + @Override + public void run() { + // Remove all non active sessions created more than the specified time + log.info("Running non active sessions garbage collector..."); + final long currentMillis = System.currentTimeMillis(); + + // Loop through all non active sessions. Safely remove them and clean all of + // their data if their threshold has elapsed + for (Iterator> iter = sessionsNotActive.entrySet().iterator(); iter.hasNext();) { + final Session sessionNotActive = iter.next().getValue(); + final String sessionId = sessionNotActive.getSessionId(); + long sessionExistsSince = currentMillis - sessionNotActive.getStartTime(); + if (sessionExistsSince > (openviduConfig.getSessionGarbageThreshold() * 1000)) { + try { + sessionNotActive.closingLock.writeLock().lock(); + if (sessions.containsKey(sessionId)) { + // The session passed to active during lock wait + continue; + } + iter.remove(); + cleanCollections(sessionId); + log.info("Non active session {} cleaned up by garbage collector", sessionId); + } finally { + sessionNotActive.closingLock.writeLock().unlock(); + } + } + } + + // Warn about possible ghost sessions + for (Iterator> iter = sessions.entrySet().iterator(); iter.hasNext();) { + final Session sessionActive = iter.next().getValue(); + if (sessionActive.getParticipants().size() == 0) { + log.warn("Possible ghost session {}", sessionActive.getSessionId()); + } + } + } + }; + new Timer().scheduleAtFixedRate(task, openviduConfig.getSessionGarbageInterval() * 1000, + openviduConfig.getSessionGarbageInterval() * 1000); + log.info( + "Garbage collector for non active sessions initialized. Running every {} seconds and cleaning up non active Sessions more than {} seconds old", + openviduConfig.getSessionGarbageInterval(), openviduConfig.getSessionGarbageThreshold()); + } + /** * Closes an existing session by releasing all resources that were allocated for * it. Once closed, the session can be reopened (will be empty and it will use diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index fb3c2cc5..22845b1b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -128,8 +128,7 @@ public class KurentoSession extends Session { } participant.releaseAllFilters(); - log.info("PARTICIPANT {}: Leaving session {} for reason {}", participant.getParticipantPublicId(), - this.sessionId, reason.name()); + log.info("PARTICIPANT {}: Leaving session {}", participant.getParticipantPublicId(), this.sessionId); this.removeParticipant(participant, reason); participant.close(reason, true, 0); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index fe08ffc4..f45fb374 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -82,6 +82,7 @@ public class KurentoSessionManager extends SessionManager { private KurentoParticipantEndpointConfig kurentoEndpointConfig; @Override + /* Protected by Session.closingLock.readLock */ public synchronized void joinRoom(Participant participant, String sessionId, Integer transactionId) { Set existingParticipants = null; boolean lockAcquired = false; @@ -866,6 +867,7 @@ public class KurentoSessionManager extends SessionManager { } @Override + /* Protected by Session.closingLock.readLock */ public Participant publishIpcam(Session session, MediaOptions mediaOptions, String serverMetadata) throws Exception { final String sessionId = session.getSessionId(); diff --git a/openvidu-server/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/openvidu-server/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 8a4920fb..5aef5b1e 100644 --- a/openvidu-server/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/openvidu-server/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -125,6 +125,18 @@ "description": "Minimum video bandwidth sent from OpenVidu Server to clients, in kbps. 0 means unconstrained", "defaultValue": 300 }, + { + "name": "openvidu.sessions.garbage.interval", + "type": "java.lang.Integer", + "description": "How often the garbage collector of non active sessions runs. This helps cleaning up sessions that have been initialized through REST API (and maybe tokens have been created for them) but have had no users connected. Default to 900s (15 mins). 0 to disable non active sessions garbage collector", + "defaultValue": 900 + }, + { + "name": "openvidu.sessions.garbage.threshold", + "type": "java.lang.Integer", + "description": "Minimum time in seconds that a non active session must have been in existence for the garbage collector of non active sessions to remove it. Default to 3600s (1 hour). If non active sessions garbage collector is disabled (property 'openvidu.sessions.garbage.interval' to 0) this property is ignored", + "defaultValue": 3600 + }, { "name": "coturn.ip", "type": "java.lang.String", diff --git a/openvidu-server/src/main/resources/application.properties b/openvidu-server/src/main/resources/application.properties index 2bbb2f9d..8c3e3c22 100644 --- a/openvidu-server/src/main/resources/application.properties +++ b/openvidu-server/src/main/resources/application.properties @@ -36,6 +36,9 @@ openvidu.streams.video.min-recv-bandwidth=300 openvidu.streams.video.max-send-bandwidth=1000 openvidu.streams.video.min-send-bandwidth=300 +openvidu.sessions.garbage.interval=900 +openvidu.sessions.garbage.threshold=3600 + coturn.redis.ip=127.0.0.1 coturn.redis.dbname=0 coturn.redis.password=turn diff --git a/openvidu-server/src/test/java/io/openvidu/server/test/integration/SessionGarbageCollectorIntegrationTest.java b/openvidu-server/src/test/java/io/openvidu/server/test/integration/SessionGarbageCollectorIntegrationTest.java new file mode 100644 index 00000000..954196c3 --- /dev/null +++ b/openvidu-server/src/test/java/io/openvidu/server/test/integration/SessionGarbageCollectorIntegrationTest.java @@ -0,0 +1,127 @@ +/* + * (C) Copyright 2017-2020 OpenVidu (https://openvidu.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.openvidu.server.test.integration; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.junit.Assert; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.web.WebAppConfiguration; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +import io.openvidu.java.client.OpenViduRole; +import io.openvidu.server.core.Participant; +import io.openvidu.server.core.SessionManager; +import io.openvidu.server.core.Token; +import io.openvidu.server.kurento.kms.KmsManager; +import io.openvidu.server.rest.SessionRestController; +import io.openvidu.server.test.integration.config.IntegrationTestConfiguration; + +/** + * @author Pablo Fuente (pablofuenteperez@gmail.com) + */ +@SpringBootTest(properties = { "openvidu.sessions.garbage.interval=1", "openvidu.sessions.garbage.threshold=1" }) +@TestPropertySource(locations = "classpath:integration-test.properties") +@ContextConfiguration(classes = { IntegrationTestConfiguration.class }) +@WebAppConfiguration +public class SessionGarbageCollectorIntegrationTest { + + private static final Logger log = LoggerFactory.getLogger(SessionGarbageCollectorIntegrationTest.class); + + @SpyBean + private KmsManager kmsManager; + + @Autowired + private SessionManager sessionManager; + + @Autowired + private SessionRestController sessionRestController; + + @Test + @DisplayName("Sessions not active garbage collector") + void garbageCollectorOfSessionsNotActiveTest() throws Exception { + + log.info("Sessions not active garbage collector"); + + JsonObject jsonResponse; + + getSessionId(); + jsonResponse = listSessions(); + + Assert.assertEquals("Wrong number of sessions", 1, jsonResponse.get("numberOfElements").getAsInt()); + + Thread.sleep(2000); + + jsonResponse = listSessions(); + Assert.assertEquals("Wrong number of sessions", 0, jsonResponse.get("numberOfElements").getAsInt()); + + getSessionId(); + getSessionId(); + String sessionId = getSessionId(); + jsonResponse = listSessions(); + Assert.assertEquals("Wrong number of sessions", 3, jsonResponse.get("numberOfElements").getAsInt()); + + String token = getToken(sessionId); + joinParticipant(sessionId, token); + + Thread.sleep(2000); + + jsonResponse = listSessions(); + Assert.assertEquals("Wrong number of sessions", 1, jsonResponse.get("numberOfElements").getAsInt()); + } + + private String getSessionId() { + String stringResponse = (String) sessionRestController.getSessionId(new HashMap<>()).getBody(); + return new Gson().fromJson(stringResponse, JsonObject.class).get("id").getAsString(); + } + + private String getToken(String sessionId) { + Map map = new HashMap<>(); + map.put("session", sessionId); + String stringResponse = (String) sessionRestController.newToken(map).getBody(); + return new Gson().fromJson(stringResponse, JsonObject.class).get("token").getAsString(); + } + + private JsonObject listSessions() { + String stringResponse = (String) sessionRestController.listSessions(false).getBody(); + return new Gson().fromJson(stringResponse, JsonObject.class); + } + + private void joinParticipant(String sessionId, String token) { + Token t = new Token(token, OpenViduRole.PUBLISHER, "SERVER_METADATA", null, null); + String uuid = UUID.randomUUID().toString(); + String participantPrivateId = "PARTICIPANT_PRIVATE_ID_" + uuid; + String finalUserId = "FINAL_USER_ID_" + uuid; + Participant participant = sessionManager.newParticipant(sessionId, participantPrivateId, t, "CLIENT_METADATA", + null, "Chrome", finalUserId); + sessionManager.joinRoom(participant, sessionId, null); + } + +} diff --git a/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java b/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java new file mode 100644 index 00000000..89d96631 --- /dev/null +++ b/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java @@ -0,0 +1,65 @@ +package io.openvidu.server.test.integration.config; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.kurento.client.Continuation; +import org.kurento.client.KurentoClient; +import org.kurento.client.MediaPipeline; +import org.kurento.client.ServerManager; +import org.mockito.Mockito; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +import io.openvidu.server.kurento.kms.FixedOneKmsManager; +import io.openvidu.server.kurento.kms.Kms; +import io.openvidu.server.kurento.kms.KmsManager; +import io.openvidu.server.kurento.kms.KmsProperties; + +/** + * KmsManager bean mock + * + * @author Pablo Fuente (pablofuenteperez@gmail.com) + */ +@TestConfiguration +public class IntegrationTestConfiguration { + + @Bean + public KmsManager kmsManager() throws Exception { + final KmsManager spy = Mockito.spy(new FixedOneKmsManager()); + doAnswer(invocation -> { + List successfullyConnectedKmss = new ArrayList<>(); + List kmsProperties = invocation.getArgument(0); + for (KmsProperties kmsProp : kmsProperties) { + Kms kms = new Kms(kmsProp, spy.getLoadManager()); + KurentoClient kClient = mock(KurentoClient.class); + + doAnswer(i -> { + Thread.sleep((long) (Math.random() * 1000)); + ((Continuation) i.getArgument(0)).onSuccess(mock(MediaPipeline.class)); + return null; + }).when(kClient).createMediaPipeline((Continuation) any()); + + ServerManager serverManagerMock = mock(ServerManager.class); + when(serverManagerMock.getCpuCount()).thenReturn(new Random().nextInt(32) + 1); + when(kClient.getServerManager()).thenReturn(serverManagerMock); + + kms.setKurentoClient(kClient); + kms.setKurentoClientConnected(true); + kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); + + spy.addKms(kms); + successfullyConnectedKmss.add(kms); + } + return successfullyConnectedKmss; + }).when(spy).initializeKurentoClients(any(List.class), any(Boolean.class), any(Boolean.class)); + return spy; + } + +} diff --git a/openvidu-server/src/test/resources/integration-test.properties b/openvidu-server/src/test/resources/integration-test.properties new file mode 100644 index 00000000..8c3e3c22 --- /dev/null +++ b/openvidu-server/src/test/resources/integration-test.properties @@ -0,0 +1,45 @@ +server.address=0.0.0.0 +server.ssl.enabled=true +server.port=4443 +server.ssl.key-store=classpath:openvidu-selfsigned.jks +server.ssl.key-store-password=openvidu +server.ssl.key-store-type=JKS +server.ssl.key-alias=openvidu-selfsigned + +logging.level.root=info +spring.main.allow-bean-definition-overriding=true + +kms.uris=["ws://localhost:8888/kurento"] + +openvidu.publicurl=local +openvidu.secret=MY_SECRET + +openvidu.cdr=false +openvidu.cdr.path=log + +openvidu.webhook=false +openvidu.webhook.endpoint= +openvidu.webhook.headers=[] +openvidu.webhook.events=["sessionCreated","sessionDestroyed","participantJoined","participantLeft","webrtcConnectionCreated","webrtcConnectionDestroyed","recordingStatusChanged","filterEventDispatched","mediaNodeStatusChanged"] + +openvidu.recording=false +openvidu.recording.version=2.9.0 +openvidu.recording.path=/opt/openvidu/recordings +openvidu.recording.public-access=false +openvidu.recording.notification=publisher_moderator +openvidu.recording.custom-layout=/opt/openvidu/custom-layout +openvidu.recording.autostop-timeout=120 +openvidu.recording.composed-url= + +openvidu.streams.video.max-recv-bandwidth=1000 +openvidu.streams.video.min-recv-bandwidth=300 +openvidu.streams.video.max-send-bandwidth=1000 +openvidu.streams.video.min-send-bandwidth=300 + +openvidu.sessions.garbage.interval=900 +openvidu.sessions.garbage.threshold=3600 + +coturn.redis.ip=127.0.0.1 +coturn.redis.dbname=0 +coturn.redis.password=turn +coturn.redis.connect-timeout=30 diff --git a/openvidu-test-e2e/jenkins/Jenkinsfile b/openvidu-test-e2e/jenkins/Jenkinsfile index aa735eb3..4a199690 100644 --- a/openvidu-test-e2e/jenkins/Jenkinsfile +++ b/openvidu-test-e2e/jenkins/Jenkinsfile @@ -52,6 +52,9 @@ node('container') { stage('OpenVidu TestApp build') { sh 'cd openvidu/openvidu-testapp && npm install --unsafe-perm && npm link openvidu-browser && npm link openvidu-node-client && export NG_CLI_ANALYTICS=ci && ./node_modules/@angular/cli/bin/ng build --prod' } + stage('OpenVidu Server integration tests') { + sh 'cd openvidu/openvidu-server && mvn --batch-mode -Dtest=*IntegrationTest test' + } stage('OpenVidu Server build') { sh 'cd openvidu/openvidu-server/src/dashboard && npm install --unsafe-perm && npm link openvidu-browser && export NG_CLI_ANALYTICS=ci && ./node_modules/@angular/cli/bin/ng build --prod --output-path ../main/resources/static' sh 'cd openvidu/openvidu-server && mvn --batch-mode clean compile package'