转载地址:https://blog.csdn.net/weixin_40209426/article/details/81220313
在项目过程中,有这样一个需求,需要用storm消费实时流日志,存储hdfs,如果伙伴们要是抱着自己开发存储的心态去做,可能会耗费一定的开发周期,比较不划算,因为官方做了一个storm-hdfs的开发包供storm使用者轻松开发storm程序存储hdfs,但是这个包中是按固定路径去存储的,在实际生产业务中,我们往往会有利用tuple中的字段作为存储路径的需求,这样的话,只需要重写一个源码中的一些类即可满足以上要求,下面我贴出更改的注意事项。
先说明一下,这里pom依赖是:
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>1.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
我这里只贴出storm之外的,storm相关这里不多说。
然后我粘贴一下topology构建的主方法,然后围绕主方法做一些讲解和说明:
import com.xes.bolt.NewHdfsBolt;
import com.xes.bolt.SumBolt;
import com.xes.business.LogFileNameFormat;
import com.xes.business.PathPartitioner;
import com.xes.spout.CommonSpout;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.Partitioner;
import org.apache.storm.topology.TopologyBuilder;
public class AppTest
{
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
Partitioner partitioner = new PathPartitioner();
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("\\n");
SyncPolicy syncPolicy = new CountSyncPolicy(1);
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);
FileNameFormat fileNameFormat = new LogFileNameFormat().withPath("/wd/storm").withPrefix("test_").withExtension(".log");
NewHdfsBolt hdfsBolt = new NewHdfsBolt().withFsUrl("hdfs://10.99.2.194:9000")
.withRecordFormat(format)
.withFileNameFormat(fileNameFormat)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy)
.withPartitioner(partitioner);
builder.setSpout("data_spout",new CommonSpout());
builder.setBolt("sum_bolt",new SumBolt()).shuffleGrouping("data_spout");
builder.setBolt("hdfs_bolt",hdfsBolt).shuffleGrouping("sum_bolt");
try {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("topo_wd", new Config(), builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
}
其中data_spout是我自定的一个数据源,每2秒往程序中喷发测试数据。
sum_bolt做数据格式清洗和转换,这里不多说。
原来的hdfs_bolt是封装好的一层bolt,它继承了BaseRichBolt类
源码地址:https://github.com/apache/storm/tree/master/external/storm-hdfs
然后重写的过程如下:
首先是写个Partitioner的实现类,原框架里只有一个实现类,叫NullPartition,返回是空。
我们自定义PathPartitioner:
public class PathPartitioner implements Partitioner{
@Override
public String getPartitionPath(final Tuple tuple) {
String steam = tuple.getString(0);
org.json.simple.JSONObject json = null;
try
{
json = (JSONObject) JSONValue.parse(steam);
}
catch(Exception e)
{
e.printStackTrace();
}
String hostname = (String) json.get("hostname");
String filename = (String) json.get("filename");
return "/"+hostname+"/"+filename;
}
}
这个类的作用是解析tuple中的关键信息作为路径。
然后是重写AbstractHdfsBolt这个类,新名字叫NewAbstractHdfsBolt.java。
58行更改为:
this.partitioner = new PathPartitioner();
然后按照主方法的注册方式,把存储hdfs的地址、路径,头,尾椎名都带上即可。
解析数据的方式写在了PathPartition里,如果格式有变化,只需要更改这个即可。