<p data-nodeid="83336">我们在前面几讲介绍了 etcd 存储、etcd-raft 模块以及 MVCC 多版本控制实现的原理。今天我们继续介绍 etcd 中事务的实现。</p>
<p data-nodeid="83337">在业务场景中，一般我们希望无论在什么样的故障场景下，一组操作要么同时完成，要么都失败。etcd 就实现了<strong data-nodeid="83459">在一个事务中，原子地执行冲突检查、更新多个 keys 的值</strong>。除此之外，etcd 将底层 MVCC 机制的版本信息暴露出来，根据版本信息封装出了一套基于乐观锁的事务框架 STM，并实现了不同的隔离级别。</p>
<p data-nodeid="83338">这一讲我们就来详细了解 etcd 事务的概念、基本使用和 STM 事务的隔离级别。</p>
<h3 data-nodeid="83339">什么是事务？</h3>
<p data-nodeid="83340">事务通常是指数据库事务。事务具有 ACID 特性，即<strong data-nodeid="83467">原子性、一致性、隔离性和持久性</strong>。</p>
<ul data-nodeid="83341">
<li data-nodeid="83342">
<p data-nodeid="83343">原子性（Atomicity）：事务作为一个整体被执行，其包含的对数据库的操作要么全部被执行，要么都不执行。</p>
</li>
<li data-nodeid="83344">
<p data-nodeid="83345">一致性（Consistency）：事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。</p>
</li>
<li data-nodeid="83346">
<p data-nodeid="83347">隔离性（Isolation）：多个事务并发执行时，一个事务的执行不应影响其他事务的执行。</p>
</li>
<li data-nodeid="83348">
<p data-nodeid="83349">持久性（Durability）：一个事务一旦提交，它对数据库的修改应该永久保存在数据库中。</p>
</li>
</ul>
<p data-nodeid="83350">常见的关系型数据库如 MySQL ，其 InnoDB 事务的实现基于锁实现数据库事务。事务操作执行时，需要获取对应数据库记录的锁，才能进行操作；如果发生冲突，事务会阻塞，甚至会出现死锁。在整个事务执行的过程中，客户端与 MySQL 多次交互，MySQL 为客户端维护事务所需的资源，直至事务提交。而 etcd 中的事务实现则是基于<strong data-nodeid="83477">CAS</strong>（Compare and Swap，即比较并交换） 方式。</p>
<p data-nodeid="83351">etcd 使用了不到四百行的代码实现了迷你事务，其对应的语法为<code data-backticks="1" data-nodeid="83479">If-Then-Else</code>。<strong data-nodeid="83485">etcd 允许用户在一次修改中批量执行多个操作</strong>，即这一组操作被绑定成一个原子操作，并共享同一个修订号。其写法类似 CAS，如下所示：</p>
<pre class="lang-go" data-nodeid="83352"><code data-language="go">Txn().If(cond1, cond2, ...).Then(op1, op2, ...,).Else(op1, op2)
</code></pre>
<p data-nodeid="83353">根据上面的实现，其实很好理解事务实现的逻辑。如果 If 冲突判断语句为真，对应返回值为 true，Then 中的语句将会被执行，否则执行 Else 中的逻辑。</p>
<p data-nodeid="84931" class="">在 etcd 事务执行过程中，客户端与 etcd 服务端之间没有维护事务会话。冲突判断及其执行过程作为一个原子过程来执行，因此 <strong data-nodeid="84937">etcd 事务不会发生阻塞</strong>，无论事务执行成功还是失败都会返回。当发生冲突导致执行失败时，需要应用进行重试，业务代码需要考虑这部分的重试逻辑。</p>

<h3 data-nodeid="83355">etcd 事务的使用示例</h3>
<p data-nodeid="83356">我们来演示一个转账的过程，发送者向接收者发起转账事务。etcd 的事务基于乐观锁检测冲突并重试，检测冲突时使用了<strong data-nodeid="83499">ModRevision</strong>进行校验，该字段表示某个 key 上一次被更改时，全局的版本是多少。因此，我们实现转账业务的流程如下所示：</p>
<p data-nodeid="85572" class=""><img src="https://s0.lgstatic.com/i/image6/M01/16/07/CioPOWBF7_KAN6nXAAArzi47oUU475.png" alt="Drawing 0.png" data-nodeid="85575"></p>

