hadoop基礎----hadoop實戰(五)-----myeclipse開發MapReduce---WordCount例子---解析MapReduce的寫法
來源:程序員人生 發布時間:2016-12-01 15:52:43 閱讀次數:3094次
我們在上1章節已了解了怎樣在myeclipse中開發運行MapReduce
hadoop基礎----hadoop實戰(4)-----myeclipse開發MapReduce---myeclipse搭建hadoop開發環境并運行wordcount
也在很早的章節中了解了MapReduce的原理
hadoop基礎----hadoop理論(4)-----hadoop散布式并行計算模型MapReduce詳解
目標
MapReduce主要的流程是 map----》reduce。
我們本章節來詳細學習java代碼中,是怎樣配置實現MapReduce的。就以WordCount例子為例。
本章節的目的是 熟習MapReduce的寫法以后,我們能寫出更多的業務處理,解決更多的其它問題。
MapReduce的結構
寫1個MapReduce主要有3部份:
Mapper接口的實現,Reducer接口的實現,Job的配置。
Mapper接口和Reducer接口的實現就是要分別編寫兩個類(例如分別叫做Map類和Reduce類)。
在Map類中規定如何將輸入的<key, value>對轉化為中間結果的<key, list of values>對。
在Reduce類中規定如何將Map輸出的中間結果進1步處理,轉化為終究的結果輸出<key, value>對。
而對Job的配置是要在main函數中創建相干對象,調用其方法實現的。
完全代碼
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
//編寫完成Map任務的靜態內部類,類的名字就叫TokenizerMapper,繼承Mapper類
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
//編寫完成Reduce任務的靜態內部類,類的名字就叫IntSumReducer,繼承Reducer類
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
//main函數中所要做的就是Job的配置和提交
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
文件中的內容
我們用來作測試的文件有2個,分別是file1.txt和file2.txt。
file1.txt中是
hello world
file2.txt中是
hello hadoop
Mapper接口的實現分析
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
這段代碼實現了Map的功能,我們聲明了1個類TokenizerMapper(類名隨便,我們也能夠起名WordCountMap但是必須繼承Mapper接口)繼承了Mapper接口---接口的參數是固定的,也就是寫其它功能的MapReduce也繼承這個接口,用這幾個參數或適當調劑。熟習java的同學會看到出現了1些新的數據類型:比如Text,IntWritable,Context。
LongWritable, IntWritable, Text 均是 Hadoop 中實現的用于封裝 Java 數據類型的類,這些類實現了WritableComparable接口,它們 都能夠被串行化從而便于在散布式環境中進行數據交換,你可以將它們分別視為long,int,String 的替換品。
Context則是負責搜集鍵值對的中間結果或終究結果,有些版本可以用OutputCollector<Text, IntWritable> output,但是用法都1樣,都是用來搜集結果。
private final static IntWritable one = new IntWritable(1);
定義了1個int賦值1,作為計數器。
private Text word = new Text();
定義1個變量,用來保存key。這個key會用來作為map辨別數據。
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
Mapper接口中的必須有map方法實現功能,傳入參數1般也是固定的。參數中context負責搜集鍵值對的中間結果傳遞給reduce。
我們的file1.txt和file2.txt在hadoop中會經過TextInputFormat,每一個文件(或其1部份)都會單獨地作為map的輸入,而這是繼承自FileInputFormat的。以后,每行數據都會生成1條記錄,每條記錄則表示為<key,value>情勢:key值是每一個數據的記錄在數據分片中的字節偏移量,數據類型是LongWritable;value值是每行的內容,數據類型是Text。
也就是說 我們寫在文本中的內容 就存在 Text value這個參數中。
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
StringTokenizer 是1個分詞器。用來把句子拆分成1個個單詞。 value是我們文本中的內容,這里是把內容分成1個個單詞。然后通過while去遍歷, 把單詞放入word變量中。
然后把word變量和計數器1作為結果 存起來。
那末經過了map以后的context中的結果就是
<hello,1>
<world,1>
<hello,1>
<hadoop,1>
這個結果會自動傳給reduce。
Reducer接口的實現分析
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
這段代碼實現了Reduce的功能,我們聲明了1個類IntSumReducer(類名隨便,我們也能夠起名WordCountReduce但是必須繼承Reducer接口)繼承了Reducer接口---接口的參數是固定的,也就是寫其它功能的MapReduce也繼承這個接口,用這幾個參數或適當調劑參數類型。
private IntWritable result = new IntWritable();
定義1個變量,用來裝每組的計數結果。
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
Reducer接口中的必須有reduce方法實現功能,傳入參數1般也是固定的。
參數中context負責搜集鍵值對的終究結果。
key對應map傳遞過濾的key,values對應map傳遞過濾的value。
為何這里是values呢。
由于進入reduce方法時會自動分組,只有key1樣的數據才會同時進入1個reduce中。
map傳遞過來的結果中是
<hello,1>
<world,1>
<hello,1>
<hadoop,1>
也就是 這個例子中會進入3次reduce,
第1次 key 是 hello, values是[1,1]
第2次key 是 world,values是[1]
第3次key是 hadoop ,values是[1]
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
循環values列表,相加計數后放入終究結果容器context中。
所以終究的結果是
<hello,2>
<world,1>
<hadoop,1>
Job的配置--main方法
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
mapreduce中需要1個main方法配置參數,向
Configuration conf = new Configuration();
創建1個配置實例。
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
獲得參數并判斷是不是合法。
Job job = new Job(conf, "word count");
新建1個job任務。
job.setJarByClass(WordCount.class);
設置運行的jar類,也就是mapreduce的主類名。
job.setMapperClass(TokenizerMapper.class);
設置map類,也就是繼承map接口的類名。job.setCombinerClass(IntSumReducer.class);
設置
Combiner類,其實map到reduce還有1道工序是Combiner,如果有特殊需求可以新建1個類,沒有的話直接使用繼承reduce接口的類便可。 job.setReducerClass(IntSumReducer.class);
設置reduce類,也就是繼承reduce接口的類名。 job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
設置輸出結果的 key 和value的數據類型。
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
設置輸入輸入結果的路徑,我們可以把路徑寫死,也能夠通過參數傳進來,我們這里就是用的接受的參數的值。
System.exit(job.waitForCompletion(true) ? 0 : 1);
提交運行,完成后退出程序。
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