Ishan Upamanyu

kafka consumer application is stuck and not processing data

Hi,

This thread dump was taken from a running Kafka consumer instance where we have set up one thread per partition.

 

The consumer runs fine for a day or two and then it just hangs. It stops consuming data, and the current offset does not move for any partition.

 

A restart of the application fixes the problem. We are not sure why it happens.

 

Does anything in the thread dump help?


Report URL - https://fastthread.io/my-thread-report.jsp?p=c2hhcmVkLzIwMjIvMTEvNC9OQ0FfVGhyZWFkX0R1bXBfMDRfMTFfMjAyMi5kdW1wLS0xNS00NC01MQ==

  • kafka consumer application is stuck

  • data not processing

  • netty threads waiting

  • netty-client-worker


Ram Lakshmanan

Hello Ishan!

 

Greetings. I have a couple of observations on your thread dump.

 

1. Was the thread dump captured on the Kafka consumer instance at the right time?

 

In your thread dump, I see 3 threads with the 'kafka' name prefix.

 

kafka-coordinator-heartbeat-thread | streaming-consumer-native-catalog-immediate-qa
PRIORITY : 5
THREAD ID : 0X00007F78281EB000
NATIVE ID : 0X76
NATIVE ID (DECIMAL) : 118
STATE : WAITING


stackTrace:
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1374)
- locked <0x00000006d3a88bd0> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

 

kafka-producer-network-thread | commonProducer
PRIORITY : 5
THREAD ID : 0X00007F7971309000
NATIVE ID : 0X62
NATIVE ID (DECIMAL) : 98
STATE : WAITING

stackTrace:
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007493da638> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
at com.rr.catalog.common.kafka.CommonProducers$1.lambda$sendJobStatus$0(CommonProducers.java:60)
at com.rr.catalog.common.kafka.CommonProducers$1$$Lambda$595/407605972.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at com.rr.kafka.toolkit.producer.IdempotentProducer.lambda$sendFuture$1(IdempotentProducer.java:60)
at com.rr.kafka.toolkit.producer.IdempotentProducer$$Lambda$576/1581044928.onCompletion(Unknown Source)
at com.rr.kafka.toolkit.producer.IdempotentProducer.lambda$send$0(IdempotentProducer.java:49)
at com.rr.kafka.toolkit.producer.IdempotentProducer$$Lambda$580/129500578.onCompletion(Unknown Source)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1390)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:198)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:758)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575)
at org.apache.kafka.clients.producer.internals.Sender$$Lambda$606/1405124917.accept(Unknown Source)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562)
at org.apache.kafka.clients.producer.internals.Sender$$Lambda$605/1990909841.accept(Unknown Source)
at java.lang.Iterable.forEach(Iterable.java:75)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562)
at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
at org.apache.kafka.clients.producer.internals.Sender$$Lambda$602/1405434884.onComplete(Unknown Source)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.lang.Thread.run(Thread.java:748)

 

kafka-producer-network-thread | streamingProducer
PRIORITY : 5
THREAD ID : 0X00007F79712E4000
NATIVE ID : 0X61
NATIVE ID (DECIMAL) : 97
STATE : RUNNABLE

stackTrace:
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000006d4b83e90> (a sun.nio.ch.Util$3)
- locked <0x00000006d4b83e80> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000006d4b83630> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:873)
at org.apache.kafka.common.network.Selector.poll(Selector.java:465)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.lang.Thread.run(Thread.java:748)

 

None of these 3 threads seems to be consuming messages from Kafka. Do you use a different thread name for the Kafka consumer threads? Maybe the consumer threads crashed (or exited), so you no longer see them in the thread dump. That would explain why messages aren't being consumed from Kafka.
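
If the consumer threads are dying silently, one way to confirm it is to install an uncaught exception handler on each of them, so that a crash leaves a trace in the logs. The sketch below is only an illustration: the class `ConsumerThreadGuard`, the method `startGuarded`, and the thread name in the usage note are hypothetical and not taken from your application. Only `Thread.setUncaughtExceptionHandler` is standard Java.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: starts a worker thread and records why it died.
class ConsumerThreadGuard {

    // Starts `work` on a named thread. If the thread exits because of an
    // uncaught exception, the handler logs it and stores it in `lastFailure`.
    static Thread startGuarded(String name, Runnable work,
                               AtomicReference<Throwable> lastFailure) {
        Thread t = new Thread(work, name);
        t.setUncaughtExceptionHandler((thread, error) -> {
            lastFailure.set(error);
            System.err.println("Thread " + thread.getName() + " died: " + error);
        });
        t.start();
        return t;
    }
}
```

For example, `ConsumerThreadGuard.startGuarded("my-consumer-0", pollLoop, failure)` would wrap a poll loop (here an assumed `Runnable` named `pollLoop`), so that an exception escaping the loop is logged instead of the thread simply vanishing from later thread dumps.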

 

 To isolate this problem, capture a few thread dump snapshots, perhaps one every hour, and analyze them. You may find this open source script helpful; it captures 360-degree data from your application stack in a non-intrusive manner.
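
If running an external tool on a schedule is inconvenient, periodic snapshots can also be taken from inside the JVM. The following is a minimal sketch under stated assumptions: the class `PeriodicThreadDump` and its methods are hypothetical names, the output goes to standard output rather than a file, and the interval is whatever you pass in. It relies only on the standard `ThreadMXBean` and `ScheduledExecutorService` APIs.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: prints a thread snapshot at a fixed interval.
class PeriodicThreadDump {

    // Builds a textual snapshot (name, state, full stack) of all live threads.
    static String snapshot() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // false, false: do not collect locked monitors or synchronizers
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    // Prints a snapshot immediately and then once per period, on a daemon thread.
    static ScheduledExecutorService start(long period, TimeUnit unit) {
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "thread-dump-scheduler");
                t.setDaemon(true);
                return t;
            });
        scheduler.scheduleAtFixedRate(
            () -> System.out.println(snapshot()), 0, period, unit);
        return scheduler;
    }
}
```

Calling `PeriodicThreadDump.start(1, TimeUnit.HOURS)` at application startup would match the hourly cadence suggested above. Comparing consecutive snapshots shows when the consumer threads disappear or stop making progress.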

 

2. netty threads waiting

 This is not related to the problem you raised, but I would like to share the observation. There are 70+ threads in your 'netty-client-worker' thread pool that are not doing anything. You may consider reducing the minimum size of this thread pool.

 

 

Here is the stack trace of one of the netty-client-worker threads:

netty-client-worker
PRIORITY : 8
THREAD ID : 0X00007F7820019000
NATIVE ID : 0X58F
NATIVE ID (DECIMAL) : 1423
STATE : RUNNABLE


stackTrace:
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:148)
at io.netty.channel.epoll.Native.epollWait(Native.java:141)
at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
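
To keep an eye on how many threads a pool such as 'netty-client-worker' holds over time, the live threads can be counted by name prefix from inside the JVM. This is a rough sketch, and the class `ThreadPoolCensus` and its method are hypothetical names. Note that a thread parked in a native `epollWait` call, like the one above, is reported as RUNNABLE even though it is idle, so the state alone does not show whether a thread is busy.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: counts live threads by name prefix and state.
class ThreadPoolCensus {

    // Returns, for each thread state, how many live threads have a name
    // starting with the given prefix.
    static Map<Thread.State, Integer> countByState(String namePrefix) {
        Map<Thread.State, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().startsWith(namePrefix)) {
                counts.merge(t.getState(), 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

For instance, logging `ThreadPoolCensus.countByState("netty-client-worker")` alongside each thread dump would show whether the pool stays at 70+ threads regardless of load.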

 
