From 4b0bb6aa506280b117291f84e973ce5cb237b84a Mon Sep 17 00:00:00 2001 From: Vladimir Mandic Date: Wed, 13 Oct 2021 08:06:11 -0400 Subject: [PATCH] add node-match advanced example using worker thread pool --- CHANGELOG.md | 1 + TODO.md | 1 + demo/facematch/node-match-worker.js | 69 +++++++++++ demo/facematch/node-match.js | 174 ++++++++++++++++++++++++++++ demo/facematch/node-match.md | 60 ++++++++++ package.json | 16 ++- wiki | 2 +- 7 files changed, 313 insertions(+), 10 deletions(-) create mode 100644 demo/facematch/node-match-worker.js create mode 100644 demo/facematch/node-match.js create mode 100644 demo/facematch/node-match.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 57799137..65e2a5c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ ### **HEAD -> main** 2021/10/12 mandic00@live.com +- optimize image preprocessing ### **release: 2.3.2** 2021/10/11 mandic00@live.com diff --git a/TODO.md b/TODO.md index ad31cff3..1329df03 100644 --- a/TODO.md +++ b/TODO.md @@ -30,6 +30,7 @@ Experimental support only until support is officially added in Chromium - Optical Flow: - TFLite Models: +- Histogram Equalization: Regular, Adaptive, Contrast Limited


