
Solr DataImport Source Code Analysis (Part 5)

2012-09-09 19:48
Let's start with the initialization method of EntityProcessorWrapper, which simply delegates to the wrapped processor:

```java
@Override
public void init(Context context) {
    delegate.init(context);
}
```
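The init method above is plain delegation: EntityProcessorWrapper forwards the call to the processor it wraps. As a minimal sketch of that pattern, assuming simplified stand-in types (Ctx, Processor, RecordingProcessor and ProcessorWrapper are hypothetical names, not Solr classes), the wrapper extends the same base type and passes init() straight through:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the DIH Context: just enough to identify a data source.
interface Ctx {
    String dataSourceName();
}

// Hypothetical stand-in for the EntityProcessor base type.
abstract class Processor {
    abstract void init(Ctx context);
}

// The "real" processor records each init call so the delegation is observable.
class RecordingProcessor extends Processor {
    final List<String> calls = new ArrayList<>();

    @Override
    void init(Ctx context) {
        calls.add("init:" + context.dataSourceName());
    }
}

// Wrapper in the style of EntityProcessorWrapper: same base type, and
// init() is forwarded to the delegate unchanged.
class ProcessorWrapper extends Processor {
    private final Processor delegate;

    ProcessorWrapper(Processor delegate) {
        this.delegate = delegate;
    }

    @Override
    void init(Ctx context) {
        delegate.init(context);
    }
}
```

Because the wrapper has the same type as the object it wraps, a caller can hold only the wrapper and still reach the configured processor, while the wrapper remains free to add its own behaviour around each forwarded call.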

The context parameter is the context object through which the processor obtains its data source. Context is an abstract class:

```java
public abstract class Context {

    public abstract DataSource getDataSource();

    public abstract DataSource getDataSource(String name);

    public abstract EntityProcessor getEntityProcessor();

    public abstract boolean isRootEntity();

    // other members (e.g. FULL_DUMP, DELTA_DUMP) omitted
}
```

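The concrete subclass is ContextImpl. Note how its getDataSource() method obtains the data source: it asks the DataImporter object for one. (DataImporter was introduced earlier, in Solr DataImport Source Code Analysis (Part 2); its getDataSourceInstance method instantiates a new JdbcDataSource() object.)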

ContextImpl.java

```java
private DataSource ds;

private DataImporter dataImporter;

@Override
public DataSource getDataSource() {
    // an explicitly set data source takes precedence
    if (ds != null) return ds;
    if (entity == null) return null;
    // lazily create the entity's data source through DataImporter
    if (entity.dataSrc == null) {
        entity.dataSrc = dataImporter.getDataSourceInstance(entity, entity.dataSource, this);
    }
    // in verbose debug mode during a full dump, wrap it for debug logging
    if (entity.dataSrc != null && docBuilder != null && docBuilder.verboseDebug &&
            Context.FULL_DUMP.equals(currentProcess())) {
        //debug is not yet implemented properly for deltas
        entity.dataSrc = docBuilder.getDebugLogger().wrapDs(entity.dataSrc);
    }
    return entity.dataSrc;
}
```
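ContextImpl.getDataSource() is a lazy, cached lookup: an explicitly set ds wins; otherwise the data source is created once per entity and stored on entity.dataSrc. The sketch below models only that caching logic with simplified, hypothetical types (Source, EntityConfig, SimpleContext). A java.util.function.Function stands in for DataImporter.getDataSourceInstance, and the verbose-debug wrapping is left out.

```java
import java.util.function.Function;

// Hypothetical stand-in for a DIH DataSource.
interface Source {
    String describe();
}

// Hypothetical stand-in for DataConfig.Entity: holds the configured
// data source name and caches the created instance in dataSrc.
class EntityConfig {
    final String name;
    final String dataSourceName;
    Source dataSrc;

    EntityConfig(String name, String dataSourceName) {
        this.name = name;
        this.dataSourceName = dataSourceName;
    }
}

class SimpleContext {
    private final EntityConfig entity;
    private final Source ds;                        // explicitly supplied source, may be null
    private final Function<String, Source> factory; // plays the role of getDataSourceInstance
    int factoryCalls = 0;                           // lets a caller observe how often the factory ran

    SimpleContext(EntityConfig entity, Source ds, Function<String, Source> factory) {
        this.entity = entity;
        this.ds = ds;
        this.factory = factory;
    }

    Source getDataSource() {
        if (ds != null) return ds;       // an explicitly supplied source wins
        if (entity == null) return null; // no entity, nothing to look up
        if (entity.dataSrc == null) {    // create lazily, once per entity
            factoryCalls++;
            entity.dataSrc = factory.apply(entity.dataSourceName);
        }
        return entity.dataSrc;
    }
}
```

Caching on the entity rather than on the context matters because DocBuilder creates a new ContextImpl on every buildDocument call. Storing the instance on the entity lets those short-lived contexts share one data source.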

Next, let's see how DocBuilder creates the EntityProcessorWrapper:

DocBuilder.java

```java
private EntityProcessorWrapper getEntityProcessor(DataConfig.Entity entity) {
    // reuse the wrapper already cached on the entity
    if (entity.processor != null)
        return entity.processor;
    EntityProcessor entityProcessor = null;
    if (entity.proc == null) {
        // no processor configured: default to SqlEntityProcessor
        entityProcessor = new SqlEntityProcessor();
    } else {
        try {
            // instantiate the configured EntityProcessor class by name
            entityProcessor = (EntityProcessor) loadClass(entity.proc, dataImporter.getCore())
                    .newInstance();
        } catch (Exception e) {
            wrapAndThrow(SEVERE, e,
                    "Unable to load EntityProcessor implementation for entity:" + entity.name);
        }
    }
    return entity.processor = new EntityProcessorWrapper(entityProcessor, this);
}
```
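getEntityProcessor follows a common pattern: return the cached wrapper if there is one, fall back to a default class when nothing is configured, and otherwise instantiate the configured class by name through reflection. Below is a minimal, hypothetical sketch of that pattern. Plain Class.forName with getDeclaredConstructor().newInstance() stands in for DocBuilder's loadClass helper (which, in the original, is also given the core), an IllegalStateException stands in for wrapAndThrow, and all type names are illustrative.

```java
// Hypothetical base type standing in for EntityProcessor.
abstract class BaseProcessor {
}

// Default used when an entity names no processor class (like SqlEntityProcessor).
class DefaultSqlProcessor extends BaseProcessor {
}

// A "custom" processor that will be loaded by name.
class CustomProcessor extends BaseProcessor {
}

// Stand-in for EntityProcessorWrapper.
class Wrapper {
    final BaseProcessor delegate;

    Wrapper(BaseProcessor delegate) {
        this.delegate = delegate;
    }
}

// Stand-in for DataConfig.Entity: proc is the configured class name (or null),
// processor caches the wrapper once it has been built.
class EntityDef {
    final String name;
    final String proc;
    Wrapper processor;

    EntityDef(String name, String proc) {
        this.name = name;
        this.proc = proc;
    }
}

final class ProcessorFactory {
    private ProcessorFactory() {
    }

    static Wrapper getEntityProcessor(EntityDef entity) {
        if (entity.processor != null) return entity.processor; // reuse cached wrapper
        BaseProcessor p;
        if (entity.proc == null) {
            p = new DefaultSqlProcessor(); // nothing configured: use the default
        } else {
            try {
                // load and instantiate the configured class by name
                p = (BaseProcessor) Class.forName(entity.proc)
                        .getDeclaredConstructor()
                        .newInstance();
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalStateException(
                        "Unable to load EntityProcessor implementation for entity:" + entity.name, e);
            }
        }
        return entity.processor = new Wrapper(p);
    }
}
```

getDeclaredConstructor().newInstance() is used instead of Class.newInstance() because the latter is deprecated in newer JDKs; the caching behaviour is the same either way.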

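What is returned here is the wrapper (proxy) class EntityProcessorWrapper. Unless the entity specifies its own processor class, the wrapped object is a SqlEntityProcessor.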

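In DocBuilder's buildDocument method, a context object (a ContextImpl, through which the data source is later obtained) is constructed and passed to the processor's init method: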

```java
private void buildDocument(VariableResolverImpl vr, DocWrapper doc,
                           Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot,
                           ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
    EntityProcessorWrapper entityProcessor = getEntityProcessor(entity);
    // no primary key means a full dump; otherwise this is a delta dump
    ContextImpl ctx = new ContextImpl(entity, vr, null,
            pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
            session, parentCtx, this);
    entityProcessor.init(ctx);
    // remaining code omitted
}
```
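The key decision in buildDocument is the process type stored in the new context: a null primary-key map means a full dump, a non-null one means a delta dump, and the context is then handed to the processor's init. A hypothetical sketch of that step follows. RunContext, ProcessKind, InitTarget and DocBuildSketch are made-up names, and the constant values only mirror the names Context.FULL_DUMP and Context.DELTA_DUMP.

```java
import java.util.Map;

// Hypothetical constants mirroring Context.FULL_DUMP / Context.DELTA_DUMP;
// the string values are illustrative.
final class ProcessKind {
    static final String FULL_DUMP = "FULL_DUMP";
    static final String DELTA_DUMP = "DELTA_DUMP";

    private ProcessKind() {
    }
}

// Minimal stand-in for ContextImpl: entity name, process type, parent context.
class RunContext {
    final String entityName;
    final String process;
    final RunContext parent;

    RunContext(String entityName, String process, RunContext parent) {
        this.entityName = entityName;
        this.process = process;
        this.parent = parent;
    }
}

// Stand-in for the processor's init(Context) method.
interface InitTarget {
    void init(RunContext ctx);
}

final class DocBuildSketch {
    private DocBuildSketch() {
    }

    // Build a fresh context for this entity and initialise the processor with it.
    static RunContext buildStep(String entityName, Map<String, Object> pk,
                                RunContext parent, InitTarget processor) {
        String process = (pk == null) ? ProcessKind.FULL_DUMP : ProcessKind.DELTA_DUMP;
        RunContext ctx = new RunContext(entityName, process, parent);
        processor.init(ctx);
        return ctx;
    }
}
```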

In its init method, SqlEntityProcessor initializes its data source by calling getDataSource() on the ContextImpl:

SqlEntityProcessor.java

```java
protected DataSource<Iterator<Map<String, Object>>> dataSource;

@Override
@SuppressWarnings("unchecked")
public void init(Context context) {
    super.init(context);
    dataSource = context.getDataSource();
}
```
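SqlEntityProcessor.init grabs the data source from the context and keeps it in a field typed DataSource<Iterator<Map<String, Object>>>, that is, a source whose getData(query) yields an iterator of rows, each row a column-name-to-value map. Because Context.getDataSource() returns a raw DataSource, the assignment is unchecked, which is why the method carries @SuppressWarnings("unchecked"). The sketch below reproduces that shape with hypothetical stand-ins (QuerySource, SourceContext, SqlLikeProcessor); the real DataSource has more methods than the single one modelled here.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for DataSource<T>: run a query, get back a T.
interface QuerySource<T> {
    T getData(String query);
}

// Hypothetical stand-in for Context: hands out a source of unknown type.
interface SourceContext {
    QuerySource<?> getDataSource();
}

class SqlLikeProcessor {
    // Each row is a map from column name to value, as in SqlEntityProcessor.
    protected QuerySource<Iterator<Map<String, Object>>> dataSource;

    @SuppressWarnings("unchecked") // the context does not know the row type
    void init(SourceContext context) {
        dataSource = (QuerySource<Iterator<Map<String, Object>>>) context.getDataSource();
    }
}
```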

The query method is shown below: given a SQL statement, it initializes the row iterator.

```java
protected void initQuery(String q) {
    try {
        DataImporter.QUERY_COUNT.get().incrementAndGet();
        rowIterator = dataSource.getData(q);
        this.query = q;
    } catch (DataImportHandlerException e) {
        throw e;
    } catch (Exception e) {
        LOG.error("The query failed '" + q + "'", e);
        throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
    }
}
```
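initQuery does three things: it bumps a per-thread query counter, asks the data source for a row iterator, and normalises failures so that a DataImportHandlerException passes through untouched while any other exception is logged and wrapped. A minimal sketch of the same control flow, assuming hypothetical names (QueryRunner, ImportException) and a Function in place of the data source. The counter is modelled as a ThreadLocal<AtomicLong>, which is an assumption about how DataImporter.QUERY_COUNT is declared, and logging is omitted.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical stand-in for DataImportHandlerException.
class ImportException extends RuntimeException {
    ImportException(String message, Throwable cause) {
        super(message, cause);
    }
}

class QueryRunner {
    // Per-thread query counter, in the spirit of DataImporter.QUERY_COUNT.
    static final ThreadLocal<AtomicLong> QUERY_COUNT = ThreadLocal.withInitial(AtomicLong::new);

    private final Function<String, Iterator<Map<String, Object>>> dataSource;
    Iterator<Map<String, Object>> rowIterator;
    String query;

    QueryRunner(Function<String, Iterator<Map<String, Object>>> dataSource) {
        this.dataSource = dataSource;
    }

    void initQuery(String q) {
        try {
            QUERY_COUNT.get().incrementAndGet(); // counted even if the query then fails
            rowIterator = dataSource.apply(q);
            this.query = q;                      // only recorded on success
        } catch (ImportException e) {
            throw e;                             // already the domain exception: rethrow as-is
        } catch (Exception e) {
            // anything else is wrapped, keeping the failing query in the message
            throw new ImportException("The query failed '" + q + "'", e);
        }
    }
}
```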

SqlEntityProcessor's full-import and delta-import methods obtain their row iterators by calling initQuery(String q).
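One way to picture this: both entry points start their query lazily on the first call and then stream rows from the resulting iterator. The sketch below is hypothetical. The method names nextRow and nextModifiedRowKey are borrowed from DIH's EntityProcessor, and query/deltaQuery echo the entity attributes of the same names, but LazySqlProcessor, the executed log and the reset-on-exhaustion rule are simplified assumptions, not a copy of SqlEntityProcessor.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class LazySqlProcessor {
    private final Function<String, Iterator<Map<String, Object>>> dataSource;
    private final String query;      // full-import query
    private final String deltaQuery; // delta-import query
    private Iterator<Map<String, Object>> rowIterator;
    final List<String> executed = new ArrayList<>(); // every query actually run

    LazySqlProcessor(Function<String, Iterator<Map<String, Object>>> dataSource,
                     String query, String deltaQuery) {
        this.dataSource = dataSource;
        this.query = query;
        this.deltaQuery = deltaQuery;
    }

    // Single shared entry point for running a query, like initQuery(String q).
    private void initQuery(String q) {
        executed.add(q);
        rowIterator = dataSource.apply(q);
    }

    // Return the next row, or null (and reset) once the iterator is exhausted.
    private Map<String, Object> getNext() {
        if (rowIterator == null || !rowIterator.hasNext()) {
            rowIterator = null;
            return null;
        }
        return rowIterator.next();
    }

    // Full import: run the main query on the first call, then stream rows.
    Map<String, Object> nextRow() {
        if (rowIterator == null) initQuery(query);
        return getNext();
    }

    // Delta import: same mechanism, driven by the delta query.
    Map<String, Object> nextModifiedRowKey() {
        if (rowIterator == null) initQuery(deltaQuery);
        return getNext();
    }
}
```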

Finally, DocBuilder calls these full-import and delta-import methods of SqlEntityProcessor indirectly, through the EntityProcessorWrapper, and uses the returned data to build SolrInputDocument objects.
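The overall pull loop can be sketched as follows, assuming hypothetical simplified types: a wrapper forwards nextRow() to the real processor, a null row means there is no more data, and a LinkedHashMap stands in for SolrInputDocument. This models only the flow described above; DocBuilder's actual code also deals with nested entities and field mapping, among other things.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for EntityProcessor's row-pulling method.
abstract class RowProcessor {
    abstract Map<String, Object> nextRow(); // null means "no more rows"
}

// The "real" processor: streams rows from an iterator.
class IteratorProcessor extends RowProcessor {
    private final Iterator<Map<String, Object>> rows;

    IteratorProcessor(Iterator<Map<String, Object>> rows) {
        this.rows = rows;
    }

    @Override
    Map<String, Object> nextRow() {
        return rows.hasNext() ? rows.next() : null;
    }
}

// Stand-in for EntityProcessorWrapper: forwards nextRow() to the delegate.
class RowProcessorWrapper extends RowProcessor {
    private final RowProcessor delegate;

    RowProcessorWrapper(RowProcessor delegate) {
        this.delegate = delegate;
    }

    @Override
    Map<String, Object> nextRow() {
        return delegate.nextRow();
    }
}

final class MiniDocBuilder {
    private MiniDocBuilder() {
    }

    // Pull rows until null and turn each one into a "document".
    static List<Map<String, Object>> buildAll(RowProcessor processor) {
        List<Map<String, Object>> docs = new ArrayList<>();
        Map<String, Object> row;
        while ((row = processor.nextRow()) != null) {
            docs.add(new LinkedHashMap<>(row)); // copy the row's fields into a new document
        }
        return docs;
    }
}
```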