您的位置:首页 > 理论基础 > 计算机网络

大量小文件的实时同步方案

2010-11-02 14:20 369 查看
http://jusescn.javaeye.com/blog/205013

 

http://blog.daviesliu.net/2008/04/24/sync/

传统的文件同步方案有rsync(单向) 和 unison(双向)等,它们需要扫描所有文件后进行比对,差量传输。如果文件数量达到了百万甚至千万量级,扫描所有文件将非常耗时。而且正在发生变化的往往是其中很少的一部分,这是非常低效的方式。

之前看了Amazon的Dynamo的设计文档,它们每个节点的数据是通过Hash Tree来实现同步,既有通过日志来同步的软实时特点(msyql, bdb等),也可以保证最终数据的一致性(rsync, unison等)。Hash Tree的大体思路是将所有数据存储成树状结构,每个节点的Hash是其所有子节点的Hash的Hash,叶子节点的Hash是其内容的Hash。这样一旦某个节点发生变化,其Hash的变化会迅速传播到根节点。需要同步的系统只需要不断查询跟节点的hash,一旦有变化,顺着树状结构就能够在logN级别的时间找到发生变化的内容,马上同步。

文件系统天然的是树状结构,尽管不是平衡的数。如果文件的修改时间是可靠的,可以表征文件的变化,那就可以用它作为文件的Hash值。另一方面,文件的修改通常是按顺序执行的,后修改的文件比早修改的文件具有更大的修改时间,这样就可以把一个目录内的最大修改时间作为它的修改时间,以实现Hash Tree。这样,一旦某个文件被修改,修改时间的信息就会迅速传播到根目录。

一般的文件系统都不是这样做的,目录的修改时间表示的是目录结构最后发生变化的时间,不包括子目录,否则会不堪重负。因为我们需要自己实现这个功能,利用Linux 2.6内核的新特性inotify获得某个目录内文件发生变化的信息,并把其修改时间传播到它的上级目录(以及再上级目录)。Python 有 pyinotify,watch.py的代码如下:

 

view plaincopy to clipboardprint?

#!/usr/bin/python   

  

from pyinotify import *   

import os, os.path   

  

flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW   

dirs = {}   

base = '/log/lighttpd/cache/images/icon/u241'   

base = 'tmp'   

  

class UpdateParentDir(ProcessEvent):   

    def process_IN_CLOSE_WRITE(self, event):   

        print 'modify', event.pathname   

        mtime = os.path.getmtime(event.pathname)   

        p = event.path   

        while p.startswith(base):   

            m = os.path.getmtime(p)   

            if m < mtime:   

                print 'update', p   

                os.utime(p, (mtime,mtime))   

            elif m > mtime:   

                mtime = m   

            p = os.path.dirname(p)   

       

    process_IN_MODIFY = process_IN_CLOSE_WRITE   

  

    def process_IN_Q_OVERFLOW(self, event):   

        print 'over flow'   

        max_queued_events.value *= 2   

  

    def process_default(self, event):   

        pass  

  

wm = WatchManager()   

notifier = Notifier(wm, UpdateParentDir())   

dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True))   

  

notifier.loop()   

 

Python代码 

#!/usr/bin/python  

  

from pyinotify import *  

import os, os.path  

  

flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW  

dirs = {}  

base = '/log/lighttpd/cache/images/icon/u241'  

base = 'tmp'  

  

class UpdateParentDir(ProcessEvent):  

    def process_IN_CLOSE_WRITE(self, event):  

        print 'modify', event.pathname  

        mtime = os.path.getmtime(event.pathname)  

        p = event.path  

        while p.startswith(base):  

            m = os.path.getmtime(p)  

            if m < mtime:  

                print 'update', p  

                os.utime(p, (mtime,mtime))  

            elif m > mtime:  

                mtime = m  

            p = os.path.dirname(p)  

      

    process_IN_MODIFY = process_IN_CLOSE_WRITE  

  

    def process_IN_Q_OVERFLOW(self, event):  

        print 'over flow'  

        max_queued_events.value *= 2  

  

    def process_default(self, event):  

        pass  

  

wm = WatchManager()  

notifier = Notifier(wm, UpdateParentDir())  

dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True))  

  

notifier.loop()  

 

 

在已经有Hash Tree的时候,同步就比较简单了,不停地获取根目录的修改时间并顺着目录结构往下找即可。需要注意的是,在更新完文件后,需要设置修改时间为原文件的修改时间,目录也是,保证Hash Tree的一致性,否则没法同步。mirror.py的代码如下

view plaincopy to clipboardprint?

#!/usr/bin/python   

  

import sys,time,re,urllib  

import os,os.path   

from os.path import exists, isdir, getmtime   

  

src = sys.argv[1]   

dst = sys.argv[2]   

  

def local_mirror(src, dst):   

    if exists(dst) and mtime == getmtime(dst):   

        return  

    if not isdir(src):   

        print 'update:', dst   

        open(dst,'wb').write(open(src).read())   

    else:   

        if not exists(dst):   

            os.makedirs(dst)   

        for filename in os.listdir(src):   

            local_mirror(os.path.join(src,filename), os.path.join(dst,filename))   

    os.utime(dst, (mtime,mtime))   

  

def get_info(path):   

    f = urllib.urlopen(path)   

    mtime = f.headers.get('Last-Modified')   

    if mtime:   

        mtime = time.mktime(time.strptime(mtime, '%a, %d %b %Y %H:%M:%S %Z'))   

    content = f.read()   

    f.close()   

    return int(mtime), content   

  

p = re.compile(r'([/d.]+?) +([/w/]+)')   

  

