Kafka Streaming error: errors when connecting to Kafka from IDEA

First, some context for this error: I did not build the cluster on my own virtual machines. Huawei provided me with a server, so I installed Docker directly on it, spun up three containers running Kafka, and built the cluster with docker-compose.

The mapped ports are as in the sketch below. The trouble starts when IDEA connects to the Kafka cluster.
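The original screenshot of the mappings is not reproduced here; in docker-compose terms it presumably amounts to the following (the image and service names are assumptions):

```yaml
# Sketch of the described setup: three Kafka containers, each publishing
# its internal port 9092 on one of the open host ports.
services:
  kafka1:
    image: wurstmeister/kafka   # assumed image
    ports:
      - "5000:9092"
  kafka2:
    image: wurstmeister/kafka
    ports:
      - "5002:9092"
  kafka3:
    image: wurstmeister/kafka
    ports:
      - "5004:9092"
```

Against this setup, the client goes through three connection steps: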

First, it connects to IP:5000, 5002, 5004 (the bootstrap addresses).

Then it connects to the returned host.name values: kafka1, kafka2, kafka3.

Finally, it keeps connecting to advertised.host.name = kafka1, kafka2, kafka3.
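This is the standard Kafka bootstrap sequence: the addresses in bootstrap.servers are only used for the first metadata request, and every connection after that goes to whatever host names the brokers advertise back. A sketch of the consumer parameters on the IDEA side (spark-streaming-kafka-0-10 style; PUBLIC_IP is a placeholder, the group id is taken from the log below):

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Only bootstrap.servers is under the client's control -- after the first
// metadata response the client connects to the advertised host names instead.
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG        -> "PUBLIC_IP:5000,PUBLIC_IP:5002,PUBLIC_IP:5004",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.GROUP_ID_CONFIG                 -> "groupid1" // group id from the log below
)
```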

If these were ordinary servers, this would be no problem: just add the hostname-to-IP mappings to the local hosts file.
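For example (the IP is a placeholder for the real server address):

```
# Local hosts file: /etc/hosts on Linux/macOS,
# C:\Windows\System32\drivers\etc\hosts on Windows.
203.0.113.10  kafka1
203.0.113.10  kafka2
203.0.113.10  kafka3
```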

But with containers that is not an option: the container IPs are assigned on Docker's internal network, so my local machine cannot reach them at all. The consumer therefore fails like this:

20/01/16 22:11:04 INFO AppInfoParser: Kafka version: 2.4.0
20/01/16 22:11:04 INFO AppInfoParser: Kafka commitId: 77a89fcf8d7fa018
20/01/16 22:11:04 INFO AppInfoParser: Kafka startTimeMs: 1579183864167
20/01/16 22:11:04 INFO KafkaConsumer: [Consumer clientId=consumer-groupid1-1, groupId=groupid1] Subscribed to topic(s): test, topicongbo
20/01/16 22:11:04 INFO Metadata: [Consumer clientId=consumer-groupid1-1, groupId=groupid1] Cluster ID: Kkwgy0gkSkmGAlsC_5cz9A
20/01/16 22:11:04 INFO AbstractCoordinator: [Consumer clientId=consumer-groupid1-1, groupId=groupid1] Discovered group coordinator kafka3:9092 (id: 2147483644 rack: null)
20/01/16 22:11:06 WARN NetworkClient: [Consumer clientId=consumer-groupid1-1, groupId=groupid1] Error connecting to node kafka3:9092 (id: 2147483644 rack: null)
java.net.UnknownHostException: kafka3
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
	at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
	at java.net.InetAddress.getAllByName(InetAddress.java:1193)
	at java.net.InetAddress.getAllByName(InetAddress.java:1127)
	at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
	at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
	at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
	at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:955)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:289)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:572)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:757)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:737)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:260)
	at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:54)
	at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:54)
	at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:145)
	at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
	at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
	at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
	at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
	at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
	at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

So how do you solve this error, given that I have no permission to modify the Huawei security group and only ports 5000-5010 are open to the outside?
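One approach that fits this constraint (not spelled out in the original post, so treat it as a sketch): make each broker advertise the server's public IP and its mapped port instead of the container host name, so the client never has to resolve kafka1/kafka2/kafka3 at all. Assuming a wurstmeister/kafka-style image (whose KAFKA_* environment variables map to server.properties entries) and with PUBLIC_IP as a placeholder:

```yaml
# Sketch of the service entry for kafka1; repeat for kafka2/kafka3
# with ports 5002/5004 and broker ids 2/3.
kafka1:
  image: wurstmeister/kafka                       # assumed image
  ports:
    - "5000:9092"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       # assumed ZooKeeper service name
    # Two listeners: INSIDE for broker-to-broker traffic on the Docker network,
    # OUTSIDE for external clients via the published port.
    KAFKA_LISTENERS: INSIDE://0.0.0.0:19092,OUTSIDE://0.0.0.0:9092
    KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka1:19092,OUTSIDE://PUBLIC_IP:5000
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
```

With this, the metadata returned to IDEA contains PUBLIC_IP:5000/5002/5004 instead of kafka1:9092 and so on, so no hosts entry is needed and all external traffic stays inside the open 5000-5010 range, while the brokers keep talking to each other over the internal Docker network.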

The second error

Kafka Connect failed to start with 'Uncaught exception in herder work thread … position could be determined'

The error indicates that some records are put into the queue at a faster rate than they can be sent from the client.

When your producer (Kafka Connect in this case) sends messages, they are stored in a buffer (before being sent to the target broker), and the records are grouped together into batches to increase throughput. When a new record is added to a batch, it must be sent within a configurable time window, controlled by request.timeout.ms (the default is 30 seconds). If a batch sits in the queue for longer than that, a TimeoutException is thrown, and the batch's records are removed from the queue and will not be delivered to the broker.

Increasing the value of request.timeout.ms should do the trick for you.

If that does not work, you can also try decreasing batch.size so that batches are sent more often (but each batch will contain fewer messages), and make sure that linger.ms is set to 0 (which is the default value).
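In code, the three settings mentioned above map to standard producer configs; a sketch with illustrative values (for Kafka Connect itself they would go into the worker configuration with a producer. prefix, e.g. producer.request.timeout.ms):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PUBLIC_IP:5000")  // placeholder address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000") // raised from the 30 s default
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "8192")          // smaller batches, sent more often
props.put(ProducerConfig.LINGER_MS_CONFIG, "0")              // 0 is the default: send immediately
val producer = new KafkaProducer[String, String](props)
```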

If you still get the error, I assume something is wrong with your network. Have you enabled SSL?


Reposted from blog.csdn.net/weixin_43272605/article/details/104011859