多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php框架 > 框架設計 > [置頂] 分布式內存網格Hazelcast源碼導讀

[置頂] 分布式內存網格Hazelcast源碼導讀

來源:程序員人生   發布時間:2016-06-27 16:27:04 閱讀次數:4978次

去年項目需要看了hazelcast源碼,當時記錄的筆記。



Node是節點的抽象,里面包括節點引擎、客戶端引擎、分區服務、集群服務、組播服務、連接收理、命令管理、組播屬性、節點配置、本地成員、tcp地址、組播地址、連接者、節點初始化器、管理中心、安全上下文、


Config類,包括GroupConfig、NetworkConfig、MapConfig、TopicConfig、QueueConfig、MultiMapConfig、ListConfig、SetConfig、ExecutorConfig、SemaphoreConfig、WanReplicationConfig、ServicesConfig、SecurityConfig、ListenerConfig、PartitionGroupConfig、ManagementCenterConfig、SerializationConfig。

GroupConfig集群用戶名及密碼DEFAULT_GROUP_PASSWORD = "dev-pass";DEFAULT_GROUP_NAME = "dev";
NetworkConfig網絡相干配置,包括InterfacesConfig、JoinConfig、SSLConfig等,其中JoinConfig包括MulticastConfig、TcpIpConfig、AwsConfig。
SecurityConfig客戶端登陸相干配置。
WanReplicationConfig集群復制配置,包括WanTargetClusterConfig配置,所有目標節點相干配置。

ClusterServiceImpl用于保護集群各個成員節點,實例化時把本地節點加入成員Map中,MulticastService

Node實例化時根據MulticastConfig使用組播加入1個組,MulticastService提供節點基于組播傳遞的,使用監聽模式每當接收到其他節點傳播的消息調用監聽器的onMessage,傳遞的參數為JoinMessage,它由序列化模塊提供轉化,默許NodeMulticastListener監聽器,會對集群的節點校驗,是不是是同個集群用戶名密碼。

MulticastJoiner用于加入集群節點,創建JoinRequest對象用MulticastService發送給其他成員,不斷發送加入集群要求直到node里面的masteraddress,如果配置里面指定了targetAddress就不用使用這類不斷發送的方式選舉主節點。MulticastService負責發送申請加入的組播消息和組播消息接收及處理的工作。

找主節點方式:循環發送JoinRequest消息向組內發送,如果已加入狀態且是master的節點接收到后會向外組播JoinMessage,告知其他節點我的組成員信息,還沒加入集群的成員則將自己節點的master地址設置為master發出的JoinMessage中的地址。所以JoinMessage只有master才會發出。其他已加入組的成員節點接收到JoinMessage類型消息則直接疏忽,

JoinMessage包括包版本、地址等信息。

SerializationService序列化轉化模塊,SerializationServiceBuilder生產者,默許字節寄存順序序列為ByteOrder.BIG_ENDIAN,即便用ByteArrayInputOutputFactory,

Packet為數據包結構,第1字節為版本號,第2字節表示為長度,第3字節表示類型。

SocketConnector會履行連接操作,使用的是阻塞讀寫。

TcpIpConnectionManager集成ConnectionManager,用于管理TCPIP連接

JoinMessage{
protected byte packetVersion;
    protected int buildNumber;
    protected Address address;
    protected String uuid;
    protected ConfigCheck configCheck;
    protected int memberCount;
}

JoinRequest繼承JoinMessage,并添加credentials、tryCount屬性。

默許5701作為每一個節點對外暴露的端口。

ReadHandler的handle方法會初始化socketReader,集群成員保護的話使用SocketPacketReader讀取報文,

OperationServiceImpl類內部類RemoteOperationProcessor用于ResponseOperation對象
response.beforeRun();
response.run();
response.afterRun();
即會調用JoinRequestOperation的run方法,調用ClusterServiceImpl里面的handleJoinRequest方法,完成成員更新工作,接收獲功會向源節點返回SetMasterOperation、MemberInfoUpdateOperation等等更新源節點成員及節點joined狀態。

里面1切需要處理的都用run方法,會統1處理。

