From 9b599ebb6ac3e0cc7152114caa456617cced15f5 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Sat, 30 Oct 2021 19:28:30 +0200 Subject: [PATCH] openvidu-test-e2e: mediaServerReconnect tests --- .../io/openvidu/server/core/EndReason.java | 44 +- .../server/kurento/core/KurentoSession.java | 71 +-- .../recording/service/RecordingManager.java | 6 +- .../e2e/AbstractOpenViduTestAppE2eTest.java | 78 ++- .../test/e2e/OpenViduTestAppE2eTest.java | 460 +++++++++++++----- 5 files changed, 466 insertions(+), 193 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/EndReason.java b/openvidu-server/src/main/java/io/openvidu/server/core/EndReason.java index f4b8cc5f..1c087465 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/EndReason.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/EndReason.java @@ -21,18 +21,18 @@ public enum EndReason { /** * A user called the RPC operation to unsubscribe from a remote stream. Applies - * to webrtcConnectionDestroyed + * to event webrtcConnectionDestroyed */ unsubscribe, /** - * A user called the RPC operation to unpublish a local stream. Applies to + * A user called the RPC operation to unpublish a local stream. Applies to event * webrtcConnectionDestroyed */ unpublish, /** - * A user called the RPC operation to leave the session. Applies to + * A user called the RPC operation to leave the session. Applies to events * webrtcConnectionDestroyed and participantLeft. Can trigger other events with * lastParticipantLeft */ @@ -40,49 +40,49 @@ public enum EndReason { /** * A user called the RPC operation to force the unpublishing of a remote stream. - * Applies to webrtcConnectionDestroyed + * Applies to event webrtcConnectionDestroyed */ forceUnpublishByUser, /** * The server application called the REST operation to force the unpublishing of - * a user's stream. Applies to webrtcConnectionDestroyed + * a user's stream. Applies to event webrtcConnectionDestroyed */ forceUnpublishByServer, /** * A user called the RPC operation to force the disconnection of a remote user. - * Applies to webrtcConnectionDestroyed and participantLeft. Can trigger other - * events with lastParticipantLeft + * Applies to events webrtcConnectionDestroyed and participantLeft. Can trigger + * other events with lastParticipantLeft */ forceDisconnectByUser, /** * The server application called the REST operation to force the disconnection - * of a user. Applies to webrtcConnectionDestroyed and participantLeft. Can - * trigger other events with lastParticipantLeft + * of a user. Applies to events webrtcConnectionDestroyed and participantLeft. + * Can trigger other events with lastParticipantLeft */ forceDisconnectByServer, /** * The last participant left the session, which caused the session to be closed. - * Applies to webrtcConnectionDestroyed, participantLeft, recordingStatusChanged - * and sessionDestroyed. Can be triggered from other events with other end - * reasons (disconnect, forceDisconnectByUser, forceDisconnectByServer, - * networkDisconnect) + * Applies to events webrtcConnectionDestroyed, participantLeft, + * recordingStatusChanged and sessionDestroyed. Can be triggered from other + * events with other end reasons (disconnect, forceDisconnectByUser, + * forceDisconnectByServer, networkDisconnect) */ lastParticipantLeft, /** * The server application called the REST operation to stop a recording. Applies - * to recordingStatusChanged + * to event recordingStatusChanged */ recordingStoppedByServer, /** * The server application called the REST operation to close a session. Applies - * to webrtcConnectionDestroyed, participantLeft, recordingStatusChanged and - * sessionDestroyed + * to events webrtcConnectionDestroyed, participantLeft, recordingStatusChanged + * and sessionDestroyed */ sessionClosedByServer, @@ -95,7 +95,7 @@ public enum EndReason { /** * A media server disconnected. This is reserved for Media Nodes being - * gracefully removed from an OpenVidu Pro cluster. Applies to + * gracefully removed from an OpenVidu Pro cluster. Applies to events * webrtcConnectionDestroyed, participantLeft, recordingStatusChanged and * sessionDestroyed */ @@ -103,29 +103,29 @@ public enum EndReason { /** * A media server disconnected, and a new one automatically reconnected. All of - * the media endpoints were destroyed in the process. Applies to + * the media endpoints were destroyed in the process. Applies to events * webrtcConnectionDestroyed and recordingStatusChanged */ mediaServerReconnect, /** * A node has crashed. For now this means a Media Node has crashed. Applies to - * webrtcConnectionDestroyed, participantLeft, recordingStatusChanged and + * events webrtcConnectionDestroyed, participantLeft, recordingStatusChanged and * sessionDestroyed */ nodeCrashed, /** * OpenVidu Server has gracefully stopped. This is reserved for OpenVidu Pro - * restart operation. Applies to webrtcConnectionDestroyed, participantLeft, - * recordingStatusChanged and sessionDestroyed + * restart operation. Applies to events webrtcConnectionDestroyed, + * participantLeft, recordingStatusChanged and sessionDestroyed */ openviduServerStopped, /** * A recording has been stopped automatically * (https://docs.openvidu.io/en/stable/advanced-features/recording/#automatic-stop-of-recordings). - * Applies to recordingStatusChanged + * Applies to event recordingStatusChanged */ automaticStop diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 4b5f5cd4..223855bf 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -246,6 +246,15 @@ public class KurentoSession extends Session { } } + private void resetPipeline(Runnable callback) { + pipeline = null; + pipelineLatch = new CountDownLatch(1); + pipelineCreationErrorCause = null; + if (callback != null) { + callback.run(); + } + } + private void closePipeline(Runnable callback) { synchronized (pipelineReleaseLock) { if (pipeline == null) { @@ -260,25 +269,17 @@ public class KurentoSession extends Session { @Override public void onSuccess(Void result) throws Exception { log.debug("SESSION {}: Released Pipeline", sessionId); - pipeline = null; - pipelineLatch = new CountDownLatch(1); - pipelineCreationErrorCause = null; - if (callback != null) { - callback.run(); - } + resetPipeline(callback); } @Override public void onError(Throwable cause) throws Exception { log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause); - pipeline = null; - pipelineLatch = new CountDownLatch(1); - pipelineCreationErrorCause = null; - if (callback != null) { - callback.run(); - } + resetPipeline(callback); } }); + } else { + resetPipeline(callback); } } } @@ -330,27 +331,33 @@ public class KurentoSession extends Session { // Release pipeline, create a new one and prepare new PublisherEndpoints for // allowed users log.info("Resetting process: closing media pipeline for active session {}", this.sessionId); - this.closePipeline(() -> { - log.info("Resetting process: media pipeline closed for active session {}", this.sessionId); - createPipeline(); - try { - if (!pipelineLatch.await(20, TimeUnit.SECONDS)) { - throw new Exception("MediaPipeline was not created in 20 seconds"); - } - getParticipants().forEach(p -> { - if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) { - - ((KurentoParticipant) p).resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId()), - null); + try { + RemoteOperationUtils.setToSkipRemoteOperations(); + this.closePipeline(() -> { + RemoteOperationUtils.revertToRunRemoteOperations(); + log.info("Resetting process: media pipeline closed for active session {}", this.sessionId); + createPipeline(); + try { + if (!pipelineLatch.await(20, TimeUnit.SECONDS)) { + throw new Exception("MediaPipeline was not created in 20 seconds"); } - }); - log.info( - "Resetting process: media pipeline created and publisher endpoints reseted for active session {}", - this.sessionId); - } catch (Exception e) { - log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage()); - } - }); + getParticipants().forEach(p -> { + if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) { + + ((KurentoParticipant) p) + .resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId()), null); + } + }); + log.info( + "Resetting process: media pipeline created and publisher endpoints reseted for active session {}", + this.sessionId); + } catch (Exception e) { + log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage()); + } + }); + } finally { + RemoteOperationUtils.revertToRunRemoteOperations(); + } } @Override diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index b454dade..ed7d4c2d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -362,7 +362,6 @@ public class RecordingManager { } ((RecordingService) singleStreamRecordingService).sealRecordingMetadataFileAsStopped(recording); - final long timestamp = System.currentTimeMillis(); this.cdr.recordRecordingStatusChanged(recording, reason, timestamp, Status.stopped); @@ -384,6 +383,11 @@ public class RecordingManager { public Recording forceStopRecording(Session session, EndReason reason, Long kmsDisconnectionTime) { Recording recording; recording = this.sessionsRecordings.get(session.getSessionId()); + + ((RecordingService) singleStreamRecordingService).sealRecordingMetadataFileAsStopped(recording); + final long timestamp = System.currentTimeMillis(); + this.cdr.recordRecordingStatusChanged(recording, reason, timestamp, Status.stopped); + switch (recording.getOutputMode()) { case COMPOSED: recording = this.composedRecordingService.stopRecording(session, recording, reason, kmsDisconnectionTime); diff --git a/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/AbstractOpenViduTestAppE2eTest.java b/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/AbstractOpenViduTestAppE2eTest.java index 862f6dd1..5ffdc3f8 100644 --- a/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/AbstractOpenViduTestAppE2eTest.java +++ b/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/AbstractOpenViduTestAppE2eTest.java @@ -253,12 +253,26 @@ public class AbstractOpenViduTestAppE2eTest { other.dispose(); it.remove(); } + this.closeAllSessions(OV); + if (isRecordingTest) { + deleteAllRecordings(OV); + isRecordingTest = false; + } + if (isKurentoRestartTest) { + this.stopMediaServer(false); + this.startMediaServer(true); + isKurentoRestartTest = false; + } + OV = new OpenVidu(OPENVIDU_URL, OPENVIDU_SECRET); + } + + protected void closeAllSessions(OpenVidu client) { try { - OV.fetch(); + client.fetch(); } catch (OpenViduJavaClientException | OpenViduHttpException e1) { log.error("Error fetching sessions: {}", e1.getMessage()); } - OV.getActiveSessions().forEach(session -> { + client.getActiveSessions().forEach(session -> { try { session.close(); log.info("Session {} successfully closed", session.getSessionId()); @@ -268,20 +282,29 @@ public class AbstractOpenViduTestAppE2eTest { log.error("Error closing session: {}", e.getMessage()); } }); - if (isRecordingTest) { - removeAllRecordingContiners(); - try { - FileUtils.cleanDirectory(new File("/opt/openvidu/recordings")); - } catch (IOException e) { - log.error(e.getMessage()); - } - isRecordingTest = false; + } + + protected void deleteAllRecordings(OpenVidu client) { + try { + client.listRecordings().forEach(recording -> { + try { + client.deleteRecording(recording.getId()); + log.info("Recording {} successfully deleted", recording.getId()); + } catch (OpenViduJavaClientException e) { + log.error("Error deleting recording: {}", e.getMessage()); + } catch (OpenViduHttpException e) { + log.error("Error deleting recording: {}", e.getMessage()); + } + }); + } catch (OpenViduJavaClientException | OpenViduHttpException e) { + log.error("Error listing recordings: {}", e.getMessage()); } - if (isKurentoRestartTest) { - this.restartMediaServer(); - isKurentoRestartTest = false; + removeAllRecordingContiners(); + try { + FileUtils.cleanDirectory(new File("/opt/openvidu/recordings")); + } catch (IOException e) { + log.error(e.getMessage()); } - OV = new OpenVidu(OPENVIDU_URL, OPENVIDU_SECRET); } protected void listEmptyRecordings() { @@ -318,7 +341,7 @@ public class AbstractOpenViduTestAppE2eTest { return "data:image/png;base64," + screenshotBase64; } - protected void startMediaServer() { + protected void startMediaServer(boolean waitUntilKurentoClientReconnection) { String command = null; if (MEDIA_SERVER_IMAGE.startsWith(KURENTO_IMAGE)) { log.info("Starting kurento"); @@ -336,9 +359,16 @@ public class AbstractOpenViduTestAppE2eTest { System.exit(1); } commandLine.executeCommand(command); + if (waitUntilKurentoClientReconnection) { + try { + Thread.sleep(4000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } - protected void stopMediaServer() { + protected void stopMediaServer(boolean waitUntilNodeCrashedEvent) { final String dockerRemoveCmd = "docker ps -a | awk '{ print $1,$2 }' | grep GREP_PARAMETER | awk '{ print $1 }' | xargs -I {} docker rm -f {}"; String grep = null; if (MEDIA_SERVER_IMAGE.startsWith(KURENTO_IMAGE)) { @@ -352,17 +382,13 @@ public class AbstractOpenViduTestAppE2eTest { System.exit(1); } commandLine.executeCommand(dockerRemoveCmd.replaceFirst("GREP_PARAMETER", grep)); - } - - protected void restartMediaServer() { - this.stopMediaServer(); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - System.err.println("Error restarting media server"); - e.printStackTrace(); + if (waitUntilNodeCrashedEvent) { + try { + Thread.sleep(4000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - this.startMediaServer(); } protected void checkDockerContainerRunning(String imageName, int amount) { diff --git a/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/OpenViduTestAppE2eTest.java b/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/OpenViduTestAppE2eTest.java index ebb27a21..cfaba309 100644 --- a/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/OpenViduTestAppE2eTest.java +++ b/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/OpenViduTestAppE2eTest.java @@ -20,6 +20,9 @@ package io.openvidu.test.e2e; import static org.junit.Assert.fail; import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import java.util.Base64; @@ -3277,148 +3280,352 @@ public class OpenViduTestAppE2eTest extends AbstractOpenViduTestAppE2eTest { } @Test - @DisplayName("Kurento reconnect test") - void kurentoReconnectTest() throws Exception { + @DisplayName("Media server reconnect no active session test") + void mediaServerReconnectNoActiveSessionTest() throws Exception { + isKurentoRestartTest = true; + + log.info("Media server reconnect no active session test"); + + CountDownLatch initLatch = new CountDownLatch(1); + io.openvidu.test.browsers.utils.webhook.CustomWebhook.main(new String[0], initLatch); + + try { + + if (!initLatch.await(30, TimeUnit.SECONDS)) { + Assert.fail("Timeout waiting for webhook springboot app to start"); + CustomWebhook.shutDown(); + return; + } + + // TOTAL DISCONNECTION + // Session should not be affected + String customSessionId = "RECONNECTION_SESSION"; + OV.createSession(new SessionProperties.Builder().customSessionId(customSessionId).build()); + CustomWebhook.waitForEvent("sessionCreated", 2); + OV.fetch(); + List sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + this.stopMediaServer(true); + OV.fetch(); + sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + + // NO MEDIA SERVER + // Session should be created + OV.createSession(); + CustomWebhook.waitForEvent("sessionCreated", 2); + OV.fetch(); + sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 2 active sessions but found " + sessions.size(), 2, sessions.size()); + + // RECONNECTION + // Session should not be affected + this.startMediaServer(true); + OV.fetch(); + sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 2 active sessions but found " + sessions.size(), 2, sessions.size()); + this.closeAllSessions(OV); + OV.fetch(); + sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected no active sessions but found " + sessions.size(), 0, sessions.size()); + + } finally { + CustomWebhook.shutDown(); + } + } + + @Test + @DisplayName("Media server reconnect active session no streams test") + void mediaServerReconnectActiveSessionNoSteamsTest() throws Exception { + isKurentoRestartTest = true; + + log.info("Media server reconnect active session no streams test"); + + CountDownLatch initLatch = new CountDownLatch(1); + io.openvidu.test.browsers.utils.webhook.CustomWebhook.main(new String[0], initLatch); + + try { + + if (!initLatch.await(30, TimeUnit.SECONDS)) { + Assert.fail("Timeout waiting for webhook springboot app to start"); + CustomWebhook.shutDown(); + return; + } + + setupBrowser("chrome"); + + // TOTAL DISCONNECTION + // Session should be destroyed with reason nodeCrashed + user.getDriver().findElement(By.id("add-user-btn")).click(); + user.getDriver().findElements(By.className("publish-checkbox")).forEach(el -> el.click()); + user.getDriver().findElement(By.className("join-btn")).click(); + + user.getEventManager().waitUntilEventReaches("connectionCreated", 1); + CustomWebhook.waitForEvent("sessionCreated", 2); + CustomWebhook.waitForEvent("participantJoined", 2); + + OV.fetch(); + List sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + Assert.assertEquals( + "Expected 1 active connection but found " + sessions.get(0).getActiveConnections().size(), 1, + sessions.get(0).getActiveConnections().size()); + + this.stopMediaServer(true); + + user.getEventManager().waitUntilEventReaches("sessionDisconnected", 1); + JsonObject event = CustomWebhook.waitForEvent("sessionDestroyed", 2); + Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString()); + + OV.fetch(); + sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected no active sessions but found " + sessions.size(), 0, sessions.size()); + user.getDriver().findElement(By.id("remove-user-btn")).sendKeys(Keys.ENTER); + + // NO MEDIA SERVER + // Session should be created, but client's operation joinRoom should fail + user.getDriver().findElement(By.id("add-user-btn")).click(); + user.getDriver().findElement(By.className("publish-checkbox")).click(); + user.getDriver().findElement(By.className("subscribe-checkbox")).click(); + user.getDriver().findElement(By.className("join-btn")).click(); + + user.getWaiter().until(ExpectedConditions.alertIsPresent()); + Alert alert = user.getDriver().switchTo().alert(); + + final String alertMessage = "Error connecting to the session: There is no available Media Node where to initialize session 'TestSession'. Code: 204"; + Assert.assertTrue("Alert message wrong. Expected to contain: \"" + alertMessage + "\". Actual message: \"" + + alert.getText() + "\"", alert.getText().contains(alertMessage)); + alert.accept(); + + OV.fetch(); + sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + + user.getDriver().findElement(By.id("remove-user-btn")).sendKeys(Keys.ENTER); + this.closeAllSessions(OV); + CustomWebhook.waitForEvent("sessionDestroyed", 2); + + // RECONNECTION + // Nothing should happen as long as there were no streams while reconnecting + // A publisher should be able to publish normally after media server reconnected + this.startMediaServer(true); + + user.getDriver().findElement(By.id("add-user-btn")).click(); + user.getDriver().findElements(By.className("publish-checkbox")).forEach(el -> el.click()); + user.getDriver().findElement(By.className("join-btn")).click(); + + user.getEventManager().waitUntilEventReaches("connectionCreated", 1); + CustomWebhook.waitForEvent("sessionCreated", 2); + CustomWebhook.waitForEvent("participantJoined", 2); + + OV.fetch(); + sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + Assert.assertEquals( + "Expected 1 active connection but found " + sessions.get(0).getActiveConnections().size(), 1, + sessions.get(0).getActiveConnections().size()); + + this.stopMediaServer(false); + this.startMediaServer(true); + + user.getDriver().findElement(By.id("add-user-btn")).click(); + user.getDriver().findElement(By.cssSelector("#openvidu-instance-1 .join-btn")).click(); + user.getEventManager().waitUntilEventReaches("streamCreated", 2); + user.getEventManager().waitUntilEventReaches("streamPlaying", 2); + + this.closeAllSessions(OV); + CustomWebhook.waitForEvent("sessionDestroyed", 2); + user.getDriver().findElement(By.id("remove-all-users-btn")).click(); + user.getEventManager().clearAllCurrentEvents(); + CustomWebhook.clean(); + + } finally { + CustomWebhook.shutDown(); + } + } + + @Test + @DisplayName("Media server reconnect active session with streams test") + void mediaServerReconnectActiveSessionWithStreamsTest() throws Exception { isRecordingTest = true; isKurentoRestartTest = true; - log.info("Kurento reconnect test"); + log.info("Media server reconnect active session with streams test"); - OV.fetch(); - List sessions = OV.getActiveSessions(); - Assert.assertEquals("Expected no active sessions but found " + sessions.size(), 0, sessions.size()); + CountDownLatch initLatch = new CountDownLatch(1); + io.openvidu.test.browsers.utils.webhook.CustomWebhook.main(new String[0], initLatch); - this.stopMediaServer(); + try { - OV.fetch(); + if (!initLatch.await(30, TimeUnit.SECONDS)) { + Assert.fail("Timeout waiting for webhook springboot app to start"); + CustomWebhook.shutDown(); + return; + } - setupBrowser("chromeAsRoot"); + setupBrowser("chrome"); - // Connect one publisher with no connection to KMS. Session should fail to be - // created and an alert message should be displayed on the browser + // TOTAL DISCONNECTION + // Streams, Connections and Session should be destroyed with reason + // "nodeCrashed", with no possible recovery. INDIVIDUAL recording should be + // properly closed and available with reason "nodeCrashed" + user.getDriver().findElement(By.id("add-user-btn")).click(); + user.getDriver().findElement(By.id("add-user-btn")).click(); + user.getDriver().findElement(By.cssSelector("#openvidu-instance-1 .publish-checkbox")).click(); + user.getDriver().findElements(By.className("join-btn")).forEach(el -> el.click()); - user.getDriver().findElement(By.id("add-user-btn")).click(); - user.getDriver().findElement(By.className("join-btn")).click(); + user.getEventManager().clearAllCurrentEvents(); + user.getEventManager().waitUntilEventReaches("streamPlaying", 2); + CustomWebhook.waitForEvent("webrtcConnectionCreated", 5); + CustomWebhook.waitForEvent("webrtcConnectionCreated", 1); - user.getWaiter().until(ExpectedConditions.alertIsPresent()); - Alert alert = user.getDriver().switchTo().alert(); + OV.fetch(); + List sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + Session session = sessions.get(0); + String streamId = session.getActiveConnections().stream().filter(c -> c.getPublishers().size() > 0) + .findFirst().get().getPublishers().get(0).getStreamId(); - final String alertMessage = "Error connecting to the session: There is no available Media Node where to initialize session 'TestSession'. Code: 204"; - Assert.assertTrue("Alert message wrong. Expected to contain: \"" + alertMessage + "\". Actual message: \"" - + alert.getText() + "\"", alert.getText().contains(alertMessage)); - alert.accept(); + OV.startRecording(session.getSessionId(), + new RecordingProperties.Builder().outputMode(OutputMode.INDIVIDUAL).build()); + user.getEventManager().waitUntilEventReaches("recordingStarted", 2); - user.getDriver().findElement(By.id("remove-user-btn")).sendKeys(Keys.ENTER); + waitUntilFileExistsAndIsBiggerThan("/opt/openvidu/recordings/TestSession/" + streamId + ".webm", 400, 25); - this.startMediaServer(); - Thread.sleep(5000); + final CountDownLatch latch = new CountDownLatch(2); - // Connect one subscriber-only user with connection to KMS -> restart KMS -> -> - // check the session is still OK -> connect a publisher -> restart KMS -> check - // streamDestroyed events + user.getEventManager().on("sessionDisconnected", (ev) -> { + Assert.assertEquals("Expected 'sessionDisconnected' reason 'nodeCrashed'", "nodeCrashed", + ev.get("reason").getAsString()); + latch.countDown(); + }); - user.getDriver().findElement(By.id("add-user-btn")).click(); - user.getDriver().findElements(By.className("publish-checkbox")).forEach(el -> el.click()); - user.getDriver().findElement(By.className("join-btn")).click(); + user.getEventManager().clearAllCurrentEvents(); + CustomWebhook.clean(); + this.stopMediaServer(false); - user.getEventManager().waitUntilEventReaches("connectionCreated", 1); + user.getEventManager().waitUntilEventReaches("sessionDisconnected", 2); + if (!latch.await(6000, TimeUnit.MILLISECONDS)) { + gracefullyLeaveParticipants(2); + fail("Waiting for 2 sessionDisconnected events with reason 'nodeCrashed' to happen in total"); + return; + } + user.getEventManager().off("sessionDisconnected"); - OV.fetch(); - sessions = OV.getActiveSessions(); - Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + JsonObject event = CustomWebhook.waitForEvent("webrtcConnectionDestroyed", 1); + Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString()); + event = CustomWebhook.waitForEvent("webrtcConnectionDestroyed", 1); + Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString()); + event = CustomWebhook.waitForEvent("participantLeft", 1); + Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString()); + event = CustomWebhook.waitForEvent("participantLeft", 1); + Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString()); + event = CustomWebhook.waitForEvent("recordingStatusChanged", 1); + Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString()); + event = CustomWebhook.waitForEvent("recordingStatusChanged", 1); + Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString()); + event = CustomWebhook.waitForEvent("sessionDestroyed", 1); + Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString()); - this.restartMediaServer(); - Thread.sleep(5000); + Recording rec = OV.getRecording("TestSession"); + Assert.assertTrue("Recording duration is 0", rec.getDuration() > 0); + Assert.assertTrue("Recording size is 0", rec.getSize() > 0); - OV.fetch(); - sessions = OV.getActiveSessions(); - Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + this.recordingUtils.checkIndividualRecording("/opt/openvidu/recordings/TestSession/", rec, 1, "opus", "vp8", + true); - user.getDriver().findElement(By.id("add-user-btn")).click(); - user.getDriver().findElement(By.cssSelector("#openvidu-instance-1 .join-btn")).click(); + user.getDriver().findElement(By.id("remove-all-users-btn")).click(); + user.getEventManager().clearAllCurrentEvents(); + this.closeAllSessions(OV); + deleteAllRecordings(OV); + CustomWebhook.clean(); - user.getEventManager().waitUntilEventReaches("connectionCreated", 4); - user.getEventManager().waitUntilEventReaches("accessAllowed", 1); - user.getEventManager().waitUntilEventReaches("streamCreated", 2); - user.getEventManager().waitUntilEventReaches("streamPlaying", 2); + this.startMediaServer(true); - final int numberOfVideos = user.getDriver().findElements(By.tagName("video")).size(); - Assert.assertEquals("Expected 2 videos but found " + numberOfVideos, 2, numberOfVideos); - Assert.assertTrue("Videos were expected to have audio and video tracks", user.getEventManager() - .assertMediaTracks(user.getDriver().findElements(By.tagName("video")), true, true)); + // RECONNECTION + // Streams should be destroyed with reason "mediaServerReconnect", but + // Connections and Session should remain alive. INDIVIDUAL recording should be + // properly closed and available with reason "mediaServerReconnect" + user.getDriver().findElement(By.id("add-user-btn")).click(); + user.getDriver().findElement(By.id("add-user-btn")).click(); + user.getDriver().findElement(By.cssSelector("#openvidu-instance-1 .publish-checkbox")).click(); + user.getDriver().findElements(By.className("join-btn")).forEach(el -> el.click()); - OV.fetch(); - Session session = OV.getActiveSessions().get(0); - Assert.assertEquals("Expected 2 active connections but found " + session.getActiveConnections(), 2, - session.getActiveConnections().size()); - int pubs = session.getActiveConnections().stream().mapToInt(con -> con.getPublishers().size()).sum(); - int subs = session.getActiveConnections().stream().mapToInt(con -> con.getSubscribers().size()).sum(); - Assert.assertEquals("Expected 1 active publisher but found " + pubs, 1, pubs); - Assert.assertEquals("Expected 1 active subscriber but found " + subs, 1, subs); + user.getEventManager().waitUntilEventReaches("streamPlaying", 2); + CustomWebhook.waitForEvent("webrtcConnectionCreated", 5); + CustomWebhook.waitForEvent("webrtcConnectionCreated", 1); - OV.startRecording(session.getSessionId(), - new RecordingProperties.Builder().outputMode(OutputMode.INDIVIDUAL).build()); - user.getEventManager().waitUntilEventReaches("recordingStarted", 2); + OV.fetch(); + sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + session = sessions.get(0); + streamId = session.getActiveConnections().stream().filter(c -> c.getPublishers().size() > 0).findFirst() + .get().getPublishers().get(0).getStreamId(); - Thread.sleep(5000); + OV.startRecording(session.getSessionId(), + new RecordingProperties.Builder().outputMode(OutputMode.INDIVIDUAL).build()); + user.getEventManager().waitUntilEventReaches("recordingStarted", 2); - final CountDownLatch latch = new CountDownLatch(4); + waitUntilFileExistsAndIsBiggerThan("/opt/openvidu/recordings/TestSession/" + streamId + ".webm", 400, 25); - user.getEventManager().on("recordingStopped", (event) -> { - String reason = event.get("reason").getAsString(); - Assert.assertEquals("Expected 'recordingStopped' reason 'mediaServerReconnect'", "mediaServerReconnect", - reason); - latch.countDown(); - }); - user.getEventManager().on("streamDestroyed", (event) -> { - String reason = event.get("reason").getAsString(); - Assert.assertEquals("Expected 'streamDestroyed' reason 'mediaServerReconnect'", "mediaServerReconnect", - reason); - latch.countDown(); - }); + final CountDownLatch latch2 = new CountDownLatch(4); - this.restartMediaServer(); + user.getEventManager().on("recordingStopped", (ev) -> { + Assert.assertEquals("Expected 'recordingStopped' reason 'mediaServerReconnect'", "mediaServerReconnect", + ev.get("reason").getAsString()); + latch2.countDown(); + }); + user.getEventManager().on("streamDestroyed", (ev) -> { + Assert.assertEquals("Expected 'streamDestroyed' reason 'mediaServerReconnect'", "mediaServerReconnect", + ev.get("reason").getAsString()); + latch2.countDown(); + }); - user.getEventManager().waitUntilEventReaches("recordingStopped", 2); - user.getEventManager().waitUntilEventReaches("streamDestroyed", 2); - if (!latch.await(5000, TimeUnit.MILLISECONDS)) { - gracefullyLeaveParticipants(2); - fail("Waiting for 2 recordingStopped and 2 streamDestroyed events with reason 'mediaServerReconnect' to happen in total"); - return; + user.getEventManager().clearAllCurrentEvents(); + CustomWebhook.clean(); + this.stopMediaServer(false); + this.startMediaServer(false); + + user.getEventManager().waitUntilEventReaches("recordingStopped", 2); + user.getEventManager().waitUntilEventReaches("streamDestroyed", 2); + if (!latch2.await(6000, TimeUnit.MILLISECONDS)) { + gracefullyLeaveParticipants(2); + fail("Waiting for 2 recordingStopped and 2 streamDestroyed events with reason 'mediaServerReconnect' to happen in total"); + return; + } + user.getEventManager().off("recordingStopped"); + user.getEventManager().off("streamDestroyed"); + + event = CustomWebhook.waitForEvent("webrtcConnectionDestroyed", 1); + Assert.assertEquals("Wrong reason in webhook event", "mediaServerReconnect", + event.get("reason").getAsString()); + event = CustomWebhook.waitForEvent("webrtcConnectionDestroyed", 1); + Assert.assertEquals("Wrong reason in webhook event", "mediaServerReconnect", + event.get("reason").getAsString()); + event = CustomWebhook.waitForEvent("recordingStatusChanged", 1); + Assert.assertEquals("Wrong reason in webhook event", "mediaServerReconnect", + event.get("reason").getAsString()); + event = CustomWebhook.waitForEvent("recordingStatusChanged", 1); + Assert.assertEquals("Wrong reason in webhook event", "mediaServerReconnect", + event.get("reason").getAsString()); + + rec = OV.getRecording("TestSession"); + Assert.assertTrue("Recording duration is 0", rec.getDuration() > 0); + Assert.assertTrue("Recording size is 0", rec.getSize() > 0); + + this.recordingUtils.checkIndividualRecording("/opt/openvidu/recordings/TestSession/", rec, 1, "opus", "vp8", + true); + + OV.fetch(); + sessions = OV.getActiveSessions(); + Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); + session = sessions.get(0); + Assert.assertEquals("Expected 2 active connections but found " + session.getActiveConnections(), 2, + session.getActiveConnections().size()); + + } finally { + CustomWebhook.shutDown(); } - user.getEventManager().off("recordingStopped"); - user.getEventManager().off("streamDestroyed"); - - session.fetch(); - Assert.assertEquals("Expected 2 active connections but found " + session.getActiveConnections(), 2, - session.getActiveConnections().size()); - pubs = session.getActiveConnections().stream().mapToInt(con -> con.getPublishers().size()).sum(); - subs = session.getActiveConnections().stream().mapToInt(con -> con.getSubscribers().size()).sum(); - Assert.assertEquals("Expected no active publishers but found " + pubs, 0, pubs); - Assert.assertEquals("Expected no active subscribers but found " + subs, 0, subs); - - Recording rec = OV.getRecording("TestSession"); - Assert.assertTrue("Recording duration is 0", rec.getDuration() > 0); - Assert.assertTrue("Recording size is 0", rec.getSize() > 0); - - this.recordingUtils.checkIndividualRecording("/opt/openvidu/recordings/TestSession/", rec, 1, "opus", "vp8", - true); - - WebElement pubBtn = user.getDriver().findElements(By.cssSelector("#openvidu-instance-1 .pub-btn")).get(0); - pubBtn.click(); - pubBtn.click(); - user.getEventManager().waitUntilEventReaches("streamCreated", 4); - user.getEventManager().waitUntilEventReaches("streamPlaying", 4); - - session.fetch(); - Assert.assertEquals("Expected 2 active connections but found " + session.getActiveConnections(), 2, - session.getActiveConnections().size()); - pubs = session.getActiveConnections().stream().mapToInt(con -> con.getPublishers().size()).sum(); - subs = session.getActiveConnections().stream().mapToInt(con -> con.getSubscribers().size()).sum(); - Assert.assertEquals("Expected 1 active publisher but found " + pubs, 1, pubs); - Assert.assertEquals("Expected 1 active subscriber but found " + subs, 1, subs); - - gracefullyLeaveParticipants(2); } @Test @@ -4440,4 +4647,33 @@ public class OpenViduTestAppE2eTest extends AbstractOpenViduTestAppE2eTest { sessionVP9AllowTranscoding.getProperties().isTranscodingAllowed()); } + private void waitUntilFileExistsAndIsBiggerThan(String absolutePath, int kbs, int maxSecondsWait) throws Exception { + int interval = 100; + int maxLoops = (maxSecondsWait * 1000) / interval; + int loop = 0; + long bytes = 0; + + boolean bigger = false; + Path path = Paths.get(absolutePath); + + while (!bigger && loop < maxLoops) { + bigger = Files.exists(path) && Files.isReadable(path); + if (bigger) { + try { + bytes = Files.size(path); + } catch (IOException e) { + System.err.println("Error getting file size from " + path + ": " + e.getMessage()); + } + bigger = (bytes / 1024) > kbs; + } + loop++; + Thread.sleep(interval); + } + + if (!bigger && loop >= maxLoops) { + throw new Exception("File " + absolutePath + " did not reach a size of at least " + kbs + " KBs in " + + maxSecondsWait + " seconds. Last check was " + bytes + " KBs"); + } + } + }