多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > 服務器 > Flume+Hadoop+Hive的離線分析系統基本架構

Flume+Hadoop+Hive的離線分析系統基本架構

來源:程序員人生   發布時間:2016-06-21 11:39:00 閱讀次數:4176次
      PS:歷史緣由作者賬號名為:ymh198816,但事實上作者的生日其實不是1988年1月6日偷笑

      最近在學習大數據的離線分析技術,所以在這里通過做1個簡單的網站點擊流數據分析離線系統來和大家1起梳理1下離線分析系統的架構模型。固然這個架構模型只能是離線分析技術的1個簡單的入門級架構,實際生產環境中的大數據離線分析技術還觸及到很多細節的處理和高可用的架構。這篇文章的目的只是帶大家入個門,讓大家對離線分析技術有1個簡單的認識,并和大家1起做學習交換。

離線分析系統的結構圖
    

      全部離線分析的整體架構就是使用FlumeFTP服務器上收集日志文件,并存儲在Hadoop HDFS文件系統上,再接著用Hadoopmapreduce清洗日志文件,最后使用HIVE構建數據倉庫做離線分析。任務的調度使用Shell腳本完成,固然大家也能夠嘗試1些自動化的任務調度工具,比如說AZKABANOOZIE等。
      分析所使用的點擊流日志文件主要來自Nginxaccess.log日志文件,需要注意的是在這里其實不是用Flume直接去生產環境上拉取nginx的日志文件,而是多設置了1層FTP服務器來緩沖所有的日志文件,然后再用Flume監聽FTP服務器上指定的目錄并拉取目錄里的日志文件到HDFS服務器上(具體緣由下面分析)。從生產環境推送日志文件到FTP服務器的操作可以通過Shell腳本配合Crontab定時器來實現。

網站點擊流數據

       
         
         圖片來源:http://webdataanalysis.net/data-collection-and-preprocessing/weblog-to-clickstream/#comments

      1般在WEB系統中,用戶對站點的頁面的訪問閱讀,點擊行動等1系列的數據都會記錄在日志中,每條日志記錄就代表著上圖中的1個數據點;而點擊流數據關注的就是所有這些點連起來后的1個完全的網站閱讀行動記錄,可以認為是1個用戶對網站的閱讀session。比如說用戶從哪個外站進入到當前的網站,用戶接下來閱讀了當前網站的哪些頁面,點擊了哪些圖片鏈接按鈕等1系列的行動記錄,這1個整體的信息就稱為是該用戶的點擊流記錄。這篇文章中設計的離線分析系統就是搜集WEB系統中產生的這些數據日志,并清洗日志內容存儲散布式的HDFS文件存儲系統上,接著使用離線分析工具HIVE去統計所有用戶的點擊流信息。

      本系統中我們采取Nginx的access.log來做點擊流分析的日志文件。access.log日志文件的格式以下:
      樣例數據格式:
      124.42.13.230 - - [18/Sep/2013:06:57:50 +0000] "GET /shoppingMall?ver=1.2.1 HTTP/1.1" 200 7200 "http://www.baidu.com.cn" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)"
        格式分析: 
        1、 訪客ip地址:124.42.13.230
        2、訪客用戶信息: - -
          3、要求時間:[18/Sep/2013:06:57:50 +0000]
        4、要求方式:GET
          5、要求的url/shoppingMall?ver=1.10.2
        6、要求所用協議:HTTP/1.1
          7、響應碼:200
        8、返回的數據流量:7200
        9、訪客的來源urlhttp://www.baidu.com.cn
        10、訪客所用閱讀器:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)

 搜集用戶數據
      網站會通過前端JS代碼或服務器真個后臺代碼搜集用戶閱讀數據并存儲在網站服務器中。1般運維人員會在離線分析系統和真實生產環境之間部署FTP服務器,并將生產環境上的用戶數據每天定時發送到FTP服務器上,離線分析系統就會從FTP服務上收集數據而不會影響到生產環境。
      
收集數據的方式有多種,1種是通過自己編寫shell腳本或Java編程收集數據,但是工作量大,不方便保護,另外一種就是直接使用第3方框架去進行日志的收集,1般第3方框架的硬朗性,容錯性和易用性都做得很好也易于保護。本文彩用第3方框架Flume進行日志收集,Flume是1個散布式的高效的日志收集系統,它能把散布在不同服務器上的海量日志文件數據統1搜集到1個集中的存儲資源中,FlumeApache的1個頂級項目,與Hadoop也有很好的兼容性。不過需要注意的是Flume其實不是1個高可用的框架,這方面的優化得用戶自己去保護。
        Flume
