[置頂] 分布式內存網格Hazelcast源碼導讀
來源:程序員人生 發布時間:2016-06-27 16:27:04 閱讀次數:4978次
去年項目需要看了hazelcast源碼,當時記錄的筆記。
Node是節點的抽象,里面包括節點引擎、客戶端引擎、分區服務、集群服務、組播服務、連接收理、命令管理、組播屬性、節點配置、本地成員、tcp地址、組播地址、連接者、節點初始化器、管理中心、安全上下文、
Config類,包括GroupConfig、NetworkConfig、MapConfig、TopicConfig、QueueConfig、MultiMapConfig、ListConfig、SetConfig、ExecutorConfig、SemaphoreConfig、WanReplicationConfig、ServicesConfig、SecurityConfig、ListenerConfig、PartitionGroupConfig、ManagementCenterConfig、SerializationConfig。
GroupConfig集群用戶名及密碼DEFAULT_GROUP_PASSWORD = "dev-pass";DEFAULT_GROUP_NAME = "dev";
NetworkConfig網絡相干配置,包括InterfacesConfig、JoinConfig、SSLConfig等,其中JoinConfig包括MulticastConfig、TcpIpConfig、AwsConfig。
SecurityConfig客戶端登陸相干配置。
WanReplicationConfig集群復制配置,包括WanTargetClusterConfig配置,所有目標節點相干配置。
ClusterServiceImpl用于保護集群各個成員節點,實例化時把本地節點加入成員Map中,MulticastService
Node實例化時根據MulticastConfig使用組播加入1個組,MulticastService提供節點基于組播傳遞的,使用監聽模式每當接收到其他節點傳播的消息調用監聽器的onMessage,傳遞的參數為JoinMessage,它由序列化模塊提供轉化,默許NodeMulticastListener監聽器,會對集群的節點校驗,是不是是同個集群用戶名密碼。
MulticastJoiner用于加入集群節點,創建JoinRequest對象用MulticastService發送給其他成員,不斷發送加入集群要求直到node里面的masteraddress,如果配置里面指定了targetAddress就不用使用這類不斷發送的方式選舉主節點。MulticastService負責發送申請加入的組播消息和組播消息接收及處理的工作。
找主節點方式:循環發送JoinRequest消息向組內發送,如果已加入狀態且是master的節點接收到后會向外組播JoinMessage,告知其他節點我的組成員信息,還沒加入集群的成員則將自己節點的master地址設置為master發出的JoinMessage中的地址。所以JoinMessage只有master才會發出。其他已加入組的成員節點接收到JoinMessage類型消息則直接疏忽,
JoinMessage包括包版本、地址等信息。
SerializationService序列化轉化模塊,SerializationServiceBuilder生產者,默許字節寄存順序序列為ByteOrder.BIG_ENDIAN,即便用ByteArrayInputOutputFactory,
Packet為數據包結構,第1字節為版本號,第2字節表示為長度,第3字節表示類型。
SocketConnector會履行連接操作,使用的是阻塞讀寫。
TcpIpConnectionManager集成ConnectionManager,用于管理TCPIP連接
JoinMessage{
protected byte packetVersion;
protected int buildNumber;
protected Address address;
protected String uuid;
protected ConfigCheck configCheck;
protected int memberCount;
}
JoinRequest繼承JoinMessage,并添加credentials、tryCount屬性。
默許5701作為每一個節點對外暴露的端口。
ReadHandler的handle方法會初始化socketReader,集群成員保護的話使用SocketPacketReader讀取報文,
OperationServiceImpl類內部類RemoteOperationProcessor用于ResponseOperation對象
response.beforeRun();
response.run();
response.afterRun();
即會調用JoinRequestOperation的run方法,調用ClusterServiceImpl里面的handleJoinRequest方法,完成成員更新工作,接收獲功會向源節點返回SetMasterOperation、MemberInfoUpdateOperation等等更新源節點成員及節點joined狀態。
里面1切需要處理的都用run方法,會統1處理。
Node.start--TcpIpConnectionManager.start--InSelectorImpl.run(輸入即讀取選擇器,run不斷選擇可讀的對象并獲得attachment,調用它的handle方法,這里的attachment是在TcpIpConnection實例化的時候創建readHandle對象并調用start方法把readHandler當作attachment注冊到socketChannel里面的,readHandler調用handle方法,如果是集群協議CLUSTER則實例化SocketPacketReader,SocketPacketReader包括1個PacketReader,默許是DefaultPacketReader,用于將套接字讀出來的字節轉化成Packet包并調用handlePacket方法處理消息包,如果是集群成員消息包則調用handleMemberPacket方法處理,間接調用NodeEngineImpl的handlePacket方法,有3種不同頭部類型:Packet.HEADER_OP、Packet.HEADER_EVENT、Packet.HEADER_WAN_REPLICATION,HEADER_OP表示操作類型,會調用operationService的handleOperation方法處理包消息,OperationThread會1直跑調用OperationRunner處理需要履行的Operation,OperationRunner線程數由hazelcast.operation.thread.count設置,如果⑴則為機器cpu個數,接著會履行operation的beforeRun、run、operation如果設置了響應的話還要履行產生響應內容在返回給調用者、afterRun方法。)--OutSelectorImpl.run(輸出即寫入選擇器)--SocketAcceptor.run--TcpIpConnection.start(TcpIpConnection包括WriteHandler和ReadHandler)--
private void process(Object task) {
processedCount++;
if (task instanceof Operation) {
processOperation((Operation) task);
return;
}
if (task instanceof Packet) {
processPacket((Packet) task);
return;
}
if (task instanceof PartitionSpecificRunnable) {
processPartitionSpecificRunnable((PartitionSpecificRunnable) task);
return;
}
throw new IllegalStateException("Unhandled task type for task:" + task);
}
設計得不錯的是它直接支持各種Operation的傳遞,并履行,本質也是序列化反序列化,然后調用Operation的beforerun、run、afterrun方法,后面還會自動履行handleResponse方法,此方法用于向其他備份節點同步數據,這部份操作是在operation的afterrun之前完成備份。備份工作由OperationBackupHandler完成,backup方法,備份又分為同步備份和異步備份,相加等于總的需要備份數。map操作默許備份1份數據且是同步的,異步的默許為0.異步備份不會阻塞操作。備份的operation是Backup,它的run會履行PutOperation的run方法,即把數據放到緩存中并修改版本,這里的run不會再履行復制操作。
<hazelcast>
<map name="default">
<backup-count>1</backup-count>
<async-backup-count>1</async-backup-count>
</map>
</hazelcast>
operation的run一樣會在備份節點上履行,putoperation其實就是在本地緩存更新值。備份的進程沒有1個ack機制,信息傳輸的可靠性如何保證?
1個節點調用map的put操作時,會在本節點上緩存這個結果,再把operation傳輸到對應partition的第1個備份節點(這個節點可能就是自己本地節點)上,第1個節點接收到后備份到第2個節點上,所以默許就只有兩個備份數據。所以nearcache緩存是可能存在每一個節點上的。
PutOperation的afterRun方法主要是觸發1些攔截器,觸發各個節點的事件監聽器甚么的、更新各個備份節點緩存等等。
IOBalancer平衡io讀取寫入。
packet報文結構,1byte版本+2byte頭部+4byte分區+4byte長度+nbyte消息。
主節點1個1個發給其他成員節點關于成員的消息,其他節點進行更新。
ServiceManagerImpl用于管理所有的遠程,啟動時會注冊經常使用的1些服務,包括以下的服務,甚么map、queue甚么的
private void registerCoreServices() {
Node node = nodeEngine.getNode();
registerService(ClusterServiceImpl.SERVICE_NAME, node.getClusterService());
registerService(InternalPartitionService.SERVICE_NAME, node.getPartitionService());
registerService(ProxyServiceImpl.SERVICE_NAME, nodeEngine.getProxyService());
registerService(TransactionManagerServiceImpl.SERVICE_NAME, nodeEngine.getTransactionManagerService());
registerService(ClientEngineImpl.SERVICE_NAME, node.clientEngine);
registerService(QuorumServiceImpl.SERVICE_NAME, nodeEngine.getQuorumService());
}
private void registerDefaultServices(ServicesConfig servicesConfig) {
registerService(MapService.SERVICE_NAME, createService(MapService.class));
registerService(LockService.SERVICE_NAME, new LockServiceImpl(nodeEngine));
registerService(QueueService.SERVICE_NAME, new QueueService(nodeEngine));
registerService(TopicService.SERVICE_NAME, new TopicService());
registerService(ReliableTopicService.SERVICE_NAME, new ReliableTopicService(nodeEngine));
registerService(MultiMapService.SERVICE_NAME, new MultiMapService(nodeEngine));
registerService(ListService.SERVICE_NAME, new ListService(nodeEngine));
registerService(SetService.SERVICE_NAME, new SetService(nodeEngine));
registerService(DistributedExecutorService.SERVICE_NAME, new DistributedExecutorService());
registerService(AtomicLongService.SERVICE_NAME, new AtomicLongService());
registerService(AtomicReferenceService.SERVICE_NAME, new AtomicReferenceService());
registerService(CountDownLatchService.SERVICE_NAME, new CountDownLatchService());
registerService(SemaphoreService.SERVICE_NAME, new SemaphoreService(nodeEngine));
registerService(IdGeneratorService.SERVICE_NAME, new IdGeneratorService(nodeEngine));
registerService(MapReduceService.SERVICE_NAME, new MapReduceService(nodeEngine));
registerService(ReplicatedMapService.SERVICE_NAME, new ReplicatedMapService(nodeEngine));
registerService(RingbufferService.SERVICE_NAME, new RingbufferService(nodeEngine));
registerService(XAService.SERVICE_NAME, new XAService(nodeEngine));
registerCacheServiceIfAvailable();
}
Map通過AbstractMapServiceFactory創建,使用MapRemoteService處理遠程操作,RemoteService服務有兩個方法createDistributedObject和destroyDistributedObject方法,終究是通過MapProxyImpl。Map的partition策略在MapContainer里面。
PutOperation用于履行集群節點的put操作。1致性哈希根據key使用MurmurHash哈希計算出結果,再根據分區數(默許271)取余。每一個節點都是使用new ConcurrentHashMap<Data, Record>(1000, 0.75f, 1)寄存記錄。
成員組MemberGroup1共有若干個組,多少個成員就多少個組,最大的復制節點數為7,如果成員組小于7則使用成員組數量。
address[271][4]
Address[271][7]
1、addr0 addr9 addr3 addr4 null null null
2、.
.
.
.
.
271、addr1 addr2 addr3 addr4 null null null
對分配的結果嘗試重新分配,把過載的組分配1些成員給不足的組,并檢測每一個成員組內的節點數相差不會超過系數1.1,否則重新分組,盡量到達均勻。
終究構成1張表,271個分區每一個分區都對應著若干個復制的成員地址。
int avgPartitionPerGroup = partitionCount / groupSize;
就是說最多4個組,假設5個結點,分組情況為2,1,1,1,則每一個組分到的partition個數為271/4,可能有些組多1個partition。
1共有271條線程處理operation,OperationThread,里面有隊列ScheduleQueue,線程會不斷處理,ScheduleQueue用于operation履行緩沖隊列,里面的有兩種隊列,normalQueue和priorityQueue,1種用于正常的排隊,1種用于設置優先級的隊列,take方法會優先從priorityQueue中獲得需要優先處理的operation。
有個同步復制、異步復制。
OperationServiceImpl.createInvocationBuilder(此方法有兩個,1個用于partition、1個用于指定的address),-> 實例化1個InvocationBuilderImpl對象,調用invoke方法會創建Invocation,Invocation包括PartitionInvocation(針對分區)和TargetInvocation(針對指定節點)兩種。
在集群中傳輸1切以Data情勢傳輸。
Map的服務名為hz:impl:mapService。
假設HazelcastInstance履行instance.getMap("customers")則,通過HazelcastInstanceProxy的getMap方法,代理是調HazelcastInstanceImpl的getMap方法,調用getDistributedObject方法,它會通過ProxyService(它代理了所有服務)代理找到MapService,調用mapservice的createDistributedObject方法創建DistributedObject,間接調用MapRemoteService的createDistributedObject方法創建MapProxyImpl,調用MapProxyImpl的put方法把數據放到集群,主要以下操作
final Data key = toData(k, partitionStrategy);
final Data value = toData(v);
final Data result = putInternal(key, value, ttl, timeunit);
return (V) toObject(result);
分別將key和value轉化為Data,實際上是序列化,方便網絡傳輸。putInternal方法使用PutOperation,并且要根據key計算出partitionId,接著再完成operation的調用。
SerializationServiceImpl提供各種類型的序列化支持,toData提供由object到Data的轉化,toObject提供由Data到object的轉化,Data默許是使用DefaultData,DefaultData里面其實就是包括了1個字節數組還有不同的偏移量,例如從幾位到幾位表示類型,序列化工作其實也跟這類類似,把某1對象的相干信息轉化為字節數組,傳遞到目的地后再根據約定反向組裝成指定對象。
OperationThread專門用于履行接收到的要求,process方法,可以有3種要求,包括Operation、Packet、PartitionSpecificRunnable,分別不同的處理邏輯,Operation則直接反序列化后調用beforeRun、run、afterRun等方法。
HealthMonitor是獨立1條線程用于監控健康,包括
private class HealthMetrics {
private final long memoryFree;
private final long memoryTotal;
private final long memoryUsed;
private final long memoryMax;
private final double memoryUsedOfTotalPercentage;
private final double memoryUsedOfMaxPercentage;
//following three load variables are always between 0 and 100.
private final double processCpuLoad;
private final double systemLoadAverage;
private final double systemCpuLoad;
private final int threadCount;
private final int peakThreadCount;
private final long clusterTimeDiff;
private final int asyncExecutorQueueSize;
private final int clientExecutorQueueSize;
private final int queryExecutorQueueSize;
private final int scheduledExecutorQueueSize;
private final int systemExecutorQueueSize;
private final int eventQueueSize;
private final int pendingInvocationsCount;
private final double pendingInvocationsPercentage;
private final int operationServiceOperationExecutorQueueSize;
private final int operationServiceOperationPriorityExecutorQueueSize;
private final int operationServiceOperationResponseQueueSize;
private final int runningOperationsCount;
private final int remoteOperationsCount;
private final int proxyCount;
private final int clientEndpointCount;
private final int activeConnectionCount;
private final int currentClientConnectionCount;
private final int connectionCount;
private final int ioExecutorQueueSize;
}
PerformanceMonitor表示性能監控,監控的參數包括inSelector的已讀取數量,outSelect的已寫入事件數量,OperationService相干的性能參數,例如掛起的調用比例、整體調用比例、最大調用數量、271個分區線程已履行數量、271個分區operation線程正在掛起線程(即任務排隊隊列中的數量),常規operation線程的排隊隊列數量、常規operation線程已履行數量、響應線程已履行數量、響應線程排隊隊列數量。
hazelcast核心——Node,包括各種各樣重要的基礎服務,日志、節點關閉鉤子、序列化服務、節點引擎、客戶端引擎、分區服務、集群服務、廣播服務、連接收理服務、命令服務、配置文件服務、群組屬性服務、本節點地址、本地集群成員對象、主節點地址、hazelcast實例援用、日志服務、集群節點加入服務、節點擴大服務、管理中心服務、安全上下文、創建信息服務、版本校訂服務、hazelcast線程組。
public class Node {
private final ILogger logger;
private final NodeShutdownHookThread shutdownHookThread = new NodeShutdownHookThread("hz.ShutdownThread");
private final SerializationService serializationService;
public final NodeEngineImpl nodeEngine;
public final ClientEngineImpl clientEngine;
public final InternalPartitionService partitionService;
public final ClusterServiceImpl clusterService;
public final MulticastService multicastService;
public final ConnectionManager connectionManager;
public final TextCommandServiceImpl textCommandService;
public final Config config;
public final GroupProperties groupProperties;
public final Address address;
public final MemberImpl localMember;
private volatile Address masterAddress = null;
public final HazelcastInstanceImpl hazelcastInstance;
public final LoggingServiceImpl loggingService;
private final Joiner joiner;
private final NodeExtension nodeExtension;
private ManagementCenterService managementCenterService;
public final SecurityContext securityContext;
private final ClassLoader configClassLoader;
private final BuildInfo buildInfo;
private final VersionCheck versionCheck = new VersionCheck();
private final HazelcastThreadGroup hazelcastThreadGroup;
}
NodeEngineImpl作為節點引擎包括了許多服務,重要的例如,事件服務、operation服務、履行服務、等待通知服務、service(內置許多service,例如Map、Queue,用戶自定義的服務可配置到hazelcast.xml,啟動時會加載進來)管理服務、事務管理服務、代理服務、wan復制服務、包傳輸服務、證明人服務。
public class NodeEngineImpl implements NodeEngine {
private final Node node;
private final ILogger logger;
private final EventServiceImpl eventService;
private final OperationServiceImpl operationService;
private final ExecutionServiceImpl executionService;
private final WaitNotifyServiceImpl waitNotifyService;
private final ServiceManagerImpl serviceManager;
private final TransactionManagerServiceImpl transactionManagerService;
private final ProxyServiceImpl proxyService;
private final WanReplicationService wanReplicationService;
private final PacketTransceiver packetTransceiver;
private final QuorumServiceImpl quorumService;
}
ServiceManagerImpl用于管理所有服務,啟動時默許會實例化核心的service、默許的service,如果用戶通過配置文件配置了自定義service則也會實例化。啟動時注冊行將service實例put到ConcurrentMap中,核心service包括ClusterServiceImpl、InternalPartitionService、ProxyServiceImpl、TransactionManagerServiceImpl、ClientEngineImpl、QuorumServiceImpl。默許service包括MapService、LockService、QueueService、TopicService、ReliableTopicService、MultiMapService、ListService、SetService、DistributedExecutorService、AtomicLongService、AtomicReferenceService、CountDownLatchService、SemaphoreService、IdGeneratorService、MapReduceService、ReplicatedMapService、RingbufferService、XAService。如果允許還將把緩存服務CacheService添加進來。
public final class ServiceManagerImpl implements ServiceManager {
private final ConcurrentMap<String, ServiceInfo> services = new ConcurrentHashMap<String, ServiceInfo>(20, .75f, 1);
}
節點加入,Joiner負責加入工作,例如廣播則使用MulticastJoiner、單播則使用TcpIpJoiner、AWS則使用TcpIpJoinerOverAWS。Node啟動時會根據情況啟動個線程,
multicast只是做節點發現工作,真實的節點加入工作是交由tcpip做,向主節點發送加入要求,主節點把要求節點添加到成員列表中,然后返回要求節點讓它把主節點地址設置為本人。
DefaultSerializers包括DateSerializer、ObjectSerializer、ClassSerializer等等序列化器,實現StreamSerializer的read和write方法完成序列化和反序列化處理。
Date.class, new DateSerializer());
BigInteger.class, new BigIntegerSerializer());
BigDecimal.class, new BigDecimalSerializer());
Externalizable.class, new Externalizer(enableCompression));
Serializable.class, new ObjectSerializer(enableSharedObject, enableCompression));
Class.class, new ClassSerializer());
Enum.class, new EnumSerializer());
DataSerializable.class, dataSerializerAdapter);
Portable.class, portableSerializerAdapter);
Byte.class, new ByteSerializer());
Boolean.class, new BooleanSerializer());
Character.class, new CharSerializer());
Short.class, new ShortSerializer());
Integer.class, new IntegerSerializer());
Long.class, new LongSerializer());
Float.class, new FloatSerializer());
Double.class, new DoubleSerializer());
byte[].class, new TheByteArraySerializer());
char[].class, new CharArraySerializer());
short[].class, new ShortArraySerializer());
int[].class, new IntegerArraySerializer());
long[].class, new LongArraySerializer());
float[].class, new FloatArraySerializer());
double[].class, new DoubleArraySerializer());
String.class, new StringSerializer());
每一個service都有自己的context,例如mapservice的MapServiceContext,它里面是保存了1份partition映照表的副本,在底層完成遷移之前其實不會更新,固然底層的map數據也不會1邊遷移1邊刪除,而是復制1份進行刪除
喜歡java的同學可以交個朋友:
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