diff --git a/openvidu-java-client/src/main/java/io/openvidu/java/client/RecordingProperties.java b/openvidu-java-client/src/main/java/io/openvidu/java/client/RecordingProperties.java index 5028b54a..624926dc 100644 --- a/openvidu-java-client/src/main/java/io/openvidu/java/client/RecordingProperties.java +++ b/openvidu-java-client/src/main/java/io/openvidu/java/client/RecordingProperties.java @@ -17,6 +17,8 @@ package io.openvidu.java.client; +import java.util.Arrays; +import java.util.List; import java.util.Map; import com.google.gson.JsonObject; @@ -471,6 +473,9 @@ public class RecordingProperties { return json; } + /** + * @hidden + */ public static RecordingProperties.Builder fromJson(Map params, RecordingProperties defaultProps) throws RuntimeException { @@ -707,4 +712,14 @@ public class RecordingProperties { return (OutputMode.COMPOSED.equals(outputMode) || OutputMode.COMPOSED_QUICK_START.equals(outputMode)); } + /** + * @hidden + */ + public final static Map removeNonBroadcastProperties(Map params) { + List nonBroadcastProps = Arrays + .asList(new String[] { "outputMode", "name", "hasVideo", "ignoreFailedStreams" }); + nonBroadcastProps.forEach(p -> params.remove(p)); + return params; + } + } diff --git a/openvidu-java-client/src/test/java/io/openvidu/java/client/test/RecordingPropertiesTest.java b/openvidu-java-client/src/test/java/io/openvidu/java/client/test/RecordingPropertiesTest.java index 5f0c195b..2b5498c8 100644 --- a/openvidu-java-client/src/test/java/io/openvidu/java/client/test/RecordingPropertiesTest.java +++ b/openvidu-java-client/src/test/java/io/openvidu/java/client/test/RecordingPropertiesTest.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Test; import com.google.gson.Gson; import com.google.gson.JsonObject; +import io.openvidu.java.client.Recording.OutputMode; import io.openvidu.java.client.RecordingProperties; public class RecordingPropertiesTest { @@ -115,6 +116,18 @@ public class RecordingPropertiesTest { assertException(map, "Wrong 'mediaNode' parameter", IllegalArgumentException.class); } + @Test + public void testNonBroadcastProperties() { + Map map = mapFromJsonString( + "{'outputMode':'INDIVIDUAL','name':'ABDCFG','hasVideo':false,'ignoreFailedStreams':true,'session':'TestSession','hasAudio':true,'recordingLayout':'CUSTOM','customLayout':'layout1','resolution':'920x600','frameRate':18,'shmSize':600000000,'mediaNode':{'id':'mediaNodeId'}}"); + RecordingProperties.removeNonBroadcastProperties(map); + RecordingProperties props = RecordingProperties.fromJson(map, null).build(); + Assertions.assertEquals(OutputMode.COMPOSED, props.outputMode()); + Assertions.assertEquals("", props.name()); + Assertions.assertEquals(true, props.hasVideo()); + Assertions.assertNull(props.ignoreFailedStreams()); + } + private JsonObject adaptProps(JsonObject json) { json.remove("session"); if (json.has("mediaNode")) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/broadcast/BroadcastManager.java b/openvidu-server/src/main/java/io/openvidu/server/broadcast/BroadcastManager.java index 742aad6e..9d9bb1d8 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/broadcast/BroadcastManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/broadcast/BroadcastManager.java @@ -1,7 +1,13 @@ package io.openvidu.server.broadcast; +import io.openvidu.java.client.RecordingProperties; +import io.openvidu.server.core.EndReason; +import io.openvidu.server.core.Session; + public interface BroadcastManager { boolean sessionIsBeingBroadcasted(String sessionId); + void stopBroadcast(Session session, RecordingProperties properties, EndReason reason); + } diff --git a/openvidu-server/src/main/java/io/openvidu/server/broadcast/BroadcastManagerDummy.java b/openvidu-server/src/main/java/io/openvidu/server/broadcast/BroadcastManagerDummy.java index d8b6f112..c67f796d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/broadcast/BroadcastManagerDummy.java +++ b/openvidu-server/src/main/java/io/openvidu/server/broadcast/BroadcastManagerDummy.java @@ -1,5 +1,9 @@ package io.openvidu.server.broadcast; +import io.openvidu.java.client.RecordingProperties; +import io.openvidu.server.core.EndReason; +import io.openvidu.server.core.Session; + public class BroadcastManagerDummy implements BroadcastManager { @Override @@ -7,4 +11,8 @@ public class BroadcastManagerDummy implements BroadcastManager { return false; } + @Override + public void stopBroadcast(Session session, RecordingProperties properties, EndReason reason) { + } + } diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index 91d8b55e..1e3ae7f0 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -685,7 +685,7 @@ public class SessionEventsHandler { * by the crashed Media Node */ public void onMediaNodeCrashed(Kms kms, String environmentId, long timeOfDisconnection, List sessionIds, - List recordingIds) { + List recordingIds, List broadcasts) { } public void onMediaNodeRecovered(Kms kms, String environmentId, long timeOfConnection) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index ad59347c..252df881 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -707,6 +707,19 @@ public abstract class SessionManager { } } }); + // Stop all external broadcasts + kms.getActiveBroadcasts().forEach(sessionId -> { + Session session = this.getSession(sessionId); + if (session != null && !session.isClosed()) { + // This is a broadcast of a Session hosted on a different Media Node + try { + this.broadcastManager.stopBroadcast(session, null, RecordingManager.finalReason(reason)); + } catch (OpenViduException e) { + log.error("Error stopping external broadcast of session {} in Media Node {}: {}", sessionId, + kms.getId(), e.getMessage()); + } + } + }); } private Participant newParticipantAux(String sessionId, String uniqueSessionId, String finalUserId, diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index e17adb42..e44295df 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -84,6 +84,24 @@ public class FixedOneKmsManager extends KmsManager { } } + @Override + public void incrementActiveBroadcasts(RecordingProperties properties, Session session) { + try { + this.getKmss().iterator().next().incrementActiveBroadcasts(session.getSessionId()); + } catch (NoSuchElementException e) { + log.error("There is no KMS available when incrementing active broadcasts"); + } + } + + @Override + public void decrementActiveBroadcasts(RecordingProperties properties, Session session) { + try { + this.getKmss().iterator().next().decrementActiveBroadcasts(session.getSessionId()); + } catch (NoSuchElementException e) { + log.error("There is no KMS available when decrementing active broadcasts"); + } + } + @Override public void removeMediaNodeUponCrash(String mediaNodeId) { } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index dc964a32..74825a53 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -20,6 +20,7 @@ package io.openvidu.server.kurento.kms; import java.net.MalformedURLException; import java.net.URL; import java.util.Collection; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -73,6 +74,7 @@ public class Kms { private Map kurentoSessions = new ConcurrentHashMap<>(); private Map activeRecordings = new ConcurrentHashMap<>(); + private Set activeBroadcasts = ConcurrentHashMap.newKeySet(); private AtomicLong activeComposedRecordings = new AtomicLong(); public Kms(KmsProperties props, LoadManager loadManager, KmsManager kmsManager) { @@ -197,6 +199,10 @@ public class Kms { return this.activeRecordings.entrySet(); } + public synchronized Set getActiveBroadcasts() { + return this.activeBroadcasts; + } + public synchronized void incrementActiveRecordings(String sessionId, String recordingId, RecordingProperties properties) { this.activeRecordings.put(recordingId, sessionId); @@ -213,6 +219,15 @@ public class Kms { kmsManager.getMediaNodeManager().dropIdleMediaNode(this.id); } + public synchronized void incrementActiveBroadcasts(String sessionId) { + this.activeBroadcasts.add(sessionId); + } + + public synchronized void decrementActiveBroadcasts(String sessionId) { + this.activeBroadcasts.remove(sessionId); + kmsManager.getMediaNodeManager().dropIdleMediaNode(this.id); + } + public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("id", this.id); @@ -247,6 +262,11 @@ public class Kms { activeRecordingsJson.add(recordingId); } json.add("recordingIds", activeRecordingsJson); + JsonArray activeBroadcastsJson = new JsonArray(); + for (String sessionIdBroadcasted : this.activeBroadcasts) { + activeBroadcastsJson.add(sessionIdBroadcasted); + } + json.add("broadcasts", activeBroadcastsJson); } if (withExtraInfo) { @@ -310,6 +330,10 @@ public class Kms { return this.activeComposedRecordings.intValue(); } + public int getNumberOfBroadcasts() { + return this.activeBroadcasts.size(); + } + public MediaServer getMediaServerType() { if (this.mediaServer == null) { this.fetchMediaServerType(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 14f24a9d..b7a3f582 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -44,12 +44,6 @@ import org.springframework.beans.factory.annotation.Autowired; import com.google.gson.JsonObject; -import io.openvidu.client.OpenViduException; -import io.openvidu.client.OpenViduException.Code; -import io.openvidu.client.internal.ProtocolElements; -import io.openvidu.java.client.ConnectionProperties; -import io.openvidu.java.client.ConnectionType; -import io.openvidu.java.client.OpenViduRole; import io.openvidu.java.client.RecordingProperties; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.EndReason; @@ -57,7 +51,6 @@ import io.openvidu.server.core.IdentifierPrefixes; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionManager; -import io.openvidu.server.core.Token; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.utils.MediaNodeManager; import io.openvidu.server.utils.RemoteOperationUtils; @@ -393,6 +386,10 @@ public abstract class KmsManager { public abstract void decrementActiveRecordings(RecordingProperties recordingProperties, String recordingId, Session session); + public abstract void incrementActiveBroadcasts(RecordingProperties properties, Session session); + + public abstract void decrementActiveBroadcasts(RecordingProperties properties, Session session); + public abstract void removeMediaNodeUponCrash(String mediaNodeId); protected abstract String getEnvironmentId(String mediaNodeId); @@ -424,11 +421,12 @@ public abstract class KmsManager { .collect(Collectors.toUnmodifiableList()); final List affectedRecordingIds = kms.getActiveRecordings().stream().map(entry -> entry.getKey()) .collect(Collectors.toUnmodifiableList()); + final List affectedBroadcasts = new ArrayList<>(kms.getActiveBroadcasts()); // 1. Send nodeCrashed webhook event String environmentId = getEnvironmentId(kms.getId()); sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, affectedSessionIds, - affectedRecordingIds); + affectedRecordingIds, affectedBroadcasts); // 2. Remove Media Node from cluster if necessary if (mustRemoveMediaNode) { diff --git a/openvidu-test-e2e/src/main/java/io/openvidu/test/e2e/OpenViduTestE2e.java b/openvidu-test-e2e/src/main/java/io/openvidu/test/e2e/OpenViduTestE2e.java index 0b34444a..de8f884b 100644 --- a/openvidu-test-e2e/src/main/java/io/openvidu/test/e2e/OpenViduTestE2e.java +++ b/openvidu-test-e2e/src/main/java/io/openvidu/test/e2e/OpenViduTestE2e.java @@ -3,6 +3,7 @@ package io.openvidu.test.e2e; import static org.openqa.selenium.OutputType.BASE64; import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.net.HttpURLConnection; import java.nio.file.Files; @@ -857,4 +858,47 @@ public class OpenViduTestE2e { } } + // https://github.com/tiangolo/nginx-rtmp-docker + protected static void startRtmpServer() throws IOException { + File file = writeRtmpServerConfigInFile(); + String dockerRunCommand = "docker run -d --name broadcast-nginx -p 1935:1935 -v " + file.getAbsolutePath() + + ":/etc/nginx/nginx.conf tiangolo/nginx-rtmp"; + commandLine.executeCommand(dockerRunCommand, 10); + } + + protected static void stopRtmpServer() { + String dockerRemoveCommand = "docker rm -f broadcast-nginx"; + commandLine.executeCommand(dockerRemoveCommand, 10); + } + + private static File writeRtmpServerConfigInFile() throws IOException { + String newLine = System.getProperty("line.separator"); + // @formatter:off + String config = String.join(newLine, + "worker_processes auto;", + "rtmp_auto_push on;", + "events {}", + "rtmp {", + " server {", + " listen 1935;", + " listen [::]:1935 ipv6only=on;", + " application live {", + " live on;", + " recorder all {", + " record video;", + " record_path /tmp;", + " record_max_size 100000K;", + " record_unique on;", + " record_suffix rtmp.flv;", + " }", + " }", + " }", + "}"); + // @formatter:on + File tmpFile = File.createTempFile("broadcast-nginx", ".conf"); + FileWriter writer = new FileWriter(tmpFile); + writer.write(config); + writer.close(); + return tmpFile; + } } diff --git a/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/OpenViduProTestAppE2eTest.java b/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/OpenViduProTestAppE2eTest.java index 6266cccb..93cbedcd 100644 --- a/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/OpenViduProTestAppE2eTest.java +++ b/openvidu-test-e2e/src/test/java/io/openvidu/test/e2e/OpenViduProTestAppE2eTest.java @@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.fail; import java.awt.Point; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileReader; import java.net.HttpURLConnection; import java.nio.file.Files; @@ -38,7 +37,6 @@ import org.openqa.selenium.Keys; import org.openqa.selenium.WebElement; import org.openqa.selenium.support.ui.ExpectedConditions; import org.springframework.http.HttpMethod; -import org.springframework.util.ResourceUtils; import com.google.gson.Gson; import com.google.gson.JsonArray; @@ -480,7 +478,13 @@ public class OpenViduProTestAppE2eTest extends AbstractOpenViduTestappE2eTest { containerId = restClient.rest(HttpMethod.GET, "/openvidu/api/media-nodes", HttpURLConnection.HTTP_OK) .get("content").getAsJsonArray().get(0).getAsJsonObject().get("environmentId").getAsString(); MediaNodeDockerUtils.crashMediaNode(containerId); - CustomWebhook.waitForEvent("nodeCrashed", 10); + JsonObject nodeCrashedEvent = CustomWebhook.waitForEvent("nodeCrashed", 10); + + Assertions.assertEquals(1, nodeCrashedEvent.get("recordingIds").getAsJsonArray().size()); + JsonArray affectedBroadcasts = nodeCrashedEvent.get("broadcasts").getAsJsonArray(); + Assertions.assertEquals(1, affectedBroadcasts.size()); + Assertions.assertTrue(affectedBroadcasts.get(0).equals(JsonParser.parseString("TestSession"))); + CustomWebhook.waitForEvent("mediaNodeStatusChanged", 2); for (int i = 0; i < 4; i++) { Assertions.assertEquals("nodeCrashed", @@ -2983,10 +2987,10 @@ public class OpenViduProTestAppE2eTest extends AbstractOpenViduTestappE2eTest { } @Test - @DisplayName("Broadcast and composed recording test") - void broadcastAndComposedRecordingTest() throws Exception { + @DisplayName("Broadcast ad STT and composed recording test") + void broadcastAndSttAndComposedRecordingTest() throws Exception { - log.info("Broadcast and composed recording test"); + log.info("Broadcast and STT and composed recording test"); try { startRtmpServer(); @@ -2996,19 +3000,6 @@ public class OpenViduProTestAppE2eTest extends AbstractOpenViduTestappE2eTest { } } - // https://github.com/tiangolo/nginx-rtmp-docker - private void startRtmpServer() throws FileNotFoundException { - File file = ResourceUtils.getFile("classpath:broadcast-nginx.conf"); - String dockerRunCommand = "docker run -d --name broadcast-nginx -p 1935:1935 -v " + file.getAbsolutePath() - + ":/etc/nginx/nginx.conf tiangolo/nginx-rtmp"; - commandLine.executeCommand(dockerRunCommand, 10); - } - - private void stopRtmpServer() { - String dockerRemoveCommand = "docker rm -f broadcast-nginx"; - commandLine.executeCommand(dockerRemoveCommand, 10); - } - private void checkRtmpRecordingIsFine(long secondsTimeout) throws InterruptedException { final String broadcastRecordingPath = "/opt/openvidu/recordings"; final String cleanBroadcastPath = "rm -rf " + broadcastRecordingPath + "/tmp";