多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php框架 > 框架設計 > Hadoop RPC通信Client客戶端的流程分析

Hadoop RPC通信Client客戶端的流程分析

來源:程序員人生   發布時間:2015-01-10 08:58:38 閱讀次數:4043次

            Hadoop的RPC的通訊與其他系統的RPC通訊不太1樣,作者針對Hadoop的使用特點,專門的設計了1套RPC框架,這套框架個人感覺還是有點小復雜的。所以我打算分成Client客戶端和Server服務端2個模塊做分析。如果你對RPC的整套流程已非常了解的條件下,對Hadoop的RPC,你也1定可以非常迅速的了解的。OK,下面切入正題。

            Hadoop的RPC的相干代碼都在org.apache.hadoop.ipc的包下,首先RPC的通訊必須遵照許多的協議,其中最最基本的協議即便以下;

/** * Superclass of all protocols that use Hadoop RPC. * Subclasses of this interface are also supposed to have * a static final long versionID field. * Hadoop RPC所有協議的基類,返回協議版本號 */ public interface VersionedProtocol { /** * Return protocol version corresponding to protocol interface. * @param protocol The classname of the protocol interface * @param clientVersion The version of the protocol that the client speaks * @return the version that the server will speak */ public long getProtocolVersion(String protocol, long clientVersion) throws IOException; }

他是所有協議的基類,他的下面還有1堆的子類,分別對應于不同情況之間的通訊,下面是1張父子類圖:

          

 顧名思義,只有客戶端和服務端遵守相同的版本號,才能進行通訊。

           RPC客戶真個所有相干操作都被封裝在了1個叫Client.java的文件中:

/** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on * a port and is defined by a parameter class and a value class. * RPC客戶端類 * @see Server */ public class Client { public static final Log LOG = LogFactory.getLog(Client.class); //客戶端到服務真個連接 private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>(); //回調值類 private Class<? extends Writable> valueClass; // class of call values //call回調id的計數器 private int counter; // counter for call ids //原子變量判斷客戶端是不是還在運行 private AtomicBoolean running = new AtomicBoolean(true); // if client runs final private Configuration conf; //socket工廠,用來創建socket private SocketFactory socketFactory; // how to create sockets private int refCount = 1; ......
從代碼中明顯的看到,這里存在著1個類似于connections連接池的東西,其實這暗示著連接是可以被復用的,在hashtable中,與每一個Connecttion連接的對應的是1個ConnectionId,明顯這里不是1個Long類似的數值:

/** * This class holds the address and the user ticket. The client connections * to servers are uniquely identified by <remoteAddress, protocol, ticket> * 連接的唯1標識,主要通過<遠程地址,協議類型,用戶組信息> */ static class ConnectionId { //遠程的socket地址 InetSocketAddress address; //用戶組信息 UserGroupInformation ticket; //協議類型 Class<?> protocol; private static final int PRIME = 16777619; private int rpcTimeout; private String serverPrincipal; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs private int maxRetries; //the max. no. of retries for socket connections private boolean tcpNoDelay; // if T then disable Nagle's Algorithm private int pingInterval; // how often sends ping to the server in msecs ....
這里用了3個屬性組成唯1的標識屬性,為了保證可以進行ID的復用,所以作者對ConnectionId的equal比較方法和hashCode 進行了重寫:

/** * 作者重寫了equal比較方法,只要成員變量都想等也就想到了 */ @Override public boolean equals(Object obj) { if (obj == this) { return true; } if (obj instanceof ConnectionId) { ConnectionId that = (ConnectionId) obj; return isEqual(this.address, that.address) && this.maxIdleTime == that.maxIdleTime && this.maxRetries == that.maxRetries && this.pingInterval == that.pingInterval && isEqual(this.protocol, that.protocol) && this.rpcTimeout == that.rpcTimeout && isEqual(this.serverPrincipal, that.serverPrincipal) && this.tcpNoDelay == that.tcpNoDelay && isEqual(this.ticket, that.ticket); } return false; } /** * 重寫了hashCode的生成規則,保證不同的對象產生不同的hashCode值 */ @Override public int hashCode() { int result = 1; result = PRIME * result + ((address == null) ? 0 : address.hashCode()); result = PRIME * result + maxIdleTime; result = PRIME * result + maxRetries; result = PRIME * result + pingInterval; result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode()); result = PRIME * rpcTimeout; result = PRIME * result + ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode()); result = PRIME * result + (tcpNoDelay ? 1231 : 1237); result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode()); return result; }
這樣就可以保證對應同類型的連接就可以夠完全復用了,而不是僅僅憑仗援用的關系判斷對象是不是相等,這里就是1個不錯的設計了

            與連接Id對應的就是Connection了,它里面保護是1下的1些變量;

