Child and parent communication

First off, I am going to start to say: this helped me wrap my head around it a bit more.

raw code

server.ts

import { Worker, MessageChannel } from 'worker_threads';
 
const worker = new Worker('./worker.js', {
    workerData: {
        path: './worker.ts'
    }
});
 
worker.on('message', (result) => {
    console.log('Result:', result);
 
    // wait a second
    setTimeout(() => {
        worker.postMessage({ value: Math.floor(result * 2 / 3) + 0.5 });
    }, 750);
});
 
worker.postMessage({ value: 1 })

worker.js

const path = require('path');
const { workerData } = require('worker_threads');
 
require('ts-node').register();
require(path.resolve(__dirname, workerData.path));

worker.ts

import { parentPort } from 'worker_threads';
import Data from './Data';
 
if (!parentPort) {
    throw new Error('No parentPort');
}
console.log('Worker started');
 
parentPort.on('message', (data: Data) => {
    console.log('Received data:', data);
    if (!parentPort) {
        throw new Error('No parentPort');
    }
    parentPort.postMessage(data.value * 2);
});

Data.ts

import { MessagePort } from 'worker_threads';
 
export default interface Data {
    port: MessagePort;
    value: number;
}

explenation

As seen above, we need 3 files. server.ts is the parent, he starts the other thread. server.ts calls worker.js who calls worker.ts. Data.ts could theoretically be merged with any of the typescript files. The reason why we need worker.js is because server.ts can’t import worker.ts directly.

This code in the server.ts initiates the worker.

import { Worker, MessageChannel } from 'worker_threads';
 
const worker = new Worker('./worker.js', {
    workerData: {
        path: './worker.ts'
    }
});

worker.js then Initiates workerData.path which is worker.ts’, because we set it to use that path.

then worker.ts checks if it has a parent port:

import { parentPort } from 'worker_threads';
import Data from './Data';
 
if (!parentPort) {
    throw new Error('No parentPort');
}

If worker.ts does have a parent port, it will listen to it, and execute the function once it receives a message.

parentPort.on('message', (data: Data) => {
...
});

Then we go back to server.ts, there we also start to listen to our worker for messages:

worker.on('message', (result) => {
...
});

Then we send a message to our worker:

worker.postMessage({ value: 1 })

Which executes the following function in our worker.ts:

parentPort.on('message', (data: Data) => {
    console.log('Received data:', data);
    if (!parentPort) {
        throw new Error('No parentPort');
    }
    parentPort.postMessage(data.value * 2);
});

This function sends a message back to our server.ts, which starts a loop of messages between our worker.ts and our server.ts:

worker.on('message', (result) => {
    console.log('Result:', result);
 
    // wait a second
    setTimeout(() => {
        worker.postMessage({ value: Math.floor(result * 2 / 3) + 0.5 });
    }, 1000);
});

The reason we are waiting a bit is so that you can clearly see things happening in order. But it is not needed.

child and child communication

Now we have explained how a parent can talk to a child thread, but what if 2 threads need to communicate without it being the child<→parent combo.

code

server.ts

import { Worker, MessageChannel } from 'worker_threads';
 
const { port1, port2 } = new MessageChannel();
 
const worker1 = new Worker('./worker.js', {
    workerData: {
        path: './worker.ts'
    }
});
const worker2 = new Worker('./worker.js', {
    workerData: {
        path: './worker.ts'
    }
});
 
worker1.on('message', (result) => {});
worker2.on('message', (result) => {});
 
worker1.postMessage({ port: port1, name: "Henk" }, [port1]);
worker2.postMessage({ port: port2, name: "Mara" }, [port2]);
worker1.postMessage({ value: 'Send Message' });

worker.js

const path = require('path');
const { workerData } = require('worker_threads');
 
require('ts-node').register();
require(path.resolve(__dirname, workerData.path));

worker.ts

import { parentPort } from 'worker_threads';
import Data from './Data';
 
var port: any = undefined;
var name: string = '';
 
if (!parentPort) {
    throw new Error('No parentPort');
}
console.log('Worker started');
 
parentPort.on('message', (data: Data) => {
    if (data.value == 'Send Message' && port) {
        port.postMessage({ value: 0, name: name });
        return;
    }
    port = data.port;
    name = data.name;
    console.log('Worker ' + name + ' received message from parent');
 
    port.on('message', (message: any) => {
        console.log('Worker ' + name + ' received message from ' + message.name + ':', message.value);
 
        setTimeout(() => {
            port.postMessage({ value: message.value + 1, name: data.name });
        }, 500);
    });
});

Data.ts

import { MessagePort } from 'worker_threads';
 
export default interface Data {
    port: MessagePort;
    value: any;
    name: string;
}

The theory

The structure is unchanged. We don’t need any new files. It also still uses the following system:

