Spark 可以通过多种途径读取和存储数据。以下示例均基于 Java 编写。
一,读存文本文件里的数据
1,读取数据(file 为文件路径):
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> inputData = sc.textFile(file);
sc.stop();
2,存储数据
// Keep only the lines that contain "error" and save them under the output path E:\errorsspark.
JavaRDD<String> errorsRDD = inputData
        .filter(line -> line.contains("error"));
errorsRDD.saveAsTextFile("E:\\errorsspark");
二,读存json数据
// Java class corresponding to one JSON record; the JSON snippets below map it
// with Jackson's ObjectMapper (readValue / writeValueAsString).
public class Person implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /** Renders the bean as {@code Person{name='<name>', age=<age>}}. */
    @Override
    public String toString() {
        return new StringBuilder("Person{")
                .append("name='").append(name).append('\'')
                .append(", age=").append(age)
                .append('}')
                .toString();
    }
}
1,读取json数据:
/**
 * Parses every line of one partition as a JSON-encoded {@link Person}.
 * A single ObjectMapper is created per call (i.e. per partition when used with
 * mapPartitions) rather than per line. Lines that fail to parse are skipped.
 */
public class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
    @Override
    public Iterator<Person> call(Iterator<String> lines) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        final ArrayList<Person> parsed = new ArrayList<>();
        lines.forEachRemaining(json -> {
            try {
                parsed.add(mapper.readValue(json, Person.class));
            } catch (Exception ignored) {
                // Deliberate best-effort parsing: drop records that fail to parse.
            }
        });
        return parsed.iterator();
    }
}
// sc.textFile always decodes lines as UTF-8, so the input file must be saved as UTF-8
// (e.g. change the editor's default .txt encoding to UTF-8).
// The former map(p -> new String(p.getBytes(), 0, p.length(), "utf-8")) was removed:
// it re-encoded with the platform default charset and passed the CHAR count as the
// BYTE count, which truncates/garbles lines containing multi-byte characters.
JavaRDD<String> input = sc.textFile("E:\\file.json");
// Parse each partition into Person objects, then keep only the records named "gch".
// "gch".equals(...) avoids a NullPointerException for records without a "name" field.
JavaRDD<Person> result = input.mapPartitions(new ParseJson())
        .filter(x -> "gch".equals(x.getName()));
2,存储json数据
/**
 * Serializes every {@link Person} of one partition to a JSON string, reusing a
 * single ObjectMapper for the whole call. Serialization errors propagate to Spark.
 */
public class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
    @Override
    public Iterator<String> call(Iterator<Person> people) throws Exception {
        ObjectMapper jsonMapper = new ObjectMapper();
        ArrayList<String> jsonLines = new ArrayList<>();
        for (; people.hasNext(); ) {
            jsonLines.add(jsonMapper.writeValueAsString(people.next()));
        }
        return jsonLines.iterator();
    }
}
// Convert the filtered Person records (`result`, produced by the JSON-reading step)
// back into JSON strings and save them under E:\wjson.
JavaRDD<String> formatted = result
        .mapPartitions(new WriteJson());
formatted.saveAsTextFile("E:\\wjson");
三,读存CSV数据
1,读取CSV数据:先把文件当作普通文本文件读取,再对读到的数据进行解析。Java中使用opencsv库解析CSV。
如果字段中没有换行符,可以逐行解析:
import au.com.bytecode.opencsv.CSVReader;
import Java.io.StringReader;
public class ParseLine implements Function<String, String[]> {
public String[] call(String line) throws Exception {
CSVReader reader = new CSVReader(new StringReader(line));
return reader.readNext();
}
}
// Read the CSV as plain text, then parse each line into its fields.
// map(...) returns a JavaRDD<String[]>; JavaPairRDD takes two type parameters and is
// not what map produces, so the original declaration did not compile.
JavaRDD<String> csvFile = sc.textFile(inputFile);
JavaRDD<String[]> csvData = csvFile.map(new ParseLine());
如果字段中包含换行符,则需要用wholeTextFiles读取整个文件后再解析:
public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> {
public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
CSVReader reader = new CSVReader(new StringReader(file));
return reader.readAll();
}
}
// wholeTextFiles yields one (path, content) pair per file; each file's content is then
// split into CSV records by ParseLine.
JavaPairRDD<String, String> csvData =
        sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData
        .flatMap(new ParseLine());
2,存储CSV