add node-match advanced example using worker thread pool

pull/193/head
Vladimir Mandic 2021-10-13 08:06:11 -04:00
parent 7b61812dff
commit 0a14ea8bca
9 changed files with 939 additions and 636 deletions


@@ -11,6 +11,7 @@
### **HEAD -> main** 2021/10/12 mandic00@live.com
- optimize image preprocessing
### **release: 2.3.2** 2021/10/11 mandic00@live.com


@@ -30,6 +30,7 @@ Experimental support only until support is officially added in Chromium
- Optical Flow: <https://docs.opencv.org/3.3.1/db/d7f/tutorial_js_lucas_kanade.html>
- TFLite Models: <https://js.tensorflow.org/api_tflite/0.0.1-alpha.4/>
- Histogram Equalization: Regular, Adaptive, Contrast Limited
<br><hr><br>


@@ -0,0 +1,69 @@
const threads = require('worker_threads');
const { performance } = require('perf_hooks'); // performance is a global in node 16+, required explicitly for compatibility with older node versions
let debug = false;
/** @type SharedArrayBuffer */
let buffer;
/** @type Float32Array */
let view;
let threshold = 0;
let records = 0;
const descLength = 1024; // descriptor length in number of float32 elements
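// compute distance between the incoming descriptor and the record at the given index in the shared buffer; for order 2 the value stays squared until match() takes the final square root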
function distance(descriptor1, index, options = { order: 2 }) {
let sum = 0;
for (let i = 0; i < descriptor1.length; i++) {
const diff = (options.order === 2) ? (descriptor1[i] - view[index * descLength + i]) : (Math.abs(descriptor1[i] - view[index * descLength + i]));
sum += (options.order === 2) ? (diff * diff) : (diff ** options.order);
}
return sum;
}
function match(descriptor, options = { order: 2 }) {
let best = Number.MAX_SAFE_INTEGER;
let index = -1;
for (let i = 0; i < records; i++) {
const res = distance(descriptor, i, { order: options.order });
if (res < best) {
best = res;
index = i;
}
if (best < threshold || best === 0) break; // short circuit
}
best = (options.order === 2) ? Math.sqrt(best) : best ** (1 / options.order);
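// convert distance to a 0..1 similarity score: any distance of 100 or more maps to similarity 0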
return { index, distance: best, similarity: Math.max(0, 100 - best) / 100.0 };
}
threads.parentPort?.on('message', (msg) => {
if (typeof msg.descriptor !== 'undefined') { // actual work order to find a match
const t0 = performance.now();
const result = match(msg.descriptor);
const t1 = performance.now();
threads.parentPort?.postMessage({ request: msg.request, time: Math.trunc(t1 - t0), ...result });
return; // short circuit
}
if (msg instanceof SharedArrayBuffer) { // called only once to receive reference to shared array buffer
buffer = msg;
view = new Float32Array(buffer); // initialize float32 view into buffer
if (debug) threads.parentPort?.postMessage(`buffer: ${buffer?.byteLength}`);
}
if (typeof msg.records !== 'undefined') { // received every time the number of records changes
records = msg.records;
if (debug) threads.parentPort?.postMessage(`records: ${records}`);
}
if (typeof msg.debug !== 'undefined') { // set verbose logging
debug = msg.debug;
if (debug) threads.parentPort?.postMessage(`debug: ${debug}`);
}
if (typeof msg.threshold !== 'undefined') { // set minimum similarity threshold
threshold = msg.threshold;
if (debug) threads.parentPort?.postMessage(`threshold: ${threshold}`);
}
if (typeof msg.shutdown !== 'undefined') { // got message to close worker
if (debug) threads.parentPort?.postMessage('shutting down');
process.exit(0);
}
});
if (debug) threads.parentPort?.postMessage('started');


@@ -0,0 +1,174 @@
const fs = require('fs');
const path = require('path');
const log = require('@vladmandic/pilogger');
const threads = require('worker_threads');
// global options
const options = {
dbFile: './faces.json', // sample face db
dbMax: 10000, // maximum number of records to hold in memory
threadPoolSize: 6, // number of worker threads to create in thread pool
workerSrc: './node-match-worker.js', // code that executes in the worker thread
debug: false, // verbose messages
minThreshold: 0.9, // match returns first record that meets the similarity threshold, set to 0 to always scan all records
descLength: 1024, // descriptor length
};
// test options
const testOptions = {
dbFact: 100, // load db n times to fake huge size
maxJobs: 100, // exit after processing this many jobs
fuzDescriptors: true, // randomize descriptor content before match for harder jobs
};
// global data structures
const data = {
/** @type string[] */
labels: [], // array of strings; length of the array serves as the overall number of records so it has to be maintained carefully
/** @type SharedArrayBuffer | null */
buffer: null,
/** @type Float32Array | null */
view: null,
/** @type threads.Worker[] */
workers: [], // holds instances of workers; a slot can be empty if the worker exited
requestID: 0, // each request should increment this counter as it's used for round-robin assignment
};
let t0 = process.hrtime.bigint(); // used for perf counters
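// descriptors are stored back-to-back in the shared float32 buffer: record i occupies elements [i * descLength, (i + 1) * descLength)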
const appendRecords = (labels, descriptors) => {
if (!data.view) return 0;
if (descriptors.length !== labels.length) {
log.error('append error:', { descriptors: descriptors.length, labels: labels.length });
}
// if (options.debug) log.state('appending:', { descriptors: descriptors.length, labels: labels.length });
for (let i = 0; i < descriptors.length; i++) {
for (let j = 0; j < descriptors[i].length; j++) {
data.view[data.labels.length * descriptors[i].length + j] = descriptors[i][j]; // add each descriptors element to buffer
}
data.labels.push(labels[i]); // finally add to labels
}
for (const worker of data.workers) { // inform all workers how many records we have
if (worker) worker.postMessage({ records: data.labels.length });
}
return data.labels.length;
};
const getLabel = (index) => data.labels[index];
const getDescriptor = (index) => {
if (!data.view) return [];
const descriptor = [];
for (let i = 0; i < options.descLength; i++) descriptor.push(data.view[index * options.descLength + i]);
return descriptor;
};
const fuzDescriptor = (descriptor) => {
for (let i = 0; i < descriptor.length; i++) descriptor[i] += Math.random() - 0.5;
return descriptor;
};
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
async function workersClose() {
const current = data.workers.filter((worker) => !!worker).length;
log.info('closing workers:', { poolSize: data.workers.length, activeWorkers: current });
for (const worker of data.workers) {
if (worker) worker.postMessage({ shutdown: true }); // tell worker to exit
}
await delay(250); // wait a little for threads to exit on their own
const remaining = data.workers.filter((worker) => !!worker).length;
if (remaining > 0) {
log.info('terminating remaining workers:', { remaining, pool: data.workers.length });
for (const worker of data.workers) {
if (worker) worker.terminate(); // if worker did not exit cleanly, terminate it
}
}
}
const workerMessage = (index, msg) => {
if (msg.request) {
if (options.debug) log.data('message:', { worker: index, request: msg.request, time: msg.time, label: getLabel(msg.index), similarity: msg.similarity });
if (msg.request >= testOptions.maxJobs) {
const t1 = process.hrtime.bigint();
const elapsed = Math.round(Number(t1 - t0) / 1000 / 1000);
log.state({ matchJobsFinished: testOptions.maxJobs, totalTimeMs: elapsed, averageTimeMs: Math.round(100 * elapsed / testOptions.maxJobs) / 100 });
workersClose();
}
} else {
log.data('message:', { worker: index, msg });
}
};
async function workerClose(id, code) {
const previous = data.workers.filter((worker) => !!worker).length;
delete data.workers[id];
const current = data.workers.filter((worker) => !!worker).length;
if (options.debug) log.state('worker exit:', { id, code, previous, current });
}
async function workersStart(numWorkers) {
const previous = data.workers.filter((worker) => !!worker).length;
log.info('starting worker thread pool:', { totalWorkers: numWorkers, alreadyActive: previous });
for (let i = 0; i < numWorkers; i++) {
if (!data.workers[i]) { // worker does not exist, so create it
const worker = new threads.Worker(path.join(__dirname, options.workerSrc));
worker.on('message', (msg) => workerMessage(i, msg));
worker.on('error', (err) => log.error('worker error:', { err }));
worker.on('exit', (code) => workerClose(i, code));
worker.postMessage(data.buffer); // send buffer to worker
data.workers[i] = worker;
}
data.workers[i]?.postMessage({ records: data.labels.length, threshold: options.minThreshold, debug: options.debug }); // inform worker how many records there are
}
await delay(100); // just wait a bit for everything to settle down
}
const match = (descriptor) => {
const available = data.workers.filter((worker) => !!worker); // list of currently active workers
if (available.length > 0) available[data.requestID % available.length].postMessage({ descriptor, request: data.requestID }); // round-robin to an active worker
else log.error('no available workers');
};
async function loadDB(count) {
const previous = data.labels.length;
if (!fs.existsSync(options.dbFile)) {
log.error('db file does not exist:', options.dbFile);
return;
}
t0 = process.hrtime.bigint();
for (let i = 0; i < count; i++) { // test loop: load entire face db from array of objects n times into buffer
const db = JSON.parse(fs.readFileSync(options.dbFile).toString());
const names = db.map((record) => record.name);
const descriptors = db.map((record) => record.embedding);
appendRecords(names, descriptors);
}
log.data('db loaded:', { existingRecords: previous, newRecords: data.labels.length });
}
async function createBuffer() {
data.buffer = new SharedArrayBuffer(4 * options.dbMax * options.descLength); // preallocate max number of records as sharedarraybuffers cannot grow
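// with the defaults above (dbMax=10000, descLength=1024) this allocates 4 * 10000 * 1024 = 40,960,000 bytes (~40 MB), 4 bytes per float32 element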
data.view = new Float32Array(data.buffer); // create view into buffer
data.labels.length = 0;
log.data('created shared buffer:', { maxDescriptors: data.view?.length / options.descLength, totalBytes: data.buffer.byteLength, totalElements: data.view?.length });
}
async function main() {
log.header();
log.info('options:', options);
await createBuffer(); // create shared buffer array
await loadDB(testOptions.dbFact); // loadDB is a test method that calls actual addRecords
await workersStart(options.threadPoolSize); // can be called at anytime to modify worker pool size
for (let i = 0; i < testOptions.maxJobs; i++) {
const idx = Math.trunc(data.labels.length * Math.random()); // grab a random descriptor index that we'll search for
const descriptor = getDescriptor(idx); // grab a descriptor at index
data.requestID++; // increase request id
if (testOptions.fuzDescriptors) match(fuzDescriptor(descriptor)); // fuz descriptor for harder match
else match(descriptor);
if (options.debug) log.info('submitted job', data.requestID); // we already know what we're searching for so we can compare results
}
log.state('submitted:', { matchJobs: testOptions.maxJobs, poolSize: data.workers.length, activeWorkers: data.workers.filter((worker) => !!worker).length });
}
main();


@@ -0,0 +1,60 @@
# NodeJS Multi-Threading Match Solution
See `node-match.js` and `node-match-worker.js`
## Methods and Properties in `node-match`
- `createBuffer`: create shared buffer array
  - single copy of data regardless of number of workers
  - fixed size based on `options.dbMax`
- `appendRecords`: add additional batch of descriptors to buffer
  - can append a batch of records to the buffer at any time
  - workers are informed of the new content after the append has completed
- `workersStart`: start or expand pool of `threadPoolSize` workers
  - each worker runs `node-match-worker` and listens for messages from the main thread
  - can shut down workers or create additional worker threads on-the-fly
  - safe against workers that exit
- `workersClose`: close workers in a pool
  - first requests workers to exit, then terminates them after a timeout
- `match`: dispatch a match job to a worker
  - returns the first match that satisfies `minThreshold`
  - jobs are assigned to workers round-robin since timing for each job is near-fixed and predictable
- `getDescriptor`: get descriptor array for a given id from the buffer
- `fuzDescriptor`: slightly randomize descriptor content for a harder match
- `getLabel`: fetch label for a resolved descriptor index
- `loadDB`: load face database from a JSON file `dbFile`
  - extracts descriptors and adds them to the buffer
  - extracts labels and maintains them in the main thread
  - for test purposes loads the same database `dbFact` times to create a very large database
`node-match` then runs and listens for result messages from workers until `maxJobs` have been processed.
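The main thread and each worker communicate with plain `postMessage` payloads. Below is a minimal sketch of the main-thread side of that protocol as implemented by `node-match.js` and `node-match-worker.js`; the buffer size, record count, and descriptor values are illustrative:
```js
const path = require('path');
const threads = require('worker_threads');

const descLength = 1024; // must match the worker's descriptor length
const buffer = new SharedArrayBuffer(4 * 10000 * descLength); // float32 descriptor store, as created by createBuffer
const worker = new threads.Worker(path.join(__dirname, 'node-match-worker.js'));

worker.postMessage(buffer); // one-time: hand the worker a reference to the shared buffer
worker.postMessage({ records: 5700 }); // sent whenever the number of stored records changes
worker.postMessage({ threshold: 0.9, debug: false }); // minimum similarity threshold and logging

const descriptor = new Float32Array(descLength); // in practice, a face descriptor from the db
worker.postMessage({ descriptor, request: 1 }); // work order: find the closest record

worker.on('message', (msg) => {
  // result arrives as { request, time, index, distance, similarity }
  console.log(msg);
  worker.postMessage({ shutdown: true }); // ask the worker to exit cleanly
});
```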
## Performance
Match time grows roughly linearly with the number of records in the database
Adding worker threads improves throughput, but the speedup is sub-linear because of the communication overhead between the main thread and workers
- Face database with 10k records:
> threadPoolSize: 1 => ~60 ms / match job
> threadPoolSize: 6 => ~25 ms / match job
- Face database with 50k records:
> threadPoolSize: 1 => ~300 ms / match job
> threadPoolSize: 6 => ~100 ms / match job
- Face database with 100k records:
> threadPoolSize: 1 => ~600 ms / match job
> threadPoolSize: 6 => ~200 ms / match job
## Example
> node node-match
```js
2021-10-13 07:53:36 INFO: options: { dbFile: './faces.json', dbMax: 10000, threadPoolSize: 6, workerSrc: './node-match-worker.js', debug: false, minThreshold: 0.9, descLength: 1024 }
2021-10-13 07:53:36 DATA: created shared buffer: { maxDescriptors: 10000, totalBytes: 40960000, totalElements: 10240000 }
2021-10-13 07:53:36 DATA: db loaded: { existingRecords: 0, newRecords: 5700 }
2021-10-13 07:53:36 INFO: starting worker thread pool: { totalWorkers: 6, alreadyActive: 0 }
2021-10-13 07:53:36 STATE: submitted: { matchJobs: 100, poolSize: 6, activeWorkers: 6 }
2021-10-13 07:53:38 STATE: { matchJobsFinished: 100, totalTimeMs: 1769, averageTimeMs: 17.69 }
2021-10-13 07:53:38 INFO: closing workers: { poolSize: 6, activeWorkers: 6 }
```


@@ -55,6 +55,7 @@
"tensorflow"
],
"devDependencies": {
"@tensorflow/tfjs": "^3.9.0",
"@tensorflow/tfjs-backend-cpu": "^3.9.0",
"@tensorflow/tfjs-backend-wasm": "^3.9.0",
"@tensorflow/tfjs-backend-webgl": "^3.9.0",
@@ -63,30 +64,27 @@
"@tensorflow/tfjs-core": "^3.9.0",
"@tensorflow/tfjs-data": "^3.9.0",
"@tensorflow/tfjs-layers": "^3.9.0",
"@tensorflow/tfjs-node-gpu": "^3.9.0",
"@tensorflow/tfjs-node": "^3.9.0",
"@tensorflow/tfjs": "^3.9.0",
"@types/node": "^16.10.4",
"@tensorflow/tfjs-node-gpu": "^3.9.0",
"@types/node": "^16.10.5",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
"@vladmandic/build": "^0.6.0",
"@vladmandic/pilogger": "^0.3.3",
"canvas": "^2.8.0",
"dayjs": "^1.10.7",
"esbuild": "^0.13.4",
"esbuild": "^0.13.5",
"eslint": "8.0.0",
"eslint-config-airbnb-base": "^14.2.1",
"eslint-plugin-import": "^2.25.1",
"eslint-plugin-import": "^2.25.2",
"eslint-plugin-json": "^3.1.0",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-promise": "^5.1.0",
"eslint": "8.0.0",
"node-fetch": "^3.0.0",
"rimraf": "^3.0.2",
"seedrandom": "^3.0.5",
"tslib": "^2.3.1",
"typedoc": "0.22.5",
"typescript": "4.4.3"
},
"dependencies": {
"typescript": "4.4.4"
}
}


@@ -1,24 +1,24 @@
2021-10-12 14:13:03 INFO:  @vladmandic/human version 2.3.3
2021-10-12 14:13:03 INFO:  User: vlado Platform: linux Arch: x64 Node: v16.10.0
2021-10-12 14:13:03 INFO:  Application: {"name":"@vladmandic/human","version":"2.3.3"}
2021-10-12 14:13:03 INFO:  Environment: {"profile":"production","config":"build.json","tsconfig":true,"eslintrc":true,"git":true}
2021-10-12 14:13:03 INFO:  Toolchain: {"build":"0.6.0","esbuild":"0.13.4","typescript":"4.4.3","typedoc":"0.22.5","eslint":"8.0.0"}
2021-10-12 14:13:03 INFO:  Build: {"profile":"production","steps":["clean","compile","typings","typedoc","lint","changelog"]}
2021-10-12 14:13:03 STATE: Clean: {"locations":["dist/*","types/*","typedoc/*"]}
2021-10-12 14:13:03 STATE: Compile: {"name":"tfjs/nodejs/cpu","format":"cjs","platform":"node","input":"tfjs/tf-node.ts","output":"dist/tfjs.esm.js","files":1,"inputBytes":102,"outputBytes":1275}
2021-10-12 14:13:03 STATE: Compile: {"name":"human/nodejs/cpu","format":"cjs","platform":"node","input":"src/human.ts","output":"dist/human.node.js","files":53,"inputBytes":515658,"outputBytes":428192}
2021-10-12 14:13:03 STATE: Compile: {"name":"tfjs/nodejs/gpu","format":"cjs","platform":"node","input":"tfjs/tf-node-gpu.ts","output":"dist/tfjs.esm.js","files":1,"inputBytes":110,"outputBytes":1283}
2021-10-12 14:13:03 STATE: Compile: {"name":"human/nodejs/gpu","format":"cjs","platform":"node","input":"src/human.ts","output":"dist/human.node-gpu.js","files":53,"inputBytes":515666,"outputBytes":428196}
2021-10-12 14:13:03 STATE: Compile: {"name":"tfjs/nodejs/wasm","format":"cjs","platform":"node","input":"tfjs/tf-node-wasm.ts","output":"dist/tfjs.esm.js","files":1,"inputBytes":149,"outputBytes":1350}
2021-10-12 14:13:03 STATE: Compile: {"name":"human/nodejs/wasm","format":"cjs","platform":"node","input":"src/human.ts","output":"dist/human.node-wasm.js","files":53,"inputBytes":515733,"outputBytes":428268}
2021-10-12 14:13:03 STATE: Compile: {"name":"tfjs/browser/version","format":"esm","platform":"browser","input":"tfjs/tf-version.ts","output":"dist/tfjs.version.js","files":1,"inputBytes":1063,"outputBytes":1631}
2021-10-12 14:13:03 STATE: Compile: {"name":"tfjs/browser/esm/nobundle","format":"esm","platform":"browser","input":"tfjs/tf-browser.ts","output":"dist/tfjs.esm.js","files":2,"inputBytes":3085,"outputBytes":856}
2021-10-12 14:13:03 STATE: Compile: {"name":"human/browser/esm/nobundle","format":"esm","platform":"browser","input":"src/human.ts","output":"dist/human.esm-nobundle.js","files":53,"inputBytes":515239,"outputBytes":429801}
2021-10-12 14:13:04 STATE: Compile: {"name":"tfjs/browser/esm/bundle","format":"esm","platform":"browser","input":"tfjs/tf-browser.ts","output":"dist/tfjs.esm.js","files":8,"inputBytes":3085,"outputBytes":2691961}
2021-10-12 14:13:05 STATE: Compile: {"name":"human/browser/iife/bundle","format":"iife","platform":"browser","input":"src/human.ts","output":"dist/human.js","files":53,"inputBytes":3206344,"outputBytes":1607589}
2021-10-12 14:13:05 STATE: Compile: {"name":"human/browser/esm/bundle","format":"esm","platform":"browser","input":"src/human.ts","output":"dist/human.esm.js","files":53,"inputBytes":3206344,"outputBytes":2921701}
2021-10-12 14:13:23 STATE: Typings: {"input":"src/human.ts","output":"types","files":6}
2021-10-12 14:13:29 STATE: TypeDoc: {"input":"src/human.ts","output":"typedoc","objects":35,"generated":true}
2021-10-12 14:14:01 STATE: Lint: {"locations":["*.json","src/**/*.ts","test/**/*.js","demo/**/*.js"],"files":87,"errors":0,"warnings":0}
2021-10-12 14:14:01 STATE: ChangeLog: {"repository":"https://github.com/vladmandic/human","branch":"main","output":"CHANGELOG.md"}
2021-10-12 14:14:01 INFO:  Done...
2021-10-13 08:02:50 INFO:  @vladmandic/human version 2.3.3
2021-10-13 08:02:50 INFO:  User: vlado Platform: linux Arch: x64 Node: v16.10.0
2021-10-13 08:02:50 INFO:  Application: {"name":"@vladmandic/human","version":"2.3.3"}
2021-10-13 08:02:50 INFO:  Environment: {"profile":"production","config":"build.json","tsconfig":true,"eslintrc":true,"git":true}
2021-10-13 08:02:50 INFO:  Toolchain: {"build":"0.6.0","esbuild":"0.13.5","typescript":"4.4.4","typedoc":"0.22.5","eslint":"8.0.0"}
2021-10-13 08:02:50 INFO:  Build: {"profile":"production","steps":["clean","compile","typings","typedoc","lint","changelog"]}
2021-10-13 08:02:50 STATE: Clean: {"locations":["dist/*","types/*","typedoc/*"]}
2021-10-13 08:02:50 STATE: Compile: {"name":"tfjs/nodejs/cpu","format":"cjs","platform":"node","input":"tfjs/tf-node.ts","output":"dist/tfjs.esm.js","files":1,"inputBytes":102,"outputBytes":1275}
2021-10-13 08:02:50 STATE: Compile: {"name":"human/nodejs/cpu","format":"cjs","platform":"node","input":"src/human.ts","output":"dist/human.node.js","files":53,"inputBytes":515633,"outputBytes":428192}
2021-10-13 08:02:50 STATE: Compile: {"name":"tfjs/nodejs/gpu","format":"cjs","platform":"node","input":"tfjs/tf-node-gpu.ts","output":"dist/tfjs.esm.js","files":1,"inputBytes":110,"outputBytes":1283}
2021-10-13 08:02:50 STATE: Compile: {"name":"human/nodejs/gpu","format":"cjs","platform":"node","input":"src/human.ts","output":"dist/human.node-gpu.js","files":53,"inputBytes":515641,"outputBytes":428196}
2021-10-13 08:02:50 STATE: Compile: {"name":"tfjs/nodejs/wasm","format":"cjs","platform":"node","input":"tfjs/tf-node-wasm.ts","output":"dist/tfjs.esm.js","files":1,"inputBytes":149,"outputBytes":1350}
2021-10-13 08:02:50 STATE: Compile: {"name":"human/nodejs/wasm","format":"cjs","platform":"node","input":"src/human.ts","output":"dist/human.node-wasm.js","files":53,"inputBytes":515708,"outputBytes":428268}
2021-10-13 08:02:50 STATE: Compile: {"name":"tfjs/browser/version","format":"esm","platform":"browser","input":"tfjs/tf-version.ts","output":"dist/tfjs.version.js","files":1,"inputBytes":1063,"outputBytes":1631}
2021-10-13 08:02:50 STATE: Compile: {"name":"tfjs/browser/esm/nobundle","format":"esm","platform":"browser","input":"tfjs/tf-browser.ts","output":"dist/tfjs.esm.js","files":2,"inputBytes":3085,"outputBytes":856}
2021-10-13 08:02:50 STATE: Compile: {"name":"human/browser/esm/nobundle","format":"esm","platform":"browser","input":"src/human.ts","output":"dist/human.esm-nobundle.js","files":53,"inputBytes":515214,"outputBytes":429801}
2021-10-13 08:02:51 STATE: Compile: {"name":"tfjs/browser/esm/bundle","format":"esm","platform":"browser","input":"tfjs/tf-browser.ts","output":"dist/tfjs.esm.js","files":8,"inputBytes":3085,"outputBytes":2691961}
2021-10-13 08:02:52 STATE: Compile: {"name":"human/browser/iife/bundle","format":"iife","platform":"browser","input":"src/human.ts","output":"dist/human.js","files":53,"inputBytes":3206319,"outputBytes":1607589}
2021-10-13 08:02:52 STATE: Compile: {"name":"human/browser/esm/bundle","format":"esm","platform":"browser","input":"src/human.ts","output":"dist/human.esm.js","files":53,"inputBytes":3206319,"outputBytes":2921701}
2021-10-13 08:03:10 STATE: Typings: {"input":"src/human.ts","output":"types","files":6}
2021-10-13 08:03:17 STATE: TypeDoc: {"input":"src/human.ts","output":"typedoc","objects":35,"generated":true}
2021-10-13 08:03:46 STATE: Lint: {"locations":["*.json","src/**/*.ts","test/**/*.js","demo/**/*.js"],"files":89,"errors":0,"warnings":0}
2021-10-13 08:03:47 STATE: ChangeLog: {"repository":"https://github.com/vladmandic/human","branch":"main","output":"CHANGELOG.md"}
2021-10-13 08:03:47 INFO:  Done...

File diff suppressed because it is too large

wiki

@@ -1 +1 @@
Subproject commit 89065def6080803129c020a2c824f6bdb0db9851
Subproject commit e3e6effa32ae988716728d870b06be0da83951ec