Node.start--TcpIpConnectionManager.start--InSelectorImpl.run(輸入即讀取選擇器,run不斷選擇可讀的對象并獲得attachment,調用它的handle方法,這里的attachment是在TcpIpConnection實例化的時候創建readHandle對象并調用start方法把readHandler當作attachment注冊到socketChannel里面的,readHandler調用handle方法,如果是集群協議CLUSTER則實例化SocketPacketReader,SocketPacketReader包括1個PacketReader,默許是DefaultPacketReader,用于將套接字讀出來的字節轉化成Packet包并調用handlePacket方法處理消息包,如果是集群成員消息包則調用handleMemberPacket方法處理,間接調用NodeEngineImpl的handlePacket方法,有3種不同頭部類型:Packet.HEADER_OP、Packet.HEADER_EVENT、Packet.HEADER_WAN_REPLICATION,HEADER_OP表示操作類型,會調用operationService的handleOperation方法處理包消息,OperationThread會1直跑調用OperationRunner處理需要履行的Operation,OperationRunner線程數由hazelcast.operation.thread.count設置,如果⑴則為機器cpu個數,接著會履行operation的beforeRun、run、operation如果設置了響應的話還要履行產生響應內容在返回給調用者、afterRun方法。)--OutSelectorImpl.run(輸出即寫入選擇器)--SocketAcceptor.run--TcpIpConnection.start(TcpIpConnection包括WriteHandler和ReadHandler)--

private void process(Object task) {
        processedCount++;


        if (task instanceof Operation) {
            processOperation((Operation) task);
            return;
        }


        if (task instanceof Packet) {
            processPacket((Packet) task);
            return;
        }


        if (task instanceof PartitionSpecificRunnable) {
            processPartitionSpecificRunnable((PartitionSpecificRunnable) task);
            return;
        }


        throw new IllegalStateException("Unhandled task type for task:" + task);
    }


設計得不錯的是它直接支持各種Operation的傳遞,并履行,本質也是序列化反序列化,然后調用Operation的beforerun、run、afterrun方法,后面還會自動履行handleResponse方法,此方法用于向其他備份節點同步數據,這部份操作是在operation的afterrun之前完成備份。備份工作由OperationBackupHandler完成,backup方法,備份又分為同步備份和異步備份,相加等于總的需要備份數。map操作默許備份1份數據且是同步的,異步的默許為0.異步備份不會阻塞操作。備份的operation是Backup,它的run會履行PutOperation的run方法,即把數據放到緩存中并修改版本,這里的run不會再履行復制操作。
<hazelcast>
  <map name="default">
    <backup-count>1</backup-count>
    <async-backup-count>1</async-backup-count>
  </map>
</hazelcast>
operation的run一樣會在備份節點上履行,putoperation其實就是在本地緩存更新值。備份的進程沒有1個ack機制,信息傳輸的可靠性如何保證?

1個節點調用map的put操作時,會在本節點上緩存這個結果,再把operation傳輸到對應partition的第1個備份節點(這個節點可能就是自己本地節點)上,第1個節點接收到后備份到第2個節點上,所以默許就只有兩個備份數據。所以nearcache緩存是可能存在每一個節點上的。

PutOperation的afterRun方法主要是觸發1些攔截器,觸發各個節點的事件監聽器甚么的、更新各個備份節點緩存等等。

IOBalancer平衡io讀取寫入。

packet報文結構,1byte版本+2byte頭部+4byte分區+4byte長度+nbyte消息。

主節點1個1個發給其他成員節點關于成員的消息,其他節點進行更新。

ServiceManagerImpl用于管理所有的遠程,啟動時會注冊經常使用的1些服務,包括以下的服務,甚么map、queue甚么的
    private void registerCoreServices() {
        Node node = nodeEngine.getNode();
        registerService(ClusterServiceImpl.SERVICE_NAME, node.getClusterService());
        registerService(InternalPartitionService.SERVICE_NAME, node.getPartitionService());
        registerService(ProxyServiceImpl.SERVICE_NAME, nodeEngine.getProxyService());
        registerService(TransactionManagerServiceImpl.SERVICE_NAME, nodeEngine.getTransactionManagerService());
        registerService(ClientEngineImpl.SERVICE_NAME, node.clientEngine);
        registerService(QuorumServiceImpl.SERVICE_NAME, nodeEngine.getQuorumService());
    }


    private void registerDefaultServices(ServicesConfig servicesConfig) {
        registerService(MapService.SERVICE_NAME, createService(MapService.class));
        registerService(LockService.SERVICE_NAME, new LockServiceImpl(nodeEngine));
        registerService(QueueService.SERVICE_NAME, new QueueService(nodeEngine));
        registerService(TopicService.SERVICE_NAME, new TopicService());
        registerService(ReliableTopicService.SERVICE_NAME, new ReliableTopicService(nodeEngine));
        registerService(MultiMapService.SERVICE_NAME, new MultiMapService(nodeEngine));
        registerService(ListService.SERVICE_NAME, new ListService(nodeEngine));
        registerService(SetService.SERVICE_NAME, new SetService(nodeEngine));
        registerService(DistributedExecutorService.SERVICE_NAME, new DistributedExecutorService());
        registerService(AtomicLongService.SERVICE_NAME, new AtomicLongService());
        registerService(AtomicReferenceService.SERVICE_NAME, new AtomicReferenceService());
        registerService(CountDownLatchService.SERVICE_NAME, new CountDownLatchService());
        registerService(SemaphoreService.SERVICE_NAME, new SemaphoreService(nodeEngine));
        registerService(IdGeneratorService.SERVICE_NAME, new IdGeneratorService(nodeEngine));
        registerService(MapReduceService.SERVICE_NAME, new MapReduceService(nodeEngine));
        registerService(ReplicatedMapService.SERVICE_NAME, new ReplicatedMapService(nodeEngine));
        registerService(RingbufferService.SERVICE_NAME, new RingbufferService(nodeEngine));
        registerService(XAService.SERVICE_NAME, new XAService(nodeEngine));
        registerCacheServiceIfAvailable();
    }

