您的位置:首页 > 运维架构 > 网站架构

从ZooKeeper源代码看如何实现分布式系统(二)数据的高可用存储

2016-01-08 11:22 603 查看
    这篇先从数据的高可用存储说起。ZooKeeper提供了分布式的目录服务,它存储的数据相比一个分布式存储系统来说很少,它主要是用来做分布式协同操作的。但是麻雀虽小,五脏俱全,ZooKeeper也必须要提供数据的高可用存储,对数据进行备份和恢复,以防出现服务器宕机导致数据丢失的情况。

    高可用的数据存储有一个比较通用的解决方案,就是数据文件 + 日志文件的方式。比如传统数据库中的数据文件 + undo/redo日志就可以来进行数据备份和恢复,在日志文件中加入检查点checkpoint,可以更加快速地进行数据的恢复。所以对于高可用的数据存储来说,我们要考察3个方面:

数据文件
日志文件
检查点
数据文件

ZooKeeper的数据文件采用快照文件的方式来记录和持久化运行时的数据。顶层接口是SnapShot,提供了对运行时的数据DataTree和session的序列化和反序列化操作。DataTree保存了运行时的数据。

public interface SnapShot {

long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException;

void serialize(DataTree dt, Map<Long, Integer> sessions,
File name)
throws IOException;

File findMostRecentSnapshot() throws IOException;

void close() throws IOException;
}


SnapShot的默认实现类是FileSnapShot,提供了把DataTree和Session持久化到文件的能力。来看一下它的序列化实现

1. 创建一个具备校验和的文件输出流

2. 对象的序列化采用apache jute框架,创建一个jute的OutputArchive的实现。下面给出了OutputArchive接口的定义,可以看到它和Thrift的TProtocol的定义基本一致,提供了一系列的write类型和read类型接口,是jute 序列化的顶层接口

3. OutputArchive的默认实现是BinaryOutputArchive,和Thrift的TBinaryProtocol实现基本一致,提供了二进制的序列化协议,内部采用DataOutputStream,把不同的数据类型写到Byte数组中

4. 快照文件的文件头对象FileHeader,包含一个魔数ZKSN, 版本号和dbId。 FileHeader实现了jute的Record接口,提供了serialize和deserialize方法实现

5. 快照文件体使用SerializeUtils这个辅助类来实现,先序列化Session,序列化Session时,先写一个Long类型的SessionId,再写一个int类型的timeout。再序列化DataTree,它也实现了Jute的Record类,实现了序列化自己的serialize方法

6. DataTree的serialize方法,先序列化ACL信息,再序列化DataTree中的DataNode,采用中序遍历的方式递归遍历DataTree的所有节点。最后写入"/"表示文件结尾

// FileSnapshot
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
throws IOException {
if (!close) {
OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());
//CheckedOutputStream cout = new CheckedOutputStream()
OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
serialize(dt,sessions,oa, header);
long val = crcOut.getChecksum().getValue();
oa.writeLong(val, "val");
oa.writeString("/", "path");
sessOS.flush();
crcOut.close();
sessOS.close();
}
}

 protected void serialize(DataTree dt,Map<Long, Integer> sessions,
            OutputArchive oa, FileHeader header) throws IOException {
        // this is really a programmatic error and not something that can
        // happen at runtime
        if(header==null)
            throw new IllegalStateException(
                    "Snapshot's not open for writing: uninitialized header");
        header.serialize(oa, "fileheader");
        SerializeUtils.serializeSnapshot(dt,oa,sessions);
    }

public class FileHeader implements Record {
  private int magic;
  private int version;
  private long dbid;
  public FileHeader() {
  }
 public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
    a_.writeInt(magic,"magic");
    a_.writeInt(version,"version");
    a_.writeLong(dbid,"dbid");
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    magic=a_.readInt("magic");
    version=a_.readInt("version");
    dbid=a_.readLong("dbid");
    a_.endRecord(tag);
}

