SOFAStack (Scalable Open Financial Architecture Stack)是螞蟻金服自主研發(fā)的金融級分布式架構,包含了構建金融級云原生架構所需的各個組件,是在金融場景里錘煉出來的最佳實踐。SOFARegistry 是螞蟻金服開源的具有承載海量服務注冊和訂閱能力的、高可用的服務注冊中心,在支付寶/螞蟻金服的業(yè)務發(fā)展驅動下,近十年間已經(jīng)演進至第五代。本文為《剖析 | SOFARegistry 框架》第三篇,本篇作者 Yavin,來自考拉海購?!镀饰?| SOFARegistry 框架》系列由 SOFA 團隊和源碼愛好者們出品,項目代號:<SOFARegistry:Lab/>,文末包含往期系列文章。SOFARegistry:https://github.com/sofastack/sofa-registry集群成員管理是分布式系統(tǒng)中繞不開的話題。MetaServer 在 SOFARegistry 中,承擔著集群元數(shù)據(jù)管理的角色,用來維護集群成員列表。本文希望從 MetaServer 的功能和部分源碼切入剖析,為學習研究、或者項目中使用SOFARegistry 的開發(fā)者帶來一些啟發(fā),分為三個部分:MetaServer 作為 SOFARegistry 的元數(shù)據(jù)中心,其核心功能可以概括為集群成員管理。分布式系統(tǒng)中,如何知道集群中有哪些節(jié)點列表,如何處理集群擴容,如何處理集群節(jié)點異常,都是不得不考慮的問題。MetaServer 的存在就是解決這些問題,其在 SOFARegistry 中位置如圖所示:MetaServer 通過 SOFAJRaft 保證高可用和一致性,類似于注冊中心,管理著集群內(nèi)部的成員列表:MetaServer 基于 Bolt,通過 TCP 私有協(xié)議的形式對外提供服務,包括 DataServer,SessionServer 等,處理節(jié)點的注冊,續(xù)約和列表查詢等請求。同時也基于 Http 協(xié)議提供控制接口,比如可以控制 session 節(jié)點是否開啟變更通知,健康檢查接口等。成員列表數(shù)據(jù)存儲在 Repository 中,Repository 被一致性協(xié)議層進行包裝,作為 SOFAJRaft 的狀態(tài)機實現(xiàn),所有對 Repository 的操作都會同步到其他節(jié)點,通過 Rgistry 來操作存儲層。MetaServer 使用 Raft 協(xié)議保證數(shù)據(jù)一致性, 同時也會保持與注冊的節(jié)點的心跳,對于心跳超時沒有續(xù)約的節(jié)點進行驅逐,來保證數(shù)據(jù)的有效性。在可用性方面,只要未超過半數(shù)節(jié)點掛掉,集群都可以正常對外提供服務,半數(shù)以上掛掉,Raft 協(xié)議無法選主和日志復制,因此無法保證注冊的成員數(shù)據(jù)的一致性和有效性。整個集群不可用不會影響 Data 和 Session 節(jié)點的正常功能,只是無法感知節(jié)點列表變化。MetaServer 在啟動時,會啟動三個 Bolt Server,并且注冊 Processor Handler,處理對應的請求, 如下圖所示:- DataServer:處理 DataNode 相關的請求;
- SessionServer:處理 SessionNode 相關的請求;
- MetaServer:處理MetaNode相關的請求;
然后啟動 HttpServer,用于處理 Admin 請求,提供推送開關,集群數(shù)據(jù)查詢等 Http 接口,最后啟動 Raft 服務,每個節(jié)點同時作為 RaftClient 和 RaftServer,用于集群間的變更和數(shù)據(jù)同步。meta.server.sessionServerPort=9610 meta.server.dataServerPort=9611 meta.server.metaServerPort=9612 meta.server.raftServerPort=9614 meta.server.httpServerPort=9615 由上節(jié)可知,DataServer 和 SessionServer 都有處理節(jié)點注冊請求的 Handler。注冊行為由 Registry 完成。實現(xiàn)為:@Override public NodeChangeResult register(Node node) { StoreService storeService = ServiceFactory.getStoreService(node.getNodeType()); return storeService.addNode(node); } Regitsry 根據(jù)不同的節(jié)點類型,獲取對應的 StoreService ,比如 DataNode ,其實現(xiàn)為 DataStoreService 然后由 StoreService 存儲到 Repository 中,具體實現(xiàn)為:// 存儲節(jié)點信息 dataRepositoryService.put(ipAddress, new RenewDecorate(dataNode, RenewDecorate.DEFAULT_DURATION_SECS)); //... // 存儲變更事件 dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.ADD); 調(diào)用 RepositoryService#put 接口存儲后,同時會存儲一個變更事件到隊列中,主要用于數(shù)據(jù)推送,消費處理。節(jié)點數(shù)據(jù)的存儲,其本質(zhì)上是存儲在內(nèi)存的哈希表中,其存儲結構為:// RepositoryService 底層存儲Map<String/*dataCenter*/, NodeRepository> registry;// NodeRepository 底層存儲Map<String/*ipAddress*/, RenewDecorate<T>> nodeMap; 將 RenewDecorate 存儲到該 Map 中,整個節(jié)點注冊的流程就完成了,至于如何和 Raft 協(xié)議進行結合和數(shù)據(jù)同步,下文介紹。節(jié)點移除的邏輯類似,將節(jié)點信息從該 Map 中刪除,也會存儲一個變更事件到隊列。不知道有沒有注意到,節(jié)點注冊的時候,節(jié)點信息被 RenewDecorate 包裝起來了,這個就是實現(xiàn)注冊信息續(xù)約和驅逐的關鍵:private T renewal; // 節(jié)點對象封裝 private long beginTimestamp; // 注冊事件 private volatile long lastUpdateTimestamp; // 續(xù)約時間 private long duration; // 超時時間 該對象為注冊節(jié)點信息,附加了注冊時間、上次續(xù)約時間、過期時間。那么續(xù)約操作就是修改 lastUpdateTimestamp ,是否過期就是判斷System.currentTimeMillis()-lastUpdateTimestamp>duration 是否成立,成立則認為節(jié)點超時進行驅逐。和注冊一樣,續(xù)約請求的處理 Handler 為 ReNewNodesRequestHandler ,最終交由 StoreService 進行續(xù)約操作。另外一點,續(xù)約的時候如果沒有查詢到注冊節(jié)點,會觸發(fā)節(jié)點注冊的操作。驅出的操作是由定時任務完成,MetaServer 在啟動時會啟動多個定時任務,詳見 ExecutorManager#startScheduler ,其中一個任務會調(diào)用 Registry#evict ,其實現(xiàn)為遍歷存儲的 Map,獲得過期的列表,調(diào)用 StoreService#removeNodes 方法,將他們從 Repository 中移除,這個操作也會觸發(fā)變更通知。該任務默認每3秒執(zhí)行一次。上文有介紹到,在處理節(jié)點注冊請求后,也會存儲一個節(jié)點變更事件,即:dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.ADD); DataConfirmStatusService 也是一個由 Raft 協(xié)議進行同步的存儲,其存儲結構為:BlockingQueue<NodeOperator> expectNodesOrders = new LinkedBlockingQueue(); ConcurrentHashMap<DataNode/*node*/, Map<String/*ipAddress*/, DataNode>> expectNodes = new ConcurrentHashMap<>();
expectNodesOrders 用來存儲節(jié)點變更事件;expectNodes 用來存儲變更事件需要確認的節(jié)點,也就是說 NodeOperator 只有得到了其他節(jié)點的確認,才會從expectNodesOrders移除; 那么事件存儲到 BlockingQueue 里,哪里去消費呢?看源碼發(fā)現(xiàn),并不是想象中的使用一個線程阻塞的讀。在 ExecutorManager 中會啟動一個定時任務,輪詢該隊列有沒有數(shù)據(jù)。即周期性的調(diào)用 Registry#pushNodeListChange 方法,獲取隊列的頭節(jié)點并消費。Data 和 Session 各對應一個任務。具體流程如下圖所示:- 首先獲取隊列(expectNodesOrders)頭節(jié)點,如果為Null直接返回;
- 獲取當前數(shù)據(jù)中心的節(jié)點列表,并存儲到確認表(expectNodes);
- 提交節(jié)點變更推送任務(firePushXxListTask);
- 處理任務,即調(diào)用 XxNodeService 的 pushXxxNode 方法,即通過 ConnectionHandler 獲取所有的節(jié)點連接,發(fā)送節(jié)點列表;
- 收到回復后,如果需要確認,則會調(diào)用
StroeService#confirmNodeStatus 方法,將該節(jié)點從expectNodes中移除; - 待所有的節(jié)點從 expectNodes 中移除,則將此次操作從 expectNodesOrders 移除,處理完畢;
Data,Meta,Session Server 都提供 getNodesRequestHandler,用于處理查詢當前節(jié)點列表的請求,其本質(zhì)上從底層存儲 Repository 讀取數(shù)據(jù)返回,這里不在贅述。返回的結果的具體結構見 NodeChangeResult 類,包含各個數(shù)據(jù)中心的節(jié)點列表以及版本號。后端 Repository 可以看作SOFAJRaft 的狀態(tài)機,任何對 Map 的操作都會在集群內(nèi)部,交由 Raft 協(xié)議進行同步,從而達到集群內(nèi)部的一致。從源碼上看,所有的操作都是直接調(diào)用的 RepositoryService 等接口,那么是如何和 Raft 服務結合起來的呢?看源碼會發(fā)現(xiàn),凡是引用 RepositoryService 的地方,都加了 @RaftReference , RepositoryService 的具體實現(xiàn)類都加了 @RaftService 注解。其關鍵就在這里,其處理類為 RaftAnnotationBeanPostProcessor 。具體流程如下:在 processRaftReference 方法中,凡是加了 @RaftReference 注解的屬性,都會被動態(tài)代理類替換,其代理實現(xiàn) ProxyHandler 類,即將方法調(diào)用,封裝為 ProcessRequest ,通過 RaftClient 發(fā)送給 RaftServer。而被加了 @RaftService 的類會被添加到 Procssor類 中,通過 serviceId (interfaceName + uniqueId) 進行區(qū)分。RaftServer 收到請求后,會把它生效到 SOFAJRaft 的狀態(tài)機,具體實現(xiàn)類為 ServiceStateMachine ,即會調(diào)用 Procssor 方法,通過 serviceId 找到這個實現(xiàn)類,執(zhí)行對應的方法調(diào)用。當然如果本機就是主節(jié)點, 對于一些查詢請求不需要走 Raft 協(xié)議而直接調(diào)用本地實現(xiàn)方法。這個過程其實和 RPC 調(diào)用非常類似,在引用方發(fā)起的方法調(diào)用,并不會真正的執(zhí)行方法,而是封裝成請求發(fā)送到 Raft 服務,由 Raft 狀態(tài)機進行真正的方法調(diào)用,比如把節(jié)點信息存儲到 Map 中。在分布式系統(tǒng)中,集群成員管理是避不開的問題,有些集群直接把列表信息寫到配置文件或者配置中心,也有的集群選擇使用 zookeeper 或者 etcd 等維護集群元數(shù)據(jù),SOFARegistry 選擇基于一致性協(xié)議 Raft,開發(fā)獨立的MetaServer,來實現(xiàn)集群列表維護和變更實時推送,以提高集群管理的靈活性和集群的健壯性。
|