1、流程
启动一个定时任务:
--定时监测日志源目录
--获取需要采集的文件
--移动这些文件到一个待上传临时目录中
--遍历待上传目录中各个文件,逐一传输到HDFS的目标路径,同时将传输完成的文件移动到备份目录中去
启动一个定时任务
--探测备份目录中的备份数据,检查是否已经超出最长备份时长,如果超出,则删除。
2、规划各种路径
日志路径:d:/logs/accesslog/
待上传路径:d:/logs/toupload/
备份路径:d:/logs/backup/日期/
HDFS存储路径 /logs/日期
hdfs中的文件前缀:access_log_
hdsf中的文件后缀:.log
import java.util.Timer; public class DataCollectMain { public static void main(String[] args) { Timer timer = new Timer(); //定时任务 timer.schedule(new CollectTask(), 0, 60*60*1000L); timer.schedule(new BackupCleanTask(), 0, 60*60*1000L); } }
package cn.edu360.hdfs.datacollect; import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.TimerTask; import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; public class CollectTask extends TimerTask { @Override public void run() { /** * ——定时探测日志源目录 ——获取需要采集的文件 ——移动这些文件到一个待上传临时目录 * ——遍历待上传目录中各文件,逐一传输到HDFS的目标路径,同时将传输完成的文件移动到备份目录 * */ try { // 获取配置参数 Properties props = PropertyHolderLazy.getProps(); // 构造一个log4j日志对象 Logger logger = Logger.getLogger("logRollingFile"); // 获取本次采集时的日期 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); String day = sdf.format(new Date()); File srcDir = new File(props.getProperty(Constants.LOG_SOURCE_DIR)); // 列出日志源目录中需要采集的文件 File[] listFiles = srcDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { if (name.startsWith(props.getProperty(Constants.LOG_LEGAL_PREFIX))) { return true; } return false; } }); // 记录日志 logger.info("探测到如下文件需要采集:" + Arrays.toString(listFiles)); // 将要采集的文件移动到待上传临时目录 File toUploadDir = new File(props.getProperty(Constants.LOG_TOUPLOAD_DIR)); for (File file : listFiles) { FileUtils.moveFileToDirectory(file, toUploadDir, true); } // 记录日志 logger.info("上述文件移动到了待上传目录" + toUploadDir.getAbsolutePath()); // 构造一个HDFS的客户端对象 FileSystem fs = FileSystem.get(new URI(props.getProperty(Constants.HDFS_URI)), new Configuration(), "root"); File[] toUploadFiles = toUploadDir.listFiles(); // 检查HDFS中的日期目录是否存在,如果不存在,则创建 Path hdfsDestPath = new Path(props.getProperty(Constants.HDFS_DEST_BASE_DIR) + day); if (!fs.exists(hdfsDestPath)) { fs.mkdirs(hdfsDestPath); } // 检查本地的备份目录是否存在,如果不存在,则创建 File backupDir = new File(props.getProperty(Constants.LOG_BACKUP_BASE_DIR) + day + "/"); if (!backupDir.exists()) { backupDir.mkdirs(); } for (File file : toUploadFiles) { // 传输文件到HDFS并改名access_log_ Path destPath = new Path(hdfsDestPath + "/" + UUID.randomUUID() + props.getProperty(Constants.HDFS_FILE_SUFFIX)); fs.copyFromLocalFile(new Path(file.getAbsolutePath()), destPath); // 记录日志 logger.info("文件传输到HDFS完成:" + file.getAbsolutePath() + "-->" + destPath); // 将传输完成的文件移动到备份目录 FileUtils.moveFileToDirectory(file, backupDir, true); // 记录日志 logger.info("文件备份完成:" + file.getAbsolutePath() + "-->" + backupDir); } } catch (Exception e) { e.printStackTrace(); } } }
package cn.edu360.hdfs.datacollect; import java.io.File; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimerTask; import org.apache.commons.io.FileUtils; public class BackupCleanTask extends TimerTask { @Override public void run() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); long now = new Date().getTime(); try { // 探测本地备份目录 File backupBaseDir = new File("d:/logs/backup/"); File[] dayBackDir = backupBaseDir.listFiles(); // 判断备份日期子目录是否已超24小时 for (File dir : dayBackDir) { long time = sdf.parse(dir.getName()).getTime(); if(now-time>24*60*60*1000L){ FileUtils.deleteDirectory(dir); } } } catch (Exception e) { e.printStackTrace(); } } }
package cn.edu360.hdfs.datacollect; import java.util.Properties; /** * 单例模式:懒汉式——考虑了线程安全 * @author ThinkPad * */ public class PropertyHolderLazy { private static Properties prop = null; public static Properties getProps() throws Exception { if (prop == null) { synchronized (PropertyHolderLazy.class) { if (prop == null) { prop = new Properties(); prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties")); } } } return prop; } }
package cn.edu360.hdfs.datacollect; public class Constants { /** * 日志源目录参数key */ public static final String LOG_SOURCE_DIR = "LOG_SOURCE_DIR"; /** * 日志待上传目录参数key */ public static final String LOG_TOUPLOAD_DIR = "LOG_TOUPLOAD_DIR"; public static final String LOG_BACKUP_BASE_DIR = "LOG_BACKUP_BASE_DIR"; public static final String LOG_BACKUP_TIMEOUT = "LOG_BACKUP_TIMEOUT"; public static final String LOG_LEGAL_PREFIX = "LOG_LEGAL_PREFIX"; public static final String HDFS_URI = "HDFS_URI"; public static final String HDFS_DEST_BASE_DIR = "HDFS_DEST_BASE_DIR"; public static final String HDFS_FILE_PREFIX = "HDFS_FILE_PREFIX"; public static final String HDFS_FILE_SUFFIX = "HDFS_FILE_SUFFIX"; }
LOG_SOURCE_DIR=d:/logs/accesslog/ LOG_TOUPLOAD_DIR=d:/logs/toupload/ LOG_BACKUP_BASE_DIR=d:/logs/backup/ LOG_BACKUP_TIMEOUT=24 LOG_LEGAL_PREFIX=access.log. HDFS_URI=hdfs://hdp-01:9000/ HDFS_DEST_BASE_DIR=/logs/ HDFS_FILE_PREFIX=access_log_ HDFS_FILE_SUFFIX=.log
<configuration> <property> <name>dfs.replication</name> <value>4</value> </property> <property> <name>dfs.blocksize</name> <value>16m</value> </property> </configuration>
### \u8BBE\u7F6E### #log4j.rootLogger=debug,stdout,genlog log4j.rootLogger=INFO,logRollingFile,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n ### log4j.logger.logRollingFile= ERROR,test1 log4j.appender.test1 = org.apache.log4j.RollingFileAppender log4j.appender.test1.layout = org.apache.log4j.PatternLayout log4j.appender.test1.layout.ConversionPattern =%d{yyyy-MMM-dd HH:mm:ss}-[TS] %p %t %c - %m%n log4j.appender.test1.Threshold = DEBUG log4j.appender.test1.ImmediateFlush = TRUE log4j.appender.test1.Append = TRUE log4j.appender.test1.File = d:/logs/collect/collect.log log4j.appender.test1.MaxFileSize = 102400KB log4j.appender.test1.MaxBackupIndex = 200 ### log4j.appender.test1.Encoding = UTF-8