mirror of https://github.com/OpenVidu/openvidu.git
ov-components: Enhance data handling in SessionComponent and add safeJsonParse utility
parent
9d75a429a6
commit
fb1dc9d95a
|
|
@ -584,6 +584,7 @@
|
|||
"integrity": "sha512-4cKBO9wR75r0BeIWWWId9XK9Lj6La5X846Zw9dFfzMRw38IlTk2iCcUt6hsyiDRcPidc55ZParFYDXi0nXOeLQ==",
|
||||
"dev": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"esbuild": "^0.25.0",
|
||||
"fdir": "^6.5.0",
|
||||
|
|
@ -1000,18 +1001,6 @@
|
|||
"win32"
|
||||
]
|
||||
},
|
||||
"node_modules/@angular-devkit/build-angular/node_modules/@types/node": {
|
||||
"version": "24.10.0",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-24.10.0.tgz",
|
||||
"integrity": "sha512-qzQZRBqkFsYyaSWXuEHc2WR9c0a0CXwiE5FWUvn7ZM+vdy1uZLfCunD38UzhuB7YN/J11ndbDBcTmOdxJo9Q7A==",
|
||||
"dev": true,
|
||||
"license": "MIT",
|
||||
"optional": true,
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"undici-types": "~7.16.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@angular-devkit/build-angular/node_modules/rollup": {
|
||||
"version": "4.52.3",
|
||||
"resolved": "https://registry.npmjs.org/rollup/-/rollup-4.52.3.tgz",
|
||||
|
|
@ -14810,6 +14799,7 @@
|
|||
"integrity": "sha512-SL0JY3DaxylDuo/MecFeiC+7pedM0zia33zl0vcjgwcq1q1FWWF1To9EIauPbl8GbMCU0R2e0uJ8bZunhYKD2g==",
|
||||
"dev": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"cli-truncate": "^4.0.0",
|
||||
"colorette": "^2.0.20",
|
||||
|
|
@ -21794,6 +21784,7 @@
|
|||
"integrity": "sha512-QcQ72gh8a+7JO63TAx/6XZf/CWhgMzu5m0QirvPfGvptOusAxG12w2+aua1Jkjr7hzaWDnJ2n6JFeexMHI+Zjg==",
|
||||
"dev": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"@types/bonjour": "^3.5.13",
|
||||
"@types/connect-history-api-fallback": "^1.5.4",
|
||||
|
|
|
|||
|
|
@ -51,6 +51,7 @@ import { RecordingStatus } from '../../models/recording.model';
|
|||
import { TemplateManagerService, SessionTemplateConfiguration } from '../../services/template/template-manager.service';
|
||||
import { ViewportService } from '../../services/viewport/viewport.service';
|
||||
import { E2eeService } from '../../services/e2ee/e2ee.service';
|
||||
import { safeJsonParse } from '../../utils/utils';
|
||||
|
||||
/**
|
||||
* @internal
|
||||
|
|
@ -464,105 +465,138 @@ export class SessionComponent implements OnInit, OnDestroy {
|
|||
this.room.on(
|
||||
RoomEvent.DataReceived,
|
||||
async (payload: Uint8Array, participant?: RemoteParticipant, _?: DataPacket_Kind, topic?: string) => {
|
||||
const storedParticipant = this.participantService.getRemoteParticipantBySid(participant?.sid || '');
|
||||
if (!storedParticipant) {
|
||||
this.log.w('DataReceived from unknown participant', participant);
|
||||
return;
|
||||
}
|
||||
|
||||
const { identity: participantIdentity, name: participantName } = storedParticipant;
|
||||
// Decrypt payload if it's a CHAT message and E2EE is enabled
|
||||
let decryptedPayload: Uint8Array = payload;
|
||||
if (topic === DataTopic.CHAT && this.e2eeService.isEnabled) {
|
||||
decryptedPayload = await this.e2eeService.decryptOrMask(
|
||||
payload,
|
||||
participantIdentity,
|
||||
JSON.stringify({ message: '******' }) // The fallback text must be a valid JSON
|
||||
);
|
||||
}
|
||||
|
||||
// Decode and parse the JSON event
|
||||
let event: any;
|
||||
try {
|
||||
event = JSON.parse(new TextDecoder().decode(decryptedPayload));
|
||||
const rawText = new TextDecoder().decode(payload);
|
||||
this.log.d('DataReceived (raw)', { topic });
|
||||
|
||||
const message = safeJsonParse(rawText);
|
||||
if (!message) {
|
||||
this.log.w('Discarding data: malformed JSON', rawText);
|
||||
return;
|
||||
}
|
||||
|
||||
const fromServer = participant === undefined;
|
||||
|
||||
// Validate source and resolve participant info
|
||||
const storedParticipant = participant
|
||||
? this.participantService.getRemoteParticipantBySid(participant.sid || '')
|
||||
: undefined;
|
||||
if (participant && !storedParticipant) {
|
||||
this.log.w('DataReceived from unknown participant', participant);
|
||||
return;
|
||||
}
|
||||
if (!fromServer && !participant) {
|
||||
this.log.w('DataReceived from unknown source', payload);
|
||||
return;
|
||||
}
|
||||
|
||||
const participantIdentity = storedParticipant?.identity || '';
|
||||
const participantName = storedParticipant?.name || '';
|
||||
|
||||
// Decrypt if required
|
||||
const decryptedPayload = await this.decryptIfNeeded(topic, payload, participantIdentity);
|
||||
|
||||
// Parse event payload after possible decryption
|
||||
const event = safeJsonParse(new TextDecoder().decode(decryptedPayload));
|
||||
if (!event) {
|
||||
this.log.e('Error parsing data message after decryption');
|
||||
return;
|
||||
}
|
||||
this.log.d(`Data event received: ${topic}`);
|
||||
} catch (parseError) {
|
||||
this.log.e('Error parsing data message:', parseError);
|
||||
return; // Can't process malformed data
|
||||
}
|
||||
|
||||
// Handle the event based on topic
|
||||
switch (topic) {
|
||||
case DataTopic.CHAT:
|
||||
this.chatService.addRemoteMessage(event.message, participantName || participantIdentity || 'Unknown');
|
||||
break;
|
||||
case DataTopic.RECORDING_STARTING:
|
||||
this.log.d('Recording is starting', event);
|
||||
this.recordingService.setRecordingStarting();
|
||||
break;
|
||||
case DataTopic.RECORDING_STARTED:
|
||||
this.log.d('Recording has been started', event);
|
||||
this.recordingService.setRecordingStarted(event);
|
||||
break;
|
||||
case DataTopic.RECORDING_STOPPING:
|
||||
this.log.d('Recording is stopping', event);
|
||||
this.recordingService.setRecordingStopping();
|
||||
break;
|
||||
case DataTopic.RECORDING_STOPPED:
|
||||
this.log.d('RECORDING_STOPPED', event);
|
||||
this.recordingService.setRecordingStopped(event);
|
||||
break;
|
||||
|
||||
case DataTopic.RECORDING_DELETED:
|
||||
this.log.d('RECORDING_DELETED', event);
|
||||
this.recordingService.deleteRecording(event);
|
||||
break;
|
||||
|
||||
case DataTopic.RECORDING_FAILED:
|
||||
this.log.d('RECORDING_FAILED', event);
|
||||
this.recordingService.setRecordingFailed(event.error);
|
||||
break;
|
||||
|
||||
case DataTopic.BROADCASTING_STARTING:
|
||||
this.broadcastingService.setBroadcastingStarting();
|
||||
break;
|
||||
case DataTopic.BROADCASTING_STARTED:
|
||||
this.log.d('Broadcasting has been started', event);
|
||||
this.broadcastingService.setBroadcastingStarted(event);
|
||||
break;
|
||||
|
||||
case DataTopic.BROADCASTING_STOPPING:
|
||||
this.broadcastingService.setBroadcastingStopping();
|
||||
break;
|
||||
case DataTopic.BROADCASTING_STOPPED:
|
||||
this.broadcastingService.setBroadcastingStopped();
|
||||
break;
|
||||
|
||||
case DataTopic.BROADCASTING_FAILED:
|
||||
this.broadcastingService.setBroadcastingFailed(event.error);
|
||||
break;
|
||||
|
||||
case DataTopic.ROOM_STATUS:
|
||||
const { recordingList, isRecordingStarted, isBroadcastingStarted, broadcastingId } = event as RoomStatusData;
|
||||
|
||||
if (this.libService.showRecordingActivityRecordingsList()) {
|
||||
this.recordingService.setRecordingList(recordingList);
|
||||
}
|
||||
if (isRecordingStarted) {
|
||||
const recordingActive = recordingList.find((recording) => recording.status === RecordingStatus.STARTED);
|
||||
this.recordingService.setRecordingStarted(recordingActive);
|
||||
}
|
||||
if (isBroadcastingStarted) {
|
||||
this.broadcastingService.setBroadcastingStarted(broadcastingId);
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
// Dispatch handling
|
||||
this.handleDataEvent(topic, event, participantName || participantIdentity || 'Unknown');
|
||||
} catch (err) {
|
||||
this.log.e('Unhandled error processing DataReceived', err);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private handleDataEvent(topic: string | undefined, event: any, participantName: string) {
|
||||
// Handle the event based on topic
|
||||
switch (topic) {
|
||||
case DataTopic.CHAT:
|
||||
this.chatService.addRemoteMessage(event.message, participantName);
|
||||
break;
|
||||
case DataTopic.RECORDING_STARTING:
|
||||
this.log.d('Recording is starting', event);
|
||||
this.recordingService.setRecordingStarting();
|
||||
break;
|
||||
case DataTopic.RECORDING_STARTED:
|
||||
this.log.d('Recording has been started', event);
|
||||
this.recordingService.setRecordingStarted(event);
|
||||
break;
|
||||
case DataTopic.RECORDING_STOPPING:
|
||||
this.log.d('Recording is stopping', event);
|
||||
this.recordingService.setRecordingStopping();
|
||||
break;
|
||||
case DataTopic.RECORDING_STOPPED:
|
||||
this.log.d('RECORDING_STOPPED', event);
|
||||
this.recordingService.setRecordingStopped(event);
|
||||
break;
|
||||
|
||||
case DataTopic.RECORDING_DELETED:
|
||||
this.log.d('RECORDING_DELETED', event);
|
||||
this.recordingService.deleteRecording(event);
|
||||
break;
|
||||
|
||||
case DataTopic.RECORDING_FAILED:
|
||||
this.log.d('RECORDING_FAILED', event);
|
||||
this.recordingService.setRecordingFailed(event.error);
|
||||
break;
|
||||
|
||||
case DataTopic.BROADCASTING_STARTING:
|
||||
this.broadcastingService.setBroadcastingStarting();
|
||||
break;
|
||||
case DataTopic.BROADCASTING_STARTED:
|
||||
this.log.d('Broadcasting has been started', event);
|
||||
this.broadcastingService.setBroadcastingStarted(event);
|
||||
break;
|
||||
|
||||
case DataTopic.BROADCASTING_STOPPING:
|
||||
this.broadcastingService.setBroadcastingStopping();
|
||||
break;
|
||||
case DataTopic.BROADCASTING_STOPPED:
|
||||
this.broadcastingService.setBroadcastingStopped();
|
||||
break;
|
||||
|
||||
case DataTopic.BROADCASTING_FAILED:
|
||||
this.broadcastingService.setBroadcastingFailed(event.error);
|
||||
break;
|
||||
|
||||
case DataTopic.ROOM_STATUS:
|
||||
const { recordingList, isRecordingStarted, isBroadcastingStarted, broadcastingId } = event as RoomStatusData;
|
||||
|
||||
if (this.libService.showRecordingActivityRecordingsList()) {
|
||||
this.recordingService.setRecordingList(recordingList);
|
||||
}
|
||||
if (isRecordingStarted) {
|
||||
const recordingActive = recordingList.find((recording) => recording.status === RecordingStatus.STARTED);
|
||||
this.recordingService.setRecordingStarted(recordingActive);
|
||||
}
|
||||
if (isBroadcastingStarted) {
|
||||
this.broadcastingService.setBroadcastingStarted(broadcastingId);
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private async decryptIfNeeded(topic: string | undefined, payload: Uint8Array, identity: string): Promise<Uint8Array> {
|
||||
if (topic === DataTopic.CHAT && this.e2eeService.isEnabled) {
|
||||
try {
|
||||
return await this.e2eeService.decryptOrMask(payload, identity, JSON.stringify({ message: '******' }));
|
||||
} catch (e) {
|
||||
this.log.e('Error decrypting payload, using masked fallback', e);
|
||||
// In case of decryption error, return a masked JSON so subsequent parsing won't crash
|
||||
return new TextEncoder().encode(JSON.stringify({ message: '******' }));
|
||||
}
|
||||
}
|
||||
return payload;
|
||||
}
|
||||
|
||||
private subscribeToReconnection() {
|
||||
this.room.on(RoomEvent.Reconnecting, () => {
|
||||
this.log.w('Connection lost: Reconnecting');
|
||||
|
|
|
|||
|
|
@ -0,0 +1,7 @@
|
|||
export const safeJsonParse = <T = any>(text: string): T | null => {
|
||||
try {
|
||||
return JSON.parse(text) as T;
|
||||
} catch (e) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
Loading…
Reference in New Issue