多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > 服務器 > hadoop基礎----hadoop實戰(五)-----myeclipse開發MapReduce---WordCount例子---解析MapReduce的寫法

hadoop基礎----hadoop實戰(五)-----myeclipse開發MapReduce---WordCount例子---解析MapReduce的寫法

來源:程序員人生   發布時間:2016-12-01 15:52:43 閱讀次數:3094次



我們在上1章節已了解了怎樣在myeclipse中開發運行MapReduce

hadoop基礎----hadoop實戰(4)-----myeclipse開發MapReduce---myeclipse搭建hadoop開發環境并運行wordcount

也在很早的章節中了解了MapReduce的原理

hadoop基礎----hadoop理論(4)-----hadoop散布式并行計算模型MapReduce詳解


目標

MapReduce主要的流程是 map----》reduce。

我們本章節來詳細學習java代碼中,是怎樣配置實現MapReduce的。就以WordCount例子為例。

本章節的目的是 熟習MapReduce的寫法以后,我們能寫出更多的業務處理,解決更多的其它問題。



MapReduce的結構

寫1個MapReduce主要有3部份:

Mapper接口的實現,Reducer接口的實現,Job的配置。

Mapper接口和Reducer接口的實現就是要分別編寫兩個類(例如分別叫做Map類和Reduce類)。

在Map類中規定如何將輸入的<key, value>對轉化為中間結果的<key, list of values>對。

在Reduce類中規定如何將Map輸出的中間結果進1步處理,轉化為終究的結果輸出<key, value>對。

而對Job的配置是要在main函數中創建相干對象,調用其方法實現的。



完全代碼

package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { //編寫完成Map任務的靜態內部類,類的名字就叫TokenizerMapper,繼承Mapper類 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } //編寫完成Reduce任務的靜態內部類,類的名字就叫IntSumReducer,繼承Reducer類 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } //main函數中所要做的就是Job的配置和提交 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }



文件中的內容

我們用來作測試的文件有2個,分別是file1.txt和file2.txt。

file1.txt中是

hello world

file2.txt中是

hello hadoop




Mapper接口的實現分析

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }


這段代碼實現了Map的功能,我們聲明了1個類TokenizerMapper(類名隨便,我們也能夠起名WordCountMap但是必須繼承Mapper接口)繼承Mapper接口---接口的參數是固定的,也就是寫其它功能的MapReduce也繼承這個接口,用這幾個參數或適當調劑。

熟習java的同學會看到出現了1些新的數據類型:比如Text,IntWritable,Context。

LongWritable, IntWritable, Text 均是 Hadoop 中實現的用于封裝 Java 數據類型的類,這些類實現了WritableComparable接口,它們 都能夠被串行化從而便于在散布式環境中進行數據交換,你可以將它們分別視為long,int,String 的替換品。 

Context則是負責搜集鍵值對的中間結果或終究結果,有些版本可以用OutputCollector<Text, IntWritable> output,但是用法都1樣,都是用來搜集結果。


private final static IntWritable one = new IntWritable(1);
定義了1個int賦值1,作為計數器。


private Text word = new Text();
定義1個變量,用來保存key。這個key會用來作為map辨別數據。


public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {
Mapper接口中的必須有map方法實現功能,傳入參數1般也是固定的。

參數中context負責搜集鍵值對的中間結果傳遞給reduce。

我們的file1.txt和file2.txt在hadoop中會經過TextInputFormat,每一個文件(或其1部份)都會單獨地作為map的輸入,而這是繼承自FileInputFormat的。以后,每行數據都會生成1條記錄,每條記錄則表示為<key,value>情勢:key值是每一個數據的記錄在數據分片中的字節偏移量,數據類型是LongWritable;value值是每行的內容,數據類型是Text。


也就是說 我們寫在文本中的內容 就存在  Text value這個參數中。


StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); }
StringTokenizer 是1個分詞器。用來把句子拆分成1個個單詞。 value是我們文本中的內容,這里是把內容分成1個個單詞。

然后通過while去遍歷, 把單詞放入word變量中。

