Implementing a Distributed System as Seen from the ZooKeeper Source Code (2): Highly Available Data Storage
2016-01-08 11:22
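This installment starts with highly available data storage. ZooKeeper provides a distributed directory service. Compared with a distributed storage system it stores very little data, because its main job is distributed coordination. Small as it is, it still needs every essential part: ZooKeeper must store its data with high availability, backing it up and restoring it so that a server crash does not lose data.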
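A common approach to highly available data storage is a data file plus a log file. In a traditional database, for example, the data files plus the undo/redo logs are enough to back up and recover data, and adding checkpoints to the log makes recovery faster. So for highly available data storage there are three things to examine:
Data files
Log files
Checkpoints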
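To make the data file + log file + checkpoint idea concrete, here is a minimal in-memory sketch. Everything in it (the CheckpointSketch class, LogEntry, the integer ids) is hypothetical and only illustrates the general pattern. It is not how ZooKeeper organizes its code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy store: a live state, a redo log, and one checkpoint.
final class CheckpointSketch {
    static final class LogEntry {
        final long id;
        final String key;
        final String value;

        LogEntry(long id, String key, String value) {
            this.id = id;
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, String> state = new TreeMap<>();
    private final List<LogEntry> log = new ArrayList<>();
    private long lastId = 0;

    // The "data file": a copy of the state plus the id of the last entry it reflects.
    private Map<String, String> snapshotState = new TreeMap<>();
    private long snapshotId = 0;

    void put(String key, String value) {
        log.add(new LogEntry(++lastId, key, value)); // log first
        state.put(key, value);                       // then apply in memory
    }

    void checkpoint() {
        snapshotState = new TreeMap<>(state);
        snapshotId = lastId;
    }

    // Rebuilds the state as a restarted process would:
    // load the snapshot, then replay only the entries newer than the checkpoint.
    Map<String, String> recover() {
        Map<String, String> restored = new TreeMap<>(snapshotState);
        for (LogEntry e : log) {
            if (e.id > snapshotId) {
                restored.put(e.key, e.value);
            }
        }
        return restored;
    }

    Map<String, String> liveState() {
        return state;
    }

    public static void main(String[] args) {
        CheckpointSketch s = new CheckpointSketch();
        s.put("/a", "1");
        s.checkpoint();
        s.put("/a", "2");
        s.put("/b", "3");
        System.out.println(s.recover().equals(s.liveState()));
    }
}
```

The checkpoint id is what keeps recovery fast: only the tail of the log after the checkpoint has to be replayed.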
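Data files
ZooKeeper uses snapshot files as its data files to record and persist its runtime data. The top-level interface is SnapShot, which serializes and deserializes the runtime DataTree and the sessions. DataTree holds the runtime data.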
public interface SnapShot {
    long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
    void serialize(DataTree dt, Map<Long, Integer> sessions, File name) throws IOException;
    File findMostRecentSnapshot() throws IOException;
    void close() throws IOException;
}
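The default implementation of SnapShot is FileSnap, which persists the DataTree and the sessions to a file. Its serialization works as follows:
1. Create a file output stream that maintains a checksum.
2. Objects are serialized with the Jute framework, so create an implementation of Jute's OutputArchive. The OutputArchive interface is listed below. It closely resembles Thrift's TProtocol: OutputArchive provides a write method for each data type, and its counterpart InputArchive provides the matching read methods. Together they are the top-level interfaces of Jute serialization.
3. The default OutputArchive implementation is BinaryOutputArchive, which is much like Thrift's TBinaryProtocol. It implements a binary serialization protocol and internally uses a DataOutput (a DataOutputStream) to write each data type to the underlying stream as bytes.
4. The snapshot file starts with a FileHeader object holding a magic number (ZKSN), a version number and a dbId. FileHeader implements Jute's Record interface and provides serialize and deserialize methods.
5. The body of the snapshot file is written by the helper class SerializeUtils. It serializes the sessions first: a count, then for each session a long session id followed by an int timeout. It then serializes the DataTree, which has its own serialize method in the style of Jute's Record interface.
6. DataTree.serialize writes the ACL information first and then the DataNodes, visiting every node of the tree recursively in pre-order (each node before its children). Finally it writes "/" to mark the end of the stream.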
// FileSnap
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
        throws IOException {
    if (!close) {
        OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
        CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());
        //CheckedOutputStream cout = new CheckedOutputStream()
        OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
        FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
        serialize(dt,sessions,oa, header);
        long val = crcOut.getChecksum().getValue();
        oa.writeLong(val, "val");
        oa.writeString("/", "path");
        sessOS.flush();
        crcOut.close();
        sessOS.close();
    }
}

protected void serialize(DataTree dt,Map<Long, Integer> sessions,
        OutputArchive oa, FileHeader header) throws IOException {
    // this is really a programmatic error and not something that can
    // happen at runtime
    if(header==null)
        throw new IllegalStateException(
                "Snapshot's not open for writing: uninitialized header");
    header.serialize(oa, "fileheader");
    SerializeUtils.serializeSnapshot(dt,oa,sessions);
}

The interfaces and classes involved:

public class FileHeader implements Record {
    private int magic;
    private int version;
    private long dbid;
    public FileHeader() {
    }
    public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(this,tag);
        a_.writeInt(magic,"magic");
        a_.writeInt(version,"version");
        a_.writeLong(dbid,"dbid");
        a_.endRecord(this,tag);
    }
    public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(tag);
        magic=a_.readInt("magic");
        version=a_.readInt("version");
        dbid=a_.readLong("dbid");
        a_.endRecord(tag);
    }
    // ... (rest of the class not shown)
}

public interface Record {
    public void serialize(OutputArchive archive, String tag) throws IOException;
    public void deserialize(InputArchive archive, String tag) throws IOException;
}

public interface OutputArchive {
    public void writeByte(byte b, String tag) throws IOException;
    public void writeBool(boolean b, String tag) throws IOException;
    public void writeInt(int i, String tag) throws IOException;
    public void writeLong(long l, String tag) throws IOException;
    public void writeFloat(float f, String tag) throws IOException;
    public void writeDouble(double d, String tag) throws IOException;
    public void writeString(String s, String tag) throws IOException;
    public void writeBuffer(byte buf[], String tag) throws IOException;
    public void writeRecord(Record r, String tag) throws IOException;
    public void startRecord(Record r, String tag) throws IOException;
    public void endRecord(Record r, String tag) throws IOException;
    public void startVector(List v, String tag) throws IOException;
    public void endVector(List v, String tag) throws IOException;
    public void startMap(TreeMap v, String tag) throws IOException;
    public void endMap(TreeMap v, String tag) throws IOException;
}

public class BinaryOutputArchive implements OutputArchive {
    private ByteBuffer bb = ByteBuffer.allocate(1024);
    private DataOutput out;
    public static BinaryOutputArchive getArchive(OutputStream strm) {
        return new BinaryOutputArchive(new DataOutputStream(strm));
    }
    /** Creates a new instance of BinaryOutputArchive */
    public BinaryOutputArchive(DataOutput out) {
        this.out = out;
    }
    public void writeByte(byte b, String tag) throws IOException {
        out.writeByte(b);
    }
    public void writeBool(boolean b, String tag) throws IOException {
        out.writeBoolean(b);
    }
    public void writeInt(int i, String tag) throws IOException {
        out.writeInt(i);
    }
    public void writeLong(long l, String tag) throws IOException {
        out.writeLong(l);
    }
    public void writeFloat(float f, String tag) throws IOException {
        out.writeFloat(f);
    }
    public void writeDouble(double d, String tag) throws IOException {
        out.writeDouble(d);
    }
    // ... (rest of the class not shown)
}

// SerializeUtils
public static void serializeSnapshot(DataTree dt,OutputArchive oa,
        Map<Long, Integer> sessions) throws IOException {
    HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
    oa.writeInt(sessSnap.size(), "count");
    for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
        oa.writeLong(entry.getKey().longValue(), "id");
        oa.writeInt(entry.getValue().intValue(), "timeout");
    }
    dt.serialize(oa, "tree");
}

// DataTree
public void serialize(OutputArchive oa, String tag) throws IOException {
    scount = 0;
    serializeList(longKeyMap, oa);
    serializeNode(oa, new StringBuilder(""));
    // / marks end of stream
    // we need to check if clear had been called in between the snapshot.
    if (root != null) {
        oa.writeString("/", "path");
    }
}
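The checksum handling in FileSnap.serialize can be tried in isolation with plain JDK streams. The sketch below is a miniature built on assumptions: the ChecksummedPayload class and its single string payload are made up for illustration, and it uses DataOutputStream directly instead of Jute. It shows the ordering that matters: the checksum is read before the trailer is written (and, on the read side, before the trailer is consumed), so the stored checksum covers the payload only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

// Hypothetical helper, not a ZooKeeper class.
final class ChecksummedPayload {
    // Writes the payload, then the Adler-32 of everything written before the trailer.
    static byte[] write(String payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            CheckedOutputStream checked = new CheckedOutputStream(bytes, new Adler32());
            DataOutputStream out = new DataOutputStream(checked);
            out.writeUTF(payload);
            long sum = checked.getChecksum().getValue(); // taken before the trailer
            out.writeLong(sum);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the payload back and rejects the data if the checksum does not match.
    static String read(byte[] file) {
        try {
            CheckedInputStream checked =
                    new CheckedInputStream(new ByteArrayInputStream(file), new Adler32());
            DataInputStream in = new DataInputStream(checked);
            String payload = in.readUTF();
            long computed = checked.getChecksum().getValue(); // taken before the trailer
            long stored = in.readLong();
            if (computed != stored) {
                throw new IOException("checksum mismatch");
            }
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write("hello")));
    }
}
```

Wrapping the stream in a checked stream keeps the serialization code unaware of the checksum, which is why the article's serialize method only touches crcOut at the very end.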
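Deserialization turns a snapshot file back into a DataTree and mirrors the serialization steps. Note that it looks for the newest snapshot file that is still usable:
1. findNValidSnapshots returns up to 100 valid snapshot files sorted by zxid in descending order, so the newest snapshot is tried first.
2. If the newest snapshot loads successfully, return. Otherwise try the next newest, and so on. If none of the candidates loads, an IOException is thrown.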
// FileSnap
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException {
    // we run through 100 snapshots (not all of them)
    // if we cannot get it running within 100 snapshots
    // we should give up
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    boolean foundValid = false;
    for (int i = 0; i < snapList.size(); i++) {
        snap = snapList.get(i);
        InputStream snapIS = null;
        CheckedInputStream crcIn = null;
        try {
            LOG.info("Reading snapshot " + snap);
            snapIS = new BufferedInputStream(new FileInputStream(snap));
            crcIn = new CheckedInputStream(snapIS, new Adler32());
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            deserialize(dt,sessions, ia);
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot : " + snap);
            }
            foundValid = true;
            break;
        } catch(IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        } finally {
            if (snapIS != null)
                snapIS.close();
            if (crcIn != null)
                crcIn.close();
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
    return dt.lastProcessedZxid;
}

private List<File> findNValidSnapshots(int n) throws IOException {
    List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false);
    int count = 0;
    List<File> list = new ArrayList<File>();
    for (File f : files) {
        // we should catch the exceptions
        // from the valid snapshot and continue
        // until we find a valid one
        try {
            if (Util.isValidSnapshot(f)) {
                list.add(f);
                count++;
                if (count == n) {
                    break;
                }
            }
        } catch (IOException e) {
            LOG.info("invalid snapshot " + f, e);
        }
    }
    return list;
}
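The "newest first" ordering depends on reading the zxid out of the file name. The sketch below shows one way to do that, assuming snapshot files follow the same prefix.<hex zxid> naming that the article's append code uses for log files ("log." + Long.toHexString(zxid)). SnapshotNames and both of its methods are hypothetical stand-ins written for this example; they are not the Util methods called above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper, not a ZooKeeper class.
final class SnapshotNames {
    // Parses the hexadecimal zxid after "prefix."; returns -1 if the name does not match.
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length != 2 || !parts[0].equals(prefix)) {
            return -1L;
        }
        try {
            return Long.parseLong(parts[1], 16);
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    // Keeps only well-formed snapshot names and returns at most 'limit' of them, newest first.
    static List<String> newestFirst(List<String> names, int limit) {
        List<String> matching = new ArrayList<>();
        for (String n : names) {
            if (zxidFromName(n, "snapshot") >= 0) {
                matching.add(n);
            }
        }
        matching.sort(
                Comparator.comparingLong((String n) -> zxidFromName(n, "snapshot")).reversed());
        return new ArrayList<>(matching.subList(0, Math.min(limit, matching.size())));
    }

    public static void main(String[] args) {
        List<String> names =
                Arrays.asList("snapshot.a", "log.1", "snapshot.100", "snapshot.2");
        System.out.println(newestFirst(names, 2));
    }
}
```

Sorting on the parsed number rather than on the file name matters because hexadecimal suffixes of different lengths do not sort correctly as strings: "snapshot.a" is greater than "snapshot.100" lexically, but 0xa is smaller than 0x100.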
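Log files
ZooKeeper splits a transaction into two classes. TxnHeader is the transaction header and carries the basic information about the transaction. The xxxTxn classes represent concrete transaction types and carry the path and data that the operation acts on.
Start with Txn, the basic transaction class. It has only a type field and a data field, and it implements the Jute Record interface to serialize itself. Each concrete operation (create, delete, set data) has its own class, which also implements Record. A concrete transaction and a Txn can be converted into each other through type and data.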
public class Txn implements Record {
    private int type;
    private byte[] data;
    public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(this,tag);
        a_.writeInt(type,"type");
        a_.writeBuffer(data,"data");
        a_.endRecord(this,tag);
    }
    public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(tag);
        type=a_.readInt("type");
        data=a_.readBuffer("data");
        a_.endRecord(tag);
    }
}

// Only the fields of the concrete transaction classes are shown here
public class CreateTxn implements Record {
    private String path;
    private byte[] data;
    private java.util.List<org.apache.zookeeper.data.ACL> acl;
    private boolean ephemeral;
    private int parentCVersion;
}

public class DeleteTxn implements Record {
    private String path;
}

public class SetDataTxn implements Record {
    private String path;
    private byte[] data;
    private int version;
}
The following example converts Txn objects back into concrete transaction classes:
for (Txn subtxn : txns) {
    ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
    Record record = null;
    switch (subtxn.getType()) {
        case OpCode.create:
            record = new CreateTxn();
            break;
        case OpCode.delete:
            record = new DeleteTxn();
            break;
        case OpCode.setData:
            record = new SetDataTxn();
            break;
        case OpCode.error:
            record = new ErrorTxn();
            post_failed = true;
            break;
        case OpCode.check:
            record = new CheckVersionTxn();
            break;
        default:
            throw new IOException("Invalid type of op: " + subtxn.getType());
    }
    ByteBufferInputStream.byteBuffer2Record(bb, record);
}
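The type + data envelope used by Txn is a general pattern: an integer tag says which concrete record the opaque bytes contain, and a switch on the tag picks the decoder. Below is a self-contained sketch of that pattern using plain JDK streams. EnvelopeSketch, its two type constants and its field encoding are all invented for illustration; they do not correspond to ZooKeeper's OpCode values or to Jute's wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical envelope, not a ZooKeeper class.
final class EnvelopeSketch {
    static final int TYPE_DELETE = 1;   // made-up tag values
    static final int TYPE_SET_DATA = 2;

    static byte[] wrapDelete(String path) {
        try {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            new DataOutputStream(body).writeUTF(path);
            return wrap(TYPE_DELETE, body.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] wrapSetData(String path, String data) {
        try {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(body);
            out.writeUTF(path);
            out.writeUTF(data);
            return wrap(TYPE_SET_DATA, body.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Generic envelope: int type tag, int length, then the opaque record bytes.
    private static byte[] wrap(int type, byte[] body) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(type);
        out.writeInt(body.length);
        out.write(body);
        return bytes.toByteArray();
    }

    // Reads the tag, then decodes the body with the decoder that matches it.
    static String unwrap(byte[] envelope) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(envelope));
            int type = in.readInt();
            byte[] body = new byte[in.readInt()];
            in.readFully(body);
            DataInputStream fields = new DataInputStream(new ByteArrayInputStream(body));
            switch (type) {
                case TYPE_DELETE:
                    return "delete " + fields.readUTF();
                case TYPE_SET_DATA:
                    return "setData " + fields.readUTF() + " = " + fields.readUTF();
                default:
                    throw new IOException("Invalid type of op: " + type);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(unwrap(wrapSetData("/a", "v")));
        System.out.println(unwrap(wrapDelete("/a")));
    }
}
```

The envelope lets a container hold a list of differently typed operations in one homogeneous list, at the cost of a second decoding step.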
TxnHeader holds the basic information about a transaction. Its type field identifies the concrete transaction type.
public class TxnHeader implements Record {
    private long clientId;
    private int cxid;
    private long zxid;
    private long time;
    private int type;
    public TxnHeader() {
    }
    public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(this,tag);
        a_.writeLong(clientId,"clientId");
        a_.writeInt(cxid,"cxid");
        a_.writeLong(zxid,"zxid");
        a_.writeLong(time,"time");
        a_.writeInt(type,"type");
        a_.endRecord(this,tag);
    }
    public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(tag);
        clientId=a_.readLong("clientId");
        cxid=a_.readInt("cxid");
        zxid=a_.readLong("zxid");
        time=a_.readLong("time");
        type=a_.readInt("type");
        a_.endRecord(tag);
    }
    // ... (rest of the class not shown)
}
The following excerpt shows how TxnHeader and the concrete transaction classes work together:
switch (header.getType()) {
    case OpCode.create:
        CreateTxn createTxn = (CreateTxn) txn;
        rc.path = createTxn.getPath();
        createNode(
                createTxn.getPath(),
                createTxn.getData(),
                createTxn.getAcl(),
                createTxn.getEphemeral() ? header.getClientId() : 0,
                createTxn.getParentCVersion(),
                header.getZxid(), header.getTime());
        break;
    case OpCode.delete:
        DeleteTxn deleteTxn = (DeleteTxn) txn;
        rc.path = deleteTxn.getPath();
        deleteNode(deleteTxn.getPath(), header.getZxid());
        break;
    case OpCode.setData:
        SetDataTxn setDataTxn = (SetDataTxn) txn;
        rc.path = setDataTxn.getPath();
        rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(),
                setDataTxn.getVersion(), header.getZxid(), header.getTime());
        break;
    // ... (remaining cases not shown)
}
The interface for ZooKeeper's log files is TxnLog, which defines the operations on the transaction log.
public interface TxnLog {
    void rollLog() throws IOException;
    boolean append(TxnHeader hdr, Record r) throws IOException;
    TxnIterator read(long zxid) throws IOException;
    long getLastLoggedZxid() throws IOException;
    boolean truncate(long zxid) throws IOException;
    long getDbId() throws IOException;
    void commit() throws IOException;
    void close() throws IOException;

    // nested iterator interface
    public interface TxnIterator {
        TxnHeader getHeader();
        Record getTxn();
        boolean next() throws IOException;
        void close() throws IOException;
    }
}
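The default implementation of TxnLog is FileTxnLog. According to its class documentation, a transaction log file has the following format:
FileHeader TxnList ZeroPad
FileHeader has the same three fields as the FileHeader of the data file.
FileHeader: {
magic 4bytes (ZKLG)
version 4bytes
dbid 8bytes
}
TxnList is the list of transactions. Each Txn in it is a single transaction laid out as follows, where Record is the concrete transaction class:
checksum Txnlen TxnHeader Record 0x42
Here is how FileTxnLog writes one transaction log entry:
1. append writes one log entry, which consists of a TxnHeader and a Record. The method is synchronized, so only one thread can write to the log at a time.
2. lastZxidSeen is the newest transaction id this class has handled. If the zxid to be written is less than or equal to lastZxidSeen, a warning is logged.
3. If there is no open output stream, create one. The file is named log.<zxid>, with the zxid written in hexadecimal.
4. A newly created file starts with the FileHeader.
5. padFile(fos) grows the file: when the current position is within 4 KB of the end of the file, the file is extended. currentSize records the current file size.
6. Serialize the TxnHeader and the Record into a byte array.
7. Compute the checksum of that byte array.
8. Write the checksum, then the transaction bytes, and finally 0x42 to mark the end of the record. The entry is now written.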
public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
{
    if (hdr != null) {
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        }
        if (logStream==null) {
            if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: log." +
                        Long.toHexString(hdr.getZxid()));
            }
            logFileWrite = new File(logDir, ("log." +
                    Long.toHexString(hdr.getZxid())));
            fos = new FileOutputStream(logFileWrite);
            logStream=new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            logStream.flush();
            currentSize = fos.getChannel().position();
            streamsToFlush.add(fos);
        }
        padFile(fos);
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        Util.writeTxnBytes(oa, buf);
        return true;
    }
    return false;
}
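The entry layout described above (checksum, length, transaction bytes, 0x42) can be reproduced with JDK streams alone. The sketch below is an illustration under assumptions: LogEntryFraming is a made-up class, the transaction is treated as an opaque byte array, and Adler-32 is assumed as the checksum because the snapshot code in this article uses it. The article does not show what makeChecksumAlgorithm() returns.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Adler32;

// Hypothetical helper, not a ZooKeeper class.
final class LogEntryFraming {
    static final byte END_OF_RECORD = 0x42;

    // Frames one serialized transaction as: checksum (8) | length (4) | bytes | 0x42 (1).
    static byte[] frame(byte[] txn) {
        try {
            Adler32 crc = new Adler32();
            crc.update(txn, 0, txn.length);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(crc.getValue());
            out.writeInt(txn.length);
            out.write(txn);
            out.writeByte(END_OF_RECORD);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reverses frame(), rejecting entries with a bad length, marker or checksum.
    static byte[] unframe(byte[] entry) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(entry));
            long stored = in.readLong();
            int len = in.readInt();
            if (len < 0 || len > entry.length) {
                throw new IOException("bad length " + len);
            }
            byte[] txn = new byte[len];
            in.readFully(txn);
            if (in.readByte() != END_OF_RECORD) {
                throw new IOException("missing end-of-record byte");
            }
            Adler32 crc = new Adler32();
            crc.update(txn, 0, txn.length);
            if (crc.getValue() != stored) {
                throw new IOException("checksum mismatch");
            }
            return txn;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] entry = frame(new byte[] {1, 2, 3});
        System.out.println(entry.length);           // 8 + 4 + 3 + 1
        System.out.println(unframe(entry).length);
    }
}
```

Because the file is padded with zeros in advance, a reader needs some way to tell a real entry from padding or from a half-written entry. The checksum and the end-of-record byte together serve that purpose in this layout.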
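Next, how the newest transaction id in the log files is obtained:
1. Take the zxid from the name of the log file with the largest zxid in the log directory.
2. Use that zxid to get a TxnIterator over the transactions starting from it, and walk the iterator to the end. The last transaction is the newest one.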
public long getLastLoggedZxid() {
    File[] files = getLogFiles(logDir.listFiles(), 0);
    long maxLog=files.length>0?
            Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;
    // if a log file is more recent we must scan it to find
    // the highest zxid
    long zxid = maxLog;
    TxnIterator itr = null;
    try {
        FileTxnLog txn = new FileTxnLog(logDir);
        itr = txn.read(maxLog);
        while (true) {
            if(!itr.next())
                break;
            TxnHeader hdr = itr.getHeader();
            zxid = hdr.getZxid();
        }
    } catch (IOException e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        close(itr);
    }
    return zxid;
}
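FileTxnLog's commit method makes the log durable. flush only hands the buffered content to the operating system's page cache. When forceSync is enabled, commit also calls FileChannel.force to push the data from the page cache to disk, measures how long that takes, and logs a warning if the time exceeds a threshold.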
public synchronized void commit() throws IOException {
    if (logStream != null) {
        logStream.flush();
    }
    for (FileOutputStream log : streamsToFlush) {
        log.flush();
        if (forceSync) {
            long startSyncNS = System.nanoTime();
            log.getChannel().force(false);
            long syncElapsedMS =
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
            if (syncElapsedMS > fsyncWarningThresholdMS) {
                LOG.warn("fsync-ing the write ahead log in "
                        + Thread.currentThread().getName()
                        + " took " + syncElapsedMS
                        + "ms which will adversely effect operation latency. "
                        + "See the ZooKeeper troubleshooting guide");
            }
        }
    }
    while (streamsToFlush.size() > 1) {
        streamsToFlush.removeFirst().close();
    }
}
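The difference between flush and force can be seen in a few lines of JDK code. In this sketch, DurableAppend, appendAndSync and demo are hypothetical names chosen for the example, and the temporary file stands in for a log file. flush empties the Java-side buffer into the operating system, and FileChannel.force(false) then asks the operating system to write the file content (but not necessarily its metadata) to the storage device.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a ZooKeeper class.
final class DurableAppend {
    // Appends one line, forces it to the device, and returns the time spent in force() in ms.
    static long appendAndSync(File file, String line) {
        try (FileOutputStream fos = new FileOutputStream(file, true);
             BufferedOutputStream out = new BufferedOutputStream(fos)) {
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();                      // Java buffer -> operating system
            long start = System.nanoTime();
            fos.getChannel().force(false);    // operating system -> storage device
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Appends two lines to a temporary file and returns what ended up in it.
    static String demo() {
        try {
            File file = File.createTempFile("txnlog-sketch", ".log");
            try {
                appendAndSync(file, "a");
                appendAndSync(file, "b");
                return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
            } finally {
                file.delete();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

The time spent in force is usually the dominant cost of a durable write, which is why the commit method above measures it and warns when it is slow.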
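Checkpoints and data recovery
The checkpoint here is the zxid. From a zxid the matching transaction log file can be found, and its transactions are replayed on top of the newest snapshot to recover the data.
1. SnapShot.deserialize loads the newest usable snapshot file into the DataTree and the session map.
2. The zxid taken from the snapshot's file name is stored in lastProcessedZxid and treated as the highest zxid of the data file so far.
3. Look up transactions in the log starting from that zxid + 1.
4. Each transaction found is replayed with processTransaction.
5. The PlayBackListener interface is a hook that is called for each transaction during replay.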
// FileTxnSnapLog
public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(),
                                hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: " +
                        hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next())
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
public void processTransaction(TxnHeader hdr,DataTree dt,
        Map<Long, Integer> sessions, Record txn)
        throws KeeperException.NoNodeException {
    ProcessTxnResult rc;
    switch (hdr.getType()) {
        case OpCode.createSession:
            sessions.put(hdr.getClientId(),
                    ((CreateSessionTxn) txn).getTimeOut());
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                        "playLog --- create session in log: 0x"
                                + Long.toHexString(hdr.getClientId())
                                + " with timeout: "
                                + ((CreateSessionTxn) txn).getTimeOut());
            }
            // give dataTree a chance to sync its lastProcessedZxid
            rc = dt.processTxn(hdr, txn);
            break;
        case OpCode.closeSession:
            sessions.remove(hdr.getClientId());
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                        "playLog --- close session in log: 0x"
                                + Long.toHexString(hdr.getClientId()));
            }
            rc = dt.processTxn(hdr, txn);
            break;
        default:
            rc = dt.processTxn(hdr, txn);
    }
    /**
     * Snapshots are lazily created. So when a snapshot is in progress,
     * there is a chance for later transactions to make into the
     * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS
     * errors could occur. It should be safe to ignore these.
     */
    if (rc.err != Code.OK.intValue()) {
        LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType()
                + ", error: " + rc.err + ", path: " + rc.path);
    }
}
public interface PlayBackListener {
    void onTxnLoaded(TxnHeader hdr, Record rec);
}
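The comment in processTransaction explains why replay ignores some errors: a snapshot is written while the server keeps processing transactions, so it may already contain the effects of transactions newer than the zxid it is labelled with. The sketch below models that situation with a set of paths standing in for the tree. ReplaySketch, its Txn class and its Listener interface are simplified stand-ins invented for this example; they are not the ZooKeeper types of similar names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of replay on top of a fuzzy snapshot, not a ZooKeeper class.
final class ReplaySketch {
    static final class Txn {
        final long zxid;
        final boolean create; // true = create the path, false = delete it
        final String path;

        Txn(long zxid, boolean create, String path) {
            this.zxid = zxid;
            this.create = create;
            this.path = path;
        }
    }

    // Hook that is called once for every replayed transaction.
    interface Listener {
        void onTxnLoaded(Txn txn);
    }

    // Replays every transaction newer than the snapshot and returns the highest zxid seen.
    static long replay(Set<String> tree, long snapshotZxid, List<Txn> log, Listener listener) {
        long highest = snapshotZxid;
        for (Txn t : log) {
            if (t.zxid <= snapshotZxid) {
                continue; // already covered by the snapshot
            }
            // add/remove return false when the change was already present. That is this
            // sketch's counterpart of NODEEXISTS / NONODE, and it is deliberately ignored.
            if (t.create) {
                tree.add(t.path);
            } else {
                tree.remove(t.path);
            }
            highest = Math.max(highest, t.zxid);
            listener.onTxnLoaded(t);
        }
        return highest;
    }

    public static void main(String[] args) {
        // Snapshot labelled zxid 1 that already contains /b, which txn 2 created.
        Set<String> tree = new TreeSet<>(Arrays.asList("/a", "/b"));
        List<Txn> log = Arrays.asList(
                new Txn(1, true, "/a"), new Txn(2, true, "/b"), new Txn(3, false, "/a"));
        List<Long> seen = new ArrayList<>();
        long highest = replay(tree, 1, log, t -> seen.add(t.zxid));
        System.out.println(highest + " " + tree + " " + seen);
    }
}
```

Ignoring the duplicate create is safe in this model because replaying the full tail of the log in order always converges to the same final state, whether or not the snapshot had already absorbed some of it.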