Map通過AbstractMapServiceFactory創建,使用MapRemoteService處理遠程操作,RemoteService服務有兩個方法createDistributedObject和destroyDistributedObject方法,終究是通過MapProxyImpl。Map的partition策略在MapContainer里面。

PutOperation用于履行集群節點的put操作。1致性哈希根據key使用MurmurHash哈希計算出結果,再根據分區數(默許271)取余。每一個節點都是使用new ConcurrentHashMap<Data, Record>(1000, 0.75f, 1)寄存記錄。

成員組MemberGroup1共有若干個組,多少個成員就多少個組,最大的復制節點數為7,如果成員組小于7則使用成員組數量。

address[271][4]

Address[271][7] 
1、addr0 addr9 addr3 addr4 null null null
2、.
.
.
.
.
271、addr1 addr2 addr3 addr4 null null null


對分配的結果嘗試重新分配,把過載的組分配1些成員給不足的組,并檢測每一個成員組內的節點數相差不會超過系數1.1,否則重新分組,盡量到達均勻。


終究構成1張表,271個分區每一個分區都對應著若干個復制的成員地址。

int avgPartitionPerGroup = partitionCount / groupSize;
就是說最多4個組,假設5個結點,分組情況為2,1,1,1,則每一個組分到的partition個數為271/4,可能有些組多1個partition。

1共有271條線程處理operation,OperationThread,里面有隊列ScheduleQueue,線程會不斷處理,ScheduleQueue用于operation履行緩沖隊列,里面的有兩種隊列,normalQueue和priorityQueue,1種用于正常的排隊,1種用于設置優先級的隊列,take方法會優先從priorityQueue中獲得需要優先處理的operation。

有個同步復制、異步復制。

OperationServiceImpl.createInvocationBuilder(此方法有兩個,1個用于partition、1個用于指定的address),->  實例化1個InvocationBuilderImpl對象,調用invoke方法會創建Invocation,Invocation包括PartitionInvocation(針對分區)和TargetInvocation(針對指定節點)兩種。

在集群中傳輸1切以Data情勢傳輸。

Map的服務名為hz:impl:mapService。
假設HazelcastInstance履行instance.getMap("customers")則,通過HazelcastInstanceProxy的getMap方法,代理是調HazelcastInstanceImpl的getMap方法,調用getDistributedObject方法,它會通過ProxyService(它代理了所有服務)代理找到MapService,調用mapservice的createDistributedObject方法創建DistributedObject,間接調用MapRemoteService的createDistributedObject方法創建MapProxyImpl,調用MapProxyImpl的put方法把數據放到集群,主要以下操作
final Data key = toData(k, partitionStrategy);
final Data value = toData(v);
final Data result = putInternal(key, value, ttl, timeunit);
return (V) toObject(result);
分別將key和value轉化為Data,實際上是序列化,方便網絡傳輸。putInternal方法使用PutOperation,并且要根據key計算出partitionId,接著再完成operation的調用。

SerializationServiceImpl提供各種類型的序列化支持,toData提供由object到Data的轉化,toObject提供由Data到object的轉化,Data默許是使用DefaultData,DefaultData里面其實就是包括了1個字節數組還有不同的偏移量,例如從幾位到幾位表示類型,序列化工作其實也跟這類類似,把某1對象的相干信息轉化為字節數組,傳遞到目的地后再根據約定反向組裝成指定對象。