agent是運行在JVM上的,所以各個服務器上的JVM環境必不可少。每個Flume agent部署在1臺服務器上,Flume會搜集web server 產生的日志數據,并封裝成1個個的事件發送給Flume AgentSourceFlume Agent Source會消費這些搜集來的數據事件并放在Flume Agent Channel,Flume Agent Sink會從Channel中搜集這些收集過來的數據,要末存儲在本地的文件系統中要末作為1個消費資源分發給下1個裝在散布式系統中其它服務器上的Flume進行處理。Flume提供了點對點的高可用的保障,某個服務器上的Flume Agent Channel中的數據只有確保傳輸到了另外一個服務器上的Flume Agent Channel里或正確保存到了本地的文件存儲系統中,才會被移除。
本系統中每個FTP服務器Hadoopname node服務器上都要部署1個Flume Agent;FTPFlume Agent收集Web Server的日志并匯總到name node服務器上的Flume Agent,最后由hadoop name node服務器將所有的日志數據下沉到散布式的文件存儲系統HDFS上面。
      需要注意的是Flume的Source在本文的系統當選擇的是Spooling Directory Source,而沒有選擇Exec Source,由于當Flume服務down掉的時候Spooling Directory Source能記錄上1次讀取到的位置,而Exec Source則沒有,需要用戶自己去處理,當重啟Flume服務器的時候如果處理不好就會有重復數據的問題。固然Spooling Directory Source也是有缺點的,會對讀取過的文件重命名,所以多架1層FTP服務器也是為了不Flume“污染”生產環境。Spooling Directory Source另外1個比較大的缺點就是沒法做到靈活監聽某個文件夾底下所有子文件夾里的所有文件里新追加的內容。關于這些問題的解決方案也有很多,比如選擇其它的日志收集工具,像logstash等。
       FTP
服務器上的Flume配置文件以下:   
agent.channels = memorychannel agent.sinks = target agent.sources.origin.type = spooldir agent.sources.origin.spoolDir = /export/data/trivial/weblogs agent.sources.origin.channels = memorychannel agent.sources.origin.deserializer.maxLineLength = 2048 agent.sources.origin.interceptors = i2 agent.sources.origin.interceptors.i2.type = host agent.sources.origin.interceptors.i2.hostHeader = hostname agent.sinks.loggerSink.type = logger agent.sinks.loggerSink.channel = memorychannel agent.channels.memorychannel.type = memory agent.channels.memorychannel.capacity = 10000 agent.sinks.target.type = avro agent.sinks.target.channel = memorychannel agent.sinks.target.hostname = 172.16.124.130 agent.sinks.target.port = 4545
     這里有幾個參數需要說明,Flume Agent Source可以通過配置deserializer.maxLineLength這個屬性來指定每一個Event的大小,默許是每一個Event2048byte。Flume Agent Channel的大小默許等于于本地服務器JVM所獲得到的內存的80%,用戶可以通過byteCapacityBufferPercentagebyteCapacity兩個參數去進行優化。
     
需要特別注意的是FTP上放入Flume監聽的文件夾中的日志文件不能同名,不然Flume會報錯并停止工作,最好的解決方案就是為每份日志文件拼上時間戳。

     Hadoop服務器上的配置文件以下:   
agent.sources = origin agent.channels = memorychannel agent.sinks = target agent.sources.origin.type = avro agent.sources.origin.channels = memorychannel agent.sources.origin.bind = 0.0.0.0 agent.sources.origin.port = 4545 #agent.sources.origin.interceptors = i1 i2 #agent.sources.origin.interceptors.i1.type = timestamp #agent.sources.origin.interceptors.i2.type = host #agent.sources.origin.interceptors.i2.hostHeader = hostname agent.sinks.loggerSink.type = logger agent.sinks.loggerSink.channel = memorychannel agent.channels.memorychannel.type = memory agent.channels.memorychannel.capacity = 5000000 agent.channels.memorychannel.transactionCapacity = 1000000 agent.sinks.target.type = hdfs agent.sinks.target.channel = memorychannel agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S agent.sinks.target.hdfs.filePrefix = data-%{hostname} agent.sinks.target.hdfs.rollInterval = 60 agent.sinks.target.hdfs.rollSize = 1073741824 agent.sinks.target.hdfs.rollCount = 1000000 agent.sinks.target.hdfs.round = true agent.sinks.target.hdfs.roundValue = 10 agent.sinks.target.hdfs.roundUnit = minute agent.sinks.target.hdfs.useLocalTimeStamp = true agent.sinks.target.hdfs.minBlockReplicas=1 agent.sinks.target.hdfs.writeFormat=Text agent.sinks.target.hdfs.fileType=DataStream

