mvcc简介
悲观锁
在对于一些临界资源进行读写的时候,为了防止其他人进行同步的修改数据,直接将当前的数据锁住,不让别人使用,来实现并发安全
乐观锁
在对临界资源进行操作的时候,不锁住数据,实现独占,而是判断当前数据是否有被其他人修改过,如果修改了,则修改失败
乐观锁是在操作数据之前,先获得数据的版本号或时间戳,然后在提交的时候检查数据的版本号/时间戳是否发生了变化,如果变化了则操作失败,否则就操作成功
MVCC
MVCC是一种数据库并发控制技术,为每个事务操作分配唯一的时间戳来实现并发控制,在每个事务中,只能开到在此之前已经提交的数据版本,而不会受到其他事务的干扰,
etcd的关键结构
在内存中使用一个Btree
维护key的所有版本索引信息
- 通过某个
key
在btree
中进行查找,获得到某个key的keyIndex
信息 - 在当前的
keyIndex
中存储着某个key的KeyIndex的全部版本信息,最新的修改版本记录,以及各个generation的版本的迭代器记录 - 在
generation
中存储着某个key从创建到删除的全部版本记录,在generation中记录着当前generaatioin的创建的版本号,以及当前generation的所有的版本号信息 - 同时在
revision
版本中记录着某个具体事务的ID,和事务中某个子操作的ID
KeyValue
etcd在boltdb中存储的是key是key的版本号信息,value是当前的KeyValue结构体序列化之后的数据
type KeyValue struct {// key is the key in bytes. An empty key is not allowed.Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`// create_revision is the revision of last creation on this key.CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"`// mod_revision is the revision of last modification on this key.ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"`// version is the version of the key. A deletion resets// the version to zero and any modification of the key// increases its version.Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`// value is the value held by the key, in bytes.Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`// lease is the ID of the lease that attached to key.// When the attached lease expires, the key will be deleted.// If lease is 0, then no lease is attached to the key.Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"`XXX_NoUnkeyedLiteral struct{} `json:"-"`XXX_unrecognized []byte `json:"-"`XXX_sizecache int32 `json:"-"`
}
某次操作的key信息和value信息
TreeIndex
// etcd的key索引是基础btree实现的
// etcd中的mvcc模块可以分成两个部分,索引组件+存储组件
// 首先根据key在treeindex中查找到对应的revision信息,treeindex根据查询的key从b-tree中查找到一个keyindex对象
//btree里面的key就是一个普通的keyindex匿名对象,只有一个key字段,而value就是一个keyindex对象,这个keyindex对象里面存储了当前key的revision信息
type treeIndex struct {sync.RWMutextree *btree.BTreeG[*keyIndex] //节点的key是一个keyindex结构lg *zap.Logger
}
treeIndex中存储的是一个btree,btree的key是一个keyIndex,里面只存储了一个key对象,而获得到的value里面存储的就是一个当前的具体的keyIndex的对象
KeyIndex
type keyIndex struct {key []byte //用户key的名字modified Revision // the main rev of the last modification,最后一次修改key的etcd版本号generations []generation //key的若干代版本号信息,每代中包含对key的多次修改的版本号列表,append递增
}
KeyIndex中包含了一个key的所有创建与删除的版本信息
generation
// generation contains multiple revisions of a key.
// 表示一个key从创建到删除的过程中,每代对应key的一个生命周期的开始与结束
// 当第一次创建一个key的时候,就会生成第0代,后续的操作都是再第0代中追加版本号
// 当把key删除掉的时候,就会生成新的的空的generation,一个key不断经历创建,删除的过程,就回u生成多个代
// etcd主要通过压缩操作来回收generation,revision和boltdb中的冗余数据
type generation struct {ver int64 //表示此key在当前代,修改的次数created Revision // when the generation is created (put in first revision).当前generation创建时候的版本号revs []Revision //对此key的修改的版本号记录列表
}
表示一个key从创建到删除的过程中,每代对应key的一个生命周期的开始与结束
在删除某个key的时候,就会在当前的key的generation增加一个tomb,并且增加一个新的generation
Revision
type Revision struct {// Main is the main revision of a set of changes that happen atomically.Main int64 //事务内的主版本号(etcd时钟),一个事务内的主版本号是唯一的,全局递增的,随着put/txn/delete等事务递增// Sub is the sub revision of a change in a set of changes that happen// atomically. Each change has different increasing sub revision in that// set.//一个事务内发生put,del操作的时候,从0开始递增,Sub int64 //事务内的子版本号
}
revision是对当前key的某个操作的版本信息
Main:某个具体的事务ID
Sub:某个事务内部的子版本号
Put
etcd在server层同样也是要开启一个事务,只不过当前是一个写事务,当前的写事务中继承了所有的事务的相关的接口
type storeTxnWrite struct {storeTxnCommon//事务的具体的操作tx backend.BatchTx// beginRev is the revision where the txn begins; it will write to the next revision.beginRev int64 //当前事务启动的时候,这个版本号信息changes []mvccpb.KeyValue
}
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {rev := tw.beginRev + 1 //在这个地方进行原子性的递增c := revoldLease := lease.NoLease// if the key exists before, use its previous created and// get its previous leaseID_, created, ver, err := tw.s.kvindex.Get(key, rev) //根据给定的rev,找到比这个rev小的这个key'的事务数据,创建,和修改的次数if err == nil {c = created.Main//如果当前的key已经存在过了,就直接获得他的创建信息....}ibytes := NewRevBytes() //版本号的字节流数据idxRev := Revision{Main: rev, Sub: int64(len(tw.changes))} //执行当前的put操作的执行//这个就是boltdb中的key:这个key是revision,ibytes = RevToBytes(idxRev, ibytes) //把revision转化成bytesver = ver + 1 //因为是要插入数据,所以更新他的这个key的修改次数kv := mvccpb.KeyValue{ //mvcc当前操作的key和value的相关数据Key: key,Value: value,CreateRevision: c, //当前的key创建的时候,所在的版本ModRevision: rev, //当前的rev所在的版本Version: ver, //当前key修改的次数Lease: int64(leaseID),}d, err := kv.Marshal() //将上面的这些kv数据全部进行编码,boltdb中插入的数据,keyvalue序列话之后的数据....//boltdb中的key就是把revision序列话之后的数据,因为对于每一个数据的版本操作都是唯一的tw.tx.UnsafeSeqPut(schema.Key, ibytes, d) //把编码的数据进行插入tw.s.kvindex.Put(key, idxRev) //插入完之后,修改这个key的版本信息,当前就是已经修改过了的信息tw.changes = append(tw.changes, kv)......
}
- 根据当前的key和rev信息,在内存中查找treeIndex,获得当前的KeyIndex对象,并在得到的keyIndex对象中查找符合条件的那个距离最近的那个generation信息,并且从当前的这个generation获得到举例当前atRev中最近的那个版本号信息,当前key一共修改的次数,以及其创建时候的版本号信息
- 将当前的版本号信息转化成字节流,并且生成KeyValue结构体,并将他序列化成字节流
- 调用
UnsafeSeqPut
将当前的kv数据插入到磁盘中,key:版本号的字节流信息,value:就是序列化之后keyvalue的数据,写入到底层的boltdb - 最后调用
tw.s.kvindex.Put(key, idxRev)
更新treeIndex信息,包括当前操作的key,和此时对应的版本号信息
func (ti *treeIndex) Put(key []byte, rev Revision) {keyi := &keyIndex{key: key} //传入的就是他当前正在操作的key,以及传入当前他的revision,构造一个keyindex,每一次事务开启他的main版本号都递增ti.Lock() //上锁defer ti.Unlock() //走的时候解锁okeyi, ok := ti.tree.Get(keyi) //因为当前的tree的key是keyindex对象,根据给定的keyindex对象获得他的value数据if !ok { //如果当前并没有找到这个keyindex的的时候,我们就可以直接插入这个数据,说明当前是第一次进来keyi.put(ti.lg, rev.Main, rev.Sub) //给当前的key插入版本号ti.tree.ReplaceOrInsert(keyi) //插入当前key到btree中return}okeyi.put(ti.lg, rev.Main, rev.Sub) //否则就把这个进行插入
}
- 根据给定的key构造一个keyIndex的对象,并在btree中调用Get获得这个key的具体的keyIndex信息
- keyIndex中的put方法,传入的是当前要操作的版本号信息
// put puts a revision to the keyIndex.put每次操作的都是最后一个generation,因为最后一个generation都是最新的,一旦增加一个generation,说明都是前面的key有经历过删除的操作
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {rev := Revision{Main: main, Sub: sub}//给当前的keyindex中插入一个revisionif !rev.GreaterThan(ki.modified) { //如果插入的revision并不比之前的哪个revision大的话,就要报错lg.Panic("'put' with an unexpected smaller revision",zap.Int64("given-revision-main", rev.Main),zap.Int64("given-revision-sub", rev.Sub),zap.Int64("modified-revision-main", ki.modified.Main),zap.Int64("modified-revision-sub", ki.modified.Sub),)}if len(ki.generations) == 0 {ki.generations = append(ki.generations, generation{}) //当前如果一个generation都没有的话,先插入一个空的generation}g := &ki.generations[len(ki.generations)-1] //获得此时正在操作的最新的的generationif len(g.revs) == 0 { // create a new key,如果当前的的generation并没有修改过,说明当前的generation是第一次进来keysGauge.Inc()g.created = rev //因为当前的genneration中一个revision都没有,说明当前就没有}g.revs = append(g.revs, rev) //将当前revision添加到当前generation代中g.ver++ //更改次数增加ki.modified = rev //当前修改的版本就是此时新增加的这个revision
}
- 获得当前最新操作的generation信息
- 在当前的generation中添加当前的版本号信息
- 并更新当前的generation信息
Read
每次range操作都会在etcd开启一个storeTxnCommon
事务,调用txn的方法,可以获取到范围内的所有keyIndex信息,实现读写分离的操作
func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) { //currev标识是当前事务的ID,拿到了key之后以及感兴趣的revision后,我们就可以取treeindex中查找rev := ro.Revif rev > curRev { //检查锁查找的rev是否有效,超过当前版本的不行return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev}if rev <= 0 {rev = curRev //如果当前的版本小于0,更改为当前事务的版本}if rev < tr.s.compactMainRev { //在已经compact事务执行之前的版本也不行return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted}//if ro.Count { //为真表示计算满足条件的的版本号数量,而不是检索键值对total := tr.s.kvindex.CountRevisions(key, end, rev) //调用这个来查找满足条件的的版本数量tr.trace.Step("count revisions from in-memory index tree")return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil}//根据指定的版本取kvindex的btree中查找,所有符合rev版本从key到end的版本信息revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit)) //获得一系列符号条件的版本信息tr.trace.Step("range keys from in-memory index tree")if len(revpairs) == 0 { //没有找到匹配的return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil}limit := int(ro.Limit) //这个来限制返回的版本数量if limit <= 0 || limit > len(revpairs) {limit = len(revpairs)}kvs := make([]mvccpb.KeyValue, limit) //根据配置,创建需要可以返回的kv数据revBytes := NewRevBytes()for i, revpair := range revpairs[:len(kvs)] { //截取前面的limit个进行遍历//遍历所有的版本select {case <-ctx.Done():return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())default:}revBytes = RevToBytes(revpair, revBytes) //将当前的版本号revision序列化成字符串,到boltdb中查询//因为底层的boltdb中的key就是某次put的revisiono的序列化之后的数据,我们就是通过这个序列化之后的数据获得时局的key/value数据_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0) //从底层的boltdb中获得真实的key/value信息if len(vs) != 1 {tr.s.lg.Fatal("range failed to find revision pair",zap.Int64("revision-main", revpair.Main),zap.Int64("revision-sub", revpair.Sub),zap.Int64("revision-current", curRev),zap.Int64("range-option-rev", ro.Rev),zap.Int64("range-option-limit", ro.Limit),zap.Binary("key", key),zap.Binary("end", end),zap.Int("len-revpairs", len(revpairs)),zap.Int("len-values", len(vs)),)} //判断根据这个revision从boltdb中获得的数据是否有效的序列化if err := kvs[i].Unmarshal(vs[0]); err != nil {tr.s.lg.Fatal("failed to unmarshal mvccpb.KeyValue",zap.Error(err),)}}tr.trace.Step("range keys from bolt db")return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil //获得到符合条件的信息
}
- 检查要查找的rev版本号是否有效,超过当前版本的不行,已经被compacted删除的也不行
- 调用
tr.s.kvindex.Revisions
在内存中的treeIndex中的btree中进行查找,找到符合条件的limit个版本信息 - 遍历所有符合条件的版本,UnSafeGet从底层磁盘中的boltdb中获得真实的key/value信息
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []Revision, total int) {ti.RLock()defer ti.RUnlock()if end == nil { //end=nil,表示只需要检索key所对应的版本信息,end表示我们要检索的key的范围的结束信息rev, _, _, err := ti.unsafeGet(key, atRev) //根据当前的事务版本获得感兴趣的revision信息,获得当前事务的最新的modify的版本信息if err != nil {return nil, 0}return []Revision{rev}, 1}ti.unsafeVisit(key, end, func(ki *keyIndex) bool { //如果end!=nil,就需要进行一个范围检索if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {if limit <= 0 || len(revs) < limit {revs = append(revs, rev)}total++}return true})return revs, total //返回当前符合条件的信息
}
- 如果当前没有作为end作为对key范围查找的结束字符串的话,就直接调用treeIndex的unsafeGet查找atRev中的一个指定的revision信息
- 否则就需要进行进行范围查找多个指定的revision信息
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {keyi := &keyIndex{key: key} //给定一个key,和atrev这个版本,构造一个keyindex对象,(treeindex中的btree里面的key就是这个对象)if keyi = ti.keyIndex(keyi); keyi == nil { //根据这个keyindex,在btree中查找,如果已经存在了的话,下面我们就可以直接使用,如果不存在的话,就返回不存在return Revision{}, Revision{}, 0, ErrRevisionNotFound //当前是没有找到,所以返回没有找到}return keyi.get(ti.lg, atRev) //存在这个keyindex,所以在这个keyindex中查找指定的数据
}
- 首先调用
keyIndex
获得当前key的一个keyIndex信息 - 并在当前的keyindex中调用
get
,获得某个指定的revision