public interface Record {
    public void serialize(OutputArchive archive, String tag)
        throws IOException;
    public void deserialize(InputArchive archive, String tag)
        throws IOException;
}
public interface OutputArchive {
    public void writeByte(byte b, String tag) throws IOException;
    public void writeBool(boolean b, String tag) throws IOException;
    public void writeInt(int i, String tag) throws IOException;
    public void writeLong(long l, String tag) throws IOException;
    public void writeFloat(float f, String tag) throws IOException;
    public void writeDouble(double d, String tag) throws IOException;
    public void writeString(String s, String tag) throws IOException;
    public void writeBuffer(byte buf[], String tag)
        throws IOException;
    public void writeRecord(Record r, String tag) throws IOException;
    public void startRecord(Record r, String tag) throws IOException;
    public void endRecord(Record r, String tag) throws IOException;
    public void startVector(List v, String tag) throws IOException;
    public void endVector(List v, String tag) throws IOException;
    public void startMap(TreeMap v, String tag) throws IOException;
    public void endMap(TreeMap v, String tag) throws IOException;

}

public class BinaryOutputArchive implements OutputArchive {
    private ByteBuffer bb = ByteBuffer.allocate(1024);

    private DataOutput out;
    
    public static BinaryOutputArchive getArchive(OutputStream strm) {
        return new BinaryOutputArchive(new DataOutputStream(strm));
    }
    
    /** Creates a new instance of BinaryOutputArchive */
    public BinaryOutputArchive(DataOutput out) {
        this.out = out;
    }
    
    public void writeByte(byte b, String tag) throws IOException {
        out.writeByte(b);
    }
    
    public void writeBool(boolean b, String tag) throws IOException {
        out.writeBoolean(b);
    }
    
    public void writeInt(int i, String tag) throws IOException {
        out.writeInt(i);
    }
    
    public void writeLong(long l, String tag) throws IOException {
        out.writeLong(l);
    }
    
    public void writeFloat(float f, String tag) throws IOException {
        out.writeFloat(f);
    }
    
    public void writeDouble(double d, String tag) throws IOException {
        out.writeDouble(d);
    }

// SerializeUtils
 public static void serializeSnapshot(DataTree dt,OutputArchive oa,
            Map<Long, Integer> sessions) throws IOException {
        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
        oa.writeInt(sessSnap.size(), "count");
        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
            oa.writeLong(entry.getKey().longValue(), "id");
            oa.writeInt(entry.getValue().intValue(), "timeout");
        }
        dt.serialize(oa, "tree");
    }

// DataTree
 public void serialize(OutputArchive oa, String tag) throws IOException {
        scount = 0;
        serializeList(longKeyMap, oa);
        serializeNode(oa, new StringBuilder(""));
        // / marks end of stream
        // we need to check if clear had been called in between the snapshot.
        if (root != null) {
            oa.writeString("/", "path");
        }
    }
 


涉及到的几个接口和类



反序列化即把快照文件反序列化成DataTree的过程和序列化的过程正好相反,值得注意的是,反序列化时,找的是最新的,可用的snapshot文件

1. findNValidSnapshots找100个以内的快照文件,并且按照zxid从大到小排列,保证最新的快照最先被处理

2. 如果最新的快照被成功处理,就返回,否则找第二新的快照,直到结束