round, roundValue,roundUnit3個參數是用來配置每10分鐘在hdfs里生成1個文件夾保存從FTP服務器上拉取下來的數據。

    Troubleshooting 
       使用Flume拉取文件到HDFS中會遇到將文件分散成多個1KB⑸KB的小文件的問題   
       需要注意的是如果遇到Flume會將拉取過來的文件分成很多份1KB⑸KB的小文件存儲到HDFS上,那末極可能是HDFS Sink的配置不正確,致使系統使用了默許配置。spooldir類型的source是將指定目錄中的文件的每行封裝成1個event放入到channel中,默許每行最大讀取1024個字符。在HDFS Sink端主要是通過rollInterval(默許30秒), rollSize(默許1KB), rollCount(默許10個event)3個屬性來決定寫進HDFS的分片文件的大小。rollInterval表示經過量少秒后就將當前.tmp文件(寫入的是從channel中過來的events)下沉到HDFS文件系統中,rollSize表示1旦.tmp文件到達1定的size后,就下沉到HDFS文件系統中,rollCount表示.tmp文件1旦寫入了指定數量的events就下沉到HDFS文件系統中。

       使用Flume拉取到HDFS中的文件格式錯亂
       這是由于HDFS Sink的配置中,hdfs.writeFormat屬性默許為“Writable”會將本來的文件的內容序列化成HDFS的格式,應當手動設置成hdfs.writeFormat=“text”; 并且hdfs.fileType默許是“SequenceFile”類型的,是將所有event拼成1行,應當該手動設置成hdfs.fileType=“DataStream”,這樣就能夠是1行1個event,與原文件格式保持1致

