原文链接:Versioned Value
分布式系统模式:版本化值
版本化值即将每次更新存储为带新版本号的值,以支持历史值读取。
在分布式系统中,节点需要能够判断键的哪个值是最新的,有时还需要获取历史值,以便能够对值的变化做出正确的反应。
解决方案
为每个值存储一个版本号,该版本号在值每次更新时都会递增。这允许将每次更新操作转换为新写入操作,而不会阻塞读取。客户端也可以读取特定版本号的历史值。
一个复制键值存储的简单示例:集群的 Leader 负责处理对键值存储的所有写入请求,它将写请求保存在 预写日志(Write-Ahead Log)中,且预写日志在 Leader 和 Followers 之间复制。Leader 将来自 高水位(High-Water Mark)的预写日志的条目应用到键值存储。
这是一种称为 状态机复制(State Machine Replication)的标准复制方式,大多数基于共识算法的数据系统(如 Raft)都是以这种方式实现的。键值存储保留一个整型的版本计数器,每次根据预写日志执行写入命令时,版本计数器都会增加,然后使用递增的版本计数器构造新键,就不会更新现有值,但每个写入请求都会继续将新值添加到备份存储。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class ReplicatedKVStore {
int version = 0; MVCCStore mvccStore = new MVCCStore(); @Override public CompletableFuture<Response> put(String key, String value) { return server.propose(new SetValueCommand(key, value)); } private Response applySetValueCommand(SetValueCommand setValueCommand) { getLogger().info("Setting key value " + setValueCommand); version = version + 1; mvccStore.put(new VersionedKey(setValueCommand.getKey(), version), setValueCommand.getValue()); Response response = Response.success(version); return response; } }
|
版本化键的排序
在具体实现上,一个重要的问题是要支持快速导航到最佳匹配版本,所以版本化键是通过版本号作为键的后缀来形成自然排序。这维护了一个与底层数据结构非常匹配的顺序,例如当密钥有两个版本 key1 和 key2,则 key1 排在 key2 之前。
为了存储版本化的键值,要使用可实现快速查找最近匹配版本的数据结构,例如跳表。 在 Java 中,MVCC 存储可以按如下方式构建:
1 2 3 4 5 6 7 8 9 10 11
| class MVCCStore…
public class MVCCStore { NavigableMap<VersionedKey, String> kv = new ConcurrentSkipListMap<>(); public void put(VersionedKey key, String value) { kv.put(key, value); } } }
|
为了使用可导航的 Map,版本化的键实现如下。它实现了一个基于键的自然排序的比较器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| class VersionedKey {
public class VersionedKey implements Comparable<VersionedKey> { private String key; private int version; public VersionedKey(String key, int version) { this.key = key; this.version = version; } public String getKey() { return key; } public int getVersion() { return version; } @Override public int compareTo(VersionedKey other) { int keyCompare = this.key.compareTo(other.key); if (keyCompare != 0) { return keyCompare; } return Integer.compare(this.version, other.version); } } }
|
在此实现中,使用可导航 Map 的 API 可获取特定版本的值。
1 2 3 4 5 6 7
| class MVCCStore {
public Optional<String> get(final String key, final int readAt) { Map.Entry<VersionedKey, String> entry = kv.floorEntry(new VersionedKey(key, readAt)); return (entry == null)? Optional.empty(): Optional.of(entry.getValue()); } }
|
在下面的示例中,存储在版本号 1、2、3 和 5 的键有四个版本。根据客户端指定的版本来读取值,返回与键最匹配的版本。
在特定版本上读取:

存储特定键值的版本返回给客户端,然后客户端可以使用此版本读取值。
读取指定版本:

处理请求:

