Pulsar — to be or not to be? Real project experience

Introduction

On the other hand, we could use fallbacks that notify a user that a message can’t be sent because of a bad connection, but users usually ignore such notifications. If our devs do take an extra care of the entire messaging in client applications, the team should use a message broker.

To clarify — message broker is a software that enables applications, systems, and services to communicate with each other and exchange information in a very prompt manner and in a long run. To provide reliable message storage and guaranteed delivery, message brokers often rely on a substructure or component called a message queue that stores and orders the messages until the consuming applications can process them.

In a message queue, messages are stored in the exact order in which they were transmitted and remain in the queue until receipt is confirmed. So, we’ve got a huge problem of choice between existing queue solutions such as RabbitMQ, Kafka, Pulsar and others.

The final decision of stack included the following technologies (with tips about not common ones):

  • Docker,
  • Node.JS,
  • ElasticSearch (ES, a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents),
  • Kibana (perfect GUI for ES),
  • Pulsar (is a multimedia broker of messages).

Research

Bunch of different comparisons are available on the internet, but none states a cool implementation that will solve our main “pains”. Let’s provide some graphics below from this source:

Figure 1 (Source: https://www.confluent.io/kafka-vs-pulsar/)
Figure 2 (Source: https://www.confluent.io/kafka-vs-pulsar/)

The total primacy of Pulsar latency in front of other ones (Figure 1) and taking second place related to throughput (Figure 2). Furthermore, consumers in Kafka can not accept a message from another flow and don’t have the ability to use multi-user structures (multi-tenancy — security, isolation).

Multi-Tenancy: Pulsar’s decoupled architecture reduces architectural complexity and puts millions of topics into a single cluster. Pulsar’s distributed storage system design is segment-based. This hierarchical topic namespace enables Pulsar users to maintain millions of topics in a single cluster.

Pulsar can be more scalable and has a geo-replication.

So, according to the research our team decided to move with Apache Pulsar.

What is the Pulsar?

Hint: the model of message exchanging is flexible. It unites both concepts: queue management and producer/subscriber concept which includes an ability to choose the approach of using messages under the same subscription. In our case that includes add/update/delete functions for chatting that is so good.

Terminology in the Pulsar environment

  • producer — a part that is an emitter of messages
  • client — a Pulsar client as an entry point of connection to this url: pulsar://your_server:6650
  • consumer — a part that will be listening to a topic and receive a message

For more details of terms related to Apache Pulsar pop into this:

Figure 3 (Source: https://pulsar.apache.org/docs/ja/concepts-messaging/#subscriptions)

There are 4 subscription types (following Figure 3):

  • exclusive — only a single consumer is allowed to attach to the subscription,
  • shared — messages are delivered in a round-robin distribution across consumers, and any given message is delivered to only one consumer,
  • failover — a master consumer is picked for non-partitioned topic or each partition of a partitioned topic and receives messages
  • key_shared — messages are delivered in distribution across consumers and messages with the same key or same ordering key are delivered to only one consumer.

Apache Pulsar is a relatively fresh technology that will pull us in the risk and adventure of how to deep learn and use it. Find out here what’s the size of user community, how documentation is made, and learning materials are provided. It could bring us to the obstacles of how to deal with it. It’s part of our nature to try such new things. The team admits that it wasn’t clear and convenient for the first touch. And complex as well.

To have it working any developer had to read the sources and only after that our team was able to start working with Pulsar, the documentation was not enough at that time. Also, we’ve noticed that Pulsar documentation has a typo sometimes (that’s bad actually as just cannot follow it blindly!) which can’t be seen at the first glance because of trust when someone copy-paste their examples.

How to use the Pulsar on a Node.JS project?

  1. Initialize the npm project:
npm init

2. Install the Pulsar library. Pulsar developers produced the library only for these platforms Linux and macOS.

For macOS:

brew install libpulsar

For Linux:

a. Install 2 requirement packages into your Linux OS. For more details read here. If it doesn’t work. Please use this tutorial

b. Install pulsar-client:

npm install pulsar-client

3. Connect Pulsar to ElasticSearch (ES). There are 2 ways to save data to ElasticSearch:

a. Using ElasticSearch connector

b. Manually by using @elastic/elasticsearch library

The team decided to connect Pulsar to ElasticSearch through a connector. All connectors are available to download here. Details about all connectors can check here.

  1. Download ElasticSearch sink connector by using CURL:
curl -o pulsar-io-elastic-search-2.6.1.nar http://mirror.linux-ia64.org/apache/pulsar/pulsar-2.6.1/connectors/pulsar-io-elastic-search-2.6.1.nar

and save this file to the “connectors” folder

2. Then create the docker-compose.yml file with ES, Kibana, Pulsar with the following configuration:

# ./docker-compose.yml
version: “3”
volumes:
pulsardata:
pulsarconf:
elasticdata:
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.7.1
environment:
— discovery.type=single-node
— cluster.name=es-cluster
— node.name=es-node-1
— path.data=/usr/share/elasticsearch/data
— http.port=9200
— http.host=0.0.0.0
— transport.host=127.0.0.1
— bootstrap.memory_lock=true
— “ES_JAVA_OPTS=-Xms512m -Xmx512m”
ulimits:
memlock:
soft: -1
hard: -1
volumes:
— elasticdata:/usr/share/elasticsearch/data
ports:
— “9200:9200”
kibana:
image: “docker.elastic.co/kibana/kibana:7.7.1”
environment:
— server.port=127.0.0.1:5601
— elasticsearch.url=”http://elasticsearch:9200"
— server.name=”kibana”
ports:
— “5601:5601”
depends_on:
— elasticsearch
pulsar:
image: apachepulsar/pulsar:2.6.0
ports:
— 8080:8080
— 6650:6650
environment:
PULSAR_MEM: “ -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g”
volumes:
— pulsardata:/pulsar/data
— pulsarconf:/pulsar/conf
— ./connectors:/pulsar/connectors
command: >
/bin/bash -c
“bin/apply-config-from-env.py conf/standalone.conf
&& bin/pulsar standalone”
# … other parameters

3. Run the project:

docker-compose up

Note: there is only one thing that can irritate you — it takes about one or two minutes to run Pulsar on your local machine when you start docker-compose.

4. Create init.sh file with the following code:

curl — header “Content-Type: application/json” \
— request DELETE \
http://localhost:8080/admin/v2/tenants/tenant-1
curl — header “Content-Type: application/json” \
— request PUT \
— data ‘{ “allowedClusters”: [“standalone”]}’ \
http://localhost:8080/admin/v2/tenants/tenant-1
curl — header “Content-Type: application/json” \
— request PUT \
— data ‘{}’ \
http://localhost:8080/admin/v2/namespaces/tenant-1/ns-1
curl — header “Content-Type: multipart/form-data” \
— request POST \
-F url=’file:///pulsar/connectors/pulsar-io-elastic-search-2.6.1.nar;type=text/plain’ \
-F sinkConfig=’{ “className”: “org.apache.pulsar.io.elasticsearch.ElasticSearchSink”, “archive”: “/pulsar/connectors/pulsar-io-elastic-search-2.6.1.nar”, “inputs”: [“persistent://tenant-1/ns-1/elastic-test”], “processingGuarantees”: “EFFECTIVELY_ONCE”, “parallelism”: 1, “configs”: {“elasticSearchUrl”: “http://elasticsearch:9200", “indexName”: “test_index” } };type=application/json’ \
http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch
curl — request POST \
http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch/start
curl — request POST \
http://localhost:8080/admin/v3/sinks/tenant-1/ns-1/elasticsearch/restart

5. When Pulsar is ready, run init.sh for turning on a connection to ES. For ES Pulsar connector has actions such as CREATE and READ. ES was developed for full-text searching so it was enough to use it just for saving logging data. It gives a fast searching in this case.

6. Then create your client for connecting to Pulsar and the producer for sending a message. Here is a small test example:

test_pulsar_elastic_connection.js

const Pulsar = require(“pulsar-client”);(async () => {
// Create a client
const client = new Pulsar.Client({
serviceUrl: “pulsar://localhost:6650”,
});
// Create a producer
const producer = await client.createProducer({
topic: “persistent://tenant-1/ns-1/elastic-test”,
});
// Send messages
const dataset = [];
for (let i = 0; i < 10; i += 1) {
const id = Math.floor(Math.random() * 100);
const msg = `{ “test-pulsar”: ${id} }`;
dataset.push(msg);
producer.send({
data: Buffer.from(msg),
properties: { ACTION: “UPSERT”, ID: id },
});
console.log(`Sent message: ${msg}`);
}
await producer.flush();
await producer.close();
await client.close();
})();

consumer.js

const Pulsar = require(“pulsar-client”);(async () => {
// Create a client
const client = new Pulsar.Client({
serviceUrl: “pulsar://localhost:6650”,
});
const consumer = await client.subscribe({
topic: “persistent://tenant-1/ns-1/elastic-test”,
subscription: “my-subscription”,
});
const msg = await consumer.receive();
const str = msg.getData().toString();
console.log(“RECEIVED!!!!!!!!!!”, str);
consumer.acknowledge(msg);
await consumer.close();
await client.close();
})();

7. For now, it is enough. Run this example above via Node.JS:

node ./test_pulsar_elastic_connection.js

It will create a message in ES. Find the tutorial here. Sent messages:

8. Run the consumer for receiving:

node ./consumer.js

9. Now some messages were sent by the producer (they were already saved in ES immediately), the consumer received and acknowledged a message and after that, all instances were closed.

10. For checking results in ES refresh data before searching:

curl -s http://localhost:9200/your_topic/_refresh
curl -s http://localhost:9200/your_topic/_search

11. Results of checking ElasticSearch data by CURL:

12. Use Kibana GUI for ES for checking saved messages:

Woohoo! That’s enough for testing the whole flow.

Issue

According to the issue above our ninja-developer dug into the Java sources of Pulsar and implemented these abilities of CRUD, customly of course.

The decision was to create new feature in the Pulsar ElasticSearch connector and publish this idea to the official repository:

  • added ID property for tracking records by ID
  • implemented “ACTION” property which can act as UPSERT or DELETE
  • created Pull Request (PR) to Apache Pulsar GitHub
  • The whole description and discussion about our PR is here.

Despite creating this PR our team decided to change the development up and noticed that they couldn’t catch an exception. The message was just sent and our team couldn’t be sure whether it was saved in ES or not. So, the team decided to use the first approach — manual saving to ES by using @elastic/elasticsearch library. Please find the tutorial for Node.JS here.

The next step was developing the subscribe feature and testing the flow. Of course, you can choose any approach that would be more suitable for your project (coz nobody knows exactly the precise requirements!). For example, all processes of messaging can be only at your backend side. One of our ideas was to implement Service Sent Events (SSE, a server push technology enabling a client to receive automatic updates from a server via HTTP connection) with “Content-Type”: “text/event-stream” and subscribe to the endpoint from the frontend. All users who subscribed to the Pulsar topic via SSE will receive new messages from a Pulsar message broker.

Take a look at some parts of the code

  • route
app.get(“/subscribe/:sessionId”, sse.middleware());
  • SSE
const HEADERS = {
“Content-Type”: “text/event-stream”,
Connection: “keep-alive”,
“Cache-Control”: “no-cache”
};
const subscriptions = new Map();const subscribe = (id, handler) => {
if (!subscriptions.has(id)) {
subscriptions.set(id, handler);
} else {
console.warn(“Trying to resubscribe with already created subscription”);
}
return {
unsubscribe: () => subscriptions.delete(id)
};
};
const sse = ({ onClose } = {}) => {
const middleware = (req, res) => {
res.writeHead(200, HEADERS);
res.write(`id: ${nanoid()}\n`);
res.write(“retry: 1\n”);
res.write(`data: ${JSON.stringify({ success: true })}\n\n`);
//… your logic
};
return middleware;
};
const sendEvents = (type, message) => {
for (const onMessage of subscriptions.values()) {
onMessage(type, message);
}
};
const newComment = message => {
sendEvents(pulsarTopics.NEW_COMMENT, message);
};
const updatedComment = message => {
sendEvents(pulsarTopics.UPDATED_COMMENT, message);
};
const deletedComment = message => {
sendEvents(pulsarTopics.DELETED_COMMENT, message);
};
//…

Angular 2+

  • sse.service.ts
//…
export class SseService {
getEventSource(sessionId: string): EventSource {
return new EventSource(environment.serverUrl + `/subscribe/${sessionId}`);
}
}
  • comments.service.ts
getServerSentEvent() {
return new Observable(observer => {
let eventSource = this.sseService.getEventSource(this.currentSession);
setUp();
const self = this;
function setUp() {
eventSource.onmessage = event => {
self.zone.run(() => {
observer.next(event);
});
};
eventSource.addEventListener(‘new_comment’, (event: any) => {
self.zone.run(() => {
const objEvent = JSON.parse(event.data);
//… your logic
eventSource.addEventListener(‘updated_comment’, (event: any) => {
self.zone.run(() => {
const objEvent = JSON.parse(event.data);
//… your logic
eventSource.addEventListener(‘deleted_comment’, (event: any) => {
self.zone.run(() => {
const objEvent = JSON.parse(event.data);
//… your logic

There were 3 event listeners add/update/delete messages. It gives us the whole logic for chatting.

Conclusions

Subscribe to our blog and see you soon!

High-level software engineers for powerful web solutions https://intspirit.com/