Kafka 2017 Update(2) Kafka Producer/Consumer and Architecture
Spark Streaming/Apache Storm
Input Data Stream -> Spark Streaming -> batches of input data -> Spark Engine -> batches of processed data
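To make the diagram concrete, here is a plain-Java sketch of the micro-batch idea with no Spark involved. Timestamped events are grouped into fixed intervals, and each interval becomes one batch for the engine. The `Event` type, the method names and the 10-second interval are illustrative assumptions, not Spark APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of micro-batching; this is NOT how Spark implements it internally.
class MicroBatchSketch {

    // A hypothetical input event: arrival time in milliseconds plus one line of text.
    static final class Event {
        final long timeMs;
        final String line;

        Event(long timeMs, String line) {
            this.timeMs = timeMs;
            this.line = line;
        }
    }

    // Groups events into consecutive batches of batchIntervalMs each (assumes non-negative timestamps).
    // The map key is the start time of the batch; TreeMap keeps the batches in time order.
    static Map<Long, List<String>> toBatches(List<Event> events, long batchIntervalMs) {
        Map<Long, List<String>> batches = new TreeMap<>();
        for (Event e : events) {
            long batchStart = (e.timeMs / batchIntervalMs) * batchIntervalMs;
            batches.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(e.line);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Event> stream = Arrays.asList(
                new Event(1_000, "hello world"),
                new Event(9_500, "hello spark"),
                new Event(12_000, "hello kafka"));
        // With a 10-second interval, the first two events share batch 0 and the third lands in batch 10000.
        toBatches(stream, 10_000).forEach((start, lines) ->
                System.out.println("batch@" + start + " -> " + lines));
    }
}
```

Each printed batch is what the engine would process as one unit; the real system also handles receivers, fault tolerance and scheduling, which the sketch leaves out.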
Try the Word Count Example
https://spark.apache.org/docs/latest/streaming-programming-guide.html
Try it in Scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(10))
Exceptions:
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Solution:
Zeppelin has already created a SparkContext named sc for us, so we can use it directly.
val ssc = new StreamingContext(sc, Seconds(10))
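Or we can run the bundled example directly. First start a Netcat server in one terminal:
>nc -lk 9999
Then start the example in another terminal:
>bin/run-example streaming.NetworkWordCount localhost 9999
Type some words into the Netcat terminal:
hello world
The word counts show up in the example's console: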
Time: 1515044374000 ms
-------------------------------------------
(hello,1)
(world,1)
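For each batch, the example splits every line on spaces, maps each word to (word, 1) and sums the counts per word (flatMap, map and reduceByKey in the Spark guide). A plain-Java sketch of that per-batch computation, with no Spark dependency; the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java equivalent of one NetworkWordCount batch:
// split each line on " ", pair each word with 1, then sum the counts per word.
class BatchWordCount {

    static Map<String, Integer> countWords(List<String> batchLines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only to make printing stable
        for (String line : batchLines) {
            for (String word : line.split(" ")) {
                if (word.isEmpty()) {
                    continue; // unlike the Spark example, skip empty tokens produced by repeated spaces
                }
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords(Arrays.asList("hello world"));
        // Print in the same "(word,count)" shape as the Spark console output.
        counts.forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```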
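Kafka Concepts
Consumer Group - consumers that share a group ID divide the topic's partitions among themselves
Consumer Position - the offset of the next record a consumer will read from a partition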
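To illustrate the consumer group idea, here is a simplified single-topic sketch of range-style partition assignment: members are sorted, each gets a contiguous range of partitions, and the first few get one extra partition when the count does not divide evenly. It is modeled loosely on Kafka's range assignor rather than copied from the client, and the consumer IDs and partition count are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, single-topic sketch of range-style assignment within ONE consumer group.
// Not the Kafka client's assignor code. Assumes at least one consumer.
class RangeAssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions) {
        List<String> consumers = new ArrayList<>(consumerIds);
        Collections.sort(consumers); // deterministic order so every member computes the same result
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int perConsumer = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size(); // the first `extra` consumers get one more partition
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                partitions.add(next++);
            }
            assignment.put(consumers.get(i), partitions); // may be empty if there are more consumers than partitions
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three members of the same group sharing a topic with 5 partitions.
        System.out.println(assign(Arrays.asList("consumer-2", "consumer-1", "consumer-3"), 5));
        // prints {consumer-1=[0, 1], consumer-2=[2, 3], consumer-3=[4]}
    }
}
```

Because each partition goes to exactly one member, adding consumers beyond the partition count leaves the extra ones idle.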
Kafka Producer
The pom.xml change is as follows:
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
Run the producer to send 10 messages to the Kafka topic
package com.sillycat.sparkjava.app;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProducerApp {
    private final static String TOPIC = "sillycat-topic";
    private final static String BOOTSTRAP_SERVERS = "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092";
    private Producer<Long, String> createProducer() {
        Properties props = new Properties();
        // broker list used for the initial connection to the cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // logical client name that the brokers include in their request logging
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerApp");
        // keys are Long, values are String
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
    public void runProducer(final int sendMessageCount) throws Exception {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        try {
            // use the current time as the first key and count up from there
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, index,
                        "Hello Sillycat " + index);
                // send() is asynchronous; get() blocks until the broker acknowledges the record
                RecordMetadata metadata = producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d) time=%d%n",
                        record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }
    public static void main(String[] args) {
        KafkaProducerApp app = new KafkaProducerApp();
        try {
            app.runProducer(10);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
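The printed metadata shows which partition each record landed in. With the default partitioner, a non-null key is serialized (here by LongSerializer, as 8 big-endian bytes), hashed, and reduced modulo the partition count, so the same key always goes to the same partition. The sketch below reproduces only that idea. It uses `Arrays.hashCode` as a stand-in for the murmur2 hash the real client uses, so its partition numbers will not match a real cluster; the class name and the 3-partition count are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Simplified sketch of key-based partition selection.
// ASSUMPTION: Arrays.hashCode stands in for Kafka's murmur2 hash, so the numbers differ from a real cluster.
class KeyPartitionSketch {

    // Serialize a long key as 8 big-endian bytes (the same layout LongSerializer produces).
    static byte[] serialize(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Hash the key bytes, clear the sign bit to get a non-negative value, then take it modulo the partition count.
    static int partitionFor(long key, int numPartitions) {
        int hash = Arrays.hashCode(serialize(key));
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // hypothetical partition count for sillycat-topic
        long base = System.currentTimeMillis();
        for (long key = base; key < base + 5; key++) {
            System.out.printf("key=%d -> partition %d%n", key, partitionFor(key, numPartitions));
        }
    }
}
```

The design point is determinism: as long as the partition count does not change, records with equal keys stay in one partition and therefore keep their relative order.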
Kafka Consumer to Pull the Data from the Topic
package com.sillycat.sparkjava.app;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaConsumerApp {
    private final static String TOPIC = "sillycat-topic";
    private final static String BOOTSTRAP_SERVERS = "fr-stage-api:9092,fr-stage-consumer:9092,fr-perf1:9092";
    private Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // consumers sharing this group ID split the topic's partitions and share committed offsets
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerApp");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // return at most 100 records from each poll()
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // when this group has no committed offset yet, start from the oldest record instead of the default "latest"
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
    public void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();
        // give up after this many consecutive empty polls (roughly 100 seconds with a 1-second poll timeout)
        final int giveUp = 100;
        int noRecordsCount = 0;
        try {
            while (true) {
                // wait up to 1000 ms for new records
                final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
                if (consumerRecords.count() == 0) {
                    noRecordsCount++;
                    if (noRecordsCount > giveUp)
                        break;
                    else
                        continue;
                }
                // records arrived, so reset the idle counter
                noRecordsCount = 0;
                consumerRecords.forEach(record -> {
                    System.out.printf("Consumer Record:(%d, %s, %d, %d)%n", record.key(), record.value(),
                            record.partition(), record.offset());
                });
                // commit the offsets returned by the last poll() without blocking
                consumer.commitAsync();
            }
        } finally {
            consumer.close();
        }
        System.out.println("DONE");
    }
    public static void main(String[] args) {
        KafkaConsumerApp app = new KafkaConsumerApp();
        try {
            app.runConsumer();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
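The consumer commits its position under the group ID KafkaConsumerApp, so a later run with the same group ID generally resumes where the previous run stopped, while a consumer with a different group ID keeps its own position. A toy single-partition model of that bookkeeping follows; `CommittedOffsetSketch`, its methods and the group name "another-group" are hypothetical, not the Kafka client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy single-partition model of committed offsets per consumer group (not the Kafka client API).
class CommittedOffsetSketch {
    private final List<String> partition = new ArrayList<>();    // the record at index i has offset i
    private final Map<String, Long> committed = new HashMap<>(); // group ID -> next offset to read

    void append(String value) {
        partition.add(value);
    }

    // Like one poll() followed by a commit: read up to maxRecords from the group's position,
    // then store the new position. A group with no committed offset starts at 0, mimicking auto.offset.reset=earliest.
    List<String> pollAndCommit(String groupId, int maxRecords) {
        long position = committed.getOrDefault(groupId, 0L);
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < partition.size()) {
            batch.add(partition.get((int) position));
            position++;
        }
        committed.put(groupId, position);
        return batch;
    }

    long committedOffset(String groupId) {
        return committed.getOrDefault(groupId, 0L);
    }

    public static void main(String[] args) {
        CommittedOffsetSketch topic = new CommittedOffsetSketch();
        for (int i = 0; i < 5; i++) {
            topic.append("Hello Sillycat " + i);
        }
        // First run of group "KafkaConsumerApp" stops after reading 3 records.
        System.out.println(topic.pollAndCommit("KafkaConsumerApp", 3));
        // A second run with the same group ID resumes at offset 3.
        System.out.println(topic.pollAndCommit("KafkaConsumerApp", 3));
        // A different group ID keeps its own position and reads from offset 0.
        System.out.println(topic.pollAndCommit("another-group", 3));
    }
}
```

As in Kafka, the committed value is the offset of the next record to read, not the last one processed.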
Kafka Architecture
Records - key (optional), value, timestamp
A topic has a log, which is the topic's storage on disk; the log is broken up into partitions, and each partition into segments.
Kafka appends records from producers to the end of a partition of the topic log. A topic log consists of many partitions that are stored in multiple files and can be spread across multiple Kafka cluster nodes.
Consumers read from Kafka topics starting where they left off, that is, from their committed offset.
http://cloudurable.com/blog/kafka-architecture/index.html
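A toy in-memory model of the architecture described above: a topic is a fixed number of partitions, each an append-only list; appending a record (optional key, value, timestamp) returns its offset within that partition, and a reader can continue from a saved offset. `TopicLogSketch` and its methods are hypothetical, and segments, files and replication are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a topic log (not Kafka code): no segments, files, or replication.
class TopicLogSketch {

    // A record: optional key (may be null), value, and timestamp.
    static final class TopicRecord {
        final String key;
        final String value;
        final long timestamp;

        TopicRecord(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "(" + key + ", " + value + ", " + timestamp + ")";
        }
    }

    private final List<List<TopicRecord>> partitions = new ArrayList<>();

    TopicLogSketch(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Append to the end of one partition; the returned offset is the record's index within that partition.
    long append(int partition, TopicRecord record) {
        List<TopicRecord> log = partitions.get(partition);
        log.add(record);
        return log.size() - 1;
    }

    // Read everything from fromOffset (assumed non-negative) to the current end of the partition.
    List<TopicRecord> readFrom(int partition, long fromOffset) {
        List<TopicRecord> log = partitions.get(partition);
        int start = (int) Math.min(fromOffset, log.size());
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        TopicLogSketch topic = new TopicLogSketch(2);
        long now = System.currentTimeMillis();
        topic.append(0, new TopicRecord("k1", "first", now));
        topic.append(1, new TopicRecord(null, "no key", now));
        long offset = topic.append(0, new TopicRecord("k1", "second", now));
        // Offsets are per partition, so "second" gets offset 1 in partition 0.
        System.out.println("offset of 'second' in partition 0: " + offset);
        // A consumer that already processed offset 0 of partition 0 continues from offset 1.
        System.out.println(topic.readFrom(0, 1));
    }
}
```

Because offsets are only meaningful within a partition, ordering is guaranteed per partition, not across the whole topic.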
Kafka Topic Architecture
http://cloudurable.com/blog/kafka-architecture-topics/index.html
Kafka Consumer Architecture
http://cloudurable.com/blog/kafka-architecture-consumers/index.html
Kafka Producer Architecture
http://cloudurable.com/blog/kafka-architecture-producers/index.html
Reposted from sillycat.iteye.com/blog/2407303