Apache Kafka is a popular platform for streaming and pub/sub. It is very common at large companies with massive engineering teams, and less common at smaller companies. That is because, unlike many other alternatives in the message queue space, Kafka focuses on being a high performance write log without strictly enforcing message receipt. In other words, Kafka is less of a traditional message queue and more like a distributed event emitter, which is great for cases where you may have dozens of different services looking at a particular message.

Here's how you can get started working with Kafka in Node.js.

Setting Up Kafka Locally

First, you need to set up Kafka locally. There are also cloud services for Kafka if you prefer that approach.

Kafka requires you to have Java 8+ installed on your machine. On Ubuntu, you can just run sudo apt-get install default-jdk. For other operating systems, you can download a JDK from Oracle.

Once you've installed a JDK, you can then download Kafka using curl.

curl -Ol https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz

Then, extract Kafka using tar -zxvf kafka_2.13-2.7.0.tgz && cd kafka_2.13-2.7.0.

Kafka still requires Zookeeper, so you need to run 2 services simultaneously to run Kafka. First, start Zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Once Zookeeper is running, you can start the Kafka broker. A broker is the service responsible for storing messages and allowing clients to read messages. Zookeeper is responsible for helping brokers coordinate with other brokers. Running the below command starts a broker on port 9092.

bin/kafka-server-start.sh config/server.properties

Kafka events are broken up into topics. A topic is analagous to a collection in MongoDB or a table in SQL databases. In Kafka, you need to explicitly create a topic before reading or writing messages. The Kafka tarball includes a handy kafka-topics.sh script that lets you create a new topic from the command line:

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

The above command created a new topic called "quickstart-events". Now that you have a topic, you can write some events to the topic. There's a kafka-console-producer.sh script that lets you enter one or more events. In Kafka, an event is an object with an arbitrary string message. The below shell output shows creating 2 events: one with message 'event 1', and one with message 'event 2'.

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>event 1
>event 2
>^C
$ 

You can then print out these messages with the kafka-console-consumer.sh script:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
event 1
event 2
^CProcessed a total of 2 messages
$ 

Hello, KafkaJS

The command line utilities are neat for getting started, but, in practice, you rarely use Kafka from the command line. KafkaJS is currently the most actively maintained Node.js client for Kafka, although kafka-node is still popular. For the purposes of this tutorial, I'll use KafkaJS.

First, let's connect to Kafka from Node and read the two messages from the 'quickstart-events' collection.

const { Kafka } = require("kafkajs");

run().then(() => console.log("Done"), err => console.log(err));

async function run() {
  const kafka = new Kafka({ brokers: ["localhost:9092"] });
  // If you specify the same group id and run this process multiple times, KafkaJS
  // won't get the events. That's because Kafka assumes that, if you specify a
  // group id, a consumer in that group id should only read each message at most once.
  const consumer = kafka.consumer({ groupId: "" + Date.now() });

  await consumer.connect();

  await consumer.subscribe({ topic: "quickstart-events", fromBeginning: true });
  await consumer.run({ 
    eachMessage: async (data) => {
      console.log(data);
    }
  });
}

The above script will print the below output:

{"level":"INFO","timestamp":"2021-02-11T16:34:22.411Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"1613061262395"}
{"level":"INFO","timestamp":"2021-02-11T16:34:22.425Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"1613061262395","memberId":"kafkajs-d042d5ac-44b8-4802-aeaa-c9002ae091e0","leaderId":"kafkajs-d042d5ac-44b8-4802-aeaa-c9002ae091e0","isLeader":true,"memberAssignment":{"quickstart-events":[0]},"groupProtocol":"RoundRobinAssigner","duration":13}
Done
{
  topic: 'quickstart-events',
  partition: 0,
  message: {
    magicByte: 2,
    attributes: 0,
    timestamp: '1613059276111',
    offset: '0',
    key: null,
    value: <Buffer 65 76 65 6e 74 20 31>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '0',
      firstTimestamp: '1613059276111',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 0,
      producerId: '-1',
      producerEpoch: -1,
      firstSequence: -1,
      maxTimestamp: '1613059276111',
      timestampType: 0,
      magicByte: 2
    }
  }
}
{
  topic: 'quickstart-events',
  partition: 0,
  message: {
    magicByte: 2,
    attributes: 0,
    timestamp: '1613059277500',
    offset: '1',
    key: null,
    value: <Buffer 65 76 65 6e 74 20 32>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '1',
      firstTimestamp: '1613059277500',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 0,
      producerId: '-1',
      producerEpoch: -1,
      firstSequence: -1,
      maxTimestamp: '1613059277500',
      timestampType: 0,
      magicByte: 2
    }
  }
}

Each event has 3 properties: topic, partition, and message. For the purposes of this tutorial, partition doesn't matter, we're just interested in the message. The message.value property contains the message that you generated using kafka-console-producer.sh as a Node.js buffer. Here's how you can print out the message value as a UTF8 string:

await consumer.subscribe({ topic: "quickstart-events", fromBeginning: true });
await consumer.run({ 
  eachMessage: async (data) => {
    // Prints "event 1" followed by "event 2"
    console.log(data.message.value.toString("utf8"));
  }
});

Producing Events from Node.js

KafkaJS has a separate producer() function that creates a new producer object. The previous example created a consumer: consumers can only read messages. Producers can only send messages. Below is how you can send a message 'event 3':

const { Kafka } = require("kafkajs");

run().then(() => console.log("Done"), err => console.log(err));

async function run() {
  const kafka = new Kafka({ brokers: ["localhost:9092"] });

  const producer = kafka.producer();
  await producer.connect();

  await producer.send({
    topic: "quickstart-events",
    messages: [
      { value: "event 3" },
    ]
  });
}

KafkaJS consumers automatically read new messages in realtime. And, because of JavaScript's event loop, you can run a producer and a consumer in the same process. For example, the below script prints 3 events, followed by a 4th event after a 1 second delay.

const { Kafka } = require("kafkajs");

run().then(() => console.log("Done"), err => console.log(err));

async function run() {
  const kafka = new Kafka({ brokers: ["localhost:9092"] });
  const consumer = kafka.consumer({ groupId: '' + Date.now() });

  await consumer.connect();

  await consumer.subscribe({ topic: 'quickstart-events', fromBeginning: true });

  let startTime = Date.now();

  await consumer.run({ 
    eachMessage: async (data) => {
      console.log(Date.now() - startTime, data.message.value.toString('utf8'));
    }
  });

  const producer = kafka.producer();
  await producer.connect();

  // Wait 1 second before sending a new message
  await new Promise(resolve => setTimeout(resolve, 1000));

  await producer.send({
    topic: "quickstart-events",
    messages: [
      { value: "event 4" },
    ]
  });
}

Below is the output:

29 event 1
30 event 2
30 event 3
Done
1046 event 4

Moving On

Kafka is a powerful tool that allows large engineering organizations to effectively break up their code into separate services. Instead of having one monolith that handles all business logic itself, a service can send an event to Kafka and delegate the handling of the event without needing to know what consumers are interested in the event. This is great because it enables different services to work together without being aware of each other, meaning you can build a new service without any other service being aware of it. That can be a huge productivity win for large teams.

Found a typo or error? Open up a pull request! This post is available as markdown on Github
comments powered by Disqus