pom.xml关联包:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-flume-ng</artifactId>
</dependency>
</dependencies>
log4j2.xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<!--设置log4j2的自身log级别为warn-->
<!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
<!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,
当设置成trace时,你会看到log4j2内部各种详细输出-->
<!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数-->
<configuration status="warn" monitorInterval="30">
<!--先定义所有的appender-->
<appenders>
<!--这个输出控制台的配置-->
<console name="Console" target="SYSTEM_OUT">
<!--输出日志的格式-->
<PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</console>
<Flume name="eventLogger" compress="true" type="avro">
<Agent host="master" port="4444"/>
<Agent host="note1" port="4444"/>
<RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
</Flume>
</appenders>
<!--然后定义logger,只有定义了logger并引入的appender,appender才会生效-->
<loggers>
<root level="all">
<appender-ref ref="Console"/>
<AppenderRef ref="eventLogger"/>
</root>
</loggers>
<!-- <Loggers>
<Logger name="org.apache.kafka" level="ON"/>
<Logger name="org.springframework.kafka.listener.KafkaMessageListenerContainer" level="ON"/>
<Root level="INFO">
<AppenderRef ref="Console"/>
<AppenderRef ref="infoLog"/>
<AppenderRef ref="errorLog"/>
<AppenderRef ref="debugLog"/>
</Root>
</Loggers> -->
</configuration>
flume新建一个conf文件flume-kafka.conf配置如下:
agent1.sources = s1
agent1.sinks = k2
agent1.channels = c2
# Describe/configure the source
agent1.sources.s1.type = avro
agent1.sources.s1.bind = master
agent1.sources.s1.port = 4444
agent1.sinks.k2.type = logger
agent1.sinks.k2.maxBytesToLog = 4096
agent1.channels.c2.type=memory
agent1.channels.c2.capacity=10000
agent1.channels.c2.transactionCapacity=10000
# Bind the source and sink to the channel
agent1.sources.s1.channels = c2
agent1.sinks.k2.channel = c2
#连接kafka
agent1.sinks.k2.channel=c2
agent1.sinks.k2.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k2.kafka.bootstrap.servers=master:9092,note1:9092,note2:9092
agent1.sinks.k2.kafka.topic=test
agent1.sinks.k2.kafka.batchSize=20
agent1.sinks.k2.kafka.producer.requiredAcks=1
#设置编码
agent1.sinks.k2.custom.encoding=UTF-8
启动命令:
bin/flume-ng agent -c conf -f conf/flume-kafka.conf -n agent1 -Dflume.root.logger=INFO,console