openvidu-test-e2e: mediaServerReconnect tests

pull/658/head
pabloFuente 2021-10-30 19:28:30 +02:00
parent 82115a7ce8
commit 9b599ebb6a
5 changed files with 466 additions and 193 deletions

View File

@ -21,18 +21,18 @@ public enum EndReason {
/** /**
* A user called the RPC operation to unsubscribe from a remote stream. Applies * A user called the RPC operation to unsubscribe from a remote stream. Applies
* to webrtcConnectionDestroyed * to event webrtcConnectionDestroyed
*/ */
unsubscribe, unsubscribe,
/** /**
* A user called the RPC operation to unpublish a local stream. Applies to * A user called the RPC operation to unpublish a local stream. Applies to event
* webrtcConnectionDestroyed * webrtcConnectionDestroyed
*/ */
unpublish, unpublish,
/** /**
* A user called the RPC operation to leave the session. Applies to * A user called the RPC operation to leave the session. Applies to events
* webrtcConnectionDestroyed and participantLeft. Can trigger other events with * webrtcConnectionDestroyed and participantLeft. Can trigger other events with
* lastParticipantLeft * lastParticipantLeft
*/ */
@ -40,49 +40,49 @@ public enum EndReason {
/** /**
* A user called the RPC operation to force the unpublishing of a remote stream. * A user called the RPC operation to force the unpublishing of a remote stream.
* Applies to webrtcConnectionDestroyed * Applies to event webrtcConnectionDestroyed
*/ */
forceUnpublishByUser, forceUnpublishByUser,
/** /**
* The server application called the REST operation to force the unpublishing of * The server application called the REST operation to force the unpublishing of
* a user's stream. Applies to webrtcConnectionDestroyed * a user's stream. Applies to event webrtcConnectionDestroyed
*/ */
forceUnpublishByServer, forceUnpublishByServer,
/** /**
* A user called the RPC operation to force the disconnection of a remote user. * A user called the RPC operation to force the disconnection of a remote user.
* Applies to webrtcConnectionDestroyed and participantLeft. Can trigger other * Applies to events webrtcConnectionDestroyed and participantLeft. Can trigger
* events with lastParticipantLeft * other events with lastParticipantLeft
*/ */
forceDisconnectByUser, forceDisconnectByUser,
/** /**
* The server application called the REST operation to force the disconnection * The server application called the REST operation to force the disconnection
* of a user. Applies to webrtcConnectionDestroyed and participantLeft. Can * of a user. Applies to events webrtcConnectionDestroyed and participantLeft.
* trigger other events with lastParticipantLeft * Can trigger other events with lastParticipantLeft
*/ */
forceDisconnectByServer, forceDisconnectByServer,
/** /**
* The last participant left the session, which caused the session to be closed. * The last participant left the session, which caused the session to be closed.
* Applies to webrtcConnectionDestroyed, participantLeft, recordingStatusChanged * Applies to events webrtcConnectionDestroyed, participantLeft,
* and sessionDestroyed. Can be triggered from other events with other end * recordingStatusChanged and sessionDestroyed. Can be triggered from other
* reasons (disconnect, forceDisconnectByUser, forceDisconnectByServer, * events with other end reasons (disconnect, forceDisconnectByUser,
* networkDisconnect) * forceDisconnectByServer, networkDisconnect)
*/ */
lastParticipantLeft, lastParticipantLeft,
/** /**
* The server application called the REST operation to stop a recording. Applies * The server application called the REST operation to stop a recording. Applies
* to recordingStatusChanged * to event recordingStatusChanged
*/ */
recordingStoppedByServer, recordingStoppedByServer,
/** /**
* The server application called the REST operation to close a session. Applies * The server application called the REST operation to close a session. Applies
* to webrtcConnectionDestroyed, participantLeft, recordingStatusChanged and * to events webrtcConnectionDestroyed, participantLeft, recordingStatusChanged
* sessionDestroyed * and sessionDestroyed
*/ */
sessionClosedByServer, sessionClosedByServer,
@ -95,7 +95,7 @@ public enum EndReason {
/** /**
* A media server disconnected. This is reserved for Media Nodes being * A media server disconnected. This is reserved for Media Nodes being
* gracefully removed from an OpenVidu Pro cluster. Applies to * gracefully removed from an OpenVidu Pro cluster. Applies to events
* webrtcConnectionDestroyed, participantLeft, recordingStatusChanged and * webrtcConnectionDestroyed, participantLeft, recordingStatusChanged and
* sessionDestroyed * sessionDestroyed
*/ */
@ -103,29 +103,29 @@ public enum EndReason {
/** /**
* A media server disconnected, and a new one automatically reconnected. All of * A media server disconnected, and a new one automatically reconnected. All of
* the media endpoints were destroyed in the process. Applies to * the media endpoints were destroyed in the process. Applies to events
* webrtcConnectionDestroyed and recordingStatusChanged * webrtcConnectionDestroyed and recordingStatusChanged
*/ */
mediaServerReconnect, mediaServerReconnect,
/** /**
* A node has crashed. For now this means a Media Node has crashed. Applies to * A node has crashed. For now this means a Media Node has crashed. Applies to
* webrtcConnectionDestroyed, participantLeft, recordingStatusChanged and * events webrtcConnectionDestroyed, participantLeft, recordingStatusChanged and
* sessionDestroyed * sessionDestroyed
*/ */
nodeCrashed, nodeCrashed,
/** /**
* OpenVidu Server has gracefully stopped. This is reserved for OpenVidu Pro * OpenVidu Server has gracefully stopped. This is reserved for OpenVidu Pro
* restart operation. Applies to webrtcConnectionDestroyed, participantLeft, * restart operation. Applies to events webrtcConnectionDestroyed,
* recordingStatusChanged and sessionDestroyed * participantLeft, recordingStatusChanged and sessionDestroyed
*/ */
openviduServerStopped, openviduServerStopped,
/** /**
* A recording has been stopped automatically * A recording has been stopped automatically
* (https://docs.openvidu.io/en/stable/advanced-features/recording/#automatic-stop-of-recordings). * (https://docs.openvidu.io/en/stable/advanced-features/recording/#automatic-stop-of-recordings).
* Applies to recordingStatusChanged * Applies to event recordingStatusChanged
*/ */
automaticStop automaticStop

View File

@ -246,6 +246,15 @@ public class KurentoSession extends Session {
} }
} }
private void resetPipeline(Runnable callback) {
pipeline = null;
pipelineLatch = new CountDownLatch(1);
pipelineCreationErrorCause = null;
if (callback != null) {
callback.run();
}
}
private void closePipeline(Runnable callback) { private void closePipeline(Runnable callback) {
synchronized (pipelineReleaseLock) { synchronized (pipelineReleaseLock) {
if (pipeline == null) { if (pipeline == null) {
@ -260,25 +269,17 @@ public class KurentoSession extends Session {
@Override @Override
public void onSuccess(Void result) throws Exception { public void onSuccess(Void result) throws Exception {
log.debug("SESSION {}: Released Pipeline", sessionId); log.debug("SESSION {}: Released Pipeline", sessionId);
pipeline = null; resetPipeline(callback);
pipelineLatch = new CountDownLatch(1);
pipelineCreationErrorCause = null;
if (callback != null) {
callback.run();
}
} }
@Override @Override
public void onError(Throwable cause) throws Exception { public void onError(Throwable cause) throws Exception {
log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause); log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause);
pipeline = null; resetPipeline(callback);
pipelineLatch = new CountDownLatch(1);
pipelineCreationErrorCause = null;
if (callback != null) {
callback.run();
}
} }
}); });
} else {
resetPipeline(callback);
} }
} }
} }
@ -330,7 +331,10 @@ public class KurentoSession extends Session {
// Release pipeline, create a new one and prepare new PublisherEndpoints for // Release pipeline, create a new one and prepare new PublisherEndpoints for
// allowed users // allowed users
log.info("Resetting process: closing media pipeline for active session {}", this.sessionId); log.info("Resetting process: closing media pipeline for active session {}", this.sessionId);
try {
RemoteOperationUtils.setToSkipRemoteOperations();
this.closePipeline(() -> { this.closePipeline(() -> {
RemoteOperationUtils.revertToRunRemoteOperations();
log.info("Resetting process: media pipeline closed for active session {}", this.sessionId); log.info("Resetting process: media pipeline closed for active session {}", this.sessionId);
createPipeline(); createPipeline();
try { try {
@ -340,8 +344,8 @@ public class KurentoSession extends Session {
getParticipants().forEach(p -> { getParticipants().forEach(p -> {
if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) { if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) {
((KurentoParticipant) p).resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId()), ((KurentoParticipant) p)
null); .resetPublisherEndpoint(mediaOptionsMap.get(p.getParticipantPublicId()), null);
} }
}); });
log.info( log.info(
@ -351,6 +355,9 @@ public class KurentoSession extends Session {
log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage()); log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage());
} }
}); });
} finally {
RemoteOperationUtils.revertToRunRemoteOperations();
}
} }
@Override @Override

