openvidu-server: allow publishing IP cameras

pull/370/head
pabloFuente 2019-10-30 10:25:34 +01:00
parent 91daa9aa44
commit 6ac4082613
17 changed files with 492 additions and 194 deletions

View File

@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
import io.openvidu.server.core.EndReason; import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant; import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.core.KurentoMediaOptions;
public class CDREventWebrtcConnection extends CDREventEnd implements Comparable<CDREventWebrtcConnection> { public class CDREventWebrtcConnection extends CDREventEnd implements Comparable<CDREventWebrtcConnection> {
@ -63,6 +64,13 @@ public class CDREventWebrtcConnection extends CDREventEnd implements Comparable<
json.addProperty("receivingFrom", this.receivingFrom); json.addProperty("receivingFrom", this.receivingFrom);
} else { } else {
json.addProperty("connection", "OUTBOUND"); json.addProperty("connection", "OUTBOUND");
if (mediaOptions instanceof KurentoMediaOptions) {
KurentoMediaOptions kMediaOptions = (KurentoMediaOptions)mediaOptions;
if (kMediaOptions.rtspUri != null) {
json.addProperty("rtspUri", kMediaOptions.rtspUri);
json.addProperty("adaptativeBitrate", kMediaOptions.adaptativeBitrate);
}
}
} }
if (this.mediaOptions.hasVideo()) { if (this.mediaOptions.hasVideo()) {
json.addProperty("videoSource", this.mediaOptions.getTypeOfVideo()); json.addProperty("videoSource", this.mediaOptions.getTypeOfVideo());

View File

@ -65,7 +65,7 @@ import io.openvidu.server.webhook.CDRLoggerWebhook;
* - receivingFrom: string * - receivingFrom: string
* - audioEnabled: boolean * - audioEnabled: boolean
* - videoEnabled: boolean * - videoEnabled: boolean
* - videoSource: "CAMERA", "SCREEN" * - videoSource: "CAMERA", "SCREEN", "CUSTOM", "IPCAM"
* - videoFramerate: number * - videoFramerate: number
* - videoDimensions: string * - videoDimensions: string
* - id: string * - id: string

View File

@ -51,9 +51,12 @@ public class Participant {
this.createdAt = System.currentTimeMillis(); this.createdAt = System.currentTimeMillis();
} }
this.token = token; this.token = token;
this.clientMetadata = clientMetadata; if (clientMetadata != null) {
if (!token.getServerMetadata().isEmpty()) this.clientMetadata = clientMetadata;
}
if (!token.getServerMetadata().isEmpty()) {
this.serverMetadata = token.getServerMetadata(); this.serverMetadata = token.getServerMetadata();
}
this.location = location; this.location = location;
this.platform = platform; this.platform = platform;
} }
@ -134,6 +137,10 @@ public class Participant {
return closed; return closed;
} }
public boolean isIpcam() {
return this.platform.equals("IPCAM") && this.participantPrivatetId.startsWith("IPCAM-");
}
public void setStreaming(boolean streaming) { public void setStreaming(boolean streaming) {
this.streaming = streaming; this.streaming = streaming;
} }

View File

