Kafka配置

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" 
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
         http://www.springframework.org/schema/beans/spring-beans.xsd 
         http://www.springframework.org/schema/context 
         http://www.springframework.org/schema/context/spring-context.xsd"> 
 
    <!-- 定义producer的参数 --> 
    <bean id="producerProperties" class="java.util.HashMap"> 
        <constructor-arg> 
            <map> 
                <entry key="bootstrap.servers" value="192.168.0.129:2222" /> 
                <entry key="group.id" value="0" /> 
                <entry key="retries" value="1" /> 
                <entry key="batch.size" value="16384" /> 
                <entry key="linger.ms" value="1" /> 
                <entry key="buffer.memory" value="33554432" /> 
                <entry key="key.serializer" 
                       value="org.apache.kafka.common.serialization.StringSerializer" /> 
                <entry key="value.serializer" 
                       value="org.apache.kafka.common.serialization.StringSerializer" /> 
            </map> 
 
        </constructor-arg> 
    </bean> 
 
    <!-- 创建kafkatemplate需要使用的producerfactory bean --> 
    <bean id="producerFactory" 
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
        <constructor-arg> 
            <ref bean="producerProperties" /> 
        </constructor-arg> 
    </bean> 
 
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 --> 
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> 
        <constructor-arg ref="producerFactory" /> 
        <constructor-arg name="autoFlush" value="true" /> 
        <property name="defaultTopic" value="tracker" /> 
    </bean> 
   
 
    <!-- 定义consumer的参数 --> 
    <bean id="consumerProperties" class="java.util.HashMap"> 
        <constructor-arg > 
            <map> 
                <entry key="bootstrap.servers" value="192.168.0.129:9000"/> 
                <entry key="group.id" value="0"/> 
                <entry key="enable.auto.commit" value="true"/> 
                <entry key="auto.commit.interval.ms" value="1000"/> 
                <entry key="session.timeout.ms" value="30000"/> 
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> 
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> 
            </map> 
        </constructor-arg>
    </bean> 
 
    <!-- 创建消费者 --> 
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> 
        <constructor-arg> 
            <ref bean="consumerProperties"/> 
        </constructor-arg> 
    </bean> 
 
    <!-- 实际执行消息消费的类 --> 
    <bean id="messageListerner" class="com.xxx.carhere.filter.KafkaConsumerServer"/> 
 
    <!-- 消费者容器配置信息 --> 
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> 
        <constructor-arg value="tracker"/> 
        <property name="messageListener" ref="messageListerner"/> 
    </bean> 
 
    <!-- 创建messageListenerContainer bean,使用的时候,只需要注入这个bean --> 
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" 
          init-method="doStart"> 
        <constructor-arg ref="consumerFactory"/> 
        <constructor-arg ref="containerProperties"/> 
    </bean> 
 
 
</beans> 

猜你喜欢

转载自blog.csdn.net/tomcatandoracle/article/details/80271072