版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/xhpscdx/article/details/75258662
一 spring-kafka介绍
- spring-kafka是在kafka-clients的基础上的封装。
- 主要提供KafkaTemplate
二 pom 配置
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>4.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
- 只需要配置spring-kafka,他内部依赖了kafka-clients,可以根据spring的verison版本调整spring-kafka版号。
- 请不依赖kafka_2.10,spring-integration-kafka,貌似只有低版本需要。
三 producer配置
<context:property-placeholder location="classpath:kafka.properties"/>
<!-- 定义producer的参数 http://blog.csdn.net/molingduzun123/article/details/51785141 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}"/>
<entry key="group.id" value="0"/>
<entry key="retries" value="10"/>
<entry key="batch.size" value="16384"/>
<entry key="linger.ms" value="1"/>
<entry key="buffer.memory" value="33554432"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>
<!-- 创建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties"/>
</constructor-arg>
</bean>
<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
<property name="defaultTopic" value="page_visits5"/>
</bean>
1.hashMap参数集合,可以根据自己需求调整。
2.KafkaTemplate提供各种丰富方法。
public class SpringKafkaProducer extends WebBaseTest {
private static final Logger logger = LoggerFactory.getLogger(SnapshotController.class);
@Autowired
private KafkaTemplate<String, String> kafkaProductTemplate;
@Test
public void assignPartitionByKey() throws Exception {
try {
//Assign topicName to string variable
String topicName = "page_visits5";
for (int i = 0; i < 50; i++) {
for(int j=0;j<2;j++) {
ListenableFuture<SendResult<String, String>> future=kafkaProductTemplate.send(topicName,
Integer.toString(j),"ddddddddd洪10002" + i);
logger.info("Message sent successfully");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
四 consumer配置
- consumer提供四种MessageListener
- MessageListener –单条消息自动提交.
- BatchMessageListener –批量消息自动提交,就是ConsumerRecords
<!-- consumer -->
<!-- 定义consumer的参数 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}"/>
<entry key="group.id" value="group-new"/>
<entry key="enable.auto.commit" value="false"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="max.poll.records" value="10"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<!-- 创建consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- 实际执行消息消费的类 -->
<bean id="messageListernerConsumerService" class="com.calm.b.kafka.service.SpringKafkaConsumerListener"/>
<bean id="springKafkaConsumerAckListener" class="com.calm.b.kafka.service.SpringKafkaConsumerAckListener"/>
<bean id="ackmode" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
<property name="staticField"
value="org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE" />
</bean>
<!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="page_visits6"/>
<property name="messageListener" ref="springKafkaConsumerAckListener"/>
<property name="pollTimeout" value="1000"/>
<property name="ackMode" ref="ackmode" />
</bean>
<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
<property name="concurrency" value="3"/>
</bean>
- 手动确认的消费者
/**
* @author andrexu
* @create 2017-07-13
*/
public class SpringKafkaConsumerAckListener implements AcknowledgingMessageListener<String, String> {
private Logger log = LoggerFactory.getLogger(KafkaConsumeService.class);
@Override
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info(record.toString());
acknowledgment.acknowledge();
}
}
- 自动确认消费者
/**
* @author andrexu
* @create 2017-07-13
*/
public class SpringKafkaConsumerListener implements MessageListener<Integer, String> {
private Logger log = LoggerFactory.getLogger(KafkaConsumeService.class);
@Override
public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {
log.info("======"+consumerRecord);
}
}
五 @KafkaListener的用法
同上面consumer消费方式的一样,也四种不同消费方式,不同消费方式需要不同的配置支持。
@KafkaListener 需要@EnableKafka来开启配置,监听类SpringKafkaConsumerServce类也需要在配置bean里初始化。
/**
* @author andrexu
* @create 2017-07-13
*/
@Configuration
@EnableKafka
public class kafkaConfig {
private Logger log = LoggerFactory.getLogger(kafkaConfig.class);
@Autowired
private DefaultKafkaConsumerFactory consumerFactory;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory
() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(1000);
log.info("init kafkaListener annotation container Factory");
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =new HashedMap();
consumerProps.putAll(consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}
@Bean
public ConsumerFactory<String, String> manualConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(manualConsumerFactory());
ContainerProperties props = factory.getContainerProperties();
props.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
props.setIdleEventInterval(100L);
// factory.setRecordFilterStrategy(manualFilter());
props.setPollTimeout(1000L);
factory.setAckDiscarded(true);
// factory.setRetryTemplate(new RetryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Void>() {
@Override
public Void recover(RetryContext context) throws Exception {
return null;
}
});
return factory;
}
/**
* 先init他们
* @return
*/
@Bean
public SpringKafkaConsumerServce kafkaListeners() {
return new SpringKafkaConsumerServce();
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
/**
* @author andrexu
* @create 2017-07-13
*/
public class SpringKafkaConsumerServce {
private Logger log = LoggerFactory.getLogger(SpringKafkaConsumerServce.class);
@KafkaListener(id = "bar5", topicPartitions =
{ @TopicPartition(topic = "page_visits5", partitions = {"0"}),
},group="group-new" )
public void listen(ConsumerRecord<String, String> record) {
log.info("only partion zero :"+ record);
}
@KafkaListener(id = "bar6", topicPartitions =
{ @TopicPartition(topic = "page_visits5", partitions = {"1"}),
},group="group-new" )
public void listenByTwo(ConsumerRecord<String, String> record) {
log.info("only two partion zero :"+ record);
}
@KafkaListener(id = "baz",topics ={"page_visits8"} ,group="group-new",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listenByOne(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info("only partion one :"+ record);
ack.acknowledge();
}
// @KafkaListener(id = "bar3",topics ={"page_visits8"} ,group="group-new" )
// public void listenByOne(ConsumerRecord<String, String> record) {
//
// log.info("only partion one :"+ record);
//
// }
}