mirror of https://github.com/vladmandic/human
added node-multiprocess demo
parent
774f649f5a
commit
3fc3bf4082
|
@ -14,7 +14,7 @@ Procedure for contributing:
|
|||
`npm run lint`
|
||||
- Test your changes in Browser and NodeJS
|
||||
`npm run dev` and naviate to https://localhost:10031
|
||||
`node demo/node.js`
|
||||
`node test/test-node.js`
|
||||
- Push changes to your fork
|
||||
Exclude files in `/dist', '/types', '/typedoc' from the commit as they are dynamically generated during build
|
||||
- Submit a PR (pull request)
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
// @ts-nocheck
|
||||
|
||||
const fs = require('fs');
|
||||
// eslint-disable-next-line import/no-extraneous-dependencies, node/no-unpublished-require
|
||||
const log = require('@vladmandic/pilogger');
|
||||
|
||||
// workers actual import tfjs and faceapi modules
|
||||
// eslint-disable-next-line import/no-extraneous-dependencies, node/no-unpublished-require
|
||||
const tf = require('@tensorflow/tfjs-node');
|
||||
const Human = require('../dist/human.node.js').default; // or const Human = require('../dist/human.node-gpu.js').default;
|
||||
|
||||
let human = null;
|
||||
|
||||
const myConfig = {
|
||||
backend: 'tensorflow',
|
||||
modelBasePath: 'file://models/',
|
||||
debug: false,
|
||||
videoOptimized: false,
|
||||
async: false,
|
||||
face: {
|
||||
enabled: true,
|
||||
detector: { enabled: true, rotation: false },
|
||||
mesh: { enabled: true },
|
||||
iris: { enabled: false },
|
||||
description: { enabled: true },
|
||||
emotion: { enabled: true },
|
||||
},
|
||||
hand: {
|
||||
enabled: false,
|
||||
},
|
||||
// body: { modelPath: 'efficientpose.json', enabled: true },
|
||||
// body: { modelPath: 'blazepose.json', enabled: true },
|
||||
body: { enabled: false },
|
||||
object: { enabled: false },
|
||||
};
|
||||
|
||||
// read image from a file and create tensor to be used by faceapi
|
||||
// this way we don't need any monkey patches
|
||||
// you can add any pre-proocessing here such as resizing, etc.
|
||||
async function image(img) {
|
||||
const buffer = fs.readFileSync(img);
|
||||
const tensor = tf.tidy(() => tf.node.decodeImage(buffer).toFloat().expandDims());
|
||||
return tensor;
|
||||
}
|
||||
|
||||
// actual faceapi detection
|
||||
async function detect(img) {
|
||||
const tensor = await image(img);
|
||||
const result = await human.detect(tensor);
|
||||
process.send({ image: img, detected: result }); // send results back to main
|
||||
process.send({ ready: true }); // send signal back to main that this worker is now idle and ready for next image
|
||||
tensor.dispose();
|
||||
}
|
||||
|
||||
async function main() {
|
||||
// on worker start first initialize message handler so we don't miss any messages
|
||||
process.on('message', (msg) => {
|
||||
if (msg.exit) process.exit(); // if main told worker to exit
|
||||
if (msg.test) process.send({ test: true });
|
||||
if (msg.image) detect(msg.image); // if main told worker to process image
|
||||
log.data('Worker received message:', process.pid, msg); // generic log
|
||||
});
|
||||
|
||||
// wait until tf is ready
|
||||
await tf.ready();
|
||||
// create instance of human
|
||||
human = new Human(myConfig);
|
||||
// pre-load models
|
||||
log.state('Worker: PID:', process.pid, `TensorFlow/JS ${human.tf.version_core} Human ${human.version} Backend: ${human.tf.getBackend()}`);
|
||||
await human.load();
|
||||
|
||||
// now we're ready, so send message back to main that it knows it can use this worker
|
||||
process.send({ ready: true });
|
||||
}
|
||||
|
||||
main();
|
|
@ -0,0 +1,83 @@
|
|||
// @ts-nocheck
|
||||
|
||||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
// eslint-disable-next-line import/no-extraneous-dependencies, node/no-unpublished-require
|
||||
const log = require('@vladmandic/pilogger'); // this is my simple logger with few extra features
|
||||
const child_process = require('child_process');
|
||||
// note that main process import faceapi or tfjs at all
|
||||
|
||||
const imgPathRoot = './demo'; // modify to include your sample images
|
||||
const numWorkers = 4; // how many workers will be started
|
||||
const workers = []; // this holds worker processes
|
||||
const images = []; // this holds queue of enumerated images
|
||||
const t = []; // timers
|
||||
let numImages;
|
||||
|
||||
// trigered by main when worker sends ready message
|
||||
// if image pool is empty, signal worker to exit otherwise dispatch image to worker and remove image from queue
|
||||
async function detect(worker) {
|
||||
if (!t[2]) t[2] = process.hrtime.bigint(); // first time do a timestamp so we can measure initial latency
|
||||
if (images.length === numImages) worker.send({ test: true }); // for first image in queue just measure latency
|
||||
if (images.length === 0) worker.send({ exit: true }); // nothing left in queue
|
||||
else {
|
||||
log.state('Main: dispatching to worker:', worker.pid);
|
||||
worker.send({ image: images[0] });
|
||||
images.shift();
|
||||
}
|
||||
}
|
||||
|
||||
// loop that waits for all workers to complete
|
||||
function waitCompletion() {
|
||||
const activeWorkers = workers.reduce((any, worker) => (any += worker.connected ? 1 : 0), 0);
|
||||
if (activeWorkers > 0) setImmediate(() => waitCompletion());
|
||||
else {
|
||||
t[1] = process.hrtime.bigint();
|
||||
log.info('Processed:', numImages, 'images in', 'total:', Math.trunc(parseInt(t[1] - t[0]) / 1000000), 'ms', 'working:', Math.trunc(parseInt(t[1] - t[2]) / 1000000), 'ms', 'average:', Math.trunc(parseInt(t[1] - t[2]) / numImages / 1000000), 'ms');
|
||||
}
|
||||
}
|
||||
|
||||
function measureLatency() {
|
||||
t[3] = process.hrtime.bigint();
|
||||
const latencyInitialization = Math.trunc(parseInt(t[2] - t[0]) / 1000 / 1000);
|
||||
const latencyRoundTrip = Math.trunc(parseInt(t[3] - t[2]) / 1000 / 1000);
|
||||
log.info('Latency: worker initializtion: ', latencyInitialization, 'message round trip:', latencyRoundTrip);
|
||||
}
|
||||
|
||||
async function main() {
|
||||
log.header();
|
||||
log.info('FaceAPI multi-process test');
|
||||
|
||||
// enumerate all images into queue
|
||||
const dir = fs.readdirSync(imgPathRoot);
|
||||
for (const imgFile of dir) {
|
||||
if (imgFile.toLocaleLowerCase().endsWith('.jpg')) images.push(path.join(imgPathRoot, imgFile));
|
||||
}
|
||||
numImages = images.length;
|
||||
|
||||
t[0] = process.hrtime.bigint();
|
||||
// manage worker processes
|
||||
for (let i = 0; i < numWorkers; i++) {
|
||||
// create worker process
|
||||
workers[i] = await child_process.fork('demo/node-multiprocess-worker.js', ['special']);
|
||||
// parse message that worker process sends back to main
|
||||
// if message is ready, dispatch next image in queue
|
||||
// if message is processing result, just print how many faces were detected
|
||||
// otherwise it's an unknown message
|
||||
workers[i].on('message', (msg) => {
|
||||
if (msg.ready) detect(workers[i]);
|
||||
else if (msg.image) log.data('Main: worker finished:', workers[i].pid, 'detected faces:', msg.detected.face?.length);
|
||||
else if (msg.test) measureLatency();
|
||||
else log.data('Main: worker message:', workers[i].pid, msg);
|
||||
});
|
||||
// just log when worker exits
|
||||
workers[i].on('exit', (msg) => log.state('Main: worker exit:', workers[i].pid, msg));
|
||||
// just log which worker was started
|
||||
log.state('Main: started worker:', workers[i].pid);
|
||||
}
|
||||
|
||||
// wait for all workers to complete
|
||||
waitCompletion();
|
||||
}
|
||||
|
||||
main();
|
2
wiki
2
wiki
|
@ -1 +1 @@
|
|||
Subproject commit 00a239fa51eda5b366f0e1d05d66fccf4edb0ce1
|
||||
Subproject commit a84586b61f8cd89d7d7b8c16496de25a1bac8e32
|
Loading…
Reference in New Issue