added node-multiprocess demo

pull/134/head
Vladimir Mandic 2021-04-16 08:34:16 -04:00
parent 76d0111e09
commit 39c11b381f
4 changed files with 161 additions and 2 deletions

View File

@ -14,7 +14,7 @@ Procedure for contributing:
`npm run lint`
- Test your changes in Browser and NodeJS
`npm run dev` and navigate to https://localhost:10031
`node demo/node.js`
`node test/test-node.js`
- Push changes to your fork
Exclude files in `/dist`, `/types`, `/typedoc` from the commit as they are dynamically generated during build
- Submit a PR (pull request)

View File

@ -0,0 +1,76 @@
// @ts-nocheck
const fs = require('fs');
// eslint-disable-next-line import/no-extraneous-dependencies, node/no-unpublished-require
const log = require('@vladmandic/pilogger');
// workers actually import the tfjs and faceapi modules
// eslint-disable-next-line import/no-extraneous-dependencies, node/no-unpublished-require
const tf = require('@tensorflow/tfjs-node');
const Human = require('../dist/human.node.js').default; // or const Human = require('../dist/human.node-gpu.js').default;
// Human instance; created in main() once tf is ready
let human = null;
// Human library configuration for this worker:
// face detection/mesh/description/emotion enabled, iris/hand/body/object disabled
const myConfig = {
  backend: 'tensorflow', // native tfjs-node backend
  modelBasePath: 'file://models/', // load models from the local filesystem
  debug: false,
  videoOptimized: false, // inputs are independent still images, not a video stream
  async: false,
  face: {
    enabled: true,
    detector: { enabled: true, rotation: false },
    mesh: { enabled: true },
    iris: { enabled: false },
    description: { enabled: true },
    emotion: { enabled: true },
  },
  hand: {
    enabled: false,
  },
  // body: { modelPath: 'efficientpose.json', enabled: true },
  // body: { modelPath: 'blazepose.json', enabled: true },
  body: { enabled: false },
  object: { enabled: false },
};
// read an image file from disk and turn it into a tensor suitable for human.detect()
// doing the decode here avoids any monkey patching; add pre-processing
// (resizing, etc.) inside the tidy() callback if needed
async function image(img) {
  const data = fs.readFileSync(img);
  return tf.tidy(() => {
    const decoded = tf.node.decodeImage(data);
    return decoded.toFloat().expandDims();
  });
}
// run detection on a single image file and report results back to the main process
// the tensor is disposed in a finally block so its memory is released even if
// human.detect() throws (the original leaked the tensor on error)
async function detect(img) {
  const tensor = await image(img);
  try {
    const result = await human.detect(tensor);
    process.send({ image: img, detected: result }); // send results back to main
    process.send({ ready: true }); // signal main that this worker is idle and ready for the next image
  } finally {
    tensor.dispose(); // always release tensor memory
  }
}
// worker entry point: wire up IPC, initialize tf + Human, then report readiness
async function main() {
  // register the message handler before any async work so no message from main is missed
  process.on('message', (msg) => {
    if (msg.exit) process.exit(); // main requested shutdown
    if (msg.test) process.send({ test: true }); // latency-measurement ping
    if (msg.image) detect(msg.image); // main dispatched an image for processing
    log.data('Worker received message:', process.pid, msg); // generic log
  });
  await tf.ready(); // wait until the tfjs backend is initialized
  human = new Human(myConfig); // create the Human instance
  log.state('Worker: PID:', process.pid, `TensorFlow/JS ${human.tf.version_core} Human ${human.version} Backend: ${human.tf.getBackend()}`);
  await human.load(); // pre-load models
  // now fully initialized: tell main this worker can accept work
  process.send({ ready: true });
}
main();

83
demo/node-multiprocess.js Normal file
View File