<p data-nodeid="83358">在 etcd 中的实现代码如下所示：</p>
<pre class="lang-go" data-nodeid="83359"><code data-language="go"><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">txnTransfer</span><span class="hljs-params">(etcd *v3.Client, sender, receiver <span class="hljs-keyword">string</span>, amount <span class="hljs-keyword">uint</span>)</span> <span class="hljs-title">error</span></span> {
	<span class="hljs-comment">// 失败重试</span>
	<span class="hljs-keyword">for</span> {
		<span class="hljs-keyword">if</span> ok, err := doTxn(etcd, sender, receiver, amount); err != <span class="hljs-literal">nil</span> {
			<span class="hljs-keyword">return</span> err
		} <span class="hljs-keyword">else</span> <span class="hljs-keyword">if</span> ok {
			<span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>
		}
	}
}
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">doTxn</span><span class="hljs-params">(etcd *v3.Client, sender, receiver <span class="hljs-keyword">string</span>, amount <span class="hljs-keyword">uint</span>)</span> <span class="hljs-params">(<span class="hljs-keyword">bool</span>, error)</span></span> {
	<span class="hljs-comment">// 第一个事务，利用事务的原子性，同时获取发送和接收者的余额以及 ModRevision</span>
	getresp, err := etcd.Txn(context.TODO()).Then(v3.OpGet(sender), v3.OpGet(receiver)).Commit()
	<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
		<span class="hljs-keyword">return</span> <span class="hljs-literal">false</span>, err
	}
	senderKV := getresp.Responses[<span class="hljs-number">0</span>].GetResponseRange().Kvs[<span class="hljs-number">0</span>]
	receiverKV := getresp.Responses[<span class="hljs-number">1</span>].GetResponseRange().Kvs[<span class="hljs-number">1</span>]
	senderNum, receiverNum := toUInt64(senderKV.Value), toUInt64(receiverKV.Value)
	<span class="hljs-comment">// 验证账户余额是否充足</span>
	<span class="hljs-keyword">if</span> senderNum &lt; amount {
		<span class="hljs-keyword">return</span> <span class="hljs-literal">false</span>, fmt.Errorf(<span class="hljs-string">"资金不足"</span>)
	}
	<span class="hljs-comment">// 发起转账事务，冲突判断 ModRevision 是否发生变化</span>
	txn := etcd.Txn(context.TODO()).If(
		v3.Compare(v3.ModRevision(sender), <span class="hljs-string">"="</span>, senderKV.ModRevision),
		v3.Compare(v3.ModRevision(receiver), <span class="hljs-string">"="</span>, receiverKV.ModRevision))
	txn = txn.Then(
		v3.OpPut(sender, fromUint64(senderNum-amount)), <span class="hljs-comment">// 更新发送者账户余额</span>
		v3.OpPut(receiver, fromUint64(receiverNum-amount))) <span class="hljs-comment">// 更新接收者账户余额</span>
    resp, err := txn.Commit()         <span class="hljs-comment">// 提交事务</span>
	<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
		<span class="hljs-keyword">return</span> <span class="hljs-literal">false</span>, err
	}
	<span class="hljs-keyword">return</span> resp.Succeeded, <span class="hljs-literal">nil</span>
}
</code></pre>
<p data-nodeid="83360">etcd 事务的实现基于乐观锁，涉及两次事务操作，第一次事务利用<strong data-nodeid="83513">原子性</strong>同时获取发送方和接收方的当前账户金额。第二次事务发起转账操作，<strong data-nodeid="83514">冲突检测 ModRevision 是否发生变化</strong>，如果没有变化则正常提交事务；若发生了冲突，则需要进行重试。</p>
<p data-nodeid="86210" class="">上述过程的实现较为烦琐，除了业务逻辑，还有大量的代码用来判断冲突以及重试。因此，etcd 社区基于事务特性，<strong data-nodeid="86216">实现了一个简单的事务框架 STM，</strong> 构建了多种事务隔离级别，下面我们看看如何基于 STM 框架实现 etcd 事务。</p>

