import java.io.File;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import org.apache.hadoop.examples.EJob;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
/**
 * Reads rows from a MySQL {@code student} table via Hadoop's (old-API)
 * {@link DBInputFormat} and writes them to HDFS as text, keyed by student id.
 *
 * <p>Requires the MySQL JDBC driver jar on the task classpath (added below via
 * {@link DistributedCache}).
 */
public class ReadDB {

    /**
     * Mapper: emits each fetched row keyed by its {@code id} column, with the
     * record's formatted string as the value.
     */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, StudentRecord, LongWritable, Text> {

        @Override
        public void map(LongWritable key, StudentRecord value,
                OutputCollector<LongWritable, Text> collector, Reporter reporter)
                throws IOException {
            // Key by the student id rather than the row offset supplied by DBInputFormat.
            collector.collect(new LongWritable(value.id),
                    new Text(value.toString()));
        }
    }

    /**
     * Reducer: identity pass-through — every value for a key is written to the
     * output unchanged.
     */
    public static class Reduce extends MapReduceBase
            implements Reducer<LongWritable, Text, LongWritable, Text> {

        @Override
        public void reduce(LongWritable key, Iterator<Text> values,
                OutputCollector<LongWritable, Text> collector, Reporter reporter)
                throws IOException {
            while (values.hasNext()) {
                collector.collect(key, values.next());
            }
        }
    }

    /**
     * One row of the {@code student} table. Implements {@link Writable} for
     * Hadoop serialization and {@link DBWritable} for JDBC binding; the two
     * pairs of methods must read/write the fields in the same order:
     * id, name, sex, age.
     */
    public static class StudentRecord implements Writable, DBWritable {
        public int id;
        public String name;
        public String sex;
        public int age;

        /** Deserializes the record from Hadoop's binary wire format. */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
            this.sex = Text.readString(in);
            this.age = in.readInt();
        }

        /** Serializes the record into Hadoop's binary wire format. */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.id);
            Text.writeString(out, this.name);
            Text.writeString(out, this.sex);
            out.writeInt(this.age);
        }

        /** Populates the record from one JDBC result row (columns 1..4). */
        @Override
        public void readFields(ResultSet result) throws SQLException {
            this.id = result.getInt(1);
            this.name = result.getString(2);
            this.sex = result.getString(3);
            this.age = result.getInt(4);
        }

        /** Binds the record's fields to a JDBC statement (parameters 1..4). */
        @Override
        public void write(PreparedStatement stmt) throws SQLException {
            stmt.setInt(1, this.id);
            stmt.setString(2, this.name);
            stmt.setString(3, this.sex);
            stmt.setInt(4, this.age);
        }

        @Override
        public String toString() {
            // Concatenation already produces a String; the original
            // new String(...) wrapper was redundant. Labels are Chinese:
            // 学号 = student id, 姓名 = name, 性别 = sex, 年龄 = age.
            return "学号" + this.id + "_姓名:" + this.name
                    + "_性别:" + this.sex + "_年龄:" + this.age;
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ReadDB.class);

        // Cluster addresses (NameNode / JobTracker) — adjust for your cluster.
        conf.set("fs.default.name", "hdfs://192.168.71.128:9000");
        conf.set("mapred.job.tracker", "192.168.71.128:9001");
        conf.set("dfs.permissions", "false");

        // Package this project into a temporary jar so the cluster can run it
        // directly from the IDE (EJob is a local helper class).
        File jarFile = EJob.createTempJar("bin");
        EJob.addClasspath("/usr/hadoop/conf");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        conf.setJar(jarFile.toString());

        // Ship the MySQL JDBC driver to the task nodes' classpath.
        DistributedCache.addFileToClassPath(new Path(
                "/usr/hadoop/lib/mysql-connector-java-5.1.18-bin.jar"), conf);
        Class.forName("com.mysql.jdbc.Driver");

        // Mapper and reducer classes.
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        // Database connection settings.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.71.128:3306/song", "root", "mysql");

        // Table and columns to read; rows are ordered by "id".
        String[] fields = { "id", "name", "sex", "age" };
        DBInputFormat.setInput(conf, StudentRecord.class, "student", null, "id", fields);

        // Input comes from the database, not from files.
        conf.setInputFormat(DBInputFormat.class);

        // Job output types (default TextOutputFormat).
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        // HDFS output path.
        FileOutputFormat.setOutputPath(conf, new Path("rdb_out"));
        JobClient.runJob(conf);
    }
}
// NOTE: the MySQL JDBC driver jar must be added to the project classpath.
// MapReduce — reading data from a MySQL database.
// Source: chrro.iteye.com/blog/1924866