openvidu-server: store KMS events

pull/255/head
pabloFuente 2019-02-26 14:33:26 +01:00
parent 9f2e8130de
commit 8ba4bf0b09
13 changed files with 176 additions and 126 deletions

View File

@ -877,7 +877,7 @@ export class Session implements EventDispatcher {
return { candidate: msg.candidate }; return { candidate: msg.candidate };
} }
}; };
this.getConnection(msg.endpointName, 'Connection not found for endpoint ' + msg.endpointName + '. Ice candidate will be ignored: ' + candidate) this.getConnection(msg.senderConnectionId, 'Connection not found for connectionId ' + msg.senderConnectionId + ' owning endpoint ' + msg.endpointName + '. Ice candidate will be ignored: ' + candidate)
.then(connection => { .then(connection => {
const stream = connection.stream; const stream = connection.stream;
stream.getWebRtcPeer().addIceCandidate(candidate).catch(error => { stream.getWebRtcPeer().addIceCandidate(candidate).catch(error => {

View File

@ -163,6 +163,7 @@ public class ProtocolElements {
public static final String MEDIAERROR_ERROR_PARAM = "error"; public static final String MEDIAERROR_ERROR_PARAM = "error";
public static final String ICECANDIDATE_METHOD = "iceCandidate"; public static final String ICECANDIDATE_METHOD = "iceCandidate";
public static final String ICECANDIDATE_SENDERCONNECTIONID_PARAM = "senderConnectionId";
public static final String ICECANDIDATE_EPNAME_PARAM = "endpointName"; public static final String ICECANDIDATE_EPNAME_PARAM = "endpointName";
public static final String ICECANDIDATE_CANDIDATE_PARAM = "candidate"; public static final String ICECANDIDATE_CANDIDATE_PARAM = "candidate";
public static final String ICECANDIDATE_SDPMID_PARAM = "sdpMid"; public static final String ICECANDIDATE_SDPMID_PARAM = "sdpMid";

View File

@ -17,10 +17,14 @@
package io.openvidu.server.cdr; package io.openvidu.server.cdr;
import io.openvidu.server.kurento.endpoint.KmsEvent;
public interface CDRLogger { public interface CDRLogger {
public void log(CDREvent event); public void log(CDREvent event);
public void log(KmsEvent event);
public boolean canBeDisabled(); public boolean canBeDisabled();
} }

View File

@ -20,6 +20,8 @@ package io.openvidu.server.cdr;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.openvidu.server.kurento.endpoint.KmsEvent;
public class CDRLoggerFile implements CDRLogger { public class CDRLoggerFile implements CDRLogger {
private Logger log = LoggerFactory.getLogger(CDRLoggerFile.class); private Logger log = LoggerFactory.getLogger(CDRLoggerFile.class);
@ -29,6 +31,10 @@ public class CDRLoggerFile implements CDRLogger {
log.info("{}", event); log.info("{}", event);
} }
@Override
public void log(KmsEvent event) {
}
@Override @Override
public boolean canBeDisabled() { public boolean canBeDisabled() {
return true; return true;

View File

@ -30,6 +30,7 @@ import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant; import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session; import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.endpoint.KmsEvent;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.recording.service.RecordingManager;
@ -180,6 +181,10 @@ public class CallDetailRecord {
this.log(new CDREventRecording(recordingStartedEvent, RecordingManager.finalReason(reason))); this.log(new CDREventRecording(recordingStartedEvent, RecordingManager.finalReason(reason)));
} }
public void recordKmsEvent(KmsEvent event) {
this.log(event);
}
private void log(CDREvent event) { private void log(CDREvent event) {
this.loggers.forEach(logger -> { this.loggers.forEach(logger -> {
if (openviduConfig.isCdrEnabled() || !logger.canBeDisabled()) { if (openviduConfig.isCdrEnabled() || !logger.canBeDisabled()) {
@ -188,4 +193,10 @@ public class CallDetailRecord {
}); });
} }
private void log(KmsEvent event) {
this.loggers.forEach(logger -> {
logger.log(event);
});
}
} }

View File

@ -28,7 +28,6 @@ import org.kurento.client.Continuation;
import org.kurento.client.ErrorEvent; import org.kurento.client.ErrorEvent;
import org.kurento.client.Filter; import org.kurento.client.Filter;
import org.kurento.client.IceCandidate; import org.kurento.client.IceCandidate;
import org.kurento.client.IceComponentState;
import org.kurento.client.MediaElement; import org.kurento.client.MediaElement;
import org.kurento.client.MediaPipeline; import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaType; import org.kurento.client.MediaType;
@ -110,8 +109,12 @@ public class KurentoParticipant extends Participant {
String publisherStreamId = this.getParticipantPublicId() + "_" String publisherStreamId = this.getParticipantPublicId() + "_"
+ (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_" + (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_"
+ RandomStringUtils.random(5, true, false).toUpperCase(); + RandomStringUtils.random(5, true, false).toUpperCase();
this.publisher.setEndpointName(publisherStreamId);
this.publisher.getEndpoint().setName(publisherStreamId);
this.publisher.setStreamId(publisherStreamId); this.publisher.setStreamId(publisherStreamId);
addEndpointListeners(this.publisher);
addEndpointListeners(this.publisher, "publisher");
// Remove streamId from publisher's map // Remove streamId from publisher's map
this.session.publishedStreamIds.putIfAbsent(this.getPublisherStreamId(), this.getParticipantPrivateId()); this.session.publishedStreamIds.putIfAbsent(this.getPublisherStreamId(), this.getParticipantPrivateId());
@ -246,11 +249,13 @@ public class KurentoParticipant extends Participant {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint");
} }
String subscriberStreamId = this.getParticipantPublicId() + "_" + kSender.getPublisherStreamId(); String subscriberEndpointName = this.getParticipantPublicId() + "_" + kSender.getPublisherStreamId();
subscriber.getEndpoint().setName(subscriberStreamId); subscriber.setEndpointName(subscriberEndpointName);
subscriber.getEndpoint().setName(subscriberEndpointName);
subscriber.setStreamId(kSender.getPublisherStreamId());
addEndpointListeners(subscriber); addEndpointListeners(subscriber, "subscriber");
} catch (OpenViduException e) { } catch (OpenViduException e) {
this.subscribers.remove(senderName); this.subscribers.remove(senderName);
@ -329,13 +334,12 @@ public class KurentoParticipant extends Participant {
* @return the endpoint instance * @return the endpoint instance
*/ */
public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) { public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) {
SubscriberEndpoint subscriberEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId,
SubscriberEndpoint sendingEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId,
this.getPipeline(), this.openviduConfig); this.getPipeline(), this.openviduConfig);
SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, sendingEndpoint); SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, subscriberEndpoint);
if (existingSendingEndpoint != null) { if (existingSendingEndpoint != null) {
sendingEndpoint = existingSendingEndpoint; subscriberEndpoint = existingSendingEndpoint;
log.trace("PARTICIPANT {}: Already exists a subscriber endpoint to user {}", this.getParticipantPublicId(), log.trace("PARTICIPANT {}: Already exists a subscriber endpoint to user {}", this.getParticipantPublicId(),
senderPublicId); senderPublicId);
} else { } else {
@ -343,7 +347,7 @@ public class KurentoParticipant extends Participant {
senderPublicId); senderPublicId);
} }
return sendingEndpoint; return subscriberEndpoint;
} }
public void addIceCandidate(String endpointName, IceCandidate iceCandidate) { public void addIceCandidate(String endpointName, IceCandidate iceCandidate) {
@ -354,8 +358,8 @@ public class KurentoParticipant extends Participant {
} }
} }
public void sendIceCandidate(String endpointName, IceCandidate candidate) { public void sendIceCandidate(String senderPublicId, String endpointName, IceCandidate candidate) {
session.sendIceCandidate(this.getParticipantPrivateId(), endpointName, candidate); session.sendIceCandidate(this.getParticipantPrivateId(), senderPublicId, endpointName, candidate);
} }
public void sendMediaError(ErrorEvent event) { public void sendMediaError(ErrorEvent event) {
@ -429,104 +433,103 @@ public class KurentoParticipant extends Participant {
} }
} }
private void addEndpointListeners(MediaEndpoint endpoint) { private void addEndpointListeners(MediaEndpoint endpoint, String typeOfEndpoint) {
/*
* endpoint.getWebEndpoint().addElementConnectedListener((element) -> { String
* msg = " Element connected (" +
* endpoint.getEndpoint().getName() + ") -> " + "SINK: " +
* element.getSink().getName() + " | SOURCE: " + element.getSource().getName() +
* " | MEDIATYPE: " + element.getMediaType(); System.out.println(msg);
* this.infoHandler.sendInfo(msg); });
*/
/*
* endpoint.getWebEndpoint().addElementDisconnectedListener((event) -> { String
* msg = " Element disconnected (" +
* endpoint.getEndpoint().getName() + ") -> " + "SINK: " +
* event.getSinkMediaDescription() + " | SOURCE: " +
* event.getSourceMediaDescription() + " | MEDIATYPE: " + event.getMediaType();
* System.out.println(msg); this.infoHandler.sendInfo(msg); });
*/
/*
* endpoint.getWebEndpoint().addErrorListener((event) -> { String msg =
* " Error (PUBLISHER) -> " + "ERRORCODE: " +
* event.getErrorCode() + " | DESCRIPTION: " + event.getDescription() +
* " | TIMESTAMP: " + System.currentTimeMillis(); log.debug(msg);
* this.infoHandler.sendInfo(msg); });
*
* endpoint.getWebEndpoint().addMediaSessionStartedListener((event) -> { String
* msg = " Media session started (" +
* endpoint.getEndpoint().getName() + ") | TIMESTAMP: " +
* System.currentTimeMillis(); log.debug(msg); this.infoHandler.sendInfo(msg);
* });
*
* endpoint.getWebEndpoint().addMediaSessionTerminatedListener((event) -> {
* String msg = " Media session terminated (" +
* endpoint.getEndpoint().getName() + ") | TIMESTAMP: " +
* System.currentTimeMillis(); log.debug(msg); this.infoHandler.sendInfo(msg);
* });
*
* endpoint.getWebEndpoint().addMediaStateChangedListener((event) -> { String
* msg = " Media state changed (" +
* endpoint.getEndpoint().getName() + ") from " + event.getOldState() + " to " +
* event.getNewState(); log.debug(msg); this.infoHandler.sendInfo(msg); });
*
* endpoint.getWebEndpoint().addIceCandidateFoundListener((event) -> { String
* msg = " ICE CANDIDATE FOUND (" +
* endpoint.getEndpoint().getName() + "): CANDIDATE: " +
* event.getCandidate().getCandidate() + " | TIMESTAMP: " +
* System.currentTimeMillis(); log.debug(msg); this.infoHandler.sendInfo(msg);
* });
*/
endpoint.getWebEndpoint().addMediaFlowInStateChangeListener(event -> { endpoint.getWebEndpoint().addMediaFlowInStateChangeListener(event -> {
String msg1 = "Media flow in state change (" + endpoint.getEndpoint().getName() + ") -> " + "STATE: " String msg = "KMS event [MediaFlowInStateChange] -> endpoint: " + endpoint.getEndpointName() + " ("
+ event.getState() + " | SOURCE: " + event.getSource().getName() + " | PAD: " + event.getPadName() + typeOfEndpoint + ") | state: " + event.getState() + " | pad: " + event.getPadName()
+ " | MEDIATYPE: " + event.getMediaType() + " | TIMESTAMP: " + System.currentTimeMillis(); + " | mediaType: " + event.getMediaType() + " | timestamp: " + event.getTimestamp();
endpoint.kmsEvents.add(new KmsMediaEvent(event, event.getMediaType(), endpoint.createdAt())); KmsEvent kmsEvent = new KmsMediaEvent(event, endpoint.getEndpointName(), event.getMediaType(),
log.info(msg1); endpoint.createdAt());
this.infoHandler.sendInfo(msg1); endpoint.kmsEvents.add(kmsEvent);
this.CDR.recordKmsEvent(kmsEvent);
this.infoHandler.sendInfo(msg);
log.warn(msg);
}); });
endpoint.getWebEndpoint().addMediaFlowOutStateChangeListener(event -> { endpoint.getWebEndpoint().addMediaFlowOutStateChangeListener(event -> {
String msg1 = "Media flow out state change (" + endpoint.getEndpoint().getName() + ") -> " + "STATE: " String msg = "KMS event [MediaFlowOutStateChange] -> endpoint: " + endpoint.getEndpointName() + " ("
+ event.getState() + " | SOURCE: " + event.getSource().getName() + " | PAD: " + event.getPadName() + typeOfEndpoint + ") | state: " + event.getState() + " | pad: " + event.getPadName()
+ " | MEDIATYPE: " + event.getMediaType() + " | TIMESTAMP: " + System.currentTimeMillis(); + " | mediaType: " + event.getMediaType() + " | timestamp: " + event.getTimestamp();
endpoint.kmsEvents.add(new KmsMediaEvent(event, event.getMediaType(), endpoint.createdAt())); KmsEvent kmsEvent = new KmsMediaEvent(event, endpoint.getEndpointName(), event.getMediaType(),
log.info(msg1); endpoint.createdAt());
this.infoHandler.sendInfo(msg1); endpoint.kmsEvents.add(kmsEvent);
this.CDR.recordKmsEvent(kmsEvent);
this.infoHandler.sendInfo(msg);
log.warn(msg);
}); });
endpoint.getWebEndpoint().addIceGatheringDoneListener(event -> { endpoint.getWebEndpoint().addIceGatheringDoneListener(event -> {
endpoint.kmsEvents.add(new KmsEvent(event, endpoint.createdAt())); String msg = "KMS event [IceGatheringDone] -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | timestamp: " + event.getTimestamp();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getEndpointName(), endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.recordKmsEvent(kmsEvent);
this.infoHandler.sendInfo(msg);
log.warn(msg);
}); });
endpoint.getWebEndpoint().addConnectionStateChangedListener(event -> { endpoint.getWebEndpoint().addConnectionStateChangedListener(event -> {
endpoint.kmsEvents.add(new KmsEvent(event, endpoint.createdAt())); String msg = "KMS event [ConnectionStateChanged]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | oldState: " + event.getOldState() + " | newState: " + event.getNewState()
+ " | timestamp: " + event.getTimestamp();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getEndpointName(), endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.recordKmsEvent(kmsEvent);
this.infoHandler.sendInfo(msg);
log.warn(msg);
}); });
endpoint.getWebEndpoint().addNewCandidatePairSelectedListener(event -> { endpoint.getWebEndpoint().addNewCandidatePairSelectedListener(event -> {
endpoint.selectedLocalIceCandidate = event.getCandidatePair().getLocalCandidate(); endpoint.selectedLocalIceCandidate = event.getCandidatePair().getLocalCandidate();
endpoint.selectedRemoteIceCandidate = event.getCandidatePair().getRemoteCandidate(); endpoint.selectedRemoteIceCandidate = event.getCandidatePair().getRemoteCandidate();
endpoint.kmsEvents.add(new KmsEvent(event, endpoint.createdAt())); String msg = "KMS event [NewCandidatePairSelected]: -> endpoint: " + endpoint.getEndpointName() + " ("
String msg = "ICE CANDIDATE SELECTED (" + endpoint.getEndpoint().getName() + "): LOCAL CANDIDATE: " + typeOfEndpoint + ") | local: " + endpoint.selectedLocalIceCandidate + " | remote: "
+ endpoint.selectedLocalIceCandidate + " | REMOTE CANDIDATE: " + endpoint.selectedRemoteIceCandidate + endpoint.selectedRemoteIceCandidate + " | timestamp: " + event.getTimestamp();
+ " | TIMESTAMP: " + System.currentTimeMillis(); KmsEvent kmsEvent = new KmsEvent(event, endpoint.getEndpointName(), endpoint.createdAt());
log.warn(msg); endpoint.kmsEvents.add(kmsEvent);
this.CDR.recordKmsEvent(kmsEvent);
this.infoHandler.sendInfo(msg); this.infoHandler.sendInfo(msg);
log.warn(msg);
}); });
endpoint.getEndpoint().addMediaTranscodingStateChangeListener(event -> { endpoint.getEndpoint().addMediaTranscodingStateChangeListener(event -> {
endpoint.kmsEvents.add(new KmsMediaEvent(event, event.getMediaType(), endpoint.createdAt())); String msg = "KMS event [MediaTranscodingStateChange]: -> endpoint: " + endpoint.getEndpointName() + " ("
+ typeOfEndpoint + ") | state: " + event.getState().name() + " | mediaType: " + event.getMediaType()
+ " | binName: " + event.getBinName() + " | timestamp: " + event.getTimestamp();
KmsEvent kmsEvent = new KmsMediaEvent(event, endpoint.getEndpointName(), event.getMediaType(),
endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.recordKmsEvent(kmsEvent);
this.infoHandler.sendInfo(msg);
log.warn(msg);
}); });
endpoint.getWebEndpoint().addIceComponentStateChangeListener(event -> { endpoint.getWebEndpoint().addIceComponentStateChangeListener(event -> {
if (!event.getState().equals(IceComponentState.READY)) { // if (!event.getState().equals(IceComponentState.READY)) {
endpoint.kmsEvents.add(new KmsEvent(event, endpoint.createdAt())); String msg = "KMS event [IceComponentStateChange]: -> endpoint: " + endpoint.getEndpointName() + " ("
} + typeOfEndpoint + ") | state: " + event.getState().name() + " | componentId: "
+ event.getComponentId() + " | streamId: " + event.getStreamId() + " | timestamp: "
+ event.getTimestamp();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getEndpointName(), endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.recordKmsEvent(kmsEvent);
this.infoHandler.sendInfo(msg);
log.warn(msg);
// }
}); });
endpoint.getWebEndpoint().addErrorListener(event -> {
String msg = "KMS event [ERROR]: -> endpoint: " + endpoint.getEndpointName() + " (" + typeOfEndpoint
+ ") | errorCode: " + event.getErrorCode() + " | description: " + event.getDescription()
+ " | timestamp: " + event.getTimestamp();
KmsEvent kmsEvent = new KmsEvent(event, endpoint.getEndpointName(), endpoint.createdAt());
endpoint.kmsEvents.add(kmsEvent);
this.CDR.recordKmsEvent(kmsEvent);
this.infoHandler.sendInfo(msg);
log.error(msg);
});
} }
public MediaPipeline getPipeline() { public MediaPipeline getPipeline() {

View File

@ -169,8 +169,10 @@ public class KurentoSession extends Session {
} }
} }
public void sendIceCandidate(String participantId, String endpointName, IceCandidate candidate) { public void sendIceCandidate(String participantPrivateId, String senderPublicId, String endpointName,
this.kurentoSessionHandler.onIceCandidate(sessionId, participantId, endpointName, candidate); IceCandidate candidate) {
this.kurentoSessionHandler.onIceCandidate(sessionId, participantPrivateId, senderPublicId, endpointName,
candidate);
} }
public void sendMediaError(String participantId, String description) { public void sendMediaError(String participantId, String description) {

View File

@ -32,13 +32,16 @@ public class KurentoSessionEventsHandler extends SessionEventsHandler {
public KurentoSessionEventsHandler() { public KurentoSessionEventsHandler() {
} }
public void onIceCandidate(String roomName, String participantId, String endpointName, IceCandidate candidate) { public void onIceCandidate(String roomName, String participantPrivateId, String senderPublicId, String endpointName,
IceCandidate candidate) {
JsonObject params = new JsonObject(); JsonObject params = new JsonObject();
params.addProperty(ProtocolElements.ICECANDIDATE_SENDERCONNECTIONID_PARAM, senderPublicId);
params.addProperty(ProtocolElements.ICECANDIDATE_EPNAME_PARAM, endpointName); params.addProperty(ProtocolElements.ICECANDIDATE_EPNAME_PARAM, endpointName);
params.addProperty(ProtocolElements.ICECANDIDATE_SDPMLINEINDEX_PARAM, candidate.getSdpMLineIndex()); params.addProperty(ProtocolElements.ICECANDIDATE_SDPMLINEINDEX_PARAM, candidate.getSdpMLineIndex());
params.addProperty(ProtocolElements.ICECANDIDATE_SDPMID_PARAM, candidate.getSdpMid()); params.addProperty(ProtocolElements.ICECANDIDATE_SDPMID_PARAM, candidate.getSdpMid());
params.addProperty(ProtocolElements.ICECANDIDATE_CANDIDATE_PARAM, candidate.getCandidate()); params.addProperty(ProtocolElements.ICECANDIDATE_CANDIDATE_PARAM, candidate.getCandidate());
rpcNotificationService.sendNotification(participantId, ProtocolElements.ICECANDIDATE_METHOD, params); rpcNotificationService.sendNotification(participantPrivateId, ProtocolElements.ICECANDIDATE_METHOD, params);
} }
public void onPipelineError(String roomName, Set<Participant> participants, String description) { public void onPipelineError(String roomName, Set<Participant> participants, String description) {

View File

@ -17,20 +17,43 @@
package io.openvidu.server.kurento.endpoint; package io.openvidu.server.kurento.endpoint;
import org.kurento.client.ErrorEvent;
import org.kurento.client.MediaEvent; import org.kurento.client.MediaEvent;
import org.kurento.client.RaiseBaseEvent;
import com.google.gson.JsonObject;
public class KmsEvent { public class KmsEvent {
long timestamp; long timestamp;
long msSinceCreation; long msSinceCreation;
String endpoint; String endpoint;
MediaEvent event; RaiseBaseEvent event;
public KmsEvent(MediaEvent event, long createdAt) { public KmsEvent(RaiseBaseEvent event, String endpointName, long createdAt) {
this.event = event; this.event = event;
this.endpoint = event.getSource().getName(); this.endpoint = endpointName;
this.event.setSource(null);
this.timestamp = System.currentTimeMillis(); this.timestamp = System.currentTimeMillis();
this.msSinceCreation = this.timestamp - createdAt; this.msSinceCreation = this.timestamp - createdAt;
} }
public JsonObject toJson() {
JsonObject json = new JsonObject();
if (event instanceof ErrorEvent) {
ErrorEvent errorEvent = (ErrorEvent) event;
json.addProperty("eventType", errorEvent.getType());
json.addProperty("errorCode", errorEvent.getErrorCode());
json.addProperty("description", errorEvent.getDescription());
} else {
MediaEvent mediaEvent = (MediaEvent) event;
json.addProperty("eventType", mediaEvent.getType());
}
json.addProperty("timestamp", timestamp);
json.addProperty("msSinceEndpointCreation", msSinceCreation);
json.addProperty("endpoint", this.endpoint);
return json;
}
} }

View File

@ -20,13 +20,22 @@ package io.openvidu.server.kurento.endpoint;
import org.kurento.client.MediaEvent; import org.kurento.client.MediaEvent;
import org.kurento.client.MediaType; import org.kurento.client.MediaType;
import com.google.gson.JsonObject;
public class KmsMediaEvent extends KmsEvent { public class KmsMediaEvent extends KmsEvent {
MediaType mediaType; MediaType mediaType;
public KmsMediaEvent(MediaEvent event, MediaType mediaType, long createdAt) { public KmsMediaEvent(MediaEvent event, String endpointName, MediaType mediaType, long createdAt) {
super(event, createdAt); super(event, endpointName, createdAt);
this.mediaType = mediaType; this.mediaType = mediaType;
} }
@Override
public JsonObject toJson() {
JsonObject json = super.toJson();
json.addProperty("mediaType", this.mediaType.name());
return json;
}
} }

View File

@ -73,7 +73,10 @@ public abstract class MediaEndpoint {
private final int minSendKbps; private final int minSendKbps;
private KurentoParticipant owner; private KurentoParticipant owner;
private String endpointName; protected String endpointName; // KMS media object identifier. Unique for every MediaEndpoint
protected String streamId; // OpenVidu Stream identifier. Common property for a
// PublisherEndpoint->SubscriberEndpoint flow. Equal to endpointName for
// PublisherEndpoints, different for SubscriberEndpoints
protected Long createdAt; // Timestamp when this [publisher / subscriber] started [publishing / receiving] protected Long createdAt; // Timestamp when this [publisher / subscriber] started [publishing / receiving]
private MediaPipeline pipeline = null; private MediaPipeline pipeline = null;
@ -208,22 +211,22 @@ public abstract class MediaEndpoint {
this.pipeline = pipeline; this.pipeline = pipeline;
} }
/**
* @return name of this endpoint (as indicated by the browser)
*/
public String getEndpointName() { public String getEndpointName() {
return endpointName; return endpointName != null ? endpointName : this.getEndpoint().getName();
} }
/**
* Sets the endpoint's name (as indicated by the browser).
*
* @param endpointName the name
*/
public void setEndpointName(String endpointName) { public void setEndpointName(String endpointName) {
this.endpointName = endpointName; this.endpointName = endpointName;
} }
public String getStreamId() {
return streamId;
}
public void setStreamId(String streamId) {
this.streamId = streamId;
}
/** /**
* Unregisters all error listeners created for media elements owned by this * Unregisters all error listeners created for media elements owned by this
* instance. * instance.
@ -413,7 +416,7 @@ public abstract class MediaEndpoint {
* @see Participant#sendIceCandidate(String, IceCandidate) * @see Participant#sendIceCandidate(String, IceCandidate)
* @throws OpenViduException if thrown, unable to register the listener * @throws OpenViduException if thrown, unable to register the listener
*/ */
protected void registerOnIceCandidateEventListener() throws OpenViduException { protected void registerOnIceCandidateEventListener(String senderPublicId) throws OpenViduException {
if (!this.isWeb()) { if (!this.isWeb()) {
return; return;
} }
@ -424,7 +427,7 @@ public abstract class MediaEndpoint {
webEndpoint.addOnIceCandidateListener(new EventListener<OnIceCandidateEvent>() { webEndpoint.addOnIceCandidateListener(new EventListener<OnIceCandidateEvent>() {
@Override @Override
public void onEvent(OnIceCandidateEvent event) { public void onEvent(OnIceCandidateEvent event) {
owner.sendIceCandidate(endpointName, event.getCandidate()); owner.sendIceCandidate(senderPublicId, endpointName, event.getCandidate());
} }
}); });
} }

View File

@ -74,8 +74,6 @@ public class PublisherEndpoint extends MediaEndpoint {
private Map<String, ListenerSubscription> elementsErrorSubscriptions = new HashMap<String, ListenerSubscription>(); private Map<String, ListenerSubscription> elementsErrorSubscriptions = new HashMap<String, ListenerSubscription>();
private String streamId;
public PublisherEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, public PublisherEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline,
OpenviduConfig openviduConfig) { OpenviduConfig openviduConfig) {
super(web, owner, endpointName, pipeline, openviduConfig, log); super(web, owner, endpointName, pipeline, openviduConfig, log);
@ -177,7 +175,7 @@ public class PublisherEndpoint extends MediaEndpoint {
*/ */
public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback, public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback,
MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) { MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) {
registerOnIceCandidateEventListener(); registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId());
if (doLoopback) { if (doLoopback) {
if (loopbackAlternativeSrc == null) { if (loopbackAlternativeSrc == null) {
connect(this.getEndpoint(), loopbackConnectionType); connect(this.getEndpoint(), loopbackConnectionType);
@ -576,12 +574,4 @@ public class PublisherEndpoint extends MediaEndpoint {
+ this.filterListeners.toString() + ", subscribers: " + this.subscribersToFilterEvents.toString() + "}"; + this.filterListeners.toString() + ", subscribers: " + this.subscribersToFilterEvents.toString() + "}";
} }
public void setStreamId(String publisherStreamId) {
this.streamId = publisherStreamId;
this.getEndpoint().setName(publisherStreamId);
}
public String getStreamId() {
return this.streamId != null ? this.streamId : this.getEndpoint().getName();
}
} }

View File

@ -47,7 +47,7 @@ public class SubscriberEndpoint extends MediaEndpoint {
} }
public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) { public synchronized String subscribe(String sdpOffer, PublisherEndpoint publisher) {
registerOnIceCandidateEventListener(); registerOnIceCandidateEventListener(publisher.getOwner().getParticipantPublicId());
String sdpAnswer = processOffer(sdpOffer); String sdpAnswer = processOffer(sdpOffer);
gatherCandidates(); gatherCandidates();
publisher.connect(this.getEndpoint()); publisher.connect(this.getEndpoint());
@ -82,11 +82,6 @@ public class SubscriberEndpoint extends MediaEndpoint {
} catch (NullPointerException ex) { } catch (NullPointerException ex) {
json.addProperty("streamId", "NOT_FOUND"); json.addProperty("streamId", "NOT_FOUND");
} }
try {
json.addProperty("publisher", this.publisher.getEndpointName());
} catch (NullPointerException ex) {
json.addProperty("publisher", "NOT_FOUND");
}
return json; return json;
} }