openvidu-server: active recordings counter improved

pull/574/head
pabloFuente 2020-12-02 10:35:39 +01:00
parent 9aa76d80aa
commit 5295a3a497
7 changed files with 102 additions and 29 deletions

View File

@ -53,6 +53,11 @@ public class FixedOneKmsManager extends KmsManager {
return Arrays.asList(kms);
}
@Override
public boolean isMediaNodeAvailableForSession(String mediaNodeId) {
return true;
}
@Override
public boolean isMediaNodeAvailableForRecording(String mediaNodeId) {
return true;

View File

@ -42,6 +42,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import com.google.gson.JsonObject;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.IdentifierPrefixes;
import io.openvidu.server.kurento.core.KurentoSession;
@ -349,9 +351,72 @@ public abstract class KmsManager {
return intervals[intervals.length - 1][0];
}
public void incrementActiveRecordings(String mediaNodeId) throws OpenViduException {
try {
if (KmsManager.selectAndRemoveKmsLock.tryLock(KmsManager.MAX_SECONDS_LOCK_WAIT, TimeUnit.SECONDS)) {
try {
final Kms kms = this.getKms(mediaNodeId);
if (kms == null) {
throw new OpenViduException(Code.MEDIA_NODE_NOT_FOUND,
"Media Node " + mediaNodeId + " does not exist");
}
kms.incrementActiveRecordings();
log.info("Incremented number of active recordings in Media Node {}. Current number: {}",
mediaNodeId, kms.getActiveRecordings());
if (!isMediaNodeAvailableForRecording(mediaNodeId)) {
throw new OpenViduException(Code.MEDIA_NODE_STATUS_WRONG,
"Media Node " + mediaNodeId + " does not allow starting new Recordings");
}
} finally {
KmsManager.selectAndRemoveKmsLock.unlock();
}
} else {
throw new OpenViduException(Code.GENERIC_ERROR_CODE,
"selectAndRemoveKmsLock couldn't be acquired within " + KmsManager.MAX_SECONDS_LOCK_WAIT
+ " seconds when incrementing active recordings of Media Node " + mediaNodeId);
}
} catch (InterruptedException e) {
throw new OpenViduException(Code.GENERIC_ERROR_CODE,
"InterruptedException waiting to acquire selectAndRemoveKmsLock when incrementing active recordings of Media Node "
+ mediaNodeId + ": " + e.getMessage());
}
}
public void decrementActiveRecordings(String mediaNodeId) throws OpenViduException {
try {
if (KmsManager.selectAndRemoveKmsLock.tryLock(KmsManager.MAX_SECONDS_LOCK_WAIT, TimeUnit.SECONDS)) {
try {
final Kms kms = this.getKms(mediaNodeId);
if (kms != null) {
kms.decrementActiveRecordings();
log.info("Decremented number of active recordings in Media Node {}. Current number: {}",
mediaNodeId, kms.getActiveRecordings());
} else {
log.warn("Trying to decrement active recordings of Media Node {} but cannot be found",
mediaNodeId);
}
} finally {
KmsManager.selectAndRemoveKmsLock.unlock();
}
} else {
throw new OpenViduException(Code.GENERIC_ERROR_CODE,
"selectAndRemoveKmsLock couldn't be acquired within " + KmsManager.MAX_SECONDS_LOCK_WAIT
+ " seconds when decrementing active recordings of Media Node " + mediaNodeId);
}
} catch (InterruptedException e) {
throw new OpenViduException(Code.GENERIC_ERROR_CODE,
"InterruptedException waiting to acquire selectAndRemoveKmsLock when decrementing active recordings of Media Node "
+ mediaNodeId + ": " + e.getMessage());
}
}
public abstract List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure)
throws Exception;
public abstract boolean isMediaNodeAvailableForSession(String mediaNodeId);
public abstract boolean isMediaNodeAvailableForRecording(String mediaNodeId);
@PostConstruct

View File

@ -24,6 +24,7 @@ import io.openvidu.server.cdr.CallDetailRecord;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingUploader;
@ -35,9 +36,10 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
private static final Logger log = LoggerFactory.getLogger(ComposedRecordingService.class);
public ComposedQuickStartRecordingService(RecordingManager recordingManager,
RecordingDownloader recordingDownloader, RecordingUploader recordingUploader, CustomFileManager fileManager,
OpenviduConfig openviduConfig, CallDetailRecord cdr, DockerManager dockerManager) {
super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr,
RecordingDownloader recordingDownloader, RecordingUploader recordingUploader, KmsManager kmsManager,
CustomFileManager fileManager, OpenviduConfig openviduConfig, CallDetailRecord cdr,
DockerManager dockerManager) {
super(recordingManager, recordingDownloader, recordingUploader, kmsManager, fileManager, openviduConfig, cdr,
dockerManager);
}

View File

