// ── Java Producer ──
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Reliability configs
props.put("acks", "all"); // Wait for all ISRs
props.put("retries", Integer.MAX_VALUE); // Infinite retries
props.put("max.in.flight.requests.per.connection", 5); // <= 5 required to keep ordering with idempotence
props.put("enable.idempotence", "true"); // Dedup/exactly-once within a partition
props.put("linger.ms", 10); // Batch up to 10ms
props.put("batch.size", 16384); // 16KB batch size
props.put("compression.type", "lz4"); // Compress batches
props.put("delivery.timeout.ms", 120000); // 2 min total timeout
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Fire-and-forget
producer.send(new ProducerRecord<>("orders", "key1", "order data"));
// Synchronous send (wait for response)
try {
RecordMetadata meta = producer.send(
new ProducerRecord<>("orders", "key1", "order data")
).get();
System.out.println("Offset: " + meta.offset());
} catch (Exception e) {
e.printStackTrace();
}
// Asynchronous send with callback
producer.send(new ProducerRecord<>("orders", "key1", "order data"),
(metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Sent to partition " + metadata.partition()
+ " offset " + metadata.offset());
}
});
// Send with headers
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders", null, "value"
);
record.headers().add("trace-id", "abc123".getBytes());
record.headers().add("version", "v2".getBytes());
producer.send(record);
// Flush & close
producer.flush();
producer.close();
| acks | Behavior | Durability |
|---|---|---|
| 0 | No acknowledgment | No guarantee |
| 1 | Leader only | Leader crash = data loss |
| all / -1 | All ISRs | Strongest (with min.insync.replicas) |
| Config | Purpose |
|---|---|
| enable.idempotence=true | Dedup within a partition (PID + sequence number) |
| acks=all (required) | All ISRs must confirm |
| max.in.flight.requests.per.connection <= 5 | Maintains ordering with retries |
| retries=Integer.MAX_VALUE | Keep retrying transient errors |
💡 Always use the idempotent producer in production. Set enable.idempotence=true and acks=all on the producer, and min.insync.replicas=2 on the topic. This prevents duplicate messages during retries without killing throughput.
// ── Java Consumer ──
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processor");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Reliability configs
props.put("auto.offset.reset", "earliest"); // No committed offset? Start from beginning
props.put("enable.auto.commit", "false"); // Manual commit
props.put("max.poll.records", "500"); // Max records returned per poll()
props.put("session.timeout.ms", "45000"); // Kick consumer if no heartbeat within 45s
props.put("heartbeat.interval.ms", "3000"); // Send heartbeat every 3s
props.put("max.poll.interval.ms", "300000"); // Max 5 min between polls before rebalance
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to topic(s)
consumer.subscribe(Arrays.asList("orders", "payments"));
// Or subscribe with pattern
consumer.subscribe(Pattern.compile("orders-.*"));
// Or assign partitions manually (no rebalancing)
consumer.assign(Arrays.asList(
new TopicPartition("orders", 0),
new TopicPartition("orders", 1)
));
// Poll loop with manual commit
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value());
// Process record...
// If processing fails, seek back to retry
}
// Manual commit after successful processing
consumer.commitSync(); // blocking
// OR
consumer.commitAsync(); // non-blocking
}
} finally {
consumer.close();
}
// Seek to specific offset
consumer.seek(new TopicPartition("orders", 0), 42);
// Seek to beginning / end
consumer.seekToBeginning(Arrays.asList(new TopicPartition("orders", 0)));
consumer.seekToEnd(Arrays.asList(new TopicPartition("orders", 0)));
// Get consumer group info
Map<TopicPartition, OffsetAndMetadata> committed =
consumer.committed(Collections.singleton(new TopicPartition("orders", 0)));
| Strategy | Behavior |
|---|---|
| Range | Partitions divided by range across consumers |
| RoundRobin | Partitions assigned round-robin |
| Sticky | Minimize partition movement on rebalance |
| CooperativeSticky | Incremental, no stop-the-world |
| Policy | Behavior |
|---|---|
| earliest | Read from beginning of partition |
| latest | Read only new messages |
| none | Throw exception if no offset found |
🚫 Always commit offsets AFTER processing. Use enable.auto.commit=false and commit manually. Auto-commit can cause data loss if the consumer crashes after commit but before processing.
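A sketch of committing explicit per-partition offsets after processing (assumes the poll loop above; `process()` is a hypothetical handler, and this will not run without a broker). Note the committed value is the offset of the *next* record to read, hence the `+ 1`:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Build a per-partition offset map as records are processed, then commit it.
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
    process(record); // hypothetical processing step
    offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)); // next offset to read
}
consumer.commitSync(offsets); // resume exactly after the last processed record
```

Committing `offset()` instead of `offset() + 1` is a classic off-by-one: Kafka resumes *from* the committed offset, so the last record would be reprocessed.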
# ── Schema Registry REST API ──
# Base URL: http://localhost:8081
# Register a schema (Avro)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' \
http://localhost:8081/subjects/orders-value/versions
# Get latest schema
curl http://localhost:8081/subjects/orders-value/versions/latest
# Get all subjects (registered schemas)
curl http://localhost:8081/subjects
# Get all versions of a subject
curl http://localhost:8081/subjects/orders-value/versions
# Check schema compatibility
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "..."}' \
http://localhost:8081/compatibility/subjects/orders-value/versions/latest
# Update compatibility mode
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/orders-value
// ── Avro Producer with Schema Registry ──
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
// Generate Avro class from schema (avro-maven-plugin)
// Order.java is generated from order.avsc
Order order = Order.newBuilder()
.setOrderId("ORD-001")
.setAmount(99.99)
.setItems(Arrays.asList("item1", "item2"))
.build();
producer.send(new ProducerRecord<>("orders", order.getOrderId(), order));
// ── Avro Consumer ──
Properties cProps = new Properties();
cProps.put("bootstrap.servers", "localhost:9092");
cProps.put("schema.registry.url", "http://localhost:8081");
cProps.put("group.id", "order-consumer");
cProps.put("key.deserializer", StringDeserializer.class.getName());
cProps.put("value.deserializer", KafkaAvroDeserializer.class.getName());
cProps.put("specific.avro.reader", "true"); // Use generated class
KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(cProps);
consumer.subscribe(Collections.singletonList("orders"));
ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Order> record : records) {
Order order = record.value();
System.out.println(order.getOrderId() + ": " + order.getAmount());
}
| Format | Pros | Cons |
|---|---|---|
| Avro | Compact, schema evolution, SR | Schema required |
| Protobuf | Fast, compact, language-agnostic | Schema required |
| JSON Schema | Human-readable, flexible | Larger payload |
| JSON (raw) | No schema needed | No evolution support |
| String | Simplest | No structure, no validation |
| Mode | Description |
|---|---|
| BACKWARD | New can read old data |
| FORWARD | Old can read new data |
| FULL | Both backward and forward |
| BACKWARD_TRANSITIVE | New reads all previous versions |
| FORWARD_TRANSITIVE | All previous read new |
| NONE | No compatibility check |
// ── Kafka Streams DSL ──
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// ── Simple transform: filter + map ──
KStream<String, String> orders = builder.stream("orders");
orders
.filter((key, value) -> value.contains("completed"))
.mapValues(value -> value.toUpperCase())
.to("completed-orders");
// ── Aggregate: count orders per customer ──
KTable<String, Long> orderCounts = orders
.groupBy((key, value) -> extractCustomer(value),
Grouped.with(Serdes.String(), Serdes.String()))
.count(Materialized.as("order-counts-store"));
orderCounts.toStream().to("order-counts");
// ── Join: orders with customer data ──
KTable<String, String> customers = builder.table("customers",
Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, String> enriched = orders
.join(customers,
(order, customer) -> order + " | " + customer,
Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
// ── Windowed aggregation: 5-minute tumbling window ──
orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
.count()
.toStream()
.map((windowedKey, count) -> new KeyValue<>(
windowedKey.key() + "-" + windowedKey.window().startTime(),
count.toString()))
.to("order-counts-5min");
// ── Build and start ──
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
💡 Kafka Streams is a client library — no separate cluster needed. It uses Kafka for input/output and handles failover via consumer group coordination. State stores are changelogged to Kafka topics for fault tolerance.
Q: How does Kafka guarantee message ordering?
A: Kafka guarantees ordering only within a single partition. Messages with the same key always go to the same partition (via hash(key) % num_partitions). For cross-partition ordering, use a single partition (limits throughput) or buffer and reorder in the consumer.
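The same-key, same-partition property can be sketched in plain Java. This stand-in uses String.hashCode() rather than Kafka's actual murmur2-based default partitioner, so the partition numbers will not match a real broker's, but the guarantee it demonstrates is identical:

```java
public class KeyPartitioner {
    // Map a record key to a partition; the same key always yields the same
    // partition, which is what gives per-key ordering.
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so a negative hashCode can't produce a
        // negative partition (mirrors what Kafka does with its hash).
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("customer-42", 6);
        int p2 = partitionFor("customer-42", 6);
        System.out.println(p1 == p2);          // same key -> same partition
        System.out.println(p1 >= 0 && p1 < 6); // always within range
    }
}
```

Because the mapping is a pure function of the key, every "customer-42" event lands on the same partition and is consumed in the order it was produced.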
Q: What is the difference between idempotent producer and transactions?
A: The idempotent producer prevents duplicates within a single partition (PID + sequence number). Transactions provide exactly-once across multiple partitions (atomic multi-partition writes). Setting transactional.id enables transactions and implies enable.idempotence=true.
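A minimal transactional-producer sketch. The method calls (initTransactions, beginTransaction, commitTransaction, abortTransaction) are the real producer API; the topic names and transactional.id are illustrative, and this will not run without a broker:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

Properties tProps = new Properties();
tProps.put("bootstrap.servers", "localhost:9092");
tProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
tProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
tProps.put("transactional.id", "order-txn-1"); // unique per producer instance; implies idempotence

KafkaProducer<String, String> txnProducer = new KafkaProducer<>(tProps);
txnProducer.initTransactions(); // register transactional.id with the coordinator
try {
    txnProducer.beginTransaction();
    txnProducer.send(new ProducerRecord<>("orders", "key1", "order data"));
    txnProducer.send(new ProducerRecord<>("payments", "key1", "payment data"));
    txnProducer.commitTransaction(); // both writes become visible atomically
} catch (ProducerFencedException e) {
    txnProducer.close(); // another producer took over this transactional.id
} catch (KafkaException e) {
    txnProducer.abortTransaction(); // roll back both writes
}
```

Consumers must set isolation.level=read_committed to skip records from aborted transactions.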
Q: How does consumer rebalancing work?
A: When a consumer joins or leaves a group, the group coordinator triggers a rebalance and each consumer receives a subset of partitions. With CooperativeSticky, rebalancing is incremental: only affected partitions move. With eager rebalancing (the classic default), all consumers revoke everything and rejoin.
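To hook into rebalances, a ConsumerRebalanceListener can commit progress before partitions are taken away. A sketch against the consumer above (will not run without a broker; the assignor config line is optional):

```java
import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.TopicPartition;

// Opt in to incremental (cooperative) rebalancing:
props.put("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());

consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(); // flush offsets before losing ownership
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Optionally seek to externally stored offsets here
    }
});
```

Committing in onPartitionsRevoked ensures the next owner of each partition resumes from the right offset instead of reprocessing.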
Q: What is consumer lag and how do you handle it?
A: Consumer lag = latest (log-end) offset minus the consumer's committed offset. High lag means the consumer cannot keep up. Solutions: (1) add more consumers (up to the partition count), (2) speed up per-record processing, (3) process in batches, (4) use compacted topics to reduce data volume.
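The lag arithmetic itself is simple per-partition subtraction. A broker-free sketch over plain maps (real code would pull the numbers from a consumer's endOffsets()/committed() or from the AdminClient):

```java
import java.util.Map;

public class LagCheck {
    // Lag per partition = log-end offset minus committed offset, summed.
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long done = committed.getOrDefault(e.getKey(), 0L); // no commit yet -> full lag
            lag += Math.max(0, e.getValue() - done);
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 1000L, 1, 500L);   // log-end per partition
        Map<Integer, Long> done = Map.of(0, 990L, 1, 450L);   // committed per partition
        System.out.println(totalLag(end, done)); // prints 60
    }
}
```

Alerting on this number per consumer group (e.g. via `kafka-consumer-groups.sh --describe`) is the standard way to spot consumers that are falling behind.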
Q: When should you use Kafka vs RabbitMQ?
A: Kafka: high throughput, persistent log, replay, event sourcing, stream processing. RabbitMQ: traditional message queuing, request/reply, complex routing, lower latency, per-message acknowledgment. In short: Kafka is a log, RabbitMQ is a queue.
Q: What is log compaction?
A: Log compaction retains only the latest value for each key, removing older records with the same key. Enable it with cleanup.policy=compact. Useful for changelog-style topics where only the current state matters (e.g., user profile updates). A record with a null value (a tombstone) marks its key for deletion.
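Compaction semantics can be modeled in a few lines: latest value wins per key, and a null value (tombstone) deletes the key. This is a model of the behavior only, not broker code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionModel {
    record Record(String key, String value) {}

    // Keep only the latest value per key; a null value (tombstone) deletes the key.
    static Map<String, String> compact(List<Record> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Record r : log) {
            if (r.value() == null) latest.remove(r.key()); // tombstone
            else latest.put(r.key(), r.value());           // newer value supersedes older
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Record> log = List.of(
            new Record("user-1", "email=a@x.com"),
            new Record("user-2", "email=b@x.com"),
            new Record("user-1", "email=new@x.com"), // supersedes the first user-1 record
            new Record("user-2", null)               // tombstone deletes user-2
        );
        System.out.println(compact(log)); // prints {user-1=email=new@x.com}
    }
}
```

This is why a compacted topic works as a durable key-value snapshot: replaying it from the beginning reconstructs exactly the current state.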
Q: Explain the role of ZooKeeper / KRaft.
A: ZooKeeper stores cluster metadata (brokers, topics, partition assignments, ACLs). KRaft (Kafka Raft) replaces ZooKeeper entirely, removing the operational overhead of maintaining a separate ZooKeeper ensemble. KRaft has been production-ready since Kafka 3.3.
💡 Top Kafka interview topics: partitioning and ordering, producer acks and idempotence, consumer groups and rebalancing, exactly-once semantics, Schema Registry, log compaction, consumer lag handling, and Kafka Streams vs Kafka Connect.