[Hadoop]-通过WordCount了解hadoop的工作

上一篇博客运行了例子wordCount,用Hadoop来做词频统计,流程是这样的:

1)先用HDFS的命令行工具,将要统计的文件(假设很大,1000G)复制到HDFS上;

2)用Java写MapReduce代码,写完后调试编译,然后打包成Jar包;

3)执行Hadoop命令,用这个Jar包在Hadoop集群上处理1000G的文件,然后将结果文件存放到指定的目录。

4)用HDFS的命令行工具查看处理结果文件。

示例

两个文件file01,file02

1
2
3
4
5
$ bin/hadoop dfs -cat /wordcount/input/file01 
Hello World Bye World

$ bin/hadoop dfs -cat /wordcount/input/file02
Hello Hadoop Goodbye Hadoop

运行

1
bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /wordcount/input /wordcount/output

结果

1
2
3
4
5
6
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000 
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

代码

结构

main主程序,负责执行整个流程

TokenizerMapper.java文件是做Map的代码

IntSumReducer.java是做Reduce的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

public class WordCount {
public WordCount() {
}

public static void main(String[] args) throws Exception {
...
}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
...
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
...
}
}

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
...

public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
... }
}
}

map

import java.util.StringTokenizer;StringTokenizer将符合一定格式的字符串拆分开,单参数的构造方法方法使用默认的分隔符“\t\n\r\f”: the space character, the tab character, the newline character, the carriage-return character, and the form-feed character。他比split的效率高一些,split运用了正则匹配(应该和要构造NFA和右胁正则表达式写的不好导致过多回溯影响效率)

map的过程:通过StringTokenizer 以空格为分隔符将一行切分为若干tokens,之后,输出< , 1> 形式的键值对。

关于组成一个指定作业的map数目的确定,以及如何以更精细的方式去控制这些map,我们将在后边学习到更多的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();//Text类是存储字符串的可比较可序列化类

public TokenizerMapper() {
}

public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());

while(itr.hasMoreTokens()) {//map过程
this.word.set(itr.nextToken());
context.write(this.word, one);//context会存储键值留待Reduce过程处理。
}

}
}

reduce

引入hadoop的Reducer类,这个类负责MapReduce的Reduce过程。

将每个key(本例中就是单词)出现的次数求和,每次调用reduce,进行一个key的所有value值的求和,然后将合并后的键值对写入context。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();

public IntSumReducer() {
}

public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;

IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}

this.result.set(sum);
context.write(key, this.result);
}
}

主程序

job.setCombinerClass(WordCount.IntSumReducer.class);指定了一个combiner (46行)。因此,每次map运行之后,会对输出按照key进行排序,然后把输出传递给本地的combiner(按照作业的配置与Reducer一样),进行本地聚合。

map过程

对于示例中的第一个输入,map输出是:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

第二个输入,map输出是:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>

combiner本地聚合

第一个map的输出是:
< Bye, 1>
< Hello, 1>
< World, 2>

第二个map的输出是:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>

reduce过程

作业的输出就是:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}

Job job = Job.getInstance(conf, "word count"); //任务名字
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);//指定mapper
job.setCombinerClass(WordCount.IntSumReducer.class);//指定combiner
job.setReducerClass(WordCount.IntSumReducer.class);//指定reducer
job.setOutputKeyClass(Text.class); //指定key的类型
job.setOutputValueClass(IntWritable.class);//指定value的类型

for(int i = 0; i < otherArgs.length - 1; ++i) {
//指定输入文件的路径,可能有多个输入文件
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
//指定输出文件的路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
//job.waitForCompletion提交任务且监控这执行过程
System.exit(job.waitForCompletion(true) ? 0 : 1); }

可能出现的bug

执行到running job卡住

可能是是集群的资源不足,无法分配给新任务的资源,需要调节yarn-site.xml的调度器获得资源的参数。

调整:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
      <property>
<!--单个node节点能占用的最大内存-->
<name>yarn.nodemanager.resource.memory-mb</name>
<value>20480</value>
</property>
<property>
<!-- 每个container运行需要的最小内存-->
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<!--每单位的物理内存总量对应的虚拟内存量,默认是2.1,表示每使用1MB的物理内存,最多可以使用2.1MB的虚拟内存总量。-->
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>

提示安全模式

安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示,只需要等待一会儿即可。

现在就清楚了,那现在要解决这个问题,我想让Hadoop不处在safe mode 模式下,能不能不用等,直接解决呢?
答案是可以的,只要在Hadoop的目录下输入:
bin/hadoop dfsadmin -safemode leave