View File

@ -362,7 +362,6 @@ public class RecordingManager {
} }
((RecordingService) singleStreamRecordingService).sealRecordingMetadataFileAsStopped(recording); ((RecordingService) singleStreamRecordingService).sealRecordingMetadataFileAsStopped(recording);
final long timestamp = System.currentTimeMillis(); final long timestamp = System.currentTimeMillis();
this.cdr.recordRecordingStatusChanged(recording, reason, timestamp, Status.stopped); this.cdr.recordRecordingStatusChanged(recording, reason, timestamp, Status.stopped);
@ -384,6 +383,11 @@ public class RecordingManager {
public Recording forceStopRecording(Session session, EndReason reason, Long kmsDisconnectionTime) { public Recording forceStopRecording(Session session, EndReason reason, Long kmsDisconnectionTime) {
Recording recording; Recording recording;
recording = this.sessionsRecordings.get(session.getSessionId()); recording = this.sessionsRecordings.get(session.getSessionId());
((RecordingService) singleStreamRecordingService).sealRecordingMetadataFileAsStopped(recording);
final long timestamp = System.currentTimeMillis();
this.cdr.recordRecordingStatusChanged(recording, reason, timestamp, Status.stopped);
switch (recording.getOutputMode()) { switch (recording.getOutputMode()) {
case COMPOSED: case COMPOSED:
recording = this.composedRecordingService.stopRecording(session, recording, reason, kmsDisconnectionTime); recording = this.composedRecordingService.stopRecording(session, recording, reason, kmsDisconnectionTime);

View File

@ -253,12 +253,26 @@ public class AbstractOpenViduTestAppE2eTest {
other.dispose(); other.dispose();
it.remove(); it.remove();
} }
this.closeAllSessions(OV);
if (isRecordingTest) {
deleteAllRecordings(OV);
isRecordingTest = false;
}
if (isKurentoRestartTest) {
this.stopMediaServer(false);
this.startMediaServer(true);
isKurentoRestartTest = false;
}
OV = new OpenVidu(OPENVIDU_URL, OPENVIDU_SECRET);
}
protected void closeAllSessions(OpenVidu client) {
try { try {
OV.fetch(); client.fetch();
} catch (OpenViduJavaClientException | OpenViduHttpException e1) { } catch (OpenViduJavaClientException | OpenViduHttpException e1) {
log.error("Error fetching sessions: {}", e1.getMessage()); log.error("Error fetching sessions: {}", e1.getMessage());
} }
OV.getActiveSessions().forEach(session -> { client.getActiveSessions().forEach(session -> {
try { try {
session.close(); session.close();
log.info("Session {} successfully closed", session.getSessionId()); log.info("Session {} successfully closed", session.getSessionId());
@ -268,20 +282,29 @@ public class AbstractOpenViduTestAppE2eTest {
log.error("Error closing session: {}", e.getMessage()); log.error("Error closing session: {}", e.getMessage());
} }
}); });
if (isRecordingTest) { }
protected void deleteAllRecordings(OpenVidu client) {
try {
client.listRecordings().forEach(recording -> {
try {
client.deleteRecording(recording.getId());
log.info("Recording {} successfully deleted", recording.getId());
} catch (OpenViduJavaClientException e) {
log.error("Error deleting recording: {}", e.getMessage());
} catch (OpenViduHttpException e) {
log.error("Error deleting recording: {}", e.getMessage());
}
});
} catch (OpenViduJavaClientException | OpenViduHttpException e) {
log.error("Error listing recordings: {}", e.getMessage());
}
removeAllRecordingContiners(); removeAllRecordingContiners();
try { try {
FileUtils.cleanDirectory(new File("/opt/openvidu/recordings")); FileUtils.cleanDirectory(new File("/opt/openvidu/recordings"));
} catch (IOException e) { } catch (IOException e) {
log.error(e.getMessage()); log.error(e.getMessage());
} }
isRecordingTest = false;
}
if (isKurentoRestartTest) {
this.restartMediaServer();
isKurentoRestartTest = false;
}
OV = new OpenVidu(OPENVIDU_URL, OPENVIDU_SECRET);
} }
protected void listEmptyRecordings() { protected void listEmptyRecordings() {
@ -318,7 +341,7 @@ public class AbstractOpenViduTestAppE2eTest {
return "data:image/png;base64," + screenshotBase64; return "data:image/png;base64," + screenshotBase64;
} }
protected void startMediaServer() { protected void startMediaServer(boolean waitUntilKurentoClientReconnection) {
String command = null; String command = null;
if (MEDIA_SERVER_IMAGE.startsWith(KURENTO_IMAGE)) { if (MEDIA_SERVER_IMAGE.startsWith(KURENTO_IMAGE)) {
log.info("Starting kurento"); log.info("Starting kurento");
@ -336,9 +359,16 @@ public class AbstractOpenViduTestAppE2eTest {
System.exit(1); System.exit(1);
} }
commandLine.executeCommand(command); commandLine.executeCommand(command);
if (waitUntilKurentoClientReconnection) {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
protected void stopMediaServer() { protected void stopMediaServer(boolean waitUntilNodeCrashedEvent) {
final String dockerRemoveCmd = "docker ps -a | awk '{ print $1,$2 }' | grep GREP_PARAMETER | awk '{ print $1 }' | xargs -I {} docker rm -f {}"; final String dockerRemoveCmd = "docker ps -a | awk '{ print $1,$2 }' | grep GREP_PARAMETER | awk '{ print $1 }' | xargs -I {} docker rm -f {}";
String grep = null; String grep = null;
if (MEDIA_SERVER_IMAGE.startsWith(KURENTO_IMAGE)) { if (MEDIA_SERVER_IMAGE.startsWith(KURENTO_IMAGE)) {
@ -352,17 +382,13 @@ public class AbstractOpenViduTestAppE2eTest {
System.exit(1); System.exit(1);
} }
commandLine.executeCommand(dockerRemoveCmd.replaceFirst("GREP_PARAMETER", grep)); commandLine.executeCommand(dockerRemoveCmd.replaceFirst("GREP_PARAMETER", grep));
} if (waitUntilNodeCrashedEvent) {
protected void restartMediaServer() {
this.stopMediaServer();
try { try {
Thread.sleep(1000); Thread.sleep(4000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
System.err.println("Error restarting media server");
e.printStackTrace(); e.printStackTrace();
} }
this.startMediaServer(); }
} }
protected void checkDockerContainerRunning(String imageName, int amount) { protected void checkDockerContainerRunning(String imageName, int amount) {

View File

@ -20,6 +20,9 @@ package io.openvidu.test.e2e;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Arrays; import java.util.Arrays;
import java.util.Base64; import java.util.Base64;
@ -3277,27 +3280,113 @@ public class OpenViduTestAppE2eTest extends AbstractOpenViduTestAppE2eTest {
} }
@Test @Test
@DisplayName("Kurento reconnect test") @DisplayName("Media server reconnect no active session test")
void kurentoReconnectTest() throws Exception { void mediaServerReconnectNoActiveSessionTest() throws Exception {
isRecordingTest = true;
isKurentoRestartTest = true; isKurentoRestartTest = true;
log.info("Kurento reconnect test"); log.info("Media server reconnect no active session test");
CountDownLatch initLatch = new CountDownLatch(1);
io.openvidu.test.browsers.utils.webhook.CustomWebhook.main(new String[0], initLatch);
try {
if (!initLatch.await(30, TimeUnit.SECONDS)) {
Assert.fail("Timeout waiting for webhook springboot app to start");
CustomWebhook.shutDown();
return;
}
// TOTAL DISCONNECTION
// Session should not be affected
String customSessionId = "RECONNECTION_SESSION";
OV.createSession(new SessionProperties.Builder().customSessionId(customSessionId).build());
CustomWebhook.waitForEvent("sessionCreated", 2);
OV.fetch();
List<Session> sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size());
this.stopMediaServer(true);
OV.fetch();
sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size());
// NO MEDIA SERVER
// Session should be created
OV.createSession();
CustomWebhook.waitForEvent("sessionCreated", 2);
OV.fetch();
sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 2 active sessions but found " + sessions.size(), 2, sessions.size());
// RECONNECTION
// Session should not be affected
this.startMediaServer(true);
OV.fetch();
sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 2 active sessions but found " + sessions.size(), 2, sessions.size());
this.closeAllSessions(OV);
OV.fetch();
sessions = OV.getActiveSessions();
Assert.assertEquals("Expected no active sessions but found " + sessions.size(), 0, sessions.size());
} finally {
CustomWebhook.shutDown();
}
}
@Test
@DisplayName("Media server reconnect active session no streams test")
void mediaServerReconnectActiveSessionNoSteamsTest() throws Exception {
isKurentoRestartTest = true;
log.info("Media server reconnect active session no streams test");
CountDownLatch initLatch = new CountDownLatch(1);
io.openvidu.test.browsers.utils.webhook.CustomWebhook.main(new String[0], initLatch);
try {
if (!initLatch.await(30, TimeUnit.SECONDS)) {
Assert.fail("Timeout waiting for webhook springboot app to start");
CustomWebhook.shutDown();
return;
}
setupBrowser("chrome");
// TOTAL DISCONNECTION
// Session should be destroyed with reason nodeCrashed
user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElements(By.className("publish-checkbox")).forEach(el -> el.click());
user.getDriver().findElement(By.className("join-btn")).click();
user.getEventManager().waitUntilEventReaches("connectionCreated", 1);
CustomWebhook.waitForEvent("sessionCreated", 2);
CustomWebhook.waitForEvent("participantJoined", 2);
OV.fetch(); OV.fetch();
List<Session> sessions = OV.getActiveSessions(); List<Session> sessions = OV.getActiveSessions();
Assert.assertEquals("Expected no active sessions but found " + sessions.size(), 0, sessions.size()); Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size());
Assert.assertEquals(
"Expected 1 active connection but found " + sessions.get(0).getActiveConnections().size(), 1,
sessions.get(0).getActiveConnections().size());
this.stopMediaServer(); this.stopMediaServer(true);
user.getEventManager().waitUntilEventReaches("sessionDisconnected", 1);
JsonObject event = CustomWebhook.waitForEvent("sessionDestroyed", 2);
Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString());
OV.fetch(); OV.fetch();
sessions = OV.getActiveSessions();
Assert.assertEquals("Expected no active sessions but found " + sessions.size(), 0, sessions.size());
user.getDriver().findElement(By.id("remove-user-btn")).sendKeys(Keys.ENTER);
setupBrowser("chromeAsRoot"); // NO MEDIA SERVER
// Session should be created, but client's operation joinRoom should fail
// Connect one publisher with no connection to KMS. Session should fail to be
// created and an alert message should be displayed on the browser
user.getDriver().findElement(By.id("add-user-btn")).click(); user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElement(By.className("publish-checkbox")).click();
user.getDriver().findElement(By.className("subscribe-checkbox")).click();
user.getDriver().findElement(By.className("join-btn")).click(); user.getDriver().findElement(By.className("join-btn")).click();
user.getWaiter().until(ExpectedConditions.alertIsPresent()); user.getWaiter().until(ExpectedConditions.alertIsPresent());
@ -3308,94 +3397,135 @@ public class OpenViduTestAppE2eTest extends AbstractOpenViduTestAppE2eTest {
+ alert.getText() + "\"", alert.getText().contains(alertMessage)); + alert.getText() + "\"", alert.getText().contains(alertMessage));
alert.accept(); alert.accept();
OV.fetch();
sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size());
user.getDriver().findElement(By.id("remove-user-btn")).sendKeys(Keys.ENTER); user.getDriver().findElement(By.id("remove-user-btn")).sendKeys(Keys.ENTER);
this.closeAllSessions(OV);
CustomWebhook.waitForEvent("sessionDestroyed", 2);
this.startMediaServer(); // RECONNECTION
Thread.sleep(5000); // Nothing should happen as long as there were no streams while reconnecting
// A publisher should be able to publish normally after media server reconnected
// Connect one subscriber-only user with connection to KMS -> restart KMS -> -> this.startMediaServer(true);
// check the session is still OK -> connect a publisher -> restart KMS -> check
// streamDestroyed events
user.getDriver().findElement(By.id("add-user-btn")).click(); user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElements(By.className("publish-checkbox")).forEach(el -> el.click()); user.getDriver().findElements(By.className("publish-checkbox")).forEach(el -> el.click());
user.getDriver().findElement(By.className("join-btn")).click(); user.getDriver().findElement(By.className("join-btn")).click();
user.getEventManager().waitUntilEventReaches("connectionCreated", 1); user.getEventManager().waitUntilEventReaches("connectionCreated", 1);
CustomWebhook.waitForEvent("sessionCreated", 2);
CustomWebhook.waitForEvent("participantJoined", 2);
OV.fetch(); OV.fetch();
sessions = OV.getActiveSessions(); sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size()); Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size());
Assert.assertEquals(
"Expected 1 active connection but found " + sessions.get(0).getActiveConnections().size(), 1,
sessions.get(0).getActiveConnections().size());
this.restartMediaServer(); this.stopMediaServer(false);
Thread.sleep(5000); this.startMediaServer(true);
OV.fetch();
sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size());
user.getDriver().findElement(By.id("add-user-btn")).click(); user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElement(By.cssSelector("#openvidu-instance-1 .join-btn")).click(); user.getDriver().findElement(By.cssSelector("#openvidu-instance-1 .join-btn")).click();
user.getEventManager().waitUntilEventReaches("connectionCreated", 4);
user.getEventManager().waitUntilEventReaches("accessAllowed", 1);
user.getEventManager().waitUntilEventReaches("streamCreated", 2); user.getEventManager().waitUntilEventReaches("streamCreated", 2);
user.getEventManager().waitUntilEventReaches("streamPlaying", 2); user.getEventManager().waitUntilEventReaches("streamPlaying", 2);
final int numberOfVideos = user.getDriver().findElements(By.tagName("video")).size(); this.closeAllSessions(OV);
Assert.assertEquals("Expected 2 videos but found " + numberOfVideos, 2, numberOfVideos); CustomWebhook.waitForEvent("sessionDestroyed", 2);
Assert.assertTrue("Videos were expected to have audio and video tracks", user.getEventManager() user.getDriver().findElement(By.id("remove-all-users-btn")).click();
.assertMediaTracks(user.getDriver().findElements(By.tagName("video")), true, true)); user.getEventManager().clearAllCurrentEvents();
CustomWebhook.clean();
} finally {
CustomWebhook.shutDown();
}
}
@Test
@DisplayName("Media server reconnect active session with streams test")
void mediaServerReconnectActiveSessionWithStreamsTest() throws Exception {
isRecordingTest = true;
isKurentoRestartTest = true;
log.info("Media server reconnect active session with streams test");
CountDownLatch initLatch = new CountDownLatch(1);
io.openvidu.test.browsers.utils.webhook.CustomWebhook.main(new String[0], initLatch);
try {
if (!initLatch.await(30, TimeUnit.SECONDS)) {
Assert.fail("Timeout waiting for webhook springboot app to start");
CustomWebhook.shutDown();
return;
}
setupBrowser("chrome");
// TOTAL DISCONNECTION
// Streams, Connections and Session should be destroyed with reason
// "nodeCrashed", with no possible recovery. INDIVIDUAL recording should be
// properly closed and available with reason "nodeCrashed"
user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElement(By.cssSelector("#openvidu-instance-1 .publish-checkbox")).click();
user.getDriver().findElements(By.className("join-btn")).forEach(el -> el.click());
user.getEventManager().clearAllCurrentEvents();
user.getEventManager().waitUntilEventReaches("streamPlaying", 2);
CustomWebhook.waitForEvent("webrtcConnectionCreated", 5);
CustomWebhook.waitForEvent("webrtcConnectionCreated", 1);
OV.fetch(); OV.fetch();
Session session = OV.getActiveSessions().get(0); List<Session> sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 2 active connections but found " + session.getActiveConnections(), 2, Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size());
session.getActiveConnections().size()); Session session = sessions.get(0);
int pubs = session.getActiveConnections().stream().mapToInt(con -> con.getPublishers().size()).sum(); String streamId = session.getActiveConnections().stream().filter(c -> c.getPublishers().size() > 0)
int subs = session.getActiveConnections().stream().mapToInt(con -> con.getSubscribers().size()).sum(); .findFirst().get().getPublishers().get(0).getStreamId();
Assert.assertEquals("Expected 1 active publisher but found " + pubs, 1, pubs);
Assert.assertEquals("Expected 1 active subscriber but found " + subs, 1, subs);
OV.startRecording(session.getSessionId(), OV.startRecording(session.getSessionId(),
new RecordingProperties.Builder().outputMode(OutputMode.INDIVIDUAL).build()); new RecordingProperties.Builder().outputMode(OutputMode.INDIVIDUAL).build());
user.getEventManager().waitUntilEventReaches("recordingStarted", 2); user.getEventManager().waitUntilEventReaches("recordingStarted", 2);
Thread.sleep(5000); waitUntilFileExistsAndIsBiggerThan("/opt/openvidu/recordings/TestSession/" + streamId + ".webm", 400, 25);
final CountDownLatch latch = new CountDownLatch(4); final CountDownLatch latch = new CountDownLatch(2);
user.getEventManager().on("recordingStopped", (event) -> { user.getEventManager().on("sessionDisconnected", (ev) -> {
String reason = event.get("reason").getAsString(); Assert.assertEquals("Expected 'sessionDisconnected' reason 'nodeCrashed'", "nodeCrashed",
Assert.assertEquals("Expected 'recordingStopped' reason 'mediaServerReconnect'", "mediaServerReconnect", ev.get("reason").getAsString());
reason);
latch.countDown();
});
user.getEventManager().on("streamDestroyed", (event) -> {
String reason = event.get("reason").getAsString();
Assert.assertEquals("Expected 'streamDestroyed' reason 'mediaServerReconnect'", "mediaServerReconnect",
reason);
latch.countDown(); latch.countDown();
}); });
this.restartMediaServer(); user.getEventManager().clearAllCurrentEvents();
CustomWebhook.clean();
this.stopMediaServer(false);
user.getEventManager().waitUntilEventReaches("recordingStopped", 2); user.getEventManager().waitUntilEventReaches("sessionDisconnected", 2);
user.getEventManager().waitUntilEventReaches("streamDestroyed", 2); if (!latch.await(6000, TimeUnit.MILLISECONDS)) {
if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
gracefullyLeaveParticipants(2); gracefullyLeaveParticipants(2);
fail("Waiting for 2 recordingStopped and 2 streamDestroyed events with reason 'mediaServerReconnect' to happen in total"); fail("Waiting for 2 sessionDisconnected events with reason 'nodeCrashed' to happen in total");
return; return;
} }
user.getEventManager().off("recordingStopped"); user.getEventManager().off("sessionDisconnected");
user.getEventManager().off("streamDestroyed");
session.fetch(); JsonObject event = CustomWebhook.waitForEvent("webrtcConnectionDestroyed", 1);
Assert.assertEquals("Expected 2 active connections but found " + session.getActiveConnections(), 2, Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString());
session.getActiveConnections().size()); event = CustomWebhook.waitForEvent("webrtcConnectionDestroyed", 1);
pubs = session.getActiveConnections().stream().mapToInt(con -> con.getPublishers().size()).sum(); Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString());
subs = session.getActiveConnections().stream().mapToInt(con -> con.getSubscribers().size()).sum(); event = CustomWebhook.waitForEvent("participantLeft", 1);
Assert.assertEquals("Expected no active publishers but found " + pubs, 0, pubs); Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString());
Assert.assertEquals("Expected no active subscribers but found " + subs, 0, subs); event = CustomWebhook.waitForEvent("participantLeft", 1);
Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString());
event = CustomWebhook.waitForEvent("recordingStatusChanged", 1);
Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString());
event = CustomWebhook.waitForEvent("recordingStatusChanged", 1);
Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString());
event = CustomWebhook.waitForEvent("sessionDestroyed", 1);
Assert.assertEquals("Wrong reason in webhook event", "nodeCrashed", event.get("reason").getAsString());
Recording rec = OV.getRecording("TestSession"); Recording rec = OV.getRecording("TestSession");
Assert.assertTrue("Recording duration is 0", rec.getDuration() > 0); Assert.assertTrue("Recording duration is 0", rec.getDuration() > 0);
@ -3404,21 +3534,98 @@ public class OpenViduTestAppE2eTest extends AbstractOpenViduTestAppE2eTest {
this.recordingUtils.checkIndividualRecording("/opt/openvidu/recordings/TestSession/", rec, 1, "opus", "vp8", this.recordingUtils.checkIndividualRecording("/opt/openvidu/recordings/TestSession/", rec, 1, "opus", "vp8",
true); true);
WebElement pubBtn = user.getDriver().findElements(By.cssSelector("#openvidu-instance-1 .pub-btn")).get(0); user.getDriver().findElement(By.id("remove-all-users-btn")).click();
pubBtn.click(); user.getEventManager().clearAllCurrentEvents();
pubBtn.click(); this.closeAllSessions(OV);
user.getEventManager().waitUntilEventReaches("streamCreated", 4); deleteAllRecordings(OV);
user.getEventManager().waitUntilEventReaches("streamPlaying", 4); CustomWebhook.clean();
session.fetch(); this.startMediaServer(true);
// RECONNECTION
// Streams should be destroyed with reason "mediaServerReconnect", but
// Connections and Session should remain alive. INDIVIDUAL recording should be
// properly closed and available with reason "mediaServerReconnect"
user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElement(By.cssSelector("#openvidu-instance-1 .publish-checkbox")).click();
user.getDriver().findElements(By.className("join-btn")).forEach(el -> el.click());
user.getEventManager().waitUntilEventReaches("streamPlaying", 2);
CustomWebhook.waitForEvent("webrtcConnectionCreated", 5);
CustomWebhook.waitForEvent("webrtcConnectionCreated", 1);
OV.fetch();
sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size());
session = sessions.get(0);
streamId = session.getActiveConnections().stream().filter(c -> c.getPublishers().size() > 0).findFirst()
.get().getPublishers().get(0).getStreamId();
OV.startRecording(session.getSessionId(),
new RecordingProperties.Builder().outputMode(OutputMode.INDIVIDUAL).build());
user.getEventManager().waitUntilEventReaches("recordingStarted", 2);
waitUntilFileExistsAndIsBiggerThan("/opt/openvidu/recordings/TestSession/" + streamId + ".webm", 400, 25);
final CountDownLatch latch2 = new CountDownLatch(4);
user.getEventManager().on("recordingStopped", (ev) -> {
Assert.assertEquals("Expected 'recordingStopped' reason 'mediaServerReconnect'", "mediaServerReconnect",
ev.get("reason").getAsString());
latch2.countDown();
});
user.getEventManager().on("streamDestroyed", (ev) -> {
Assert.assertEquals("Expected 'streamDestroyed' reason 'mediaServerReconnect'", "mediaServerReconnect",
ev.get("reason").getAsString());
latch2.countDown();
});
user.getEventManager().clearAllCurrentEvents();
CustomWebhook.clean();
this.stopMediaServer(false);
this.startMediaServer(false);
user.getEventManager().waitUntilEventReaches("recordingStopped", 2);
user.getEventManager().waitUntilEventReaches("streamDestroyed", 2);
if (!latch2.await(6000, TimeUnit.MILLISECONDS)) {
gracefullyLeaveParticipants(2);
fail("Waiting for 2 recordingStopped and 2 streamDestroyed events with reason 'mediaServerReconnect' to happen in total");
return;
}
user.getEventManager().off("recordingStopped");
user.getEventManager().off("streamDestroyed");
event = CustomWebhook.waitForEvent("webrtcConnectionDestroyed", 1);
Assert.assertEquals("Wrong reason in webhook event", "mediaServerReconnect",
event.get("reason").getAsString());
event = CustomWebhook.waitForEvent("webrtcConnectionDestroyed", 1);
Assert.assertEquals("Wrong reason in webhook event", "mediaServerReconnect",
event.get("reason").getAsString());
event = CustomWebhook.waitForEvent("recordingStatusChanged", 1);
Assert.assertEquals("Wrong reason in webhook event", "mediaServerReconnect",
event.get("reason").getAsString());
event = CustomWebhook.waitForEvent("recordingStatusChanged", 1);
Assert.assertEquals("Wrong reason in webhook event", "mediaServerReconnect",
event.get("reason").getAsString());
rec = OV.getRecording("TestSession");
Assert.assertTrue("Recording duration is 0", rec.getDuration() > 0);
Assert.assertTrue("Recording size is 0", rec.getSize() > 0);
this.recordingUtils.checkIndividualRecording("/opt/openvidu/recordings/TestSession/", rec, 1, "opus", "vp8",
true);
OV.fetch();
sessions = OV.getActiveSessions();
Assert.assertEquals("Expected 1 active sessions but found " + sessions.size(), 1, sessions.size());
session = sessions.get(0);
Assert.assertEquals("Expected 2 active connections but found " + session.getActiveConnections(), 2, Assert.assertEquals("Expected 2 active connections but found " + session.getActiveConnections(), 2,
session.getActiveConnections().size()); session.getActiveConnections().size());
pubs = session.getActiveConnections().stream().mapToInt(con -> con.getPublishers().size()).sum();
subs = session.getActiveConnections().stream().mapToInt(con -> con.getSubscribers().size()).sum();
Assert.assertEquals("Expected 1 active publisher but found " + pubs, 1, pubs);
Assert.assertEquals("Expected 1 active subscriber but found " + subs, 1, subs);
gracefullyLeaveParticipants(2); } finally {
CustomWebhook.shutDown();
}
} }
@Test @Test
@ -4440,4 +4647,33 @@ public class OpenViduTestAppE2eTest extends AbstractOpenViduTestAppE2eTest {
sessionVP9AllowTranscoding.getProperties().isTranscodingAllowed()); sessionVP9AllowTranscoding.getProperties().isTranscodingAllowed());
} }
private void waitUntilFileExistsAndIsBiggerThan(String absolutePath, int kbs, int maxSecondsWait) throws Exception {
int interval = 100;
int maxLoops = (maxSecondsWait * 1000) / interval;
int loop = 0;
long bytes = 0;
boolean bigger = false;
Path path = Paths.get(absolutePath);
while (!bigger && loop < maxLoops) {
bigger = Files.exists(path) && Files.isReadable(path);
if (bigger) {
try {
bytes = Files.size(path);
} catch (IOException e) {
System.err.println("Error getting file size from " + path + ": " + e.getMessage());
}
bigger = (bytes / 1024) > kbs;
}
loop++;
Thread.sleep(interval);
}
if (!bigger && loop >= maxLoops) {
throw new Exception("File " + absolutePath + " did not reach a size of at least " + kbs + " KBs in "
+ maxSecondsWait + " seconds. Last check was " + bytes + " KBs");
}
}
} }