/** Thread that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ private class Connection extends Thread { //所連接的服務器地址 private InetSocketAddress server; // server ip:port //服務真個krb5的名字,與安全方面相干 private String serverPrincipal; // server's krb5 principal name //連接頭部,內部包括了,所用的協議,客戶端用戶組信息和驗證的而方法 private ConnectionHeader header; // connection header //遠程連接ID private final ConnectionId remoteId; // connection id //連接驗證方法 private AuthMethod authMethod; // authentication method //下面3個變量都是安全方面的 private boolean useSasl; private Token<? extends TokenIdentifier> token; private SaslRpcClient saslRpcClient; //下面是1組socket通訊方面的變量 private Socket socket = null; // connected socket private DataInputStream in; private DataOutputStream out; private int rpcTimeout; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs private int maxRetries; //the max. no. of retries for socket connections //tcpNoDelay可設置是不是阻塞模式 private boolean tcpNoDelay; // if T then disable Nagle's Algorithm private int pingInterval; // how often sends ping to the server in msecs // currently active calls 當前活躍的回調,1個連接 可能會有很多個call回調 private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>(); //最后1次IO活動通訊的時間 private AtomicLong lastActivity = new AtomicLong();// last I/O activity time //連接關閉標記 private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed private IOException closeException; // close reason .....
里面保護了大量的和連接通訊相干的變量,在這里有1個很成心思的東西connectionHeader,連接頭部,里面的數據時為了在通訊最開始的時候被使用:

class ConnectionHeader implements Writable { public static final Log LOG = LogFactory.getLog(ConnectionHeader.class); //客戶端和服務端通訊的協議名稱 private String protocol; //客戶真個用戶組信息 private UserGroupInformation ugi = null; //驗證的方式,關系到寫入數據的時的格式 private AuthMethod authMethod; .....
起到標識驗證的作用。1個Client類的基本結構我們基本可以描繪出來了,下面是完全的類關系圖:


在上面這幅圖中,你肯定會發現我少了1個很關鍵的類了,就是Call回調類。Call回調在很多異步通訊中是常常出現的。由于在通訊進程中,當1個對象通過網絡發送要求給另外1個對象的時候,如果采取同步的方式,會1直阻塞在那里,會帶來非常不好的效力和體驗的,所以很多時候,我們采取的是1種叫回調接口的方式。在這期間,用戶可以繼續做自己的事情。所以一樣的Call這個概念固然也是適用在Hadoop RPC中。在Hadoop的RPC的核心調用原理, 簡單的說,就是我把parame參數序列化到1個對象中,通過參數的情勢把對象傳入,進行RPC通訊,最后服務端把處理好的結果值放入call對象,在返回給客戶端,也就是說客戶端和服務端都是通過Call對象進行操作,Call里面存著,要求的參數,和處理后的結構值2個變量。通過Call對象的封裝,客戶單實現了完善的不必知道細節的調用。下面是Call類的類按時

/** A call waiting for a value. */ //客戶真個1個回調 private class Call { //回調ID int id; // call id //被序列化的參數 Writable param; // parameter //返回值 Writable value; // value, null if error //出錯時返回的異常 IOException error; // exception, null if value //回調是不是已被完成 boolean done; // true when call is done ....
看到這個Call回調類,或許你漸漸的會明白Hadoop RPC的1個基本原型了,這些Call固然是存在于某個連接中的,1個連接可能會產生多個回調,所以在Connection中保護了calls列表:
private class Connection extends Thread { .... // currently active calls 當前活躍的回調,1個連接 可能會有很多個call回調 private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
作者在設計Call類的時候,比較聰明的斟酌1種并發情況下的Call調用,所以為此設計了下面這個Call的子類,就是專門用于短時間內的瞬間Call調用:

/** Call implementation used for parallel calls. */ /** 繼承自Call回調類,可以并行的使用,通過加了index下標做Call的辨別 */ private class ParallelCall extends Call { //每一個ParallelCall并行的回調就會有對應的結果類 private ParallelResults results; //index作為Call的辨別 private int index; ....
如果要查找值,就通過里面的ParallelCall查找,原理是根據index索引:

/** Result collector for parallel calls. */ private static class ParallelResults { //并行結果類中具有1組返回值,需要ParallelCall的index索引匹配 private Writable[] values; //結果值的數量 private int size; //values中已知的值的個數 private int count; ..... /** Collect a result. */ public synchronized void callComplete(ParallelCall call) { //將call中的值賦給result中 values[call.index] = call.value; // store the value count++; // count it //如果計數的值等到終究大小,通知caller if (count == size) // if all values are in notify(); // then notify waiting caller } }
由于Call結構集是這些并發Call共有的,所以用的是static變量,都存在在了values數組中了,只有所有的并發Call都把值取出來了,才算回調成功,這個是個非常細小的輔助設計,這個在有些書籍上并沒有多少提及。下面我們看看1般Call回調的流程,正如剛剛說的,終究客戶端看到的情勢就是,傳入參數,取得結果,疏忽內部1切邏輯,這是怎樣做到的呢,答案在下面:

在履行之前,你會先得到ConnectionId:

public Writable call(Writable param, InetSocketAddress addr, Class<?> protocol, UserGroupInformation ticket, int rpcTimeout) throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, conf); return call(param, remoteId); }
接著才是主流程:

public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { //根據參數構造1個Call回調 Call call = new Call(param); //根據遠程ID獲得連接 Connection connection = getConnection(remoteId, call); //發送參數 connection.sendParam(call); // send the parameter boolean interrupted = false; synchronized (call) { //如果call.done為false,就是Call還沒完成 while (!call.done) { try { //等待遠端程序的履行終了 call.wait(); // wait for the result } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } //如果是異常中斷,則終止當前線程 if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } //如果call回到出錯,則返回call出錯信息 if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // local exception // use the connection because it will reflect an ip change, unlike // the remoteId throw wrapException(connection.getRemoteAddress(), call.error); } } else { //如果是正常情況下,返回回調解理后的值 return call.value; } } }
在這上面的操作步驟中,重點關注2個函數,獲得連接操作,看看人家是如何保證連接的復用性的:

private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException { ..... /* we could avoid this allocation for each RPC by having a * connectionsId object and with set() method. We need to manage the * refs for keys in HashMap properly. For now its ok. */ do { synchronized (connections) { //從connection連接池中獲得連接,可以保證相同的連接ID可以復用 connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId); connections.put(remoteId, connection); } } } while (!connection.addCall(call));
有點單例模式的味道哦,還有1個方法叫sendParam發送參數方法:

public void sendParam(Call call) { if (shouldCloseConnection.get()) { return; } DataOutputBuffer d=null; try { synchronized (this.out) { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); //for serializing the //data to be written //將call回調中的參數寫入到輸出流中,傳向服務端 d = new DataOutputBuffer(); d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); //first put the data length out.write(data, 0, dataLength);//write the data out.flush(); } ....
代碼只發送了Call的id,和要求參數,并沒有把所有的Call的內容都扔出去了,1定是為了減少數據量的傳輸,這里還把數據的長度寫入了,這是為了方燕服務端準確的讀取到不定長的數據。這服務端中間的處理操作不是今天討論的重點。Call的履行進程就是這樣。那末Call是如何被調用的呢,這又要重新回到了Client客戶端上去了,Client有1個run()函數,所有的操作都是始于此的;

public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); //等待工作,等待要求調用 while (waitForWork()) {//wait here for work - read or close connection //調用完要求,則立即獲得回復 receiveResponse(); } close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }
操作很簡單,程序1直跑著,有要求,處理要求,獲得要求,沒有要求,就死等

private synchronized boolean waitForWork() { if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { long timeout = maxIdleTime- (System.currentTimeMillis()-lastActivity.get()); if (timeout>0) { try { wait(timeout); } catch (InterruptedException e) {} } } ....
獲得回復的操作以下:

/* Receive a response. * Because only one receiver, so no synchronization on in. * 獲得回復值 */ private void receiveResponse() { if (shouldCloseConnection.get()) { return; } //更新最近1次的call活動時間 touch(); try { int id = in.readInt(); // try to read an id if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); //從獲得call中獲得相應的call Call call = calls.get(id); //判斷該結果狀態 int state = in.readInt(); // read call status if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value call.setValue(value); calls.remove(id); } else if (state == Status.ERROR.state) { call.setException(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in))); calls.remove(id); } else if (state == Status.FATAL.state) { // Close the connection markClosed(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in))); } ..... } catch (IOException e) { markClosed(e); } }
從之前保護的Call列表中取出,做判斷。Client本身的履行流程比較的簡單:




Hadoop RPC客戶真個通訊模塊的部份大致就是我上面的這個流程,中間其實還疏忽了很多的細節,大家學習的時候,針對源碼會有助于更好的理解,Hadoop RPC的服務真個實現更加復雜,所以建議采取分模塊的學習也許會更好1點。

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 国产精品性视频免费播放 | 宇都宫紫苑乳在线观看 | 另类小说区| 久久精品国产亚洲麻豆 | 亚洲视频欧美 | 欧美日韩性视频一区二区三区 | 性色影院| 色中色在线视频 | 爽好舒服快奶水视频 | 图片区 日韩 欧美 亚洲 | 特级淫片aaaaa片毛片 | 手机看片欧美日韩 | 欧美一级aa免费毛片 | 国产成人亚洲精品2020 | 国内精品免费视频精选在线观看 | 日韩欧美亚洲国产精品字幕久久久 | 国产精品亚洲欧美 | www.在线视频 | jizz.日本| 伊人丁香婷婷综合一区二区 | 色婷婷伊人 | 亚洲a在线播放 | 日韩欧美视频在线 | 日本在线中文 | 日韩a级毛片免费视频 | 欧美一区二区激情三区 | 欧美xxxx做受欧美人妖 | 国产一区二区成人 | 日本免费新一区二区三区 | 视频一区二区精品的福利 | 69久久| www.黄色在线 | 国产或人精品日本亚洲77美色 | 欧美精品亚洲精品日韩一区 | 日韩精品在线播放 | 成年人在线视频网站 | 亚洲精品综合一区二区三区 | 中文乱码字字幕在线第5页 中文欧美日韩 | 国产高清一区二区三区视频 | 全部无卡免费的毛片在线看 | 亚洲aⅴ天堂 |