mirror of https://github.com/OpenVidu/openvidu.git
openvidu-server: active recordings count in Kms
parent
0f90627f03
commit
00f2127264
|
@ -23,6 +23,7 @@ import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.kurento.client.KurentoClient;
|
import org.kurento.client.KurentoClient;
|
||||||
|
@ -62,6 +63,7 @@ public class Kms {
|
||||||
private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0);
|
private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0);
|
||||||
|
|
||||||
private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>();
|
private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>();
|
||||||
|
private AtomicInteger activeRecordings = new AtomicInteger(0);
|
||||||
|
|
||||||
public Kms(KmsProperties props, LoadManager loadManager) {
|
public Kms(KmsProperties props, LoadManager loadManager) {
|
||||||
this.id = props.getId();
|
this.id = props.getId();
|
||||||
|
@ -143,6 +145,10 @@ public class Kms {
|
||||||
this.kurentoSessions.remove(sessionId);
|
this.kurentoSessions.remove(sessionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public AtomicInteger getActiveRecordings() {
|
||||||
|
return this.activeRecordings;
|
||||||
|
}
|
||||||
|
|
||||||
public JsonObject toJson() {
|
public JsonObject toJson() {
|
||||||
JsonObject json = new JsonObject();
|
JsonObject json = new JsonObject();
|
||||||
json.addProperty("id", this.id);
|
json.addProperty("id", this.id);
|
||||||
|
|
|
@ -10,7 +10,6 @@ public class DummyRecordingDownloader implements RecordingDownloader {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// Just immediately run callback function
|
// Just immediately run callback function
|
||||||
callback.run();
|
callback.run();
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -58,6 +58,7 @@ import io.openvidu.server.recording.Recording;
|
||||||
import io.openvidu.server.recording.RecordingDownloader;
|
import io.openvidu.server.recording.RecordingDownloader;
|
||||||
import io.openvidu.server.recording.RecordingInfoUtils;
|
import io.openvidu.server.recording.RecordingInfoUtils;
|
||||||
import io.openvidu.server.utils.DockerManager;
|
import io.openvidu.server.utils.DockerManager;
|
||||||
|
import io.openvidu.server.utils.QuarantineKiller;
|
||||||
|
|
||||||
public class ComposedRecordingService extends RecordingService {
|
public class ComposedRecordingService extends RecordingService {
|
||||||
|
|
||||||
|
@ -70,8 +71,8 @@ public class ComposedRecordingService extends RecordingService {
|
||||||
private DockerManager dockerManager;
|
private DockerManager dockerManager;
|
||||||
|
|
||||||
public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
|
public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
|
||||||
OpenviduConfig openviduConfig, CallDetailRecord cdr) {
|
OpenviduConfig openviduConfig, CallDetailRecord cdr, QuarantineKiller quarantineKiller) {
|
||||||
super(recordingManager, recordingDownloader, openviduConfig, cdr);
|
super(recordingManager, recordingDownloader, openviduConfig, cdr, quarantineKiller);
|
||||||
this.dockerManager = new DockerManager();
|
this.dockerManager = new DockerManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,6 +96,9 @@ public class ComposedRecordingService extends RecordingService {
|
||||||
recording = this.startRecordingAudioOnly(session, recording, properties);
|
recording = this.startRecordingAudioOnly(session, recording, properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Increment active recordings
|
||||||
|
((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet();
|
||||||
|
|
||||||
return recording;
|
return recording;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -341,6 +345,10 @@ public class ComposedRecordingService extends RecordingService {
|
||||||
this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason);
|
this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Decrement active recordings
|
||||||
|
((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet();
|
||||||
|
|
||||||
return recording;
|
return recording;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -380,7 +388,6 @@ public class ComposedRecordingService extends RecordingService {
|
||||||
|
|
||||||
this.cleanRecordingMaps(recording);
|
this.cleanRecordingMaps(recording);
|
||||||
|
|
||||||
// TODO: DOWNLOAD FILE IF SCALABILITY MODE
|
|
||||||
final Recording[] finalRecordingArray = new Recording[1];
|
final Recording[] finalRecordingArray = new Recording[1];
|
||||||
finalRecordingArray[0] = recording;
|
finalRecordingArray[0] = recording;
|
||||||
try {
|
try {
|
||||||
|
@ -398,6 +405,13 @@ public class ComposedRecordingService extends RecordingService {
|
||||||
final long timestamp = System.currentTimeMillis();
|
final long timestamp = System.currentTimeMillis();
|
||||||
cdr.recordRecordingStatusChanged(finalRecordingArray[0], reason, timestamp,
|
cdr.recordRecordingStatusChanged(finalRecordingArray[0], reason, timestamp,
|
||||||
finalRecordingArray[0].getStatus());
|
finalRecordingArray[0].getStatus());
|
||||||
|
|
||||||
|
// Decrement active recordings once it is downloaded
|
||||||
|
((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet();
|
||||||
|
|
||||||
|
// Now we can drop Media Node if waiting-idle-to-terminate
|
||||||
|
this.quarantineKiller.dropMediaNode(session.getMediaNodeId());
|
||||||
|
|
||||||
});
|
});
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("Error while downloading recording {}: {}", finalRecordingArray[0].getName(), e.getMessage());
|
log.error("Error while downloading recording {}: {}", finalRecordingArray[0].getName(), e.getMessage());
|
||||||
|
|
|
@ -73,6 +73,7 @@ import io.openvidu.server.recording.RecordingDownloader;
|
||||||
import io.openvidu.server.utils.CustomFileManager;
|
import io.openvidu.server.utils.CustomFileManager;
|
||||||
import io.openvidu.server.utils.DockerManager;
|
import io.openvidu.server.utils.DockerManager;
|
||||||
import io.openvidu.server.utils.JsonUtils;
|
import io.openvidu.server.utils.JsonUtils;
|
||||||
|
import io.openvidu.server.utils.QuarantineKiller;
|
||||||
|
|
||||||
public class RecordingManager {
|
public class RecordingManager {
|
||||||
|
|
||||||
|
@ -97,6 +98,9 @@ public class RecordingManager {
|
||||||
@Autowired
|
@Autowired
|
||||||
private KmsManager kmsManager;
|
private KmsManager kmsManager;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
protected QuarantineKiller quarantineKiller;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private CallDetailRecord cdr;
|
private CallDetailRecord cdr;
|
||||||
|
|
||||||
|
@ -150,9 +154,10 @@ public class RecordingManager {
|
||||||
RecordingManager.IMAGE_TAG = openviduConfig.getOpenViduRecordingVersion();
|
RecordingManager.IMAGE_TAG = openviduConfig.getOpenViduRecordingVersion();
|
||||||
|
|
||||||
this.dockerManager = new DockerManager();
|
this.dockerManager = new DockerManager();
|
||||||
this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, openviduConfig, cdr);
|
this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, openviduConfig, cdr,
|
||||||
|
quarantineKiller);
|
||||||
this.singleStreamRecordingService = new SingleStreamRecordingService(this, recordingDownloader, openviduConfig,
|
this.singleStreamRecordingService = new SingleStreamRecordingService(this, recordingDownloader, openviduConfig,
|
||||||
cdr);
|
cdr, quarantineKiller);
|
||||||
|
|
||||||
log.info("Recording module required: Downloading openvidu/openvidu-recording:"
|
log.info("Recording module required: Downloading openvidu/openvidu-recording:"
|
||||||
+ openviduConfig.getOpenViduRecordingVersion() + " Docker image (350MB aprox)");
|
+ openviduConfig.getOpenViduRecordingVersion() + " Docker image (350MB aprox)");
|
||||||
|
@ -603,7 +608,8 @@ public class RecordingManager {
|
||||||
log.warn("No KMSs were defined in kms.uris array. Recording path check aborted");
|
log.warn("No KMSs were defined in kms.uris array. Recording path check aborted");
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
MediaPipeline pipeline = this.kmsManager.getLessLoadedAndRunningKms().getKurentoClient().createMediaPipeline();
|
MediaPipeline pipeline = this.kmsManager.getLessLoadedAndRunningKms().getKurentoClient()
|
||||||
|
.createMediaPipeline();
|
||||||
RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, "file://" + testFilePath).build();
|
RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, "file://" + testFilePath).build();
|
||||||
|
|
||||||
final AtomicBoolean kurentoRecorderError = new AtomicBoolean(false);
|
final AtomicBoolean kurentoRecorderError = new AtomicBoolean(false);
|
||||||
|
|
|
@ -34,6 +34,7 @@ import io.openvidu.server.recording.Recording;
|
||||||
import io.openvidu.server.recording.RecordingDownloader;
|
import io.openvidu.server.recording.RecordingDownloader;
|
||||||
import io.openvidu.server.utils.CommandExecutor;
|
import io.openvidu.server.utils.CommandExecutor;
|
||||||
import io.openvidu.server.utils.CustomFileManager;
|
import io.openvidu.server.utils.CustomFileManager;
|
||||||
|
import io.openvidu.server.utils.QuarantineKiller;
|
||||||
|
|
||||||
public abstract class RecordingService {
|
public abstract class RecordingService {
|
||||||
|
|
||||||
|
@ -43,14 +44,16 @@ public abstract class RecordingService {
|
||||||
protected RecordingManager recordingManager;
|
protected RecordingManager recordingManager;
|
||||||
protected RecordingDownloader recordingDownloader;
|
protected RecordingDownloader recordingDownloader;
|
||||||
protected CallDetailRecord cdr;
|
protected CallDetailRecord cdr;
|
||||||
|
protected QuarantineKiller quarantineKiller;
|
||||||
protected CustomFileManager fileWriter = new CustomFileManager();
|
protected CustomFileManager fileWriter = new CustomFileManager();
|
||||||
|
|
||||||
RecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
|
RecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
|
||||||
OpenviduConfig openviduConfig, CallDetailRecord cdr) {
|
OpenviduConfig openviduConfig, CallDetailRecord cdr, QuarantineKiller quarantineKiller) {
|
||||||
this.recordingManager = recordingManager;
|
this.recordingManager = recordingManager;
|
||||||
this.recordingDownloader = recordingDownloader;
|
this.recordingDownloader = recordingDownloader;
|
||||||
this.openviduConfig = openviduConfig;
|
this.openviduConfig = openviduConfig;
|
||||||
this.cdr = cdr;
|
this.cdr = cdr;
|
||||||
|
this.quarantineKiller = quarantineKiller;
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException;
|
public abstract Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException;
|
||||||
|
|
|
@ -59,10 +59,12 @@ import io.openvidu.server.core.EndReason;
|
||||||
import io.openvidu.server.core.Participant;
|
import io.openvidu.server.core.Participant;
|
||||||
import io.openvidu.server.core.Session;
|
import io.openvidu.server.core.Session;
|
||||||
import io.openvidu.server.kurento.core.KurentoParticipant;
|
import io.openvidu.server.kurento.core.KurentoParticipant;
|
||||||
|
import io.openvidu.server.kurento.core.KurentoSession;
|
||||||
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
|
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
|
||||||
import io.openvidu.server.recording.RecorderEndpointWrapper;
|
import io.openvidu.server.recording.RecorderEndpointWrapper;
|
||||||
import io.openvidu.server.recording.Recording;
|
import io.openvidu.server.recording.Recording;
|
||||||
import io.openvidu.server.recording.RecordingDownloader;
|
import io.openvidu.server.recording.RecordingDownloader;
|
||||||
|
import io.openvidu.server.utils.QuarantineKiller;
|
||||||
|
|
||||||
public class SingleStreamRecordingService extends RecordingService {
|
public class SingleStreamRecordingService extends RecordingService {
|
||||||
|
|
||||||
|
@ -74,8 +76,8 @@ public class SingleStreamRecordingService extends RecordingService {
|
||||||
private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream.";
|
private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream.";
|
||||||
|
|
||||||
public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
|
public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
|
||||||
OpenviduConfig openviduConfig, CallDetailRecord cdr) {
|
OpenviduConfig openviduConfig, CallDetailRecord cdr, QuarantineKiller quarantineKiller) {
|
||||||
super(recordingManager, recordingDownloader, openviduConfig, cdr);
|
super(recordingManager, recordingDownloader, openviduConfig, cdr, quarantineKiller);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -129,6 +131,9 @@ public class SingleStreamRecordingService extends RecordingService {
|
||||||
|
|
||||||
this.generateRecordingMetadataFile(recording);
|
this.generateRecordingMetadataFile(recording);
|
||||||
|
|
||||||
|
// Increment active recordings
|
||||||
|
((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet();
|
||||||
|
|
||||||
return recording;
|
return recording;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,7 +168,6 @@ public class SingleStreamRecordingService extends RecordingService {
|
||||||
|
|
||||||
this.cleanRecordingMaps(recording);
|
this.cleanRecordingMaps(recording);
|
||||||
|
|
||||||
// TODO: DOWNLOAD FILES IF SCALABILITY MODE
|
|
||||||
final Recording[] finalRecordingArray = new Recording[1];
|
final Recording[] finalRecordingArray = new Recording[1];
|
||||||
finalRecordingArray[0] = recording;
|
finalRecordingArray[0] = recording;
|
||||||
try {
|
try {
|
||||||
|
@ -181,6 +185,13 @@ public class SingleStreamRecordingService extends RecordingService {
|
||||||
finalRecordingArray[0].getStatus());
|
finalRecordingArray[0].getStatus());
|
||||||
|
|
||||||
storedRecorders.remove(finalRecordingArray[0].getSessionId());
|
storedRecorders.remove(finalRecordingArray[0].getSessionId());
|
||||||
|
|
||||||
|
// Decrement active recordings once it is downloaded
|
||||||
|
((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet();
|
||||||
|
|
||||||
|
// Now we can drop Media Node if waiting-idle-to-terminate
|
||||||
|
this.quarantineKiller.dropMediaNode(session.getMediaNodeId());
|
||||||
|
|
||||||
});
|
});
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("Error while downloading recording {}", finalRecordingArray[0].getName());
|
log.error("Error while downloading recording {}", finalRecordingArray[0].getName());
|
||||||
|
|
Loading…
Reference in New Issue