需求:
使用 Spark 读取 HBase 表中的数据,提取其中三列(createtime、discode、inscode),按日期(createtime 的前 10 位)、discode、inscode 分组计数后,将结果写入 MySQL 的 buss_stu_info 表。
代码实现:
import java.util.Properties

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.types.{DataTypes, StructField}
import org.apache.spark.sql.{RowFactory, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

/**
 * Batch job that reads the HBase table `buss_stu_info_tmp3`, extracts
 * `extInfo:createtime`, `baseInfo:discode` and `baseInfo:inscode` from every row,
 * counts the rows per (date, discode, inscode) and appends the counts to the
 * MySQL table `buss_stu_info`.
 *
 * The date is the first 10 characters of `createtime` with the dashes removed.
 * Rows in which any of the three cells is missing, or whose `createtime` is
 * shorter than 10 characters, are skipped and counted instead of failing the job.
 */
object Demo {

  // Column families / qualifiers read from HBase.
  private val ExtInfoFamily = "extInfo"
  private val BaseInfoFamily = "baseInfo"
  private val CreateTimeQualifier = "createtime"
  private val DiscodeQualifier = "discode"
  private val InscodeQualifier = "inscode"

  // Number of leading characters of `createtime` that make up the date part.
  private val DatePrefixLength = 10

  /**
   * Reads one cell of an HBase row as a string.
   *
   * @param result    the HBase row
   * @param family    column family name
   * @param qualifier column qualifier name
   * @return the cell value, or `None` when `getValue` returns null for that cell
   */
  private def cell(result: Result, family: String, qualifier: String): Option[String] =
    Option(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))
      .map(bytes => Bytes.toString(bytes))

  /**
   * Turns a `createtime` value into the date string used as grouping key.
   *
   * @param createtime raw cell value
   * @return the first 10 characters without dashes, or `None` when the value is too short
   */
  private def toDateString(createtime: String): Option[String] =
    if (createtime.length >= DatePrefixLength) {
      Some(createtime.substring(0, DatePrefixLength).replace("-", ""))
    } else {
      None
    }

  /**
   * Builds the (date, discode, inscode) grouping key for one HBase row.
   *
   * @param result the HBase row
   * @return the key, or `None` when a required cell is missing or `createtime` is too short
   */
  private def groupKey(result: Result): Option[(String, String, String)] =
    for {
      createtime <- cell(result, ExtInfoFamily, CreateTimeQualifier)
      dateString <- toDateString(createtime)
      discode <- cell(result, BaseInfoFamily, DiscodeQualifier)
      inscode <- cell(result, BaseInfoFamily, InscodeQualifier)
    } yield (dateString, discode, inscode)

  /**
   * Job entry point.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Fall back to local mode only when no master was supplied from outside
    // (e.g. through spark-submit), instead of always forcing "local".
    val sconf = new SparkConf().setAppName("demo").setIfMissing("spark.master", "local")
    val spark = SparkSession.builder().config(sconf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    try {
      // MySQL connection settings.
      // NOTE(review): credentials are hard-coded; consider moving them to configuration.
      val prop = new Properties()
      prop.put("user", "root")
      prop.put("password", "123456")
      val url = "jdbc:mysql://192.168.1.12:3306/jp_analysis?useUnicode=true&characterEncoding=utf-8"

      // HBase configuration
      val tableName = "buss_stu_info_tmp3"
      val hconf = HBaseConfiguration.create()
      // NOTE(review): the blanks after the commas were removed; depending on the
      // Hadoop/HBase version the list may be split without trimming, which would
      // leave host names with a leading blank - confirm against the cluster setup.
      hconf.set("hbase.zookeeper.quorum", "192.168.1.11,192.168.1.12,192.168.1.13")
      hconf.set("hbase.zookeeper.property.clientPort", "2181")
      hconf.set(TableInputFormat.INPUT_TABLE, tableName)
      // Only the three columns that are actually used need to be scanned.
      hconf.set(
        TableInputFormat.SCAN_COLUMNS,
        Seq(
          s"$ExtInfoFamily:$CreateTimeQualifier",
          s"$BaseInfoFamily:$DiscodeQualifier",
          s"$BaseInfoFamily:$InscodeQualifier"
        ).mkString(" ")
      )

      // Read the table as an RDD of (row key, row).
      val hBaseRDD = sc.newAPIHadoopRDD(
        hconf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result]
      )

      // Diagnostic counter for rows that cannot be turned into a grouping key.
      // It is updated inside a transformation, so treat the value as approximate
      // (re-executed tasks may count a row more than once).
      val skippedRows = sc.longAccumulator("skippedRows")

      val keys = hBaseRDD.flatMap { case (_, result) =>
        val key = groupKey(result)
        if (key.isEmpty) skippedRows.add(1L)
        key.toList
      }

      // Count rows per key, order by date and convert to Rows.
      val rows = keys
        .map(key => (key, 1))
        .reduceByKey(_ + _)
        .sortBy { case ((dateString, _, _), _) => dateString }
        .map { case ((dateString, district, inscode), count) =>
          RowFactory.create(dateString, district, inscode, Int.box(count))
        }

      // Schema describing the resulting DataFrame.
      val structFields: Array[StructField] = Array(
        DataTypes.createStructField("datestring", DataTypes.StringType, false),
        DataTypes.createStructField("district", DataTypes.StringType, false),
        DataTypes.createStructField("inscode", DataTypes.StringType, false),
        DataTypes.createStructField("num", DataTypes.IntegerType, true)
      )
      val structType = DataTypes.createStructType(structFields)

      // Build the DataFrame and append it to the MySQL table.
      val stuDF = spark.createDataFrame(rows, structType)
      stuDF.write.mode("append").jdbc(url, "buss_stu_info", prop)

      if (skippedRows.sum > 0) {
        System.err.println(
          s"Skipped about ${skippedRows.sum} HBase row(s) with a missing or malformed " +
            s"$CreateTimeQualifier/$DiscodeQualifier/$InscodeQualifier value"
        )
      }
    } finally {
      // Always release the Spark resources, even when the job fails.
      spark.stop()
    }
  }
}