Spring Boot + Flume + Log4j2 + Kafka

pom.xml关联包:

<!--
  Dependencies for a Spring Boot web application that logs through Log4j2
  and ships log events to Flume.
  NOTE(review): no <version> elements are declared here; presumably they are
  supplied by a parent POM or a dependencyManagement section. Verify.
-->
<dependencies>
    <!-- Web starter with the default logging starter excluded, so that only the
         Log4j2 starter declared further down provides the logging backend. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Spring integration for Apache Kafka. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Test-scoped dependencies. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Log4j2 as the logging implementation. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>

    <!-- Provides the Flume appender referenced from log4j2.xml. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-flume-ng</artifactId>
    </dependency>
</dependencies>

log4j2.xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Log4j2 configuration: every event accepted by the root logger is written to
  the console and forwarded to Flume.

  Level ordering: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL

  status          Level of Log4j2's own internal status output ("warn" here).
                  Optional; with "trace" Log4j2 prints detailed internal output.
  monitorInterval Interval in seconds at which Log4j2 checks this file for
                  changes and reconfigures itself.
-->
<Configuration status="warn" monitorInterval="30">

    <!-- Appenders are declared first. -->
    <Appenders>
        <!-- Console output. -->
        <Console name="Console" target="SYSTEM_OUT">
            <!-- Output format of each log line. -->
            <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
        </Console>

        <!-- Flume appender sending events over Avro to the listed agents.
             NOTE(review): compress="true" should gzip the event body according
             to the Log4j2 Flume appender documentation; confirm that whatever
             reads the Kafka topic can handle compressed bodies. -->
        <Flume name="eventLogger" compress="true" type="avro">
            <Agent host="master" port="4444"/>
            <Agent host="note1" port="4444"/>
            <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
        </Flume>
    </Appenders>

    <!-- Loggers come next: an appender only takes effect once a logger references it. -->
    <Loggers>
        <Root level="all">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="eventLogger"/>
        </Root>
    </Loggers>

    <!--
      Disabled alternative logger setup, kept from the original.
      Before enabling it, note that the appenders infoLog, errorLog and debugLog
      are not defined in this file, and that "ON" does not appear in the level
      ordering listed at the top of this file.

    <Loggers>
        <Logger name="org.apache.kafka" level="ON"/>
        <Logger name="org.springframework.kafka.listener.KafkaMessageListenerContainer" level="ON"/>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="infoLog"/>
            <AppenderRef ref="errorLog"/>
            <AppenderRef ref="debugLog"/>
        </Root>
    </Loggers>
    -->
</Configuration>


在Flume的conf目录下新建配置文件flume-kafka.conf,内容如下:

# Flume agent "agent1": Avro source s1 feeds memory channel c2, which is
# drained by Kafka sink k2.
agent1.sources = s1
agent1.sinks = k2
agent1.channels = c2

# Source: Avro listener on master:4444.
agent1.sources.s1.type = avro
agent1.sources.s1.bind = master
agent1.sources.s1.port = 4444
agent1.sources.s1.channels = c2

# Channel: in-memory buffer between the source and the sink.
agent1.channels.c2.type = memory
agent1.channels.c2.capacity = 10000
agent1.channels.c2.transactionCapacity = 10000

# Sink: publish events to Kafka.
# Each key is defined exactly once. Previously k2.type and k2.channel were set
# twice; the later KafkaSink value won, leaving maxBytesToLog as a stray
# property on the Kafka sink.
agent1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k2.channel = c2
agent1.sinks.k2.kafka.bootstrap.servers = master:9092,note1:9092,note2:9092
agent1.sinks.k2.kafka.topic = test
# Number of events sent per batch. The KafkaSink property is "flumeBatchSize";
# the former "kafka.batchSize" key is not a sink property and was ignored.
agent1.sinks.k2.flumeBatchSize = 20
# Keys under "kafka.producer." are handed to the Kafka producer, whose
# acknowledgement setting is named "acks" (not "requiredAcks").
agent1.sinks.k2.kafka.producer.acks = 1
# Encoding setting, kept from the original.
# NOTE(review): "custom.encoding" does not appear to be a documented KafkaSink
# property; confirm whether it has any effect.
agent1.sinks.k2.custom.encoding = UTF-8

# Alternative for debugging: print events to the Flume log instead of sending
# them to Kafka. To use it, comment out the Kafka sink lines above and enable
# these two lines.
#agent1.sinks.k2.type = logger
#agent1.sinks.k2.maxBytesToLog = 4096

启动命令:

# Start agent "agent1" with conf/flume-kafka.conf, logging at INFO to the console.
bin/flume-ng agent \
    --conf conf \
    --conf-file conf/flume-kafka.conf \
    --name agent1 \
    -Dflume.root.logger=INFO,console

猜你喜欢

转载自blog.csdn.net/JHC_binge/article/details/87721213