Merge branch 'master' of github.com:OpenVidu/openvidu into master

pull/546/head
cruizba 2020-09-28 10:07:21 +02:00
commit f886ac72db
8 changed files with 244 additions and 127 deletions

View File

@ -22,7 +22,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.kurento.client.GenericMediaEvent;
@ -62,9 +61,7 @@ public class SessionEventsHandler {
@Autowired
protected OpenviduConfig openviduConfig;
Map<String, Recording> recordingsStarted = new ConcurrentHashMap<>();
ReentrantLock lock = new ReentrantLock();
private Map<String, Recording> recordingsToSendClientEvents = new ConcurrentHashMap<>();
public void onSessionCreated(Session session) {
CDR.recordSessionCreated(session);
@ -290,16 +287,10 @@ public class SessionEventsHandler {
rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result);
if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) {
lock.lock();
try {
Recording recording = this.recordingsStarted.remove(session.getSessionId());
if (recording != null) {
// RECORDER participant is now receiving video from the first publisher
this.sendRecordingStartedNotification(session, recording);
}
} finally {
lock.unlock();
}
recordingsToSendClientEvents.computeIfPresent(session.getSessionId(), (key, value) -> {
sendRecordingStartedNotification(session, value);
return null;
});
}
}
@ -311,8 +302,9 @@ public class SessionEventsHandler {
rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject());
}
public void onNetworkQualityChanged(Participant participant, JsonObject params ) {
rpcNotificationService.sendNotification(participant.getParticipantPrivateId(), ProtocolElements.NETWORKQUALITYCHANGED_METHOD, params);
public void onNetworkQualityChanged(Participant participant, JsonObject params) {
rpcNotificationService.sendNotification(participant.getParticipantPrivateId(),
ProtocolElements.NETWORKQUALITYCHANGED_METHOD, params);
}
public void onSendMessage(Participant participant, JsonObject message, Set<Participant> participants,
@ -460,7 +452,7 @@ public class SessionEventsHandler {
public void sendRecordingStoppedNotification(Session session, Recording recording, EndReason reason) {
// Be sure to clean this map (this should return null)
this.recordingsStarted.remove(session.getSessionId());
recordingsToSendClientEvents.remove(session.getSessionId());
// Filter participants by roles according to "OPENVIDU_RECORDING_NOTIFICATION"
Set<Participant> existingParticipants;
@ -570,8 +562,8 @@ public class SessionEventsHandler {
this.rpcNotificationService.closeRpcSession(participantPrivateId);
}
public void setRecordingStarted(String sessionId, Recording recording) {
this.recordingsStarted.put(sessionId, recording);
public void storeRecordingToSendClientEvent(Recording recording) {
recordingsToSendClientEvents.put(recording.getSessionId(), recording);
}
private Set<Participant> filterParticipantsByRole(OpenViduRole[] roles, Set<Participant> participants) {

View File

@ -50,7 +50,7 @@ public class RecordingInfoUtils {
throw new OpenViduException(Code.RECORDING_FILE_EMPTY_ERROR, "The recording file is corrupted");
}
if (this.json.size() == 0) {
// Recording metadata from ffprobe is an emtpy JSON
// Recording metadata from ffprobe is an empty JSON
throw new OpenViduException(Code.RECORDING_FILE_EMPTY_ERROR, "The recording file is empty");
}

View File

@ -78,7 +78,7 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
recordExecCommand += "export " + envs.get(i) + " ";
}
recordExecCommand += "&& ./composed_quick_start.sh --start-recording > /var/log/ffmpeg.log 2>&1 &";
dockerManager.runCommandInContainer(containerId, recordExecCommand, 0);
dockerManager.runCommandInContainer(containerId, recordExecCommand);
} catch (Exception e) {
this.cleanRecordingMaps(recording);
throw this.failStartRecording(session, recording,
@ -116,7 +116,7 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
}
try {
dockerManager.runCommandInContainer(containerId, "./composed_quick_start.sh --stop-recording", 10);
dockerManager.runCommandInContainerSync(containerId, "./composed_quick_start.sh --stop-recording", 10);
} catch (InterruptedException e1) {
cleanRecordingMaps(recording);
log.error("Error stopping recording for session id: {}", session.getSessionId());

View File

@ -393,10 +393,10 @@ public class ComposedRecordingService extends RecordingService {
return finalRecordingArray[0];
}
protected void stopAndRemoveRecordingContainer(Recording recording, String containerId, int secondsOfWait) {
private void stopAndRemoveRecordingContainer(Recording recording, String containerId, int secondsOfWait) {
// Gracefully stop ffmpeg process
try {
dockerManager.runCommandInContainer(containerId, "echo 'q' > stop", 0);
dockerManager.runCommandInContainer(containerId, "echo 'q' > stop");
} catch (InterruptedException e1) {
e1.printStackTrace();
}
@ -440,21 +440,26 @@ public class ComposedRecordingService extends RecordingService {
}
protected void waitForVideoFileNotEmpty(Recording recording) throws OpenViduException {
boolean isPresent = false;
int i = 1;
int timeout = 150; // Wait for 150*150 = 22500 = 22.5 seconds
while (!isPresent && timeout <= 150) {
final String VIDEO_FILE = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"
+ recording.getName() + ".mp4";
int SECONDS_MAX_WAIT = 15;
int MILLISECONDS_INTERVAL_WAIT = 100;
int LIMIT = SECONDS_MAX_WAIT * 1000 / MILLISECONDS_INTERVAL_WAIT;
int i = 0;
boolean arePresent = fileExistsAndHasBytes(VIDEO_FILE);
while (!arePresent && i < LIMIT) {
try {
Thread.sleep(150);
timeout++;
File f = new File(this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"
+ recording.getName() + ".mp4");
isPresent = ((f.isFile()) && (f.length() > 0));
Thread.sleep(MILLISECONDS_INTERVAL_WAIT);
arePresent = fileExistsAndHasBytes(VIDEO_FILE);
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (i == timeout) {
if (!arePresent) {
log.error("Recorder container failed generating video file (is empty) for session {}",
recording.getSessionId());
throw new OpenViduException(Code.RECORDING_START_ERROR_CODE,
@ -462,7 +467,12 @@ public class ComposedRecordingService extends RecordingService {
}
}
protected void failRecordingCompletion(Recording recording, String containerId, OpenViduException e)
private boolean fileExistsAndHasBytes(String fileName) {
File f = new File(fileName);
return (f.exists() && f.isFile() && f.length() > 0);
}
private void failRecordingCompletion(Recording recording, String containerId, OpenViduException e)
throws OpenViduException {
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
dockerManager.removeDockerContainer(containerId, true);

View File

@ -283,6 +283,7 @@ public class RecordingManager {
if (!(OutputMode.COMPOSED.equals(properties.outputMode()) && properties.hasVideo())) {
// Directly send recording started notification for all cases except for
// COMPOSED recordings with video (will be sent on first RECORDER subscriber)
// Both INDIVIDUAL and COMPOSED_QUICK_START should notify immediately
this.sessionHandler.sendRecordingStartedNotification(session, recording);
}
if (session.getActivePublishers() == 0) {
@ -317,7 +318,19 @@ public class RecordingManager {
recording = this.sessionsRecordings.get(session.getSessionId());
}
recording = ((RecordingService) singleStreamRecordingService).sealRecordingMetadataFileAsStopped(recording);
if (recording == null) {
recording = this.sessionsRecordingsStarting.get(session.getSessionId());
if (recording == null) {
log.error("Cannot stop recording. Session {} is not being recorded", recordingId,
session.getSessionId());
return null;
} else {
// Recording is still starting
log.warn("Recording {} is still in \"starting\" status", recording.getId());
}
}
((RecordingService) singleStreamRecordingService).sealRecordingMetadataFileAsStopped(recording);
final long timestamp = System.currentTimeMillis();
this.cdr.recordRecordingStatusChanged(recording, reason, timestamp, Status.stopped);
@ -408,9 +421,16 @@ public class RecordingManager {
public void stopOneIndividualStreamRecording(KurentoSession session, String streamId, long kmsDisconnectionTime) {
Recording recording = this.sessionsRecordings.get(session.getSessionId());
if (recording == null) {
recording = this.sessionsRecordingsStarting.get(session.getSessionId());
if (recording == null) {
log.error("Cannot stop recording of existing stream {}. Session {} is not being recorded", streamId,
session.getSessionId());
return;
} else {
// Recording is still starting
log.warn("Recording {} is still in \"starting\" status", recording.getId());
}
}
if (OutputMode.INDIVIDUAL.equals(recording.getOutputMode())) {
// Stop specific RecorderEndpoint for this stream
@ -835,6 +855,8 @@ public class RecordingManager {
|| (sessionsRecordingsStarting.putIfAbsent(recording.getSessionId(), recording) != null)) {
log.error("Concurrent session recording initialization. Aborting this thread");
throw new RuntimeException("Concurrent initialization of recording " + recording.getId());
} else {
this.sessionHandler.storeRecordingToSendClientEvent(recording);
}
}
@ -843,7 +865,6 @@ public class RecordingManager {
* collection
*/
private void recordingFromStartingToStarted(Recording recording) {
this.sessionHandler.setRecordingStarted(recording.getSessionId(), recording);
this.sessionsRecordings.put(recording.getSessionId(), recording);
this.startingRecordings.remove(recording.getId());
this.sessionsRecordingsStarting.remove(recording.getSessionId());

View File

@ -26,23 +26,29 @@ import java.util.concurrent.TimeUnit;
import javax.ws.rs.ProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.*;
import com.github.dockerjava.api.command.CreateContainerCmd;
import com.github.dockerjava.api.command.CreateContainerResponse;
import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.command.InspectImageResponse;
import com.github.dockerjava.api.exception.ConflictException;
import com.github.dockerjava.api.exception.DockerClientException;
import com.github.dockerjava.api.exception.InternalServerErrorException;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.*;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.Volume;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.command.ExecStartResultCallback;
import com.github.dockerjava.core.command.PullImageResultCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.recording.service.WaitForContainerStoppedCallback;
@ -102,9 +108,8 @@ public class DockerManager {
}
public String runContainer(String container, String containerName, String user, List<Volume> volumes,
List<Bind> binds, String networkMode, List<String> envs, List<String> command, Long shmSize, boolean privileged,
Map<String, String> labels)
throws Exception {
List<Bind> binds, String networkMode, List<String> envs, List<String> command, Long shmSize,
boolean privileged, Map<String, String> labels) throws Exception {
CreateContainerCmd cmd = dockerClient.createContainerCmd(container).withEnv(envs);
if (containerName != null) {
@ -167,21 +172,30 @@ public class DockerManager {
}
}
public String runCommandInContainer(String containerId, String command, int secondsOfWait)
public void runCommandInContainer(String containerId, String command) throws InterruptedException {
ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId).withAttachStdout(true)
.withAttachStderr(true).withCmd("bash", "-c", command).exec();
dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(new ExecStartResultCallback() {
});
}
public void runCommandInContainerSync(String containerId, String command, int secondsOfWait)
throws InterruptedException {
ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId).withAttachStdout(true)
.withAttachStderr(true).withCmd("bash", "-c", command).exec();
CountDownLatch latch = new CountDownLatch(1);
final String[] stringResponse = new String[1];
dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(new ExecStartResultCallback() {
@Override
public void onNext(Frame item) {
stringResponse[0] = new String(item.getPayload());
public void onComplete() {
latch.countDown();
}
});
try {
latch.await(secondsOfWait, TimeUnit.SECONDS);
return stringResponse[0];
} catch (InterruptedException e) {
throw new InterruptedException("Container " + containerId + " did not return from executing command \""
+ command + "\" in " + secondsOfWait + " seconds");
}
}
public void waitForContainerStopped(String containerId, int secondsOfWait) throws Exception {

View File

@ -59,6 +59,10 @@ public class CustomWebhook {
CustomWebhook.context.close();
}
public static void clean() {
CustomWebhook.events.clear();
}
public synchronized static JsonObject waitForEvent(String eventName, int maxSecondsWait) throws Exception {
if (events.get(eventName) == null) {
events.put(eventName, new LinkedBlockingDeque<>());

View File

@ -1246,7 +1246,19 @@ public class OpenViduTestAppE2eTest {
log.info("Remote composed quick start record");
CountDownLatch initLatch = new CountDownLatch(1);
io.openvidu.test.browsers.utils.CustomWebhook.main(new String[0], initLatch);
try {
if (!initLatch.await(30, TimeUnit.SECONDS)) {
Assert.fail("Timeout waiting for webhook springboot app to start");
CustomWebhook.shutDown();
return;
}
final String sessionName = "COMPOSED_QUICK_START_RECORDED_SESSION";
JsonObject event;
// 1. MANUAL mode and recording explicitly stopped
@ -1287,9 +1299,10 @@ public class OpenViduTestAppE2eTest {
OV.fetch();
String recId = OV.startRecording(sessionName).getId();
user.getEventManager().waitUntilEventReaches("recordingStarted", 2);
CustomWebhook.waitForEvent("recordingStatusChanged", 5);
checkDockerContainerRunning("openvidu/openvidu-recording", 1);
Thread.sleep(1000);
Thread.sleep(2000);
Assert.assertEquals("Wrong number of recordings found", 1, OV.listRecordings().size());
OV.stopRecording(recId);
@ -1302,7 +1315,11 @@ public class OpenViduTestAppE2eTest {
checkDockerContainerRunning("openvidu/openvidu-recording", 0);
Assert.assertEquals("Wrong recording status", Recording.Status.ready,
OV.getRecording(sessionName).getStatus());
// 2. ALWAYS mode and recording stopped by session close up
CustomWebhook.clean();
user.getDriver().findElement(By.id("remove-all-users-btn")).click();
user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElement(By.id("session-name-input-0")).clear();
@ -1328,6 +1345,10 @@ public class OpenViduTestAppE2eTest {
user.getEventManager().waitUntilEventReaches("streamPlaying", 3);
user.getEventManager().waitUntilEventReaches("recordingStarted", 3);
event = CustomWebhook.waitForEvent("recordingStatusChanged", 5); // started
Assert.assertEquals("Wrong status in recordingStatusChanged event", "started",
event.get("status").getAsString());
checkDockerContainerRunning("openvidu/openvidu-recording", 1);
OV.fetch();
@ -1335,6 +1356,60 @@ public class OpenViduTestAppE2eTest {
session.close();
checkDockerContainerRunning("openvidu/openvidu-recording", 0);
Assert.assertEquals("Wrong recording status", Recording.Status.ready,
OV.getRecording(sessionName + "-1").getStatus());
// 3. Session closed before recording started should trigger
CustomWebhook.clean();
user.getDriver().findElement(By.id("remove-all-users-btn")).click();
user.getDriver().findElement(By.id("add-user-btn")).click();
user.getDriver().findElement(By.id("session-name-input-0")).clear();
user.getDriver().findElement(By.id("session-name-input-0")).sendKeys(sessionName);
user.getDriver().findElement(By.id("session-settings-btn-0")).click();
Thread.sleep(1000);
user.getDriver().findElement(By.id("recording-mode-select")).click();
Thread.sleep(500);
user.getDriver().findElement(By.id("option-ALWAYS")).click();
Thread.sleep(500);
user.getDriver().findElement(By.id("output-mode-select")).click();
Thread.sleep(500);
user.getDriver().findElement(By.id("option-COMPOSED_QUICK_START")).click();
Thread.sleep(500);
user.getDriver().findElement(By.id("save-btn")).click();
Thread.sleep(1000);
user.getDriver().findElement(By.className("join-btn")).click();
user.getEventManager().waitUntilEventReaches("connectionCreated", 6);
user.getEventManager().waitUntilEventReaches("accessAllowed", 3);
user.getEventManager().waitUntilEventReaches("streamCreated", 4);
user.getEventManager().waitUntilEventReaches("streamPlaying", 4);
checkDockerContainerRunning("openvidu/openvidu-recording", 1);
OV.fetch();
session = OV.getActiveSessions().get(0);
session.close();
// Recording hasn't had time to start. Should trigger stopped, started, failed
event = CustomWebhook.waitForEvent("recordingStatusChanged", 1); // stopped
Assert.assertEquals("Wrong status in recordingStatusChanged event", "stopped",
event.get("status").getAsString());
event = CustomWebhook.waitForEvent("recordingStatusChanged", 5); // started
Assert.assertEquals("Wrong status in recordingStatusChanged event", "started",
event.get("status").getAsString());
event = CustomWebhook.waitForEvent("recordingStatusChanged", 1); // failed
Assert.assertEquals("Wrong status in recordingStatusChanged event", "failed",
event.get("status").getAsString());
checkDockerContainerRunning("openvidu/openvidu-recording", 0);
Assert.assertEquals("Wrong recording status", Recording.Status.failed,
OV.getRecording(sessionName + "-2").getStatus());
} finally {
CustomWebhook.shutDown();
}
}
@Test
@ -2366,9 +2441,10 @@ public class OpenViduTestAppE2eTest {
.recordingLayout(RecordingLayout.BEST_FIT).resolution("1280x720").hasVideo(true).hasAudio(false)
.name(customRecordingName).build();
// Start recording method should block until video exists and size > 0
Recording recording2 = OV.startRecording(session.getSessionId(), recordingProperties);
recording2 = OV.stopRecording(recording2.getId());
Assert.assertEquals("Wrong recording status", Recording.Status.failed, recording2.getStatus());
Assert.assertEquals("Wrong recording status", Recording.Status.ready, recording2.getStatus());
OV.deleteRecording(recording2.getId());
recording2 = OV.startRecording(session.getSessionId(), recordingProperties);