// FileSnap
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// we run through 100 snapshots (not all of them)
// if we cannot get it running within 100 snapshots
// we should give up
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
for (int i = 0; i < snapList.size(); i++) {
snap = snapList.get(i);
InputStream snapIS = null;
CheckedInputStream crcIn = null;
try {
LOG.info("Reading snapshot " + snap);
snapIS = new BufferedInputStream(new FileInputStream(snap));
crcIn = new CheckedInputStream(snapIS, new Adler32());
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
deserialize(dt,sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch(IOException e) {
LOG.warn("problem reading snap file " + snap, e);
} finally {
if (snapIS != null)
snapIS.close();
if (crcIn != null)
crcIn.close();
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
return dt.lastProcessedZxid;
}

private List<File> findNValidSnapshots(int n) throws IOException {
        List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false);
        int count = 0;
        List<File> list = new ArrayList<File>();
        for (File f : files) {
            // we should catch the exceptions
            // from the valid snapshot and continue
            // until we find a valid one
            try {
                if (Util.isValidSnapshot(f)) {
                    list.add(f);
                    count++;
                    if (count == n) {
                        break;
                    }
                }
            } catch (IOException e) {
                LOG.info("invalid snapshot " + f, e);
            }
        }
        return list;
    }




日志文件

ZooKeeper将事务类分为两部分,TxnHeader表示事务头,包含了事务的基本信息。xxxTxn类表示具体类型的事务,包含了事务对应的操作路径和数据。

先来看看ZooKeeper中的事务基础类定义Txn,它只包含了type属性和data属性,实现了Jute Record接口,处理自身的序列化操作。具体的增删改操作事务各自定义了单独的类,都实现了Record接口。单独的事务类和Txn类可以根据type和data互相转化。

public class Txn implements Record {
private int type;
private byte[] data;
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
    a_.writeInt(type,"type");
    a_.writeBuffer(data,"data");
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    type=a_.readInt("type");
    data=a_.readBuffer("data");
    a_.endRecord(tag);
}

public class CreateTxn implements Record {
  private String path;
  private byte[] data;
  private java.util.List<org.apache.zookeeper.data.ACL> acl;
  private boolean ephemeral;
  private int parentCVersion;
}

public class DeleteTxn implements Record {
  private String path;
}
}

public class SetDataTxn implements Record {
  private String path;
  private byte[] data;
  private int version;
}


单独事务类和Txn类互相转换的示例如下
for (Txn subtxn : txns) {
ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
Record record = null;
switch (subtxn.getType()) {
case OpCode.create:
record = new CreateTxn();
break;
case OpCode.delete:
record = new DeleteTxn();
break;
case OpCode.setData:
record = new SetDataTxn();
break;
case OpCode.error:
record = new ErrorTxn();
post_failed = true;
break;
case OpCode.check:
record = new CheckVersionTxn();
break;
default:
throw new IOException("Invalid type of op: " + subtxn.getType());
}

ByteBufferInputStream.byteBuffer2Record(bb, record);
}


TxnHeader类定义了事务的基本信息,通过type可以确定具体的事务类型

public class TxnHeader implements Record {
  private long clientId;
  private int cxid;
  private long zxid;
  private long time;
  private int type;
  public TxnHeader() {
  }
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
    a_.writeLong(clientId,"clientId");
    a_.writeInt(cxid,"cxid");
    a_.writeLong(zxid,"zxid");
    a_.writeLong(time,"time");
    a_.writeInt(type,"type");
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    clientId=a_.readLong("clientId");
    cxid=a_.readInt("cxid");
    zxid=a_.readLong("zxid");
    time=a_.readLong("time");
    type=a_.readInt("type");
    a_.endRecord(tag);
}
TxnHeader和具体事务类的交互例子如下:

switch (header.getType()) {
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(
createTxn.getPath(),
createTxn.getData(),
createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(), header.getTime());
break;
case OpCode.delete:
DeleteTxn deleteTxn = (DeleteTxn) txn;
rc.path = deleteTxn.getPath();
deleteNode(deleteTxn.getPath(), header.getZxid());
break;
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn
.getData(), setDataTxn.getVersion(), header
.getZxid(), header.getTime());
break;

ZooKeeper的日志文件接口是TxnLog接口,它提供了对事务日志的操作。

public interface TxnLog {

void rollLog() throws IOException;

boolean append(TxnHeader hdr, Record r) throws IOException;

TxnIterator read(long zxid) throws IOException;

long getLastLoggedZxid() throws IOException;

boolean truncate(long zxid) throws IOException;

long getDbId() throws IOException;

void commit() throws IOException;

void close() throws IOException;
}

public interface TxnIterator {
        TxnHeader getHeader();
        
        Record getTxn();
  
        boolean next() throws IOException;
 
        void close() throws IOException;
    }

}


TxnLog的默认实现类是FileTxnLog,从它的描述可以看到事务日志文件的格式如下:
FileHeader TxnList ZeroPad

其中FileHeader和数据文件中的FileHeader一样,三要素。
FileHeader: {
magic 4bytes (ZKLG)
version 4bytes
dbid 8bytes
}

TxnList是事务列表,Txn表示单个事务,格式如下,Record表示具体的事务类
checksum Txnlen TxnHeader Record 0x42

来看一下FileTxnLog如何写入一条事务日志
1. append操作负责写入一条事务日志,一条事务日志包含了TxnHeader和Record两部分,这个方法是同步方法,一次只能有一个线程写日志。

2. lastZxidSeen表示当前这个类处理过的最新的事务id,如果要写入的事务id比lastZxidSeen小,记录warn信息

3. 如果文件输出流为空,就新建一个文件输出流,文件名是log.zxid

4. 先写FileHeader文件头

5. padFile(fos)扩大文件大小,在当前位置距离文件尾部还有4KB的时候会扩大文件。 currentSize记录了当前的文件大小

6. 把TxnHeader和Txn序列化到byte数组