@ -52,6 +52,7 @@ import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.recording.CompositeWrapper;
import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader;
@ -72,9 +73,9 @@ public class ComposedRecordingService extends RecordingService {
protected DockerManager dockerManager;
public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig,
CallDetailRecord cdr, DockerManager dockerManager) {
super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr);
RecordingUploader recordingUploader, KmsManager kmsManager, CustomFileManager fileManager,
OpenviduConfig openviduConfig, CallDetailRecord cdr, DockerManager dockerManager) {
super(recordingManager, recordingDownloader, recordingUploader, kmsManager, fileManager, openviduConfig, cdr);
this.dockerManager = dockerManager;
}
@ -287,12 +288,14 @@ public class ComposedRecordingService extends RecordingService {
}
}
cleanRecordingMaps(recordingAux);
// Decrement active recordings
kmsManager.decrementActiveRecordings(recordingAux.getRecordingProperties().mediaNode());
if (i == timeout) {
log.error("Container did not launched in {} seconds", timeout / 2);
return;
}
// Decrement active recordings
((KurentoSession) session).getKms().decrementActiveRecordings();
}).start();
}
} else {
@ -363,7 +366,7 @@ public class ComposedRecordingService extends RecordingService {
// Decrement active recordings once it is downloaded. This method will also drop
// the Media Node if no more sessions or recordings and status is
// waiting-idle-to-terminate
((KurentoSession) session).getKms().decrementActiveRecordings();
kmsManager.decrementActiveRecordings(finalRecordingArray[0].getRecordingProperties().mediaNode());
// Upload if necessary
this.uploadRecording(finalRecordingArray[0], reason);
@ -592,7 +595,7 @@ public class ComposedRecordingService extends RecordingService {
// Decrement active recordings once it is downloaded. This method will also drop
// the Media Node if no more sessions or recordings and status is
// waiting-idle-to-terminate
((KurentoSession) session).getKms().decrementActiveRecordings();
kmsManager.decrementActiveRecordings(recording.getRecordingProperties().mediaNode());
// Upload if necessary
this.uploadRecording(recording, reason);

View File

@ -67,7 +67,6 @@ import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionEventsHandler;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.recording.Recording;
@ -168,11 +167,11 @@ public class RecordingManager {
this.dockerManager.init();
this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, recordingUploader,
fileManager, openviduConfig, cdr, this.dockerManager);
kmsManager, fileManager, openviduConfig, cdr, this.dockerManager);
this.composedQuickStartRecordingService = new ComposedQuickStartRecordingService(this, recordingDownloader,
recordingUploader, fileManager, openviduConfig, cdr, this.dockerManager);
recordingUploader, kmsManager, fileManager, openviduConfig, cdr, this.dockerManager);
this.singleStreamRecordingService = new SingleStreamRecordingService(this, recordingDownloader,
recordingUploader, fileManager, openviduConfig, cdr);
recordingUploader, kmsManager, fileManager, openviduConfig, cdr);
this.checkRecordingRequirements(this.openviduConfig.getOpenViduRecordingPath(),
this.openviduConfig.getOpenviduRecordingCustomLayout());
@ -264,13 +263,9 @@ public class RecordingManager {
public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException {
try {
// 1. INCREMENT ACTIVE RECORDINGS OF MEDIA NODE HERE
((KurentoSession) session).getKms().incrementActiveRecordings();
// 2. CHECK THAT MEDIA NODE HAS RUNNING STATUS. IF NOT THEN FAIL RECORDING START
if (!kmsManager.isMediaNodeAvailableForRecording(properties.mediaNode())) {
throw new OpenViduException(Code.MEDIA_NODE_STATUS_WRONG,
"Media Node " + properties.mediaNode() + " status is not \"running\"");
}
// INCREMENT ACTIVE RECORDINGS OF MEDIA NODE HERE. IF MEDIA NODE IS NOT
// AVAILABLE FOR STARTING NEW RECORDINGS THIS METHOD THROWS AN EXCEPTION
kmsManager.incrementActiveRecordings(properties.mediaNode());
try {
if (session.recordingLock.tryLock(15, TimeUnit.SECONDS)) {
@ -328,7 +323,7 @@ public class RecordingManager {
}
} catch (Exception e) {
// DECREMENT ACTIVE RECORDINGS OF MEDIA NODE AND TRY REMOVE MEDIA NODE HERE
((KurentoSession) session).getKms().decrementActiveRecordings();
kmsManager.decrementActiveRecordings(properties.mediaNode());
throw e;
}
}

View File

@ -31,6 +31,7 @@ import io.openvidu.server.cdr.CallDetailRecord;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingUploader;
@ -46,6 +47,7 @@ public abstract class RecordingService {
protected RecordingManager recordingManager;
protected RecordingDownloader recordingDownloader;
protected RecordingUploader recordingUploader;
protected KmsManager kmsManager;
protected CustomFileManager fileManager;
protected CallDetailRecord cdr;
@ -58,11 +60,12 @@ public abstract class RecordingService {
public final static String INDIVIDUAL_RECORDING_COMPRESSED_EXTENSION = ".zip";
public RecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig,
CallDetailRecord cdr) {
RecordingUploader recordingUploader, KmsManager kmsManager, CustomFileManager fileManager,
OpenviduConfig openviduConfig, CallDetailRecord cdr) {
this.recordingManager = recordingManager;
this.recordingDownloader = recordingDownloader;
this.recordingUploader = recordingUploader;
this.kmsManager = kmsManager;
this.fileManager = fileManager;
this.openviduConfig = openviduConfig;
this.cdr = cdr;

View File

@ -61,8 +61,8 @@ import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.recording.RecorderEndpointWrapper;
import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader;
@ -79,9 +79,9 @@ public class SingleStreamRecordingService extends RecordingService {
private Map<String, Map<String, List<RecorderEndpointWrapper>>> storedRecorders = new ConcurrentHashMap<>();
public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig,
CallDetailRecord cdr) {
super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr);
RecordingUploader recordingUploader, KmsManager kmsManager, CustomFileManager fileManager,
OpenviduConfig openviduConfig, CallDetailRecord cdr) {
super(recordingManager, recordingDownloader, recordingUploader, kmsManager, fileManager, openviduConfig, cdr);
}
@Override
@ -187,7 +187,7 @@ public class SingleStreamRecordingService extends RecordingService {
// Decrement active recordings once it is downloaded. This method will also drop
// the Media Node if no more sessions or recordings and status is
// waiting-idle-to-terminate
((KurentoSession) session).getKms().decrementActiveRecordings();
kmsManager.decrementActiveRecordings(session.getMediaNodeId());
// Upload if necessary
this.uploadRecording(finalRecordingArray[0], reason);