本文是在 Spark 下通过 JDBC 从 MySQL 读取数据、并将计算结果写入 MySQL 的示例,具体代码如下:
package com.cxd.sql;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * Demo of reading from and writing to MySQL through Spark SQL's JDBC data source.
 *
 * <p>{@link #main(String[])} builds a local {@link SparkSession}, runs the read example,
 * prints the elapsed wall-clock time in milliseconds, and always stops the session.
 *
 * <p>NOTE(review): the connection settings (including the password) are hard-coded for
 * demonstration purposes; move them to external configuration before real use.
 */
public class MysqlDemo {
    /** JDBC URL of the MySQL database used by the examples. */
    static final String URL = "jdbc:mysql://localhost:3306/spark";
    /** Name of the MySQL table that is read from / written to. */
    static final String DBTABLE_NAME = "student";
    /** MySQL user name. */
    static final String USER = "root";
    /** MySQL password. */
    static final String PASSWORD = "123456";

    /**
     * Entry point: runs the read example against MySQL and prints how long it took.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("MoveDataToMySqlTest").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        try {
            long sTime = System.currentTimeMillis();
            // Swap the call below for saveToMysql(spark) to run the write example instead.
            //saveToMysql(spark);
            readFromMysql(spark);
            long eTime = System.currentTimeMillis();
            System.out.println(eTime - sTime);
        } finally {
            // Release the session (and its underlying SparkContext) even when the job fails.
            spark.stop();
        }
    }

    /**
     * Loads the parquet files matching {@code parquet.res/*} and writes them to the
     * {@link #DBTABLE_NAME} table over JDBC.
     *
     * <p>No save mode is set, so Spark's default applies; per the Spark documentation that
     * default is "error if exists", i.e. the write fails when the target table already
     * exists. Set an explicit mode on the writer if appending/overwriting is intended.
     *
     * @param spark the active Spark session
     */
    @SuppressWarnings("unused")
    private static void saveToMysql(SparkSession spark) {
        Dataset<Row> stuDf = spark.read().parquet("parquet.res/*");
        stuDf.write()
                .format("jdbc")
                .option("url", URL)
                .option("dbtable", DBTABLE_NAME)
                .option("user", USER)
                .option("password", PASSWORD)
                .save();
    }

    /**
     * Reads the {@link #DBTABLE_NAME} table over JDBC, registers it as the temporary view
     * {@code table1}, and shows the rows whose {@code sage} column equals 24.
     *
     * @param spark the active Spark session
     */
    private static void readFromMysql(SparkSession spark) {
        Map<String, String> options = new HashMap<>();
        options.put("url", URL);
        options.put("dbtable", DBTABLE_NAME);
        options.put("user", USER);
        options.put("password", PASSWORD);
        Dataset<Row> jdbcDF = spark.read().format("jdbc").options(options).load();
        // jdbcDF.write().parquet(warehouse path); -- at this point the data could be written
        // out as parquet files into the data warehouse.
        jdbcDF.createOrReplaceTempView("table1");
        String sql = "select * from table1 where sage=24";
        spark.sql(sql).show();
    }

    /**
     * Queries the temporary view {@code table1} by {@code imsi} and shows the result.
     *
     * <p>This method does not register {@code table1} itself; the view must already exist
     * in the given session (for example after {@link #readFromMysql(SparkSession)} ran).
     * NOTE(review): the method name looks like a typo for "readFromSparkSql", and it is not
     * visible here whether the view actually has an {@code imsi} column -- confirm.
     *
     * @param spark the active Spark session
     */
    @SuppressWarnings("unused")
    private static void readFormSparkSql(SparkSession spark) {
        spark.sql("select * from table1 where imsi = '460079856771464'").show();
    }
}
附环境版本信息:
spark:2.2.0
jdk:1.8.0_112