Pulsar — to be or not to be? Real project experience

Introduction

Research

Figure 1 (Source: https://www.confluent.io/kafka-vs-pulsar/)
Figure 2 (Source: https://www.confluent.io/kafka-vs-pulsar/)

What is the Pulsar?

Terminology in the Pulsar environment

Figure 3 (Source: https://pulsar.apache.org/docs/ja/concepts-messaging/#subscriptions)

How to use the Pulsar on a Node.JS project?

npm init
brew install libpulsar
npm install pulsar-client
curl -o pulsar-io-elastic-search-2.6.1.nar http://mirror.linux-ia64.org/apache/pulsar/pulsar-2.6.1/connectors/pulsar-io-elastic-search-2.6.1.nar
# ./docker-compose.yml
version: “3”
volumes:
pulsardata:
pulsarconf:
elasticdata:
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.7.1
environment:
— discovery.type=single-node
— cluster.name=es-cluster
— node.name=es-node-1
— path.data=/usr/share/elasticsearch/data
— http.port=9200
— http.host=0.0.0.0
— transport.host=127.0.0.1
— bootstrap.memory_lock=true
— “ES_JAVA_OPTS=-Xms512m -Xmx512m”
ulimits:
memlock:
soft: -1
hard: -1
volumes:
— elasticdata:/usr/share/elasticsearch/data
ports:
— “9200:9200”
kibana:
image: “docker.elastic.co/kibana/kibana:7.7.1”
environment:
— server.port=127.0.0.1:5601
— elasticsearch.url=”http://elasticsearch:9200"
— server.name=”kibana”
ports:
— “5601:5601”
depends_on:
— elasticsearch
pulsar:
image: apachepulsar/pulsar:2.6.0
ports:
— 8080:8080
— 6650:6650
environment:
PULSAR_MEM: “ -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g”
volumes:
— pulsardata:/pulsar/data
— pulsarconf:/pulsar/conf
— ./connectors:/pulsar/connectors
command: >
/bin/bash -c
“bin/apply-config-from-env.py conf/standalone.conf
&& bin/pulsar standalone”
# … other parameters
docker-compose up
curl — header “Content-Type: application/json” \
— request DELETE \
http://localhost:8080/admin/v2/tenants/tenant-1
curl — header “Content-Type: application/json” \
— request PUT \
— data ‘{ “allowedClusters”: [“standalone”]}’ \
http://localhost:8080/admin/v2/tenants/tenant-1
curl — header “Content-Type: application/json” \
— request PUT \
— data ‘{}’ \
http://localhost:8080/admin/v2/namespaces/tenant-1/ns-1
curl — header “Content-Type: multipart/form-data” \
— request POST \
-F url=’file:///pulsar/connectors/pulsar-io-elastic-search-2.6.1.nar;type=text/plain’ \
-F sinkConfig=’{ “className”: “org.apache.pulsar.io.elasticsearch.ElasticSearchSink”, “archive”: “/pulsar/connectors/pulsar-io-elastic-search-2.6.1.nar”, “inputs”: [“persistent://tenant-1/ns-1/elastic-test”], “processingGuarantees”: “EFFECTIVELY_ONCE”, “parallelism”: 1, “configs”: {“elasticSearchUrl”: “http://elasticsearch:9200", “indexName”: “test_index” } };type=application/json’ \
http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch
curl — request POST \
http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch/start
curl — request POST \
http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch/restart
const Pulsar = require(“pulsar-client”);(async () => {
// Create a client
const client = new Pulsar.Client({
serviceUrl: “pulsar://localhost:6650”,
});
// Create a producer
const producer = await client.createProducer({
topic: “persistent://tenant-1/ns-1/elastic-test”,
});
// Send messages
const dataset = [];
for (let i = 0; i < 10; i += 1) {
const id = Math.floor(Math.random() * 100);
const msg = `{ “test-pulsar”: ${id} }`;
dataset.push(msg);
producer.send({
data: Buffer.from(msg),
properties: { ACTION: “UPSERT”, ID: id },
});
console.log(`Sent message: ${msg}`);
}
await producer.flush();
await producer.close();
await client.close();
})();
const Pulsar = require(“pulsar-client”);(async () => {
// Create a client
const client = new Pulsar.Client({
serviceUrl: “pulsar://localhost:6650”,
});
const consumer = await client.subscribe({
topic: “persistent://tenant-1/ns-1/elastic-test”,
subscription: “my-subscription”,
});
const msg = await consumer.receive();
const str = msg.getData().toString();
console.log(“RECEIVED!!!!!!!!!!”, str);
consumer.acknowledge(msg);
await consumer.close();
await client.close();
})();
node ./test_pulsar_elastic_connection.js
node ./consumer.js
curl -s http://localhost:9200/your_topic/_refresh
curl -s http://localhost:9200/your_topic/_search

Issue

Take a look at some parts of the code

app.get(“/subscribe/:sessionId”, sse.middleware());
const HEADERS = {
“Content-Type”: “text/event-stream”,
Connection: “keep-alive”,
“Cache-Control”: “no-cache”
};
const subscriptions = new Map();const subscribe = (id, handler) => {
if (!subscriptions.has(id)) {
subscriptions.set(id, handler);
} else {
console.warn(“Trying to resubscribe with already created subscription”);
}
return {
unsubscribe: () => subscriptions.delete(id)
};
};
const sse = ({ onClose } = {}) => {
const middleware = (req, res) => {
res.writeHead(200, HEADERS);
res.write(`id: ${nanoid()}\n`);
res.write(“retry: 1\n”);
res.write(`data: ${JSON.stringify({ success: true })}\n\n`);
//… your logic
};
return middleware;
};
const sendEvents = (type, message) => {
for (const onMessage of subscriptions.values()) {
onMessage(type, message);
}
};
const newComment = message => {
sendEvents(pulsarTopics.NEW_COMMENT, message);
};
const updatedComment = message => {
sendEvents(pulsarTopics.UPDATED_COMMENT, message);
};
const deletedComment = message => {
sendEvents(pulsarTopics.DELETED_COMMENT, message);
};
//…
//…
export class SseService {
getEventSource(sessionId: string): EventSource {
return new EventSource(environment.serverUrl + `/subscribe/${sessionId}`);
}
}
getServerSentEvent() {
return new Observable(observer => {
let eventSource = this.sseService.getEventSource(this.currentSession);
setUp();
const self = this;
function setUp() {
eventSource.onmessage = event => {
self.zone.run(() => {
observer.next(event);
});
};
eventSource.addEventListener(‘new_comment’, (event: any) => {
self.zone.run(() => {
const objEvent = JSON.parse(event.data);
//… your logic
eventSource.addEventListener(‘updated_comment’, (event: any) => {
self.zone.run(() => {
const objEvent = JSON.parse(event.data);
//… your logic
eventSource.addEventListener(‘deleted_comment’, (event: any) => {
self.zone.run(() => {
const objEvent = JSON.parse(event.data);
//… your logic

Conclusions

High-level software engineers for powerful web solutions https://intspirit.com/