使用Mapreduce清洗日志文件
當把日志文件中的數據拉取到HDFS文件系統后,使用Mapreduce程序去進行日志清洗
第1步,先用Mapreduce過濾掉無效的數據
package com.guludada.clickstream; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.StringTokenizer; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.guludada.dataparser.WebLogParser; public class logClean { public static class cleanMap extends Mapper<Object,Text,Text,NullWritable> { private NullWritable v = NullWritable.get(); private Text word = new Text(); WebLogParser webLogParser = new WebLogParser(); public void map(Object key,Text value,Context context) { //將1行內容轉成string String line = value.toString(); String cleanContent = webLogParser.parser(line); if(cleanContent != "") { word.set(cleanContent); try { context.write(word,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000"); Job job = Job.getInstance(conf); job.setJarByClass(logClean.class); //指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(cleanMap.class); //指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //指定job的輸入原始文件所在目錄 Date curDate = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd"); String dateStr = sdf.format(curDate); FileInputFormat.setInputPaths(job, new Path("/flume/events/" + dateStr + "/*/*")); //指定job的輸出結果所在目錄 FileOutputFormat.setOutputPath(job, new Path("/clickstream/cleandata/"+dateStr+"/")); //將job中配置的相干參數,和job所用的java類所在的jar包,提交給yarn去運行 boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }

package com.guludada.dataparser; import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.guludada.javabean.WebLogBean; /** * 用正則表達式匹配出合法的日志記錄 * * */ public class WebLogParser { public String parser(String weblog_origin) { WebLogBean weblogbean = new WebLogBean(); // 獲得IP地址 Pattern IPPattern = Pattern.compile("\\d+.\\d+.\\d+.\\d+"); Matcher IPMatcher = IPPattern.matcher(weblog_origin); if(IPMatcher.find()) { String IPAddr = IPMatcher.group(0); weblogbean.setIP_addr(IPAddr); } else { return "" } // 獲得時間信息 Pattern TimePattern = Pattern.compile("\\[(.+)\\]"); Matcher TimeMatcher = TimePattern.matcher(weblog_origin); if(TimeMatcher.find()) { String time = TimeMatcher.group(1); String[] cleanTime = time.split(" "); weblogbean.setTime(cleanTime[0]); } else { return ""; } //獲得其余要求信息 Pattern InfoPattern = Pattern.compile( "(\\\"[POST|GET].+?\\\") (\\d+) (\\d+).+?(\\\".+?\\\") (\\\".+?\\\")"); Matcher InfoMatcher = InfoPattern.matcher(weblog_origin); if(InfoMatcher.find()) { String requestInfo = InfoMatcher.group(1).replace('\"',' ').trim(); String[] requestInfoArry = requestInfo.split(" "); weblogbean.setMethod(requestInfoArry[0]); weblogbean.setRequest_URL(requestInfoArry[1]); weblogbean.setRequest_protocol(requestInfoArry[2]); String status_code = InfoMatcher.group(2); weblogbean.setRespond_code(status_code); String respond_data = InfoMatcher.group(3); weblogbean.setRespond_data(respond_data); String request_come_from = InfoMatcher.group(4).replace('\"',' ').trim(); weblogbean.setRequst_come_from(request_come_from); String browserInfo = InfoMatcher.group(5).replace('\"',' ').trim(); weblogbean.setBrowser(browserInfo); } else { return ""; } return weblogbean.toString(); } }
package com.guludada.javabean; public class WebLogBean { String IP_addr; String time; String method; String request_URL; String request_protocol; String respond_code; String respond_data; String requst_come_from; String browser; public String getIP_addr() { return IP_addr; } public void setIP_addr(String iP_addr) { IP_addr = iP_addr; } public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getMethod() { return method; } public void setMethod(String method) { this.method = method; } public String getRequest_URL() { return request_URL; } public void setRequest_URL(String request_URL) { this.request_URL = request_URL; } public String getRequest_protocol() { return request_protocol; } public void setRequest_protocol(String request_protocol) { this.request_protocol = request_protocol; } public String getRespond_code() { return respond_code; } public void setRespond_code(String respond_code) { this.respond_code = respond_code; } public String getRespond_data() { return respond_data; } public void setRespond_data(String respond_data) { this.respond_data = respond_data; } public String getRequst_come_from() { return requst_come_from; } public void setRequst_come_from(String requst_come_from) { this.requst_come_from = requst_come_from; } public String getBrowser() { return browser; } public void setBrowser(String browser) { this.browser = browser; } @Override public String toString() { return IP_addr + " " + time + " " + method + " " + request_URL + " " + request_protocol + " " + respond_code + " " + respond_data + " " + requst_come_from + " " + browser; } }

第1第二天記清洗后的記錄以下圖:
 


2步,根據訪問記錄生成相應的Session信息記錄,假定Session的過期時間是30分鐘

package com.guludada.clickstream; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Locale; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.guludada.clickstream.logClean.cleanMap; import com.guludada.dataparser.SessionParser; import com.guludada.dataparser.WebLogParser; import com.guludada.javabean.WebLogSessionBean; public class logSession { public static class sessionMapper extends Mapper<Object,Text,Text,Text> { private Text IPAddr = new Text(); private Text content = new Text(); private NullWritable v = NullWritable.get(); WebLogParser webLogParser = new WebLogParser(); public void map(Object key,Text value,Context context) { //將1行內容轉成string String line = value.toString(); String[] weblogArry = line.split(" "); IPAddr.set(weblogArry[0]); content.set(line); try { context.write(IPAddr,content); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } static class sessionReducer extends Reducer<Text, Text, Text, NullWritable>{ private Text IPAddr = new Text(); private Text content = new Text(); private NullWritable v = NullWritable.get(); WebLogParser webLogParser = new WebLogParser(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SessionParser sessionParser = new SessionParser(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Date sessionStartTime = null; String sessionID = UUID.randomUUID().toString(); //將IP地址所對應的用戶的所有閱讀記錄按時間排序 ArrayList<WebLogSessionBean> sessionBeanGroup = new ArrayList<WebLogSessionBean>(); for(Text browseHistory : values) { WebLogSessionBean sessionBean = sessionParser.loadBean(browseHistory.toString()); sessionBeanGroup.add(sessionBean); } Collections.sort(sessionBeanGroup,new Comparator<WebLogSessionBean>() { public int compare(WebLogSessionBean sessionBean1, WebLogSessionBean sessionBean2) { Date date1 = sessionBean1.getTimeWithDateFormat(); Date date2 = sessionBean2.getTimeWithDateFormat(); if(date1 == null && date2 == null) return 0; return date1.compareTo(date2); } }); for(WebLogSessionBean sessionBean : sessionBeanGroup) { if(sessionStartTime == null) { //當天日志中某用戶第1次訪問網站的時間 sessionStartTime = timeTransform(sessionBean.getTime()); content.set(sessionParser.parser(sessionBean, sessionID)); try { context.write(content,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { Date sessionEndTime = timeTransform(sessionBean.getTime()); long sessionStayTime = timeDiffer(sessionStartTime,sessionEndTime); if(sessionStayTime > 30 * 60 * 1000) { //將當前閱讀記錄的時間設為下1個session的開始時間 sessionStartTime = timeTransform(sessionBean.getTime()); sessionID = UUID.randomUUID().toString(); continue; } content.set(sessionParser.parser(sessionBean, sessionID)); try { context.write(content,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } private Date timeTransform(String time) { Date standard_time = null; try { standard_time = sdf.parse(time); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } return standard_time; } private long timeDiffer(Date start_time,Date end_time) { long diffTime = 0; diffTime = end_time.getTime() - start_time.getTime(); return diffTime; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000"); Job job = Job.getInstance(conf); job.setJarByClass(logClean.class); //指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(sessionMapper.class); job.setReducerClass(sessionReducer.class); //指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //指定終究輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); Date curDate = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd"); String dateStr = sdf.format(curDate); //指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path("/clickstream/cleandata/"+dateStr+"/*")); //指定job的輸出結果所在目錄 FileOutputFormat.setOutputPath(job, new Path("/clickstream/sessiondata/"+dateStr+"/")); //將job中配置的相干參數,和job所用的java類所在的jar包,提交給yarn去運行 boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
package com.guludada.dataparser; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; import com.guludada.javabean.WebLogSessionBean; public class SessionParser { SimpleDateFormat sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH); SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public String parser(WebLogSessionBean sessionBean,String sessionID) { sessionBean.setSession(sessionID); return sessionBean.toString(); } public WebLogSessionBean loadBean(String sessionContent) { WebLogSessionBean weblogSession = new WebLogSessionBean(); String[] contents = sessionContent.split(" "); weblogSession.setTime(timeTransform(contents[1])); weblogSession.setIP_addr(contents[0]); weblogSession.setRequest_URL(contents[3]); weblogSession.setReferal(contents[7]); return weblogSession; } private String timeTransform(String time) { Date standard_time = null; try { standard_time = sdf_origin.parse(time); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } return sdf_final.format(standard_time); } }
package com.guludada.javabean; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class WebLogSessionBean { String time; String IP_addr; String session; String request_URL; String referal; public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getIP_addr() { return IP_addr; } public void setIP_addr(String iP_addr) { IP_addr = iP_addr; } public String getSession() { return session; } public void setSession(String session) { this.session = session; } public String getRequest_URL() { return request_URL; } public void setRequest_URL(String request_URL) { this.request_URL = request_URL; } public String getReferal() { return referal; } public void setReferal(String referal) { this.referal = referal; } public Date getTimeWithDateFormat() { SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); if(this.time != null && this.time != "") { try { return sdf_final.parse(this.time); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return null; } @Override public String toString() { return time + " " + IP_addr + " " + session + " " + request_URL + " " + referal; } }

第2次清算出來的Session信息結構以下:
時間 IP SessionID 要求頁面URL Referal URL
2015-05⑶0 19:38:00 192.168.12.130 Session1 /blog/me www.baidu.com
2015-05⑶0 19:39:00 192.168.12.130 Session1 /blog/me/details www.mysite.com/blog/me
2015-05⑶0 19:38:00 192.168.12.40 Session2 /blog/me www.baidu.com



第3步,清洗第2步生成的Session信息,生成PageViews信息表
package com.guludada.clickstream; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Locale; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.guludada.clickstream.logClean.cleanMap; import com.guludada.clickstream.logSession.sessionMapper; import com.guludada.clickstream.logSession.sessionReducer; import com.guludada.dataparser.PageViewsParser; import com.guludada.dataparser.SessionParser; import com.guludada.dataparser.WebLogParser; import com.guludada.javabean.PageViewsBean; import com.guludada.javabean.WebLogSessionBean; public class PageViews { public static class pageMapper extends Mapper<Object,Text,Text,Text> { private Text word = new Text(); public void map(Object key,Text value,Context context) { String line = value.toString(); String[] webLogContents = line.split(" "); //根據session來分組 word.set(webLogContents[2]); try { context.write(word,value); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static class pageReducer extends Reducer<Text, Text, Text, NullWritable>{ private Text session = new Text(); private Text content = new Text(); private NullWritable v = NullWritable.get(); PageViewsParser pageViewsParser = new PageViewsParser(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //上1條記錄的訪問信息 PageViewsBean lastStayPageBean = null; Date lastVisitTime = null; @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //將session所對應的所有閱讀記錄按時間排序 ArrayList<PageViewsBean> pageViewsBeanGroup = new ArrayList<PageViewsBean>(); for(Text pageView : values) { PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageView.toString()); pageViewsBeanGroup.add(pageViewsBean); } Collections.sort(pageViewsBeanGroup,new Comparator<PageViewsBean>() { public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) { Date date1 = pageViewsBean1.getTimeWithDateFormat(); Date date2 = pageViewsBean2.getTimeWithDateFormat(); if(date1 == null && date2 == null) return 0; return date1.compareTo(date2); } }); //計算每一個頁面的停留時間 int step = 0; for(PageViewsBean pageViewsBean : pageViewsBeanGroup) { Date curVisitTime = pageViewsBean.getTimeWithDateFormat(); if(lastStayPageBean != null) { //計算前后兩次訪問記錄像差的時間,單位是秒 Integer timeDiff = (int) ((curVisitTime.getTime() - lastVisitTime.getTime())/1000); //根據當前記錄的訪問信息更新上1條訪問記錄中訪問的頁面的停留時間 lastStayPageBean.setStayTime(timeDiff.toString()); } //更新訪問記錄的步數 step++; pageViewsBean.setStep(step+""); //更新上1條訪問記錄的停留時間后,將當前訪問記錄設定為上1條訪問信息記錄 lastStayPageBean = pageViewsBean; lastVisitTime = curVisitTime; //輸出pageViews信息 content.set(pageViewsParser.parser(pageViewsBean)); try { context.write(content,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000"); Job job = Job.getInstance(conf); job.setJarByClass(PageViews.class); //指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(pageMapper.class); job.setReducerClass(pageReducer.class); //指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //指定終究輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); Date curDate = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd"); String dateStr = sdf.format(curDate); //指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*")); //指定job的輸出結果所在目錄 FileOutputFormat.setOutputPath(job, new Path("/clickstream/pageviews/"+dateStr+"/")); //將job中配置的相干參數,和job所用的java類所在的jar包,提交給yarn去運行 boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
package com.guludada.dataparser; import com.guludada.javabean.PageViewsBean; import com.guludada.javabean.WebLogSessionBean; public class PageViewsParser { /** * 根據logSession的輸出數據加載PageViewsBean * * */ public PageViewsBean loadBean(String sessionContent) { PageViewsBean pageViewsBean = new PageViewsBean(); String[] contents = sessionContent.split(" "); pageViewsBean.setTime(contents[0] + " " + contents[1]); pageViewsBean.setIP_addr(contents[2]); pageViewsBean.setSession(contents[3]); pageViewsBean.setVisit_URL(contents[4]); pageViewsBean.setStayTime("0"); pageViewsBean.setStep("0"); return pageViewsBean; } public String parser(PageViewsBean pageBean) { return pageBean.toString(); } }
package com.guludada.javabean; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class PageViewsBean { String session; String IP_addr; String time; String visit_URL; String stayTime; String step; public String getSession() { return session; } public void setSession(String s
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 校园春色欧美 | 日本美女一级黄色片 | 亚洲在线中文 | 另类小说图片 | 欧美另类一区 | 自拍偷拍1| 亚洲欧美一 | 免费xx | 欧美一二三区 | 最近免费中文字幕大全高清片 | 中文字幕 日本 | 亚洲午夜伦理 | 国产福利不卡一区二区三区 | 亚洲精品久久一区影院 | 日韩小视频在线播放 | 久久综合九色综合欧洲色 | 国产三级午夜理伦三级 | 欧美日韩视频在线播放 | 成人午夜精品久久久久久久小说 | 亚洲另类春色校园小说 | 国产日产欧美一区二区三区 | 亚欧乱色一区二区三区 | 久久福利一区二区三区 | 国产91在线九色 | 看看免费a一片欧 | 一二三四观看在线视频中文 | 亚洲免费天堂 | 来吧成人综合网 | free性欧美极度另类性性欧美 | 成人午夜视频在线观看 | 亚洲天堂一区二区 | 女人18特级一级毛片免费视频 | 久久国产精品永久免费网站 | 久久欧美精品欧美九久欧美 | 欧美一区二区三区久久综 | xxxx18野外xxxxfreexxxx日本 | 中文字幕第一页亚洲 | 久久久久伊人 | 日本vs欧美一区二区三区 | 视频在线观看免费视频 | 亚洲视频在线网站 |