Kafka, Kafka Streams, Producer and Consumer
The theory
Apache Kafka is a high-throughput integration system that passes messages from a source to a target, so at a high level it can be seen as a pipeline. Apache Kafka can run as a single instance or as a cluster of brokers coordinated by ZooKeeper. ZooKeeper is responsible for storing information about broker nodes and topics. Kafka is also a fresh approach to system integration, and several impressive projects are built on top of it, for example:
Confluent (https://docs.confluent.io/current/platform.html), which adds many connectors (JMS, MQTT, Cassandra, Solr, S3, DynamoDB, HDFS, ...) and other components: Kafka Connect, ksqlDB and REST Proxy. Incoming streams can be filtered or parsed on the fly and the results stored in a destination system (a database, file store, Elasticsearch or another system). For that purpose we can use Kafka Streams or KSQL. Each message is transported as a binary object, so if the producer and consumer want to exchange complex message structures they should use an external Schema Registry (also available in the Confluent platform mentioned above).
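To give a feel for the Kafka Streams part, below is a minimal sketch of a topology that filters a stream on the fly. The topic names (artsci-topic, artsci-filtered-topic) and the application id are assumptions for this example, and it requires the org.apache.kafka:kafka-streams dependency in addition to kafka-clients.

package kafkaStreams;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class ArtsciStreamsFilter {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "artsci-streams-app");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // read the source topic, keep only the messages that contain "artsci"
        // and write them to a second (hypothetical) topic
        KStream<String, String> source = builder.stream("artsci-topic");
        source.filter((key, value) -> value != null && value.contains("artsci"))
              .to("artsci-filtered-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), prop);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}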

Internally, Apache Kafka organizes messages into topics, and each topic has one or more partitions and replicas. Below is a draft of a cluster based on 3 brokers: there are three partitions, and each partition leader has two additional followers. That configuration guarantees high durability and throughput. If one broker is damaged, the other brokers keep handling events and the partition leadership is automatically switched to another broker. The key parameters here are:
--replication-factor 3 --partitions 3
The Producer
The producer prepares messages and sends them to Kafka. Depending on the acknowledgement setting, it can work in a few configurations:
- acks=0 >> no acknowledgement is required
- acks=1 >> only the partition leader's acknowledgement is required
- acks=all >> acknowledgements from the partition leader and its followers are required
Throughput and durability depend on that configuration. If we aggregate application logs we can choose acks=0 (losing some data is not critical), but if we store business data we should consider acks=all (we must keep all data); see the sketch below.
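As a minimal sketch of the first case, the class below configures a throughput-oriented producer for log aggregation. The class name, topic, and the linger/compression values are example choices for illustration, not requirements.

package kafkaProducer;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class LogProducerExample {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // fire-and-forget: do not wait for any broker acknowledgement
        prop.setProperty(ProducerConfig.ACKS_CONFIG, "0");
        // batch records for up to 20 ms and compress the batches to increase throughput
        prop.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        prop.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        producer.send(new ProducerRecord<>("artsci-topic", "some log line"));
        producer.close();
    }
}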
In general, the producer should also have other properties set, such as timeouts, retries, etc. It is also possible to configure an idempotent producer (enable.idempotence=true): Kafka then detects duplicates, at the price of potentially higher latency.
In addition, the producer can send a message:
- with a defined key (messages are spread across partitions based on the key hash, which preserves ordering per key)
- without a key (messages are spread across partitions in a round-robin fashion)
Both variants are shown in the sketch below.
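A minimal sketch of both variants, assuming the producer, topic and imports from the producer example later in this post; the keys and values are made up for illustration:

// keyed record: all messages with key "user-42" land on the same partition, so their order is preserved
ProducerRecord<String, String> keyed = new ProducerRecord<>("artsci-topic", "user-42", "login event");

// key-less record: messages are distributed across the three partitions
// (round-robin or sticky batching, depending on the client version)
ProducerRecord<String, String> keyless = new ProducerRecord<>("artsci-topic", "some log line");

producer.send(keyed);
producer.send(keyless);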
The Consumer
Kafka stores data for seven days by default (the retention period is configurable). Each consumer group receives data from all partitions of a topic, and how partitions are assigned to consumers depends strictly on the number of consumers within a single group.
The delivery approach for consumers can be specified as:
- at most once >> the offset is committed as soon as the data is received (data may be lost if the consumer then throws an error)
- at least once >> the offset is committed only after the consumer has finished processing (data may be duplicated, so processing should be able to handle that); a sketch follows below
- exactly once >> this approach is dedicated to stream processing
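A minimal sketch of the at-least-once approach with the Java client: auto-commit is disabled and the offsets are committed only after the records have been processed. The bootstrap server, topic and group id match the consumer example later in this post, the imports are the same as in that class, and process(record) is a hypothetical processing step.

Properties prop = new Properties();
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "artsci-consumer-group");
// disable auto-commit so that offsets are committed only after processing
prop.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList("artsci-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical processing step
    }
    // commit only after the whole batch has been processed -> at-least-once semantics
    consumer.commitSync();
}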
The environment
Let's prepare an environment with Apache Kafka. There are a few ways to do it, but I chose the Confluent project based on Docker. You can find the documentation here: https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html. Follow the steps and I'm sure you will have the whole environment ready for tests.
The project contains a docker-compose.yml with the server definitions (below). This configuration is very important, so if you want to change some variables, ports, etc., this is the best place to do it.
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  ...
After you start the Confluent project you can check that the servers are running, for example with docker-compose ps.
Let's connect to the 'broker' server and create a new 'artsci' topic from the command line:
1. connect to the 'broker' container
2. find the main Kafka folder location
3. create the topic 'artsci-topic' with 3 partitions and a replication factor of 1 (the maximum replication factor depends on the number of broker nodes, and here we run a single broker); alternatively the topic can be created from Java, as shown in the sketch after these steps
kafka-topics --bootstrap-server localhost:9092 --topic artsci-topic --create --partitions 3 --replication-factor 1
4. find the new topic 'artsci-topic' in the Confluent web application (http://localhost:9021/)
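For reference, here is a minimal sketch of the programmatic alternative using the Kafka AdminClient; the bootstrap address and topic settings match the setup above, while the package and class name are just assumptions for this example.

package kafkaAdmin;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ArtsciTopicCreator {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(prop)) {
            // 3 partitions, replication factor 1 (we only run a single broker)
            NewTopic topic = new NewTopic("artsci-topic", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}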
The Producer application
To produce messages we can use the CLI:
kafka-console-producer --broker-list 127.0.0.1:9092 --topic artsci-topic --property parse.key=true --property key.separator=,
We can also produce messages from a Java application.
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>kafka.artsci</groupId>
  <artifactId>kafkaProducer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>KafkaProducer</name>
  <description>KafkaProducer application in java</description>
  <dependencies>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.8.0-beta4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Java class:
package kafkaProducer;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ArtsciKafkaProducer {

    private static final String kafkaServer = "127.0.0.1:9092";
    private static final String topicName = "artsci-topic";
    private static final String ALL = "all";
    private static final String TRUE = "true";
    private static final String MAX_CONN = "5";

    static Logger log = LoggerFactory.getLogger(ArtsciKafkaProducer.class.getName());

    public static void main(String[] args) {
        log.info("Start");
        KafkaProducer<String, String> producer = createProducer();
        // keyed record: all messages with the key "key_101" go to the same partition
        ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, "key_101", "message 101");
        // send asynchronously; the callback is invoked when the broker acknowledges (or rejects) the record
        producer.send(rec, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    log.error("Error send message!", e);
                } else {
                    log.info("The offset of the record we just sent is: " + metadata.offset());
                }
            }
        });
        producer.flush();
        producer.close();
    }

    private static KafkaProducer<String, String> createProducer() {
        log.info("Prepare Producer");
        Properties prop = new Properties();
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // wait for the leader and all followers to acknowledge each record
        prop.setProperty(ProducerConfig.ACKS_CONFIG, ALL);
        // idempotent producer: the broker filters out duplicates caused by retries
        prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, TRUE);
        prop.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_CONN);
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        return producer;
    }
}
The Consumer application
To consume messages we can use the CLI:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic artsci-topic --from-beginning --property print.key=true --property key.separator=,
We can also consume messages from a Java application.
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>kafka.artsci</groupId>
  <artifactId>kafkaConsumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>KafkaConsumer</name>
  <description>KafkaConsumer</description>
  <dependencies>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.8.0-beta4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Java class:
package kafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ArtsciKafkaConsumer {

    private static final String kafkaServer = "127.0.0.1:9092";
    private static final String topicName = "artsci-topic";
    private static final String groupId = "artsci-consumer-group";
    private static final String offset = "earliest";

    static Logger log = LoggerFactory.getLogger(ArtsciKafkaConsumer.class.getName());

    public static void main(String[] args) {
        log.info("Start");
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList(topicName));
        // a single poll is enough for this demo; a real application would poll in a loop
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
        for (ConsumerRecord<String, String> record : records) {
            log.info("Receive record -> Key: " + record.key() + " value: " + record.value());
        }
        consumer.close();
    }

    private static KafkaConsumer<String, String> createConsumer() {
        log.info("Prepare Consumer");
        Properties prop = new Properties();
        prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // start from the earliest offset when no committed offset exists for the group
        prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        return consumer;
    }
}
The output
[main] INFO kafkaConsumer.ArtsciKafkaConsumer - Receive record -> Key: key_2 value: message 2
[main] INFO kafkaConsumer.ArtsciKafkaConsumer - Receive record -> Key: key_4 value: message 4
[main] INFO kafkaConsumer.ArtsciKafkaConsumer - Receive record -> Key: key_5 value: message 5
[main] INFO kafkaConsumer.ArtsciKafkaConsumer - Receive record -> Key: key_1 value: message 1
[main] INFO kafkaConsumer.ArtsciKafkaConsumer - Receive record -> Key: key_3 value: message 3
[main] INFO kafkaConsumer.ArtsciKafkaConsumer - Receive record -> Key: key_101 value: message 101
Links
https://kafka.apache.org/081/documentation.html
https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html
https://docs.confluent.io/current/platform.html
Other subjects
Security in a Kafka cluster is also very important: authentication and authorization, both for the internal traffic between ZooKeeper and the brokers and for the external producer and consumer APIs. A sketch of client-side security properties is shown below.
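For illustration only: the quickstart environment above uses PLAINTEXT listeners, so the values below are assumptions for a secured cluster rather than something you can run against this setup. Client-side security is typically configured with properties like these (here SASL/PLAIN over TLS), added to the producer or consumer Properties shown earlier; the username, password and truststore path are hypothetical.

// encrypt the connection and authenticate with SASL (example values only)
prop.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
prop.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
prop.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    + "username=\"artsci\" password=\"artsci-secret\";");
// truststore containing the broker certificate (hypothetical path and password)
prop.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/kafka.client.truststore.jks");
prop.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-secret");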