OperationThread專門用于履行接收到的要求,process方法,可以有3種要求,包括Operation、Packet、PartitionSpecificRunnable,分別不同的處理邏輯,Operation則直接反序列化后調用beforeRun、run、afterRun等方法。

HealthMonitor是獨立1條線程用于監控健康,包括
private class HealthMetrics {
        private final long memoryFree;
        private final long memoryTotal;
        private final long memoryUsed;
        private final long memoryMax;
        private final double memoryUsedOfTotalPercentage;
        private final double memoryUsedOfMaxPercentage;
        //following three load variables are always between 0 and 100.
        private final double processCpuLoad;
        private final double systemLoadAverage;
        private final double systemCpuLoad;
        private final int threadCount;
        private final int peakThreadCount;
        private final long clusterTimeDiff;
        private final int asyncExecutorQueueSize;
        private final int clientExecutorQueueSize;
        private final int queryExecutorQueueSize;
        private final int scheduledExecutorQueueSize;
        private final int systemExecutorQueueSize;
        private final int eventQueueSize;
        private final int pendingInvocationsCount;
        private final double pendingInvocationsPercentage;
        private final int operationServiceOperationExecutorQueueSize;
        private final int operationServiceOperationPriorityExecutorQueueSize;
        private final int operationServiceOperationResponseQueueSize;
        private final int runningOperationsCount;
        private final int remoteOperationsCount;
        private final int proxyCount;
        private final int clientEndpointCount;
        private final int activeConnectionCount;
        private final int currentClientConnectionCount;
        private final int connectionCount;
        private final int ioExecutorQueueSize;
}

PerformanceMonitor表示性能監控,監控的參數包括inSelector的已讀取數量,outSelect的已寫入事件數量,OperationService相干的性能參數,例如掛起的調用比例、整體調用比例、最大調用數量、271個分區線程已履行數量、271個分區operation線程正在掛起線程(即任務排隊隊列中的數量),常規operation線程的排隊隊列數量、常規operation線程已履行數量、響應線程已履行數量、響應線程排隊隊列數量。

hazelcast核心——Node,包括各種各樣重要的基礎服務,日志、節點關閉鉤子、序列化服務、節點引擎、客戶端引擎、分區服務、集群服務、廣播服務、連接收理服務、命令服務、配置文件服務、群組屬性服務、本節點地址、本地集群成員對象、主節點地址、hazelcast實例援用、日志服務、集群節點加入服務、節點擴大服務、管理中心服務、安全上下文、創建信息服務、版本校訂服務、hazelcast線程組。
public class Node {
    private final ILogger logger;
    private final NodeShutdownHookThread shutdownHookThread = new NodeShutdownHookThread("hz.ShutdownThread");
    private final SerializationService serializationService;
    public final NodeEngineImpl nodeEngine;
    public final ClientEngineImpl clientEngine;
    public final InternalPartitionService partitionService;
    public final ClusterServiceImpl clusterService;
    public final MulticastService multicastService;
    public final ConnectionManager connectionManager;
    public final TextCommandServiceImpl textCommandService;
    public final Config config;
    public final GroupProperties groupProperties;
    public final Address address;
    public final MemberImpl localMember;
    private volatile Address masterAddress = null;
    public final HazelcastInstanceImpl hazelcastInstance;
    public final LoggingServiceImpl loggingService;
    private final Joiner joiner;
    private final NodeExtension nodeExtension;
    private ManagementCenterService managementCenterService;
    public final SecurityContext securityContext;
    private final ClassLoader configClassLoader;
    private final BuildInfo buildInfo;
    private final VersionCheck versionCheck = new VersionCheck();
    private final HazelcastThreadGroup hazelcastThreadGroup;
}

NodeEngineImpl作為節點引擎包括了許多服務,重要的例如,事件服務、operation服務、履行服務、等待通知服務、service(內置許多service,例如Map、Queue,用戶自定義的服務可配置到hazelcast.xml,啟動時會加載進來)管理服務、事務管理服務、代理服務、wan復制服務、包傳輸服務、證明人服務。
public class NodeEngineImpl implements NodeEngine {


    private final Node node;
    private final ILogger logger;
    private final EventServiceImpl eventService;
    private final OperationServiceImpl operationService;
    private final ExecutionServiceImpl executionService;
    private final WaitNotifyServiceImpl waitNotifyService;
    private final ServiceManagerImpl serviceManager;
    private final TransactionManagerServiceImpl transactionManagerService;
    private final ProxyServiceImpl proxyService;
    private final WanReplicationService wanReplicationService;
    private final PacketTransceiver packetTransceiver;
    private final QuorumServiceImpl quorumService;


}