@ -0,0 +1,83 @@
// @ts-nocheck
const fs = require('fs');
const path = require('path');
// eslint-disable-next-line import/no-extraneous-dependencies, node/no-unpublished-require
const log = require('@vladmandic/pilogger'); // this is my simple logger with few extra features
const child_process = require('child_process');
// note that the main process does not import faceapi or tfjs at all
const imgPathRoot = './demo'; // modify to include your sample images
const numWorkers = 4; // how many workers will be started
const workers = []; // this holds worker processes
const images = []; // this holds queue of enumerated images
// timers (bigint ns): t[0]=start, t[1]=all done, t[2]=first worker ready, t[3]=latency ping reply
const t = [];
let numImages; // total number of enumerated images; set once in main()
// triggered by main whenever a worker sends a ready message:
// on the very first ready, start the latency round-trip measurement;
// afterwards either dispatch the next queued image or, if the queue is
// drained, tell the worker to exit
async function detect(worker) {
  if (!t[2]) t[2] = process.hrtime.bigint(); // timestamp the first readiness so initial latency can be measured
  if (images.length === numImages) worker.send({ test: true }); // nothing dispatched yet: measure latency first
  if (images.length === 0) {
    worker.send({ exit: true }); // nothing left in the queue
  } else {
    log.state('Main: dispatching to worker:', worker.pid);
    worker.send({ image: images.shift() }); // pop the next image off the queue and dispatch it
  }
}
// poll until every worker has disconnected, then log total/working/average timings
// uses Number() to convert the BigInt nanosecond differences (the original used
// parseInt() without a radix, which stringifies the BigInt first — an anti-pattern)
function waitCompletion() {
  const activeWorkers = workers.filter((worker) => worker.connected).length;
  if (activeWorkers > 0) {
    setImmediate(waitCompletion); // check again on the next event-loop turn
  } else {
    t[1] = process.hrtime.bigint();
    log.info('Processed:', numImages, 'images in', 'total:', Math.trunc(Number(t[1] - t[0]) / 1000000), 'ms', 'working:', Math.trunc(Number(t[1] - t[2]) / 1000000), 'ms', 'average:', Math.trunc(Number(t[1] - t[2]) / numImages / 1000000), 'ms');
  }
}
// compute and log worker initialization latency (start -> first ready) and
// the IPC message round-trip latency (first ready -> ping reply), both in ms
function measureLatency() {
  t[3] = process.hrtime.bigint();
  const latencyInitialization = Math.trunc(Number(t[2] - t[0]) / 1000 / 1000); // ns -> ms
  const latencyRoundTrip = Math.trunc(Number(t[3] - t[2]) / 1000 / 1000); // ns -> ms
  // fixed typo in log message ("initializtion" -> "initialization")
  log.info('Latency: worker initialization: ', latencyInitialization, 'message round trip:', latencyRoundTrip);
}
// main entry point: enumerate images, spawn workers, and wire up their IPC handlers
async function main() {
  log.header();
  log.info('FaceAPI multi-process test');
  // enumerate all .jpg images in the root folder into the queue
  const dir = fs.readdirSync(imgPathRoot);
  for (const imgFile of dir) {
    if (imgFile.toLocaleLowerCase().endsWith('.jpg')) images.push(path.join(imgPathRoot, imgFile));
  }
  numImages = images.length;
  t[0] = process.hrtime.bigint();
  // manage worker processes
  for (let i = 0; i < numWorkers; i++) {
    // create worker process; fork() is synchronous and returns a ChildProcess,
    // not a Promise, so the original's `await` was a no-op and is removed
    workers[i] = child_process.fork('demo/node-multiprocess-worker.js', ['special']);
    // parse messages the worker sends back to main:
    //   ready -> dispatch the next image in the queue
    //   image -> processing result; log how many faces were detected
    //   test  -> latency-measurement reply
    //   otherwise it's an unknown message, log it generically
    workers[i].on('message', (msg) => {
      if (msg.ready) detect(workers[i]);
      else if (msg.image) log.data('Main: worker finished:', workers[i].pid, 'detected faces:', msg.detected.face?.length);
      else if (msg.test) measureLatency();
      else log.data('Main: worker message:', workers[i].pid, msg);
    });
    // just log when worker exits
    workers[i].on('exit', (msg) => log.state('Main: worker exit:', workers[i].pid, msg));
    // just log which worker was started
    log.state('Main: started worker:', workers[i].pid);
  }
  // wait for all workers to complete
  waitCompletion();
}
main();

2
wiki

@ -1 +1 @@
Subproject commit 00a239fa51eda5b366f0e1d05d66fccf4edb0ce1
Subproject commit a84586b61f8cd89d7d7b8c16496de25a1bac8e32