Merge branch 'master' of github.com:OpenVidu/openvidu

pull/431/head
OscarSotoSanchez 2020-04-09 16:35:06 +02:00
commit 0a36927836
8 changed files with 248 additions and 124 deletions

View File

@ -230,7 +230,7 @@ public class OpenViduServer implements JsonRpcConfigurer {
}
public static <T> void checkConfigProperties(Class<T> configClass) throws InterruptedException {
ConfigurableApplicationContext app = SpringApplication.run(configClass,
new String[] { "--spring.main.web-application-type=none" });
OpenviduConfig config = app.getBean(OpenviduConfig.class);
@ -243,7 +243,7 @@ public class OpenViduServer implements JsonRpcConfigurer {
for (Error error : config.getConfigErrors()) {
msg += " * ";
if(error.getProperty() != null) {
if (error.getProperty() != null) {
msg += "Property " + config.getPropertyName(error.getProperty());
if (error.getValue() == null || error.getValue().equals("")) {
msg += " is not set. ";
@ -251,7 +251,7 @@ public class OpenViduServer implements JsonRpcConfigurer {
msg += "=" + error.getValue() + ". ";
}
}
msg += error.getMessage() + "\n";
}
@ -265,21 +265,21 @@ public class OpenViduServer implements JsonRpcConfigurer {
// Wait forever
new Semaphore(0).acquire();
} else {
String msg = "\n\n\n" + " Configuration properties\n" + " ----------------------\n" + "\n";
String msg = "\n\n\n" + " Configuration properties\n" + " ------------------------\n" + "\n";
Map<String, String> configProps = config.getConfigProps();
List<String> configPropNames = new ArrayList<>(config.getUserProperties());
Collections.sort(configPropNames);
for(String property : configPropNames) {
for (String property : configPropNames) {
String value = configProps.get(property);
msg += " * "+config.getPropertyName(property)+"="+(value == null? "": value)+"\n";
msg += " * " + config.getPropertyName(property) + "=" + (value == null ? "" : value) + "\n";
}
msg += "\n\n";
log.info(msg);
}
}
@ -290,10 +290,10 @@ public class OpenViduServer implements JsonRpcConfigurer {
String dashboardUrl = httpUrl + "dashboard/";
// @formatter:off
String msg = "\n\n----------------------------------------------------\n" + "\n"
+ " OpenVidu Platform is ready!\n" + " ---------------------------\n" + "\n"
+ " * OpenVidu Server: " + httpUrl + "\n" + "\n" + " * OpenVidu Dashboard: " + dashboardUrl + "\n"
+ "\n" + "----------------------------------------------------\n";
String msg = "\n\n----------------------------------------------------\n" + "\n" + " OpenVidu is ready!\n"
+ " ---------------------------\n" + "\n" + " * OpenVidu Server: " + httpUrl + "\n" + "\n"
+ " * OpenVidu Dashboard: " + dashboardUrl + "\n" + "\n"
+ "----------------------------------------------------\n";
// @formatter:on
log.info(msg);

View File

@ -81,6 +81,11 @@ public class OpenviduConfig {
public String getMessage() {
return message;
}
@Override
public String toString() {
return "Error [property=" + property + ", value=" + value + ", message=" + message + "]";
}
}
private static final Logger log = LoggerFactory.getLogger(OpenviduConfig.class);
@ -95,11 +100,13 @@ public class OpenviduConfig {
private List<String> userConfigProps;
private Map<String, ?> propertiesSource;
@Autowired
protected Environment env;
protected Dotenv dotenv;
@Value("#{'${spring.profiles.active:}'.length() > 0 ? '${spring.profiles.active:}'.split(',') : \"default\"}")
protected String springProfile;
@ -332,6 +339,17 @@ public class OpenviduConfig {
}
// Properties management methods
public OpenviduConfig deriveWithAdditionalPropertiesSource(Map<String, ?> propertiesSource) {
OpenviduConfig config = newOpenviduConfig();
config.propertiesSource = propertiesSource;
config.env = env;
return config;
}
protected OpenviduConfig newOpenviduConfig() {
return new OpenviduConfig();
}
public List<Error> getConfigErrors() {
return configErrors;
@ -345,9 +363,25 @@ public class OpenviduConfig {
return userConfigProps;
}
public String getConfigValue(String property) {
private String getValue(String property) {
String value = env.getProperty(property);
String value = null;
if (propertiesSource != null) {
Object valueObj = propertiesSource.get(property);
if (valueObj != null) {
if(valueObj instanceof Iterable) {
value = JsonUtils.toJson(valueObj);
} else {
value = valueObj.toString();
}
}
}
if (value == null) {
value = env.getProperty(property);
}
this.configProps.put(property, value);
@ -367,27 +401,27 @@ public class OpenviduConfig {
String value = null;
if (property != null) {
value = getConfigValue(property);
value = getValue(property);
}
this.configErrors.add(new Error(property, value, msg));
}
@PostConstruct
protected void checkConfigurationProperties() {
public void checkConfiguration() {
dotenv = new Dotenv();
try {
dotenv.read();
} catch (IOException e) {
log.warn("Exception reading .env file. "+ e.getClass()+":"+e.getMessage());
log.warn("Exception reading .env file. " + e.getClass() + ":" + e.getMessage());
} catch (DotenvFormatException e) {
log.warn("Format error in .env file. "+ e.getClass()+":"+e.getMessage());
log.warn("Format error in .env file. " + e.getClass() + ":" + e.getMessage());
}
try {
this.checkConfigurationParameters();
this.checkConfigurationProperties();
} catch (Exception e) {
log.error("Exception checking configuration", e);
addError(null, "Exception checking configuration." + e.getClass().getName() + ":" + e.getMessage());
@ -405,15 +439,15 @@ public class OpenviduConfig {
// Properties
protected void checkConfigurationParameters() {
protected void checkConfigurationProperties() {
serverPort = getConfigValue("server.port");
serverPort = getValue("server.port");
coturnRedisDbname = getConfigValue("coturn.redis.dbname");
coturnRedisDbname = getValue("coturn.redis.dbname");
coturnRedisPassword = getConfigValue("coturn.redis.password");
coturnRedisPassword = getValue("coturn.redis.password");
coturnRedisConnectTimeout = getConfigValue("coturn.redis.connect-timeout");
coturnRedisConnectTimeout = getValue("coturn.redis.connect-timeout");
openviduSecret = asNonEmptyString("openvidu.secret");
@ -436,6 +470,9 @@ public class OpenviduConfig {
openviduStreamsVideoMinRecvBandwidth = asNonNegativeInteger("openvidu.streams.video.min-recv-bandwidth");
openviduStreamsVideoMaxSendBandwidth = asNonNegativeInteger("openvidu.streams.video.max-send-bandwidth");
openviduStreamsVideoMinSendBandwidth = asNonNegativeInteger("openvidu.streams.video.min-send-bandwidth");
openviduSessionsGarbageInterval = asNonNegativeInteger("openvidu.sessions.garbage.interval");
openviduSessionsGarbageThreshold = asNonNegativeInteger("openvidu.sessions.garbage.threshold");
kmsUrisList = checkKmsUris();
@ -447,9 +484,6 @@ public class OpenviduConfig {
checkCertificateType();
openviduSessionsGarbageInterval = asNonNegativeInteger("openvidu.sessions.garbage.interval");
openviduSessionsGarbageThreshold = asNonNegativeInteger("openvidu.sessions.garbage.threshold");
}
private void checkCertificateType() {
@ -465,7 +499,7 @@ public class OpenviduConfig {
}
private void checkCoturnIp() {
coturnIp = getConfigValue("coturn.ip");
coturnIp = getValue("coturn.ip");
if (coturnIp == null || this.coturnIp.isEmpty()) {
try {
@ -501,13 +535,13 @@ public class OpenviduConfig {
private void checkOpenviduPublicurl() {
final String property = "openvidu.domain.or.public.ip";
String domain = getConfigValue(property);
String domain = getValue(property);
if (domain != null && !domain.isEmpty()) {
this.openviduPublicUrl = "https://" + domain;
} else {
final String urlProperty = "openvidu.publicurl";
String publicurl = getConfigValue(urlProperty);
String publicurl = getValue(urlProperty);
if (publicurl == null || publicurl.isEmpty()) {
addError(property, "Cannot be empty");
} else {
@ -570,15 +604,15 @@ public class OpenviduConfig {
}
public List<String> checkKmsUris() {
String property = "kms.uris";
return asKmsUris(property, getConfigValue(property));
return asKmsUris(property, getValue(property));
}
public List<String> asKmsUris(String property, String kmsUris) {
if (kmsUris == null || kmsUris.isEmpty()) {
return Arrays.asList();
}
@ -646,7 +680,7 @@ public class OpenviduConfig {
// -------------------------------------------------------
protected String asOptionalURL(String property) {
String optionalUrl = getConfigValue(property);
String optionalUrl = getValue(property);
try {
if (!optionalUrl.isEmpty()) {
checkUrl(optionalUrl);
@ -659,7 +693,7 @@ public class OpenviduConfig {
}
protected String asNonEmptyString(String property) {
String stringValue = getConfigValue(property);
String stringValue = getValue(property);
if (stringValue != null && !stringValue.isEmpty()) {
return stringValue;
} else {
@ -669,11 +703,11 @@ public class OpenviduConfig {
}
protected String asOptionalString(String property) {
return getConfigValue(property);
return getValue(property);
}
protected boolean asBoolean(String property) {
String value = getConfigValue(property);
String value = getValue(property);
if (value == null) {
addError(property, "Cannot be empty");
return false;
@ -689,7 +723,7 @@ public class OpenviduConfig {
protected Integer asNonNegativeInteger(String property) {
try {
Integer integerValue = Integer.parseInt(getConfigValue(property));
Integer integerValue = Integer.parseInt(getValue(property));
if (integerValue < 0) {
addError(property, "Is not a non negative integer");
@ -705,7 +739,7 @@ public class OpenviduConfig {
* This method checks all types of Internet addresses (IPv4, IPv6 and Domains)
*/
protected String asOptionalInetAddress(String property) {
String inetAddress = getConfigValue(property);
String inetAddress = getValue(property);
if (inetAddress != null && !inetAddress.isEmpty()) {
try {
Inet6Address.getByName(inetAddress).getHostAddress();
@ -733,7 +767,7 @@ public class OpenviduConfig {
protected List<String> asJsonStringsArray(String property) {
try {
Gson gson = new Gson();
JsonArray jsonArray = gson.fromJson(getConfigValue(property), JsonArray.class);
JsonArray jsonArray = gson.fromJson(getValue(property), JsonArray.class);
List<String> list = JsonUtils.toStringList(jsonArray);
if (list.size() == 1 && list.get(0).isEmpty()) {
list = new ArrayList<>();
@ -746,7 +780,7 @@ public class OpenviduConfig {
}
protected <E extends Enum<E>> E asEnumValue(String property, Class<E> enumType) {
String value = this.getConfigValue(property);
String value = this.getValue(property);
try {
return Enum.valueOf(enumType, value);
} catch (IllegalArgumentException e) {

View File

@ -28,6 +28,7 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@ -447,16 +448,27 @@ public abstract class SessionManager {
long sessionExistsSince = currentMillis - sessionNotActive.getStartTime();
if (sessionExistsSince > (openviduConfig.getSessionGarbageThreshold() * 1000)) {
try {
sessionNotActive.closingLock.writeLock().lock();
if (sessions.containsKey(sessionId)) {
// The session passed to active during lock wait
continue;
if (sessionNotActive.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (sessions.containsKey(sessionId)) {
// The session passed to active during lock wait
continue;
}
iter.remove();
cleanCollections(sessionId);
log.info("Non active session {} cleaned up by garbage collector", sessionId);
} finally {
sessionNotActive.closingLock.writeLock().unlock();
}
} else {
log.error(
"Timeout waiting for Session closing lock to be available for garbage collector to clean session {}",
sessionId);
}
iter.remove();
cleanCollections(sessionId);
log.info("Non active session {} cleaned up by garbage collector", sessionId);
} finally {
sessionNotActive.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
log.error(
"InterruptedException while waiting for Session closing lock to be available for garbage collector to clean session {}",
sessionId);
}
}
}
@ -515,13 +527,20 @@ public abstract class SessionManager {
// to the session. That is: if the session was in the automatic recording stop
// timeout with INDIVIDUAL recording (no docker participant connected)
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return;
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (session.isClosed()) {
return;
}
this.closeSessionAndEmptyCollections(session, reason, true);
} finally {
session.closingLock.writeLock().unlock();
}
} else {
log.error("Timeout waiting for Session {} closing lock to be available", sessionId);
}
this.closeSessionAndEmptyCollections(session, reason, true);
} finally {
session.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
log.error("InterruptedException while waiting for Session {} closing lock to be available", sessionId);
}
}
}

View File

@ -331,12 +331,18 @@ public class KurentoParticipant extends Participant {
} finally {
pub.closingLock.writeLock().unlock();
}
} else {
log.error(
"Timeout waiting for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia",
senderName, this.getParticipantPublicId());
}
} catch (InterruptedException e) {
subscribers.remove(senderName);
log.error(
"Timeout wating for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia",
"InterruptedException while waiting for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia",
senderName, this.getParticipantPublicId());
} finally {
// Always clean map
subscribers.remove(senderName);
}
}
}

View File

@ -86,7 +86,7 @@ public class KurentoSessionManager extends SessionManager {
@Override
/* Protected by Session.closingLock.readLock */
public synchronized void joinRoom(Participant participant, String sessionId, Integer transactionId) {
public void joinRoom(Participant participant, String sessionId, Integer transactionId) {
Set<Participant> existingParticipants = null;
boolean lockAcquired = false;
try {
@ -162,8 +162,7 @@ public class KurentoSessionManager extends SessionManager {
}
@Override
public synchronized boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason,
boolean closeWebSocket) {
public boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, boolean closeWebSocket) {
log.info("Request [LEAVE_ROOM] for participant {} of session {} with reason {}",
participant.getParticipantPublicId(), participant.getSessionId(),
reason != null ? reason.name() : "NULL");
@ -235,17 +234,27 @@ public class KurentoSessionManager extends SessionManager {
recordingManager.initAutomaticRecordingStopThread(session);
} else {
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return false;
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (session.isClosed()) {
return false;
}
log.info("No more participants in session '{}', removing it and closing it", sessionId);
this.closeSessionAndEmptyCollections(session, reason, true);
sessionClosedByLastParticipant = true;
} finally {
session.closingLock.writeLock().unlock();
}
} else {
log.error(
"Timeout waiting for Session {} closing lock to be available for closing as last participant left",
sessionId);
}
log.info("No more participants in session '{}', removing it and closing it", sessionId);
this.closeSessionAndEmptyCollections(session, reason, true);
sessionClosedByLastParticipant = true;
} finally {
session.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
log.error(
"InterruptedException while waiting for Session {} closing lock to be available for closing as last participant left",
sessionId);
}
}
} else if (remainingParticipants.size() == 1 && openviduConfig.isRecordingModuleEnabled()
&& MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode())

View File

@ -480,34 +480,46 @@ public class RecordingManager {
recordingId, this.openviduConfig.getOpenviduRecordingAutostopTimeout());
if (this.automaticRecordingStopThreads.remove(session.getSessionId()) != null) {
boolean alreadyUnlocked = false;
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (RECORDER does not
// count) and publishing
log.info("Closing session {} after automatic stop of recording {}", session.getSessionId(),
recordingId);
sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop, true);
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (session.isClosed()) {
return;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (RECORDER does not
// count) and publishing
log.info("Closing session {} after automatic stop of recording {}",
session.getSessionId(), recordingId);
sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop,
true);
} else {
// There are users connected, but no one is publishing
// We don't need the lock if session is not closing
session.closingLock.writeLock().unlock();
alreadyUnlocked = true;
log.info(
"Automatic stopping recording {}. There are users connected to session {}, but no one is publishing",
recordingId, session.getSessionId());
this.stopRecording(session, recordingId, EndReason.automaticStop);
}
} finally {
if (!alreadyUnlocked) {
session.closingLock.writeLock().unlock();
}
}
} else {
// There are users connected, but no one is publishing
session.closingLock.writeLock().unlock(); // We don't need the lock if session's not closing
alreadyUnlocked = true;
log.info(
"Automatic stopping recording {}. There are users connected to session {}, but no one is publishing",
recordingId, session.getSessionId());
this.stopRecording(session, recordingId, EndReason.automaticStop);
}
} finally {
if (!alreadyUnlocked) {
session.closingLock.writeLock().unlock();
log.error(
"Timeout waiting for Session {} closing lock to be available for automatic recording stop thred",
session.getSessionId());
}
} catch (InterruptedException e) {
log.error(
"InterruptedException while waiting for Session {} closing lock to be available for automatic recording stop thred",
session.getSessionId());
}
} else {
// This code shouldn't be reachable
log.warn("Recording {} was already automatically stopped by a previous thread", recordingId);
@ -523,23 +535,34 @@ public class RecordingManager {
if (future != null) {
boolean cancelled = future.cancel(false);
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return false;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (except for RECORDER).
// This code will only be executed if recording is manually stopped during the
// automatic stop timeout, so the session must be also closed
log.info(
"Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session",
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (session.isClosed()) {
return false;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (except for RECORDER).
// This code will only be executed if recording is manually stopped during the
// automatic stop timeout, so the session must be also closed
log.info(
"Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session",
session.getSessionId());
sessionManager.closeSessionAndEmptyCollections(session, reason, false);
}
} finally {
session.closingLock.writeLock().unlock();
}
} else {
log.error(
"Timeout waiting for Session {} closing lock to be available for aborting automatic recording stop thred",
session.getSessionId());
sessionManager.closeSessionAndEmptyCollections(session, reason, false);
}
return cancelled;
} finally {
session.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
log.error(
"InterruptedException while waiting for Session {} closing lock to be available for aborting automatic recording stop thred",
session.getSessionId());
}
return cancelled;
} else {
return true;
}

View File

@ -21,6 +21,7 @@ import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
@ -234,15 +235,28 @@ public class SessionRestController {
Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId);
if (sessionNotActive != null) {
try {
sessionNotActive.closingLock.writeLock().lock();
if (sessionNotActive.isClosed()) {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
if (sessionNotActive.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (sessionNotActive.isClosed()) {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive,
EndReason.sessionClosedByServer, true);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} finally {
sessionNotActive.closingLock.writeLock().unlock();
}
} else {
String errorMsg = "Timeout waiting for Session " + sessionId
+ " closing lock to be available for closing from DELETE /api/sessions";
log.error(errorMsg);
return this.generateErrorResponse(errorMsg, "/api/sessions", HttpStatus.BAD_REQUEST);
}
this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer,
true);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} finally {
sessionNotActive.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
String errorMsg = "InterruptedException while waiting for Session " + sessionId
+ " closing lock to be available for closing from DELETE /api/sessions";
log.error(errorMsg);
return this.generateErrorResponse(errorMsg, "/api/sessions", HttpStatus.BAD_REQUEST);
}
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);

View File

@ -22,6 +22,7 @@ import static org.openqa.selenium.OutputType.BASE64;
import java.awt.Color;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@ -3259,6 +3260,8 @@ public class OpenViduTestAppE2eTest {
recording.getResolution(), realResolution);
log.info("Recording map color: {}", colorMap.toString());
log.info("Recording frame below");
System.out.println(bufferedImageToBase64PngString(image));
isFine = this.checkVideoAverageRgbGreen(colorMap);
} catch (IOException | JCodecException e) {
log.warn("Error getting frame from video recording: {}", e.getMessage());
@ -3267,6 +3270,22 @@ public class OpenViduTestAppE2eTest {
return isFine;
}
private String bufferedImageToBase64PngString(BufferedImage image) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
String imageString = null;
try {
ImageIO.write(image, "png", bos);
byte[] imageBytes = bos.toByteArray();
imageString = "data:image/png;base64," + Base64.getEncoder().encodeToString(imageBytes);
bos.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return imageString;
}
private void checkIndividualRecording(String recPath, Recording recording, int numberOfVideoFiles,
String audioDecoder, String videoDecoder, boolean checkAudio) throws IOException {