1、搭建材料准备
- jdk 1.8
- flume 1.8
- 用于连接 Oracle 数据库:
  - ojdbc6.jar
- 用于解析 JSON 字符串:
  - commons-beanutils-1.8.3.jar
  - commons-collections-3.2.1.jar
  - commons-lang-2.6.jar
  - commons-logging-1.1.3.jar
  - ezmorph-1.0.6.jar
  - json-lib-2.4-jdk15.jar
2、安装好 JDK 1.8,然后解压 Flume 1.8,将解压后的目录重命名为 flume 并放到 /app/ 下(即 /app/flume)。
3、编辑配置文件。
cd /app/flume/conf
vi flume_config.properties
# Agent "a1" is made of one source (r2), one sink (k2) and one channel (c2).
a1.sources = r2
a1.sinks = k2
a1.channels = c2
################# Runtime error logs of each server ###########################
# Source: the log to read. An exec source runs the command below and turns its output into events.
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /app/flume/test/node_err_info
# Describe the sink: fully-qualified name of the custom sink class.
a1.sinks.k2.type = com.hex.obase.service.OracleSinkErr
# Database connection settings read by the sink (the values here are placeholders).
# The sink also reads an optional "batchSize" key and falls back to 10 when it is not set.
a1.sinks.k2.hostname = *****
a1.sinks.k2.port = ****
a1.sinks.k2.databaseName = ***
a1.sinks.k2.user = *****
a1.sinks.k2.password = *****
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
4、编写自定义 Sink,用于解析日志内容并将其写入 Oracle(类的全限定名必须与上面配置中的 a1.sinks.k2.type 一致)。
package com.hex.obase.service;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import net.sf.json.JSONObject;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.*;
/**
* Created by tzb on 2018/8/24.
*/
public class OracleSinkErr extends AbstractSink implements Configurable {
private Logger logger = LoggerFactory.getLogger(OracleSinkErr.class);
private String hostname;
private String port;
private String databaseName;
private String user;
private String password;
private PreparedStatement preparedStatement;
private Connection conn;
private int batchSize;
public OracleSinkErr() {
logger.info("OracleSinkErr start ...");
}
public void configure(Context context) {
hostname = context.getString("hostname");
Preconditions.checkNotNull(hostname, "hostname must be set!");
port = context.getString("port");
Preconditions.checkNotNull(port, "port must be set!");
databaseName = context.getString("databaseName");
Preconditions.checkNotNull(databaseName, "databaseName must be set!");
user = context.getString("user");
Preconditions.checkNotNull(user, "user must be set!");
password = context.getString("password");
Preconditions.checkNotNull(password, "password must be set!");
batchSize = context.getInteger("batchSize", 10);
Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!");
}
public void start() {
super.start();
String url = "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + databaseName;
try {
Class.forName("oracle.jdbc.OracleDriver");
conn = DriverManager.getConnection(url, user, password);
conn.setAutoCommit(false);
//创建一个Statement对象
preparedStatement = conn.prepareStatement("insert into OBASE_ERR_LOG (PK_ID, MSG) values (?,?)");
} catch (ClassNotFoundException e1) {
e1.printStackTrace();
} catch (SQLException e2) {
e2.printStackTrace();
}
}
public void stop() {
super.stop();
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
public synchronized Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event;
String content;
int index = 0;
transaction.begin();
try {
preparedStatement.clearBatch();
for (int i = 0; i < batchSize; i++) {
event = channel.take();
if (event != null) {
content = new String(event.getBody());
if (!"".equals(content)) {
logger.info(content);
//存储 event 的 content
Map<String, String> map = JSONObject.fromObject(content);
preparedStatement.setString(1, UUID.randomUUID().toString().substring(0, 32));
preparedStatement.setString(5, map.get("msg"));
preparedStatement.addBatch();
index++;
}
} else {
result = Status.BACKOFF;
break;
}
}
if (index > 0) {
preparedStatement.executeBatch();
conn.commit();
logger.info("sql执行");
}
transaction.commit();
} catch (Exception e) {
try {
transaction.rollback();
} catch (Exception e2) {
logger.error("Exception in rollback. Rollback might not have been successful.", e2);
}
logger.error("Failed to commit transaction.Transaction rolled back.", e);
Throwables.propagate(e);
} finally {
transaction.close();
}
return result;
}
}
将上面的 Java 文件编译成 class 文件,并打包成 jar 包。
把第 1 步准备的 jar 包和刚生成的 jar 包一起放入 /app/flume/lib 目录下。
5、执行flume。
cd /app/flume
bin/flume-ng agent -c conf -f conf/flume_config.properties -n a1 -Dflume.root.logger=INFO,console