openvidu-server: active recordings from integer to map of ids

pull/574/head
pabloFuente 2020-12-02 13:09:31 +01:00
parent e241d3ff0e
commit 6a844ae145
7 changed files with 54 additions and 39 deletions

View File

@ -65,18 +65,18 @@ public class FixedOneKmsManager extends KmsManager {
} }
@Override @Override
public void incrementActiveRecordings(String mediaNodeId) { public void incrementActiveRecordings(String mediaNodeId, String recordingId, String sessionId) {
try { try {
this.getKmss().iterator().next().incrementActiveRecordings(); this.getKmss().iterator().next().incrementActiveRecordings(recordingId, sessionId);
} catch (NoSuchElementException e) { } catch (NoSuchElementException e) {
log.error("There is no KMS available when incrementing active recordings"); log.error("There is no KMS available when incrementing active recordings");
} }
} }
@Override @Override
public void decrementActiveRecordings(String mediaNodeId) { public void decrementActiveRecordings(String mediaNodeId, String recordingId) {
try { try {
this.getKmss().iterator().next().decrementActiveRecordings(); this.getKmss().iterator().next().decrementActiveRecordings(recordingId);
} catch (NoSuchElementException e) { } catch (NoSuchElementException e) {
log.error("There is no KMS available when decrementing active recordings"); log.error("There is no KMS available when decrementing active recordings");
} }

View File

@ -21,9 +21,10 @@ import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.kurento.client.KurentoClient; import org.kurento.client.KurentoClient;
@ -65,7 +66,7 @@ public class Kms {
private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0); private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0);
private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>(); private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>();
private AtomicInteger activeRecordings = new AtomicInteger(0); private Map<String, String> activeRecordings = new ConcurrentHashMap<>();
public Kms(KmsProperties props, LoadManager loadManager, QuarantineKiller quarantineKiller) { public Kms(KmsProperties props, LoadManager loadManager, QuarantineKiller quarantineKiller) {
this.id = props.getId(); this.id = props.getId();
@ -148,16 +149,16 @@ public class Kms {
this.kurentoSessions.remove(sessionId); this.kurentoSessions.remove(sessionId);
} }
public synchronized int getActiveRecordings() { public synchronized Set<Entry<String, String>> getActiveRecordings() {
return this.activeRecordings.get(); return this.activeRecordings.entrySet();
} }
public synchronized int incrementActiveRecordings() { public synchronized void incrementActiveRecordings(String recordingId, String sessionId) {
return this.activeRecordings.incrementAndGet(); this.activeRecordings.put(recordingId, sessionId);
} }
public synchronized void decrementActiveRecordings() { public synchronized void decrementActiveRecordings(String recordingId) {
this.activeRecordings.updateAndGet(i -> i > 0 ? i - 1 : i); this.activeRecordings.remove(recordingId);
this.quarantineKiller.dropMediaNode(this.id); this.quarantineKiller.dropMediaNode(this.id);
} }
@ -176,7 +177,7 @@ public class Kms {
return json; return json;
} }
public JsonObject toJsonExtended(boolean withSessions, boolean withExtraInfo) { public JsonObject toJsonExtended(boolean withSessions, boolean withActiveRecordings, boolean withExtraInfo) {
JsonObject json = this.toJson(); JsonObject json = this.toJson();
@ -188,6 +189,14 @@ public class Kms {
json.add("sessions", sessions); json.add("sessions", sessions);
} }
if (withActiveRecordings) {
JsonArray activeRecordingsJson = new JsonArray();
for (String recordingId : this.activeRecordings.keySet()) {
activeRecordingsJson.add(recordingId);
}
json.add("activeRecordings", activeRecordingsJson);
}
if (withExtraInfo) { if (withExtraInfo) {
if (json.get("connected").getAsBoolean()) { if (json.get("connected").getAsBoolean()) {

View File

@ -89,8 +89,8 @@ public abstract class KmsManager {
return json; return json;
} }
public JsonObject toJsonExtended(boolean withSessions, boolean withExtraInfo) { public JsonObject toJsonExtended(boolean withSessions, boolean withActiveRecordings, boolean withExtraInfo) {
JsonObject json = this.kms.toJsonExtended(withSessions, withExtraInfo); JsonObject json = this.kms.toJsonExtended(withSessions, withActiveRecordings, withExtraInfo);
json.addProperty("load", this.load); json.addProperty("load", this.load);
return json; return json;
} }
@ -356,9 +356,9 @@ public abstract class KmsManager {
public abstract boolean isMediaNodeAvailableForRecording(String mediaNodeId); public abstract boolean isMediaNodeAvailableForRecording(String mediaNodeId);
public abstract void incrementActiveRecordings(String mediaNodeId); public abstract void incrementActiveRecordings(String mediaNodeId, String recordingId, String sessionId);
public abstract void decrementActiveRecordings(String mediaNodeId); public abstract void decrementActiveRecordings(String mediaNodeId, String recordingId);
@PostConstruct @PostConstruct
protected abstract void postConstructInitKurentoClients(); protected abstract void postConstructInitKurentoClients();

View File

@ -80,12 +80,8 @@ public class ComposedRecordingService extends RecordingService {
} }
@Override @Override
public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException { public Recording startRecording(Session session, String recordingId, RecordingProperties properties)
throws OpenViduException {
PropertiesRecordingId updatePropertiesAndRecordingId = this.setFinalRecordingNameAndGetFreeRecordingId(session,
properties);
properties = updatePropertiesAndRecordingId.properties;
String recordingId = updatePropertiesAndRecordingId.recordingId;
// Instantiate and store recording object // Instantiate and store recording object
Recording recording = new Recording(session.getSessionId(), recordingId, properties); Recording recording = new Recording(session.getSessionId(), recordingId, properties);
@ -290,7 +286,8 @@ public class ComposedRecordingService extends RecordingService {
cleanRecordingMaps(recordingAux); cleanRecordingMaps(recordingAux);
// Decrement active recordings // Decrement active recordings
kmsManager.decrementActiveRecordings(recordingAux.getRecordingProperties().mediaNode()); kmsManager.decrementActiveRecordings(recordingAux.getRecordingProperties().mediaNode(),
recordingId);
if (i == timeout) { if (i == timeout) {
log.error("Container did not launched in {} seconds", timeout / 2); log.error("Container did not launched in {} seconds", timeout / 2);
@ -366,7 +363,8 @@ public class ComposedRecordingService extends RecordingService {
// Decrement active recordings once it is downloaded. This method will also drop // Decrement active recordings once it is downloaded. This method will also drop
// the Media Node if no more sessions or recordings and status is // the Media Node if no more sessions or recordings and status is
// waiting-idle-to-terminate // waiting-idle-to-terminate
kmsManager.decrementActiveRecordings(finalRecordingArray[0].getRecordingProperties().mediaNode()); kmsManager.decrementActiveRecordings(finalRecordingArray[0].getRecordingProperties().mediaNode(),
finalRecordingArray[0].getId());
// Upload if necessary // Upload if necessary
this.uploadRecording(finalRecordingArray[0], reason); this.uploadRecording(finalRecordingArray[0], reason);
@ -595,7 +593,7 @@ public class ComposedRecordingService extends RecordingService {
// Decrement active recordings once it is downloaded. This method will also drop // Decrement active recordings once it is downloaded. This method will also drop
// the Media Node if no more sessions or recordings and status is // the Media Node if no more sessions or recordings and status is
// waiting-idle-to-terminate // waiting-idle-to-terminate
kmsManager.decrementActiveRecordings(recording.getRecordingProperties().mediaNode()); kmsManager.decrementActiveRecordings(recording.getRecordingProperties().mediaNode(), recording.getId());
// Upload if necessary // Upload if necessary
this.uploadRecording(recording, reason); this.uploadRecording(recording, reason);

View File

@ -72,6 +72,7 @@ import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.recording.RecordingUploader;
import io.openvidu.server.recording.service.RecordingService.PropertiesRecordingId;
import io.openvidu.server.utils.CustomFileManager; import io.openvidu.server.utils.CustomFileManager;
import io.openvidu.server.utils.DockerManager; import io.openvidu.server.utils.DockerManager;
import io.openvidu.server.utils.JsonUtils; import io.openvidu.server.utils.JsonUtils;
@ -261,11 +262,18 @@ public class RecordingManager {
} }
public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException { public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException {
String recordingId = null;
try { try {
PropertiesRecordingId updatePropertiesAndRecordingId = ((RecordingService) this.composedRecordingService)
.setFinalRecordingNameAndGetFreeRecordingId(session, properties);
properties = updatePropertiesAndRecordingId.properties;
recordingId = updatePropertiesAndRecordingId.recordingId;
// INCREMENT ACTIVE RECORDINGS OF MEDIA NODE HERE. IF MEDIA NODE IS NOT // INCREMENT ACTIVE RECORDINGS OF MEDIA NODE HERE. IF MEDIA NODE IS NOT
// AVAILABLE FOR STARTING NEW RECORDINGS THIS METHOD THROWS AN EXCEPTION // AVAILABLE FOR STARTING NEW RECORDINGS THIS METHOD THROWS AN EXCEPTION
kmsManager.incrementActiveRecordings(properties.mediaNode()); kmsManager.incrementActiveRecordings(properties.mediaNode(), recordingId, session.getSessionId());
try { try {
if (session.recordingLock.tryLock(15, TimeUnit.SECONDS)) { if (session.recordingLock.tryLock(15, TimeUnit.SECONDS)) {
@ -277,13 +285,16 @@ public class RecordingManager {
Recording recording = null; Recording recording = null;
switch (properties.outputMode()) { switch (properties.outputMode()) {
case COMPOSED: case COMPOSED:
recording = this.composedRecordingService.startRecording(session, properties); recording = this.composedRecordingService.startRecording(session, recordingId,
properties);
break; break;
case COMPOSED_QUICK_START: case COMPOSED_QUICK_START:
recording = this.composedQuickStartRecordingService.startRecording(session, properties); recording = this.composedQuickStartRecordingService.startRecording(session, recordingId,
properties);
break; break;
case INDIVIDUAL: case INDIVIDUAL:
recording = this.singleStreamRecordingService.startRecording(session, properties); recording = this.singleStreamRecordingService.startRecording(session, recordingId,
properties);
break; break;
} }
this.recordingFromStartingToStarted(recording); this.recordingFromStartingToStarted(recording);
@ -323,7 +334,7 @@ public class RecordingManager {
} }
} catch (Exception e) { } catch (Exception e) {
// DECREMENT ACTIVE RECORDINGS OF MEDIA NODE AND TRY REMOVE MEDIA NODE HERE // DECREMENT ACTIVE RECORDINGS OF MEDIA NODE AND TRY REMOVE MEDIA NODE HERE
kmsManager.decrementActiveRecordings(properties.mediaNode()); kmsManager.decrementActiveRecordings(properties.mediaNode(), recordingId);
throw e; throw e;
} }
} }

View File

@ -71,7 +71,8 @@ public abstract class RecordingService {
this.cdr = cdr; this.cdr = cdr;
} }
public abstract Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException; public abstract Recording startRecording(Session session, String recordingId, RecordingProperties properties)
throws OpenViduException;
public abstract Recording stopRecording(Session session, Recording recording, EndReason reason); public abstract Recording stopRecording(Session session, Recording recording, EndReason reason);

View File

@ -85,12 +85,8 @@ public class SingleStreamRecordingService extends RecordingService {
} }
@Override @Override
public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException { public Recording startRecording(Session session, String recordingId, RecordingProperties properties)
throws OpenViduException {
PropertiesRecordingId updatePropertiesAndRecordingId = this.setFinalRecordingNameAndGetFreeRecordingId(session,
properties);
properties = updatePropertiesAndRecordingId.properties;
String recordingId = updatePropertiesAndRecordingId.recordingId;
log.info("Starting individual ({}) recording {} of session {}", log.info("Starting individual ({}) recording {} of session {}",
properties.hasVideo() ? (properties.hasAudio() ? "video+audio" : "video-only") : "audioOnly", properties.hasVideo() ? (properties.hasAudio() ? "video+audio" : "video-only") : "audioOnly",
@ -187,7 +183,7 @@ public class SingleStreamRecordingService extends RecordingService {
// Decrement active recordings once it is downloaded. This method will also drop // Decrement active recordings once it is downloaded. This method will also drop
// the Media Node if no more sessions or recordings and status is // the Media Node if no more sessions or recordings and status is
// waiting-idle-to-terminate // waiting-idle-to-terminate
kmsManager.decrementActiveRecordings(session.getMediaNodeId()); kmsManager.decrementActiveRecordings(session.getMediaNodeId(), finalRecordingArray[0].getId());
// Upload if necessary // Upload if necessary
this.uploadRecording(finalRecordingArray[0], reason); this.uploadRecording(finalRecordingArray[0], reason);