您的位置:首页 > 其它

学习开源推荐引擎Mahout中的刷新数据的设计

2012-03-18 15:40 351 查看
我们的系统中的数据可以以这种方式将数据大致分为两类(个人观点,欢迎指点),一类是变化的数据,一类的非变化的数据;变化的数据:每次被访问时,需要重新读取(暂时撇开缓存不考虑),他们大多以数据库或文件的方式存放着;另一部分是非变化数据:像是系统中的配置参数或地址之类,基本不会变化,一般在系统启动时加载到内存。

正如紧跟党的领导,就是紧跟变化走。我们的分类也是,你会发现当系统变大时,有那么一部分数据,它是变化的数据,依赖数据源;但它又不是直接被读取后就呈现给用户,而是经过计算后呈现给用户;这个计算由于某种原因,不能每次都计算,所以运算后的结果放在内存中不回写到数据源。于是当数据源积累了一定变化时,我们需要刷新运算结果,以保证数据显示的实时性。因此我们会用到这个主题——刷新系统中的内存数据。

简单的刷新任务,中间结果往往只有一个,计算也不会分成;我们大多数据时候会根据一定的业务,或通过定时任务的方式,或通过触发的方式,启动一个或几个线程,在后台对更新过的数据源重新进行运算,最后将上层的引用指向新的对象。于是悄悄的,在不影响访问的情况下完成了数据的刷新。

而较为复杂的刷新任务中,中间结果有多个,彼此存在着依赖关系,更新时有先后顺序,计算也因此分层。于是各种需要我们考虑的问题继踵而至:如何保证更新的顺序,如何避免重复更新导致的死循环,如何保证中间结果的可扩展;在开源项目Apache Mahout 有一些优秀的实现,这里与大家分享,也算是一个笔记,希望自己能在以后的程序中有所应用。

下面是我根据Apache Mahout的源码,自己绘制的UML图,图中只包含了和数据刷新有关的接口:



图中可以了解到,Refreshable是整个项目的核心接口,Mahout中绝大部分和数据沾边的类,都实现了这个接口,可见刷新对推荐引擎来说是多么的重要。

而图中的Client类是刷新触发者的一个总称,英文不太好,所以胡乱取了个名字。触发类往往都包含了一个RefreshHelper对象,他们实现的refresh( ) 主要也是调用RefreshHelper对象的refresh() 方法。而RefreshHelper 算是整个刷新流程的一个总指挥者,我的UML图也画的比较详细,后面会着重说。

首先,我们来看看Refreshable接口,我觉得设计者是在充分考虑了项目要做什么的情况下,设计的次接口,而不是为设计而设计,这是值得我学习。

void refresh(Collection<Refreshable> alreadyRefreshed);


接口没有返回,但它的参数很关键(Collection<Refreshabl>),一次复杂的刷新,可能会涉及到多个不同数据源的刷新,它们之间会有依赖,基本上都是调用此接口。隐藏着一个严重的的问题是—死锁,若有两个数据源的刷新相互依赖了,你等待我先刷新,我也等待你先刷新;而这个参数就可以解决这个死锁的问题:在刷新前建立一个空Collection<Refreshabl>,每次刷新接口被调用时,都将此参数传递给它,而在refresh()方法内部需要调用其他刷新接口时,都先判断下,被调用的刷新接口是否在Collection<Refreshabl>中,如果不在,则将其添加到Collection<Refreshabl>中,紧接着执行,如果在其中则不执行也不添加;如此一个刷新流程下来是不会产生死锁,感叹设计的巧妙~!下面是 RefreshHelper中实现上述思路的方法maybeRefresh, 一个负责调用其他需依赖刷新的公共方法:

public static void maybeRefresh(Collection<Refreshable> alreadyRefreshed, Refreshable refreshable) {
if (!alreadyRefreshed.contains(refreshable)) {
alreadyRefreshed.add(refreshable);
log.info("Added refreshable: {}", refreshable);
refreshable.refresh(alreadyRefreshed);
log.info("Refreshed: {}", alreadyRefreshed);
}
}


上面已经提到过,RefreshHelper是一个刷新流程的总指挥,在UML图中我们可以看见,它包含一个List<Refreshable>对象的引用dependencies;还有一个addDependency()和 removeDependency() 方法很明显就是用来添加移除依赖刷新的,由此可以在开发时较方便的扩展一个刷新流程;就是一个List.add(T)方法的调用,就不贴码了。

接下来我们看看RefreshHelper 的其他两个方法(buildRefreshed,refresh),最主要的是它的Refreshable接口的实现 refresh() ,能更详细的诠释整个刷新流程的设计思路

/**
* Typically this is called in  and is the entire body of
* that method.
*/
@Override
public void refresh(Collection<Refreshable> alreadyRefreshed) {
if (refreshLock.tryLock()) {
try {
alreadyRefreshed = buildRefreshed(alreadyRefreshed);
for (Refreshable dependency : dependencies) {
maybeRefresh(alreadyRefreshed, dependency);
}
if (refreshRunnable != null) {
try {
refreshRunnable.call();
} catch (Exception e) {
log.warn("Unexpected exception while refreshing", e);
}
}
} finally {
refreshLock.unlock();
}
}
}

/**
* Creates a new and empty {@link Collection} if the method parameter is {@code null}.
*
* @param currentAlreadyRefreshed
*          {@link Refreshable}s to refresh later on
* @return an empty {@link Collection} if the method param was {@code null} or the unmodified method
*         param.
*/
public static Collection<Refreshable> buildRefreshed(Collection<Refreshable> currentAlreadyRefreshed) {
return currentAlreadyRefreshed == null ? new HashSet<Refreshable>(3) : currentAlreadyRefreshed;
}


在refresh()方法中,第一步:加锁,以保证数据的一致性;

第二步:调用buildRefresh方法,判断传递过来的Collection<Refreshabl>是否为null,若是null建一个新的,避免空指针异常。

第三步:根据所依赖的刷新接口dependencies,循环调用maybeRefresh方法,保证此刷新所依赖的所有数据都是最新的。而maybeRefresh方法的实现前面已经介绍过了,能排重,避免死锁。

最后:对于方法自己需要刷新的数据,通过调用一个Callable 来完成;这样也是RefreshHelper中最后那个属性Callable<?> refreshRunnable;存在的意义,Callable一个类似线程Runnerable的一个接口,老鸟应该都知道。在RefreshHelper的构造函数中,要求传递,表明他是不能少的。

在Mahout的大部分调用RefreshHelper对象的实现中,都会通过匿名内部类的方式,建RefreshHelper,以实现call中的具体刷新逻辑,我们也可以参考如下实现:

this.refreshHelper = new RefreshHelper(new Callable<Object>() {
@Override
public Object call() throws TasteException {
//此处可添加刷新任务的具体逻辑
………………………………
return null;
}
});
//添加依赖刷新的接口
refreshHelper.addDependency(XXX);


以上就是Mahout中刷新流程的大体逻辑,归根到底,最好的设计永远要根据业务来;要学的还太多,哈哈

原创博客,转载请注明 /article/3478142.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