WordCount是一个简单的应用,它读入文本文件,然后统计出字符出现的频率。输入是文本文件,输出也是文本文件,它的每一行包含了一个字符和它出现的频率,用一个制表符隔开。这是《Hadoop Map/Reduce教程》中的一个入门的Map/Reduce编程例子,可以说是Map/Reduce版的Hello,World.
先随便找一个英文的文本文件,重新命名为a01.dat,通过Upload files to DFS,将a01.dat文件上传到DFS中。
在新建项目向导中,新建一个Map/Reduce项目。一个Map/Reduce项目,包含三个主要文件,一个是Map文件,一个是Reduce文件,还有一个是主文件。源代码如下:
Map.java
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
Reduce.java
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
WordCount.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/a01.dat"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
job.waitForCompletion(true);
}
}
选择Run As - Run on Hadoop
该程序将文本文件的输入,通过Map函数,转换成一组 key,value 有序对。然后根据key,合并成 key,value1,value2....,然后再通过Reducer函数,做累加操作,计算出每个单词的出现次数,生成新的 key,sum 有序对后输出。
手头上有个邮件列表,包含了几万个邮件地址,于是修改了一下map函数,统计各个邮箱的使用情况。修改后的map为:
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] sarray=value.toString().split("@");
word.set(sarray[1]);
context.write(word, one);
}
运行后得到以下结果:
17230
573
35928
1372
223
385
143
2228
11021
437
562
22185
9671
540
222
4106
2676
129
589
355
285
14607
315
10770
252
828