7. 计算checksum

8. 先写checksum,再写事务的byte数组,最后写入0x42表示end of record, 写入一条事务结束

public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr != null) {
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
}
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: log." +
Long.toHexString(hdr.getZxid()));
}

logFileWrite = new File(logDir, ("log." +
Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
logStream.flush();
currentSize = fos.getChannel().position();
streamsToFlush.add(fos);
}
padFile(fos);
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);

return true;
}
return false;
}

看看如何获取日志文件中最新的事务id
1. 先从日志目录下,zxid最大的日志文件名获取zxid

2. 然后根据这个zxid获得从这个事务id开始的事务链TxnIterator。遍历这个事务链表,最后的事务就是最新的事务

public long getLastLoggedZxid() {
File[] files = getLogFiles(logDir.listFiles(), 0);
long maxLog=files.length>0?
Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;

// if a log file is more recent we must scan it to find
// the highest zxid
long zxid = maxLog;
TxnIterator itr = null;
try {
FileTxnLog txn = new FileTxnLog(logDir);
itr = txn.read(maxLog);
while (true) {
if(!itr.next())
break;
TxnHeader hdr = itr.getHeader();
zxid = hdr.getZxid();
}
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
} finally {
close(itr);
}
return zxid;
}

FileTxnLog的commit方法保证日志文件的持久化,flush方法是将文件文件内容写到页缓存。如果强制更新到硬盘,调用FileChannel的force方法强制从页缓存刷新到硬盘,并且记录写硬盘的时间,如果超过阀值就记录warn信息
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {
long startSyncNS = System.nanoTime();

log.getChannel().force(false);

long syncElapsedMS =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
检查点和数据恢复
这里的检查点就是zxid,可以根据zxid找到对应的事务日志文件,然后在最新的快照文件上进行回放进行数据的恢复。

1. Snapshot.deserialize方法会把最新的快照文件反序列化到DataTree对象和Session中去

2. 快照文件中的最大的zxid作为数据文件目前最大的zxid

3. 用这个最大的zxid + 1去事务日志文件中找事务日志

4. 如果找到了正确的事务日志,使用processTransaction方法进行事务日志的回放

5. PlayBackListener接口提供了在回放时的钩子

// FileTxnSnapshot
public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
try {
while (true) {
// iterator points to
// the first valid txn when initialized
hdr = itr.getHeader();
if (hdr == null) {
//empty logs
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error("{}(higestZxid) > {}(next log) for type {}",
new Object[] { highestZxid, hdr.getZxid(),
hdr.getType() });
} else {
highestZxid = hdr.getZxid();
}
try {
processTransaction(hdr,dt,sessions, itr.getTxn());
} catch(KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: " +
hdr.getType() + " error: " + e.getMessage(), e);
}
listener.onTxnLoaded(hdr, itr.getTxn());
if (!itr.next())
break;
}
} finally {
if (itr != null) {
itr.close();
}
}
return highestZxid;
}

 public void processTransaction(TxnHeader hdr,DataTree dt,
            Map<Long, Integer> sessions, Record txn)
        throws KeeperException.NoNodeException {
        ProcessTxnResult rc;
        switch (hdr.getType()) {
        case OpCode.createSession:
            sessions.put(hdr.getClientId(),
                    ((CreateSessionTxn) txn).getTimeOut());
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                        "playLog --- create session in log: 0x"
                                + Long.toHexString(hdr.getClientId())
                                + " with timeout: "
                                + ((CreateSessionTxn) txn).getTimeOut());
            }
            // give dataTree a chance to sync its lastProcessedZxid
            rc = dt.processTxn(hdr, txn);
            break;
        case OpCode.closeSession:
            sessions.remove(hdr.getClientId());
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                        "playLog --- close session in log: 0x"
                                + Long.toHexString(hdr.getClientId()));
            }
            rc = dt.processTxn(hdr, txn);
            break;
        default:
            rc = dt.processTxn(hdr, txn);
        }

        /**
         * Snapshots are lazily created. So when a snapshot is in progress,
         * there is a chance for later transactions to make into the
         * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS
         * errors could occur. It should be safe to ignore these.
         */
        if (rc.err != Code.OK.intValue()) {
            LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType()
                    + ", error: " + rc.err + ", path: " + rc.path);
        }
    }

 public interface PlayBackListener {
        void onTxnLoaded(TxnHeader hdr, Record rec);
    }
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zookeeper