《hadoop進(jìn)階》PeopleRank從社交關(guān)系中挖掘價(jià)值用戶
來(lái)源:程序員人生 發(fā)布時(shí)間:2016-06-07 16:16:59 閱讀次數(shù):4420次
轉(zhuǎn)載請(qǐng)注明出處: 轉(zhuǎn)載自 Thinkgamer的CSDN博客: blog.csdn.net/gamer_gyt
代碼下載地址:點(diǎn)擊查看
pagerank算法的python實(shí)現(xiàn)請(qǐng)參考:http://blog.csdn.net/gamer_gyt/article/details/47443877
pagerank算法的mapreduce實(shí)現(xiàn)請(qǐng)參考:http://blog.csdn.net/gamer_gyt/article/details/47451021
1:PageRank 與 PeopleRank
2:需求分析:發(fā)掘CSDN博客的價(jià)值用戶
3:算法模型:PeopleRank算法
4:架構(gòu)設(shè)計(jì):從數(shù)據(jù)準(zhǔn)備到PR算法的MR化
5:程序開(kāi)發(fā):hadoop實(shí)現(xiàn)PeopleRank算法
1:PageRank與PeopleRank
PageRank算法是Google從垃圾堆里撿黃金的重量級(jí)算法,它讓谷歌的搜索引擎1度成為No.1,固然谷歌所公然的PR算法畢竟是過(guò)去式了,既然它能公然,那末肯定不是它最新的算法演變版本,但是不管怎樣,我們照舊從中學(xué)習(xí)到很多創(chuàng)新和獨(dú)特的思想。
PR算法主要用于網(wǎng)頁(yè)評(píng)分計(jì)算,它利用互聯(lián)網(wǎng)的網(wǎng)頁(yè)之間的連接關(guān)系,給網(wǎng)頁(yè)進(jìn)行打分,終究PR值越高的網(wǎng)頁(yè)價(jià)值也就越高。
自2012以來(lái),中國(guó)開(kāi)始進(jìn)入社交網(wǎng)絡(luò)的時(shí)期,開(kāi)心網(wǎng),人人網(wǎng),新浪微博,騰訊微博,微信等社交網(wǎng)絡(luò)利用,開(kāi)始進(jìn)入大家的生活。最早是由“搶車(chē)位”,“偷菜”等社交游戲帶動(dòng)的社交網(wǎng)絡(luò)的興起,如今人們會(huì)更多的利用社交網(wǎng)絡(luò),獲得信息和分享信息。我們的互聯(lián)網(wǎng),正在從以網(wǎng)頁(yè)信息為核心的網(wǎng)絡(luò),向著以人為核心的網(wǎng)絡(luò)轉(zhuǎn)變著。
因而有人就提出了,把PageRank模型利用于社交網(wǎng)絡(luò),定義以人為核心的個(gè)體價(jià)值。這樣PageRank模型就有了新的利用領(lǐng)域,同時(shí)也有了1個(gè)新的名字PeopleRank。
2 . 需求分析:發(fā)掘CSDN博客的價(jià)值用戶

如上圖所示,CSDN博客的每一個(gè)用戶都有關(guān)注人數(shù)和粉絲人數(shù),這在1定程度上和網(wǎng)頁(yè)之間的連接關(guān)系是10分相似的,我個(gè)人比較菜,粉絲數(shù)太少,固然我希望看過(guò)我博客的人,如果你感覺(jué)不錯(cuò)的話是不是可以關(guān)注以下呢,閑話少說(shuō),這類相互關(guān)注的關(guān)系在1定程度上體現(xiàn)了用戶的價(jià)值,粉絲數(shù)目越多的人,在1定程度上,其本身所具有的重要性。
順便給大家看1個(gè)CSDN排名47的牛人

