Apache Kafka Java API Example

posted on Nov 20th, 2016

Apache Kafka

Apache Kafka is an open-source message broker project developed by the Apache Software Foundation and written in Scala. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. In essence it is a "massively scalable pub/sub message queue architected as a distributed transaction log," which makes it highly valuable for enterprise infrastructures that process streaming data.
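Before touching Kafka itself, the pub/sub idea can be sketched in plain Java: a "topic" modeled as a blocking queue, with one publisher thread and one subscriber thread. This is a toy illustration only (the class name and sentinel value are made up for this sketch); Kafka adds persistence, partitioning and replication on top of this basic idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PubSubSketch {

	// Sentinel message used to tell the subscriber to stop
	static final String END = "__END__";

	// Push every message through the "topic" queue and return what the
	// subscriber thread consumed, in order.
	public static List<String> deliver(List<String> messages) throws InterruptedException {
		final BlockingQueue<String> topic = new LinkedBlockingQueue<String>();
		final List<String> consumed = new ArrayList<String>();

		Thread publisher = new Thread(() -> {
			for (String msg : messages) {
				topic.add(msg);          // analogous to producer.send(...)
			}
			topic.add(END);              // signal end of stream
		});

		Thread subscriber = new Thread(() -> {
			try {
				while (true) {
					String msg = topic.take(); // analogous to iterating the consumer stream
					if (END.equals(msg)) break;
					consumed.add(msg);
				}
			} catch (InterruptedException ignored) { }
		});

		publisher.start();
		subscriber.start();
		publisher.join();
		subscriber.join();
		return consumed;
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(deliver(Arrays.asList("hello", "kafka")));
	}
}
```

With a single publisher and a single subscriber on a FIFO queue, messages arrive in publish order; Kafka gives the same ordering guarantee per partition.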

Pre Requirements

1) A machine with Ubuntu 14.04 LTS operating system

2) Apache Kafka 2.9.2- software installed. (How to install Kafka on Ubuntu)

Kafka Java API Example

Let us create an application for publishing and consuming messages using a Java client.

Steps to follow

Step 1 - Open a new terminal (CTRL + ALT + T) and change the directory to /usr/local/kafka

$ cd /usr/local/kafka

Step 2 - Start ZooKeeper

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

Step 3 - Open a new terminal (CTRL + ALT + T) and change the directory to /usr/local/kafka

$ cd /usr/local/kafka

Step 4 - Start the Kafka server

$ ./bin/kafka-server-start.sh config/server.properties

Step 5 - Open a new terminal (CTRL + ALT + T) and type 'jps' (without quotes). It lists all the Java processes that are running.

[Screenshot: jps output showing the running Kafka and QuorumPeerMain processes]

Here, Kafka is the Kafka server process and QuorumPeerMain is the ZooKeeper process.
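Before producing any messages, the topic 'mytopic' used by the programs below must exist. Depending on the broker's auto.create.topics.enable setting it may be created automatically on first use; otherwise it can be created explicitly (the kafka-topics.sh script and its flags shown here apply to the old ZooKeeper-based Kafka builds this tutorial uses):

$ cd /usr/local/kafka
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

The second command simply lists the existing topics so you can verify that 'mytopic' was created.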


KafkaProducer.java

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {
	private static Producer<Integer, String> producer;
	private static final String topic = "mytopic";

	public void initialize() {
		Properties producerProps = new Properties();
		producerProps.put("metadata.broker.list", "localhost:9092");
		producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
		producerProps.put("request.required.acks", "1");
		ProducerConfig producerConfig = new ProducerConfig(producerProps);
		producer = new Producer<Integer, String>(producerConfig);
	}

	public void publishMessage() throws Exception {
		BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
		while (true) {
			System.out.print("Enter message to send to kafka broker (Press 'Y' to close producer): ");
			String msg = reader.readLine(); // Read message from console
			// Define topic name and message
			KeyedMessage<Integer, String> keyedMsg =
					new KeyedMessage<Integer, String>(topic, msg);
			producer.send(keyedMsg); // This publishes the message on the given topic
			if ("Y".equals(msg)) { break; }
			System.out.println("--> Message [" + msg + "] sent. Check message on Consumer's program console");
		}
	}

	public static void main(String[] args) throws Exception {
		KafkaProducer kafkaProducer = new KafkaProducer();
		kafkaProducer.initialize();     // Initialize producer
		kafkaProducer.publishMessage(); // Publish messages
		producer.close();               // Close the producer
	}
}


KafkaConsumer.java

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
	private ConsumerConnector consumerConnector = null;
	private final String topic = "mytopic";

	public void initialize() {
		Properties props = new Properties();
		props.put("zookeeper.connect", "localhost:2181");
		props.put("group.id", "testgroup");
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "300");
		props.put("auto.commit.interval.ms", "1000");
		ConsumerConfig conConfig = new ConsumerConfig(props);
		consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
	}

	public void consume() {
		// Key = topic name, Value = No. of threads for topic
		Map<String, Integer> topicCount = new HashMap<String, Integer>();
		topicCount.put(topic, new Integer(1));

		// ConsumerConnector creates the message stream for each topic
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
				consumerConnector.createMessageStreams(topicCount);

		// Get Kafka stream for topic 'mytopic'
		List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);

		// Iterate stream using ConsumerIterator
		for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
			ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();

			while (consumerIte.hasNext()) {
				System.out.println("Message consumed from topic [" + topic
						+ "] : " + new String(consumerIte.next().message()));
			}
		}

		// Shutdown the consumer connector
		if (consumerConnector != null) {
			consumerConnector.shutdown();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		KafkaConsumer kafkaConsumer = new KafkaConsumer();
		kafkaConsumer.initialize(); // Configure Kafka consumer
		kafkaConsumer.consume();    // Start consumption
	}
}

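A side note on API versions: the zookeeper.connect-style properties above belong to the old high-level consumer that ships with these Kafka builds. Newer Kafka clients (org.apache.kafka.clients.consumer.KafkaConsumer, 0.9+) connect to the brokers directly instead of to ZooKeeper. As a rough sketch of the equivalent configuration (property names are from the newer client API; the class below only builds the Properties object, since creating an actual consumer would additionally require the kafka-clients jar and a running broker):

```java
import java.util.Properties;

// Sketch only: configuration for the newer broker-based consumer client.
public class NewConsumerProps {

	public static Properties build() {
		Properties props = new Properties();
		// Brokers are contacted directly; no zookeeper.connect needed
		props.put("bootstrap.servers", "localhost:9092");
		props.put("group.id", "testgroup");
		// Deserializer classes replace the old serializer.class setting
		props.put("key.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");
		// Commit offsets automatically, as in the old consumer config above
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		return props;
	}

	public static void main(String[] args) {
		System.out.println(build());
	}
}
```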
Step 6 - Open a new terminal (CTRL + ALT + T) and compile both the programs.

$ javac -cp "/usr/local/kafka/lib/*" *.java

Step 7 - Open a new terminal (CTRL + ALT + T) and run KafkaProducer

$ java -cp ".:/usr/local/kafka/lib/*" KafkaProducer

Step 8 - Open a new terminal (CTRL + ALT + T) and run KafkaConsumer. (Start the consumer before typing messages in the producer; with the default offset settings, messages sent before the consumer starts will not be shown.)

$ java -cp ".:/usr/local/kafka/lib/*" KafkaConsumer

Step 9 - Stop ZooKeeper and the Kafka server. Just press 'CTRL + C' in the terminals of ZooKeeper and the Kafka server.