diff --git a/demo/facematch/node-match-worker.js b/demo/facematch/node-match-worker.js new file mode 100644 index 00000000..d0be27ba --- /dev/null +++ b/demo/facematch/node-match-worker.js @@ -0,0 +1,69 @@ +const threads = require('worker_threads'); + +let debug = false; + +/** @type SharedArrayBuffer */ +let buffer; +/** @type Float32Array */ +let view; +let threshold = 0; +let records = 0; + +const descLength = 1024; // descriptor length in elements (Float32 values per descriptor, not bytes) + +function distance(descriptor1, index, options = { order: 2 }) { + let sum = 0; + for (let i = 0; i < descriptor1.length; i++) { + const diff = (options.order === 2) ? (descriptor1[i] - view[index * descLength + i]) : (Math.abs(descriptor1[i] - view[index * descLength + i])); + sum += (options.order === 2) ? (diff * diff) : (diff ** options.order); + } + return sum; +} + +function match(descriptor, options = { order: 2 }) { + let best = Number.MAX_SAFE_INTEGER; + let index = -1; + for (let i = 0; i < records; i++) { + const res = distance(descriptor, i, { order: options.order }); + if (res < best) { + best = res; + index = i; + } + if (best < threshold || best === 0) break; // short circuit + } + best = (options.order === 2) ? 
Math.sqrt(best) : best ** (1 / options.order); + return { index, distance: best, similarity: Math.max(0, 100 - best) / 100.0 }; +} + +threads.parentPort?.on('message', (msg) => { + if (typeof msg.descriptor !== 'undefined') { // actual work order to find a match + const t0 = performance.now(); + const result = match(msg.descriptor); + const t1 = performance.now(); + threads.parentPort?.postMessage({ request: msg.request, time: Math.trunc(t1 - t0), ...result }); + return; // short circuit + } + if (msg instanceof SharedArrayBuffer) { // called only once to receive reference to shared array buffer + buffer = msg; + view = new Float32Array(buffer); // initialize float32 view into buffer + if (debug) threads.parentPort?.postMessage(`buffer: ${buffer?.byteLength}`); + } + if (typeof msg.records !== 'undefined') { // received every time the number of records changes + records = msg.records; + if (debug) threads.parentPort?.postMessage(`records: ${records}`); + } + if (typeof msg.debug !== 'undefined') { // set verbose logging + debug = msg.debug; + if (debug) threads.parentPort?.postMessage(`debug: ${debug}`); + } + if (typeof msg.threshold !== 'undefined') { // set minimum similarity threshold + threshold = msg.threshold; + if (debug) threads.parentPort?.postMessage(`threshold: ${threshold}`); + } + if (typeof msg.shutdown !== 'undefined') { // got message to close worker + if (debug) threads.parentPort?.postMessage('shutting down'); + process.exit(0); + } +}); + +if (debug) threads.parentPort?.postMessage('started'); diff --git a/demo/facematch/node-match.js b/demo/facematch/node-match.js new file mode 100644 index 00000000..58245f6b --- /dev/null +++ b/demo/facematch/node-match.js @@ -0,0 +1,174 @@ +const fs = require('fs'); +const path = require('path'); +const log = require('@vladmandic/pilogger'); +const threads = require('worker_threads'); + +// global options +const options = { + dbFile: './faces.json', // sample face db + dbMax: 10000, // maximum number of records 
 to hold in memory + threadPoolSize: 6, // number of worker threads to create in thread pool + workerSrc: './node-match-worker.js', // code that executes in the worker thread + debug: false, // verbose messages + minThreshold: 0.9, // match returns first record that meets the similarity threshold, set to 0 to always scan all records + descLength: 1024, // descriptor length + }; + +// test options +const testOptions = { + dbFact: 100, // load db n times to fake huge size + maxJobs: 100, // exit after processing this many jobs + fuzDescriptors: true, // randomize descriptor content before match for harder jobs +}; + +// global data structures +const data = { + /** @type string[] */ + labels: [], // array of strings, length of array serves as overall number of records so has to be maintained carefully + /** @type SharedArrayBuffer | null */ + buffer: null, + /** @type Float32Array | null */ + view: null, + /** @type threads.Worker[] */ + workers: [], // holds instances of workers; a worker can be null if exited + requestID: 0, // each request should increment this counter as it's used for round robin assignment +}; + +let t0 = process.hrtime.bigint(); // used for perf counters + +const appendRecords = (labels, descriptors) => { + if (!data.view) return 0; + if (descriptors.length !== labels.length) { + log.error('append error:', { descriptors: descriptors.length, labels: labels.length }); + } + // if (options.debug) log.state('appending:', { descriptors: descriptors.length, labels: labels.length }); + for (let i = 0; i < descriptors.length; i++) { + for (let j = 0; j < descriptors[i].length; j++) { + data.view[data.labels.length * descriptors[i].length + j] = descriptors[i][j]; // add each descriptors element to buffer + } + data.labels.push(labels[i]); // finally add to labels + } + for (const worker of data.workers) { // inform all workers how many records we have + if (worker) worker.postMessage({ records: data.labels.length }); + } + return data.labels.length; +}; + 
+const getLabel = (index) => data.labels[index]; + +const getDescriptor = (index) => { + if (!data.view) return []; + const descriptor = []; + for (let i = 0; i < 1024; i++) descriptor.push(data.view[index * options.descLength + i]); + return descriptor; +}; + +const fuzDescriptor = (descriptor) => { + for (let i = 0; i < descriptor.length; i++) descriptor[i] += Math.random() - 0.5; + return descriptor; +}; + +const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +async function workersClose() { + const current = data.workers.filter((worker) => !!worker).length; + log.info('closing workers:', { poolSize: data.workers.length, activeWorkers: current }); + for (const worker of data.workers) { + if (worker) worker.postMessage({ shutdown: true }); // tell worker to exit + } + await delay(250); // wait a little for threads to exit on their own + const remaining = data.workers.filter((worker) => !!worker).length; + if (remaining > 0) { + log.info('terminating remaining workers:', { remaining: current, pool: data.workers.length }); + for (const worker of data.workers) { + if (worker) worker.terminate(); // if worker did not exit cleanly terminate it + } + } +} + +const workerMessage = (index, msg) => { + if (msg.request) { + if (options.debug) log.data('message:', { worker: index, request: msg.request, time: msg.time, label: getLabel(msg.index), similarity: msg.similarity }); + if (msg.request >= testOptions.maxJobs) { + const t1 = process.hrtime.bigint(); + const elapsed = Math.round(Number(t1 - t0) / 1000 / 1000); + log.state({ matchJobsFinished: testOptions.maxJobs, totalTimeMs: elapsed, averageTimeMs: Math.round(100 * elapsed / testOptions.maxJobs) / 100 }); + workersClose(); + } + } else { + log.data('message:', { worker: index, msg }); + } +}; + +async function workerClose(id, code) { + const previous = data.workers.filter((worker) => !!worker).length; + delete data.workers[id]; + const current = data.workers.filter((worker) => !!worker).length; + 
if (options.debug) log.state('worker exit:', { id, code, previous, current }); +} + +async function workersStart(numWorkers) { + const previous = data.workers.filter((worker) => !!worker).length; + log.info('starting worker thread pool:', { totalWorkers: numWorkers, alreadyActive: previous }); + for (let i = 0; i < numWorkers; i++) { + if (!data.workers[i]) { // worker does not exist, so create it + const worker = new threads.Worker(path.join(__dirname, options.workerSrc)); + worker.on('message', (msg) => workerMessage(i, msg)); + worker.on('error', (err) => log.error('worker error:', { err })); + worker.on('exit', (code) => workerClose(i, code)); + worker.postMessage(data.buffer); // send buffer to worker + data.workers[i] = worker; + } + data.workers[i]?.postMessage({ records: data.labels.length, threshold: options.minThreshold, debug: options.debug }); // inform worker how many records there are + } + await delay(100); // just wait a bit for everything to settle down +} + +const match = (descriptor) => { + const available = data.workers.filter((worker) => !!worker).length; // find number of available workers + if (available > 0) data.workers[data.requestID % available].postMessage({ descriptor, request: data.requestID }); // round robin to first available worker + else log.error('no available workers'); +}; + +async function loadDB(count) { + const previous = data.labels.length; + if (!fs.existsSync(options.dbFile)) { + log.error('db file does not exist:', options.dbFile); + return; + } + t0 = process.hrtime.bigint(); + for (let i = 0; i < count; i++) { // test loop: load entire face db from array of objects n times into buffer + const db = JSON.parse(fs.readFileSync(options.dbFile).toString()); + const names = db.map((record) => record.name); + const descriptors = db.map((record) => record.embedding); + appendRecords(names, descriptors); + } + log.data('db loaded:', { existingRecords: previous, newRecords: data.labels.length }); +} + +async function 
createBuffer() { + data.buffer = new SharedArrayBuffer(4 * options.dbMax * options.descLength); // preallocate max number of records as sharedarraybuffers cannot grow + data.view = new Float32Array(data.buffer); // create view into buffer + data.labels.length = 0; + log.data('created shared buffer:', { maxDescriptors: data.view?.length / options.descLength, totalBytes: data.buffer.byteLength, totalElements: data.view?.length }); +} + +async function main() { + log.header(); + log.info('options:', options); + + await createBuffer(); // create shared buffer array + await loadDB(testOptions.dbFact); // loadDB is a test method that calls actual addRecords + await workersStart(options.threadPoolSize); // can be called at anytime to modify worker pool size + for (let i = 0; i < testOptions.maxJobs; i++) { + const idx = Math.trunc(data.labels.length * Math.random()); // grab a random descriptor index that we'll search for + const descriptor = getDescriptor(idx); // grab a descriptor at index + data.requestID++; // increase request id + if (testOptions.fuzDescriptors) match(fuzDescriptor(descriptor)); // fuz descriptor for harder match + else match(descriptor); + if (options.debug) log.info('submited job', data.requestID); // we already know what we're searching for so we can compare results + } + log.state('submitted:', { matchJobs: testOptions.maxJobs, poolSize: data.workers.length, activeWorkers: data.workers.filter((worker) => !!worker).length }); +} + +main(); diff --git a/demo/facematch/node-match.md b/demo/facematch/node-match.md new file mode 100644 index 00000000..4a669d08 --- /dev/null +++ b/demo/facematch/node-match.md @@ -0,0 +1,60 @@ +# NodeJS Multi-Threading Match Solution + +See `node-match.js` and `node-match-worker.js` + +## Methods and Properties in `node-match` + +- `createBuffer`: create shared buffer array + single copy of data regardless of number of workers + fixed size based on `options.dbMax` +- `appendRecord`: add additional batch of descriptors 
 to buffer + can append batch of records to buffer at any time + workers are informed of the new content after append has been completed +- `workersStart`: start or expand pool of `threadPoolSize` workers + each worker runs `node-match-worker` and listens for messages from main thread + can shutdown workers or create additional worker threads on-the-fly + safe against workers that exit +- `workersClose`: close workers in a pool + first request workers to exit then terminate after timeout +- `match`: dispatch a match job to a worker + returns first match that satisfies `minThreshold` + assignment to workers using round-robin + since timing for each job is near-fixed and predictable +- `getDescriptor`: get descriptor array for a given id from a buffer +- `fuzDescriptor`: slightly randomize descriptor content for harder match +- `getLabel`: fetch label for resolved descriptor index +- `loadDB`: load face database from a JSON file `dbFile` + extracts descriptors and adds them to buffer + extracts labels and maintains them in main thread + for test purposes loads same database `dbFact` times to create a very large database + +`node-match` runs in a loop listening for messages from workers until `maxJobs` have been reached + +## Performance + +Linear performance decrease that depends on number of records in database +Non-linear performance gain with increasing number of worker threads due to communication overhead + +- Face database with 10k records: + > threadPoolSize: 1 => ~60 ms / match job + > threadPoolSize: 6 => ~25 ms / match job +- Face database with 50k records: + > threadPoolSize: 1 => ~300 ms / match job + > threadPoolSize: 6 => ~100 ms / match job +- Face database with 100k records: + > threadPoolSize: 1 => ~600 ms / match job + > threadPoolSize: 6 => ~200 ms / match job + +## Example + +> node node-match + +```js +2021-10-13 07:53:36 INFO: options: { dbFile: './faces.json', dbMax: 10000, threadPoolSize: 6, workerSrc: './node-match-worker.js', debug: false, minThreshold: 
0.9, descLength: 1024 } +2021-10-13 07:53:36 DATA: created shared buffer: { maxDescriptors: 10000, totalBytes: 40960000, totalElements: 10240000 } +2021-10-13 07:53:36 DATA: db loaded: { existingRecords: 0, newRecords: 5700 } +2021-10-13 07:53:36 INFO: starting worker thread pool: { totalWorkers: 6, alreadyActive: 0 } +2021-10-13 07:53:36 STATE: submitted: { matchJobs: 100, poolSize: 6, activeWorkers: 6 } +2021-10-13 07:53:38 STATE: { matchJobsFinished: 100, totalTimeMs: 1769, averageTimeMs: 17.69 } +2021-10-13 07:53:38 INFO: closing workers: { poolSize: 6, activeWorkers: 6 } +``` diff --git a/package.json b/package.json index 5cc43da6..7ac29164 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "tensorflow" ], "devDependencies": { + "@tensorflow/tfjs": "^3.9.0", "@tensorflow/tfjs-backend-cpu": "^3.9.0", "@tensorflow/tfjs-backend-wasm": "^3.9.0", "@tensorflow/tfjs-backend-webgl": "^3.9.0", @@ -63,30 +64,27 @@ "@tensorflow/tfjs-core": "^3.9.0", "@tensorflow/tfjs-data": "^3.9.0", "@tensorflow/tfjs-layers": "^3.9.0", - "@tensorflow/tfjs-node-gpu": "^3.9.0", "@tensorflow/tfjs-node": "^3.9.0", - "@tensorflow/tfjs": "^3.9.0", - "@types/node": "^16.10.4", + "@tensorflow/tfjs-node-gpu": "^3.9.0", + "@types/node": "^16.10.5", "@typescript-eslint/eslint-plugin": "^5.0.0", "@typescript-eslint/parser": "^5.0.0", "@vladmandic/build": "^0.6.0", "@vladmandic/pilogger": "^0.3.3", "canvas": "^2.8.0", "dayjs": "^1.10.7", - "esbuild": "^0.13.4", + "esbuild": "^0.13.5", + "eslint": "8.0.0", "eslint-config-airbnb-base": "^14.2.1", - "eslint-plugin-import": "^2.25.1", + "eslint-plugin-import": "^2.25.2", "eslint-plugin-json": "^3.1.0", "eslint-plugin-node": "^11.1.0", "eslint-plugin-promise": "^5.1.0", - "eslint": "8.0.0", "node-fetch": "^3.0.0", "rimraf": "^3.0.2", "seedrandom": "^3.0.5", "tslib": "^2.3.1", "typedoc": "0.22.5", - "typescript": "4.4.3" - }, - "dependencies": { + "typescript": "4.4.4" } } diff --git a/wiki b/wiki index 89065def..e3e6effa 160000 --- a/wiki +++ 
b/wiki @@ -1 +1 @@ -Subproject commit 89065def6080803129c020a2c824f6bdb0db9851 +Subproject commit e3e6effa32ae988716728d870b06be0da83951ec