pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.13.1-HBase-1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>4.13.1-HBase-1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.9.9</version>
</dependency>
Example usage:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.joda.time.DateTime;

import scala.Tuple2;

public class Test {
    /**
     * Phoenix JDBC connection config
     */
    private static final String DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    private static final String DB_PHOENIX_URL = "jdbc:phoenix:hadoop101,hadoop102,hadoop103";
    private static final String DB_PHOENIX_USER = "";
    private static final String DB_PHOENIX_PASS = "";
    private static final String DB_PHOENIX_FETCHSIZE = "10000";

    /**
     * Query used to load the source data
     */
    private static final String SQL_QUERY = "(SELECT date,member_id FROM events WHERE time>='%s' AND time<'%s' AND event='login') events";

    /**
     * Application name
     */
    private static final String APP_NAME = "Test";

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(APP_NAME)
                .setMaster("local[1]")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(
                        new Class[]{}
                );
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        DateTime start = new DateTime(args[0]), end = new DateTime(args[1]);
        String sql = String.format(SQL_QUERY, start.toString("yyyy-MM-dd"), end.toString("yyyy-MM-dd"));
        // Deduplicate the ordinary way and store the member ids as a concatenated string,
        // to compare later against the binary-based test.
        sinkDataByVarchar(sparkSession, sql);
        sparkSession.stop();
    }

    /**
     * Deduplicate the plain way and store the result as a string column.
     *
     * @param sparkSession active SparkSession
     * @param query        query (or subquery alias) used to load the source data over JDBC
     */
    private static void sinkDataByVarchar(SparkSession sparkSession, String query) {
        try {
            // JDBC connection properties
            Properties connProp = new Properties();
            connProp.put("driver", DB_PHOENIX_DRIVER);
            connProp.put("user", DB_PHOENIX_USER);
            connProp.put("password", DB_PHOENIX_PASS);
            connProp.put("fetchsize", DB_PHOENIX_FETCHSIZE);

            JavaRDD<Row> rows = sparkSession
                    .read()
                    .jdbc(DB_PHOENIX_URL, query, connProp)
                    .filter("member_id != -1")
                    .javaRDD()
                    // (date, member_id) pairs
                    .mapToPair(r -> new Tuple2<>(
                            r.getString(0)
                            , r.getLong(1)
                    ))
                    .distinct()
                    .groupByKey()
                    // concatenate the distinct member ids of each date into one string
                    .map(r -> {
                        StringBuffer sb = new StringBuffer();
                        r._2.forEach(v -> {
                            sb.append(v);
                        });
                        return RowFactory.create(r._1, sb.toString());
                    });

            // schema of the result DataFrame
            List<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("date", DataTypes.StringType, false));
            fields.add(DataTypes.createStructField("dist_mem", DataTypes.StringType, true));
            StructType schema = DataTypes.createStructType(fields);

            // write back to Phoenix through the phoenix-spark connector
            String insertTable = "test_string";
            sparkSession
                    .createDataFrame(rows, schema)
                    .write()
                    .format("org.apache.phoenix.spark")
                    .mode(SaveMode.Overwrite)
                    .option("table", insertTable)
                    .option("zkUrl", DB_PHOENIX_URL)
                    .save();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
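For a quick sanity check after the job finishes, the written table can be read back through the same Phoenix JDBC path used for loading. The snippet below is only a minimal sketch that reuses the connection settings from Test; the class name VerifyTestString and the row limit are illustrative and not part of the original code.

import java.util.Properties;

import org.apache.spark.sql.SparkSession;

public class VerifyTestString {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("VerifyTestString")
                .master("local[1]")
                .getOrCreate();
        // Same connection settings as in Test; adjust the ZooKeeper quorum to your cluster.
        Properties connProp = new Properties();
        connProp.put("driver", "org.apache.phoenix.jdbc.PhoenixDriver");
        spark.read()
                .jdbc("jdbc:phoenix:hadoop101,hadoop102,hadoop103", "test_string", connProp)
                .show(10, false);
        spark.stop();
    }
}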
Invocation:
public class App {
    // Fully qualified JUnit annotation to avoid a name clash with the driver class, which is also named Test
    @org.junit.Test
    public void testJob() {
        String[] args = new String[]{"2017-06-01", "2017-07-01"};
        Test.main(args);
    }
}