场景描述:
netflow是cisco路由器中产生的flow元数据,默认情况下会每5分钟从cache中对外吐一次。
netflow相关信息见http://www.cisco.com/en/US/prod/collateral/iosswrel/ps6537/ps6555/ps6601/prod_case_study0900aecd80
311fc2.pdf
公司有服务器对netflow数据进行收集,收集到的文件为二进制.data文件。现在需要对元数据抽取出源IP、目的IP、源端口、目的端口等信息,转换为txt文件,供后续大数据分析使用。目前有100多个路由器产生flow数据,大概每5分钟有至少100个data文件,共500M左右(每秒7万条flow产生)。
解决方案一:
使用java多线程调用nfdump转换为txt文件。
思路:起一个线程扫描产生data文件的路径;扫描到的文件名存在资源池中,再起多线程(暂定3个)从资源池中取出数据进行处理,处理完后生成的txt放入另一个路径中,删除原路径下的data文件。
具体转换逻辑:
1. java process 调用shell脚本
2. shell脚本中执行nfdump命令进行转换及转换后删除原data文件。这一步需要在服务器上安装nfdump(yum install nfdump)
多线程启动部分代码忽略,贴一下转换过程代码。
java代码:
import java.util.Date;
/**
* 调用shell命令,转换出txt文本
* @author wyh
*
*/
public class ExchangeTxt {
public void execute(String fileName,String path,String targetPath) {
String filePath = path+File.separator+fileName;
String cmdStr = null;
if(filePath != null && filePath.length()>0) {
cmdStr = "/home/hadoop/testfiles/test1.sh "+filePath+" "+targetPath+File.separator+fileName+".txt";
try {
System.out.println("即将执行:"+cmdStr);
Process process = Runtime.getRuntime().exec(cmdStr);
int exitValue = process.waitFor();
System.out.println("执行完毕,返回值"+exitValue);
if (0 != exitValue) {
System.err.println("call shell failed. error code is :" + exitValue);
}
} catch (Exception e) {
System.err.println("call shell failed. ");
e.printStackTrace();
}
}
}
}
相应的shell脚本 test1.sh
#!/bin/bash
if [[ $# -lt 1 ]] ; then echo no params ; exit ; fi
nfdump -r $1 -o "fmt:%sap|%dap|%fwd|%flg|%pr|%stos|%dtos|%pkt|%byt|%in|%out|%sas|%das|%smk|%dmk|%dir|%nhb" >$2
rm -rf $1
脚本分析:原data文件的绝对路径通过参数1传递进来,要保存到的目标文件绝对路径通过参数2传递进来,nfdump命令 -r指明原flow的data文件, -o可定义要转换的列(具体可见nfdump相关网站),>后为要转换到的文件。从java代码可看出,我这里直接在原文件名后缀.txt,保存路径变更了一下(/home/hadoop/00 --> /home/hadoop/testfiles)
转换得到的文件如图:
注意,在执行shell脚本之前,需要将test1.sh的权限放开(chmod 777 test1.sh),否则执行报错:permission denied
此外,在测试过程发现,如果直接使用process执行nfdump命令,转换失败,报错,process.waitfor()返回值为255,查阅了一下相关返回值定义,并没有255。这里没搞清楚,直接换成了执行sh脚本。有知道的请指教一下255什么意思。