ServiceManagerImpl用于管理所有服務,啟動時默許會實例化核心的service、默許的service,如果用戶通過配置文件配置了自定義service則也會實例化。啟動時注冊行將service實例put到ConcurrentMap中,核心service包括ClusterServiceImpl、InternalPartitionService、ProxyServiceImpl、TransactionManagerServiceImpl、ClientEngineImpl、QuorumServiceImpl。默許service包括MapService、LockService、QueueService、TopicService、ReliableTopicService、MultiMapService、ListService、SetService、DistributedExecutorService、AtomicLongService、AtomicReferenceService、CountDownLatchService、SemaphoreService、IdGeneratorService、MapReduceService、ReplicatedMapService、RingbufferService、XAService。如果允許還將把緩存服務CacheService添加進來。
public final class ServiceManagerImpl implements ServiceManager {


    private final ConcurrentMap<String, ServiceInfo> services = new ConcurrentHashMap<String, ServiceInfo>(20, .75f, 1);


}

節點加入,Joiner負責加入工作,例如廣播則使用MulticastJoiner、單播則使用TcpIpJoiner、AWS則使用TcpIpJoinerOverAWS。Node啟動時會根據情況啟動個線程,
multicast只是做節點發現工作,真實的節點加入工作是交由tcpip做,向主節點發送加入要求,主節點把要求節點添加到成員列表中,然后返回要求節點讓它把主節點地址設置為本人。

DefaultSerializers包括DateSerializer、ObjectSerializer、ClassSerializer等等序列化器,實現StreamSerializer的read和write方法完成序列化和反序列化處理。

Date.class, new DateSerializer());
        BigInteger.class, new BigIntegerSerializer());
        BigDecimal.class, new BigDecimalSerializer());
        Externalizable.class, new Externalizer(enableCompression));
        Serializable.class, new ObjectSerializer(enableSharedObject, enableCompression));
        Class.class, new ClassSerializer());
        Enum.class, new EnumSerializer());

DataSerializable.class, dataSerializerAdapter);
        Portable.class, portableSerializerAdapter);
        Byte.class, new ByteSerializer());
        Boolean.class, new BooleanSerializer());
        Character.class, new CharSerializer());
        Short.class, new ShortSerializer());
        Integer.class, new IntegerSerializer());
        Long.class, new LongSerializer());
        Float.class, new FloatSerializer());
        Double.class, new DoubleSerializer());
        byte[].class, new TheByteArraySerializer());
        char[].class, new CharArraySerializer());
        short[].class, new ShortArraySerializer());
        int[].class, new IntegerArraySerializer());
        long[].class, new LongArraySerializer());
        float[].class, new FloatArraySerializer());
        double[].class, new DoubleArraySerializer());
        String.class, new StringSerializer());

每一個service都有自己的context,例如mapservice的MapServiceContext,它里面是保存了1份partition映照表的副本,在底層完成遷移之前其實不會更新,固然底層的map數據也不會1邊遷移1邊刪除,而是復制1份進行刪除



喜歡java的同學可以交個朋友:





生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 最近最新中文字幕大全免费10 | 国产一区在线视频 | 久久99精品久久久久久秒播 | 国产精品成人不卡在线观看 | 欧美小说图片 | 久久911| 免费一级淫片aaa片毛片a级 | 亚洲女人影院想要爱 | 国内精品久久精品 | 日韩久久中文字幕 | 豆国产96在线 | 亚洲 | 国产欧美精品区一区二区三区 | 色妞影视 | 国产成人久久精品二区三区 | 免费jizz在在线播放国产 | www.精品国产| 亚洲欧洲国产综合 | 日韩视频观看 | 伊人院| 日韩激情中文字幕一区二区 | 欧美亚洲图片小说 | 国产三级精品三级在线观看 | 国产日韩欧美精品一区二区三区 | 亚洲欧美综合精品成 | 求av网址 | 亚洲成人h | 欧美性受xxxx黑人 | 男女激情视频 | 麻豆国产96在线 | 日韩 | 久久久久国产精品免费免费不卡 | 久久www免费人成看国产片 | 中文字幕在第10页线观看 | 亚洲第一视频在线 | 久久精品免视看国产明星 | 青青草久热精品视频在线观看 | 午夜亚洲一区二区福利 | 亚洲最大福利视频 | 国产亚洲精品久久综合影院 | 午夜久久视频 | 久久亚洲欧美成人精品 | 中文字幕在线免费观看 |