版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
今天我从豆瓣网上爬取了正在上映的电影并保存到本地(以 json 文件形式),再以此文件为源数据,用 MapReduce 统计 region 字段的出现次数。
第一步:爬取豆瓣数据
import requests
import json
from lxml import etree

# Crawl the "now playing" movie list for Weifang from Douban and dump it
# as JSON Lines (one JSON object per line) for the MapReduce job to consume.
url = "https://movie.douban.com/cinema/nowplaying/weifang/"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
    'Referer': 'https://movie.douban.com/cinema/nowplaying/weifang/'
}
response = requests.get(url=url, headers=headers)
# .text is the decoded str; .content would be the raw bytes as fetched.
text = response.text
html = etree.HTML(text)
# The page has two <ul class="lists"> blocks (now playing / coming soon);
# the first one is the "now playing" list we want.
ul = html.xpath('//ul[@class="lists"]')[0]
lis = ul.xpath('./li')  # xpath returns a list; './' is relative to the current node
movies = []
for li in lis:
    # Each <li> carries the movie metadata in data-* attributes.
    movie = {
        'title': li.xpath("@data-title")[0],
        'score': li.xpath("@data-score")[0],
        'duration': li.xpath("@data-duration")[0],
        'region': li.xpath("@data-region")[0],
        'director': li.xpath("@data-director")[0],
        'img': li.xpath(".//img/@src")[0],
    }
    movies.append(movie)
with open('D:/pachong/pachong.json', 'w', encoding='utf-8') as f:
    for movie in movies:
        # json.dumps emits valid (double-quoted) JSON, so Jackson can parse
        # each line directly — str(dict) would produce single quotes, which
        # is NOT valid JSON and would have to be fixed by hand afterwards.
        f.write(json.dumps(movie, ensure_ascii=False) + '\n')
结果如下:
第二步:MapReduce阶段
由于在写MapReduce过程中 无法解析上面的json文件,所以只能转换成能够被MapReduce解析的文件(将里面的单引号都换成双引号)。
再进行MapReduce就可以了。(如果爬虫阶段直接用 json.dumps 输出,则无需这一步转换。)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
//import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
/**
 * MapReduce job: count how many movies come from each region in the
 * JSON-lines crawl dump produced by the Douban crawler.
 */
public class douban {

    /** Mapper: parse one JSON line into a pachongaa bean and emit (region, 1). */
    public static class pachongmap extends Mapper<LongWritable, Text, Text, IntWritable> {
        // ObjectMapper is expensive to build; one per mapper task is enough.
        private final ObjectMapper objectMapper = new ObjectMapper();
        // Reuse the output key/value objects instead of allocating per record.
        private final Text outKey = new Text();
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // tolerate blank lines instead of failing the whole job
            }
            pachongaa pach = objectMapper.readValue(line, pachongaa.class);
            outKey.set(pach.getRegion());
            context.write(outKey, ONE);
        }
    }

    /**
     * Reducer: sum the per-region counts. Summing {@code w.get()} (rather than
     * counting elements with {@code count++}) keeps the result correct if this
     * class is ever also registered as a combiner, where values may be > 1.
     */
    public static class pachongreduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0; // primitive int: avoids Integer autoboxing in the loop
            for (IntWritable w : values) {
                count += w.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure(); // quick default log4j console setup
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(douban.class);
        job.setMapperClass(pachongmap.class);
        job.setReducerClass(pachongreduce.class);
        // Declare map output types explicitly; relying on the job output types
        // only works while map and reduce outputs happen to match.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("D:\\pachong\\pachong.json"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\pachong\\output"));
        // waitForCompletion(true) submits the job itself and streams progress;
        // the extra job.submit() call before it was redundant.
        boolean ok = job.waitForCompletion(true);
        System.exit(ok ? 0 : 1);
    }
}
pachongaa类
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Movie bean backing the douban MapReduce job. Jackson populates the fields
 * from one JSON line through the getters/setters; the Writable methods let
 * Hadoop serialize the bean between tasks.
 */
public class pachongaa implements WritableComparable<pachongaa> {
    private String title;
    private String score;
    private String duration;
    private String region;
    private String director;
    private String img;

    public String getDirector() {
        return director;
    }

    public void setDirector(String director) {
        this.director = director;
    }

    public String getImg() {
        return img;
    }

    public void setImg(String img) {
        this.img = img;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getScore() {
        return score;
    }

    public void setScore(String score) {
        this.score = score;
    }

    public String getDuration() {
        return duration;
    }

    public void setDuration(String duration) {
        this.duration = duration;
    }

    public String getRegion() {
        return region;
    }

    public void setRegion(String region) {
        this.region = region;
    }

    /**
     * Natural (ascending) order by region. The original compared the operands
     * in reverse ({@code o.getRegion().compareTo(this.region)}), which silently
     * produced a descending sort.
     */
    @Override
    public int compareTo(pachongaa o) {
        return this.region.compareTo(o.getRegion());
    }

    /**
     * Serialize ALL fields. The original wrote only {@code region}, so every
     * other attribute was silently dropped whenever Hadoop shipped this object
     * between tasks. Field order here must match {@link #readFields} exactly.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(title);
        dataOutput.writeUTF(score);
        dataOutput.writeUTF(duration);
        dataOutput.writeUTF(region);
        dataOutput.writeUTF(director);
        dataOutput.writeUTF(img);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Read in exactly the order written above.
        this.title = dataInput.readUTF();
        this.score = dataInput.readUTF();
        this.duration = dataInput.readUTF();
        this.region = dataInput.readUTF();
        this.director = dataInput.readUTF();
        this.img = dataInput.readUTF();
    }

    @Override
    public String toString() {
        // score is now quoted like every other field (it was the odd one out).
        return "pachongaa{" +
                "title='" + title + '\'' +
                ", score='" + score + '\'' +
                ", duration='" + duration + '\'' +
                ", region='" + region + '\'' +
                ", director='" + director + '\'' +
                ", img='" + img + '\'' +
                '}';
    }
}
这样我的需求就解决了。元气满满的一天。