diff --git a/CONTRIBUTING b/CONTRIBUTING index c8412306..bf2d1010 100644 --- a/CONTRIBUTING +++ b/CONTRIBUTING @@ -14,7 +14,7 @@ Procedure for contributing: `npm run lint` - Test your changes in Browser and NodeJS `npm run dev` and naviate to https://localhost:10031 - `node demo/node.js` + `node test/test-node.js` - Push changes to your fork Exclude files in `/dist', '/types', '/typedoc' from the commit as they are dynamically generated during build - Submit a PR (pull request) diff --git a/demo/node-multiprocess-worker.js b/demo/node-multiprocess-worker.js new file mode 100644 index 00000000..4f434f98 --- /dev/null +++ b/demo/node-multiprocess-worker.js @@ -0,0 +1,76 @@ +// @ts-nocheck + +const fs = require('fs'); +// eslint-disable-next-line import/no-extraneous-dependencies, node/no-unpublished-require +const log = require('@vladmandic/pilogger'); + +// workers actual import tfjs and faceapi modules +// eslint-disable-next-line import/no-extraneous-dependencies, node/no-unpublished-require +const tf = require('@tensorflow/tfjs-node'); +const Human = require('../dist/human.node.js').default; // or const Human = require('../dist/human.node-gpu.js').default; + +let human = null; + +const myConfig = { + backend: 'tensorflow', + modelBasePath: 'file://models/', + debug: false, + videoOptimized: false, + async: false, + face: { + enabled: true, + detector: { enabled: true, rotation: false }, + mesh: { enabled: true }, + iris: { enabled: false }, + description: { enabled: true }, + emotion: { enabled: true }, + }, + hand: { + enabled: false, + }, + // body: { modelPath: 'efficientpose.json', enabled: true }, + // body: { modelPath: 'blazepose.json', enabled: true }, + body: { enabled: false }, + object: { enabled: false }, +}; + +// read image from a file and create tensor to be used by faceapi +// this way we don't need any monkey patches +// you can add any pre-proocessing here such as resizing, etc. +async function image(img) { + const buffer = fs.readFileSync(img); + const tensor = tf.tidy(() => tf.node.decodeImage(buffer).toFloat().expandDims()); + return tensor; +} + +// actual faceapi detection +async function detect(img) { + const tensor = await image(img); + const result = await human.detect(tensor); + process.send({ image: img, detected: result }); // send results back to main + process.send({ ready: true }); // send signal back to main that this worker is now idle and ready for next image + tensor.dispose(); +} + +async function main() { + // on worker start first initialize message handler so we don't miss any messages + process.on('message', (msg) => { + if (msg.exit) process.exit(); // if main told worker to exit + if (msg.test) process.send({ test: true }); + if (msg.image) detect(msg.image); // if main told worker to process image + log.data('Worker received message:', process.pid, msg); // generic log + }); + + // wait until tf is ready + await tf.ready(); + // create instance of human + human = new Human(myConfig); + // pre-load models + log.state('Worker: PID:', process.pid, `TensorFlow/JS ${human.tf.version_core} Human ${human.version} Backend: ${human.tf.getBackend()}`); + await human.load(); + + // now we're ready, so send message back to main that it knows it can use this worker + process.send({ ready: true }); +} + +main(); diff --git a/demo/node-multiprocess.js b/demo/node-multiprocess.js new file mode 100644 index 00000000..fd69ae87 --- /dev/null +++ b/demo/node-multiprocess.js @@ -0,0 +1,83 @@ +// @ts-nocheck + +const fs = require('fs'); +const path = require('path'); +// eslint-disable-next-line import/no-extraneous-dependencies, node/no-unpublished-require +const log = require('@vladmandic/pilogger'); // this is my simple logger with few extra features +const child_process = require('child_process'); +// note that main process import faceapi or tfjs at all + +const imgPathRoot = './demo'; // modify to include your sample images +const numWorkers = 4; // how many workers will be started +const workers = []; // this holds worker processes +const images = []; // this holds queue of enumerated images +const t = []; // timers +let numImages; + +// trigered by main when worker sends ready message +// if image pool is empty, signal worker to exit otherwise dispatch image to worker and remove image from queue +async function detect(worker) { + if (!t[2]) t[2] = process.hrtime.bigint(); // first time do a timestamp so we can measure initial latency + if (images.length === numImages) worker.send({ test: true }); // for first image in queue just measure latency + if (images.length === 0) worker.send({ exit: true }); // nothing left in queue + else { + log.state('Main: dispatching to worker:', worker.pid); + worker.send({ image: images[0] }); + images.shift(); + } +} + +// loop that waits for all workers to complete +function waitCompletion() { + const activeWorkers = workers.reduce((any, worker) => (any += worker.connected ? 1 : 0), 0); + if (activeWorkers > 0) setImmediate(() => waitCompletion()); + else { + t[1] = process.hrtime.bigint(); + log.info('Processed:', numImages, 'images in', 'total:', Math.trunc(parseInt(t[1] - t[0]) / 1000000), 'ms', 'working:', Math.trunc(parseInt(t[1] - t[2]) / 1000000), 'ms', 'average:', Math.trunc(parseInt(t[1] - t[2]) / numImages / 1000000), 'ms'); + } +} + +function measureLatency() { + t[3] = process.hrtime.bigint(); + const latencyInitialization = Math.trunc(parseInt(t[2] - t[0]) / 1000 / 1000); + const latencyRoundTrip = Math.trunc(parseInt(t[3] - t[2]) / 1000 / 1000); + log.info('Latency: worker initializtion: ', latencyInitialization, 'message round trip:', latencyRoundTrip); +} + +async function main() { + log.header(); + log.info('FaceAPI multi-process test'); + + // enumerate all images into queue + const dir = fs.readdirSync(imgPathRoot); + for (const imgFile of dir) { + if (imgFile.toLocaleLowerCase().endsWith('.jpg')) images.push(path.join(imgPathRoot, imgFile)); + } + numImages = images.length; + + t[0] = process.hrtime.bigint(); + // manage worker processes + for (let i = 0; i < numWorkers; i++) { + // create worker process + workers[i] = await child_process.fork('demo/node-multiprocess-worker.js', ['special']); + // parse message that worker process sends back to main + // if message is ready, dispatch next image in queue + // if message is processing result, just print how many faces were detected + // otherwise it's an unknown message + workers[i].on('message', (msg) => { + if (msg.ready) detect(workers[i]); + else if (msg.image) log.data('Main: worker finished:', workers[i].pid, 'detected faces:', msg.detected.face?.length); + else if (msg.test) measureLatency(); + else log.data('Main: worker message:', workers[i].pid, msg); + }); + // just log when worker exits + workers[i].on('exit', (msg) => log.state('Main: worker exit:', workers[i].pid, msg)); + // just log which worker was started + log.state('Main: started worker:', workers[i].pid); + } + + // wait for all workers to complete + waitCompletion(); +} + +main(); diff --git a/wiki b/wiki index 00a239fa..a84586b6 160000 --- a/wiki +++ b/wiki @@ -1 +1 @@ -Subproject commit 00a239fa51eda5b366f0e1d05d66fccf4edb0ce1 +Subproject commit a84586b61f8cd89d7d7b8c16496de25a1bac8e32