读取多个版本
有时客户端需要从给定的版本号中获取所有版本。例如在状态监控中,客户端需要从特定版本获取所有事件。
集群节点可以使用额外的索引结构来存储一个键的所有版本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| class IndexedMVCCStore {
public class IndexedMVCCStore { NavigableMap<String, List<Integer>> keyVersionIndex = new TreeMap<>(); NavigableMap<VersionedKey, String> kv = new TreeMap<>(); ReadWriteLock rwLock = new ReentrantReadWriteLock(); int version = 0; public int put(String key, String value) { rwLock.writeLock().lock(); try { kv.put(new VersionedKey(key, ++version), value); updateVersionIndex(key, version); return version; } finally { rwLock.writeLock().unlock(); } } private void updateVersionIndex(String key, int newVersion) { List<Integer> versions = getVersions(key); versions.add(newVersion); keyVersionIndex.put(key, versions); } private List<Integer> getVersions(String key) { List<Integer> versions = keyVersionIndex.get(key); if (versions == null) { versions = new ArrayList<>(); keyVersionIndex.put(key, versions); } return versions; } } }
|
然后可以向客户端提供 API 以从特定版本或版本范围读取值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class IndexedMVCCStore {
public List<String> getRange(String key, final int fromRevision, int toRevision) { rwLock.readLock().lock(); try { List<Integer> versions = keyVersionIndex.get(key); Integer maxRevisionForKey = versions.stream().max(Integer::compareTo).get(); Integer revisionToRead = maxRevisionForKey > toRevision ? toRevision : maxRevisionForKey; SortedMap<VersionedKey, String> versionMap = kv.subMap(new VersionedKey(key, revisionToRead), new VersionedKey(key, toRevision)); getLogger().info("Available version keys " + versionMap + ". Reading@" + fromRevision + ":" + toRevision); return new ArrayList<>(versionMap.values()); } finally { rwLock.readLock().unlock(); } } }
|
在更新和读取索引时使用锁必须小心,其中有一种替代实现:使用键保存所有版本化值的列表,如 Gossip 传播中使用的那样,可以避免不必要的状态交换。
MVCC 与事务隔离
数据库使用版本化值来实现 MVCC 和 事务隔离。
并发控制是考虑当有多个并发请求访问相同数据时如何使用锁的问题。当使用锁来实现同步时,所有其他请求都会被阻塞,直到持有锁的请求完成并释放锁。使用版本化值,每个写入请求都会添加一条新记录,因此允许使用非阻塞数据结构来存储值。
事务隔离级别,例如 快照级别隔离 也可以自然实现。当客户端从特定版本开始读取时,即使存在并发写入事务在多个读取请求之间提交不同的值,也保证每次从数据库读取时都获得相同的值,
读取快照:

使用诸如 RocksDB 的存储引擎
使用 RocksDB 或类似的嵌入式存储引擎常用于数据存储的存储后端。例如 Etcd 使用 boltdb,CockroachDB 之前使用 RocksDB,现在使用 RocksDb 的 Golang 版本 pebble.
这些存储引擎提供版本化值存储的实现,它们在内部使用跳表,并依赖于键的排序。 有一种方法可以为自定义键提供自定义比较器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class VersionedKeyComparator {
public class VersionedKeyComparator extends Comparator { public VersionedKeyComparator() { super(new ComparatorOptions()); } @Override public String name() { return "VersionedKeyComparator"; } @Override public int compare(Slice s1, Slice s2) { VersionedKey key1 = VersionedKey.deserialize(ByteBuffer.wrap(s1.data())); VersionedKey key2 = VersionedKey.deserialize(ByteBuffer.wrap(s2.data())); return key1.compareTo(key2); } } }
|
使用 RocksDB 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| class RocksDBMvccStore {
private final RocksDB db; public RocksDBMvccStore(File cacheDir) throws RocksDBException { Options options = new Options(); options.setKeepLogFileNum(30); options.setCreateIfMissing(true); options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1)); options.setComparator(new VersionedKeyComparator()); db = RocksDB.open(options, cacheDir.getPath()); } public void put(String key, int version, String value) throws RocksDBException { VersionedKey versionKey = new VersionedKey(key, version); db.put(versionKey.serialize(), value.getBytes()); } public String get(String key, int readAtVersion) { RocksIterator rocksIterator = db.newIterator(); rocksIterator.seekForPrev(new VersionedKey(key, readAtVersion).serialize()); byte[] valueBytes = rocksIterator.value(); return new String(valueBytes); } }
|
应用实例