編者按:Streaming是Hadoop的一個(gè)工具,用來(lái)創(chuàng)建和運(yùn)行一類特殊的Map/Reduce作業(yè)。Streaming使用“標(biāo)準(zhǔn)輸入”和“標(biāo)準(zhǔn)輸出”與我們編寫的Map和Reduce進(jìn)行數(shù)據(jù)的交換。由此可知,任何能夠使用“標(biāo)準(zhǔn)輸入”和“標(biāo)準(zhǔn)輸出”的編程語(yǔ)言都可以用來(lái)編寫MapReduce程序。今天給大家分享一篇來(lái)自董西成的博文“利用Hadoop Streaming處理二進(jìn)制格式文件”,文中介紹了如何使用Streaming處理二進(jìn)制格式的文件。
CSDN推薦:歡迎免費(fèi)訂閱《Hadoop與大數(shù)據(jù)周刊》獲取更多Hadoop技術(shù)文獻(xiàn)、大數(shù)據(jù)技術(shù)分析、企業(yè)實(shí)戰(zhàn)經(jīng)驗(yàn),生態(tài)圈發(fā)展趨勢(shì)。
Hadoop Streaming是Hadoop提供的多語(yǔ)言編程工具,用戶可以使用自己擅長(zhǎng)的編程語(yǔ)言(比如python、php或C#等)編寫Mapper和Reducer處理文本數(shù)據(jù)。Hadoop Streaming自帶了一些配置參數(shù)可友好地支持多字段文本數(shù)據(jù)的處理,參與Hadoop Streaming介紹和編程,可參考我的這篇文章:“Hadoop Streaming編程實(shí)例”。然而,隨著Hadoop應(yīng)用越來(lái)越廣泛,用戶希望Hadoop Streaming不局限在處理文本數(shù)據(jù)上,而是具備更加強(qiáng)大的功能,包括能夠處理二進(jìn)制數(shù)據(jù);能夠支持多語(yǔ)言編寫Combiner等組件。隨著Hadoop 2.x的發(fā)布,這些功能已經(jīng)基本上得到了完整的實(shí)現(xiàn),本文將介紹如何使用Hadoop Streaming處理二進(jìn)制格式的文件,包括SequenceFile,HFile等。
注:本文用到的程序?qū)嵗稍诎俣仍疲篽adoop-streaming-binary-examples 下載。
在詳細(xì)介紹操作步驟之前,先介紹本文給出的實(shí)例。假設(shè)有這樣的SequenceFile,它保存了手機(jī)通訊錄信息,其中,key是好友名,value是描述該好友的一個(gè)結(jié)構(gòu)體或者對(duì)象,為此,本文使用了google開源的protocol buffer這一序列化/反序列化框架,protocol buffer結(jié)構(gòu)體定義如下:
option java_package = "";
option java_outer_classname="PersonInfo";
message Person {
optional string name = 1;
optional int32 age = 2;
optional int64 phone = 3;
optional string address = 4;
}
SequenceFile文件中的value便是保存的Person對(duì)象序列化后的字符串,這是典型的二進(jìn)制數(shù)據(jù),不能像文本數(shù)據(jù)那樣可通過(guò)換行符解析出每條記錄,因?yàn)槎M(jìn)制數(shù)據(jù)的每條記錄中可能包含任意字符,包括換行符。
一旦有了這樣的SequenceFile之后,我們將使用Hadoop Streaming編寫這樣的MapReduce程序:這個(gè)MapReduce程序只有Map Task,任務(wù)是解析出文件中的每條好友記錄,并以name age,phone,address的文本格式保存到HDFS上。
1. 準(zhǔn)備數(shù)據(jù)
首先,我們需要準(zhǔn)備上面介紹的SequenceFile數(shù)據(jù),生成數(shù)據(jù)的核心代碼如下:
final SequenceFile.Writer out =
SequenceFile.createWriter(fs, getConf(), new Path(args[0]),
Text.class, BytesWritable.class);
Text nameWrapper = new Text();
BytesWritable personWrapper = new BytesWritable();
System.out.println("Generating " + num + " Records......");
for(int i = 0; i < num; i++) {
genOnePerson(nameWrapper, personWrapper);
System.out.println("Generating " + i + " Records," + nameWrapper.toString() + "......");
out.append(nameWrapper, personWrapper);
}
out.close();
當(dāng)然,為了驗(yàn)證我們產(chǎn)生的數(shù)據(jù)是否正確,需要編寫一個(gè)解析程序,核心代碼如下:
Reader reader = new Reader(fs, new Path(args[0]), getConf());
Text key = new Text();
BytesWritable value = new BytesWritable();
while(reader.next(key, value)) {
System.out.println("key:" + key.toString());
value.setCapacity(value.getSize()); // Very important!!! Very Tricky!!!
PersonInfo.Person person = PersonInfo.Person.parseFrom(value.getBytes());
System.out.println("age:" + person.getAge()
+ ",address:" + person.getAddress()
+",phone:" + person.getPhone());
}
reader.close();
需要注意的,Value保存類型為BytesWritable,使用這個(gè)類型非常容易犯錯(cuò)誤。當(dāng)你把一堆byte[]數(shù)據(jù)保存到BytesWritable后,通過(guò)BytesWritable.getBytes()再讀到的數(shù)據(jù)并不一定是原數(shù)據(jù),可能變長(zhǎng)了很多,這是因?yàn)锽ytesWritable采用了自動(dòng)內(nèi)存增長(zhǎng)算法,你保存的數(shù)據(jù)長(zhǎng)度為size時(shí),它可能將數(shù)據(jù)保存到了長(zhǎng)度為capacity(capacity>size)的buffer中,這時(shí)候,你通過(guò)BytesWritable.getBytes()得到的數(shù)據(jù)最后一些字符是多余的,如果里面保存的是protocol buffer序列化后的字符串,則無(wú)法反序列化,這時(shí)候可以使用BytesWritable.setCapacity (value.getSize())將后面多余空間剔除掉。
2. 使用Hadoop Streaming編寫C++程序
為了說(shuō)明Hadoop Streaming如何處理二進(jìn)制格式數(shù)據(jù),本文僅僅以C++語(yǔ)言為例進(jìn)行說(shuō)明,其他語(yǔ)言的設(shè)計(jì)方法類似。
先簡(jiǎn)單說(shuō)一下原理。當(dāng)輸入數(shù)據(jù)是二進(jìn)制格式時(shí),Hadoop Streaming會(huì)對(duì)輸入key和value進(jìn)行編碼后,通過(guò)標(biāo)準(zhǔn)輸入傳遞給你的Hadoop Streaming程序,目前提供了兩種編碼格式,分別是rawtypes和 typedbytes,你可以設(shè)計(jì)你想采用的格式,這兩種編碼規(guī)則如下(具體在文章“Hadoop Streaming高級(jí)編程”中已經(jīng)介紹了):
rawbytes:key和value均用【4個(gè)字節(jié)的長(zhǎng)度+原始字節(jié)】表示
typedbytes:key和value均用【1字節(jié)類型+4字節(jié)長(zhǎng)度+原始字節(jié)】表示
本文將采用第一種編碼格式進(jìn)行說(shuō)明。采用這種編碼意味著你不能想文本數(shù)據(jù)那樣一次獲得一行內(nèi)容,而是依次獲得key和value序列,其中key和value都由兩部分組成,第一部分是長(zhǎng)度(4個(gè)字節(jié)),第二部分是字節(jié)內(nèi)容,比如你的key是dongxicheng,value是goodman,則傳遞給hadoop streaming程序的輸入數(shù)據(jù)格式為11 dongxicheng 7 goodman。為此,我們編寫下面的Mapper程序解析這種數(shù)據(jù):
int main() {
string key, value;
while(!cin.eof()) {
if(!FileUtil::ReadString(key, cin))
break;
FileUtil::ReadString(value, cin);
Person person;
ProtoUtil::ParseFromString(value, person);
cout << person.name() << " " << person.age()
<< "," << person.address()
<< "," << person.phone() << endl;
}
return 0;
}
其中,輔助函數(shù)實(shí)現(xiàn)如下:
class ProtoUtil {
public:
static bool ParseFromString(const string& str, Person &person) {
if(person.ParseFromString(str))
return true;
return false;
}
};
class FileUtil {
public:
static bool ReadInt(unsigned int *len, istream &stream) {
if(!stream.read((char *)len, sizeof(unsigned int)))
return false;
*len = bswap_32(*len);
return true;
}
static bool ReadString(string &str, istream &stream) {
unsigned int len;
if(!ReadInt(&len, stream))
return false;
str.resize(len);
if(!ReadBytes(&str[0], len, stream))
return false;
return true;
}
static bool ReadBytes(char *ptr, unsigned int len, istream &stream) {
stream.read(ptr, sizeof(unsigned char) * len);
if(stream.eof()) return false;
return true;
}
};
該程序需要注意以下幾點(diǎn):
(1)注意大小端編碼規(guī)則,解析key和value長(zhǎng)度時(shí),需要對(duì)長(zhǎng)度進(jìn)行字節(jié)翻轉(zhuǎn)。
(2)注意循環(huán)結(jié)束條件,僅僅靠!cin.eof()判定是不夠的,僅靠這個(gè)判定會(huì)導(dǎo)致多輸出一條重復(fù)數(shù)據(jù)。
(3)本程序只能運(yùn)行在linux系統(tǒng)下,windows操作系統(tǒng)下將無(wú)法運(yùn)行,因?yàn)閣indows下的標(biāo)準(zhǔn)輸入cin并直接支持二進(jìn)制數(shù)據(jù)讀取,需要將其強(qiáng)制以二進(jìn)制模式重新打開后再使用。
3. 程序測(cè)試與運(yùn)行
程序?qū)懞煤螅谝徊绞蔷幾gC++程序。由于該程序需要運(yùn)行在多節(jié)點(diǎn)的Hadoop集群上,為了避免部署或者分發(fā)動(dòng)態(tài)庫(kù)帶來(lái)的麻煩,我們直接采用靜態(tài)編譯方式,這也是編寫Hadoop C++程序的基本規(guī)則。為了靜態(tài)編譯以上MapReduce程序,安裝protocol buffers時(shí),需采用以下流程(強(qiáng)調(diào)第一步),
./configure生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
![]()