practise

kafka in wsl home foldercd ~/kafka_2.13-3.8.0bin/zookeeper-server-start.sh config/zookeeper.propertiesbin/kafka-server-start.sh config/server.propertiesnano ~/kafka_2.13-3.8.0/config/server.properties
Listen on all network interfaces
listeners=PLAINTEXT://0.0.0.0:9092
Advertise WSL IP so Windows IntelliJ can connect
advertised.listeners=PLAINTEXT://
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.22.144.1:9092");
INTELLIJ CODE
package org.example;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration; import java.util.Collections; import java.util.Properties;
public class Main {
public static void main(String[] args) {
// Kafka consumer configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.20.191.167:9092");
// Use a unique consumer group to always read from beginning
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-" + System.currentTimeMillis());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to a topic
consumer.subscribe(Collections.singletonList("test-topic"));
System.out.println("Kafka Consumer started. Waiting for messages...");
// Add shutdown hook to close consumer gracefully
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Closing consumer...");
consumer.close();
}));
// Poll loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received: " + record.value() +
" | Partition: " + record.partition() +
" | Offset: " + record.offset());
}
}
}
}
pom.xml
"<project xmlns="http://maven.apache.org/POM/4.0.0""""
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka_consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
</dependencies>
1 Installation of kafka on windows and running the kafka server and zookeeper.Creating a topic
installation on ubuntu
1cd ~
2wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
3tar -xvzf kafka_2.13-3.8.0.tgz
4cd kafka_2.13-3.8.0
start zookeeper
1bin/zookeeper-server-start.sh config/zookeeper.properties
start kafka server
1bin/kafka-server-start.sh config/server.properties
2
3creating a topic
cd ~/kafka_2.13-3.8.0 bin/kafka-topics.sh --create --topic demo --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
1
2check that topic exists
bin/kafka-topics.sh --list --bootstrap-server localhost:9092