Hello Ishan!
Greetings. I have a couple of observations on your thread dump.
1. Was the thread dump captured on the Kafka consumer instance at the right time?
In your thread dump I see 3 threads with the 'kafka' name prefix:
kafka-coordinator-heartbeat-thread | streaming-consumer-native-catalog-immediate-qa PRIORITY : 5 THREAD ID : 0X00007F78281EB000 NATIVE ID : 0X76 NATIVE ID (DECIMAL) : 118 STATE : WAITING stackTrace: java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1374) - locked <0x00000006d3a88bd0> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
kafka-producer-network-thread | commonProducer PRIORITY : 5 THREAD ID : 0X00007F7971309000 NATIVE ID : 0X62 NATIVE ID (DECIMAL) : 98 STATE : WAITING stackTrace: java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000007493da638> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947) at com.rr.catalog.common.kafka.CommonProducers$1.lambda$sendJobStatus$0(CommonProducers.java:60) at com.rr.catalog.common.kafka.CommonProducers$1$$Lambda$595/407605972.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at com.rr.kafka.toolkit.producer.IdempotentProducer.lambda$sendFuture$1(IdempotentProducer.java:60) at com.rr.kafka.toolkit.producer.IdempotentProducer$$Lambda$576/1581044928.onCompletion(Unknown Source) at com.rr.kafka.toolkit.producer.IdempotentProducer.lambda$send$0(IdempotentProducer.java:49) at com.rr.kafka.toolkit.producer.IdempotentProducer$$Lambda$580/129500578.onCompletion(Unknown Source) at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1390) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273) at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234) at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:198) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:758) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695) at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634) at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575) at org.apache.kafka.clients.producer.internals.Sender$$Lambda$606/1405124917.accept(Unknown Source) at java.util.ArrayList.forEach(ArrayList.java:1259) at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562) at org.apache.kafka.clients.producer.internals.Sender$$Lambda$605/1990909841.accept(Unknown Source) at java.lang.Iterable.forEach(Iterable.java:75) at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562) at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) at org.apache.kafka.clients.producer.internals.Sender$$Lambda$602/1405434884.onComplete(Unknown Source) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at java.lang.Thread.run(Thread.java:748)
kafka-producer-network-thread | streamingProducer PRIORITY : 5 THREAD ID : 0X00007F79712E4000 NATIVE ID : 0X61 NATIVE ID (DECIMAL) : 97 STATE : RUNNABLE stackTrace: java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x00000006d4b83e90> (a sun.nio.ch.Util$3) - locked <0x00000006d4b83e80> (a java.util.Collections$UnmodifiableSet) - locked <0x00000006d4b83630> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:873) at org.apache.kafka.common.network.Selector.poll(Selector.java:465) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at java.lang.Thread.run(Thread.java:748)
None of these 3 threads appears to be consuming messages from Kafka: the first is the consumer's heartbeat thread and the other two are producer network threads. Do you use a different thread name for your Kafka consumer thread? Maybe the consumer thread crashed (or exited), so you no longer see it in the thread dump. That would explain why messages aren't being consumed from Kafka.
To isolate this problem, capture a few thread dump snapshots, maybe one every hour, and analyze them. You may find this open source script helpful; it captures 360-degree data from your application stack in a non-intrusive manner.
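If you would rather check from inside the application than with external dumps, here is a minimal sketch of the same idea. It is only an illustration: the class and method names (ConsumerThreadWatcher, findConsumerThreads, watchHourly) are made up, and it assumes your consumer thread is sitting inside a KafkaConsumer call when the snapshot is taken. A consumer thread that is busy processing records between polls would not be reported, so treat an empty result as a hint, not proof.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ConsumerThreadWatcher {

    // Frames from this class mean the thread is currently inside the Kafka consumer API.
    static final String CONSUMER_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";

    /** Returns the names of threads with at least one KafkaConsumer frame on their stack. */
    static List<String> findConsumerThreads(Map<Thread, StackTraceElement[]> snapshot) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : snapshot.entrySet()) {
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getClassName().equals(CONSUMER_CLASS)) {
                    names.add(entry.getKey().getName());
                    break; // one matching frame is enough for this thread
                }
            }
        }
        return names;
    }

    /** Logs the consumer threads found in the live JVM once per hour (hypothetical helper). */
    static ScheduledExecutorService watchHourly() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "consumer-thread-watcher");
            t.setDaemon(true); // do not keep the JVM alive just for the watcher
            return t;
        });
        scheduler.scheduleAtFixedRate(
                () -> System.out.println(Instant.now() + " consumer threads: "
                        + findConsumerThreads(Thread.getAllStackTraces())),
                0, 1, TimeUnit.HOURS);
        return scheduler;
    }

    public static void main(String[] args) {
        // One-off snapshot of the current JVM; in the application you would call watchHourly().
        System.out.println("consumer threads: " + findConsumerThreads(Thread.getAllStackTraces()));
    }
}
```

If the list is non-empty in the first snapshots and empty in later ones, that supports the theory that the consumer thread exited at some point.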
2. Netty threads waiting
This is not related to the problem you are raising, but I would like to share the observation. There are 70+ threads in your 'netty-client-worker' thread pool that are not doing anything. You may consider reducing the minimum size of this thread pool.
Here is the stack trace of one of the netty-client-worker threads:
netty-client-worker PRIORITY : 8 THREAD ID : 0X00007F7820019000 NATIVE ID : 0X58F NATIVE ID (DECIMAL) : 1423 STATE : RUNNABLE stackTrace: java.lang.Thread.State: RUNNABLE at io.netty.channel.epoll.Native.epollWait(Native Method) at io.netty.channel.epoll.Native.epollWait(Native.java:148) at io.netty.channel.epoll.Native.epollWait(Native.java:141) at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290) at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748)
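Before resizing the pool, it may help to confirm how many threads each pool actually holds. The sketch below is an illustration only: ThreadPoolCensus and countByPool are made-up names, and it assumes pool threads share a name, optionally followed by a numeric suffix such as "-12".

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ThreadPoolCensus {

    /** Groups thread names by pool, after stripping a trailing "-<digits>" suffix if present. */
    static Map<String, Integer> countByPool(Collection<String> threadNames) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String name : threadNames) {
            String pool = name.replaceFirst("-\\d+$", "");
            counts.merge(pool, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Collect the names of all live threads in this JVM.
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            names.add(t.getName());
        }
        countByPool(names).forEach((pool, n) -> System.out.println(pool + ": " + n));
    }
}
```

Running this at a few different times of day would show whether the netty-client-worker count stays at 70+ even when the application is busy.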