As seen above, we need 3 files. server.ts is the parent, he starts the other thread. server.ts calls worker.js who calls worker.ts. Data.ts could theoretically be merged with any of the typescript files. The reason why we need worker.js is because server.ts can’t import worker.ts directly.

Link to original

You might think: Huh? But this was about 2 threads talking without a parent? We still need a parent to initiate the 2 threads, and create a message tunnel for the 2 threads. I am first going to explain the theory behind it.

When we initiate the threads, the parent has a 2 way connection with the threads. This is where The parent-child communication example takes place.

graph LR   
Parent <--> Thread1   
Parent <--> Thread2

But what we need is to create a link between Thread1 and Thread2. The only way to do this is for the parent to link them together.

There are 2 approaches for this:

  1. Lend my tunnel In this approach we give our connection with Thread2 to Thread1, but this makes it so that the parent can’t communicate with Thread2 anymore, at least until Thread1 gives it back:
graph LR   
Parent <--> Thread1   
Thread1 <--> Thread2
  1. New tunnel In this approach (Which I think is better) the parent makes a new 2-way connection. And then gives 1 end of that tunnel to Thread1, and the other end of that tunnel to Thread2:
graph LR   
Parent <--> Thread1   
Thread1 <--> Thread2
Thread1 <--> Thread2

This is the approach which I am going to explain.

Explaining the code.

This code in the server.ts initiates the workers.

import { Worker, MessageChannel } from 'worker_threads';
 
const worker1 = new Worker('./worker.js', {
    workerData: {
        path: './worker.ts'
    }
});
const worker2 = new Worker('./worker.js', {
    workerData: {
        path: './worker.ts'
    }
});

worker.js then Initiates workerData.path which is worker.ts’, because we set it to use that path.

then worker.ts checks if it has a parent port, and it defines 2 variables.

import { parentPort } from 'worker_threads';
import Data from './Data';
 
var masterPort: any = undefined;
var name: string = '';
 
if (!parentPort) {
    throw new Error('No parentPort');
}

If worker.ts does have a parent port, it will listen to it, and execute the function once it receives a message.

parentPort.on('message', (data: Data) => {
...
});

Then we go back to our server.ts, here we listen to our threads. But in our case we do nothing when we receive a message:

worker1.on('message', (result) => { });
worker2.on('message', (result) => { });

Then we are going to send a message to our workers. But first I’ll show you the messageChannel we made:

const { port1, port2 } = new MessageChannel();

And the server.ts gives both our workers a port:

worker1.postMessage({ port: port1, name: "Henk" }, [port1]);
worker2.postMessage({ port: port2, name: "Mara" }, [port2]);

Which will activate the following function in our workers:

parentPort.on('message', (data: Data) => {
    if (data.value == 'Send Message' && port) {
        port.postMessage({ value: 0, name: name });
        return;
    }
    port = data.port;
    name = data.name;
    console.log('Worker ' + name + ' received message from parent');
 
    port.on('message', (message: any) => {
        console.log('Worker ' + name + ' received message from ' + message.name + ':', message.value);
 
        setTimeout(() => {
            port.postMessage({ value: message.value + 1, name: data.name });
        }, 500);
    });
});

This function does the following:

  1. Check if it has a specific message && if we already received a port, which is not true.
if (data.value == 'Send Message' && port) {
...
}
  1. Set the name and port of the worker, so that it knows who to talk to and how to define himself. (You won’t need the name identifier if you use a different file for every thread, but since we use the same file multiple times, we do want some sort of identifier.)
port = data.port;
name = data.name;
  1. Then we start to listen to the port we just received from our parent. So that if the other thread sends us something, we can do something with it.
port.on('message', (message: any) => {
...
});

Now that the workers are done setting this up, we go back to the server.ts where we run the initiating message. We only send this to one of the threads, but theoretically we can do it to both.

worker1.postMessage({ value: 'Send Message' });

Running this activates the parentPort.on('message', (data: Data) => { function again in our worker. But this time we do enter the if statement because we have 'Send Message' as value, and we have a port defined.

parentPort.on('message', (data: Data) => {
    if (data.value == 'Send Message' && port) {
        port.postMessage({ value: 0, name: name });
        return;
    }
    ...
});

This makes our worker send a message to the port, which is connected to the other worker his port. Which will activate the following function:

port.on('message', (message: any) => {
        console.log('Worker ' + name + ' received message from ' + 
        message.name + ':', message.value);
 
        setTimeout(() => {
            port.postMessage({ value: message.value + 1, name: data.name });
        }, 500);
    });

This function will send the message back through the port causing an endless loop between our 2 workers.

notes

instead of giving data in the first call:

port = data.port;
name = data.name;

You can also give more data in it’s creation:

const worker = new Worker('./src/tenantWorkers/worker/worker.js', {
	workerData: {
		path: './worker.ts',
		dbUrl: allTenants[index].connectionString
	}
});

And then your worker can immediately access it through the following:

import { parentPort, workerData } from 'worker_threads';
console.log(workerData)