Mit6.824-paxos lab实现

平常说paxos一般是指lamport提出的basic paxos算法,后续所有的multi paxos 算法或者协议都是basic paxos算法的工程实现,以raft为例,都是借鉴了basic paxos 两阶段+投票的思想,但是在集群中各个节点的角色设置,如何处理网络故障,数据snapshot等方面做了basic paxos协议未提及的优化和实现。
basic paxos是为了解决分布式系统各个节点在不能保证可靠且实时通信的条件下,各个节点可以并发的提议请求,但是最终所有节点都对一个唯一的结果达成一致的算法。这是一个充满浪漫主义的算法,类比一些multi paxos 或者raft 算法就多了很多限制,比如加了所有节点中只有一个领导者可以提议请求,那么,整个一致性过程会变的简单的多。

paxos 回顾

basic paxos处理的基本问题:

  • 有一台或多台服务器会并发的提议(propose)一些值。
  • 系统必须通过一定机制从propose的值中选定(chose)一个值。
  • 只有一个值能被选定(chosen)。即不允许整个系统先选定了值A,后面又改选定值B。

basic的基本过程如下:

image

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  proposer(v):
while not decided:
choose n, unique and higher than any n seen so far
send prepare(n) to all servers including self
if prepare_ok(n_a, v_a) from majority:
v' = v_a with highest n_a; choose own v otherwise
send accept(n, v') to all
if accept_ok(n) from majority:
send decided(v') to all

acceptor's state:
n_p (highest prepare seen)
n_a, v_a (highest accept seen)

acceptor's prepare(n) handler:
if n > n_p
n_p = n
reply prepare_ok(n_a, v_a)
else
reply prepare_reject

acceptor's accept(n, v) handler:
if n >= n_p
n_p = n
n_a = n
v_a = v
reply accept_ok(n)
else
reply accept_reject

paxos算法本身的过程很简单,都说paxos难,难点有二:

  • paxos难以理解,主要难以理解为什么是正确的
  • 难以实现一个完全正确的paxos过程,上述伪代码中,集群中所有节点都会同时运行着所有的过程,每行代码之间都可能会并发和同步。

Mit6.824 paxos实验

Mit6.824 关于 paxos的实验就是为了实现一个基本正确的paxos协议。之所以说基本正确是因为其提供的测试样例肯定没有考虑的所有的边界情况。但是,实现一个基本正确的paxos算法有助于理解上述提到的两个难点。整个算法的实现完全按照理论部分的过程,对于各个节点上唯一的提议号,我采用了初始序号+节点id的形式,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type ProposeId struct {
Ballot int
Id int
}

func greaterProposeId(a, b ProposeId) bool {
if (a.Ballot > b.Ballot) || ((a.Ballot == b.Ballot) && a.Id > b.Id) {
return true
}
return false
}

func equalProposeId(a, b ProposeId) bool {
return a.Ballot == b.Ballot && a.Id == b.Id
}

用到的实验的版本是2014年版。整个实验过程大体如下:

这个实现中必须要实现以下的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Make是用来创建一个paxos节点的,也就是说一个paxos节点的初始化函数
px = paxos.Make(peers []string, me int)

// 这个是用来对一个实例发起一致性过程的函数,seq代表paxos中的序列号,v代表这个序列号需要达成的内容是什么。。比如 100,“asdf”,都行
px.Start(seq int, v interface{}) // start agreement on new instance

// 用来获取一个节点是否认为已经决定了一个实例,如果已经决定了,那么商定的值是多少。Status()应该只检查本地节点的状态;它不应该与其他Paxos节点联系。
px.Status(seq int) (decided bool, v interface{}) // get info about an instance

px.Done(seq int) // ok to forget all instances <= seq

px.Max() int // highest instance seq known, or -1

px.Min() int // instances before this have been forgotten
  • 程序调用Make(peer,me)来创建Paxos peers。peer参数包含所有peers的端口(包括这个),me参数是对等点数组中这个对等点的索引。Start(seq,v)请求Paxos启动实例seq协议,建议值v;Start()应该立即返回,而不是等待协议完成。应用程序调用Status(seq)来查明Paxos对等方是否认为实例已经达成协议,如果已经达成协议,那么商定的值是多少。Status()应查询本地Paxos对等点的状态并立即返回;它不应该与其他同行通信。应用程序可以为旧实例调用Status()(但是请参阅下面关于Done()的讨论)。

  • 实现需要能够同时在多个实例的协议上进行。也就是说,如果程序中各个peer使用不同的序列号同时调用Start(),那么实现应该同时为它们运行Paxos协议。在启动协议(例如i +1)之前,不应该等待协议(例如i)完成。每个实例都应该有自己独立的Paxos协议执行。

  • 长时间运行的基于paxos的服务器必须忘记不再需要的实例,并释放存储这些实例信息的内存。如果应用程序仍然希望能够调用该实例的Status(),或者另一个Paxos对等方可能尚未就该实例达成协议,则需要一个实例。您的Paxos应该通过以下方式实现实例的释放。当某个特定的对等应用程序不再需要为任何实例调用Status()时,它应该调用Done(x)。该Paxos对等点还不能丢弃实例,因为其他一些Paxos对等点可能还没有同意该实例。因此,每个Paxos对等点应该告诉其他对等点其本地应用程序提供的最高已完成参数。然后,每个Paxos对等点之间都有一个Done值。它们应该找到最小的那个值,并丢弃序列号<=该最小值的所有实例。Min()方法返回这个最小序列号加1。

  • 实现可以使用协议协议包中的Done值;也就是说,对等点P1可以在下一次P2向P1发送协议消息时只学习P2的最新Done值。如果调用Start()的序列号小于Min(),则应该忽略Start()调用。如果使用小于Min()的序列号调用Status(),则Status()应该返回false(表示没有达成一致了)。

    这里给出一个比较合理的实现步骤:

  • 在Paxos中向Paxos结构体添加元素。根据课程伪代码,去保存你需要的状态。您需要定义一个结构来保存关于每个协议实例的信息。

  • 基于lecture伪代码,为Paxos协议消息定义RPC参数/应答类型。rpc必须包含它们所引用的协议实例的序列号。记住RPC结构中的字段名必须以大写字母开头。

  • 为实例编写一个驱动Paxos协议的proposal函数,以及实现acceptor的RPC处理程序。根据需要在自己的线程中为每个实例启动一个proposal函数(例如,in Start())。

提示:

  • 一个以上的Paxos实例可能在一个给定的时间执行,start 和 decide 过程需要容忍乱序执行。
  • 为了通过假定网络不可靠的测试,您的paxos应该通过函数call调用本地接受器,而不是RPC。
  • 请记住,多个peer可能在同一个实例上调用Start(),可能使用不同的提议值。peer序甚至可以为已经确定的实例调用Start()。
  • 在开始编写代码之前,考虑一下您的paxos将如何忘记(丢弃)关于旧实例的信息。每个Paxospeer将需要在允许删除单个实例记录的数据结构中存储实例信息(以便Go垃圾收集器可以释放/重用内存)。
  • 不需要来处理Paxos程序在崩溃后需要重新启动的情况。如果一个Paxos peer崩溃,它将永远不会重新启动。
  • 让每个Paxos 节点为每个未决定的实例启动一个线程,该实例的任务是作为一个提议者最终驱动实例达成协议。
  • 一个Paxos对等点可以同时作为同一个实例的接受者和提议者。将这两个活动尽可能分开。
  • 一个提议者需要一种方法来选择一个比目前所见的任何一个都要高的提议数。这是提议人和接受人应该分开的规则的合理例外。如果提案RPC处理程序拒绝RPC,则返回已知的最高提案号,以帮助调用者下次选择更高的提案号。

Mit6.824-MapReduce 原型实现

进入21世纪以来,谷歌带领着工业界和学术界一起迈向大数据时代。大数据的核心为分布式,而在谷歌的三驾马车三篇论文发表之前,工业界和学术界都没有意识到分布式系统中的重点在于容错,无论是分布式计算,分布式调度还是分布式存储。mapreduce是谷歌在分布式计算领域的开篇之作,主要介绍了在计算调度以及容错方面做的工作。原理方面,一图以蔽之:

image

mapreduce 原型框架

MIT 6.824 lab1 主要实现了一个简化版的mapreduce框架,包括计算,调度和容错过程。实验已经完成并通过了所有测试,代码见github
image

下面介绍一下整个实验的框架理解一下上面原理图的各个过程:

  1. 客户端提供了多个输入文件、一个map函数、一个reduce函数和reduce任务的数量(nReduce)。
  2. maprecude的中心角色master首先它启动一个RPC服务器(master_rpc.go),并等待工作节点worker注册。 当tasks可用时(在步骤4和5中),schedule()函数决定如何将这些tasks分配给workers,以及如何处理worker故障。
  3. master将每个输入文件视为一个map任务,并调用doMap()对每个map任务执行至少一次,可以直接(在使用Sequential()时)执行对应的操作,也可以通过向worker发出DoTask RPC来执行。 每次对doMap()的调用都会读取相应的文件,对该文件的内容调用map函数,并将生成的key/value对写入nReduce个中间文件中。doMap()通过hash每个key以选择中间文件,从而选择将处理该key的reduce任务。所有map任务完成后,会产生nMap x nReduce个文件。
  4. 每个worker必须能够读任何其他worker写入的文件。实际部署中使用分布式存储系统(如GFS)来允许这种访问,即使worker运行在不同的节点上。在这个实验中,所有的worker在同一个节点上,并公用本地文件系统。
  5. master接下来调用doReduce(),对每一项reduce任务至少执行一次。与doMap()一样,它可以直接执行,也可以通过一个worker执行。reduce任务r的doReduce()从每个map任务收集第r个中间文件,并为这些文件中出现的每个key调用reduce函数。reduce任务生成nReduce个结果文件。
  6. master调用mr.merge() 将前面步骤生成的所有nReduce个文件合并到一个输出中文件中。
  7. master向每个worker发送一个关闭RPC,然后关闭它自己的RPC服务

下面简要介绍一个本实验各个部分的内容,主要分为计算,调度和容错三个部分。

Part1 实现map和reduce过程

本part主要实现划分map任务输出的函数,以及为reduce任务收集所有输入的函数。这个功能主要由common_map中的doMap()函数完成以及common_reduce中的doReduce()函数完成。

Part3 分布式的mapreduce任务调度

part1中主要实现了一次运行一个map和reduce任务。Map/Reduce最大的卖点之一是它可以自动并行化执行各个tasks,而不需要使用人员做任何额外的工作。在本part中将完成一个分布式版本的MapReduce,它通过一组在多个核上并行运行的工作线程来分割工作。
虽然不像在实际的Map/Reduce部署中那样分布在多台机器上,将使用RPC来模拟分布式计算。

  • mapreduce/master.go完成了管理MapReduce任务的大部分工作。mapreduce/worker中主要实现了worker线程的完整逻辑。mapreduce/common_rpc.go用于处理RPC。
  • 本part主要工作是在mapreduce/schedule.go中实现schedule()。在MapReduce作业中,master调用schedule()两次,一次是在Map阶段,一次是在Reduce阶段。schedule()的工作是将任务分配给可用的worker。通常会有比工作线程更多的任务,因此schedule()必须给每个工作线程一个任务序列,一次一个。schedule()应该等待所有任务完成,然后返回。
  • schedule()通过读取它的registerChan参数来了解worker集合。该channel保存了每个worker的RPC地址。有些worker可能在schedule()调用之前就存在,有些worker可能在schedule()运行时启动;所有的都会出现在registerChan上。
  • schedule()应该使用所有的worker,包括在它启动后出现的worker。schedule()通过发送一个DoTask RPC来告诉worker执行一个任务。这个RPC的参数是由mapreduce/common_rpc.go中的DoTaskArgs定义的。File参数仅用于Map任务,是要读取的文件的名称;schedule()可以在mapFiles中找到这些文件名。

Part4 处理worker故障

本part中,主要实现master处理失败的worker以进任务的重新分配。

  • 如果一个worker在处理来自master的RPC时失败,master对应的调用函数call()将最终由于超时而返回false。在这种情况下,master应该把分配给失败worker的任务重新分配给另一个worker。
  • RPC失败并不一定代表worker没有执行任务;worker可能已经执行了但是响应丢失了,或者worker可能仍然在执行,但是master的RPC超时了。因此,可能会发生两个worker接收相同的任务,计算并生成输出。对于给定的输入,需要两次调用map或reduce函数来生成相同的输出(即map和reduce函数是“幂等性”),所以如果后续处理有时读取一个输出,有时读取另一个输出,就不会出现不一致的情况。

Mysql-mgr-选主优化

选主优化

原生选主策略

MGR在单主模式下,当主节点退出集群时会触选主过程,默认的选主策略在选主时会比较节点的server_uuid 和 group_replication_member_weight, 选择哪个secondary作为新的主节点。

首先比较group_replication_member_weight,值最高的成员被选为新的主节点。如果相同,按照server_uuid排序,排序在最前的被选择为主节点。

此外可以通过group_replication_set_as_primary() 函数指定主节点:

1
select group_replication_set_as_primary(server_uuid);

查看group_replication_member_weight的值,范围为0-100,默认为50:

1
show variables like 'group_replication_member_weight';

设置group_replication_member_weight的值:

1
set global group_replication_member_weight = 200;

当想要设置的值小于0时,会自动设置为0,当想要设置的值大于100时,会默认等于100。调整权重后不能自动识别进行主从切换,当新加入的从节点权重较现有的主节点高时也不会发生主从切换,只有当主节点退出重新触发选主时才可以进行。

优化思路

在原生选主方式中,希望通过设置group_replication_member_weight参数,希望在发生主从切换时,将硬件配置、性能较好的节点选为主节点,因为理论上这种节点的选择回放速度最快,系统的响应时间最快。但在现实应用场景中,配置较好的节点回放速度不一定更快。因此当发生选主时,可以判断一下集群中所有节点回放速度,选择最快的节点作为主节点。当回放速度相同或者回放过程完成时,再按照原生策略进行选主。

回放速度可以通过gtid反应出来,在同一时刻,从节点中已执行的gtid值较大的节点回放速度加快,因此选主时选则gtid最新的节点为主节点。

通过增添和设置参数 primary_election_self_adaption=ON/OFF控制是否需要根据gtid按照节点执行速度选主,还是按照默认的权重+uuid方式选主。默认为OFF

新加入集群的节点参数primary_election_self_adaption必须与集群中参数相同。

优化细节

MGR中class Group_member_info 及 class Group_member_info_manager 保存组中所有成员的状态信息,MGR中已经回放完成的事务对应的gtid保存在executed_gtid_set中。所有节点中executed_gtid_set值最大的节点回放速度最快。一个节点可以通过以下两种方式获取其他节点的executed_gtid_set:

  • method 1:流控时可以得知其他节点的execute_gtid set。但是选主过程中只要最新时刻的gtid。通过流控过程传播的gitd需要不停的更新各个节点的最新gtid,代价较高。如果只在发生选主时才去读取流控得到的最新gtid,此时流控交互 的信息可能并没有发生传播。

  • method 2: 选主时进入视图变更阶段,视图变更阶段的前期需要与其他节点交互获取其节点的member_weight,execute_gtid_ set等信息,并且进程选主过程的节点不再接受新的写请求,此时交互的信息即为当前集群中各个节点状态的最新信息。

  • 选择方法2的方式获取其他节点的execute_gtid_set。Group_member_info 中添加成员变量primary_election_self_adaption,用来表示当前的选主方式。

  • class Group_member_info 中 增加成员函数comparator_group_member_executed_gtid(), 用于选主排序sort函数的仿函数。通过子集的方式判断节点间gitd的大小关系。

具体实现

Mysql Mgr模块研究之孟子协议实现:xcom模块

xcom是MGR中实现一致性协议的核心模块,借鉴Mencius算法的思想实现了一套类paxos协议。xcom首先通过GCS接受到MGR封装的事务消息,然后后通过paxos协议在各个节点上达成一致,最终发送再发送给上层MGR。因此MGR中与xcom相关的模块主要包括与GCS的交互接口,xcom的核心模块paxos实现。在xcom中,首先实现了一套协程机制,xcom所有过程都使用这套协程机制。因此,本文主要介绍一下与上层MGR的交互过程,paxos协议的实现,协程机制的实现,最后看一下paxos各个过程如何通过这套协程实现。

1. 与上层MGR的交互

MGR中的事务以Paxos请求的方式发送给xcom,Paxos通过两阶段协议(propose、accept)或者三阶段的(prepare、propose、accept)方式使各节点达成一致后返回给MGR在进行后续处理。

在Gcs_xcom_communication::send_message()接口中会将消息类型设置为Gcs_internal_message_header::CT_USER_DATA,交由 Gcs_xcom_proxy_impl::xcom_client_send_data()发送。

xcom_client_send_data 将事务消息放入m_xcom_input_queue(无锁MPSC队列)中,然后通过与xcom的socket连接,通知xcom模块有消息进入队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool Gcs_xcom_proxy_impl::xcom_client_send_data(unsigned long long len,
char *data) {
/* We own data. */

....

if (len <= std::numeric_limits<unsigned int>::max()) {
assert(len > 0);
app_data_ptr msg = new_app_data();
/* Takes ownership of data. */
msg = init_app_msg(msg, data, static_cast<uint32_t>(len));
successful = xcom_input_try_push(msg){

m_xcom_input_queue.push(data); // 将消息放入MGR向xcom发送消息的缓冲队列
if (pushed) successful = ::xcom_input_signal(); // 通过tcp通知xcom模块缓冲队列中存入了数据
return successful;
}

if (!successful) {
MYSQL_GCS_LOG_DEBUG("xcom_client_send_data: Failed to push into XCom.");
}
}
....
}

在xcom中,通过本地协程local_server,等待socket请求的到来,然后从m_xcom_input_queue队列中读取消息,调用dispatch_op进行处理,对于op为client_msg的消息,dispatch_op会进一步调用handle_client_msg()插入到prop_input_queue请求channel的末尾。每个MGR节点的Xcom有一个proposer_task,会获取prop_input_queue头部的请求,然后进入paxos的流程。

local_server的主要过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int local_server(task_arg arg) {
.....
while (!xcom_shutdown) {
/* 等待客户端信号,以便可以从m_xcom_input_queue中读取数据. */
TASK_CALL(task_read(&ep->rfd, ep->buf, 1024, &ep->nr_read));
ep->request = xcom_try_pop_from_input_cb();
while (ep->request != NULL) {
....
dispatch_op(NULL, ep->request_pax_msg, &ep->internal_reply_queue);
....
}

}
}

xcom中的executor_task会按序获取已经完成Paxos处理流程的事务请求,调用execute_msg()执行该请求,对于app_type类型的请求,会调用deliver_to_app(),该函数最终调用了在MGR初始化时注册的xcom_data_receiver处理函数cb_xcom_receive_data(),发送到上层客户端(GCS)。

下面主要介绍一下事务消息在xcom模块中的处理过程,即paxos的实现。

2. paxos核心协议流程(ing)

Mencius协议回顾

basic paxos

basic paxos

  • multi paxos:集群中设置leader节点,可以跳过prepare阶段(变成两阶段);leader故障时走basic paxos 协议的过程(三阶段)。

    多领导者

image

simple paxos

  • coordinator(leader):在自己负责的日志序列中对应位置,可以提议客户端请求和no-op操作
  • 非 coordinator:只能提议no-op

image

优化

  • 将no-op消息piggyback到accept阶段

  • 将no-op消息piggyback到未来的propose阶段

    image

目前xcom模块已经实现了Mencius算法中的paxos和simple paxos部分,但由于Mencius算法中优化算法依赖于异步的FIFO通信机制(Asynchronous FIFO communication channel)来保证正确性,因此xcom中并没有实现。

2.1 主要数据结构

在xcom模块中比较重要的数据结构如下:

struct site_def 描述了一个有时效性的MGR/Paxos集群配置,每个节点维护了一个唯一的site_def对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 描述了一个有时效性的MGR/Paxos集群配置,每个节点维护了一个site_def对象
struct site_def {
synode_no start; /* Config is active from this message number */
synode_no boot_key; /* The message number of the original unified_boot */
node_no nodeno; /* Node number of this node */
node_list nodes; /* Set of nodes in this config */
server *servers[NSERVERS]; /* Connections to other nodes */
detector_state detected; /* Time of last incoming message for each node */
node_no global_node_count; /* Number of live nodes in global_node_set */
node_set global_node_set; /* The global view */
node_set local_node_set; /* The local view */
int detector_updated; /* Has detector state been updated? */
xcom_proto x_proto;
synode_no delivered_msg[NSERVERS];
double install_time;
xcom_event_horizon event_horizon;
};

主要内容包括集群节点生效的开始消息编号,本节点的编号,在当前视图配置下节点的编号,当前视图中节点的集合,与本节点保持连接的节点,每个节点最新的消息发送时间,全局视图中节点的数量及集合,本节点视图中节点的数量及集合,与正常的paxos协议执行相关的字段包括servers和delivered_msg。前者维护了本节点到集群中其他节点的连接,后者表现各个节点的消息完成状态。

经过paxos处理的消息都有一个消息号synode_no。nodeno是节点在Paxos集群中的序号,该序号是Paxos消息synode_no的组成部分,synode_no的另一部分为消息序号,结合在一起synode_no表现为{X, N},其中X为消息序号,N为节点序号。

1
2
3
4
5
6
7
8
9
10
struct synode_no {
uint32_t group_id; // 用于判断新加入的节点是否是同一个组,MGR节点间交互通过xcom进行,而MGR规定的不同组不能构成集群的要求,只能在xcom中实现
uint64_t msgno;
node_no node;
};

int synode_gt(synode_no x, synode_no y) {
assert(x.group_id == 0 || y.group_id == 0 || x.group_id == y.group_id);
return (x.msgno > y.msgno) || (x.msgno == y.msgno && x.node > y.node);
}

image

在进行prepare、propose、accept和learn等Paxos操作时,需要依赖site_def中的server对象发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* Server definition */
struct server {
int garbage;
int refcnt;
char *srv; /* Server name */
xcom_port port; /* Port */
connection_descriptor con; /* Descriptor for open connection,本节点到指定节点的长连接 */
double active; /* Last activity */
double detected; /* Last incoming */
channel outgoing; /* Outbound messages */
task_env *sender; /* The sender task */
task_env *reply_handler; /* The reply task */
srv_buf out_buf;
int invalid;
int number_of_pings_received; /* Number of pings received from this server */
double last_ping_received; /* Last received ping timestamp */
};

跟消息发送相关的字段主要有:srv是server指向节点的名称,con维护了本节点到server指向节点的长连接,outgoing队列用于缓存本节点需发送到server指向节点的消息,sender是在server初始化时注册的sender_task协程,用来负责从outgoing队列中读取消息发送到server指向节点。在Paxos正常运行期间,server对象是集群中节点间消息发送的媒介,通过con建立2个节点间的联系,发送端即为sender字段对应的sender_task协程,接收端就是server对应节点上的acceptor_learner_task协程。需要注意的是,本节点也会为自己创建一个server对象,此时sener字段即为local_sender_task。

server维护了Paxos集群各节点间的通信管道,pax_msg就是管道中传输的消息。其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct pax_msg{
node_no to; /* To node */
node_no from; /* From node */
uint32_t group_id; /* Unique ID shared by our group */
synode_no max_synode; /* Gossip about the max real synode */
start_t start_type; /* Boot or recovery? DEPRECATED */
ballot reply_to; /* Reply to which ballot */
ballot proposal; /* Proposal number */
pax_op op; /* Opcode: prepare, propose, learn, etc */
synode_no synode; /* The message number */
pax_msg_type msg_type; /* normal, noop, or multi_noop */
bit_set *receivers;
app_data *a; /* Payload */
snapshot *snap; /* Not used */
gcs_snapshot *gcs_snap; /* gcs_snapshot if op == gcs_snapshot_op */
client_reply_code cli_err;
bool force_delivery; /* Deliver this message even if we do not have majority */
int32_t refcnt;
synode_no delivered_msg;
xcom_event_horizon event_horizon;
synode_app_data_array requested_synode_app_data;
}

from和to字段标识了消息的源节点和目标节点,to字段可能为null,表示发往所有节点;op标识消息的操作类型,比如是prepare还是propose消息;synode表示该消息的序号,类型为synode_no,消息组成详见site_def定义;msg_type表示消息类型,是普通消息还是不携带数据的noop消息等;receivers表示有多少节点已经接收到了该消息;a是消息数据,比如对于事务消息,该字段就包含了一系列的log_event。reply_to和proposal是2个ballot,ballot由一个节点序号nodeno和计数器cnt组成,表示某次投票号,显然proposal字段表示本次消息提案的投票号,reply_to表示本次消息是对reply_to对应的提案编号的回复。同个pax_msg的proposal和reply_to字段对应的ballot可能不同,举个例子,A节点向B节点发送prepare消息,B节点收到后,发现本节点已经接受了对应synode的消息,那么B节点在回复A消息时,会将消息中的propsal字段设置为本节点的提案编号并替换掉消息体。ballot是Paxos算法强相关的字段,对于某个确定的synode消息,在prepare阶段,一个节点只能继续处理比其之前收到的所有提案编号更大的提案,在acceptor阶段,一个节点只能处理不小于其之前收到的所有提案编号的提案。

在节点上,pax_msg保存在pax_machine中,在Paxos算法中,pax_machine就是一个Paxos实例,即Mencius算法中的instance,对应一条Paxos日志,一系列有序的Paxos日志组成了Paxos状态机。每个节点维护了一个最小5万个pax_machine对象的缓存(paxos cache),有专门的缓存管理协程cache_manager_task负责维持缓冲区的大小在合理范围内。如果节点上所有pax_machine的缓存大小超过了阈值,就会开始清理无用的pax_machine。淘汰算法采用lru机制,不能清理还未走完Paxos协议的pax_machine。pax_machine定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* Definition of a Paxos instance */
struct pax_machine {
linkage hash_link; // 在hash表桶中的位置
stack_machine *stack_link; //指向hash_stack中的stack_machine
lru_machine *lru; // 指向当前节点的paxos cache中的LRU缓存
synode_no synode;
double last_modified; /* Start time */
linkage rv; /* 系统中的睡眠协程队列,本字段主要用于将协程挂到这个队列上,和一个协程执行完成后,唤醒睡眠的协程 */

struct {
ballot bal; /* The current ballot we are working on */
bit_set *prep_nodeset; /* Nodes which have answered my prepare */
ballot sent_prop; // 记录响应本节点的消息中号码最大的消息
bit_set *prop_nodeset; /* Nodes which have answered my propose */
pax_msg *msg; /* The value we are trying to push */
ballot sent_learn;
} proposer;

struct {
ballot promise; /* Promise to not accept any proposals less than this */
pax_msg *msg; /* The value we have accepted */
} acceptor;

struct {
pax_msg *msg; /* The value we have learned */
} learner;
int lock; /* 本条pax machine 对应的锁, 其实是一个标志位,标记本条pax machine对象是否正在被使用,比如处于使用状态的pax machine 不能被cache管理协程回收*/
pax_op op;
int force_delivery;
int enforcer;
};

struct ballot {
int32_t cnt;
node_no node;
};

前三个字段与缓存相关。synode标识了该pax_machine处理的消息序号。在MGR中,一个消息/提案是以异步的方式走完Paxos协议,发起投票的proposer_task会在rv字段上等待并周期性被唤醒,直到该提案完成。op表示消息操作类型。proposer、acceptor和learner分别保存了消息序号为synode的提案从本节点视角看到的不同阶段的状态。proposer字段是提案的发起者维护的字段,accetor和learner不会操作proposer字段。MGR中的Paxos实现支持2阶段(multi-paxos)和3阶段(basic paxos)协议,相比2阶段,3阶段协议多了一个prepare的过程。MGR对于正常的事务消息采用2阶段的方式。所以proposer字段包括了prepare和propose 2个阶段,其中bal表示目前的提案编号,prep_nodeset和prop_nodeset分别表示已经回复prepare和propose消息的节点集;sent_prop和sent_learn分别在收到大多数节点回复prepare和propose消息时设置,设置为bal;msg字段在proposer_task进行pax_machine初始化时被置为等待投票的客户端消息,如果有其他节点回复的prepare消息中携带的proposal大于proposer节点的ballot,那么msg会被替换为回复消息对应的pax_msg对象,即投票的value发生改变。acceptor字段由作为提案接受者的节点维护,保存了本节点至今已收到的消息编号为synode的最大ballot,msg字段为对应ballot携带的pax_msg。learner字段表示编号为synode的消息最终被学习的消息体/value。显然,该字段不为空表示对应synode已经完成了Paxos协议。

2.2 paxos协议:Mencius算法实现

2.2.1 basic paxos协议过程

正常事务处理时的流程也即paxos协议的流程,paxos协议主要分为三个阶段:prepare,propose,accept。

basic paxos协议流程

proposer_task

prepare和propose会通过proposer_task协程发起,当xcom模块接受到上层客户端发送来的事务消息时,会被放入prop_input_queue队列中,proposer_task会循环的从队列中获取消息,并对普通的事务消息进行batch操作。而对于视图或者配置变更消息,则不能进行batch。当事务被batch以后,会进行消息的propose操作,典型的paxos协议需要三个阶段,而具有leader的paxos协议只需要两阶段。具有leader节点的paxos协议在正常消息提议时只需要走两阶段,但是当被检测leader故障等情况下需要走三阶段过程。在proposer_task中,会不停的尝试提议消息,直到消息在整个集群中达成一致。

由于paxos整个过程从proposer_task开始,proposer_task中还负责待提议消息号赋值和对应pax machine的获取。
主要逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
static int proposer_task(task_arg arg) {
while (!xcom_shutdown){
// 从prop_input_queue队列中取出消息,存入client_msg中
CHANNEL_GET(&prop_input_queue, &ep->client_msg, msg_link); // ep为当前协程的上下文特征,即栈数据

// 对消息进行batch
while (AUTOBATCH && ep->size <= MAX_BATCH_SIZE &&
ep->nr_batched_app_data <= MAX_BATCH_APP_DATA &&
!link_empty(&prop_input_queue.data)){

msg_link *tmp;
app_data_ptr atmp;
CHANNEL_GET(&prop_input_queue, &tmp, msg_link);
atmp = tmp->p->a;
// 对于视图或者配置变更信息,不可以被batch
if (is_config(atmp->body.c_t) || is_view(atmp->body.c_t) ||
ep->nr_batched_app_data > MAX_BATCH_APP_DATA ||
ep->size > MAX_BATCH_SIZE) {
channel_put_front(&prop_input_queue, &tmp->l);
break;
}
ep->client_msg->p->a = atmp;

}

ep->msgno = current_message; // 初始化消息号,current_message代表了本节点下一个可用的消息号,每当一个消息被提交到上层MGR时都会更新current_message

// 直到要发送的消息在paxos中达成一致,否则不停的propose
for (;;) {
// 从状态机pax machine缓存获取一个实例,用于发送GCS上层发过来的客户端消息,最长等待60s
TASK_CALL(wait_for_cache(&ep->p, ep->msgno, 60));

当自身配置为三阶段提交或者需要force_delivery(如发生配置变更)或者其他节点在等待本节点时超时所以提出了no-op,并且本节点已经accept了,那么就需要三阶段提交
if (threephase || ep->p->force_delivery || ep->p->acceptor.promise.cnt) {
push_msg_3p(ep->site, ep->p, ep->prepare_msg, ep->msgno, normal);
} else {
push_msg_2p(ep->site, ep->p);
}

// Try to get a value accepted
while (!finished(ep->p)) {

// 周期性的睡眠等待和醒来
TIMED_TASK_WAIT(&ep->p->rv, ep->delay = wakeup_delay(ep->delay));
// 判断该pax_machine是否已经走完learn阶段或者如果在本次propose过程中已经得到了一个被提议了一个值,break;
if (finished(ep->p)) break;
push_msg_3p(ep->site, ep->p, ep->prepare_msg, ep->msgno, normal); // 否则,继续走三阶段阶段
}

// 检测本次提议最终获得的值是不是本节点想propose的值
if (match_my_msg(ep->p->learner.msg, ep->client_msg->p)) {
break;
} else{ // 如果不是,进行重试,继续尝试提议本次想要提议的消息
GOTO(retry_new);
}

}

}
}

prepare:

在三阶段过程中,首先进行prepare,根据paxos协议,prepare准备提议一个消息,并从整个集群中得知已经被accept的最大提议号的消息。在prepare阶段只带有本次的提议号,不带有消息。主要过如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static void push_msg_3p(site_def const *site, pax_machine *p, pax_msg *msg,
synode_no msgno, pax_msg_type msg_type) {
if (wait_forced_config) {
force_pax_machine(p, 1);
}

assert(msgno.msgno != 0);

BIT_ZERO(p->proposer.prep_nodeset); // 清空已经回复本节点发出的prepare消息的所有节点
p->proposer.bal.node = get_nodeno(site); // 获取本节点的node号
{
int maxcnt = MAX(p->proposer.bal.cnt, p->acceptor.promise.cnt);
p->proposer.bal.cnt = ++maxcnt; // 找到一个已知的最大的提议号
}
msg->synode = msgno; // 设置本次propose message的消息号为当前pax machine的消息号
msg->proposal = p->proposer.bal; // 提议号
msg->msg_type = msg_type;
msg->force_delivery = p->force_delivery;

assert(p->proposer.msg);
send_to_acceptors(p, "prepare_msg"){
....
// 向当前视图中的所有节点发送prepare消息
for (i = 0; i < max; i++) {
if (test_func(s, i)) { // 调用all函数,总是return 1
retval = _send_server_msg(s, i, p){
msg_link *link = msg_link_new(p, to);
// 最终消息被放入像server发送的outgoing消息队列中
channel_put(&s->outgoing, &link->l);
}
}
}
....
}
}

当另外的节点收到prepare请求时,首先通过这个请求的类型字段prepare_op在dispatch_op中进行对应的处理:

1
2
3
4
// 处理prepare 请求
pax_msg *reply = handle_simple_prepare(p, pm, pm->synode);
// 响应prepare请求,通过reply
if (reply != NULL) SEND_REPLY;

根据paxos协议,当节点收到其他节点发送的prepare消息时,首先判断接受的消息的提议号如果不小于当前的提议号,则响应本节点已经接受的最大提议号的消息值。并做出以后不再接受比接受的prepare提议号小的承诺。在xcom中,处理prepare消息的详过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
pax_msg *handle_simple_prepare(pax_machine *p, pax_msg *pm, synode_no synode) {
pax_msg *reply = NULL;

// 如果本节点已经学到了一个值
if (finished(p)) {
reply = create_learn_msg_for_ignorant_node(p, pm, synode); // 响应已经学到的值
} else {
int greater =
gt_ballot(pm->proposal,
p->acceptor.promise); // 判断收的提议号本节点做出承诺的提议号
if (greater || noop_match(p, pm)) { // 如果满足承诺条件或者消息类型为noop,noop消息可以直接忽略
p->last_modified = task_now();
if (greater) {
p->acceptor.promise = pm->proposal; // 承诺不再接受更小的提议
}
reply = create_ack_prepare_msg(p, pm, synode);// 创建响应prepare的消息
}
}
return reply;
}

其中判断ballot大小的函数为:

1
2
3
int gt_ballot(ballot x, ballot y) {
return x.cnt > y.cnt || (x.cnt == y.cnt && x.node > y.node);
}

可以看出通过比较cnt和节点的node号来确定

根绝paxos协议,对于响应prepare的消息 reply,会带有本节点已经accept的消息值,如果还没accept,则直接响应空消息ack_prepare_empty_op。响应prepare请求的消息通过以下方式创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
reply = create_ack_prepare_msg(p, pm, synode);
static pax_msg *create_ack_prepare_msg(pax_machine *p, pax_msg *pm,
synode_no synode) {
CREATE_REPLY(pm);
reply->synode = synode;
if (accepted(p)) { // 如果已经接受一个值,那么回复消息中带有这个提议
reply->proposal = p->acceptor.msg->proposal;
reply->msg_type = p->acceptor.msg->msg_type;
reply->op = ack_prepare_op; // 把消息类型设置为ack_prepare_op
safe_app_data_copy(&reply, p->acceptor.msg->a);
} else { // 还没有接受任何值,直接返回空
reply->op = ack_prepare_empty_op;
}
return reply;
}

当发起prepare的节点收到其他节点对于prepare请求的ack_prepare_op响应时,如果响应的提议号比本节点的提议号大,则进行响应消息的替换,并检测是否可以进入propose过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void handle_ack_prepare(site_def const *site, pax_machine *p,
pax_msg *m) {

bool_t can_propose = FALSE;
if (m->op == ack_prepare_op &&
gt_ballot(m->proposal, p->proposer.msg->proposal)) { // 如果响应的prepare请求是ack_prepare_op(代表非空)并且提议号比自身提议号大
replace_pax_msg(&p->proposer.msg, m); // 需要将自身的提议内容设置为响应的提议内容
assert(p->proposer.msg);
}
if (gt_ballot(m->reply_to, p->proposer.sent_prop)) { // 如果比自身已经收到的所有响应的提议号都大,才进行check_propose过程
can_propose = check_propose(site, p);
}
return can_propose;
}

如果收到的ack_prepare_op的个数大于集群中节点数目的一大半,则可以进入propose过程,通过以下方式检测是否prepare成功:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bool_t check_propose(site_def const *site, pax_machine *p) {

{
bool_t can_propose = FALSE;
// 判断是否大多数节点都已经回复本次prepare
if (prep_majority(site, p)) {
// 进行一些propose阶段的初始化工作
p->proposer.msg->proposal = p->proposer.bal;
BIT_ZERO(p->proposer.prop_nodeset); // 清空记录收到节点响应的集合
p->proposer.msg->synode = p->synode; // 更新消息号
init_propose_msg(p->proposer.msg); // 将消息初始化为accept_op
p->proposer.sent_prop = p->proposer.bal; // 更新收到所有响应里最大的提议号
can_propose = TRUE;
}
return can_propose;
}
}

propose:

按照与prepare相同的过程,进入propose阶段后会向所有的节点发送propose请求。当其他节点收到其他节点的accept_op消息时,根据paxos协议,如果propose消息的提议号大于等于当前节点做出的承诺号,则接受这个消息。过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void handle_accept(site_def const *site, pax_machine *p,
linkage *reply_queue, pax_msg *m) {
{
pax_msg *reply = handle_simple_accept(p, m, m->synode){
pax_msg *reply = NULL;
if (finished(p)) { // 已经learned到了一个值
reply = create_learn_msg_for_ignorant_node(p, m, synode);
} else if (!gt_ballot(p->acceptor.promise,
m->proposal) ||
noop_match(p, m)) { // 如果propose的提议号大于等于本节点承诺不再接受的提议号 或者 pax_msg是noop,或者本节点pax_machine对象的状态是已经accept了一个noop消息
p->last_modified = task_now();
replace_pax_msg(&p->acceptor.msg, m);
reply->op = ack_accept_op; // 设置消息类型为ack_accept_op;
reply->synode = synode; // 设置消息号
}
return reply;
};
// 响应propose请求
if (reply != NULL) SEND_REPLY;
}
}

当发出accept_op的节点收到ack_accpt_op的消息时,会检查能否进入learn阶段。做以下的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void handle_ack_accept(site_def const *site, pax_machine *p,
pax_msg *m) {

pax_msg *learn_msg = handle_simple_ack_accept(site, p, m){
pax_msg *learn_msg = NULL;
if (get_nodeno(site) != VOID_NODE_NO && m->from != VOID_NODE_NO &&
eq_ballot(p->proposer.bal, m->reply_to)) { // 如果是发送本节点的ack_accept_op消息
BIT_SET(m->from, p->proposer.prop_nodeset);
if (gt_ballot(m->proposal, p->proposer.sent_learn)) { // 同ack_prepare_op操作,同样为了拒绝比当前已经接受的accept消息号小的请求
learn_msg = check_learn(site, p); // 检查是否可以进入learn阶段
}
}
return learn_msg;
};
if (learn_msg != NULL) { // 如果learn_msgx消息不为空
if (learn_msg->op == tiny_learn_op) { // 如果设置的learn 消息的方式为tiny_learn_msg,
send_tiny_learn_msg(site, learn_msg);
} else {
/* purecov: begin deadcode */
assert(learn_msg->op == learn_op);
send_learn_msg(site, learn_msg);
/* purecov: end */
}
}
}

learn:

当接受到一半以上的成功accept的响应时,可以进入learn阶段。需要做以下的检查:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static pax_msg *check_learn(site_def const *site, pax_machine *p) {

pax_msg *learn_msg = NULL;
if (get_nodeno(site) != VOID_NODE_NO && prop_majority(site, p)) {

if (no_duplicate_payload) { // no_duplicate_payload 被设置为1
learn_msg = create_tiny_learn_msg(p, p->proposer.msg);
} else {
/* purecov: begin deadcode */
init_learn_msg(p->proposer.msg);
learn_msg = p->proposer.msg;
/* purecov: end */
}
p->proposer.sent_learn = p->proposer.bal;
}
return learn_msg;
}

当进入learn阶段以后,根据收到的消息是tiny_learn_op和learn_op,做出对应的不同的处理,在更新本节点pax_machine对象时,tiny_learn_op是将对象的acceptor.msg赋learner.msg,而learn_op时是将从其他节点收到的pax_msg赋予本节点pax_machine对象的acceptor.msg和learner.msg。对于前者,如果当前节点已经accet值并且接收到tiny_learn_op消息对应的learn值等于自身accpet值,则直接进入进入do_learn()过程,此时一条消息走完了整个paxos协议,在所有节点上达成一致;如果上述两个条件都不满足,则需要调用send_learn向其他节点学习需要最终的值,主要过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
void handle_tiny_learn(site_def const *site, pax_machine *pm, pax_msg *p) {
if (pm->acceptor.msg) {
if (eq_ballot(pm->acceptor.msg->proposal, p->proposal)) { // 如果是当前已经接收的提议
pm->acceptor.msg->op = learn_op;
update_max_synode(p); // 更新已知的最大消息号
handle_learn(site, pm, pm->acceptor.msg);
} else { // 如果接收的learn消息不是本节点已经accept的消息,则向其他节点学习
send_read(p->synode);
}
} else { // 如果本节点还没有接收值,则向其他节点学习
send_read(p->synode);
}
}

在handle_learn函数中,主要消息的替换和把消息当前pax_machine设置为被选定,此时,消息在paxos中的整个流程结束。此时还会激活sweeper_task协程进行pax_machine 缓冲区的清理和进行Mencius协议的simple
paxos 过程。handle_learn主要过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void handle_learn(site_def const *site, pax_machine *p, pax_msg *m) {
if (!finished(p)) { / 避免重复学习新的值
activate_sweeper(); //
do_learn(site, p, m){
if (m->a) m->a->chosen = TRUE; // 首先将消息设置为被选定状态
// 在do_learn 中,将自身作为acceptor和learner角色时的值替换为最终被决定的消息m
replace_pax_msg(&p->acceptor.msg, m);
replace_pax_msg(&p->learner.msg, m);
}
// 处理一些特殊消息
if (m->a && m->a->body.c_t == unified_boot_type) {
XCOM_FSM(x_fsm_net_boot, void_arg(m->a));
}
/* See if someone is forcing a new config */
if (m->force_delivery && m->a) {
/* Immediately install this new config */
switch (m->a->body.c_t) {
case add_node_type:
....
break;
/* purecov: end */
case remove_node_type:
....
break;
/* purecov: end */
case force_config_type:
.....
break;
default:
break;
}
}
}

task_wakeup(&p->rv);
}

在handle_tiny_learn中,如果本节点acceptor.msg不存在或者acceptor.msg->proposal不等于tiny_learn_op消息携带的投票器,则表示没有参与之前的accept流程或者当前accept值不是经过大多数同意的值,这两种情况需要调用send_read处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static void send_read(synode_no find) {
// 找到一个属于当前视图的节点
site_def const *site = find_site_def(find);

/* See if node number matches ours */
if (site) {
// 如果找到的节点不是本节点
if (find.node != get_nodeno(site)) {
pax_msg *pm = pax_msg_new(find, site); // 创建一个pax_msg消息
ref_msg(pm); // 对本条pm消息引用计数
create_read(site, pm); // 将消息类型设置为read_op类型

// 如果本节点还没有分配节点号(why?)
if (get_nodeno(site) == VOID_NODE_NO)
send_to_others(site, pm, "send_read"); // 向所有的其它节点发送read_op消息
else
send_to_someone(site, pm, "send_read"); // 通过round-bin的方式挑选一个live节点发送read_op

unref_msg(&pm); // 解引用,如果引用计数为0,析构本条消息
} else { // 如果找到的节点是本节点自身
pax_msg *pm = pax_msg_new(find, site);
ref_msg(pm);
create_read(site, pm);
send_to_others(site, pm, "send_read");
unref_msg(&pm);
}
}
}

提交消息到上层客户端

当一条消息被learn之后,表明已经在大多数节点上达成一致,此时便可以进入paxos协议的提交阶段,在xcom模块中,即将达成一致的消息返回给MGR的GCS模块。提交阶段主要分为两个过程:x_fetch和x_execute,前者是获取之前已经被do_learn()设置为被选定状态的消息,后者用于将这些消息发送给上层GCS模块。主要通过executor_task()协程进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int executor_task(task_arg arg MY_ATTRIBUTE((unused))) {

if (executed_msg.msgno == 0) executed_msg.msgno = 1;
delivered_msg = executed_msg;
ep->xc.state = x_fetch;
executor_site = find_site_def_rw(executed_msg);

while (!xcom_shutdown && ep->xc.state != 0) {
// 进入请求获取阶段
if (ep->xc.state == x_fetch) {
TASK_CALL(get_xcom_message(&ep->xc.p, executed_msg, FIND_MAX)); // 调用get_xcom_message获取已经被选定但是还未被execute_task感知的消息
x_fetch(&ep->xc){
if (x_check_exit(xc)) {
xc->state = x_terminate;
} else {
SET_EXECUTED_MSG(incr_synode(executed_msg)); // 增加executed_msg号,用于表示已经走完Paxos全流程但还未被executor_task()处理的最小的synode请求。同时递增全局的下一个可用的current_message,用于proposer_task中客户端消息号的赋值
if (x_check_execute_inform(xc)) {
xc->state = x_execute; // 将状态设置为x_execute
}
}
}
} else { // 当协程是execute状态,处于请求执行阶段
ep->xc.state(&ep->xc); // 调用x_execute协程
}
}
}

对于 x_execute 协程,在执行之前会首先判断是否满足执行条件,delivery_limit为视图变更阶段退出节点可以执行的消息号的上限,只有当前消息号小于delivery_limit时才可以执行,详见集群成员变更过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
static void x_execute(execute_context *xc) {
site_def const *x_site = find_site_def(delivered_msg);


xc->p = get_cache(delivered_msg);

if (xc->p->learner.msg->msg_type != no_op) {
/* 只有小于delivery_limit时,才可以执行 */
if (xc->exit_flag == 0 || synode_lt(delivered_msg, xc->delivery_limit)) {

last_delivered_msg = delivered_msg;
execute_msg(find_site_def_rw(delivered_msg), xc->p, xc->p->learner.msg);
}
}
/* delivered_msg 等于当前配置的start号,表示此配置刚开始执行,旧配置已经结束,可以进行垃圾回收机制 */
if (synode_eq(delivered_msg, x_site->start)) {
garbage_collect_servers();
}

/* 检测是否可以退出和增加delivered_msg */
x_check_increment_execute(xc);
}


void execute_msg(site_def *site, pax_machine *pma, pax_msg *p) {

last_delivered_msg = delivered_msg;
execute_msg(find_site_def_rw(delivered_msg), xc->p, xc->p->learner.msg){
app_data_ptr a = p->a;
// 对于不同的消息类型,做不同的处理
switch (a->body.c_t) {
// 配置消息
case unified_boot_type:
case force_config_type:
deliver_config(a);
case add_node_type:
case remove_node_type:
break;
case app_type:
// 如果是客户端类型消息,将会把消息发送到客户端,最终会执行MGR上层注册到Xcom的回调处理函数xcom_receive_data进行处理。
deliver_to_app(pma, a, delivery_ok);
break;
// 全局视图消息
case view_msg:
deliver_global_view_msg(site, p->synode);
break;
default:
break;
}
}

2.2.1 simple paxos协议过程

在Mencius算法中,所谓simple paxos即在设置leader角色的paxos协议过程中,规定了:

  • 只有leader可以正常的提议值,且当leader节点提议no-op值时,不用经过两阶段。
  • 非leader节点只能提议no-op值,且必须经过三阶段

对于第一点要求,由于非leader节点只能提议no-op值,因此leader提议no-op时不经过两阶段过程也不会造成不一致的情况;对于第二点要求,虽然非leader节点只可以提议非no-op值,但最后被decide的值也不一定是no-op,因为此时可能leader节点提议的正常事务消息已经被部分节点accept,这里的三阶段可以利用paxos原理,保证不会发生不一致的现象。

何时提议no-op值?

在Mencius算法中,提议no-op的场景如下:

    1. instance序列中,当大于本节点负责的instance已经被选定value时,所有本节点负责的instance且索引小于这个已经被决定的instance都被提议no-op
    1. 非leader节点怀疑leader节点故障,希望提议no-op填充故障leader节点负责的instance
      image

对于第一点,在xcom模块中,主要通过sweeper_task 协程负责触发提议no-op的过程,首先会寻找本节点上已经执行的消息号executed_msg,然后从此消息号开始检测是否需要提议no-op消息。首先需要判断需要检测的消息号是不是小于距今已知的最大消息号(如果大于,那么代表以后本节点可能会提议此消息号,还不需要进行skip操作),根据此消息号从paxos cache中新建或者查找一个已经存在的pax machine,即instance,如果此instane满足一下条件:没有处于被占用的状态,没有accept任何值,自身没有提议任何值,没有选定任何值,则可以进行skip操作。

sweeper不停的增加消息号,检测每个消息号对应的instance状态,对于满足条件的进行skip操作。主要过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static int sweeper_task(task_arg arg MY_ATTRIBUTE((unused))) {

ep->find = get_sweep_start(); // 初始化为本节点的已经被执行的消息号

while (!xcom_shutdown) { // 不停处于检测状态
ep->find.group_id =
executed_msg.group_id; /* In case group id has changed */
{
while (synode_lt(ep->find, max_synode) && !too_far(ep->find)) {
/* pax_machine * pm = hash_get(ep->find); */
pax_machine *pm = 0;
if (ep->find.node == VOID_NODE_NO) {
if (synode_gt(executed_msg, ep->find)) {
ep->find = get_sweep_start();
}

}
pm = get_cache(ep->find); // 寻找或者新建一个对应消息号的instance
if (pm && !pm->force_delivery) { /* We want full 3 phase Paxos for
forced messages */

if (!is_busy_machine(pm) && pm->acceptor.promise.cnt == 0 &&
!pm->acceptor.msg && !finished(pm)) { //instance未被使用, 没有在对应instance上做出承诺,没有接受别的节点建议的消息或者自身未建议消息,instance没有学习到任何值
pm->op = skip_op;


skip_msg(pax_msg_new(ep->find, find_site_def(ep->find))) // 新建一条skip 消息进行 skip操作{
prepare(p, skip_op);
p->msg_type = no_op;
return send_to_all(p, "skip_msg"); // 根据Mencius算法,对于skip消息协调者可以直接入learn阶段,而不用三阶段或者两阶段
};
printf("can skipping, %lld, %lld\n",(long long)ep->find.msgno, (long long)ep->find.node);
fflush(stdout);

}
}
ep->find = incr_msgno(ep->find); // 递增检测所有的消息号
printf("next to detect skip, %lld, %lld\n",(long long)ep->find.msgno, (long long)ep->find.node);
fflush(stdout);
}
}

}

对于第二点,发生在节点提交到上层MGR时,在basic paxos中的executor_task协程中,在x_fetch,也即获取可以提交给上层MGR的消息时,如果对于一个消息号对应的instance在本节点上还未被决定,首先本节点将会尝试从其他节点learn对应的消息,如果尝试无果,将会通过三阶段(从prepare过程开始)尝试skip这条消息。对应代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
int get_xcom_message(pax_machine **p, synode_no msgno, int n) {

....
*p = force_get_cache(msgno); // 获取消息号对应的paxs machine,即instance

dump_debug_exec_state();
while (!finished(*p)) { // 如果此instance还未达成一致,不停重试
....
find_value(ep->site, &ep->wait, n); // 从其他节点学习或者尝试skip这条instance
*p = get_cache(msgno);
...
}
}

在 find_value中,当前的节点会在4次以内获取指定msgno消息的尝试。第一次和第二次会尝试从其他节点learn对应的消息。而从第3次开始,本节点将通过检测集群中还活动的节点,如果本节点可以成为leader,会通过三阶段尝试跳过这条消息,否则还是从其他节点读取。如果前三次尝试仍然失败,从第4次开始,集群中所有节点都会进行三阶段过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void find_value(site_def const *site, unsigned int *wait, int n) {

switch (*wait) {
case 0:
case 1: // 第一次和第二次都会尝试从其他节点读取
read_missing_values(n);
(*wait)++;
break;
case 2: // 第三次:如果本节点是集群中节点号最小的节点,则由本节点负责提议no op,接下来进入push_msg_3p过程
if (iamthegreatest(site))
propose_missing_values(n);
else
read_missing_values(n);
(*wait)++;
break;
case 3:
propose_missing_values(n);
break;
default:
break;
}
}

2.2 故障检测

MGR中故障响应机制主要包括故障探测,移除故障节点和自动重新加入,其中xcom模块主要负责故障探测。每个节点都有一个server对象,其中与故障检测相关的字段如下:

1
2
3
4
5
6
7
8
9
10
11
12
/* Server definition */
struct server {
...
double detected; /* Last incoming */
};

struct site_def {
....
detector_state detected; /* 每个节点的最后一条传入消息的时间 */
};

typedef double detector_state[NSERVERS];

xcom在每次成功的发送数据到某节点或者从某个节点接收到数据时都会记录当前时间(detected time,即server字段的detected)。意味着这个节点在当前时间还活着。

与故障检测相关的协程主要为alive_task()和detector_task(),

alive_task负责:

  • 在节点空闲了0.5秒后,alive_task()会发送i_am_alive_op消息给其他节点。空闲是指0.5秒内没有发送任何消息出去。

    1
    if (server_active(site, get_nodeno(site)) < sec - 0.5)
  • 当某个节点4秒(#define MAX_SILENT 4)内没有任何通信时(代码中称作may_be_dead),发送are_you_alive_op消息给这个节点,去探查该节点是否还活着。

detector_task()主要任务是:

每1秒做一轮检查。检查节点是否还活着。判断的标准是:

1
2
3
#define DETECTOR_LIVE_TIMEOUT 5.0

detected_time > task_now() - DETECTOR_LIVE_TIMEOUT

即5秒内没有和这个节点通信。如果此节点的detected time在5秒之内,则为活着,否则认为该节点可能已经死去。

当detector_task()发现任何节点的状态发生了变化:从活着变成可能死了;从可能死了变成活着

都会通过local view回调函数xcom_receive_local_view来处理状态变化。状态的变化由GCS模块处理。

2.3 集群成员变更

xcom集群成员变更算法参考了两篇论文,一篇是lamport论文 Reconfiguring a State Machine 中的方法。论文解读如下:

文档:paper Reconfiguring a State Machine

链接:http://note.youdao.com/noteshare?id=fe63ab4a6e146e67927cca351bc2e806&sub=56A28ECB00134D44A989A080C9E0984E

另一篇是:

The SMART way to migrate replicated stateful services. In Proc. EuroSys’06, ACM SIGOPS/EuroSys European Conference on Computer Systems

集群成员变更分为加入节点和删除节点的过程,一致性协议中集群成员变更的方法主要分为两类,一类是两阶段方式,一类是raft 论文中提到的joint consencus方式。在xcom中采用了上述论文中的Ra算法,其中a为alpha个延迟,即集群成员变更的消息被提议以后会经过alpha延迟后才生效,这可以允许alpha个消息的并发提交。经过alpah个消息后,每个节点按照新的配置进行集群成员变更的操作。在xcom中,alpha对应的变量为event_horizon

当有新的节点加入集群时,不会发生数据丢失的问题。但是当有节点退出时,处理过程比较复杂。下面通过一个例子介绍一下具体的节点退出过程:

考虑三个配置C1和C2,C3。C1有两个节点,A和B。C2只有节点B。C3为空。
消息编号为N的配置将在(至少)alpha消息延迟后激活,其中alpha 是event horizon大小。

所以,C1.start=C1+alpha,C2.start=C2+alpha。
从C1中删除的A在新的配置C2(在本例中为B)中的大多数节点从配置C1中学习了所有消息(所有小于C2.start的消息)之前,不能退出。
如何知道大部分C2已经学习到了这些信息?

假设E表示尚未由被选定(并执行)的第一条消息,则提议者将不会尝试提出编号大于等于E+alpha的消息,
并且所有消息编号大于等于E+alpha的传入消息都将被忽略。
E由executor_task递增,因此所有小于E的消息都是已知的。这意味着当E+alpha的值可以被学习到时,直到E(包括E)的所有消息也都已经被学习到,
尽管并非所有消息E+1..E+alpha-1都需要被learn。

这就要求退出的节点(A)需要等待,直到它知道C2.start+alpha的值,
因为到那时它知道C2中的大多数节点已经准备好执行C2.start,
这反过来意味着C2中的大多数节点都知道来自配置C1的所有值。

注意,离开C1的节点应该传递给应用程序的最后一条消息是C2.start-1,这是C1的最后一条消息。

退出的节点如何从下一个配置中获取被选定的值?有两种方法都被使用。
首先,尝试退出的节点可以简单地请求消息。get_xcom_message()将对所有消息小于等于max_synode执行此操作,但可能需要一些时间。
其次,C2的节点可以将消息C2.start..C2.start+alpha发送给被移除的节点(C1中的节点,而不是C2中的节点)。
inform_removed()函数负责执行此操作。通过跟踪包含要退出的节点的最旧配置来处理配置足够接近C0<C1<=C0+alpha的情况。

上述方式将处理离开C1的节点的情况。如果处理离开C2的节点呢?C3为空,因此离开C2的B不能等待来自C3的消息。
但由于C3是空的,所以不需要等待。它可以在执行完C3.start‐1(C2的最后一条消息)后立即退出。
如果C3.start-1<C2.start+alpha呢?如果C2和C3接近,就会发生这种情况:
B将在A有机会学习C2.start+alpha之前退出,这将使A永远都被hang住。
显然,需要施加一个额外的约束,即C3.start必须大于C2.start+alpha。这由空配置的特殊测试来处理。

除了上述方法之外,还有中比较完美的解决方案尚未被实现即raft论文中的共同一致,因为它需要对一致性协议的过程进行更多的修改。
按照共同一致算法,在xcom中,如果我们要求消息C2..C2.start‐1的大部分来自C1中的节点和C2中的节点,
那么不在C2中的节点可以在执行消息C2.start-1后退出,因为我们知道C2的大多数节点也同意这些消息,
因此它们不再依赖于不在C2中的节点。即使C2是空的,这也是有效的。
请注意,要求C1和C2的多数与要求C1+C2的多数不同,这意味着提议者逻辑需要考虑来自两组不同接受者的应答者。

当集群成员变更的配置消息传来时,首先会进行配置的更改,如果配置更改成功,会判断本节点是属于删除的节点还是新添加的节点还是旧配置中存在,新配置中还会继续保留的节点,其中后两种情况的处理过程相同;下面看一下节点加入或者退出时具体的执行过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static void x_fetch(execute_context *xc) {
/* 在准备好执行来自新配置定义的消息之前,不要传递消息。此时需要保证大多数节点已经从旧配置集群中学习到了所有消息。 */

app_data *app = xc->p->learner.msg->a;
if (app && is_config(app->body.c_t) &&
synode_gt(executed_msg, get_site_def()->boot_key)) /* 判断是否是配置变更请求,当节点退出时,消息类型为remove_node_type */
{
site_def *site = 0;
bool_t reconfiguration_successful =
handle_config(app, (xc->p->learner.msg->force_delivery != 0));
if (reconfiguration_successful) {
/*如果重新配置失败,则不会产生任何影响。只有当重新配置生效时,下面的过程才有意义 */
set_last_received_config(executed_msg);
garbage_collect_site_defs(delivered_msg);
site = get_site_def_rw();
if (site == 0) {
xc->state = x_terminate;
return;
}

if (xc->exit_flag == 0) {
/* We have not yet set the exit trigger */
setup_exit_handling(xc, site); // 设置退出逻辑,包括设置退出的延迟消息号等
}
}
}
/* 检查是否可以退出和增加 executed_msg */
x_check_increment_fetch(xc);
}

对于setup_exit_handling 函数,主要过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static void setup_exit_handling(execute_context *xc, site_def *site) {
synode_no delay_until;
if (is_member(site)) { // 如果本节点属于新的配置,那么代表本节点不是新添加进来的节点,也不是被删除的节点,可能是新被加入的节点,也可能是已经存在的节点
delay_until = compute_delay(site->start, site->event_horizon); // 根据event_horizon 计算退出的延迟,计算方式为start.msgno += event_horizon
} else {
/* 看看本节点离开时下一个配置是否会是空,即上述例子中的C3配置。如果新的site为空,应该在从配置传递完最后一条消息后退出。 */

/* 在下一个配置开始生效后不能传递任何消息 */
xc->delivery_limit = site->start;

/*
如果本节点不是新配置的成员,应该在看到超过当前节点结束消息号的足够多的消息后退出,对应于上述例子中的c2+start。这样可以确保下一个配置的大多数节点都学习到了属于当前配置的所有消息。
*/
xc->exit_synode = compute_delay(site->start, site->event_horizon);
if (is_empty_site(site)) {
/* 如果新配置为空,增加start以允许节点在start之前终止。这就好像在exit_synode之后有一个非空的组,有效地允许当前组的大多数成员在exit_synode之前的所有消息上达成一致。
*/
site->start = compute_delay(
compute_delay(site->start, site->event_horizon), site->event_horizon);
}
if (!synode_lt(xc->exit_synode, max_synode)) {
/* 需要来自下一个配置的消息,所以相应地设置max_synode。 */
set_max_synode(incr_synode(xc->exit_synode));
}
/* 设置在哪里切换到执行并通知删除的节点 */
delay_until = xc->exit_synode;

/* 代表本节点是被删除的节点且将要退出 */
xc->exit_flag = 1;
}


if (synode_gt(delay_until, max_synode))
set_max_synode(incr_msgno(delay_until)); // 更新m最大消息号
fifo_insert(delay_until);
(xc->inform_index)++;

}

下面检测此时是否可以退出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

static void x_check_increment_fetch(execute_context *xc) {
if (x_check_exit(xc)) { // 表明本节点需要退出
xc->state = x_terminate;
} else { // 本节点还属于新的配置中,不需要退出
SET_EXECUTED_MSG(incr_synode(executed_msg));
if (x_check_execute_inform(xc)) { // inform_removed 函数,将消息推送到将要离开的节点
xc->state = x_execute; 然后便可以继续执行
}
}
}
// 当本节点是被删除的节点且满足删除条件时,可以退出;满足删除的条件为本节点delivered_msg消息号没有超过传递限制且本节点executed_msg消息号已经等于exit_synode消息号了,那么此时也可以保证新配置中节点已经学习到了旧配置中所有的消息
static int x_check_exit(execute_context *xc) {
return (xc->exit_flag && !synode_lt(executed_msg, xc->exit_synode) &&
!synode_lt(delivered_msg, xc->delivery_limit));
}

3. 协程模块task

xcom首先实现了一套协程库:

1
2
3
4
5
6
/** \file
Rudimentary task system in portable C, based on Tom Duff's switch-based
coroutine trick
and a stack of environment structs. (continuations?)
Nonblocking IO and event handling need to be rewritten for each new OS.
*/

可以看出xcom中采用了基于”Duff-switch”的协程实现,所谓”Duff-switch”,是指主要通过c语言中的switch语句与 循环语句的嵌套实现函数退出后循环语句的继续执行,下面展示一种协程的无堆栈实现:

1
2
3
4
5
6
7
8
9
10
11
int function(void) {
static int i, state = 0;
switch (state) {
case 0:
for (i = 0; i < 10; i++) {
state = 1;
return i;
case 1:;
}
}
}

该函数的作用是第i次调用时返回i,最多10次,其核心部分为switch语句以及return前后两句,通过设置不同的state来保证下一次调用时从上次退出的地方继续执行。

因此在每次调用return时设置的不同的state,利用switch的条件跳转可以跳转到函数指定的位置继续执行。可以利用LINE宏来设置state,下面是简单利用LINE 实现协程举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <stdio.h>

#define TASK_BEGIN static int state = 0; switch (state) { case 0:
#define TASK_YIELD(x) do { state = __LINE__; return x; case __LINE__:; } while (0)
#define TASK_END }



void f1() {
TASK_BEGIN;
puts("1");
puts("2");
TASK_YIELD();
puts("3");
TASK_END;
}

void f2() {
TASK_BEGIN;
puts("x");
TASK_YIELD();
puts("y");
puts("z");
TASK_END;
}

int main (void) {
f1();
f2();
f1();
f2();
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void f1(){
static int state = 0;
switch (state)
{ case 0:
puts("1");
puts("2");
do{
state = __LINE__;
return;
case __LINE__:;
} while (0);
puts("3");
};
}

上述代码运行后会依次输出1,2,x,3,y,z。可以看出各个函数执行是一个交互的过程,并能保证函数退出后继续从上次返回的位置执行。
main()中演示的过程便是协程切换的过程。

因为switch内部不能任意定义变量,所以需要在TASK_BEGIN之前定义所需变量,在xcom模块中,每个协程开始之前都会定义和初始化与协程运行和切换相关的上下文。xcom中协程机制的实现和如上的实现方式相同,但是添加了堆栈,更为复杂。xcom中所有的协程(task)也通过类似于TASK_BEGIN, TASK_YEILD, TASK_END以及其他更为完善的机制进行协程的开始,睡眠,抢占和跳转。下面将主要介绍xcom的协程机制:

协程的数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct task_env {
linkage l; /* Used for runnable tasks and wait queues */
linkage all; /* Links all tasks */
int heap_pos; /* Index in time priority queue, necessary for efficient removal
*/
terminate_enum
terminate; /* Set this and activate task to make it terminate */
int refcnt; /* Number of references to task */
int taskret; /* Return value from task function */
task_func func; /* The task function */
task_arg arg; /* Argument passed to the task */
const char *name; /* The task name */
TaskAlign *where; /* High water mark in heap */
TaskAlign *stack_top; /* The stack top */
TaskAlign *sp; /* The current stack pointer */
double time; /* Time when the task should be activated */
TaskAlign buf[TASK_POOL_ELEMS]; /* Heap and stack */
int debug;
int waitfd;
int interrupt; /* Set if timeout while waiting */
};

前两个个字段用于所有协程底层数据结构的阻止方式;refcnt用于表示一个协程被引用的次数,协程只被创建一次,但是可以被引用多次,当引用次数为0时,便可以删除;func字段是一个函数指针,指向协程代表的函数,taskret表示协程的返回值,用于判断协程被挂起还是真的退出,name表示协程代表的函数的名;缓冲区buf用于协程的堆栈,where代表堆的最高水位,初始指向buf[0]的位置,stack_top字段指向栈顶,初始指向buf[TASK_POOL_ELEMS -1 ]位置,每分配一次协程栈便减一,sp用于表示某个协程在栈中的位置。

另外比较重要的数据结构包括task queue:

1
2
3
4
5
6
/* Priority queue of task_env */
struct task_queue {
int curn;
task_env *x[MAXTASKS + 1];
};
typedef struct task_queue task_queue;

task_queue 定义的全局变量为task_time_q,用来保存休眠的协程,处于task_time_q的协程不能立刻执行,需要等待一段时间。

此外由全局活动(等待执行)协程列表tasks,是协程创建(task_new())后构成的双向循环列表,处于执行状态和等待执行的协程都处于列表中

协程创建

主要用于状态初始化,绑定对应的函数,加入到活动协程列表tasks中去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
task_env *task_new(task_func func, task_arg arg, const char *name, int debug) {
task_env *t;
if (link_empty(&free_tasks))
t = (task_env *)malloc(sizeof(task_env));
else
t = container_of(link_extract_first(&free_tasks), task_env, l);
IFDBG(D_NONE, FN; PTREXP(t); STREXP(name); NDBG(active_tasks, d););
task_init(t);
t->func = func;
t->arg = arg;
t->name = name;
t->debug = debug;
t->waitfd = -1;
t->interrupt = 0;
activate(t){
link_into(&t->l, &tasks); //tasks为可执行协程双向循环列表,tasks为列表的头结点, t加入到tasks的前面
}
task_ref(t);
active_tasks++;
return t;
}

协程开始

协程开始的过程如下:

1
2
3
4
5
6
7
8
#define TASK_BEGIN                                            \

switch (stack->sp->state) { \
case 0: \
pushp(stack, TASK_ALLOC(stack, struct env)); \
ep = _ep; \
assert(ep); \
TERM_CHECK;

由于协程的运行机制为单线程模式,因此某一时刻只能有一个协程在运行,因此xcom设置一个全局变量stack,作为当前正在运行的协程栈,stack->sp->state为协程运行的位置,即代码的第几行,初始时设置为0,因此在协程被创建(task_new)第一次执行时,会进入switch语句的case 0 分支,首先会分为此协程分配栈帧,并且将该协程的运行环境上下文ep与全局变量_ep(stack->sp->ptr)绑定。因此在协程被挂起之前,运行过程一直处于switch 语句的case 0 分支内。

协程挂起(YIELD)

当协程被挂起后会暂停执行,调度过程会切换到下一个活动的协程执行,协程从运行状态转化为挂起状态的过程如下所示:

1
2
3
4
5
6
7
8
9
10
11
#define TASK_YIELD                     \
{ \
TASK_DEBUG("TASK_YIELD"); \
stack->sp->state = __LINE__; \
return 1; \
case __LINE__: \
TASK_DEBUG("RETURN FROM YIELD"); \
ep = _ep; \
assert(ep); \
TERM_CHECK; \
}

注意,在case LINE 语句之前的所有代码段都处于switch 的case 0 分支内,当调用 TASK_YIELD时,只需要保存当前协程的运行位置,即stack->sp->state = LINE, 如果下次协程被重新调度执行,那又会进行TASK_BEGIN过程内,此时会命中switch过程的 case LINE 位置,这样便可以接着上次挂起的位置继续执行,通过 ep = _ep 语句恢复一下运行上下文。

当一个协程被TASK_YIELD之后依然会处于活动协程队列tasks中,如果活动队列中没有其他的活动事务,可以又被执行;还有一种延迟执行的挂起方式,即首先将协程从活动队列中移除,按照延时设置的时间,放入task_time_q中相对位置(emmmm,比如一个函数设置延时为1秒,另一个为20秒,但是队列中只有着两个,则延迟20秒的会在延迟1秒的执行后立即执行),放入task_time_q中的协程只能通过task_wakeup等函数唤醒,加入到活动队列tasks中去。

此外,还有挂起当前协程的操作TASK_WAIT

1
2
3
4
5
6
#define TASK_WAIT(queue)     \
{ \
TASK_DEBUG("TASK_WAIT"); \
task_wait(stack, queue); \
TASK_YIELD; \
}

本协程调用协程funcall,在funcall返回期望的返回值(0)之前,本协程一直挂起,并让出cpu,当funcall返回0后,本协程等调度后继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
#define TASK_CALL(funcall)            \
{ \
reset_state(stack); \
TASK_DEBUG("BEFORE CALL"); \
do { \
stack->sp--; \
stack->taskret = funcall; \
stack->sp++; \
TERM_CHECK; \
if (stack->taskret) TASK_YIELD; \
} while (stack->taskret); \
TASK_DEBUG("AFTER CALL"); \
}

本协程被deactivate,只有被重新active时才可以继续执行

1
2
3
4
5
6
#define TASK_DEACTIVATE            \
{ \
TASK_DEBUG("TASK_DEACTIVATE"); \
task_deactivate(stack); \
TASK_YIELD; \
}

协程调度

task_loop为协程管理器,只有当协程退出或者被decactive时,才从列表中删除。。当前活动的协程变量为全局变量stack,在协程YIELD时,会进行压栈操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void task_loop() {
task_env *t = 0;
/* While there are tasks */
for (;;) {
/* check forced exit callback */
if (get_should_exit()) {
terminate_and_exit();
}

t = first_runnable(); // 获取双向循环列表中头结点的下一个结点,由于在task_new时在tasks中采用了前插的方法,即插入到tasks的尾部,因此此时获取最后插入的协程
// 通过判断tasks是否为空判断是否有可以执行的协程
while (runnable_tasks()) {
task_env *next = next_task(t); // 获取下一个
if (!is_task_head(t)) { // 不是tasks的头结点
/*IFDBG(D_NONE, FN; PTREXP(t); STRLIT(t->name ? t->name : "TASK WITH NO
* NAME")); */
stack = t; // 设置当前运行的协程栈为t
assert(stack);
assert(t->terminate != TERMINATED);
{
/* double when = seconds(); */
int val = 0;
assert(t->func);
assert(stack == t);
val = t->func(t->arg); // 通过函数指针调用对应的协程,如果协程退出结束,返回0,如果协程只是暂时挂起,返回1
//printf("corioutine change:%s\n",t->name);
//fflush(stdout);
assert(ash_nazg_gimbatul.type == TYPE_HASH("task_env"));
if (!val) { /* 协程结束 */
deactivate(t); // 调用link_out,将此协程从tasks中删除
t->terminate = TERMINATED;
task_unref(t); // 还会将协程从task all 列表中删除,active_tasks--
stack = NULL;
}
}
}
t = next;
}
if (active_tasks <= 0) break; // 如果没有可以执行的协程的了,直接退出

// 如果可以运行到这里,代表tasks里没有了,但是active_tasks还大于0,需要等待休眠的协程被唤醒

{
double time = seconds();
if (delayed_tasks()) {
.....
task_env *delayed_task = extract_first_delayed(); /* May be NULL */
if (delayed_task) activate(delayed_task); /* Make it runnable */
.....
}
}
task_sys_deinit();
}

4. 基于协程的paxos实现

在xcom中,通过TASK_BEGIN, TASK_YIELD, TASK_DELAY,TASK_WAIT等过程进行协程的开始,挂起,休眠等操作。

xcom初始化时,会新建一系列活动协程,比如:

1
2
3
4
5
6
7
8
9
10
set_task(&executor, task_new(executor_task, null_arg, "executor_task",
XCOM_THREAD_DEBUG));
set_task(&sweeper,
task_new(sweeper_task, null_arg, "sweeper_task", XCOM_THREAD_DEBUG));
set_task(&detector, task_new(detector_task, null_arg, "detector_task",
XCOM_THREAD_DEBUG));
set_task(&alive_t,
task_new(alive_task, null_arg, "alive_task", XCOM_THREAD_DEBUG));
set_task(&cache_task, task_new(cache_manager_task, null_arg,
"cache_manager_task", XCOM_THREAD_DEBUG));

协程创建完成以后,便可以通过task_loop进行调度,以sweeper_task为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int sweeper_task(task_arg arg MY_ATTRIBUTE((unused))) {
DECL_ENV
synode_no find;
END_ENV;
TASK_BEGIN
ep->find = get_sweep_start();
//printf("%lld, %lld\n",(long long)ep->find.msgno, (long long)ep->find.node);
//fflush(stdout);
while (!xcom_shutdown) {
while (synode_lt(ep->find, max_synode) && !too_far(ep->find)) {
//此处会进行判断本节点负责的消息需不需要skip操作
//并进行对应的操作
}
deactivate:
TASK_DEACTIVATE;
}
TASK_END;
}

将宏全部展开后,整个函数变成由switch控制的结构,case 0 和case LINE决定了函数多次进入的时机,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static int sweeper_task(task_arg arg MY_ATTRIBUTE((unused))) {
struct env {
synode_no find;
};

struct env MY_ATTRIBUTE((unused)) * ep
switch (stack->sp->state) {
case 0:
pushp(stack, TASK_ALLOC(stack, struct env));
ep = _ep;
assert(ep);
TERM_CHECK;
ep->find = get_sweep_start();

while (!xcom_shutdown) {
while (synode_lt(ep->find, max_synode) && !too_far(ep->find)) {
//此处会进行判断本节点负责的消息需不需要skip操作
//并进行对应的操作
}
deactivate:
task_deactivate(stack);
stack->sp->state = __LINE__;
return 1;
case __LINE__:
ep = _ep;
assert(ep);
TERM_CHECK;
}
}
stack->sp->state = 0;
stack->where = (TaskAlign *)stack->sp->ptr;
popp(stack);
return 0;
}

5. future

性能优化

多线程改造: 数据结构加锁,仿照协程机制进行线程之间的调用和同步;

借鉴x-paxos:
X-Paxos的服务层是一个基于C++ 11特性实现的多线程异步框架。常见的状态机/回调模型存在开发效率较低,可读性差等问题,一直被开发者所诟病;而协程又因其单线程的瓶颈,而使其应用场景受到限制。C++ 11以后的新版本提供了完美转发(argument forwarding)、可变模板参数(variadic templates)等特性,可以比较方便的实现异步调用模型。

多组xcom:每组单线程,抽离xcom模块中视图变更、故障检测模块,统一封装gcs接口

借鉴 phxpaxos && multi raft:
phxpaxos架构上采用单Paxos单线程设计,但是支持多Paxos分区以扩展多线程能力,其单分区单线程,多实例聚合的方式也提升总吞吐。

6. 简要介绍

xcom可以保证消息在所有节点上以相同的顺序接收,还可以保证,如果一条消息被传递到一个节点,那么它最终也会在所有其他节点上看到。如果至少有一个知道消息值的节点没有崩溃,当崩溃的节点恢复时,xcom可以在这个崩溃的节点上恢复消息。日志记录可以添加到磁盘,以使消息在系统崩溃时持久化,以增加可以缓存的消息数量。但是xcom不能保证来自不同节点的消息顺序,甚至不能保证来自同一节点的多个消息的顺序。只有由客户机在发送下一条消息之前等待一条消息才能保证这样结果。xcom可以通知客户端消息已超时,在这种情况下,xcom将尝试取消消息,但它不能保证不会传递超时的消息。xcom在每个消息传递到客户机时为其附加一个节点集。这个节点集反映了xcom认为是活动的当前节点集,并不意味着消息已经传递到集合中的所有节点。也不意味着消息没有被传递到不在集合中的节点。Paxos状态机的每个实例实现基本的Paxos协议。paxos消息的缓存是一个经典的固定大小的LRU,具有哈希索引。

已经实现了部分mencius算法,主要包括simple paxos部分:

一个节点拥有其自身节点号的所有synode的所有权。只有节点号为N的节点才能为synode{X,N}提出一个值,其中X是序列号,N是节点号。其他节点只能为synode{X,N}提出特殊值no_op。这样做的原因是保留了无领导的Paxos算法,但避免了竞争同一个synode数的节点之间的冲突。在这个方案中,每个节点在正常运行时都有自己唯一的数序列。该方法具有以下含义:

  1. 如果一个节点N还没有为synode{X,N}提出一个值,它可以在任何时候用保留值no_op向其他节点发送学习消息,而不经过Paxos的第1和第2阶段。这是因为其他节点都被限制为不建议这个概要,所以最终的结果始终是no-op,为了避免不必要的消息传输,一个节点将尝试通过携带基本Paxos协议消息上的信息来广播no_op学习消息。

  2. 其他想要找到synode{X,N}值的节点可以通过遵循基本的Paxos算法来获得不可接受的值。结果将是node N提出的实际值(如果它已经提议了),否则最终结果只能是no_op。这通常只在一个节点关闭时才需要,而其他节点需要从丢失的节点中查找值,以便能够继续执行。

消息按顺序发送到客户端,顺序由序列号和节点号决定,序列号是最重要的部分。

xcom模块主要使用以下术语:

节点是xcom线程的实例。代理中只有一个xcom线程实例。

客户机是使用xcom发送消息的应用程序。

线程是真正的操作系统线程。

task是一个逻辑过程。它由协程和显式堆栈实现。task和非阻塞套接字操作的实现在task.h和task.c中是隔离的。

一个节点将打开到其他每个节点的tcp连接。此连接用于节点启动的所有通信,对消息的答复将到达发送消息的连接上。

xcom中主要协程如下:

static int tcp_server(task_arg);

tcp_server监听xcom端口,并在检测到新连接时启动acceptor_learner_task协程。

static int tcp_reaper_task(task_arg);
用于当一个tcp连接长时间被占用时被关闭

static int sender_task(task_arg);

sender_task在其输入队列上等待tcp消息,并在tcp套接字上发送它。如果套接字因任何原因关闭,sender_task将重新连接socket。每个socket都有一个sender_task。sender_task主要是为了简化其他任务中的逻辑,但是它可以被一个协程所取代,该协程在为其client 的task保留了套接字之后处理连接逻辑。
其从队列中获取消息并发送到其他服务器。使用一个单独的队列和任务来执行此操作简化了逻辑,因为其他的task不需要等待发送。

static int generator_task(task_arg);

generator_task从客户机队列读取消息,并将其移动到proposer_task的输入队列中

static int proposer_task(task_arg);

为传入的消息分配一个消息编号,并尝试使其被接受。每个节点上可能有多个proposer tasks并行工作。如果有多个proposer tasks,xcom不能保证消息将按照从客户端接收的相同顺序发送。

static int acceptor_learner_task(task_arg);

这是xcom线程的服务部分。系统中的每个节点都有一个acceptor_learner_task。acceptor learner_任务从套接字读取消息,找到正确的Paxos状态机,并将状态机和消息作为参数发送到正确的消息处理程序。

static int reply_handler_task(task_arg);

reply_handler_task执行与acceptor_learner_task相同的工作,但侦听节点用于发送消息的套接字,因此它将仅处理该套接字上的回复。

static int executor_task(task_arg);

ececutor_task等待接收Paxos消息。当消息被接受时,它被传递到客户端,除非它是一个no-op。在任何一种情况下,executor_task都会进入下一条消息并重复等待。如果等待消息超时,它将尝试接受一个no-op。

static int alive_task(task_arg);

如果有一段时间没有正常通信,则向其他节点发送i-am-alive。它还ping似乎不活动的节点。

static int detector_task(task_arg);

detector_task定期扫描来自其他节点的连接集合,并查看是否存在任何活动。如果有一段时间没有活动,它将假定节点已经宕机,并向客户机发送一条视图变更消息。

重新配置:

xcom重新配置的过程借鉴Lamport在“Reconfiguring a State Machine” 论文中描述的过程, 以此作为R-alpha算法。xcom会立即执行重新配置命令,但是配置只有在alpha消息的延迟之后才有效。

##temp notes

pax machine 缓存机制:

缓存机制的整个缓存结构由以下方式组织:总的数据结构为hash_stack, 由节点类型为stack_machine按照linkage方式构成的双向循环链表,每个stack_machine由维护了一个hash表pax_hash, pax_hash由数据类型为pax_machine链表构成的数组。

1
2
3
4
5
6
struct stack_machine {
linkage stack_link;
uint64_t start_msgno;
uint occupation;
linkage *pax_hash;
};
1
2
3
4
5
/* Paxos machine cache */
struct lru_machine {
linkage lru_link;
pax_machine pax;
};

缓存模块以静态分配的方式分配 pax_machine cache, 除了保存pax_machine的hash_stack之外,还有protected_lru:按照最近使用的顺序跟踪正在使用的pax machine;probation_lru:空闲的列表。

1
2
3
4
5
6
static linkage hash_stack = {0, &hash_stack,
&hash_stack}; /* hash stack 的头结点*/
static linkage protected_lru = {
0, &protected_lru, &protected_lru}; /* 最近使用链表的头结点 */
static linkage probation_lru = {
0, &probation_lru, &probation_lru}; /* 空闲链表的头结点 */

按照消息号获取对应pax_machine时,首先在hash_stack中找到对应的stack_machine, 然后在这个stack_machine的hash表pax_hash中找到对应的pax_nachine,过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
pax_machine *hash_get(synode_no synode) {
/* static pax_machine *cached_machine = NULL; */
stack_machine *hash_table = NULL;

/* if(cached_machine && synode_eq(synode, cached_machine->synode)) */
/* return cached_machine; */

FWD_ITER(&hash_stack, stack_machine, {
/* 在hash_stack 中寻找比synode号小或者等于0的instance*/
if (link_iter->start_msgno < synode.msgno || link_iter->start_msgno == 0) {
hash_table = link_iter;
break;
}
})

if (hash_table != NULL) {
linkage *bucket = &hash_table->pax_hash[synode_hash(synode)];

FWD_ITER(bucket, pax_machine, {
if (synode_eq(link_iter->synode, synode)) {
/* cached_machine = link_iter; */
return link_iter;
}
});
}
return NULL;
}

其中宏 FWD_ITER 为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Forward iterator */
/* 当前节点开始,向后遍历,因为是双向循环链表,总会回到自身 */
#define FWD_ITER(head, type, action) \
{ \
linkage *p = link_first(head); \
while (p != (head)) { \
linkage *_next = link_first(p); \
{ \
type *link_iter = (type *)p; \
(void)link_iter; \
action; \
} /* Cast to void avoids unused variable warning */ \
p = _next; \
} \
}

hash_get中第一个FWD_ITER可以展开为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{ 
linkage *p = link_first(&hash_stack);
while (p != (&hash_stack)) {
linkage *_next = link_first(p);
{
stack_machine *link_iter = (stack_machine *)p; // linkage类型转化为对应的stack_machine类型
(void)link_iter;
{
if (link_iter->start_msgno < synode.msgno ||link_iter->start_msgno == 0) {
hash_table = link_iter;
break;
}
};
}
p = _next;
}
}

在缓存中查找对应消息号的pax machine 时,如果找不到,首先从probation_lru(空闲链表)中查找下一个空闲节点,如果找不到空闲节点,将会从protected_lru中寻找空闲状态的的pax machine,优先从空闲状态的实例取出已经被执行过的节点(通过deliverd_msg判断, deliverd_msg是已经提交给上层客户端的最新消息号,因此比其小的消息都是可以被清理掉的),如果将force参数设置为了true,那么只要是空闲的pax_machine 不管有没有被提交到上层客户端,就会被抢占。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pax_machine *get_cache_no_touch(synode_no synode, bool_t force) {
pax_machine *retval = hash_get(synode);
/* IFDBG(D_NONE, FN; SYCEXP(synode); STREXP(task_name())); */
IFDBG(D_NONE, FN; SYCEXP(synode); PTREXP(retval));
if (!retval) {
lru_machine *l =
lru_get(force); /* Need to know when it is safe to re-use... */
if (!l) return NULL;
IFDBG(D_NONE, FN; PTREXP(l); COPY_AND_FREE_GOUT(dbg_pax_machine(&l->pax)););
/* assert(l->pax.synode > log_tail); */

retval = hash_out(&l->pax); /* 从hash表中删除 */
init_pax_machine(retval, l, synode); /* Initialize */
hash_in(retval); /* Insert in hash table again */
}
IFDBG(D_NONE, FN; SYCEXP(synode); PTREXP(retval));
return retval;
}

协程启动过程

以proposer_task为例,查看协程的启动过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#0  proposer_task (arg=...)
at /home/zhaoguodong/msBuild/mysql-8.0.22/plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_base.cc:1817
#1 0x00007fff8cbea9d5 in task_loop ()
at /home/zhaoguodong/msBuild/mysql-8.0.22/plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/task.cc:1133
#2 0x00007fff8cb8b760 in xcom_taskmain2 (listen_port=24903)
at /home/zhaoguodong/msBuild/mysql-8.0.22/plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_base.cc:1279
#3 0x00007fff8cb70ace in Gcs_xcom_proxy_impl::xcom_init (this=0x7fff38027e10, xcom_listen_port=24903)
at /home/zhaoguodong/msBuild/mysql-8.0.22/plugin/group_replication/libmysqlgcs/src/bindings/xcom/gcs_xcom_proxy.cc:185
#4 0x00007fff8cc1455e in xcom_taskmain_startup (ptr=0x7fff3800f9b0)
at /home/zhaoguodong/msBuild/mysql-8.0.22/plugin/group_replication/libmysqlgcs/src/bindings/xcom/gcs_xcom_control_interface.cc:102
#5 0x000055555a809a7c in pfs_spawn_thread (arg=0x7fff380229e0)
at /home/zhaoguodong/msBuild/mysql-8.0.22/storage/perfschema/pfs.cc:2880
#6 0x00007ffff7bbd6db in start_thread (arg=0x7fff2b7fe700) at pthread_create.c:463
#7 0x00007ffff613aa3f in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

在xcom_taskmain_startup 主要有以下过程:

1
2
3
4
5
Gcs_xcom_proxy *proxy = gcs_ctrl->get_xcom_proxy();

proxy->set_should_exit(false);

proxy->xcom_init(port); // 开启一个新的线程用于xcom初始化

当xcom初始化失败时会不停的重新建立新的线程用于xcom模块。

在xcom_init中:

1
2
3
::xcom_fsm(x_fsm_init, int_arg(0)); // 用于

::xcom_taskmain2(xcom_listen_port);

在 xcom_taskmain2中,注册tcp_server协程来监听socket服务器端连接,每当有新的连接进来,就会创建一个acceptor_learner_task协程来处理该连接的后续消息。task_loop作为协程管理器,会时刻检测可以执行的协程,并使其执行

1
2
3
4
task_new(tcp_server, int_arg(tcp_fd.val), "tcp_server", XCOM_THREAD_DEBUG); // 注册tcp_server协程来监听socket服务器端连接
task_new(tcp_reaper_task, null_arg, "tcp_reaper_task", XCOM_THREAD_DEBUG);

task_loop(); // 会循环不停的进行协程的切换和运行

在tcp_server 协程中,主要逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int tcp_server(task_arg arg) {

G_MESSAGE(
"XCom initialized and ready to accept incoming connections on port %d",
xcom_listen_port);
do {

// 调用accept_tcp协程,等待新的连接到来
TASK_CALL(accept_tcp(ep->fd, &ep->cfd));

// acceptor_learner_task协程来处理该连接的后续消息
task_new(acceptor_learner_task, int_arg(ep->cfd), "acceptor_learner_task", XCOM_THREAD_DEBUG);
} while (!xcom_shutdown && (ep->cfd >= 0 || ep->refused));

}

task_loop作为整个协程机制的调度器,主要通过循环的方式从协程栈中恢复协程上下文,调度等待执行的协程继续执行,在 task_loop中,主要逻辑如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for (;;) {
...
// 获取可以执行的task
t = first_runnable();
while (runnable_tasks()) {
// 获取下一个可以执行的协程
task_env *next = next_task(t);
if (!is_task_head(t)) {
{
val = t->func(t->arg);
}
}
t = next;
}
...
}

PostgreSQL内核分析教程

前言

现有的关系型数据库在系统稳定性,安全性等方面积累数十年的优势,如何改进现有的关系型数据库使之适用于当今的OLAP,机器学习(如提高单机大规模神经网络训练的能力)场景具有较大的潜力。目前为止Tidb的Tiflash并未开源(据说其论文目前于VLDB2020在审),PostgreSQL作为开源关系数据库中的王者,其内核值得学习,虽然现在一些工作已经对pg做出改进,但是spark引领的系统展现出势不可挡的趋势,期待传统关系型数据库重新复盘审视自己的设计模式,以总结出更多的经验,展现出更强大的生命力。

学习资料

1. 系统概述

1.1 简介及发展历程

2. Postgresql 体系结构

image

2.1系统表

系统表存储了postgresql中所有数据对象及其属性的描述信息和对象之间的关系的描述信息,对象属性的自然语言含义和数据库状态信息的变化历史。

2.1.1 主要系统表

编号 系统表 功能
1 pg_namespace 存储所有数据对象的名字空间,比如数据库,表,索引,视图
2 pg_tablespace 存储表空间信息,所有数据库共享一份pg_tablespace,有助于磁盘存储布局。
3 pg_dataspace 存储数据库信息
4 pg_class 存储表空间信息,所有数据库共享一份pg_tablespace,有助于磁盘存储布局。

2.1.1 系统视图

由系统表生成,用于访问系统内部信息。

2.2 数据集蔟

  • 定义: 用户数据库和系统数据库的集合。

  • 数据库是具有特殊文件名,存储位置等属性信息的文件集合。

  • OID是一个无符号整数,用于唯一标识数据集蔟中的所有对象,包括数据库,表,索引,视图,元组,类型等。其分配由一个全局计数器管理,互斥锁访问。

2.2.1 initdb的使用

使用postgresql之前用于初始化数据集蔟的程序,负责创建数据库系统目录,系统表,模板数据库。

3. 存储管理

3.2 外存管理

3.2.4 空闲空间映射表FSM

  • 随着表不断删除和插入元组(record),文件块中会产生空闲空间。
  • 插入元组时优先插入到表的空闲空间中。
  • 每个表文件都有一个空闲空间映射表文件:关系表OID_fsm。
  • FSM按照最大堆二叉树的形式保存空闲空间,能够使空闲空间最大值提升到根节点,这样只需要判断根节点是否满足需求,那可知是否有没有满足需求的空闲空间。

3.2.5 可见性映射表VM

  • VM表用来加快VACUUM(快速清理操作)查找无效元组文件块的过程,为表的每一个文件块设置了标志位,用来标记该文件块是否存在无效元组。

3.2.5 大数据存储

Postgresql提供了两种大数据存储方式:

  • TOAST:使用数据压缩和线外存储实现。
  • 大对象机制:使用专门的系统表存储。
3..2.5.1 TOAST
  • TOAST机制主要优点在于查询时可以有效的减少占用的存储空间。
    因为在查询时只比较压缩后的数据。
  • 同样,压缩后的数据可以放到内存中,加速排序的速度。

    3.3 内存管理

  • C++内存池的简单原理及实现

4. 索引

  • 在postgre里,postgre的查询规则器会自动优化和选择索引

4.1 概述

5. 查询编译

查询模块框架和主要流程如下:
image

5.2 查询分析

查询分析主要流程:
image

5.2.1 lex 和 yacc简介

  • lex进行词法分析(正则表达式)
  • yacc进行语法分析(BNF范式)
  • 源代码经过lex进行词法分析提取关键词,传入yacc中进行语法分析,进行相应的操作,如加减乘除。

5.5.2 词法和语法分析

  • scan.l:lex文件用于识别SQL的关键字
  • gram.y:yacc用于分析词法分析后的语法
  • 本节将主要分析SQL结构和经过语法分析后在内存中的数据结构

image

以 select语句为例,select语句可以由简单的select语句或者接where,sort等语句构成的复杂语句,


持续更新ing…

日志

远程隐藏

2020/–/–

2020/–/–

2020/–/–

流系统漫游(持续更新中...)

本文从谷歌Dataflow system论文的角度初探streaming system。谷歌从GFS、MapReduce开始,到目前的分布式操作系统Borg和流系统,大部分的论文都是从描述现实的应用场景出发,然后提出解决方案。其本质是需求驱动技术的发展。目前大部分的科研是:提出一种方法,解决一个问题,方法的效果。与单纯的科研驱动发展不同比如数理化,单纯的研究最开始时是没有实用价值的,但后来才被自动化机器学习等专业应用后证明其应用潜力。

1. 应用场景

一个流媒体视频提供商(Google)想要通过显示视频广告和向广告客户(Apple)收取观看广告的费用来实现广告变现。

该平台(YOutube)支持内容和广告的在线和离线查看。视频提供商想知道每天每个广告客户需要支付多少费用,并收集有关视频和广告的统计数据。此外,他们还想对大量的历史数据进行有效的离线实验。

广告商/内容提供商想要知道他们的视频被观看的频率和时间,观看的内容/广告是什么,观看的人群是什么。他们也想知道他们要付多少钱。他们希望尽可能快地获得所有这些信息,这样他们就可以调整预算和投标,改变目标,调整活动,并尽可能实时地规划未来的方向。因为涉及到钱,所以正确性是最重要的。

虽然数据处理系统本质上是复杂的,但是视频提供商需要一个简单而灵活的编程模型。最后,由于互联网极大地扩展了任何可以沿着其主干分布的业务的范围,它们还需要一个能够处理全球范围内散居的数据的系统。

对于这样的用例,必须计算的信息本质上是每个视频观看的时间和长度、谁观看了它,以及它与哪个广告或内容配对(即每个用户、每个视频观看会话)。从概念上讲,这很简单,但是现有的模型和系统都不能满足规定的需求。

2. 平台

Flink是一个流系统,在德语中, Flink 一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为 logo,
这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,
而 Flink 的松鼠 logo 拥有可爱的尾巴,尾巴的颜色与 Apache 软件基金会的 logo 颜
色相呼应,也就是说,这是一只 Apache 风格的松鼠。

Flink 项目的理念是:“ Apache Flink 是为分布式、高性能、随时可用以及准确
的流处理应用程序打造的开源流处理框架”。
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。 Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模
来执行分布式计算。

3. Flink架构

image

4.1 事件驱动型(Event-driven)

应用从一个或多个事件流提取数据,并
根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以 kafka 为
代表的消息队列几乎都是事件驱动型应用(但是与Sparkstreaming不同)。

image

4.2 流与批的世界观

  • 批处理的特点是有界、持久、大量, 非常适合需要访问全套记录才能完成的计
    算工作,一般用于离线统计。
  • 流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统
    传输的每个数据项执行操作,一般用于实时统计。

在 spark 的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实
时数据是由一个一个无限的小批次组成的。

而在 flink 的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数
据是一个没有界限的流,这就是所谓的有界流和无界流。

5. Flinkj架构

只要由作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager),
以及分发器(Dispatcher)组成。

JogManager

控制应用程序执行的主进程,每个应用程序都会被一个不同的
JobManager 所控制执行。 JobManager 会先接收到要执行的应用程序, 这个应用程序会包括:
作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它
资源的 JAR 包。 JobManager 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫做
“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。 JobManager 会向资源管
理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上
的插槽( slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的
TaskManager 上。而在运行过程中, JobManager 会负责所有需要中央协调的操作,比如说检
查点(checkpoints)的协调。

ResourceManager

主要负责管理任务管理器(TaskManager)的插槽(slot), TaskManger 插槽是 Flink 中
定义的处理资源单元。 Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如
YARN、 Mesos、 K8s,以及 standalone 部署。当 JobManager 申请插槽资源时, ResourceManager
会将有空闲插槽的 TaskManager 分配给 JobManager。如果 ResourceManager 没有足够的插槽
来满足 JobManager 的请求,它还可以向资源提供平台发起会话,以提供启动 TaskManager
进程的容器。另外, ResourceManager 还负责终止空闲的 TaskManager,释放计算资源。

TaskManager

通常在 Flink 中会有多个 TaskManager 运行,每一个 TaskManager
都包含了一定数量的插槽(slots)。插槽的数量限制了 TaskManager 能够执行的任务数量。
启动之后, TaskManager 会向资源管理器注册它的插槽;收到资源管理器的指令后,
TaskManager 就会将一个或者多个插槽提供给 JobManager 调用。 JobManager 就可以向插槽
分配任务(tasks)来执行了。在执行过程中,一个 TaskManager 可以跟其它运行同一应用程
序的 TaskManager 交换数据。

else

实践

flink安装

未来数据库趋势-Tidb

引言

  • 未来数据库趋势:大一统,就像《魔戒》中说的 :ONE RING TO RULE THEM ALL,就是一套解决方案去解决扩展性,ACID,高性能,稳定性,大数据,机器学习计算能力(HATP+分布式)。

  • 目前大多数的科研论文陷入跑分思维,在一个特定的 Workload下,然后把 Oracle 摁在地上摩擦,这样的论文有很多。但是大家回头看看 Oracle 还是王者。

  • 紧跟现实应用场景(这是典型的工程思维,科研应该在理论上有所突破)

未来发展的基石

  • 硬件的发展:SDD,多核CPU,万兆网卡,虚拟化。
  • 数据库这个行业里面很多的假设,在现在新的硬件的环境下其实都是不成立的。
  • 为什么 B-Tree 就一定会比 LSM-Tree 要快呢?不一定,我跑到 Flash 或者 NVMe SSD 、Optane 甚至未来的持久化内存这种介质上,那数据结构设计完全就发生变化了。过去可能需要投入很多精力去做的数据结构,现在可以采用暴力算法。
  • 分布式理论的发展,如raft

未来趋势

1. log is the new database

  • Hyper实验中正常的 SQL 语句的执行时间,比如说直接把一语句放到另外一个库里去执行,耗时最多。逻辑日志存放时,耗时大概能快 23%,存放物理日志时能快 56%。所以TiDB 里的 TiFlash 其实同步的是 Raft 日志,而并不是同步 Binlog 或者其他。
  • Aurora同步的是 redo log 。其实他的好处也很明显,也比较直白,就是 I/O 更小,网络传输的 size 也更小,所以就更快。
  • 多raft 组

    2. Vectorized

  • TiDB SQL 引擎用 Volcano 模型,这个模型遍历一棵物理计划的树,不停的调 Next,每一次 Next 都是调用他的子节点的 Next,然后再返回结果。这个模型有几个问题:第一是每一次都是拿一行,导致 CPU 的 L1、L2 缓存利用率很差,没有办法利用多 CPU 的 Cache。第二,在真正实现的时候,它内部的架构是一个多级的虚函数调用。大家知道虚函数调用在 Runtime 本身的开销是很大的,在《MonetDB/X100: Hyper-Pipelining Query Execution》(http://cidrdb.org/cidr2005/papers/P19.pdf) 里面提到,在跑 TPC-H 的时候,Volcano 模型在 MySQL 上跑,大概有 90% 的时间是花在 MySQL 本身的 Runtime 上,而不是真正的数据扫描。所以这就是 Volcano 模型一个比较大的问题。第三,如果使用一个纯静态的列存的数据结构,大家知道列存特别大问题就是它的更新是比较麻烦的, 至少过去在 TiFlash 之前,没有一个列存数据库能够支持做增删改查。那在这种情况下,怎么保证数据的新鲜?这些都是问题。
  • TiDB SQL 引擎的 Volcano 模型,已经从一行一行变成了一个 Chunk 一个 Chunk,每个 Chunk 里面是一个批量的数据,所以聚合的效率会更高。
  • TiDB 中算子推到 TiKV 来做, TiKV会成为一个全向量化的执行器的框架。

3. Workload Isolation

  • 尽可能地把 OLTP 跟 OLAP 隔离开。
  • Google Spanner做le一个新的数据结构,来替代 Bigtable-Like SSTable 数据结构,这个数据结构叫 Ressi《Spanner: Becoming a SQL System》。其实表面上看还是行存,但内部也是一个 Chunk 变成列存这样的一个结构。
  • 但即使是换一个新的数据结构,也没有办法很好做隔离,因为毕竟还是在一台机器上,在同一个物理资源上。最彻底的隔离是物理隔离。

TiFlash 用了好几种技术来去保证数据是更新的。一是增加了 Raft Leaner,二是把 TiDB 的 MVCC 也实现在了 TiFlash 的内部。第三在 TiFlash 接触了更新(的过程),在 TiFlash 内部还有一个小的 Memstore,来处理更新的热数据结果,最后查询的时候,是列存跟内存里的行存去 merge 并得到最终的结果。

  • TiFlash 的核心思想就是通过 Raft 的副本来做物理隔离。这个有什么好处呢?这是我们今天给出的答案,但是背后的思考,到底是什么原因呢?为什么我们不能直接去同步一个 binlog 到另外一个 dedicate 的新集群上(比如 TiFlash 集群),而一定要走 Raft log?最核心的原因是,我们认为 Raft log 的同步可以水平扩展的。因为 TiDB 内部是 Mult-Raft 架构,Raft log 是发生在每一个 TiKV 节点的同步上。
  • 大家想象一下,如果中间是通过 Kafka 沟通两边的存储引擎,那么实时的同步会受制于中间管道的吞吐。比如一部分一直在更新,另一边并发写入每秒两百万,但是中间的 Kafka 集群可能只能承载 100 万的写入,那么就会导致中间的 log 堆积,而且下游的消费也是不可控的。而通过 Raft 同步, Throughput 可以根据实际存储节点的集群大小,能够线性增长。这是一个特别核心的好处。

4. SIMD

  • 现代的 CPU 会支持一些批量的指令,比如像 _mm_add_epi32,可以一次通过一个 32 位字长对齐的命令,批量的操作 4 个累加。看上去只是省了几个 CPU 的指令,但如果是在一个大数据量的情况下,基本上能得到 4 倍速度的提升。
  • I/O不是瓶颈,未来的瓶颈在于CPU
  • 怎么去用新的硬件,去尽可能的把计算效率提升,这是未来数据库发展的重点。比如说我怎么在数据库里 leverage GPU 的计算能力,因为如果 GPU 用的好,其实可以很大程度上减少计算的开销。
  • 所以,如果在单机 I/O 这些都不是问题的话,下一个最大问题就是怎么做好分布式,这也是为什么Tidb一开始就选择了一条看上去更加困难的路:做一个 Share-nothing 的数据库,并不是像 Aurora 底下共享一个存储。

5. Dynamic Data placement

  • 今天其实看不到未来十年数据增长是怎样的,十年前大家不能想到现在我们的数据量有这么大。

  • 所以新的架构或者新的数据库,一定要去面向我们未知的 Scale 做设计。比如大家想象现在有业务 100T 的数据,目前看可能还挺大的,但是有没有办法设计一套方案去解决 1P、2P 这样数据量的架构?

  • 在海量的数据量下,怎么把数据很灵活的分片是一个很大的学问。

  • 分库分表的 Router 是静态的。如果出现分片不均衡,比如业务可能按照 User ID 分表,但是发现某一地方 / 某一部分的 User ID 特别多,导致数据不均衡。

  • TiDB 彻底把分片从数据库里隔离了出来,放到了另外一个模块里。分片应该是根据业务的负载、根据数据的实时运行状态,来决定这个数据应该放在哪儿。这是传统的静态分片不能相比的,不管传统的用一致性哈希,还是用最简单的对机器数取模的方式去分片(都是不能比的)。

  • 在这个架构下,甚至未来我们还能让 AI 来帮忙。把分片操作放到 PD 里面,它就像一个 DBA 一样,甚至预测 Workload 给出数据分布操作。比如课程报名数据库系统,系统发现可能明天会是报名高峰,就事先把数据给切分好,放到更好的机器上。这在传统方案下是都需要人肉操作,其实这些事情都应该是自动化的。

  • Dynamic Data placement 好处首先是让事情变得更 flexible ,对业务能实时感知和响应。另外还有一点,为什么我们有了 Dynamic Placement 的策略,还要去做 Table Partition。Table Partition相当于业务已经告诉我们数据应该怎么分片比较好,我们还可以做更多针对性的优化。这个 Partition 指的是逻辑上的 Partition ,是可能根据你的业务相关的,比如说一张表存着 2018 年的数据,虽然还是 TiDB 通过 PD 去调度,但是我知道你 Drop 这个 Table 的时候,一定是 Drop 这些数据,所以这样会更好,而且更加符合用户的直觉。但这样架构仍然有比较大的挑战。当然这个挑战在静态分片的模型上也都会有。

  • 比如说围绕着这个问题,怎么更快的发现数据的热点,比如说我们的调度器,如果最好能做到,比如突然来个秒杀业务,我们马上就发现了,就赶紧把这块数据挪到好的机器上,或者把这块数据赶紧添加副本,再或者把它放到内存的存储引擎里。这个事情应该是由数据库本身去做的。所以为什么我们这么期待 AI 技术能够帮我们,是因为虽然在 TiDB 内部,用了很多规则和方法来去做这个事情,但人工的规则不是万能的。

6. Storage and Computing Seperation

说存储计算分离本质:存储依赖的物理资源,跟计算所依赖的物理资源并不一样。

  • 比如计算可能需要很多 CPU,需要很多内存来去做聚合,存储节点可能需要很多的磁盘和 I/O,如果全都放在一个组件里 ,调度器就会很难受:我到底要把这个节点作为存储节点还是计算节点?
  • 在这块,可以让调度器根据不同的机型(来做决定),是计算型机型就放计算节点,是存储型机型就放存储节点。

7. Everything is Pluggable

  • 每一层我们未来都会去对外暴露一个非常抽象的接口,能够去 leverage 不同的系统的好处。 F1 Query 这篇论文,基本表述了一个大规模的分布式系统的期待,架构的切分非常漂亮。

8. Distributed Transaction

  • ACID 事务肯定是必要的,除了 Google 用了原子钟,Truetime 非常牛。
  • 当然,时间戳,不管是用硬件还是软件分配,仍然是现今的最好方法。
  • 因为如果要摆脱中心事务管理器,时间戳还是很重要的。所以在这方面的挑战就会变成:怎么去减少两阶段提交带来的网络的 round-trips?或者如果有一个时钟的 PD 服务,怎么能尽可能的少去拿时间戳?
  • Tidb把 Percolator 模型做了一些优化,能够在数学上证明,可以少拿一次时钟数学证明的过程已经开源,用TLA+ 数学工具做了证明( https://github.com/pingcap/tla-plus/blob/master/OptimizedCommitTS/OptimizedCommitTS.tla)。
  • Follower Read。很多场景读多写少,读的业务压力很多时候是要比写大很多的,Follower Read 能够帮我们线性扩展读的性能,而且在我们的模型上,因为==没有时间戳== ,所以能够在一些特定情况下保证不会去牺牲一致性。

9. Cloud-Native Architecture

  • 多租户没有做在 TiDB 的系统内部,因为其设计理念是「数据库就是数据库」,它并不是一个操作系统,不是一个容器管理平台。
  • 模块和结构化是更清晰一种==软件工程==的方式。
  • Kubernetes 在这块已经做的足够好了,未来 K8s 会变成集群的新操作系统,会变成一个 Linux。比如说如果单机时代做一个数据库,会在数据库里面内置一个操作系统吗?肯定不会。
  • 所以在模块抽象的边界,可以依赖 K8s 。《Large-scale cluster management at Google with Borg》这篇论文里面提到了一句话,BigTable 其实也跑在 Borg 上。

reference

docker安装Spark

疫情期间身边只有笔记本,想学习一哈spark(因为正在研究declarative ml,要看SystemML的论文),配置环境太麻烦,因此想用docker pull 一下网上现成的镜像,由于是私人hub,docker直接pull时还是网络慢到让人崩溃。。。采用github上的自己build,由于国内网络原因,安装了n次,好几个g的镜像,简直让人崩溃。

过了n天后,偶然想起阿里云提供了免费的doker 仓库并支在线build。真是爽歪歪。。。

一句话总结整个过程的安装方式:利用github上的Dockerfile,使用阿里云提供的容器镜像服务,构建对应的 Docker镜像,从阿里云中pull到本地。

从dockerfile 到 build 全部在线完成,没有网络问题,速度也杠杠的,不得不感谢阿里云云计算的NB(因为免费,所以感谢,所以NB,haha)。

现在记录一下过程:

人生之不如意,十有八九。下面记载我直接build过程会遇到如下错误(如果用github+阿里云,都会遇到下面的问题):

image

找不到epel安装源,大概是/etc/yum.repos.d/epel.repo 中镜像列表是https的问题,所以我直接修正并新建了一个epel.repo文件,build时替换掉原先的文件。

  • 此外按照原先的rpm安装epel方式在安装R语言时会遇到依赖问题,因此我直接把Dockerfile中的

    1
    RUN rpm -ivh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm

    修改为

    1
    RUN yum -y install epel-release

    因为镜像在build的过程中不能和命令交互,因此yum时别忘了参数 ==-y==。

  • 因为镜像太大,阿里云build时成功率不会是百分之百,比如:
    image

具体原因是:

image

所以多尝试几次就好了(毕竟免费,免费)。。。

修改完后可以直接build成功的github仓库可以直接fork:https://github.com/kisisjrlly/docker-spark/

然后在本机上可以直接pull阿里云上build好的镜像啦,然后就可以愉快的玩耍啦。。。

ref

Spark

Introduction

spark:hadoop的升级版,通过定义新的数据抽象解决现有的问题,类似于数据结构中有数组,高维数组,集合,map,但是现有的数据结构不适用于特定的问题,spark抛开能想到的基础数据结构,重新定义新的数据抽象解决了现有以Hadoop为基础的大数据平台的特定问题。

  • 现有的大数据平台的缺点:
    • Hadoop:保存到中间文件写磁盘,且不能重复利用中间数据,较多的磁盘IO等overead。
    • 存在对hadoop的改进版本系统:Pregel,然而,这些框架只支持特定的计算模式(例如,循环一系列MapReduce步骤),并隐式地为这些模式执行数据共享。它们不提供更一般重用的抽象,例如,让用户将多个数据集加载到内存中,并在其中运行特别的查询。
  • spark的解决方法
    • 定义数据抽象RDD:具有容错,数据并行执行,数据可显示保存到内存中去,优化的数据存放策略和丰富的API的特性。
    • 设计RDD主要的挑战是如何有效的容错(Google 的GFS,MapReduce 本质上是解决FT问题,因此引领大数据技术的发展)。
      • 现有的基于传统数据库的容错方式:replication+log overhead仍然较高,RDD可以通过重新计算(Lineage)的方式恢复(本质上的方法类似造火箭:我知道你是如何造出来的,你坏掉了我重新再造一个出来就好了)。
      • 但是Lineage计算较长时还是会用到log。
      • Spark设计目的专注于批处理分析的高效编程模型,不适用于对共享变量进行异步细粒度更新的应用程序,比如分布式InMemoryDB,对于这些系统的工作还是交给RamCloud好了。。。

scala

是一种多范式的编程语言,类似于java,python编程,设计初衷是要集成面向对象编程和函数式编程的各种特性。

什么是RDD

RDD是一种数据抽象:弹性分布式数据集合(Resilient Distributed Dataset),是spark的基本元素,不可变,可分区,里面的元素可以被分布式并行执行(对码农透明)。

  • Resilient
    • 代表数据可以存在内存也可以存到磁盘,计算快
  • Distributed
    • 一个RDD被分布式存储,容错,又可以分布式并行计算
  • Dataset
    • 类似于Hadoop中的文件,是一种抽象的分布式数据集合

RDD五大特性

具有以下性质:

  • A list of partitions
    • 一个RDD有多个分区,是一组分区列表,spark的task是以分区为单位,每个分布对应一个task线程(并行计算)。运行再worker节点的executor线程中。
  • A function for computing each split
    • 函数会同时作用在所有的分区上。
  • A list of dependencies on other RDDs
    • 新产生的RDD依赖于前期的存在的RDD(RDD被保存在内存中,可以被重复使用;可以实现无checkpoint+log的Fault tolerance机制)
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • (可选项)对于KV类型的RDD可以hashparition存储(必须产生shffule),如果不是KV类型—-就表示木有
    • 在spark中,有两种分区函数:
      • 第一种:HashPationer函数 对key去hashcode,然后对分区数取余得到对应的分区号———-> key.hashcode % 分区数 = 分区号 。
      • 第二种:RangeParitoner函数,按照一定的范围进行分区,相同范围的key会进入同一个分区。(A-H)—-> 1号分区,(I-Z)—-> 2号分区。
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
      an HDFS file)
      • (可选项) 一组最有的数据库位置列表:数据的本地性,数据的位置最优
        • spark后期任务计算优先考虑存在数据的节点开启计算任务,也就是说数据在哪里,就在哪里开启计算任务,大大减少网络传输。

以单词划分为例:
image

RDD 算子操作分类

    1. transformation(转换)
    • 它可以把一份RDD转换生成一个心的Rdd,延迟加载,不会立即触发任务的真正运行
    • 比如flatMap/map/reduceByKey
    1. action(动作)
    • 会触发任务的真正运行
    • 比如collect/SaveAsTextFile

RDD的依赖关系

image

    1. narrow dependencies 窄依赖
    • where each partition of the parent RDD is used by at most one partition of the child RDD :父RDD的每一个分区至多只被子RDD的一个分区使用
    • 如map/filter/flaMap
    • 不会产生shuffle
    1. wide dependencies 宽依赖
    • where multiple child partitions may depend on it:子RDD的多个分区会依赖于父RDD的同一个分区
    • 比如reduceByKey
    • 会产生shuffle
    1. lineage 血统
    • RDD的产生路径
    • 有向无环图,后期如果某个RDD的分区数据丢失,可以通过lineage重新计算恢复。

RDD的缓存机制

可以把RDD的数据缓存在内存或者磁盘中,后期需要时从缓存中取出,不用重新计算。

  • 可以设置不同的缓存级别,如DISK_ONLY,DISK_ONLY_2,MEMORY_AND_DISK_SER_2(内存和磁盘都保存两份并序列化)
  • 对计算复杂的RDD设置缓存。

DAG的构建和构建stage

  • lineage
    • 它是按照RDD之间的依赖生成的有向无环图
  • stage
    • 后期会根据DAG划分stage:从图的lowest节点往前,构建初始stage,往前遍历DAG,如果是窄依赖,则加入此stage,如果是宽依赖则构建新的stage。
    • stage也会产生依赖:前面stage中task差生的数据流入后面stage中的task去。
    • 划分stage的原因
      • 由于一个job任务中可能会有大量的宽依赖,由于宽依赖不会产生shufflw,宽依赖会产生shuffle。划分完stage后,在同一个stage中只有窄依赖,则可以对应task并行执行: 所有的stage中由并行执行的task组成。
  • App,job,Stage,Task之间的关系:
    • application 是spark的一个应用程序,包含了客户端写好的代码以及任务运行时所需要的资源信息。后期一个app中有多个action操作,每个action对应一个job,一个job由产生了多个stage,每一个stage内部有很多并行运行的task构成的集合。

ref

else

spark内存计算

  • © 2015-2021 John Doe
  • PV: UV:

请我喝杯咖啡吧~

微信