CSP (Communicating Sequential Processes) is a formal language for describing patterns of interaction in concurrent systems. Its ideas are used in Go, Crystal, Clojure's core.async, and a few other places.
The idea is not complicated, but it offers some interesting capabilities. Think of a channel that we use to transfer messages. We attach publishers and subscribers to it. Nothing unusual so far; every event-based system works like that. In CSP, however, the two groups are synchronized: a publisher cannot complete a put until a subscriber is waiting to take the message.
Here is a simple implementation of such a channel:
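function createChannel() {
  // puts: parked senders; takes: resolve functions of parked receivers.
  const puts = [], takes = [];
  return {
    // Resolves once a receiver has taken the data.
    put: (data) => new Promise(resolvePut =>
      takes.length > 0 ?
        (takes.shift()(data), resolvePut()) :
        // No receiver yet: park a function that releases this put and returns the data.
        (puts.push(() => (resolvePut(), data)))
    ),
    // Resolves with the next value once a sender provides one.
    take: () => new Promise(resolveTake =>
      puts.length > 0 ?
        resolveTake(puts.shift()()) :
        takes.push(resolveTake)
    )
  };
}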
The puts array holds the messages that we want to send, and takes contains the subscribers that wait for those messages. Notice how each of the two methods (put and take) first checks whether someone is already waiting on the other side. Again, a put cannot complete if there is no one to take the message, and a take cannot complete if there is nothing in the channel.
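To make the blocking behavior concrete, here is a minimal sketch of the reverse order: a put that is issued before anyone is ready to take. It repeats the channel from above so that it runs on its own, and it assumes the parked put returns its data so that a later take receives the value. The names log, demo, pendingPut, and result are illustrative only.

```javascript
// Channel as described in the text; a parked put returns its data
// so that a later take can receive it (an assumption of this sketch).
function createChannel() {
  const puts = [], takes = [];
  return {
    put: (data) => new Promise(resolvePut =>
      takes.length > 0
        ? (takes.shift()(data), resolvePut())
        : puts.push(() => (resolvePut(), data))
    ),
    take: () => new Promise(resolveTake =>
      puts.length > 0
        ? resolveTake(puts.shift()())
        : takes.push(resolveTake)
    )
  };
}

// Records the order in which things happen.
const log = [];

async function demo() {
  const channel = createChannel();

  // Start a put while nobody is taking; its promise stays pending.
  const pendingPut = channel.put('foo').then(() => log.push('put resolved'));

  // Give the event loop a full turn: the put is still not resolved.
  await new Promise(resolve => setTimeout(resolve, 0));
  log.push('before take');

  // The take releases the parked put and receives its value.
  const value = await channel.take();
  log.push(`took ${value}`);

  await pendingPut;
  return log;
}

const result = demo();
result.then(entries => console.log(entries));
```

The first entry in the log is 'before take', which shows that the put stayed pending through a full turn of the event loop and only resolved once the take arrived.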
Here is one possible use case:
const channel = createChannel();

async function A() {
  console.log('Waiting for values');
  console.log(`Receiving ${await channel.take()}`);
  console.log(`Receiving ${await channel.take()}`);
}
async function B() {
  console.log('Sending "foo"');
  await channel.put('foo');
  console.log('Sending "bar"');
  await channel.put('bar');
  console.log('Messaging over.');
}
A(); B();
// Waiting for values
// Sending "foo"
// Receiving foo
// Sending "bar"
// Receiving bar
// Messaging over.
We start the two functions one right after the other, but at every put and take they wait for each other.