openvidu-server: differentiate between activeRecordings and activeBroadcasts

pull/780/head
pabloFuente 2023-02-17 14:34:49 +01:00
parent 832b40fd83
commit 4118ba2e92
11 changed files with 158 additions and 28 deletions

View File

@ -17,6 +17,8 @@
package io.openvidu.java.client; package io.openvidu.java.client;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
@ -471,6 +473,9 @@ public class RecordingProperties {
return json; return json;
} }
/**
* @hidden
*/
public static RecordingProperties.Builder fromJson(Map<String, ?> params, RecordingProperties defaultProps) public static RecordingProperties.Builder fromJson(Map<String, ?> params, RecordingProperties defaultProps)
throws RuntimeException { throws RuntimeException {
@ -707,4 +712,14 @@ public class RecordingProperties {
return (OutputMode.COMPOSED.equals(outputMode) || OutputMode.COMPOSED_QUICK_START.equals(outputMode)); return (OutputMode.COMPOSED.equals(outputMode) || OutputMode.COMPOSED_QUICK_START.equals(outputMode));
} }
/**
* @hidden
*/
public final static Map<String, ?> removeNonBroadcastProperties(Map<String, ?> params) {
List<String> nonBroadcastProps = Arrays
.asList(new String[] { "outputMode", "name", "hasVideo", "ignoreFailedStreams" });
nonBroadcastProps.forEach(p -> params.remove(p));
return params;
}
} }

View File

@ -8,6 +8,7 @@ import org.junit.jupiter.api.Test;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.java.client.Recording.OutputMode;
import io.openvidu.java.client.RecordingProperties; import io.openvidu.java.client.RecordingProperties;
public class RecordingPropertiesTest { public class RecordingPropertiesTest {
@ -115,6 +116,18 @@ public class RecordingPropertiesTest {
assertException(map, "Wrong 'mediaNode' parameter", IllegalArgumentException.class); assertException(map, "Wrong 'mediaNode' parameter", IllegalArgumentException.class);
} }
@Test
public void testNonBroadcastProperties() {
Map<String, ?> map = mapFromJsonString(
"{'outputMode':'INDIVIDUAL','name':'ABDCFG','hasVideo':false,'ignoreFailedStreams':true,'session':'TestSession','hasAudio':true,'recordingLayout':'CUSTOM','customLayout':'layout1','resolution':'920x600','frameRate':18,'shmSize':600000000,'mediaNode':{'id':'mediaNodeId'}}");
RecordingProperties.removeNonBroadcastProperties(map);
RecordingProperties props = RecordingProperties.fromJson(map, null).build();
Assertions.assertEquals(OutputMode.COMPOSED, props.outputMode());
Assertions.assertEquals("", props.name());
Assertions.assertEquals(true, props.hasVideo());
Assertions.assertNull(props.ignoreFailedStreams());
}
private JsonObject adaptProps(JsonObject json) { private JsonObject adaptProps(JsonObject json) {
json.remove("session"); json.remove("session");
if (json.has("mediaNode")) { if (json.has("mediaNode")) {

View File

@ -1,7 +1,13 @@
package io.openvidu.server.broadcast; package io.openvidu.server.broadcast;
import io.openvidu.java.client.RecordingProperties;
import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Session;
public interface BroadcastManager { public interface BroadcastManager {
boolean sessionIsBeingBroadcasted(String sessionId); boolean sessionIsBeingBroadcasted(String sessionId);
void stopBroadcast(Session session, RecordingProperties properties, EndReason reason);
} }

View File

@ -1,5 +1,9 @@
package io.openvidu.server.broadcast; package io.openvidu.server.broadcast;
import io.openvidu.java.client.RecordingProperties;
import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Session;
public class BroadcastManagerDummy implements BroadcastManager { public class BroadcastManagerDummy implements BroadcastManager {
@Override @Override
@ -7,4 +11,8 @@ public class BroadcastManagerDummy implements BroadcastManager {
return false; return false;
} }
@Override
public void stopBroadcast(Session session, RecordingProperties properties, EndReason reason) {
}
} }

View File

@ -685,7 +685,7 @@ public class SessionEventsHandler {
* by the crashed Media Node * by the crashed Media Node
*/ */
public void onMediaNodeCrashed(Kms kms, String environmentId, long timeOfDisconnection, List<String> sessionIds, public void onMediaNodeCrashed(Kms kms, String environmentId, long timeOfDisconnection, List<String> sessionIds,
List<String> recordingIds) { List<String> recordingIds, List<String> broadcasts) {
} }
public void onMediaNodeRecovered(Kms kms, String environmentId, long timeOfConnection) { public void onMediaNodeRecovered(Kms kms, String environmentId, long timeOfConnection) {

View File

@ -707,6 +707,19 @@ public abstract class SessionManager {
} }
} }
}); });
// Stop all external broadcasts
kms.getActiveBroadcasts().forEach(sessionId -> {
Session session = this.getSession(sessionId);
if (session != null && !session.isClosed()) {
// This is a broadcast of a Session hosted on a different Media Node
try {
this.broadcastManager.stopBroadcast(session, null, RecordingManager.finalReason(reason));
} catch (OpenViduException e) {
log.error("Error stopping external broadcast of session {} in Media Node {}: {}", sessionId,
kms.getId(), e.getMessage());
}
}
});
} }
private Participant newParticipantAux(String sessionId, String uniqueSessionId, String finalUserId, private Participant newParticipantAux(String sessionId, String uniqueSessionId, String finalUserId,

View File

@ -84,6 +84,24 @@ public class FixedOneKmsManager extends KmsManager {
} }
} }
@Override
public void incrementActiveBroadcasts(RecordingProperties properties, Session session) {
try {
this.getKmss().iterator().next().incrementActiveBroadcasts(session.getSessionId());
} catch (NoSuchElementException e) {
log.error("There is no KMS available when incrementing active broadcasts");
}
}
@Override
public void decrementActiveBroadcasts(RecordingProperties properties, Session session) {
try {
this.getKmss().iterator().next().decrementActiveBroadcasts(session.getSessionId());
} catch (NoSuchElementException e) {
log.error("There is no KMS available when decrementing active broadcasts");
}
}
@Override @Override
public void removeMediaNodeUponCrash(String mediaNodeId) { public void removeMediaNodeUponCrash(String mediaNodeId) {
} }