def remote_mirror(src, dst):   

    mtime, content = get_info(src)   

    if exists(dst) and mtime == int(getmtime(dst)):   

        return  

    print 'update:', dst, src   

    if not src.endswith('/'):   

        open(dst,'wb').write(content)   

    else:   

        if not exists(dst):   

            os.makedirs(dst)   

        for mt,filename in p.findall(content):   

            mt = int(float(mt))   

            lpath = dst+filename   

            if not exists(lpath) or int(getmtime(lpath)) != mt:   

                remote_mirror(src+filename, lpath)   

    os.utime(dst, (mtime,mtime))   

  

if src.startswith('http://'):   

    mirror = remote_mirror   

else:   

    mirror = local_mirror   

  

while True:   

    mirror(src, dst)   

    time.sleep(1)   

 

Python代码 

#!/usr/bin/python  

  

import sys,time,re,urllib  

import os,os.path  

from os.path import exists, isdir, getmtime  

  

src = sys.argv[1]  

dst = sys.argv[2]  

  

def local_mirror(src, dst):  

    if exists(dst) and mtime == getmtime(dst):  

        return  

    if not isdir(src):  

        print 'update:', dst  

        open(dst,'wb').write(open(src).read())  

    else:  

        if not exists(dst):  

            os.makedirs(dst)  

        for filename in os.listdir(src):  

            local_mirror(os.path.join(src,filename), os.path.join(dst,filename))  

    os.utime(dst, (mtime,mtime))  

  

def get_info(path):  

    f = urllib.urlopen(path)  

    mtime = f.headers.get('Last-Modified')  

    if mtime:  

        mtime = time.mktime(time.strptime(mtime, '%a, %d %b %Y %H:%M:%S %Z'))  

    content = f.read()  

    f.close()  

    return int(mtime), content  

  

p = re.compile(r'([/d.]+?) +([/w/]+)')  

  

def remote_mirror(src, dst):  

    mtime, content = get_info(src)  

    if exists(dst) and mtime == int(getmtime(dst)):  

        return  

    print 'update:', dst, src  

    if not src.endswith('/'):  

        open(dst,'wb').write(content)  

    else:  

        if not exists(dst):  

            os.makedirs(dst)  

        for mt,filename in p.findall(content):  

            mt = int(float(mt))  

            lpath = dst+filename  

            if not exists(lpath) or int(getmtime(lpath)) != mt:  

                remote_mirror(src+filename, lpath)  

    os.utime(dst, (mtime,mtime))  

  

if src.startswith('http://'):  

    mirror = remote_mirror  

else:  

    mirror = local_mirror  

  

while True:  

    mirror(src, dst)  

    time.sleep(1)  

 

 

如果源文件不在同一台机器上,可以通过NFS等共享过来。或者可以通过支持列目录的HTTP服务器来访问远程目录,mirror.py 已经支持这种访问方式。server.py 是用webpy做的一个简单的只是列目录的文件服务器。由于瓶颈在IO上,它的性能不是关键。server.py的代码如下:

view plaincopy to clipboardprint?

#!/usr/bin/python   

  

import os,os.path   

import web   

import time  

  

root = 'tmp'   

  

HTTP_HEADER_TIME = '%a, %d %b %Y %H:%M:%S %Z'   

  

class FileServer:   

    def GET(self, path):   

        path = root + path   

        if not os.path.exists(path):   

            return 404   

        mtime = time.localtime(os.path.getmtime(path))   

        web.header('Last-Modified', time.strftime(HTTP_HEADER_TIME, mtime))   

        if os.path.isdir(path):   

            for file in os.listdir(path):   

                if file.startswith('.'): continue  

                p = os.path.join(path,file)   

                m = os.path.getmtime(p)   

                if os.path.isdir(p):   

                    file += '/'   

                print m, file  

        else:   

            print open(path,'rb').read()   

  

urls = (   

   "(/.*)", "FileServer",   

)   

  

if __name__ == '__main__':   

    web.run(urls, globals())   

 

Python代码 

#!/usr/bin/python  

  

import os,os.path  

import web  

import time  

  

root = 'tmp'  

  

HTTP_HEADER_TIME = '%a, %d %b %Y %H:%M:%S %Z'  

  

class FileServer:  

    def GET(self, path):  

        path = root + path  

        if not os.path.exists(path):  

            return 404  

        mtime = time.localtime(os.path.getmtime(path))  

        web.header('Last-Modified', time.strftime(HTTP_HEADER_TIME, mtime))  

        if os.path.isdir(path):  

            for file in os.listdir(path):  

                if file.startswith('.'): continue  

                p = os.path.join(path,file)  

                m = os.path.getmtime(p)  

                if os.path.isdir(p):  

                    file += '/'  

                print m, file  

        else:  

            print open(path,'rb').read()  

  

urls = (  

   "(/.*)", "FileServer",  

)  

  

if __name__ == '__main__':  

    web.run(urls, globals())  

 

 

为了获得更好性能,以达到更好的实时性,Hash Tree最好是平衡的,比如BTree。如果一个文件发生变化,同步它需要进行的IO操作为N*M,其中N为数的层数,M为每层的文件数目。现在我们N为2,M最大为10000,适当减少它可以获得更好的性能,比如N为4,M为100。在以后创建目录结构时,最好能够考虑这方面的因素。

之前hongqn推荐过一个利用inotify的文件同步方案,同步方式类似于mysql和bdb等,由于过于复杂导致不可靠而没有采用。上面这个方案只用了一百多行Python代码就基本解决问题了,是不是很帅?:-)

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息