這恰好符合PR算法,我們是不是可以斟酌使用PeopleRank算法,利用用戶之間的關(guān)注關(guān)系,來(lái)計(jì)算不同用戶的PR值,從而提取出“價(jià)值”更高的用戶呢?答案是肯定的。
3 . 算法模型:PeopleRank算法
那末甚么是PageRank算法?固然本篇博客其實(shí)不是來(lái)談PR算法的,而是將如何利用hadoop實(shí)現(xiàn)pr算法從而發(fā)掘有價(jià)值的用戶,所以以下只是簡(jiǎn)單的對(duì)pr算法的描寫(xiě),更多還請(qǐng)自己搜索查看(以下部份摘自:http://blog.jobbole.com/71431/)
互聯(lián)網(wǎng)中的網(wǎng)頁(yè)可以看出是1個(gè)有向圖,其中網(wǎng)頁(yè)是結(jié)點(diǎn),如果網(wǎng)頁(yè)A有鏈接到網(wǎng)頁(yè)B,則存在1條有向邊A->B,下面是1個(gè)簡(jiǎn)單的示例:

這個(gè)例子中只有4個(gè)網(wǎng)頁(yè),如果當(dāng)前在A網(wǎng)頁(yè),那末悠閑的上網(wǎng)者將會(huì)各以1/3的幾率跳轉(zhuǎn)到B、C、D,這里的3表示A有3條出鏈,如果1個(gè)網(wǎng)頁(yè)有k條出鏈,那末跳轉(zhuǎn)任意1個(gè)出鏈上的幾率是1/k,同理D到B、C的幾率各為1/2,而B(niǎo)到C的幾率為0。1般用轉(zhuǎn)移矩陣表示上網(wǎng)者的跳轉(zhuǎn)幾率,如果用n表示網(wǎng)頁(yè)的數(shù)目,則轉(zhuǎn)移矩陣M是1個(gè)n*n的方陣;如果網(wǎng)頁(yè)j有k個(gè)出鏈,那末對(duì)每個(gè)出鏈指向的網(wǎng)頁(yè)i,有M[i][j]=1/k,而其他網(wǎng)頁(yè)的M[i][j]=0;上面示例圖對(duì)應(yīng)的轉(zhuǎn)移矩陣以下:

初試時(shí),假定上網(wǎng)者在每個(gè)網(wǎng)頁(yè)的幾率都是相等的,即1/n,因而初試的幾率散布就是1個(gè)所有值都為1/n的n維列向量V0,用V0去右乘轉(zhuǎn)移矩陣M,就得到了第1步以后上網(wǎng)者的幾率散布向量MV0,(nXn)*(nX1)仍然得到1個(gè)nX1的矩陣。下面是V1的計(jì)算進(jìn)程:

注意矩陣M中M[i][j]不為0表示用1個(gè)鏈接從j指向i,M的第1行乘以V0,表示累加所有網(wǎng)頁(yè)到網(wǎng)頁(yè)A的幾率即得到9/24。得到了V1后,再用V1去右乘M得到V2,1直下去,終究V會(huì)收斂,即Vn=MV(n⑴),上面的圖示例,不斷的迭代,終究V=[3/9,2/9,2/9,2/9]’:
4 .架構(gòu)設(shè)計(jì):從數(shù)據(jù)準(zhǔn)備到PR算法的MR化
這里我采取的是用戶和用戶之間的關(guān)注關(guān)系,例如 用戶A 關(guān)注 用戶B
1:數(shù)據(jù)收集
使用Python爬蟲(chóng)收集CSDN博客的用戶和用戶的關(guān)注關(guān)系,這里我使用的收集程序架構(gòu)圖以下:

由于我這個(gè)PR計(jì)算是我做的另外1個(gè)項(xiàng)目(博客統(tǒng)計(jì)分析系統(tǒng):github地址 在線演示地址:點(diǎn)擊查看
該地址會(huì)在1定的時(shí)間內(nèi)有效)的其中的1部份,所以數(shù)據(jù)也是從其中摘取的,本來(lái)的收集程序是為了收集所有CSDN博客用戶udell信息和博客內(nèi)容的,固然由于各種關(guān)系,終究收集的用戶數(shù)量為7萬(wàn)左右,終究收集到的數(shù)據(jù)格式以下:
用戶信息數(shù)據(jù):

博客信息數(shù)據(jù):

2:數(shù)據(jù)整理
我從中隨機(jī)抽取了100個(gè)用戶,同時(shí)利用1定的技術(shù)手段,給這個(gè)100個(gè)用戶之間賦予1定的關(guān)注關(guān)系,整理后的數(shù)據(jù)以下,主要包括兩部份,第1部份是用戶之間的關(guān)注關(guān)系(用戶id,關(guān)注的用戶id),第2是給每一個(gè)用戶賦予1定的初始值(用戶id,初始用戶pr值全部為1)
(1)
(2) 
3:PR算法的MR化設(shè)計(jì)
我么以下面這個(gè)圖來(lái)講1下

ID=1的頁(yè)面鏈向2,3,4頁(yè)面,所以1個(gè)用戶從ID=1的頁(yè)面跳轉(zhuǎn)到2,3,4的幾率各為1/3
ID=2的頁(yè)面鏈向3,4頁(yè)面,所以1個(gè)用戶從ID=2的頁(yè)面跳轉(zhuǎn)到3,4的幾率各為1/2
ID=3的頁(yè)面鏈向4頁(yè)面,所以1個(gè)用戶從ID=3的頁(yè)面跳轉(zhuǎn)到4的幾率各為1
ID=4的頁(yè)面鏈向2頁(yè)面,所以1個(gè)用戶從ID=4的頁(yè)面跳轉(zhuǎn)到2的幾率各為1
(1):構(gòu)造鄰接矩陣

(2):構(gòu)造鄰接矩陣

(3):轉(zhuǎn)換為幾率矩陣(轉(zhuǎn)移矩陣)

(4):阻尼系數(shù)幾率矩陣

(5):進(jìn)行迭代計(jì)算

至于迭代的次數(shù)有自己設(shè)定,其實(shí)不是越多越好,根據(jù)6度分割理論來(lái)說(shuō),1般迭代6次
5 .
程序開(kāi)發(fā):hadoop實(shí)現(xiàn)PeopleRank算法
程序架構(gòu)以下:

個(gè)人代碼目錄:

下面我們具體說(shuō)1說(shuō)每個(gè)文件是干甚么的
day7_author100_mess.csv:源文件,由dataEtl.java處理成我們所需要的數(shù)據(jù)格式
people.csv,peoplerank.txt :day7_author100_mess.csv處理后得到的文件
prjob.java:程序調(diào)度的主函數(shù)
prMatrix.java:數(shù)據(jù)轉(zhuǎn)換為矩陣情勢(shì)
prJisuan.java: 計(jì)算每一個(gè)用戶的PR值
prNormal.java:PR值的標(biāo)準(zhǔn)化
prSort.java:對(duì)轉(zhuǎn)化后的PR值進(jìn)行排序
終究的輸出文件目錄

下面只對(duì)部份代碼進(jìn)行展現(xiàn),更多請(qǐng)前往github下載:點(diǎn)擊查看
dataEtl.java
package pagerankjisuan;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
public class dataEtl {
public static void main() throws IOException {
File f1 = new File("MyItems/pagerankjisuan/people.csv");
if(f1.isFile()){
f1.delete();
}
File f = new File("MyItems/pagerankjisuan/peoplerank.txt");
if(f.isFile()){
f.delete();
}
//打開(kāi)文件
File file = new File("MyItems/pagerankjisuan/day7_author100_mess.csv");
//定義1個(gè)文件指針
BufferedReader reader = new BufferedReader(new FileReader(file));
try {
String line=null;
//判斷讀取的1行是不是為空
while( (line=reader.readLine()) != null)
{
String[] userMess = line.split( "," );
//第1字段為id,第是個(gè)字段為粉絲列表
String userid = userMess[0];
if(userMess.length!=0){
if(userMess.length==11)
{
int i=0;
String[] focusName = userMess[10].split("\\|"); // | 為轉(zhuǎn)義符
for (i=1;i < focusName.length; i++)
{
write(userid,focusName[i]);
// System.out.println(userid+ " " + focusName[i]);
}
}
else
{
int j =0;
String[] focusName = userMess[9].split("\\|"); // | 為轉(zhuǎn)義符
for (j=1;j < focusName.length; j++)
{
write(userid,focusName[j]);
// System.out.println(userid+ " " + focusName[j]);
}
}
}
}
}
catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finally
{
reader.close();
//etl peoplerank.txt
for(int i=1;i<=100;i++){
FileWriter writer = new FileWriter("MyItems/pagerankjisuan/peoplerank.txt",true);
writer.write(i + "\t" + 1 + "\n");
writer.close();
}
}
System.out.println("OK..................");
}
private static void write(String userid, String nameid) {
// TODO Auto-generated method stub
//定義寫(xiě)文件,按行寫(xiě)入
try {
if(!nameid.contains("\n")){
FileWriter writer = new FileWriter("MyItems/pagerankjisuan/people.csv",true);
writer.write(userid + "," + nameid + "\n");
writer.close();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
prjob.java
package pagerankjisuan;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
/*
* 調(diào)度函數(shù)
*/
public class prjob {
public static final String HDFS = "hdfs://127.0.0.1:9000";
public static void main(String[] args) {
Map <String, String> path= new HashMap<String, String>();
path.put("page" ,"/home/thinkgamer/MyCode/hadoop/MyItems/pagerankjisuan/people.csv");
path.put("pr" ,"/home/thinkgamer/MyCode/hadoop/MyItems/pagerankjisuan/peoplerank.txt");
path.put("input", HDFS + "/mr/blog_analysic_system/people"); // HDFS的目錄
path.put("input_pr", HDFS + "/mr/blog_analysic_system/pr"); // pr存儲(chǔ)目錄
path.put("tmp1", HDFS + "/mr/blog_analysic_system/tmp1"); // 臨時(shí)目錄,寄存鄰接矩陣
path.put("tmp2", HDFS + "/mr/blog_analysic_system/tmp2"); // 臨時(shí)目錄,計(jì)算到得PR,覆蓋input_pr
path.put("result", HDFS + "/mr/blog_analysic_system/result"); // 計(jì)算結(jié)果的PR
path.put("sort", HDFS + "/mr/blog_analysic_system/sort"); //終究排序輸出的結(jié)果
try {
dataEtl.main();
prMatrix.main(path);
int iter = 3; // 迭代次數(shù)
for (int i = 0; i < iter; i++) {
prJisuan.main(path);
}
prNormal.main(path);
prSort.main(path);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(0);
}
public static String scaleFloat(float f) {// 保存6位小數(shù)
DecimalFormat df = new DecimalFormat("##0.000000");
return df.format(f);
}
}
prSort.java
package pagerankjisuan;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class prSort {
/**
* @param args
* @throws IOException
* @throws IllegalArgumentException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static class myComparator extends Comparator {
@SuppressWarnings("rawtypes")
public int compare( WritableComparable a,WritableComparable b){
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
public static class sortMap extends Mapper<Object,Text,FloatWritable,IntWritable>{
public void map(Object key,Text value,Context context) throws NumberFormatException, IOException, InterruptedException{
String[] split = value.toString().split("\t");
context.write(new FloatWritable(Float.parseFloat(split[1])),new IntWritable(Integer.parseInt(split[0])) );
}
}
public static class Reduce extends Reducer<FloatWritable,IntWritable,IntWritable,FloatWritable>{
public void reduce(FloatWritable key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException{
for (IntWritable text : values) {
context.write( text,key);
}
}
}
public static void main(Map<String, String> path) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
String input = path.get("result");
String output = path.get("sort");
hdfsGYT hdfs = new hdfsGYT();
hdfs.rmr(output);
Job job = new Job();
job.setJarByClass(prSort.class);
// 1
FileInputFormat.setInputPaths(job, new Path(input) );
// 2
job.setMapperClass(sortMap.class);
job.setMapOutputKeyClass(FloatWritable.class);
job.setMapOutputValueClass(IntWritable.class);
// 3
// 4 自定義排序
job.setSortComparatorClass( myComparator.class);
// 5
job.setNumReduceTasks(1);
// 6
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(FloatWritable.class);
// 7
FileOutputFormat.setOutputPath(job, new Path(output));
// 8
System.exit(job.waitForCompletion(true)? 0 :1 );
}
}
終究排序輸出的結(jié)果為:

生活不易,碼農(nóng)辛苦
如果您覺(jué)得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)