// human/demo/facematch/node-match.js

/**
 * Human demo app for NodeJS that generates random facial descriptors
 * and uses NodeJS worker threads to run face matching in parallel;
 * uses `node-match-worker.js` to perform the actual face matching analysis
 */
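
/**
 * For reference, a minimal sketch of the worker-side protocol that `node-match-worker.js`
 * is expected to honor, reconstructed from the messages posted below. The distance metric,
 * similarity normalization, and timing are illustrative placeholders, and the threshold/debug
 * fields are ignored for brevity; the shipped worker may differ:
 *
 *   const threads = require('worker_threads');
 *   const descLength = 1024; // must match options.descLength below
 *   let view = null; // Float32Array view into the shared descriptor buffer
 *   let records = 0; // number of valid records currently in the buffer
 *   threads.parentPort.on('message', (msg) => {
 *     if (msg instanceof SharedArrayBuffer) { view = new Float32Array(msg); return; } // initial buffer handoff
 *     if (typeof msg.records === 'number') records = msg.records; // db size update
 *     if (msg.shutdown) process.exit(0); // graceful shutdown request
 *     if (msg.descriptor) { // match request: scan the shared buffer and reply with the best hit
 *       const t = process.hrtime.bigint();
 *       const descriptor = new Float32Array(msg.descriptor);
 *       let best = { index: -1, distance: Number.MAX_SAFE_INTEGER };
 *       for (let i = 0; i < records; i++) {
 *         let distance = 0;
 *         for (let j = 0; j < descLength; j++) distance += (descriptor[j] - view[i * descLength + j]) ** 2;
 *         if (distance < best.distance) best = { index: i, distance };
 *       }
 *       const similarity = Math.max(0, 1 - Math.sqrt(best.distance) / 100); // placeholder normalization
 *       const time = Math.round(Number(process.hrtime.bigint() - t) / 1e6);
 *       threads.parentPort.postMessage({ request: msg.request, index: best.index, similarity, time });
 *     }
 *   });
 */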
const fs = require('fs');
const path = require('path');
const threads = require('worker_threads');
const log = require('@vladmandic/pilogger'); // eslint-disable-line node/no-unpublished-require
// global options
const options = {
  dbFile: 'demo/facematch/faces.json', // sample face db
  dbMax: 10000, // maximum number of records to hold in memory
  threadPoolSize: 12, // number of worker threads to create in thread pool
  workerSrc: './node-match-worker.js', // code that executes in the worker thread
  debug: false, // verbose messages
  minThreshold: 0.5, // match returns the first record that meets the similarity threshold, set to 0 to always scan all records
  descLength: 1024, // descriptor length
};
// test options
const testOptions = {
  dbFact: 175, // load the db n times to fake a huge size
  maxJobs: 200, // exit after processing this many jobs
  fuzDescriptors: true, // randomize descriptor content before match for harder jobs
};
// global data structures
const data = {
  /** @type string[] */
  labels: [], // array of strings; the array length serves as the overall number of records so it has to be maintained carefully
  /** @type SharedArrayBuffer | null */
  buffer: null,
  /** @type Float32Array | null */
  view: null,
  /** @type threads.Worker[] */
  workers: [], // holds instances of workers; an entry can be empty if that worker exited
  requestID: 0, // each request increments this counter as it's used for round-robin assignment
};
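// record layout in the shared buffer: record i occupies the descLength consecutive float32
// slots view[i * descLength] .. view[(i + 1) * descLength - 1], so a buffer index and its
// data.labels index always refer to the same face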
let t0 = process.hrtime.bigint(); // used for perf counters
const appendRecords = (labels, descriptors) => {
  if (!data.view) return 0;
  if (descriptors.length !== labels.length) {
    log.error('append error:', { descriptors: descriptors.length, labels: labels.length });
    return data.labels.length; // refuse mismatched input instead of appending misaligned records
  }
  // if (options.debug) log.state('appending:', { descriptors: descriptors.length, labels: labels.length });
  for (let i = 0; i < descriptors.length; i++) {
    for (let j = 0; j < descriptors[i].length; j++) {
      data.view[data.labels.length * descriptors[i].length + j] = descriptors[i][j]; // copy each descriptor element into the shared buffer
    }
    data.labels.push(labels[i]); // finally add to labels
  }
  for (const worker of data.workers) { // inform all workers how many records we have
    if (worker) worker.postMessage({ records: data.labels.length });
  }
  return data.labels.length;
};
const getLabel = (index) => data.labels[index];
const getDescriptor = (index) => {
  if (!data.view) return [];
  const descriptor = [];
  for (let i = 0; i < options.descLength; i++) descriptor.push(data.view[index * options.descLength + i]); // read one record back out of the shared buffer
  return descriptor;
};
const fuzDescriptor = (descriptor) => {
  for (let i = 0; i < descriptor.length; i++) descriptor[i] += Math.random() - 0.5; // add uniform noise in [-0.5, 0.5) to each element
  return descriptor;
};
const delay = (ms) => new Promise((resolve) => { setTimeout(resolve, ms); });
async function workersClose() {
  const current = data.workers.filter((worker) => !!worker).length;
  log.info('closing workers:', { poolSize: data.workers.length, activeWorkers: current });
  for (const worker of data.workers) {
    if (worker) worker.postMessage({ shutdown: true }); // tell worker to exit
  }
  await delay(250); // wait a little for threads to exit on their own
  const remaining = data.workers.filter((worker) => !!worker).length;
  if (remaining > 0) {
    log.info('terminating remaining workers:', { remaining, pool: data.workers.length });
    for (const worker of data.workers) {
      if (worker) worker.terminate(); // if a worker did not exit cleanly, terminate it
    }
  }
}
const workerMessage = (index, msg) => {
  if (msg.request) {
    if (options.debug) log.data('message:', { worker: index, request: msg.request, time: msg.time, label: getLabel(msg.index), similarity: msg.similarity });
    if (msg.request >= testOptions.maxJobs) {
      const t1 = process.hrtime.bigint();
      const elapsed = Math.round(Number(t1 - t0) / 1000 / 1000); // ns to ms
      log.state({ matchJobsFinished: testOptions.maxJobs, totalTimeMs: elapsed, averageTimeMs: Math.round(100 * elapsed / testOptions.maxJobs) / 100 });
      workersClose(); // last job has completed so shut down the pool
    }
  } else {
    log.data('message:', { worker: index, msg });
  }
};
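// note: replies arrive asynchronously and, with multiple workers, not necessarily in submission
// order; the echoed msg.request id is what ties a reply back to the descriptor that was submitted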
async function workerClose(id, code) {
  const previous = data.workers.filter((worker) => !!worker).length;
  delete data.workers[id]; // leave a hole in the pool array so worker ids stay stable
  const current = data.workers.filter((worker) => !!worker).length;
  if (options.debug) log.state('worker exit:', { id, code, previous, current });
}
async function workersStart(numWorkers) {
  const previous = data.workers.filter((worker) => !!worker).length;
  log.info('starting worker thread pool:', { totalWorkers: numWorkers, alreadyActive: previous });
  for (let i = 0; i < numWorkers; i++) {
    if (!data.workers[i]) { // worker does not exist, so create it
      const worker = new threads.Worker(path.join(__dirname, options.workerSrc));
      worker.on('message', (msg) => workerMessage(i, msg));
      worker.on('error', (err) => log.error('worker error:', { err }));
      worker.on('exit', (code) => workerClose(i, code));
      worker.postMessage(data.buffer); // send the shared buffer to the worker; a SharedArrayBuffer is shared between threads, not copied
      data.workers[i] = worker;
    }
    data.workers[i]?.postMessage({ records: data.labels.length, threshold: options.minThreshold, debug: options.debug }); // inform worker how many records there are
  }
  await delay(100); // just wait a bit for everything to settle down
}
const match = (descriptor) => {
  const buffer = new ArrayBuffer(options.descLength * 4); // allocate a fresh transferable buffer for this request
  const view = new Float32Array(buffer);
  view.set(descriptor);
  const available = data.workers.filter((worker) => !!worker); // find workers that are still alive
  if (available.length > 0) available[data.requestID % available.length].postMessage({ descriptor: buffer, request: data.requestID }, [buffer]); // round-robin across available workers
  else log.error('no available workers');
};
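// note: the descriptor is first copied into a fresh ArrayBuffer, which is then handed over via
// the transfer list, so it moves to the worker without a second copy and is detached in this thread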
async function loadDB(count) {
  const previous = data.labels.length;
  if (!fs.existsSync(options.dbFile)) {
    log.error('db file does not exist:', options.dbFile);
    return;
  }
  t0 = process.hrtime.bigint();
  for (let i = 0; i < count; i++) { // test loop: load entire face db from array of objects n times into buffer
    const db = JSON.parse(fs.readFileSync(options.dbFile).toString());
    const names = db.map((record) => record.name);
    const descriptors = db.map((record) => record.embedding);
    appendRecords(names, descriptors);
  }
  log.data('db loaded:', { existingRecords: previous, newRecords: data.labels.length });
}
async function createBuffer() {
  data.buffer = new SharedArrayBuffer(4 * options.dbMax * options.descLength); // preallocate max number of records as a SharedArrayBuffer cannot grow
  data.view = new Float32Array(data.buffer); // create view into buffer
  data.labels.length = 0;
  log.data('created shared buffer:', { maxDescriptors: (data.view.length || 0) / options.descLength, totalBytes: data.buffer.byteLength, totalElements: data.view.length });
}
async function main() {
  log.header();
  log.info('options:', options);
  await createBuffer(); // create the shared buffer array
  await loadDB(testOptions.dbFact); // loadDB is a test method that calls the actual appendRecords
  await workersStart(options.threadPoolSize); // can be called at any time to modify the worker pool size
  for (let i = 0; i < testOptions.maxJobs; i++) {
    const idx = Math.trunc(data.labels.length * Math.random()); // grab a random descriptor index that we'll search for
    const descriptor = getDescriptor(idx); // grab the descriptor at that index
    data.requestID++; // increase request id
    if (testOptions.fuzDescriptors) match(fuzDescriptor(descriptor)); // fuz the descriptor for a harder match
    else match(descriptor);
    if (options.debug) log.info('submitted job', data.requestID); // we already know what we're searching for so we can compare results
  }
  log.state('submitted:', { matchJobs: testOptions.maxJobs, poolSize: data.workers.length, activeWorkers: data.workers.filter((worker) => !!worker).length });
}
main();
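
// usage sketch (assumes running from the repository root so that the relative options.dbFile
// path resolves, with node-match-worker.js sitting next to this file):
//   node demo/facematch/node-match.js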