然后把word變量和計數器1作為結果 存起來。


那末經過了map以后的context中的結果就是

<hello,1>

<world,1>

<hello,1>

<hadoop,1>

這個結果會自動傳給reduce。





Reducer接口的實現分析

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }

這段代碼實現了Reduce的功能,我們聲明了1個類IntSumReducer(類名隨便,我們也能夠起名WordCountReduce但是必須繼承Reducer接口)繼承了Reducer接口---接口的參數是固定的,也就是寫其它功能的MapReduce也繼承這個接口,用這幾個參數或適當調劑參數類型。


private IntWritable result = new IntWritable();

定義1個變量,用來裝每組的計數結果。


public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {
Reducer接口中的必須有reduce方法實現功能,傳入參數1般也是固定的。


參數中context負責搜集鍵值對的終究結果。


key對應map傳遞過濾的key,values對應map傳遞過濾的value。

為何這里是values呢。

由于進入reduce方法時會自動分組,只有key1樣的數據才會同時進入1個reduce中。

map傳遞過來的結果中是

<hello,1>

<world,1>

<hello,1>

<hadoop,1>


也就是 這個例子中會進入3次reduce,

第1次 key 是 hello,   values是[1,1]

第2次key 是 world,values是[1]

第3次key是 hadoop ,values是[1]


int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result);
循環values列表,相加計數后放入終究結果容器context中。



所以終究的結果是

<hello,2>

<world,1>

<hadoop,1>





Job的配置--main方法

public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }

mapreduce中需要1個main方法配置參數,向hadoop框架描寫map-reduce履行的工作,并提交運行。 



Configuration conf = new Configuration();
創建1個配置實例。


String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); }
獲得參數并判斷是不是合法。



Job job = new Job(conf, "word count");
新建1個job任務。


job.setJarByClass(WordCount.class);
設置運行的jar類,也就是mapreduce的主類名。



job.setMapperClass(TokenizerMapper.class);
設置map類,也就是繼承map接口的類名。


job.setCombinerClass(IntSumReducer.class);
設置Combiner類,其實map到reduce還有1道工序是Combiner,如果有特殊需求可以新建1個類,沒有的話直接使用繼承reduce接口的類便可。


job.setReducerClass(IntSumReducer.class);
設置reduce類,也就是繼承reduce接口的類名。



job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
設置輸出結果的 key 和value的數據類型。


FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
設置輸入輸入結果的路徑,我們可以把路徑寫死,也能夠通過參數傳進來,我們這里就是用的接受的參數的值。




System.exit(job.waitForCompletion(true) ? 0 : 1);
提交運行,完成后退出程序。





生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 国产日韩高清一区二区三区 | 欧美一级高清视频在线播放 | 国产成人精品免费视频大 | 欧美性视频一区二区三区 | 最新欧美精品一区二区三区不卡 | 最近中文版字幕在线观看 | 成年人视频免费在线观看 | 中文字幕免费在线观看 | 国产玖玖在线 | 日韩欧美一区黑人vs日本人 | 九九99久久精品影视 | 国内自拍在线观看 | 在线h观看 | 国产三级手机在线 | 免费网站在线看 | 久久乐精品 | 九色在线 | 欧美一级欧美一级在线播放 | 国产粉嫩00福利福利福利 | 久久国产精品免费一区二区三区 | 香蕉高清免费永久在线视频 | 欧美一区二区三区视频在线观看 | 91精品国产露脸在线 | 国人精品视频在线观看 | 免费理论片在线观看 | 亚洲最大的黄色网址 | 秋霞日韩理论高清在线观看 | 中文字幕激情视频 | 日本在线视频不卡 | 久久国内精品 | 亚洲成人福利网站 | 久久在线免费观看视频 | 亚洲嫩草影院久久精品 | 91亚洲国产成人精品性色 | 欧美性视频在线播放 | 欧美视频xxxx | 91亚洲精品一区二区福利 | 美国毛片在线观看 | www.国产精品视频 | 男女男精品视频在线观看 | 在线 | 一区二区三区四区 |