@ -36,6 +36,7 @@ import io.openvidu.java.client.RecordingLayout;
import io.openvidu.java.client.SessionProperties; import io.openvidu.java.client.SessionProperties;
import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.endpoint.EndpointType;
import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.recording.service.RecordingManager;
public class Session implements SessionInterface { public class Session implements SessionInterface {
@ -167,7 +168,7 @@ public class Session implements SessionInterface {
} }
@Override @Override
public void join(Participant participant) { public void join(Participant participant, EndpointType endpointType) {
} }
@Override @Override

View File

@ -22,6 +22,7 @@ import java.util.Set;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.java.client.SessionProperties; import io.openvidu.java.client.SessionProperties;
import io.openvidu.server.kurento.endpoint.EndpointType;
public interface SessionInterface { public interface SessionInterface {
@ -29,7 +30,7 @@ public interface SessionInterface {
SessionProperties getSessionProperties(); SessionProperties getSessionProperties();
void join(Participant participant); void join(Participant participant, EndpointType endpointType);
void leave(String participantPrivateId, EndReason reason); void leave(String participantPrivateId, EndReason reason);

View File

@ -51,6 +51,7 @@ import io.openvidu.server.kurento.core.KurentoTokenOptions;
import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.recording.service.RecordingManager;
import io.openvidu.server.utils.FormatChecker; import io.openvidu.server.utils.FormatChecker;
import io.openvidu.server.utils.GeoLocation; import io.openvidu.server.utils.GeoLocation;
import io.openvidu.server.utils.GeoLocationByIp;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.QuarantineKiller;
public abstract class SessionManager { public abstract class SessionManager {
@ -75,6 +76,9 @@ public abstract class SessionManager {
@Autowired @Autowired
protected QuarantineKiller quarantineKiller; protected QuarantineKiller quarantineKiller;
@Autowired
protected GeoLocationByIp geoLocationByIp;
public FormatChecker formatChecker = new FormatChecker(); public FormatChecker formatChecker = new FormatChecker();
protected ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>(); protected ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>();
@ -148,6 +152,8 @@ public abstract class SessionManager {
public abstract void removeFilterEventListener(Session session, Participant subscriber, String streamId, public abstract void removeFilterEventListener(Session session, Participant subscriber, String streamId,
String eventType); String eventType);
public abstract Participant publishIpcam(Session session, MediaOptions mediaOptions) throws Exception;
public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId)
throws OpenViduException; throws OpenViduException;
@ -393,6 +399,16 @@ public abstract class SessionManager {
} }
} }
public Participant newIpcamParticipant(String sessionId, String ipcamId, Token token, GeoLocation location, String platform) {
if (this.sessionidParticipantpublicidParticipant.get(sessionId) != null) {
Participant p = new Participant(ipcamId, ipcamId, ipcamId, sessionId, token, null, location, platform, null);
this.sessionidParticipantpublicidParticipant.get(sessionId).put(ipcamId, p);
return p;
} else {
throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, sessionId);
}
}
public Token consumeToken(String sessionId, String participantPrivateId, String token) { public Token consumeToken(String sessionId, String participantPrivateId, String token) {
if (this.sessionidTokenTokenobj.get(sessionId) != null) { if (this.sessionidTokenTokenobj.get(sessionId) != null) {
Token t = this.sessionidTokenTokenobj.get(sessionId).remove(token); Token t = this.sessionidTokenTokenobj.get(sessionId).remove(token);

View File

@ -17,8 +17,7 @@
package io.openvidu.server.kurento.core; package io.openvidu.server.kurento.core;
import org.kurento.client.MediaElement; import com.google.gson.JsonObject;
import org.kurento.client.MediaType;
import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.kurento.endpoint.KurentoFilter; import io.openvidu.server.kurento.endpoint.KurentoFilter;
@ -28,21 +27,38 @@ public class KurentoMediaOptions extends MediaOptions {
public boolean isOffer; public boolean isOffer;
public String sdpOffer; public String sdpOffer;
public boolean doLoopback; public boolean doLoopback;
public MediaElement loopbackAlternativeSrc;
public MediaType loopbackConnectionType;
public MediaElement[] mediaElements;
public KurentoMediaOptions(boolean isOffer, String sdpOffer, MediaElement loopbackAlternativeSrc, // IPCAM properties
MediaType loopbackConnectionType, Boolean hasAudio, Boolean hasVideo, Boolean audioActive, public String rtspUri;
Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions, KurentoFilter filter, public Boolean adaptativeBitrate;
boolean doLoopback, MediaElement... mediaElements) {
public KurentoMediaOptions(boolean isOffer, String sdpOffer, Boolean hasAudio, Boolean hasVideo,
Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions,
KurentoFilter filter, boolean doLoopback) {
super(hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, filter); super(hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, filter);
this.isOffer = isOffer; this.isOffer = isOffer;
this.sdpOffer = sdpOffer; this.sdpOffer = sdpOffer;
this.loopbackAlternativeSrc = loopbackAlternativeSrc;
this.loopbackConnectionType = loopbackConnectionType;
this.doLoopback = doLoopback; this.doLoopback = doLoopback;
this.mediaElements = mediaElements; }
public KurentoMediaOptions(boolean isOffer, String sdpOffer, Boolean hasAudio, Boolean hasVideo,
Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions,
KurentoFilter filter, boolean doLoopback, String rtspUri, Boolean adaptativeBitrate) {
super(hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, filter);
this.isOffer = isOffer;
this.sdpOffer = sdpOffer;
this.doLoopback = doLoopback;
this.rtspUri = rtspUri;
this.adaptativeBitrate = adaptativeBitrate;
}
@Override
public JsonObject toJson() {
JsonObject json = super.toJson();
if (adaptativeBitrate != null) {
json.addProperty("adaptativeBitrate", adaptativeBitrate);
}
return json;
} }
} }

View File

@ -26,13 +26,12 @@ import java.util.function.Function;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.kurento.client.Continuation; import org.kurento.client.Continuation;
import org.kurento.client.Endpoint;
import org.kurento.client.ErrorEvent; import org.kurento.client.ErrorEvent;
import org.kurento.client.Filter; import org.kurento.client.Filter;
import org.kurento.client.IceCandidate; import org.kurento.client.IceCandidate;
import org.kurento.client.MediaElement; import org.kurento.client.MediaElement;
import org.kurento.client.MediaPipeline; import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaType;
import org.kurento.client.SdpEndpoint;
import org.kurento.client.internal.server.KurentoServerException; import org.kurento.client.internal.server.KurentoServerException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -48,6 +47,7 @@ import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.EndReason; import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant; import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.endpoint.EndpointType;
import io.openvidu.server.kurento.endpoint.MediaEndpoint; import io.openvidu.server.kurento.endpoint.MediaEndpoint;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.endpoint.SdpType; import io.openvidu.server.kurento.endpoint.SdpType;
@ -61,7 +61,7 @@ public class KurentoParticipant extends Participant {
private OpenviduConfig openviduConfig; private OpenviduConfig openviduConfig;
private RecordingManager recordingManager; private RecordingManager recordingManager;
private boolean webParticipant = true; private EndpointType endpointType;
private final KurentoSession session; private final KurentoSession session;
private KurentoParticipantEndpointConfig endpointConfig; private KurentoParticipantEndpointConfig endpointConfig;
@ -72,12 +72,13 @@ public class KurentoParticipant extends Participant {
private final ConcurrentMap<String, Filter> filters = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Filter> filters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, SubscriberEndpoint> subscribers = new ConcurrentHashMap<String, SubscriberEndpoint>(); private final ConcurrentMap<String, SubscriberEndpoint> subscribers = new ConcurrentHashMap<String, SubscriberEndpoint>();
public KurentoParticipant(Participant participant, KurentoSession kurentoSession, public KurentoParticipant(Participant participant, KurentoSession kurentoSession, EndpointType endpointType,
KurentoParticipantEndpointConfig endpointConfig, OpenviduConfig openviduConfig, KurentoParticipantEndpointConfig endpointConfig, OpenviduConfig openviduConfig,
RecordingManager recordingManager) { RecordingManager recordingManager) {
super(participant.getFinalUserId(), participant.getParticipantPrivateId(), participant.getParticipantPublicId(), super(participant.getFinalUserId(), participant.getParticipantPrivateId(), participant.getParticipantPublicId(),
kurentoSession.getSessionId(), participant.getToken(), participant.getClientMetadata(), kurentoSession.getSessionId(), participant.getToken(), participant.getClientMetadata(),
participant.getLocation(), participant.getPlatform(), participant.getCreatedAt()); participant.getLocation(), participant.getPlatform(), participant.getCreatedAt());
this.endpointType = endpointType;
this.endpointConfig = endpointConfig; this.endpointConfig = endpointConfig;
this.openviduConfig = openviduConfig; this.openviduConfig = openviduConfig;
this.recordingManager = recordingManager; this.recordingManager = recordingManager;
@ -85,7 +86,7 @@ public class KurentoParticipant extends Participant {
if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) { if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) {
// Initialize a PublisherEndpoint // Initialize a PublisherEndpoint
this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(), this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(),
this.session.getPipeline(), this.openviduConfig); this.session.getPipeline(), this.openviduConfig);
} }
@ -101,11 +102,11 @@ public class KurentoParticipant extends Participant {
public void createPublishingEndpoint(MediaOptions mediaOptions) { public void createPublishingEndpoint(MediaOptions mediaOptions) {
publisher.setMediaOptions(mediaOptions);
publisher.createEndpoint(publisherLatch); publisher.createEndpoint(publisherLatch);
if (getPublisher().getEndpoint() == null) { if (getPublisher().getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint"); throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint");
} }
publisher.setMediaOptions(mediaOptions);
String publisherStreamId = this.getParticipantPublicId() + "_" String publisherStreamId = this.getParticipantPublicId() + "_"
+ (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_" + (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_"
@ -171,14 +172,12 @@ public class KurentoParticipant extends Participant {
return session; return session;
} }
public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback, public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback) {
MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) {
log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(), log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(),
this.session.getSessionId(), sdpType); this.session.getSessionId(), sdpType);
log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpString); log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpString);
String sdpResponse = this.getPublisher().publish(sdpType, sdpString, doLoopback, loopbackAlternativeSrc, String sdpResponse = this.getPublisher().publish(sdpType, sdpString, doLoopback);
loopbackConnectionType);
this.streaming = true; this.streaming = true;
log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpResponse); log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpResponse);
@ -200,7 +199,7 @@ public class KurentoParticipant extends Participant {
log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(), log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(),
this.session.getSessionId()); this.session.getSessionId());
releasePublisherEndpoint(reason, kmsDisconnectionTime); releasePublisherEndpoint(reason, kmsDisconnectionTime);
this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.getPipeline(), this.publisher = new PublisherEndpoint(endpointType, this, this.getParticipantPublicId(), this.getPipeline(),
this.openviduConfig); this.openviduConfig);
log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)", log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)",
this.getParticipantPublicId()); this.getParticipantPublicId());
@ -233,7 +232,7 @@ public class KurentoParticipant extends Participant {
try { try {
CountDownLatch subscriberLatch = new CountDownLatch(1); CountDownLatch subscriberLatch = new CountDownLatch(1);
SdpEndpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); Endpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch);
try { try {
if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
@ -339,7 +338,7 @@ public class KurentoParticipant extends Participant {
* @return the endpoint instance * @return the endpoint instance
*/ */
public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) { public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) {
SubscriberEndpoint subscriberEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId, SubscriberEndpoint subscriberEndpoint = new SubscriberEndpoint(endpointType, this, senderPublicId,
this.getPipeline(), this.openviduConfig); this.getPipeline(), this.openviduConfig);
SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, subscriberEndpoint); SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, subscriberEndpoint);
@ -459,7 +458,7 @@ public class KurentoParticipant extends Participant {
public void resetPublisherEndpoint() { public void resetPublisherEndpoint() {
log.info("Reseting publisher endpoint for participant {}", this.getParticipantPublicId()); log.info("Reseting publisher endpoint for participant {}", this.getParticipantPublicId());
this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.publisher = new PublisherEndpoint(endpointType, this, this.getParticipantPublicId(),
this.session.getPipeline(), this.openviduConfig); this.session.getPipeline(), this.openviduConfig);
} }

View File

@ -17,6 +17,10 @@
package io.openvidu.server.kurento.core; package io.openvidu.server.kurento.core;
import org.kurento.client.BaseRtpEndpoint;
import org.kurento.client.Endpoint;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.WebRtcEndpoint;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -39,7 +43,140 @@ public class KurentoParticipantEndpointConfig {
public void addEndpointListeners(MediaEndpoint endpoint, String typeOfEndpoint) { public void addEndpointListeners(MediaEndpoint endpoint, String typeOfEndpoint) {
endpoint.getWebEndpoint().addMediaFlowInStateChangeListener(event -> { // WebRtcEndpoint events
if (endpoint.getWebEndpoint() != null) {
final WebRtcEndpoint finalEndpoint = endpoint.getWebEndpoint();
finalEndpoint.addIceGatheringDoneListener(event -> {
String msg = "KMS event [IceGatheringDone] -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
finalEndpoint.addNewCandidatePairSelectedListener(event -> {
endpoint.selectedLocalIceCandidate = event.getCandidatePair().getLocalCandidate();
endpoint.selectedRemoteIceCandidate = event.getCandidatePair().getRemoteCandidate();
String msg = "KMS event [NewCandidatePairSelected]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | local: " + endpoint.selectedLocalIceCandidate + " | remote: "
+ endpoint.selectedRemoteIceCandidate + " | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
finalEndpoint.addIceComponentStateChangeListener(event -> {
String msg = "KMS event [IceComponentStateChange]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | state: " + event.getState().name() + " | componentId: "
+ event.getComponentId() + " | streamId: " + event.getStreamId() + " | timestamp: "
+ event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
finalEndpoint.addDataChannelOpenListener(event -> {
String msg = "KMS event [DataChannelOpenEvent]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | channelId: " + event.getChannelId() + " | timestamp: "
+ event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
finalEndpoint.addDataChannelCloseListener(event -> {
String msg = "KMS event [DataChannelCloseEvent]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | channelId: " + event.getChannelId() + " | timestamp: "
+ event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
}
// PlayerEndpoint events
if (endpoint.getPlayerEndpoint() != null) {
final PlayerEndpoint finalEndpoint = endpoint.getPlayerEndpoint();
finalEndpoint.addEndOfStreamListener(event -> {
String msg = "KMS event [EndOfStreamEvent]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
finalEndpoint.addUriEndpointStateChangedListener(event -> {
String msg = "KMS event [UriEndpointStateChangedEvent]: -> endpoint: " + endpoint.getEndpointName()
+ " (" + typeOfEndpoint + ") | state: " + event.getState().name() + " | timestamp: "
+ event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
}
// BaseRtpEndpoint events
if (endpoint.getWebEndpoint() != null || endpoint.getRtpEndpoint() != null) {
final BaseRtpEndpoint finalEndpoint = ((BaseRtpEndpoint) endpoint.getEndpoint());
finalEndpoint.addConnectionStateChangedListener(event -> {
String msg = "KMS event [ConnectionStateChanged]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | oldState: " + event.getOldState() + " | newState: "
+ event.getNewState() + " | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
finalEndpoint.addMediaStateChangedListener(event -> {
String msg = "KMS event [MediaStateChangedEvent]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | oldState: " + event.getOldState() + " | newState: "
+ event.getNewState() + " | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
}
// Endpoint events
final Endpoint finalEndpoint = endpoint.getEndpoint();
finalEndpoint.addMediaFlowInStateChangeListener(event -> {
String msg = "KMS event [MediaFlowInStateChange] -> endpoint: " + endpoint.getEndpointName() + " (" String msg = "KMS event [MediaFlowInStateChange] -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | state: " + event.getState() + " | pad: " + event.getPadName() + typeOfEndpoint + ") | state: " + event.getState() + " | pad: " + event.getPadName()
+ " | mediaType: " + event.getMediaType() + " | timestamp: " + event.getTimestampMillis(); + " | mediaType: " + event.getMediaType() + " | timestamp: " + event.getTimestampMillis();
@ -51,7 +188,7 @@ public class KurentoParticipantEndpointConfig {
log.info(msg); log.info(msg);
}); });
endpoint.getWebEndpoint().addMediaFlowOutStateChangeListener(event -> { finalEndpoint.addMediaFlowOutStateChangeListener(event -> {
String msg = "KMS event [MediaFlowOutStateChange] -> endpoint: " + endpoint.getEndpointName() + " (" String msg = "KMS event [MediaFlowOutStateChange] -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | state: " + event.getState() + " | pad: " + event.getPadName() + typeOfEndpoint + ") | state: " + event.getState() + " | pad: " + event.getPadName()
+ " | mediaType: " + event.getMediaType() + " | timestamp: " + event.getTimestampMillis(); + " | mediaType: " + event.getMediaType() + " | timestamp: " + event.getTimestampMillis();
@ -63,71 +200,7 @@ public class KurentoParticipantEndpointConfig {
log.info(msg); log.info(msg);
}); });
endpoint.getWebEndpoint().addIceGatheringDoneListener(event -> { finalEndpoint.addErrorListener(event -> {
String msg = "KMS event [IceGatheringDone] -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
endpoint.getWebEndpoint().addConnectionStateChangedListener(event -> {
String msg = "KMS event [ConnectionStateChanged]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | oldState: " + event.getOldState() + " | newState: " + event.getNewState()
+ " | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
endpoint.getWebEndpoint().addNewCandidatePairSelectedListener(event -> {
endpoint.selectedLocalIceCandidate = event.getCandidatePair().getLocalCandidate();
endpoint.selectedRemoteIceCandidate = event.getCandidatePair().getRemoteCandidate();
String msg = "KMS event [NewCandidatePairSelected]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | local: " + endpoint.selectedLocalIceCandidate + " | remote: "
+ endpoint.selectedRemoteIceCandidate + " | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
endpoint.getEndpoint().addMediaTranscodingStateChangeListener(event -> {
String msg = "KMS event [MediaTranscodingStateChange]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | state: " + event.getState().name() + " | mediaType: " + event.getMediaType()
+ " | binName: " + event.getBinName() + " | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsMediaEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
event.getMediaType(), endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
endpoint.getWebEndpoint().addIceComponentStateChangeListener(event -> {
// if (!event.getState().equals(IceComponentState.READY)) {
String msg = "KMS event [IceComponentStateChange]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | state: " + event.getState().name() + " | componentId: "
+ event.getComponentId() + " | streamId: " + event.getStreamId() + " | timestamp: "
+ event.getTimestampMillis();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
// }
});
endpoint.getWebEndpoint().addErrorListener(event -> {
String msg = "KMS event [ERROR]: -> endpoint: " + endpoint.getEndpointName() + " (" + typeOfEndpoint String msg = "KMS event [ERROR]: -> endpoint: " + endpoint.getEndpointName() + " (" + typeOfEndpoint
+ ") | errorCode: " + event.getErrorCode() + " | description: " + event.getDescription() + ") | errorCode: " + event.getErrorCode() + " | description: " + event.getDescription()
+ " | timestamp: " + event.getTimestampMillis(); + " | timestamp: " + event.getTimestampMillis();
@ -138,6 +211,18 @@ public class KurentoParticipantEndpointConfig {
this.infoHandler.sendInfo(msg); this.infoHandler.sendInfo(msg);
log.error(msg); log.error(msg);
}); });
finalEndpoint.addMediaTranscodingStateChangeListener(event -> {
String msg = "KMS event [MediaTranscodingStateChange]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | state: " + event.getState().name() + " | mediaType: " + event.getMediaType()
+ " | binName: " + event.getBinName() + " | timestamp: " + event.getTimestampMillis();
KmsEvent kmsEvent = new KmsMediaEvent(event, endpoint.getOwner(), endpoint.getEndpointName(),
event.getMediaType(), endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.log(kmsEvent);
this.infoHandler.sendInfo(msg);
log.info(msg);
});
} }
public CallDetailRecord getCdr() { public CallDetailRecord getCdr() {

View File

@ -36,6 +36,7 @@ import io.openvidu.java.client.OpenViduRole;
import io.openvidu.server.core.EndReason; import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Participant; import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session; import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.endpoint.EndpointType;
import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.Kms;
/** /**
@ -54,8 +55,6 @@ public class KurentoSession extends Session {
private KurentoSessionEventsHandler kurentoSessionHandler; private KurentoSessionEventsHandler kurentoSessionHandler;
private KurentoParticipantEndpointConfig kurentoEndpointConfig; private KurentoParticipantEndpointConfig kurentoEndpointConfig;
private final ConcurrentHashMap<String, String> filterStates = new ConcurrentHashMap<>();
private Object pipelineCreateLock = new Object(); private Object pipelineCreateLock = new Object();
private Object pipelineReleaseLock = new Object(); private Object pipelineReleaseLock = new Object();
@ -71,19 +70,14 @@ public class KurentoSession extends Session {
} }
@Override @Override
public void join(Participant participant) { public void join(Participant participant, EndpointType endpointType) {
checkClosed(); checkClosed();
createPipeline(); createPipeline();
KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, this.kurentoEndpointConfig, KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, endpointType,
this.openviduConfig, this.recordingManager); this.kurentoEndpointConfig, this.openviduConfig, this.recordingManager);
participants.put(participant.getParticipantPrivateId(), kurentoParticipant); participants.put(participant.getParticipantPrivateId(), kurentoParticipant);
filterStates.forEach((filterId, state) -> {
log.info("Adding filter {}", filterId);
kurentoSessionHandler.updateFilter(sessionId, participant, filterId, state);
});
log.info("SESSION {}: Added participant {}", sessionId, participant); log.info("SESSION {}: Added participant {}", sessionId, participant);
if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) {

View File

@ -17,12 +17,19 @@
package io.openvidu.server.kurento.core; package io.openvidu.server.kurento.core;
import java.io.IOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Set; import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.RandomStringUtils;
import org.kurento.client.GenericMediaElement; import org.kurento.client.GenericMediaElement;
import org.kurento.client.IceCandidate; import org.kurento.client.IceCandidate;
import org.kurento.client.ListenerSubscription; import org.kurento.client.ListenerSubscription;
@ -44,16 +51,20 @@ import io.openvidu.java.client.RecordingMode;
import io.openvidu.java.client.RecordingProperties; import io.openvidu.java.client.RecordingProperties;
import io.openvidu.java.client.SessionProperties; import io.openvidu.java.client.SessionProperties;
import io.openvidu.server.core.EndReason; import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.FinalUser;
import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant; import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session; import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.SessionManager;
import io.openvidu.server.core.Token;
import io.openvidu.server.kurento.endpoint.EndpointType;
import io.openvidu.server.kurento.endpoint.KurentoFilter; import io.openvidu.server.kurento.endpoint.KurentoFilter;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.endpoint.SdpType; import io.openvidu.server.kurento.endpoint.SdpType;
import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.rpc.RpcHandler;
import io.openvidu.server.utils.GeoLocation;
import io.openvidu.server.utils.JsonUtils; import io.openvidu.server.utils.JsonUtils;
public class KurentoSessionManager extends SessionManager { public class KurentoSessionManager extends SessionManager {
@ -111,7 +122,7 @@ public class KurentoSessionManager extends SessionManager {
} }
existingParticipants = getParticipants(sessionId); existingParticipants = getParticipants(sessionId);
kSession.join(participant); kSession.join(participant, EndpointType.WEBRTC_ENDPOINT);
} catch (OpenViduException e) { } catch (OpenViduException e) {
log.warn("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(), log.warn("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(),
sessionId, e); sessionId, e);
@ -252,9 +263,8 @@ public class KurentoSessionManager extends SessionManager {
log.debug( log.debug(
"Request [PUBLISH_MEDIA] isOffer={} sdp={} " "Request [PUBLISH_MEDIA] isOffer={} sdp={} "
+ "loopbackAltSrc={} lpbkConnType={} doLoopback={} mediaElements={} ({})", + "loopbackAltSrc={} lpbkConnType={} doLoopback={} rtspUri={} ({})",
kurentoOptions.isOffer, kurentoOptions.sdpOffer, kurentoOptions.loopbackAlternativeSrc, kurentoOptions.isOffer, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, kurentoOptions.rtspUri,
kurentoOptions.loopbackConnectionType, kurentoOptions.doLoopback, kurentoOptions.mediaElements,
participant.getParticipantPublicId()); participant.getParticipantPublicId());
SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER; SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER;
@ -284,8 +294,7 @@ public class KurentoSessionManager extends SessionManager {
} }
} }
sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback);
kurentoOptions.loopbackAlternativeSrc, kurentoOptions.loopbackConnectionType);
if (sdpAnswer == null) { if (sdpAnswer == null) {
OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE,
@ -590,8 +599,8 @@ public class KurentoSessionManager extends SessionManager {
boolean doLoopback = RpcHandler.getBooleanParam(request, ProtocolElements.PUBLISHVIDEO_DOLOOPBACK_PARAM); boolean doLoopback = RpcHandler.getBooleanParam(request, ProtocolElements.PUBLISHVIDEO_DOLOOPBACK_PARAM);
return new KurentoMediaOptions(true, sdpOffer, null, null, hasAudio, hasVideo, audioActive, videoActive, return new KurentoMediaOptions(true, sdpOffer, hasAudio, hasVideo, audioActive, videoActive, typeOfVideo,
typeOfVideo, frameRate, videoDimensions, kurentoFilter, doLoopback); frameRate, videoDimensions, kurentoFilter, doLoopback);
} }
@Override @Override
@ -838,6 +847,68 @@ public class KurentoSessionManager extends SessionManager {
} }
} }
@Override
public Participant publishIpcam(Session session, MediaOptions mediaOptions) throws Exception {
KurentoSession kSession = (KurentoSession) session;
KurentoMediaOptions kMediaOptions = (KurentoMediaOptions) mediaOptions;
// Generate the location for the IpCam
GeoLocation location = null;
URL url = null;
String protocol = null;
try {
Pattern pattern = Pattern.compile("^(file|rtsp)://");
Matcher matcher = pattern.matcher(kMediaOptions.rtspUri);
if (matcher.find()) {
protocol = matcher.group(0).replaceAll("://$", "");
} else {
throw new MalformedURLException();
}
String parsedUrl = kMediaOptions.rtspUri.replaceAll("^.*?://", "http://");
url = new URL(parsedUrl);
} catch (Exception e) {
throw new MalformedURLException();
}
try {
location = this.geoLocationByIp.getLocationByIp(InetAddress.getByName(url.getHost()));
} catch (IOException e) {
e.printStackTrace();
location = null;
} catch (Exception e) {
log.warn("Error getting address location: {}", e.getMessage());
location = null;
}
final String rtspConnectionId = kMediaOptions.getTypeOfVideo() + "-" + protocol + "-"
+ RandomStringUtils.randomAlphanumeric(4).toLowerCase() + "-" + url.getAuthority()
+ url.getPath().replaceAll("/", "-").replaceAll("_", "-");
// Store a "fake" participant for the IpCam connection
this.newInsecureParticipant(rtspConnectionId);
String token = RandomStringUtils.randomAlphanumeric(16).toLowerCase();
Token tokenObj = null;
if (this.isTokenValidInSession(token, session.getSessionId(), rtspConnectionId)) {
tokenObj = this.consumeToken(session.getSessionId(), rtspConnectionId, token);
}
Participant ipcamParticipant = this.newIpcamParticipant(session.getSessionId(), rtspConnectionId, tokenObj,
location, mediaOptions.getTypeOfVideo());
// Store a "fake" final user for the IpCam connection
final String finalUserId = rtspConnectionId;
this.sessionidFinalUsers.get(session.getSessionId()).computeIfAbsent(finalUserId, k -> {
return new FinalUser(finalUserId, session.getSessionId(), ipcamParticipant);
}).addConnectionIfAbsent(ipcamParticipant);
// Join the participant to the session
kSession.join(ipcamParticipant, EndpointType.PLAYER_ENDPOINT);
// Publish the IpCam stream into the session
KurentoParticipant kParticipant = (KurentoParticipant) this.getParticipant(rtspConnectionId);
this.publishVideo(kParticipant, mediaOptions, null);
return kParticipant;
}
@Override @Override
public String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) { public String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) {
Session session = this.getSession(sessionId); Session session = this.getSession(sessionId);

View File

@ -0,0 +1,7 @@
package io.openvidu.server.kurento.endpoint;
public enum EndpointType {
WEBRTC_ENDPOINT, PLAYER_ENDPOINT, RTP_ENDPOINT
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.kurento.client.Continuation; import org.kurento.client.Continuation;
import org.kurento.client.Endpoint;
import org.kurento.client.ErrorEvent; import org.kurento.client.ErrorEvent;
import org.kurento.client.EventListener; import org.kurento.client.EventListener;
import org.kurento.client.IceCandidate; import org.kurento.client.IceCandidate;
@ -32,6 +33,7 @@ import org.kurento.client.ListenerSubscription;
import org.kurento.client.MediaElement; import org.kurento.client.MediaElement;
import org.kurento.client.MediaPipeline; import org.kurento.client.MediaPipeline;
import org.kurento.client.OnIceCandidateEvent; import org.kurento.client.OnIceCandidateEvent;
import org.kurento.client.PlayerEndpoint;
import org.kurento.client.RtpEndpoint; import org.kurento.client.RtpEndpoint;
import org.kurento.client.SdpEndpoint; import org.kurento.client.SdpEndpoint;
import org.kurento.client.WebRtcEndpoint; import org.kurento.client.WebRtcEndpoint;
@ -46,12 +48,14 @@ import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.Participant; import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.core.KurentoMediaOptions;
import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.core.KurentoTokenOptions; import io.openvidu.server.kurento.core.KurentoTokenOptions;
/** /**
* {@link WebRtcEndpoint} wrapper that supports buffering of * {@link Endpoint} wrapper. Can be based on WebRtcEndpoint (that supports
* {@link IceCandidate}s until the {@link WebRtcEndpoint} is created. * buffering of {@link IceCandidate}s until the {@link WebRtcEndpoint} is
* created), PlayerEndpoint (to play RTSP or file streams) and RtpEndpoint.
* Connections to other peers are opened using the corresponding method of the * Connections to other peers are opened using the corresponding method of the
* internal endpoint. * internal endpoint.
* *
@ -61,10 +65,11 @@ public abstract class MediaEndpoint {
private static Logger log; private static Logger log;
private OpenviduConfig openviduConfig; private OpenviduConfig openviduConfig;
private boolean web = false; private EndpointType endpointType;
private WebRtcEndpoint webEndpoint = null; private WebRtcEndpoint webEndpoint = null;
private RtpEndpoint endpoint = null; private RtpEndpoint endpoint = null;
private PlayerEndpoint playerEndpoint = null;
private final int maxRecvKbps; private final int maxRecvKbps;
private final int minRecvKbps; private final int minRecvKbps;
@ -98,14 +103,14 @@ public abstract class MediaEndpoint {
* @param pipeline * @param pipeline
* @param log * @param log
*/ */
public MediaEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, public MediaEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName,
OpenviduConfig openviduConfig, Logger log) { MediaPipeline pipeline, OpenviduConfig openviduConfig, Logger log) {
if (log == null) { if (log == null) {
MediaEndpoint.log = LoggerFactory.getLogger(MediaEndpoint.class); MediaEndpoint.log = LoggerFactory.getLogger(MediaEndpoint.class);
} else { } else {
MediaEndpoint.log = log; MediaEndpoint.log = log;
} }
this.web = web; this.endpointType = endpointType;
this.owner = owner; this.owner = owner;
this.setEndpointName(endpointName); this.setEndpointName(endpointName);
this.setMediaPipeline(pipeline); this.setMediaPipeline(pipeline);
@ -135,7 +140,11 @@ public abstract class MediaEndpoint {
} }
public boolean isWeb() { public boolean isWeb() {
return web; return EndpointType.WEBRTC_ENDPOINT.equals(this.endpointType);
}
public boolean isPlayerEndpoint() {
return EndpointType.PLAYER_ENDPOINT.equals(this.endpointType);
} }
/** /**
@ -148,9 +157,11 @@ public abstract class MediaEndpoint {
/** /**
* @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) * @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint})
*/ */
public SdpEndpoint getEndpoint() { public Endpoint getEndpoint() {
if (this.isWeb()) { if (this.isWeb()) {
return this.webEndpoint; return this.webEndpoint;
} else if (this.isPlayerEndpoint()) {
return this.playerEndpoint;
} else { } else {
return this.endpoint; return this.endpoint;
} }
@ -164,10 +175,14 @@ public abstract class MediaEndpoint {
return webEndpoint; return webEndpoint;
} }
protected RtpEndpoint getRtpEndpoint() { public RtpEndpoint getRtpEndpoint() {
return endpoint; return endpoint;
} }
public PlayerEndpoint getPlayerEndpoint() {
return playerEndpoint;
}
/** /**
* If this object doesn't have a {@link WebRtcEndpoint}, it is created in a * If this object doesn't have a {@link WebRtcEndpoint}, it is created in a
* thread-safe way using the internal {@link MediaPipeline}. Otherwise no * thread-safe way using the internal {@link MediaPipeline}. Otherwise no
@ -179,8 +194,8 @@ public abstract class MediaEndpoint {
* *
* @return the existing endpoint, if any * @return the existing endpoint, if any
*/ */
public synchronized SdpEndpoint createEndpoint(CountDownLatch endpointLatch) { public synchronized Endpoint createEndpoint(CountDownLatch endpointLatch) {
SdpEndpoint old = this.getEndpoint(); Endpoint old = this.getEndpoint();
if (old == null) { if (old == null) {
internalEndpointInitialization(endpointLatch); internalEndpointInitialization(endpointLatch);
} else { } else {
@ -271,6 +286,31 @@ public abstract class MediaEndpoint {
log.error("EP {}: Failed to create a new WebRtcEndpoint", endpointName, cause); log.error("EP {}: Failed to create a new WebRtcEndpoint", endpointName, cause);
} }
}); });
} else if (this.isPlayerEndpoint()) {
KurentoMediaOptions mediaOptions = (KurentoMediaOptions) this.owner.getPublisherMediaOptions();
PlayerEndpoint.Builder playerBuilder = new PlayerEndpoint.Builder(pipeline, mediaOptions.rtspUri);
if (!mediaOptions.adaptativeBitrate) {
playerBuilder = playerBuilder.useEncodedMedia();
}
playerBuilder.buildAsync(new Continuation<PlayerEndpoint>() {
@Override
public void onSuccess(PlayerEndpoint result) throws Exception {
playerEndpoint = result;
log.trace("EP {}: Created a new PlayerEndpoint", endpointName);
endpointSubscription = registerElemErrListener(playerEndpoint);
playerEndpoint.play();
endpointLatch.countDown();
}
@Override
public void onError(Throwable cause) throws Exception {
endpointLatch.countDown();
log.error("EP {}: Failed to create a new PlayerEndpoint", endpointName, cause);
}
});
} else { } else {
new RtpEndpoint.Builder(pipeline).buildAsync(new Continuation<RtpEndpoint>() { new RtpEndpoint.Builder(pipeline).buildAsync(new Continuation<RtpEndpoint>() {
@Override @Override
@ -353,6 +393,8 @@ public abstract class MediaEndpoint {
"Can't process offer when WebRtcEndpoint is null (ep: " + endpointName + ")"); "Can't process offer when WebRtcEndpoint is null (ep: " + endpointName + ")");
} }
return webEndpoint.processOffer(offer); return webEndpoint.processOffer(offer);
} else if (this.isPlayerEndpoint()) {
return "";
} else { } else {
if (endpoint == null) { if (endpoint == null) {
throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE,
@ -362,29 +404,6 @@ public abstract class MediaEndpoint {
} }
} }
/**
* Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint})
* to generate the offer String that can be used to initiate a connection.
*
* @see SdpEndpoint#generateOffer()
* @return the Sdp offer
*/
protected String generateOffer() throws OpenViduException {
if (this.isWeb()) {
if (webEndpoint == null) {
throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE,
"Can't generate offer when WebRtcEndpoint is null (ep: " + endpointName + ")");
}
return webEndpoint.generateOffer();
} else {
if (endpoint == null) {
throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE,
"Can't generate offer when RtpEndpoint is null (ep: " + endpointName + ")");
}
return endpoint.generateOffer();
}
}
/** /**
* Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) * Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint})
* to process the answer String. * to process the answer String.
@ -400,6 +419,8 @@ public abstract class MediaEndpoint {
"Can't process answer when WebRtcEndpoint is null (ep: " + endpointName + ")"); "Can't process answer when WebRtcEndpoint is null (ep: " + endpointName + ")");
} }
return webEndpoint.processAnswer(answer); return webEndpoint.processAnswer(answer);
} else if (this.isPlayerEndpoint()) {
return "";
} else { } else {
if (endpoint == null) { if (endpoint == null) {
throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE,
@ -489,8 +510,10 @@ public abstract class MediaEndpoint {
JsonObject json = new JsonObject(); JsonObject json = new JsonObject();
json.addProperty("createdAt", this.createdAt); json.addProperty("createdAt", this.createdAt);
json.addProperty("webrtcEndpointName", this.getEndpointName()); json.addProperty("webrtcEndpointName", this.getEndpointName());
json.addProperty("remoteSdp", this.getEndpoint().getRemoteSessionDescriptor()); if (!this.isPlayerEndpoint()) {
json.addProperty("localSdp", this.getEndpoint().getLocalSessionDescriptor()); json.addProperty("remoteSdp", ((SdpEndpoint) this.getEndpoint()).getRemoteSessionDescriptor());
json.addProperty("localSdp", ((SdpEndpoint) this.getEndpoint()).getLocalSessionDescriptor());
}
json.add("receivedCandidates", new GsonBuilder().create().toJsonTree(this.receivedCandidateList)); json.add("receivedCandidates", new GsonBuilder().create().toJsonTree(this.receivedCandidateList));
json.addProperty("localCandidate", this.selectedLocalIceCandidate); json.addProperty("localCandidate", this.selectedLocalIceCandidate);
json.addProperty("remoteCandidate", this.selectedRemoteIceCandidate); json.addProperty("remoteCandidate", this.selectedRemoteIceCandidate);

View File

@ -46,6 +46,7 @@ import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.kurento.core.KurentoMediaOptions;
import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.utils.JsonUtils; import io.openvidu.server.utils.JsonUtils;
@ -73,9 +74,9 @@ public class PublisherEndpoint extends MediaEndpoint {
private Map<String, ListenerSubscription> elementsErrorSubscriptions = new HashMap<String, ListenerSubscription>(); private Map<String, ListenerSubscription> elementsErrorSubscriptions = new HashMap<String, ListenerSubscription>();
public PublisherEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, public PublisherEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName,
OpenviduConfig openviduConfig) { MediaPipeline pipeline, OpenviduConfig openviduConfig) {
super(web, owner, endpointName, pipeline, openviduConfig, log); super(endpointType, owner, endpointName, pipeline, openviduConfig, log);
} }
@Override @Override
@ -172,15 +173,10 @@ public class PublisherEndpoint extends MediaEndpoint {
* @return the SDP response (the answer if processing an offer SDP, otherwise is * @return the SDP response (the answer if processing an offer SDP, otherwise is
* the updated offer generated previously by this endpoint) * the updated offer generated previously by this endpoint)
*/ */
public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback, public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback) {
MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) {
registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId()); registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId());
if (doLoopback) { if (doLoopback) {
if (loopbackAlternativeSrc == null) { connect(this.getEndpoint(), null);
connect(this.getEndpoint(), loopbackConnectionType);
} else {
connectAltLoopbackSrc(loopbackAlternativeSrc, loopbackConnectionType);
}
} else { } else {
innerConnect(); innerConnect();
} }
@ -200,10 +196,6 @@ public class PublisherEndpoint extends MediaEndpoint {
return sdpResponse; return sdpResponse;
} }
public synchronized String preparePublishConnection() {
return generateOffer();
}
public synchronized void connect(MediaElement sink) { public synchronized void connect(MediaElement sink) {
if (!connected) { if (!connected) {
innerConnect(); innerConnect();
@ -415,13 +407,6 @@ public class PublisherEndpoint extends MediaEndpoint {
return elementIds.get(idx - 1); return elementIds.get(idx - 1);
} }
private void connectAltLoopbackSrc(MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) {
if (!connected) {
innerConnect();
}
internalSinkConnect(loopbackAlternativeSrc, this.getEndpoint(), loopbackConnectionType);
}
private void innerConnect() { private void innerConnect() {
if (this.getEndpoint() == null) { if (this.getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
@ -549,6 +534,9 @@ public class PublisherEndpoint extends MediaEndpoint {
public JsonObject toJson() { public JsonObject toJson() {
JsonObject json = super.toJson(); JsonObject json = super.toJson();
json.addProperty("streamId", this.getStreamId()); json.addProperty("streamId", this.getStreamId());
if (this.isPlayerEndpoint()) {
json.addProperty("rtspUri", ((KurentoMediaOptions) this.mediaOptions).rtspUri);
}
json.add("mediaOptions", this.mediaOptions.toJson()); json.add("mediaOptions", this.mediaOptions.toJson());
return json; return json;
} }

View File

@ -42,9 +42,9 @@ public class SubscriberEndpoint extends MediaEndpoint {
private String publisherStreamId; private String publisherStreamId;
public SubscriberEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, public SubscriberEndpoint(EndpointType endpointType, KurentoParticipant owner, String endpointName,
OpenviduConfig openviduConfig) { MediaPipeline pipeline, OpenviduConfig openviduConfig) {
super(web, owner, endpointName, pipeline, openviduConfig, log); super(endpointType, owner, endpointName, pipeline, openviduConfig, log);
} }
public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) { public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) {

View File

@ -17,6 +17,7 @@
package io.openvidu.server.rest; package io.openvidu.server.rest;
import java.net.MalformedURLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
@ -57,6 +58,7 @@ import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Participant; import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session; import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoMediaOptions;
import io.openvidu.server.kurento.core.KurentoTokenOptions; import io.openvidu.server.kurento.core.KurentoTokenOptions;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.recording.service.RecordingManager;
@ -217,14 +219,14 @@ public class SessionRestController {
if (session != null) { if (session != null) {
this.sessionManager.closeSession(sessionId, EndReason.sessionClosedByServer); this.sessionManager.closeSession(sessionId, EndReason.sessionClosedByServer);
return new ResponseEntity<>(HttpStatus.NO_CONTENT); return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId);
if (sessionNotActive != null) {
this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} else { } else {
Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId); return new ResponseEntity<>(HttpStatus.NOT_FOUND);
if (sessionNotActive != null) {
this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
} }
} }
@ -261,11 +263,21 @@ public class SessionRestController {
session = this.sessionManager.getSession(sessionId); session = this.sessionManager.getSession(sessionId);
if (session != null) { if (session != null) {
if (this.sessionManager.unpublishStream(session, streamId, null, null, EndReason.forceUnpublishByServer)) {
return new ResponseEntity<>(HttpStatus.NO_CONTENT); final String participantPrivateId = this.sessionManager.getParticipantPrivateIdFromStreamId(sessionId,
} else { streamId);
if (participantPrivateId == null) {
return new ResponseEntity<>(HttpStatus.NOT_FOUND); return new ResponseEntity<>(HttpStatus.NOT_FOUND);
} }
Participant participant = this.sessionManager.getParticipant(participantPrivateId);
if (participant.isIpcam()) {
return new ResponseEntity<>(HttpStatus.METHOD_NOT_ALLOWED);
}
this.sessionManager.unpublishStream(session, streamId, null, null, EndReason.forceUnpublishByServer);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} else { } else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND); return new ResponseEntity<>(HttpStatus.NOT_FOUND);
} }
@ -652,6 +664,62 @@ public class SessionRestController {
return new ResponseEntity<>(HttpStatus.OK); return new ResponseEntity<>(HttpStatus.OK);
} }
@RequestMapping(value = "/sessions/{sessionId}/connection", method = RequestMethod.POST)
public ResponseEntity<?> publishIpcam(@PathVariable("sessionId") String sessionId, @RequestBody Map<?, ?> params) {
if (params == null) {
return this.generateErrorResponse("Error in body parameters. Cannot be empty", "/api/rtsp",
HttpStatus.BAD_REQUEST);
}
log.info("REST API: POST /api/sessions/{}/connection {}", sessionId, params.toString());
Session session = this.sessionManager.getSession(sessionId);
if (session == null) {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
String type;
String rtspUri;
Boolean adaptativeBitrate;
try {
type = (String) params.get("type");
rtspUri = (String) params.get("rtspUri");
adaptativeBitrate = (Boolean) params.get("adaptativeBitrate");
} catch (ClassCastException e) {
return this.generateErrorResponse("Type error in some parameter",
"/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST);
}
if (rtspUri == null) {
return this.generateErrorResponse("\"rtspUri\" parameter is mandatory",
"/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST);
}
type = type != null ? type : "IPCAM";
adaptativeBitrate = adaptativeBitrate != null ? adaptativeBitrate : true;
boolean hasAudio = true;
boolean hasVideo = true;
boolean audioActive = true;
boolean videoActive = true;
String typeOfVideo = type;
Integer frameRate = null;
String videoDimensions = null;
KurentoMediaOptions mediaOptions = new KurentoMediaOptions(true, null, hasAudio, hasVideo, audioActive,
videoActive, typeOfVideo, frameRate, videoDimensions, null, false, rtspUri, adaptativeBitrate);
try {
Participant ipcamParticipant = this.sessionManager.publishIpcam(session, mediaOptions);
return new ResponseEntity<>(ipcamParticipant.toJson().toString(), getResponseHeaders(), HttpStatus.OK);
} catch (MalformedURLException e) {
return this.generateErrorResponse("\"rtspUri\" parameter is not a valid rtsp uri",
"/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST);
} catch (Exception e) {
return this.generateErrorResponse(e.getMessage(), "/api/sessions/" + sessionId + "/connection",
HttpStatus.INTERNAL_SERVER_ERROR);
}
}
private ResponseEntity<String> generateErrorResponse(String errorMessage, String path, HttpStatus status) { private ResponseEntity<String> generateErrorResponse(String errorMessage, String path, HttpStatus status) {
JsonObject responseJson = new JsonObject(); JsonObject responseJson = new JsonObject();
responseJson.addProperty("timestamp", System.currentTimeMillis()); responseJson.addProperty("timestamp", System.currentTimeMillis());

View File

@ -58,8 +58,10 @@ public class RpcNotificationService {
public void sendResponse(String participantPrivateId, Integer transactionId, Object result) { public void sendResponse(String participantPrivateId, Integer transactionId, Object result) {
Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId); Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId);
if (t == null) { if (t == null) {
log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", if (!isIpcamParticipant(participantPrivateId)) {
transactionId, participantPrivateId, result); log.error("No transaction {} found for paticipant with private id {}, unable to send result {}",
transactionId, participantPrivateId, result);
}
return; return;
} }
try { try {
@ -73,8 +75,10 @@ public class RpcNotificationService {
OpenViduException error) { OpenViduException error) {
Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId); Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId);
if (t == null) { if (t == null) {
log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", if (!isIpcamParticipant(participantPrivateId)) {
transactionId, participantPrivateId, data); log.error("No transaction {} found for paticipant with private id {}, unable to send result {}",
transactionId, participantPrivateId, data);
}
return; return;
} }
try { try {
@ -88,8 +92,10 @@ public class RpcNotificationService {
public void sendNotification(final String participantPrivateId, final String method, final Object params) { public void sendNotification(final String participantPrivateId, final String method, final Object params) {
RpcConnection rpcSession = rpcConnections.get(participantPrivateId); RpcConnection rpcSession = rpcConnections.get(participantPrivateId);
if (rpcSession == null || rpcSession.getSession() == null) { if (rpcSession == null || rpcSession.getSession() == null) {
log.error("No rpc session found for private id {}, unable to send notification {}: {}", if (!isIpcamParticipant(participantPrivateId)) {
participantPrivateId, method, params); log.error("No rpc session found for private id {}, unable to send notification {}: {}",
participantPrivateId, method, params);
}
return; return;
} }
Session s = rpcSession.getSession(); Session s = rpcSession.getSession();
@ -105,7 +111,9 @@ public class RpcNotificationService {
public RpcConnection closeRpcSession(String participantPrivateId) { public RpcConnection closeRpcSession(String participantPrivateId) {
RpcConnection rpcSession = rpcConnections.remove(participantPrivateId); RpcConnection rpcSession = rpcConnections.remove(participantPrivateId);
if (rpcSession == null || rpcSession.getSession() == null) { if (rpcSession == null || rpcSession.getSession() == null) {
log.error("No session found for private id {}, unable to cleanup", participantPrivateId); if (!isIpcamParticipant(participantPrivateId)) {
log.error("No session found for private id {}, unable to cleanup", participantPrivateId);
}
return null; return null;
} }
Session s = rpcSession.getSession(); Session s = rpcSession.getSession();
@ -123,7 +131,9 @@ public class RpcNotificationService {
private Transaction getAndRemoveTransaction(String participantPrivateId, Integer transactionId) { private Transaction getAndRemoveTransaction(String participantPrivateId, Integer transactionId) {
RpcConnection rpcSession = rpcConnections.get(participantPrivateId); RpcConnection rpcSession = rpcConnections.get(participantPrivateId);
if (rpcSession == null) { if (rpcSession == null) {
log.warn("Invalid WebSocket session id {}", participantPrivateId); if (!isIpcamParticipant(participantPrivateId)) {
log.warn("Invalid WebSocket session id {}", participantPrivateId);
}
return null; return null;
} }
log.trace("#{} - {} transactions", participantPrivateId, rpcSession.getTransactions().size()); log.trace("#{} - {} transactions", participantPrivateId, rpcSession.getTransactions().size());
@ -140,4 +150,8 @@ public class RpcNotificationService {
return this.rpcConnections.get(participantPrivateId); return this.rpcConnections.get(participantPrivateId);
} }
private boolean isIpcamParticipant(String participantPrivateId) {
return participantPrivateId.startsWith("IPCAM-");
}
} }