mirror of https://github.com/OpenVidu/openvidu.git
Merge branch 'master' of github.com:OpenVidu/openvidu
commit
2bf9a6b493
|
@ -18,7 +18,7 @@ version: '3.1'
|
||||||
services:
|
services:
|
||||||
|
|
||||||
openvidu-server:
|
openvidu-server:
|
||||||
image: openvidu/openvidu-server:2.13.0-beta4
|
image: openvidu/openvidu-server:2.13.0-beta3
|
||||||
restart: on-failure
|
restart: on-failure
|
||||||
network_mode: host
|
network_mode: host
|
||||||
volumes:
|
volumes:
|
||||||
|
|
|
@ -41,9 +41,7 @@ First clone this repository and move to openvidu-docker-compose folder:
|
||||||
|
|
||||||
```
|
```
|
||||||
$ git clone https://github.com/OpenVidu/openvidu.git
|
$ git clone https://github.com/OpenVidu/openvidu.git
|
||||||
$ cd openvidu
|
$ cd openvidu/openvidu-server/docker/openvidu-docker-compose
|
||||||
$ git checkout -b deploy-docker-compose origin/deploy-docker-compose
|
|
||||||
$ cd openvidu-server/docker/openvidu-docker-compose
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### OpenVidu configuration
|
### OpenVidu configuration
|
||||||
|
|
|
@ -242,12 +242,16 @@ public class OpenViduServer implements JsonRpcConfigurer {
|
||||||
String msg = "\n\n\n" + " Configuration errors\n" + " --------------------\n" + "\n";
|
String msg = "\n\n\n" + " Configuration errors\n" + " --------------------\n" + "\n";
|
||||||
|
|
||||||
for (Error error : config.getConfigErrors()) {
|
for (Error error : config.getConfigErrors()) {
|
||||||
msg += " * Property " + config.getPropertyName(error.getProperty());
|
msg += " * ";
|
||||||
if (error.getValue() == null || error.getValue().equals("")) {
|
if(error.getProperty() != null) {
|
||||||
msg += " is not set. ";
|
msg += "Property " + config.getPropertyName(error.getProperty());
|
||||||
} else {
|
if (error.getValue() == null || error.getValue().equals("")) {
|
||||||
msg += "=" + error.getValue() + ". ";
|
msg += " is not set. ";
|
||||||
|
} else {
|
||||||
|
msg += "=" + error.getValue() + ". ";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
msg += error.getMessage() + "\n";
|
msg += error.getMessage() + "\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,147 @@
|
||||||
|
package io.openvidu.server.config;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class Dotenv {
|
||||||
|
|
||||||
|
public static class DotenvFormatException extends Exception {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = -7280645547648990756L;
|
||||||
|
|
||||||
|
public DotenvFormatException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Logger log = LoggerFactory.getLogger(Dotenv.class);
|
||||||
|
|
||||||
|
private List<String> lines;
|
||||||
|
private Map<String, String> properties;
|
||||||
|
|
||||||
|
private Path envFile;
|
||||||
|
|
||||||
|
private List<String> additionalProperties = new ArrayList<>();
|
||||||
|
|
||||||
|
public void read() throws IOException, DotenvFormatException {
|
||||||
|
read(Paths.get(".env"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public String get(String property) {
|
||||||
|
return properties.get(property);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Path getEnvFile() {
|
||||||
|
return envFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write() throws IOException {
|
||||||
|
write(envFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(Path envFile) throws IOException {
|
||||||
|
|
||||||
|
List<String> outLines = new ArrayList<>();
|
||||||
|
|
||||||
|
for (String line : lines) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
Pair<String, String> propValue = parseLine(envFile, line);
|
||||||
|
if(propValue == null) {
|
||||||
|
outLines.add(line);
|
||||||
|
} else {
|
||||||
|
outLines.add(propValue.getKey()+"="+properties.get(propValue.getKey()));
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (DotenvFormatException e) {
|
||||||
|
log.error("Previously parsed line is producing a parser error", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!additionalProperties.isEmpty()) {
|
||||||
|
for(String prop : additionalProperties) {
|
||||||
|
outLines.add(prop+"="+properties.get(prop));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Files.write(envFile, outLines, Charset.defaultCharset(), StandardOpenOption.CREATE);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void read(Path envFile) throws IOException, DotenvFormatException {
|
||||||
|
|
||||||
|
this.envFile = envFile;
|
||||||
|
|
||||||
|
lines = Files.readAllLines(envFile);
|
||||||
|
|
||||||
|
properties = new HashMap<>();
|
||||||
|
for (String line : lines) {
|
||||||
|
|
||||||
|
log.debug("Reading line '{}'", line);
|
||||||
|
|
||||||
|
Pair<String, String> propValue = parseLine(envFile, line);
|
||||||
|
|
||||||
|
if (propValue != null) {
|
||||||
|
|
||||||
|
log.debug("Setting property {}={}", propValue.getKey(), propValue.getValue());
|
||||||
|
|
||||||
|
properties.put(propValue.getKey(), propValue.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Pair<String, String> parseLine(Path envFile, String line) throws DotenvFormatException {
|
||||||
|
|
||||||
|
if (isWhitespace(line) || isComment(line)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
int index = line.indexOf("=");
|
||||||
|
|
||||||
|
if (index == -1) {
|
||||||
|
throw new DotenvFormatException("File " + envFile + " has a malformed line with content \"" + line + "\"");
|
||||||
|
}
|
||||||
|
|
||||||
|
String property = line.substring(0, index).trim();
|
||||||
|
|
||||||
|
if (property.equals("")) {
|
||||||
|
throw new DotenvFormatException("File " + envFile + " has a malformed line with content \"" + line + "\"");
|
||||||
|
}
|
||||||
|
|
||||||
|
String value = line.substring(index + 1);
|
||||||
|
|
||||||
|
return ImmutablePair.of(property, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isComment(String line) {
|
||||||
|
return line.startsWith("#") || line.startsWith("//");
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isWhitespace(String line) {
|
||||||
|
return line.trim().equals("");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void set(String property, String value) {
|
||||||
|
|
||||||
|
if (!properties.containsKey(property)) {
|
||||||
|
additionalProperties.add(property);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.properties.put(property, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
package io.openvidu.server.config;
|
package io.openvidu.server.config;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.Inet6Address;
|
import java.net.Inet6Address;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -50,6 +51,7 @@ import com.google.gson.JsonSyntaxException;
|
||||||
import io.openvidu.java.client.OpenViduRole;
|
import io.openvidu.java.client.OpenViduRole;
|
||||||
import io.openvidu.server.OpenViduServer;
|
import io.openvidu.server.OpenViduServer;
|
||||||
import io.openvidu.server.cdr.CDREventName;
|
import io.openvidu.server.cdr.CDREventName;
|
||||||
|
import io.openvidu.server.config.Dotenv.DotenvFormatException;
|
||||||
import io.openvidu.server.recording.RecordingNotification;
|
import io.openvidu.server.recording.RecordingNotification;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
|
@ -96,6 +98,8 @@ public class OpenviduConfig {
|
||||||
@Autowired
|
@Autowired
|
||||||
protected Environment env;
|
protected Environment env;
|
||||||
|
|
||||||
|
protected Dotenv dotenv;
|
||||||
|
|
||||||
@Value("#{'${spring.profiles.active:}'.length() > 0 ? '${spring.profiles.active:}'.split(',') : \"default\"}")
|
@Value("#{'${spring.profiles.active:}'.length() > 0 ? '${spring.profiles.active:}'.split(',') : \"default\"}")
|
||||||
protected String springProfile;
|
protected String springProfile;
|
||||||
|
|
||||||
|
@ -371,7 +375,17 @@ public class OpenviduConfig {
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
protected void checkConfigurationProperties() {
|
protected void checkConfigurationProperties() {
|
||||||
|
|
||||||
|
dotenv = new Dotenv();
|
||||||
|
|
||||||
|
try {
|
||||||
|
dotenv.read();
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.warn("Exception reading .env file. "+ e.getClass()+":"+e.getMessage());
|
||||||
|
} catch (DotenvFormatException e) {
|
||||||
|
log.warn("Format error in .env file. "+ e.getClass()+":"+e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.checkConfigurationParameters();
|
this.checkConfigurationParameters();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -556,8 +570,15 @@ public class OpenviduConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> checkKmsUris() {
|
public List<String> checkKmsUris() {
|
||||||
|
|
||||||
String property = "kms.uris";
|
String property = "kms.uris";
|
||||||
String kmsUris = getConfigValue(property);
|
|
||||||
|
return asKmsUris(property, getConfigValue(property));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> asKmsUris(String property, String kmsUris) {
|
||||||
|
|
||||||
if (kmsUris == null || kmsUris.isEmpty()) {
|
if (kmsUris == null || kmsUris.isEmpty()) {
|
||||||
return Arrays.asList();
|
return Arrays.asList();
|
||||||
}
|
}
|
||||||
|
@ -577,7 +598,6 @@ public class OpenviduConfig {
|
||||||
addError(property, uri + " is not a valid WebSocket URL");
|
addError(property, uri + " is not a valid WebSocket URL");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return kmsUrisArray;
|
return kmsUrisArray;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
@ -90,15 +91,6 @@ public class KurentoParticipant extends Participant {
|
||||||
this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(),
|
this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(),
|
||||||
this.session.getPipeline(), this.openviduConfig, null);
|
this.session.getPipeline(), this.openviduConfig, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Participant other : session.getParticipants()) {
|
|
||||||
if (!other.getParticipantPublicId().equals(this.getParticipantPublicId())
|
|
||||||
&& !OpenViduRole.SUBSCRIBER.equals(other.getToken().getRole())) {
|
|
||||||
// Initialize a SubscriberEndpoint for each other user connected with PUBLISHER
|
|
||||||
// or MODERATOR role
|
|
||||||
getNewOrExistingSubscriber(other.getParticipantPublicId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) {
|
public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) {
|
||||||
|
@ -226,92 +218,126 @@ public class KurentoParticipant extends Participant {
|
||||||
|
|
||||||
KurentoParticipant kSender = (KurentoParticipant) sender;
|
KurentoParticipant kSender = (KurentoParticipant) sender;
|
||||||
|
|
||||||
if (kSender.getPublisher() == null) {
|
if (kSender.streaming && kSender.getPublisher() != null
|
||||||
log.warn("PARTICIPANT {}: Trying to connect to a user without a publishing endpoint",
|
&& kSender.getPublisher().closingLock.readLock().tryLock()) {
|
||||||
this.getParticipantPublicId());
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(),
|
|
||||||
senderName);
|
|
||||||
|
|
||||||
SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName);
|
|
||||||
|
|
||||||
try {
|
|
||||||
CountDownLatch subscriberLatch = new CountDownLatch(1);
|
|
||||||
Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch);
|
|
||||||
try {
|
try {
|
||||||
if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {
|
log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(),
|
||||||
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
|
senderName);
|
||||||
"Timeout reached when creating subscriber endpoint");
|
|
||||||
|
SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName);
|
||||||
|
|
||||||
|
try {
|
||||||
|
CountDownLatch subscriberLatch = new CountDownLatch(1);
|
||||||
|
Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {
|
||||||
|
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
|
||||||
|
"Timeout reached when creating subscriber endpoint");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
|
||||||
|
"Interrupted when creating subscriber endpoint: " + e.getMessage());
|
||||||
|
}
|
||||||
|
if (oldMediaEndpoint != null) {
|
||||||
|
log.warn(
|
||||||
|
"PARTICIPANT {}: Two threads are trying to create at "
|
||||||
|
+ "the same time a subscriber endpoint for user {}",
|
||||||
|
this.getParticipantPublicId(), senderName);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (subscriber.getEndpoint() == null) {
|
||||||
|
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
|
||||||
|
"Unable to create subscriber endpoint");
|
||||||
|
}
|
||||||
|
|
||||||
|
String subscriberEndpointName = this.getParticipantPublicId() + "_"
|
||||||
|
+ kSender.getPublisherStreamId();
|
||||||
|
|
||||||
|
subscriber.setEndpointName(subscriberEndpointName);
|
||||||
|
subscriber.getEndpoint().setName(subscriberEndpointName);
|
||||||
|
subscriber.setStreamId(kSender.getPublisherStreamId());
|
||||||
|
|
||||||
|
endpointConfig.addEndpointListeners(subscriber, "subscriber");
|
||||||
|
|
||||||
|
} catch (OpenViduException e) {
|
||||||
|
this.subscribers.remove(senderName);
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
|
log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(),
|
||||||
"Interrupted when creating subscriber endpoint: " + e.getMessage());
|
senderName);
|
||||||
|
try {
|
||||||
|
String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher());
|
||||||
|
log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer);
|
||||||
|
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(),
|
||||||
|
senderName, this.session.getSessionId());
|
||||||
|
|
||||||
|
if (!silent
|
||||||
|
&& !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) {
|
||||||
|
endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(),
|
||||||
|
sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt());
|
||||||
|
}
|
||||||
|
|
||||||
|
return sdpAnswer;
|
||||||
|
} catch (KurentoServerException e) {
|
||||||
|
// TODO Check object status when KurentoClient sets this info in the object
|
||||||
|
if (e.getCode() == 40101) {
|
||||||
|
log.warn(
|
||||||
|
"Publisher endpoint was already released when trying to connect a subscriber endpoint to it",
|
||||||
|
e);
|
||||||
|
} else {
|
||||||
|
log.error("Exception connecting subscriber endpoint to publisher endpoint", e);
|
||||||
|
}
|
||||||
|
this.subscribers.remove(senderName);
|
||||||
|
releaseSubscriberEndpoint(senderName, (KurentoParticipant) sender, subscriber, null, false);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
kSender.getPublisher().closingLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
if (oldMediaEndpoint != null) {
|
} else {
|
||||||
log.warn(
|
log.error(
|
||||||
"PARTICIPANT {}: Two threads are trying to create at "
|
"PublisherEndpoint of participant {} of session {} is closed. Participant {} couldn't subscribe to it ",
|
||||||
+ "the same time a subscriber endpoint for user {}",
|
senderName, sender.getSessionId(), this.participantPublicId);
|
||||||
this.getParticipantPublicId(), senderName);
|
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
|
||||||
return null;
|
"Unable to create subscriber endpoint. Publisher endpoint of participant " + senderName
|
||||||
}
|
+ "is closed");
|
||||||
if (subscriber.getEndpoint() == null) {
|
|
||||||
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint");
|
|
||||||
}
|
|
||||||
|
|
||||||
String subscriberEndpointName = this.getParticipantPublicId() + "_" + kSender.getPublisherStreamId();
|
|
||||||
|
|
||||||
subscriber.setEndpointName(subscriberEndpointName);
|
|
||||||
subscriber.getEndpoint().setName(subscriberEndpointName);
|
|
||||||
subscriber.setStreamId(kSender.getPublisherStreamId());
|
|
||||||
|
|
||||||
endpointConfig.addEndpointListeners(subscriber, "subscriber");
|
|
||||||
|
|
||||||
} catch (OpenViduException e) {
|
|
||||||
this.subscribers.remove(senderName);
|
|
||||||
throw e;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), senderName);
|
|
||||||
try {
|
|
||||||
String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher());
|
|
||||||
log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer);
|
|
||||||
log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(),
|
|
||||||
senderName, this.session.getSessionId());
|
|
||||||
|
|
||||||
if (!silent && !ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) {
|
|
||||||
endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(),
|
|
||||||
sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt());
|
|
||||||
}
|
|
||||||
|
|
||||||
return sdpAnswer;
|
|
||||||
} catch (KurentoServerException e) {
|
|
||||||
// TODO Check object status when KurentoClient sets this info in the object
|
|
||||||
if (e.getCode() == 40101) {
|
|
||||||
log.warn("Publisher endpoint was already released when trying "
|
|
||||||
+ "to connect a subscriber endpoint to it", e);
|
|
||||||
} else {
|
|
||||||
log.error("Exception connecting subscriber endpoint " + "to publisher endpoint", e);
|
|
||||||
}
|
|
||||||
this.subscribers.remove(senderName);
|
|
||||||
releaseSubscriberEndpoint((KurentoParticipant) sender, subscriber, null, false);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) {
|
public void cancelReceivingMedia(KurentoParticipant senderKurentoParticipant, EndReason reason, boolean silent) {
|
||||||
final String senderName = senderKurentoParticipant.getParticipantPublicId();
|
final String senderName = senderKurentoParticipant.getParticipantPublicId();
|
||||||
|
final PublisherEndpoint pub = senderKurentoParticipant.publisher;
|
||||||
log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(), senderName);
|
if (pub != null) {
|
||||||
SubscriberEndpoint subscriberEndpoint = subscribers.remove(senderName);
|
try {
|
||||||
if (subscriberEndpoint == null || subscriberEndpoint.getEndpoint() == null) {
|
if (pub.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
|
||||||
log.warn("PARTICIPANT {}: Trying to cancel receiving video from user {}. "
|
try {
|
||||||
+ "But there is no such subscriber endpoint.", this.getParticipantPublicId(), senderName);
|
log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(),
|
||||||
} else {
|
senderName);
|
||||||
releaseSubscriberEndpoint(senderKurentoParticipant, subscriberEndpoint, reason, silent);
|
SubscriberEndpoint subscriberEndpoint = subscribers.remove(senderName);
|
||||||
log.info("PARTICIPANT {}: stopped receiving media from {} in room {}", this.getParticipantPublicId(),
|
if (subscriberEndpoint == null) {
|
||||||
senderName, this.session.getSessionId());
|
log.warn(
|
||||||
|
"PARTICIPANT {}: Trying to cancel receiving video from user {}. "
|
||||||
|
+ "But there is no such subscriber endpoint.",
|
||||||
|
this.getParticipantPublicId(), senderName);
|
||||||
|
} else {
|
||||||
|
releaseSubscriberEndpoint(senderName, senderKurentoParticipant, subscriberEndpoint, reason,
|
||||||
|
silent);
|
||||||
|
log.info("PARTICIPANT {}: stopped receiving media from {} in room {}",
|
||||||
|
this.getParticipantPublicId(), senderName, this.session.getSessionId());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
pub.closingLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
subscribers.remove(senderName);
|
||||||
|
log.error(
|
||||||
|
"Timeout wating for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia",
|
||||||
|
senderName, this.getParticipantPublicId());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,7 +356,7 @@ public class KurentoParticipant extends Participant {
|
||||||
it.remove();
|
it.remove();
|
||||||
if (subscriber != null && subscriber.getEndpoint() != null) {
|
if (subscriber != null && subscriber.getEndpoint() != null) {
|
||||||
|
|
||||||
releaseSubscriberEndpoint(
|
releaseSubscriberEndpoint(remoteParticipantName,
|
||||||
(KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), subscriber,
|
(KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), subscriber,
|
||||||
reason, false);
|
reason, false);
|
||||||
log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(),
|
log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(),
|
||||||
|
@ -367,7 +393,6 @@ public class KurentoParticipant extends Participant {
|
||||||
log.debug("PARTICIPANT {}: New subscriber endpoint to user {}", this.getParticipantPublicId(),
|
log.debug("PARTICIPANT {}: New subscriber endpoint to user {}", this.getParticipantPublicId(),
|
||||||
senderPublicId);
|
senderPublicId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return subscriberEndpoint;
|
return subscriberEndpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -391,52 +416,71 @@ public class KurentoParticipant extends Participant {
|
||||||
|
|
||||||
private void releasePublisherEndpoint(EndReason reason, long kmsDisconnectionTime) {
|
private void releasePublisherEndpoint(EndReason reason, long kmsDisconnectionTime) {
|
||||||
if (publisher != null && publisher.getEndpoint() != null) {
|
if (publisher != null && publisher.getEndpoint() != null) {
|
||||||
|
final ReadWriteLock closingLock = publisher.closingLock;
|
||||||
// Remove streamId from publisher's map
|
try {
|
||||||
this.session.publishedStreamIds.remove(this.getPublisherStreamId());
|
if (closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
|
||||||
|
try {
|
||||||
if (this.openviduConfig.isRecordingModuleEnabled()
|
this.releasePublisherEndpointAux(reason, kmsDisconnectionTime);
|
||||||
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
|
} finally {
|
||||||
this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(),
|
closingLock.writeLock().unlock();
|
||||||
kmsDisconnectionTime);
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error(
|
||||||
|
"Timeout wating for PublisherEndpoint closing lock of participant {} to be available to call releasePublisherEndpoint",
|
||||||
|
this.participantPublicId, this.getParticipantPublicId());
|
||||||
|
log.error("Forcing PublisherEndpoint release. Possibly some session event will be incomplete");
|
||||||
|
this.releasePublisherEndpointAux(reason, kmsDisconnectionTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
publisher.unregisterErrorListeners();
|
|
||||||
publisher.cancelStatsLoop.set(true);
|
|
||||||
|
|
||||||
for (MediaElement el : publisher.getMediaElements()) {
|
|
||||||
releaseElement(getParticipantPublicId(), el);
|
|
||||||
}
|
|
||||||
releaseElement(getParticipantPublicId(), publisher.getEndpoint());
|
|
||||||
this.streaming = false;
|
|
||||||
this.session.deregisterPublisher();
|
|
||||||
|
|
||||||
endpointConfig.getCdr().stopPublisher(this.getParticipantPublicId(), publisher.getStreamId(), reason);
|
|
||||||
publisher = null;
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
log.warn("PARTICIPANT {}: Trying to release publisher endpoint but is null", getParticipantPublicId());
|
log.warn("PARTICIPANT {}: Trying to release publisher endpoint but is null", getParticipantPublicId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void releaseSubscriberEndpoint(KurentoParticipant senderKurentoParticipant, SubscriberEndpoint subscriber,
|
private void releasePublisherEndpointAux(EndReason reason, long kmsDisconnectionTime) {
|
||||||
EndReason reason, boolean silent) {
|
// Remove streamId from publisher's map
|
||||||
final String senderName = senderKurentoParticipant.getParticipantPublicId();
|
this.session.publishedStreamIds.remove(this.getPublisherStreamId());
|
||||||
|
|
||||||
|
if (this.openviduConfig.isRecordingModuleEnabled()
|
||||||
|
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
|
||||||
|
this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(),
|
||||||
|
kmsDisconnectionTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
publisher.unregisterErrorListeners();
|
||||||
|
publisher.cancelStatsLoop.set(true);
|
||||||
|
|
||||||
|
for (MediaElement el : publisher.getMediaElements()) {
|
||||||
|
releaseElement(getParticipantPublicId(), el);
|
||||||
|
}
|
||||||
|
releaseElement(getParticipantPublicId(), publisher.getEndpoint());
|
||||||
|
this.streaming = false;
|
||||||
|
this.session.deregisterPublisher();
|
||||||
|
|
||||||
|
endpointConfig.getCdr().stopPublisher(this.getParticipantPublicId(), publisher.getStreamId(), reason);
|
||||||
|
publisher = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void releaseSubscriberEndpoint(String senderName, KurentoParticipant publisherParticipant,
|
||||||
|
SubscriberEndpoint subscriber, EndReason reason, boolean silent) {
|
||||||
|
|
||||||
if (subscriber != null) {
|
if (subscriber != null) {
|
||||||
|
|
||||||
subscriber.unregisterErrorListeners();
|
subscriber.unregisterErrorListeners();
|
||||||
subscriber.cancelStatsLoop.set(true);
|
subscriber.cancelStatsLoop.set(true);
|
||||||
|
|
||||||
releaseElement(senderName, subscriber.getEndpoint());
|
if (subscriber.getEndpoint() != null) {
|
||||||
|
releaseElement(senderName, subscriber.getEndpoint());
|
||||||
|
}
|
||||||
|
|
||||||
if (!silent) {
|
if (!silent) {
|
||||||
|
|
||||||
// Stop PlayerEndpoint of IP CAM if last subscriber disconnected
|
// Stop PlayerEndpoint of IP CAM if last subscriber disconnected
|
||||||
final PublisherEndpoint senderPublisher = senderKurentoParticipant.publisher;
|
if (publisherParticipant != null && publisherParticipant.publisher != null) {
|
||||||
if (senderPublisher != null) {
|
final PublisherEndpoint senderPublisher = publisherParticipant.publisher;
|
||||||
// If no PublisherEndpoint, then it means that the publisher already closed it
|
// If no PublisherEndpoint, then it means that the publisher already closed it
|
||||||
final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions();
|
final KurentoMediaOptions options = (KurentoMediaOptions) senderPublisher.getMediaOptions();
|
||||||
if (options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) {
|
if (options != null && options.onlyPlayWithSubscribers != null && options.onlyPlayWithSubscribers) {
|
||||||
synchronized (senderPublisher) {
|
synchronized (senderPublisher) {
|
||||||
senderPublisher.numberOfSubscribers--;
|
senderPublisher.numberOfSubscribers--;
|
||||||
if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) {
|
if (senderPublisher.isPlayerEndpoint() && senderPublisher.numberOfSubscribers == 0) {
|
||||||
|
|
|
@ -89,15 +89,6 @@ public class KurentoSession extends Session {
|
||||||
|
|
||||||
public void newPublisher(Participant participant) {
|
public void newPublisher(Participant participant) {
|
||||||
registerPublisher();
|
registerPublisher();
|
||||||
|
|
||||||
// pre-load endpoints to recv video from the new publisher
|
|
||||||
for (Participant p : participants.values()) {
|
|
||||||
if (participant.equals(p)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
((KurentoParticipant) p).getNewOrExistingSubscriber(participant.getParticipantPublicId());
|
|
||||||
}
|
|
||||||
|
|
||||||
log.debug("SESSION {}: Virtually subscribed other participants {} to new publisher {}", sessionId,
|
log.debug("SESSION {}: Virtually subscribed other participants {} to new publisher {}", sessionId,
|
||||||
participants.values(), participant.getParticipantPublicId());
|
participants.values(), participant.getParticipantPublicId());
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,8 @@ import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.kurento.client.Continuation;
|
import org.kurento.client.Continuation;
|
||||||
import org.kurento.client.GenericMediaElement;
|
import org.kurento.client.GenericMediaElement;
|
||||||
|
@ -76,6 +78,16 @@ public class PublisherEndpoint extends MediaEndpoint {
|
||||||
|
|
||||||
public int numberOfSubscribers = 0;
|
public int numberOfSubscribers = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This lock protects the following method with read lock:
|
||||||
|
* KurentoParticipant#receiveMediaFrom. It uses tryLock, immediately failing if
|
||||||
|
* written locked
|
||||||
|
*
|
||||||
|
* Lock is written-locked upon KurentoParticipant#releasePublisherEndpoint and
|
||||||
|
* KurentoParticipant#cancelReceivingMedia
|
||||||
|
*/
|
||||||
|
public ReadWriteLock closingLock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName,
|
public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName,
|
||||||
MediaPipeline pipeline, OpenviduConfig openviduConfig, PassThrough passThru) {
|
MediaPipeline pipeline, OpenviduConfig openviduConfig, PassThrough passThru) {
|
||||||
super(endpointType, owner, endpointName, pipeline, openviduConfig, log);
|
super(endpointType, owner, endpointName, pipeline, openviduConfig, log);
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
package io.openvidu.server.config;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.StandardCopyOption;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import io.openvidu.server.config.Dotenv.DotenvFormatException;
|
||||||
|
|
||||||
|
class DotenvTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void loadTest() throws IOException, DotenvFormatException {
|
||||||
|
|
||||||
|
// Given
|
||||||
|
Path envFile = createDotenvTestFile();
|
||||||
|
|
||||||
|
Dotenv dotenv = new Dotenv();
|
||||||
|
|
||||||
|
// When
|
||||||
|
dotenv.read(envFile);
|
||||||
|
|
||||||
|
// Then
|
||||||
|
assertEquals("value1", dotenv.get("PROPERTY_ONE"));
|
||||||
|
assertEquals("value2", dotenv.get("PROPERTY_TWO"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void writeSameFileTest() throws IOException, DotenvFormatException {
|
||||||
|
|
||||||
|
// Given
|
||||||
|
Path envFile = createDotenvTestFile();
|
||||||
|
String originalEnvContent = new String(Files.readAllBytes(envFile));
|
||||||
|
|
||||||
|
Dotenv dotenv = new Dotenv();
|
||||||
|
|
||||||
|
// When
|
||||||
|
dotenv.read(envFile);
|
||||||
|
|
||||||
|
Files.delete(envFile);
|
||||||
|
|
||||||
|
dotenv.write();
|
||||||
|
String recreatedEnvContent = new String(Files.readAllBytes(envFile));
|
||||||
|
|
||||||
|
// Then
|
||||||
|
assertEquals(originalEnvContent, recreatedEnvContent);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void writeModifiedFileTest() throws IOException, DotenvFormatException {
|
||||||
|
|
||||||
|
// Given
|
||||||
|
Path envFile = createDotenvTestFile();
|
||||||
|
|
||||||
|
Dotenv dotenv = new Dotenv();
|
||||||
|
|
||||||
|
// When
|
||||||
|
dotenv.read(envFile);
|
||||||
|
|
||||||
|
dotenv.set("PROPERTY_ONE", "value_one");
|
||||||
|
dotenv.set("PROPERTY_TWO", "value_two");
|
||||||
|
dotenv.set("PROPERTY_THREE", "value_three");
|
||||||
|
|
||||||
|
dotenv.write();
|
||||||
|
|
||||||
|
Dotenv updDotenv = new Dotenv();
|
||||||
|
updDotenv.read(envFile);
|
||||||
|
|
||||||
|
// Then
|
||||||
|
assertEquals("value_one", updDotenv.get("PROPERTY_ONE"));
|
||||||
|
assertEquals("value_two", updDotenv.get("PROPERTY_TWO"));
|
||||||
|
assertEquals("value_three", updDotenv.get("PROPERTY_THREE"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path createDotenvTestFile() throws IOException {
|
||||||
|
Path envFile = Files.createTempFile("env", ".tmp");
|
||||||
|
Files.copy(getClass().getResourceAsStream("/.env"), envFile, StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
return envFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
# Comment 1
|
||||||
|
PROPERTY_ONE=value1
|
||||||
|
|
||||||
|
# Comment 2
|
||||||
|
PROPERTY_TWO=value2
|
Loading…
Reference in New Issue