【Kafka连接异常】
【部署方式】:集群
【异常信息】:
broker节点上:
java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.
at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.throwIfTransactionCoordinatorDisabled(KafkaRequestHandler.java:2619) ~[?:?]
at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.getTransactionCoordinator(KafkaRequestHandler.java:281) ~[?:?]
at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.handleInitProducerId(KafkaRequestHandler.java:2048) ~[?:?]
at io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.channelRead(KafkaCommandDecoder.java:332) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.100.Final.jar:4.1.100.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[io.netty-netty-handler-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392]
解决方案:https://medium.com/@selectdb.marketing/how-to-import-data-from-apache-pulsar-into-apache-doris-quickly-and-seamlessly-3c277e9e3918
在broker节点上加入这两个配置,并重启节点服务:
kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true