<h3 data-nodeid="83362">使用 STM 实现转账</h3>
<p data-nodeid="83363">为了简化 etcd 事务实现的过程，etcd clientv3 提供了 STM（Software Transactional Memory，软件事务内存），帮助我们自动处理这些烦琐的过程。使用 STM 优化之后的转账业务代码如下：</p>
<pre class="lang-go" data-nodeid="83364"><code data-language="go"><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">txnStmTransfer</span><span class="hljs-params">(cli *v3.Client, from, to <span class="hljs-keyword">string</span>, amount <span class="hljs-keyword">uint</span>)</span> <span class="hljs-title">error</span></span> {
	<span class="hljs-comment">// NewSTM 创建了一个原子事务的上下文，业务代码作为一个函数传进去</span>
	_, err := concurrency.NewSTM(cli, <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">(stm concurrency.STM)</span> <span class="hljs-title">error</span></span> {
		<span class="hljs-comment">// stm.Get 封装了事务的读操作</span>
		senderNum := toUint64(stm.Get(from))
		receiverNum := toUint64(stm.Get(to))
		<span class="hljs-keyword">if</span> senderNum &lt; amount {
			<span class="hljs-keyword">return</span> fmt.Errorf(<span class="hljs-string">"余额不足"</span>)
		}
		<span class="hljs-comment">// 事务的写操作</span>
		stm.Put(to, fromUint64(receiverNum + amount))
		stm.Put(from, fromUint64(senderNum - amount))
		<span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>
	})
	<span class="hljs-keyword">return</span> err
}
</code></pre>
<p data-nodeid="83365">上述操作基于 STM 实现了转账业务流程，我们只需要关注转账逻辑的实现即可，事务相关的其他操作由 STM 完成。</p>
<h3 data-nodeid="83366">STM 实现细节</h3>
<p data-nodeid="83367">下面我们来看 STM 的实现原理。通过上面转账的例子，我们可以看到 STM 的使用特别简单，只需<strong data-nodeid="83529">把业务相关的代码封装成可重入的函数传给 stm，而 STM 可自行处理事务相关的细节</strong>。</p>
<p data-nodeid="83368">concurrency.STM 是一个接口，提供了对某个 key 的&nbsp;CURD&nbsp;操作：</p>
<pre class="lang-go" data-nodeid="83369"><code data-language="go"><span class="hljs-comment">// 位于 clientv3/concurrency/stm.go:25</span>
<span class="hljs-keyword">type</span> STM <span class="hljs-keyword">interface</span> {
	<span class="hljs-comment">// Get 返回键的值，并将该键插入 txn 的 read set 中。如果 Get 失败，它将以错误中止事务，没有返回</span>
	Get(key ...<span class="hljs-keyword">string</span>) <span class="hljs-keyword">string</span>
	<span class="hljs-comment">// Put 在 write set 中增加键值对</span>
	Put(key, val <span class="hljs-keyword">string</span>, opts ...v3.OpOption)
	<span class="hljs-comment">// Rev 返回 read set 中某个键指定的版本号</span>
	Rev(key <span class="hljs-keyword">string</span>) <span class="hljs-keyword">int64</span>
	<span class="hljs-comment">// Del 删除某个键</span>
	Del(key <span class="hljs-keyword">string</span>)
	<span class="hljs-comment">// commit 尝试提交事务到 etcd server</span>
	commit() *v3.TxnResponse
	reset()
}
</code></pre>
<p data-nodeid="83370">STM 是软件事务存储的接口。其中定义了 Get、Put、Rev、Del、commit、reset 等接口方法。STM 的接口有两个实现类：stm&nbsp;和&nbsp;stmSerializable。具体选择哪一个，由我们指定的隔离级别决定。</p>
<p data-nodeid="83371">STM 对象在内部构造 txn 事务，业务函数转换成<code data-backticks="1" data-nodeid="83533">If-Then</code>，自动提交事务以及处理失败重试等工作，直到事务执行成功。核心的<code data-backticks="1" data-nodeid="83535">NewSTM</code>函数的实现如下所示：</p>
<pre class="lang-go" data-nodeid="83372"><code data-language="go"><span class="hljs-comment">// 位于 clientv3/concurrency/stm.go:89</span>
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">NewSTM</span><span class="hljs-params">(c *v3.Client, apply <span class="hljs-keyword">func</span>(STM)</span> <span class="hljs-title">error</span>, <span class="hljs-title">so</span> ...<span class="hljs-title">stmOption</span>) <span class="hljs-params">(*v3.TxnResponse, error)</span></span> {
   opts := &amp;stmOptions{ctx: c.Ctx()}
   <span class="hljs-keyword">for</span> _, f := <span class="hljs-keyword">range</span> so {
      f(opts)
   }
   <span class="hljs-keyword">if</span> <span class="hljs-built_in">len</span>(opts.prefetch) != <span class="hljs-number">0</span> {
      f := apply
      apply = <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">(s STM)</span> <span class="hljs-title">error</span></span> {
         s.Get(opts.prefetch...)
         <span class="hljs-keyword">return</span> f(s)
      }
   }
   <span class="hljs-keyword">return</span> runSTM(mkSTM(c, opts), apply)
}
</code></pre>
<p data-nodeid="83373">根据源码可以知道，<code data-backticks="1" data-nodeid="83538">NewSTM</code>首先判断该事务是否存在预取的键值对，如果存在，会无条件地直接 apply 函数；否则会创建一个 stm，并运行 stm 事务。runSTM 代码如下所示：</p>
<pre data-nodeid="83374"><code>// 位于 clientv3/concurrency/stm.go:140
func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) {
	outc := make(chan stmResponse, 1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				e, ok := r.(stmError)
				if !ok {
					// 执行异常
					panic(r)
				}
				outc &lt;- stmResponse{nil, e.err}
			}
		}()
		var out stmResponse
		for {
            // 重置 stm
			s.reset()
            // 执行事务操作，apply 函数
			if out.err = apply(s); out.err != nil {
				break
			}
            // 提交事务
			if out.resp = s.commit(); out.resp != nil {
				break
			}
		}
		outc &lt;- out
	}()
	r := &lt;-outc
	return r.resp, r.err
}
</code></pre>
<p data-nodeid="83375">runSTM 函数首先重置了 stm，清空 STM 的读写缓存；接着执行事务操作，apply 应用函数；最后将事务提交。提交事务的实现如下：</p>
<pre class="lang-go" data-nodeid="83376"><code data-language="go"><span class="hljs-comment">// 位于 clientv3/concurrency/stm.go:265</span>
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(s *stm)</span> <span class="hljs-title">commit</span><span class="hljs-params">()</span> *<span class="hljs-title">v3</span>.<span class="hljs-title">TxnResponse</span></span> {
   txnresp, err := s.client.Txn(s.ctx).If(s.conflicts()...).Then(s.wset.puts()...).Commit()
   <span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
      <span class="hljs-built_in">panic</span>(stmError{err})
   }
   <span class="hljs-keyword">if</span> txnresp.Succeeded {
      <span class="hljs-keyword">return</span> txnresp
   }
   <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>
}
</code></pre>
<p data-nodeid="83377">上述 commit 的实现包含了我们前面所介绍的 etcd 事务语法。If 中封装了冲突检测条件，提交事务则是 etcd 的 Txn 将 wset 中的数据写入并提交的过程。</p>
<p data-nodeid="83378">下面我们来看看 etcd 隔离级别以及在 STM 封装基础上如何实现事务。</p>
<h3 data-nodeid="83379">etcd 事务隔离级别</h3>
<p data-nodeid="83380">数据库一般有以下几种事务隔离级别。</p>
<ul data-nodeid="83381">
<li data-nodeid="83382">
<p data-nodeid="83383"><strong data-nodeid="83553">未提交读</strong>（Read Uncommitted）：能够读取到其他事务中还未提交的数据，这可能会导致<strong data-nodeid="83554">脏读</strong>的问题。</p>
</li>
<li data-nodeid="83384">
<p data-nodeid="83385"><strong data-nodeid="83563">读已提交</strong>（Read Committed）：只能读取到已经提交的数据，即别的事务一提交，当前事务就能读取到被修改的数据，这可能导致<strong data-nodeid="83564">不可重复读</strong>的问题。</p>
</li>
<li data-nodeid="83386">
<p data-nodeid="83387"><strong data-nodeid="83569">可重复读</strong>（Repeated Read）：一个事务中，同一个读操作在事务的任意时刻都能得到同样的结果，其他事务的提交操作对本事务不会产生影响。</p>
</li>
<li data-nodeid="83388">
<p data-nodeid="83389"><strong data-nodeid="83578">串行化</strong>（Serializable）：串行化执行事务，即一个事务的执行会阻塞其他事务。该隔离级别通过牺牲并发能力换取数据的安全，属于<strong data-nodeid="83579">最高的隔离级别</strong>。</p>
</li>
</ul>
<p data-nodeid="83390">etcd 的事务可以看作是一种“微事务”，在它之上，可以构建出各种隔离级别的事务。STM 的事务级别通过 stmOption 指定，位于 clientv3/concurrency/stm.go 中，分别为 SerializableSnapshot、Serializable、RepeatableReads 和 ReadCommitted。</p>
<p data-nodeid="83391">构造 STM 的实现如下所示：</p>
<pre class="lang-go" data-nodeid="83392"><code data-language="go"><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">mkSTM</span><span class="hljs-params">(c *v3.Client, opts *stmOptions)</span> <span class="hljs-title">STM</span></span> {
   <span class="hljs-keyword">switch</span> opts.iso {
   <span class="hljs-comment">// 串行化快照</span>
   <span class="hljs-keyword">case</span> SerializableSnapshot:
      s := &amp;stmSerializable{
         stm:      stm{client: c, ctx: opts.ctx},
         prefetch: <span class="hljs-built_in">make</span>(<span class="hljs-keyword">map</span>[<span class="hljs-keyword">string</span>]*v3.GetResponse),
      }
      s.conflicts = <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span> []<span class="hljs-title">v3</span>.<span class="hljs-title">Cmp</span></span> {
         <span class="hljs-keyword">return</span> <span class="hljs-built_in">append</span>(s.rset.cmps(), s.wset.cmps(s.rset.first()+<span class="hljs-number">1</span>)...)
      }
      <span class="hljs-keyword">return</span> s
   <span class="hljs-comment">// 串行化</span>
   <span class="hljs-keyword">case</span> Serializable:
      s := &amp;stmSerializable{
         stm:      stm{client: c, ctx: opts.ctx},
         prefetch: <span class="hljs-built_in">make</span>(<span class="hljs-keyword">map</span>[<span class="hljs-keyword">string</span>]*v3.GetResponse),
      }
      s.conflicts = <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span> []<span class="hljs-title">v3</span>.<span class="hljs-title">Cmp</span></span> { <span class="hljs-keyword">return</span> s.rset.cmps() }
      <span class="hljs-keyword">return</span> s
   <span class="hljs-comment">// 可重复读   </span>
   <span class="hljs-keyword">case</span> RepeatableReads:
      s := &amp;stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
      s.conflicts = <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span> []<span class="hljs-title">v3</span>.<span class="hljs-title">Cmp</span></span> { <span class="hljs-keyword">return</span> s.rset.cmps() }
      <span class="hljs-keyword">return</span> s
   <span class="hljs-comment">// 已提交读</span>
   <span class="hljs-keyword">case</span> ReadCommitted:
      s := &amp;stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
      s.conflicts = <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span> []<span class="hljs-title">v3</span>.<span class="hljs-title">Cmp</span></span> { <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span> }
      <span class="hljs-keyword">return</span> s
   <span class="hljs-keyword">default</span>:
      <span class="hljs-built_in">panic</span>(<span class="hljs-string">"unsupported stm"</span>)
   }
}
</code></pre>
<p data-nodeid="83393">该函数是根据隔离级别定义的。每一类隔离级别对应不同的<strong data-nodeid="83591">冲突检测条件</strong>，存在<strong data-nodeid="83592">读操作差异</strong>，因此我们需要搞清楚每一类隔离级别在这两方面的实现。</p>
<p data-nodeid="83394">从构建 SMT 的实现代码可以知道，etcd 隔离级别与一般的数据库隔离级别的差异是<strong data-nodeid="83598">没有未提交读的隔离级别</strong>，这是因为 etcd 通过 MVCC 机制实现读写不阻塞，并解决脏读的问题。下面我们将从低到高分别介绍 etcd 事务隔离级别。</p>
<h4 data-nodeid="83395">ReadCommitted 已提交读</h4>
<p data-nodeid="83396">ReadCommitted 是 etcd 中的<strong data-nodeid="83605">最低事务级别</strong>。ReadCommitted 是指一个事务提交之后，它做的变更才会被其他事务看到，只允许客户端获取已经提交的数据。</p>
<p data-nodeid="83397">由构造 STM 的源码可知，ReadCommitted 调用的是 stm 的实现。对于不一样的隔离级别，我们主要关注的就是读操作和提交时的冲突检测条件。而对于写操作，会先写进本地缓存，直到事务提交时才真正写到 etcd 里。</p>
<ul data-nodeid="83398">
<li data-nodeid="83399">
<p data-nodeid="83400">读操作</p>
</li>
</ul>
<pre class="lang-java" data-nodeid="83401"><code data-language="java">func (s *stm) Get(keys ...string) string {
   <span class="hljs-keyword">if</span> wv := s.wset.get(keys...); wv != nil {
      <span class="hljs-keyword">return</span> wv.val
   }
   <span class="hljs-keyword">return</span> respToValue(s.fetch(keys...))
}
</code></pre>
<p data-nodeid="83402">从 etcd 读取 keys，就像普通的 kv 操作一样。第一次 Get 后，在事务中缓存，后续不再从 etcd 读取。</p>
<ul data-nodeid="83403">
<li data-nodeid="83404">
<p data-nodeid="83405">冲突检测条件</p>
</li>
</ul>
<pre class="lang-go" data-nodeid="83406"><code data-language="go">s.conflicts = <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span> []<span class="hljs-title">v3</span>.<span class="hljs-title">Cmp</span></span> { <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span> }
</code></pre>
<p data-nodeid="83407">ReadCommitted 只需要确保自己读到的是别人已经提交的数据，由于 etcd 的 kv 操作都是原子操作，所以不可能读到未提交的修改。</p>
<h4 data-nodeid="83408">RepeatableReads 可重复读</h4>
<p data-nodeid="83409">RepeatableReads 与 ReadCommitted 类似，调用的也是 stm 的实现。可重复读是指多次读取同一个数据时，其值都和事务开始时刻是一致的，因此可以实现可重复读。</p>
<ul data-nodeid="83410">
<li data-nodeid="83411">
<p data-nodeid="83412">读操作</p>
</li>
</ul>
<p data-nodeid="83413">与 ReadCommitted 类似，用 readSet 缓存已经读过的数据，这样下次再读取相同数据的时候才能得到同样的结果，确保了可重复读。</p>
<ul data-nodeid="83414">
<li data-nodeid="83415">
<p data-nodeid="83416">冲突检测条件</p>
</li>
</ul>
<pre class="lang-go" data-nodeid="83417"><code data-language="go">s.conflicts = <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span> []<span class="hljs-title">v3</span>.<span class="hljs-title">Cmp</span></span> { <span class="hljs-keyword">return</span> s.rset.cmps() }
</code></pre>
<p data-nodeid="83418">在事务提交时，确保事务中 Get 的 keys 没有被改动过。因此使用 readSet 数据的 ModRevision 做冲突检测，<strong data-nodeid="83621">确保本事务读到的数据都是最新的</strong>。</p>
<p data-nodeid="83419">可重复读隔离级别的场景中，每个 key 的 Get 是独立的。在事务提交时，如果这些 keys 没有变动过，那么事务就可以提交。</p>
<h4 data-nodeid="83420">Serializable 串行读</h4>
<p data-nodeid="83421">串行化调用的实现类为 stmSerializable，当出现读写锁冲突的时候，后续事务必须等前一个事务执行完成，才能继续执行。这就相当于在事务开始时，对 etcd 做了一个快照，这样它读取到的数据就不会受到其他事务的影响，从而达到事务串行化（Serializable）执行的效果。</p>
<ul data-nodeid="83422">
<li data-nodeid="83423">
<p data-nodeid="83424">读操作</p>
</li>
</ul>
<pre class="lang-java" data-nodeid="83425"><code data-language="java">func (s *stmSerializable) Get(keys ...string) string {
   <span class="hljs-keyword">if</span> wv := s.wset.get(keys...); wv != nil {
      <span class="hljs-keyword">return</span> wv.val
   }
   <span class="hljs-comment">// 判断是否第一次读</span>
   firstRead := len(s.rset) == <span class="hljs-number">0</span>
   <span class="hljs-keyword">for</span> _, key := range keys {
      <span class="hljs-keyword">if</span> resp, ok := s.prefetch[key]; ok {
         delete(s.prefetch, key)
         s.rset[key] = resp
      }
   }
   resp := s.stm.fetch(keys...)
   <span class="hljs-keyword">if</span> firstRead {
      <span class="hljs-comment">// 记录下第一次读的版本作为基准</span>
      s.getOpts = []v3.OpOption{
         v3.WithRev(resp.Header.Revision),
         v3.WithSerializable(),
      }
   }
   <span class="hljs-keyword">return</span> respToValue(resp)
}
</code></pre>
<p data-nodeid="83426">事务中第一次读操作完成时，保存当前版本号 Revision；后续其他读请求会带上这个版本号，获取指定 Revision 版本的数据。这确保了该事务所有的读操作读到的都是同一时刻的内容。</p>
<ul data-nodeid="83427">
<li data-nodeid="83428">
<p data-nodeid="83429">冲突检测条件</p>
</li>
</ul>
<pre class="lang-java" data-nodeid="83430"><code data-language="java"> s.conflicts = func() []v3.Cmp { <span class="hljs-keyword">return</span> s.rset.cmps() }
</code></pre>
<p data-nodeid="83431">在事务提交时，需要检查事务中 Get 的 keys 是否被改动过，而 etcd 串行化的约束还不够，它缺少了验证事务要修改的 keys 这一步。下面的 SerializableSnapshot 事务增加了这个约束。</p>
<h4 data-nodeid="83432">SerializableSnapshot串行化快照读</h4>
<p data-nodeid="83433">SerializableSnapshot串行化快照隔离，提供可序列化的隔离，并检查写冲突。<strong data-nodeid="83635">etcd 默认采用这种隔离级别</strong>，串行化快照隔离是最严格的隔离级别，可以避免幻影读。其读操作与冲突检测的过程如下。</p>
<ul data-nodeid="83434">
<li data-nodeid="83435">
<p data-nodeid="83436">读操作</p>
</li>
</ul>
<p data-nodeid="83437">与 Serializable 串行化读类似。事务中的第一个 Get 操作发生时，保存服务器返回的当前 Revision；后续对其他 keys 的 Get 操作，指定获取 Revision 版本的 value。</p>
<ul data-nodeid="83438">
<li data-nodeid="83439">
<p data-nodeid="83440">冲突检测条件</p>
</li>
</ul>
<pre class="lang-go" data-nodeid="83441"><code data-language="go">s.conflicts = <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span> []<span class="hljs-title">v3</span>.<span class="hljs-title">Cmp</span></span> {
    <span class="hljs-keyword">return</span> <span class="hljs-built_in">append</span>(s.rset.cmps(), s.wset.cmps(s.rset.first()+<span class="hljs-number">1</span>)...)
}
</code></pre>
<p data-nodeid="83442">在事务提交时，检查事务中 Get 的 keys 以及要修改的 keys 是否被改动过。</p>
<p data-nodeid="83443">SerializableSnapshot 不仅确保了读取过的数据是最新的，同时也确保了要写入的数据同样没有被其他事务更改过，是隔离的最高级别。</p>
<p data-nodeid="83444">如果这些语义不能满足你的业务需求，通过扩展 etcd 的官方 Client SDK，写一个新 STM 事务类型即可。</p>
<h3 data-nodeid="83445">小结</h3>
<p data-nodeid="83446">这一讲我们先介绍了数据库中的事务定义，以及 etcd 中的事务实现，事务降低了客户端应用编码的复杂度；接着通过一个转账的案例来演示 etcd 基于乐观锁如何实现事务，以及 STM 改进的转账案例。最后我们介绍了 etcd STM 微事务及其几种隔离机制。</p>
<p data-nodeid="83447">本讲内容总结如下：</p>
<p data-nodeid="86853" class="te-preview-highlight"><img src="https://s0.lgstatic.com/i/image6/M00/16/0B/Cgp9HWBF8RqAEKc1AAIFtf2bepo790.png" alt="Drawing 1.png" data-nodeid="86856"></p>

<p data-nodeid="83449">通过上面的分析，我们清楚了如何使用 etcd 的 txn 事务构建符合 ACID 语义的事务框架。需要强调的是， etcd 的 STM 事务是 CAS 重试模式，在发生冲突时会多次重试，这就要<strong data-nodeid="83653">保证业务代码是可重试的</strong>，因此不同于数据库事务的加锁模式。</p>
<p data-nodeid="83450">学习完这一讲，我要给大家留一个问题，你知道乐观锁适用于哪些场景吗？欢迎在留言区写下你的答案。下一讲，我们将继续介绍 etcd watch 机制的实现原理。</p>