最近做了点Hbase抽取权重的业务,也是第一次做吧,所以记录下来,仅供参考。代码已经注释,如果有问题,可以留言提问。
package com.#data.uaa;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.#data.face.ScanEngine;
import com.#data.face.model.*;
import com.#data.face.model.passport.*;
import com.#data.face.mr.*;
import knowledge.face.RpcCommon.User;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;
import com.google.protobuf.Message;
import com.#data.face.DataCenter;
import com.#data.knowledge.face.store.FaceType;
public class DeviceScan extends FaceMapper<Text, Text> {
private static final Logger LOG = Logger.getLogger(DeviceScan.class);
public static void main(String[] args) throws Exception {
User user = User.newBuilder() // 构建User Builder
.setName("") // 设置扫描用户名
.setPassword("") // 设置扫描密码
.build();
Scan scan = FaceScan.newBuilder() // 构建FaceScan Builder
.maxResultSize(1024L * 1024L) // 设置扫描结果最大返回字节,防止极少数很大的单条数据影响扫描效率
.family("identification").family("device") // 添加要扫描的一级字段,调用多
// 次即可选择多个一级字段,只有为All_COLUMNS权限时可以使用
.family("profile")
.column("preference", "tag")
.build();
Configuration conf = FaceConfiguration.newBuilder() // 构建Configuration的Builder
.create(DataCenter.JYLT) // 设置要扫描的集群T
.type(FaceType.DEVICE) // 目前设置扫描的数据表DEVICE(设备表)或者PASSPORT(用户表)
.output("output dir") // 设置扫描结果输出路径
.user(user) // 目前扫描的帐号
.scan(scan) // 设置scan
.engine(ScanEngine.MAPREDUCE) // 使用 mapreduce 必须设置为 ScanEngine.MAPREDUCE
.maps(1000) // 设置map数目(1--1000), 默认值为1000, 最大值为1000
.build();
try {
new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf);
job.setNumReduceTasks(0);
// 设置扫描的map实现类
job.setMapperClass(DeviceScan.class);
job.setJarByClass(DeviceScan.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FaceMapReduce.initEntityMapperJob(job);
// TableMapReduceUtil.addDependencyJars(job);
// TableMapReduceUtil.initCredentials(job);
job.waitForCompletion(true);
} catch (Exception e) {
LOG.error("running mr job error", e);
}
}
// 用户扫描实现的map函数
@Override
protected void map(FaceIdWritable key, Message value, Context context) throws IOException, InterruptedException {
long entity_id = key.getId(); // 消息对应的entity_id
//if (entity_id % 1000 != 344) return; //需求过滤
UserModel userModel = (UserModel) value; // 将message强转为扫描的类型
String device_id=""; //过滤空字符,防止报错
if (userModel.getIdentification().getKeyId().getDeviceIdList().isEmpty()){
device_id="";}
else{
device_id = userModel.getIdentification().getKeyId().getDeviceId(0);}
List<WeightedLabel> tagObj = userModel.getPreference().getFavoriteTag().getItemList();
List<String> tagList = new ArrayList<>();
for (WeightedLabel tag : tagObj)
tagList.add(tag.getLabel() + ":" + tag.getWeight());
String tags = StringUtils.join(tagList, ",");
String result = String.valueOf(tags);
context.write( new Text(device_id), new Text(result));//写进文件,然后利用hive 导入表格,在利用lateral view explode split 进行解析
}
}