View File

@ -20,6 +20,7 @@ package io.openvidu.server.kurento.kms;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
@ -73,6 +74,7 @@ public class Kms {
private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>(); private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>();
private Map<String, String> activeRecordings = new ConcurrentHashMap<>(); private Map<String, String> activeRecordings = new ConcurrentHashMap<>();
private Set<String> activeBroadcasts = ConcurrentHashMap.newKeySet();
private AtomicLong activeComposedRecordings = new AtomicLong(); private AtomicLong activeComposedRecordings = new AtomicLong();
public Kms(KmsProperties props, LoadManager loadManager, KmsManager kmsManager) { public Kms(KmsProperties props, LoadManager loadManager, KmsManager kmsManager) {
@ -197,6 +199,10 @@ public class Kms {
return this.activeRecordings.entrySet(); return this.activeRecordings.entrySet();
} }
public synchronized Set<String> getActiveBroadcasts() {
return this.activeBroadcasts;
}
public synchronized void incrementActiveRecordings(String sessionId, String recordingId, public synchronized void incrementActiveRecordings(String sessionId, String recordingId,
RecordingProperties properties) { RecordingProperties properties) {
this.activeRecordings.put(recordingId, sessionId); this.activeRecordings.put(recordingId, sessionId);
@ -213,6 +219,15 @@ public class Kms {
kmsManager.getMediaNodeManager().dropIdleMediaNode(this.id); kmsManager.getMediaNodeManager().dropIdleMediaNode(this.id);
} }
public synchronized void incrementActiveBroadcasts(String sessionId) {
this.activeBroadcasts.add(sessionId);
}
public synchronized void decrementActiveBroadcasts(String sessionId) {
this.activeBroadcasts.remove(sessionId);
kmsManager.getMediaNodeManager().dropIdleMediaNode(this.id);
}
public JsonObject toJson() { public JsonObject toJson() {
JsonObject json = new JsonObject(); JsonObject json = new JsonObject();
json.addProperty("id", this.id); json.addProperty("id", this.id);
@ -247,6 +262,11 @@ public class Kms {
activeRecordingsJson.add(recordingId); activeRecordingsJson.add(recordingId);
} }
json.add("recordingIds", activeRecordingsJson); json.add("recordingIds", activeRecordingsJson);
JsonArray activeBroadcastsJson = new JsonArray();
for (String sessionIdBroadcasted : this.activeBroadcasts) {
activeBroadcastsJson.add(sessionIdBroadcasted);
}
json.add("broadcasts", activeBroadcastsJson);
} }
if (withExtraInfo) { if (withExtraInfo) {
@ -310,6 +330,10 @@ public class Kms {
return this.activeComposedRecordings.intValue(); return this.activeComposedRecordings.intValue();
} }
public int getNumberOfBroadcasts() {
return this.activeBroadcasts.size();
}
public MediaServer getMediaServerType() { public MediaServer getMediaServerType() {
if (this.mediaServer == null) { if (this.mediaServer == null) {
this.fetchMediaServerType(); this.fetchMediaServerType();

View File

@ -44,12 +44,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.client.internal.ProtocolElements;
import io.openvidu.java.client.ConnectionProperties;
import io.openvidu.java.client.ConnectionType;
import io.openvidu.java.client.OpenViduRole;
import io.openvidu.java.client.RecordingProperties; import io.openvidu.java.client.RecordingProperties;
import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.EndReason; import io.openvidu.server.core.EndReason;
@ -57,7 +51,6 @@ import io.openvidu.server.core.IdentifierPrefixes;
import io.openvidu.server.core.Session; import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionEventsHandler;
import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.SessionManager;
import io.openvidu.server.core.Token;
import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.MediaNodeManager; import io.openvidu.server.utils.MediaNodeManager;
import io.openvidu.server.utils.RemoteOperationUtils; import io.openvidu.server.utils.RemoteOperationUtils;
@ -393,6 +386,10 @@ public abstract class KmsManager {
public abstract void decrementActiveRecordings(RecordingProperties recordingProperties, String recordingId, public abstract void decrementActiveRecordings(RecordingProperties recordingProperties, String recordingId,
Session session); Session session);
public abstract void incrementActiveBroadcasts(RecordingProperties properties, Session session);
public abstract void decrementActiveBroadcasts(RecordingProperties properties, Session session);
public abstract void removeMediaNodeUponCrash(String mediaNodeId); public abstract void removeMediaNodeUponCrash(String mediaNodeId);
protected abstract String getEnvironmentId(String mediaNodeId); protected abstract String getEnvironmentId(String mediaNodeId);
@ -424,11 +421,12 @@ public abstract class KmsManager {
.collect(Collectors.toUnmodifiableList()); .collect(Collectors.toUnmodifiableList());
final List<String> affectedRecordingIds = kms.getActiveRecordings().stream().map(entry -> entry.getKey()) final List<String> affectedRecordingIds = kms.getActiveRecordings().stream().map(entry -> entry.getKey())
.collect(Collectors.toUnmodifiableList()); .collect(Collectors.toUnmodifiableList());
final List<String> affectedBroadcasts = new ArrayList<>(kms.getActiveBroadcasts());
// 1. Send nodeCrashed webhook event // 1. Send nodeCrashed webhook event
String environmentId = getEnvironmentId(kms.getId()); String environmentId = getEnvironmentId(kms.getId());
sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, affectedSessionIds, sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection, affectedSessionIds,
affectedRecordingIds); affectedRecordingIds, affectedBroadcasts);
// 2. Remove Media Node from cluster if necessary // 2. Remove Media Node from cluster if necessary
if (mustRemoveMediaNode) { if (mustRemoveMediaNode) {

View File

@ -3,6 +3,7 @@ package io.openvidu.test.e2e;
import static org.openqa.selenium.OutputType.BASE64; import static org.openqa.selenium.OutputType.BASE64;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.nio.file.Files; import java.nio.file.Files;
@ -857,4 +858,47 @@ public class OpenViduTestE2e {
} }
} }
// https://github.com/tiangolo/nginx-rtmp-docker
protected static void startRtmpServer() throws IOException {
File file = writeRtmpServerConfigInFile();
String dockerRunCommand = "docker run -d --name broadcast-nginx -p 1935:1935 -v " + file.getAbsolutePath()
+ ":/etc/nginx/nginx.conf tiangolo/nginx-rtmp";
commandLine.executeCommand(dockerRunCommand, 10);
}
protected static void stopRtmpServer() {
String dockerRemoveCommand = "docker rm -f broadcast-nginx";
commandLine.executeCommand(dockerRemoveCommand, 10);
}
private static File writeRtmpServerConfigInFile() throws IOException {
String newLine = System.getProperty("line.separator");
// @formatter:off
String config = String.join(newLine,
"worker_processes auto;",
"rtmp_auto_push on;",
"events {}",
"rtmp {",
" server {",
" listen 1935;",
" listen [::]:1935 ipv6only=on;",
" application live {",
" live on;",
" recorder all {",
" record video;",
" record_path /tmp;",
" record_max_size 100000K;",
" record_unique on;",
" record_suffix rtmp.flv;",
" }",
" }",
" }",
"}");
// @formatter:on
File tmpFile = File.createTempFile("broadcast-nginx", ".conf");
FileWriter writer = new FileWriter(tmpFile);
writer.write(config);
writer.close();
return tmpFile;
}
} }

View File

@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.awt.Point; import java.awt.Point;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader; import java.io.FileReader;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.nio.file.Files; import java.nio.file.Files;
@ -38,7 +37,6 @@ import org.openqa.selenium.Keys;
import org.openqa.selenium.WebElement; import org.openqa.selenium.WebElement;
import org.openqa.selenium.support.ui.ExpectedConditions; import org.openqa.selenium.support.ui.ExpectedConditions;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.util.ResourceUtils;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
@ -480,7 +478,13 @@ public class OpenViduProTestAppE2eTest extends AbstractOpenViduTestappE2eTest {
containerId = restClient.rest(HttpMethod.GET, "/openvidu/api/media-nodes", HttpURLConnection.HTTP_OK) containerId = restClient.rest(HttpMethod.GET, "/openvidu/api/media-nodes", HttpURLConnection.HTTP_OK)
.get("content").getAsJsonArray().get(0).getAsJsonObject().get("environmentId").getAsString(); .get("content").getAsJsonArray().get(0).getAsJsonObject().get("environmentId").getAsString();
MediaNodeDockerUtils.crashMediaNode(containerId); MediaNodeDockerUtils.crashMediaNode(containerId);
CustomWebhook.waitForEvent("nodeCrashed", 10); JsonObject nodeCrashedEvent = CustomWebhook.waitForEvent("nodeCrashed", 10);
Assertions.assertEquals(1, nodeCrashedEvent.get("recordingIds").getAsJsonArray().size());
JsonArray affectedBroadcasts = nodeCrashedEvent.get("broadcasts").getAsJsonArray();
Assertions.assertEquals(1, affectedBroadcasts.size());
Assertions.assertTrue(affectedBroadcasts.get(0).equals(JsonParser.parseString("TestSession")));
CustomWebhook.waitForEvent("mediaNodeStatusChanged", 2); CustomWebhook.waitForEvent("mediaNodeStatusChanged", 2);
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
Assertions.assertEquals("nodeCrashed", Assertions.assertEquals("nodeCrashed",
@ -2983,10 +2987,10 @@ public class OpenViduProTestAppE2eTest extends AbstractOpenViduTestappE2eTest {
} }
@Test @Test
@DisplayName("Broadcast and composed recording test") @DisplayName("Broadcast ad STT and composed recording test")
void broadcastAndComposedRecordingTest() throws Exception { void broadcastAndSttAndComposedRecordingTest() throws Exception {
log.info("Broadcast and composed recording test"); log.info("Broadcast and STT and composed recording test");
try { try {
startRtmpServer(); startRtmpServer();
@ -2996,19 +3000,6 @@ public class OpenViduProTestAppE2eTest extends AbstractOpenViduTestappE2eTest {
} }
} }
// https://github.com/tiangolo/nginx-rtmp-docker
private void startRtmpServer() throws FileNotFoundException {
File file = ResourceUtils.getFile("classpath:broadcast-nginx.conf");
String dockerRunCommand = "docker run -d --name broadcast-nginx -p 1935:1935 -v " + file.getAbsolutePath()
+ ":/etc/nginx/nginx.conf tiangolo/nginx-rtmp";
commandLine.executeCommand(dockerRunCommand, 10);
}
private void stopRtmpServer() {
String dockerRemoveCommand = "docker rm -f broadcast-nginx";
commandLine.executeCommand(dockerRemoveCommand, 10);
}
private void checkRtmpRecordingIsFine(long secondsTimeout) throws InterruptedException { private void checkRtmpRecordingIsFine(long secondsTimeout) throws InterruptedException {
final String broadcastRecordingPath = "/opt/openvidu/recordings"; final String broadcastRecordingPath = "/opt/openvidu/recordings";
final String cleanBroadcastPath = "rm -rf " + broadcastRecordingPath + "/tmp"; final String cleanBroadcastPath = "rm -rf " + broadcastRecordingPath + "/tmp";