[Debezium Source Analysis] MySqlConnectorTask: Startup and Data Polling
2018-03-13 21:41
The poll method of MySqlConnectorTask fetches change data, which is then written into Kafka.
The start method

Let's begin with the MySqlConnectorTask.start method. Here is part of its code:
```java
...
this.taskContext.start();

boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
...
```
As you can see, start() calls taskContext.start(), which in turn calls MySqlSchema.start(); MySqlSchema.start() calls DatabaseHistory.start(). The default DatabaseHistory implementation is KafkaDatabaseHistory, whose start() method initializes a KafkaProducer instance.
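The delegation chain just described can be sketched with a few plain interfaces. These types are simplified stand-ins for illustration, not Debezium's real classes; the sketch only records that the history component is initialized where KafkaDatabaseHistory would create its KafkaProducer:

```java
// Minimal sketch of the start() delegation chain described above.
// All class and interface names here are illustrative, not Debezium's.
interface History { void start(); }

class KafkaHistory implements History {
    boolean producerCreated = false;
    @Override public void start() {
        // KafkaDatabaseHistory creates a KafkaProducer at this point;
        // here we only record that initialization happened.
        producerCreated = true;
    }
}

class Schema {
    final History history;
    Schema(History history) { this.history = history; }
    void start() { history.start(); }  // MySqlSchema.start() -> DatabaseHistory.start()
}

class TaskContext {
    final Schema schema;
    TaskContext(Schema schema) { this.schema = schema; }
    void start() { schema.start(); }   // taskContext.start() -> MySqlSchema.start()
}

public class StartChainSketch {
    public static void main(String[] args) {
        KafkaHistory history = new KafkaHistory();
        TaskContext ctx = new TaskContext(new Schema(history));
        ctx.start();  // mirrors MySqlConnectorTask.start() kicking off the chain
        System.out.println(history.producerCreated); // true
    }
}
```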
Continuing with MySqlConnectorTask.start:
```java
// Get the offsets for our partition ...
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition());
if (offsets != null) {
    // Set the position in our source info ...
    source.setOffset(offsets);
    logger.info("Found existing offset: {}", offsets);

    // Before anything else, recover the database history to the specified binlog coordinates ...
    taskContext.loadHistory(source);

    if (source.isSnapshotInEffect()) {
        // The last offset was an incomplete snapshot that we cannot recover from...
        if (taskContext.isSnapshotNeverAllowed()) {
            // No snapshots are allowed
            String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
                    + "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
            throw new ConnectException(msg);
        }
        // Otherwise, restart a new snapshot ...
        startWithSnapshot = true;
        logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
    } else {
        // No snapshot was in effect, so we should just start reading from the binlog ...
        startWithSnapshot = false;

        // But check to see if the server still has those binlog coordinates ...
        if (!isBinlogAvailable()) {
            if (!taskContext.isSnapshotAllowedWhenNeeded()) {
                String msg = "The connector is trying to read binlog starting at " + source + ", but this is no longer "
                        + "available on the server. Reconfigure the connector to use a snapshot when needed.";
                throw new ConnectException(msg);
            }
            startWithSnapshot = true;
        }
    }
} else {
    // We have no recorded offsets ...
    if (taskContext.isSnapshotNeverAllowed()) {
        // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
        // full history of the database.
        logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
        source.setBinlogStartPoint("", 0L); // start from the beginning of the binlog
        taskContext.initializeHistory();

        // Look to see what the first available binlog file is called, and whether it looks like binlog files have
        // been purged. If so, then output a warning ...
        String earliestBinlogFilename = earliestBinlogFilename();
        if (earliestBinlogFilename == null) {
            logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
        } else if (!earliestBinlogFilename.endsWith("00001")) {
            logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
        }
    } else {
        // We are allowed to use snapshots, and that is the best way to start ...
        startWithSnapshot = true; // The snapshot will determine if GTIDs are set
        logger.info("Found no existing offset, so preparing to perform a snapshot");
        // The snapshot will also initialize history ...
    }
}
```
First look at the offsets != null branch. If an incomplete snapshot is in effect from a prior run, startWithSnapshot is set to true, unless snapshots are never allowed, in which case an exception is thrown. Otherwise startWithSnapshot stays false; but if the recorded binlog position is no longer available on the server and snapshot-when-needed is not allowed, an exception is thrown, while if snapshot-when-needed is allowed, startWithSnapshot is set to true.
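The branching above can be distilled into a pure decision function. This is a sketch for illustration only; the method name and boolean parameters are invented here and are not part of Debezium's API:

```java
// Hedged sketch of the snapshot decision described above; names are illustrative.
public class SnapshotDecision {

    /** Returns whether the task should start with a snapshot, mirroring the branches above. */
    static boolean startWithSnapshot(boolean hasOffsets,
                                     boolean snapshotInEffect,
                                     boolean snapshotNeverAllowed,
                                     boolean binlogAvailable,
                                     boolean snapshotAllowedWhenNeeded) {
        if (hasOffsets) {
            if (snapshotInEffect) {
                // Prior run died mid-snapshot: must redo it, unless snapshots are forbidden.
                if (snapshotNeverAllowed)
                    throw new IllegalStateException("incomplete snapshot but snapshots disallowed");
                return true;
            }
            if (!binlogAvailable) {
                // The recorded position has been purged from the server.
                if (!snapshotAllowedWhenNeeded)
                    throw new IllegalStateException("binlog position purged and snapshot not allowed");
                return true;
            }
            return false; // resume from the recorded binlog position
        }
        // No recorded offsets: snapshot unless snapshots are forbidden,
        // in which case the task starts from the very beginning of the binlog.
        return !snapshotNeverAllowed;
    }

    public static void main(String[] args) {
        System.out.println(startWithSnapshot(true, false, false, true, true));  // resume -> false
        System.out.println(startWithSnapshot(true, true, false, true, true));   // redo snapshot -> true
        System.out.println(startWithSnapshot(false, false, false, true, true)); // fresh start -> true
    }
}
```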
Now look at the offsets == null branch. A null offset means no Debezium subscription task with this name has run before. If the user's configuration disallows snapshots, consumption starts from binlog position 0, and earliestBinlogFilename() is called to get the name of the earliest available binlog file:
```java
protected String earliestBinlogFilename() {
    // Accumulate the available binlog filenames ...
    List<String> logNames = new ArrayList<>();
    try {
        logger.info("Checking all known binlogs from MySQL");
        taskContext.jdbc().query("SHOW BINARY LOGS", rs -> {
            while (rs.next()) {
                logNames.add(rs.getString(1));
            }
        });
    } catch (SQLException e) {
        throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e);
    }
    if (logNames.isEmpty()) return null;
    return logNames.get(0);
}
```
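The caller then applies a simple purge heuristic to the returned name: MySQL numbers binlog files sequentially starting at 000001, so if the earliest available file does not end in 00001, older files were likely purged. A self-contained illustration (the filenames here are hypothetical SHOW BINARY LOGS results):

```java
import java.util.Arrays;
import java.util.List;

public class BinlogPurgeCheck {
    /** Mirrors the endsWith("00001") heuristic above: the first binlog file
     *  a MySQL server ever writes is numbered 000001. */
    static boolean looksPurged(List<String> logNames) {
        if (logNames.isEmpty()) return false; // no binlog at all triggers a different warning
        return !logNames.get(0).endsWith("00001");
    }

    public static void main(String[] args) {
        System.out.println(looksPurged(Arrays.asList("mysql-bin.000001", "mysql-bin.000002"))); // false
        System.out.println(looksPurged(Arrays.asList("mysql-bin.000017", "mysql-bin.000018"))); // true
    }
}
```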
If the user does allow snapshots, startWithSnapshot is set to true.
Here is the remainder of MySqlConnectorTask.start:
```java
// Check whether the row-level binlog is enabled ...
final boolean rowBinlogEnabled = isRowBinlogEnabled();

// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
readers = new ChainedReader();
readers.uponCompletion(this::completeReaders);
BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
    // We're supposed to start with a snapshot, so set that up ...
    SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
    snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking());
    if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();
    readers.add(snapshotReader);
    if (taskContext.isInitialSnapshotOnly()) {
        logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
        readers.add(new BlockingReader("blocker"));
        readers.uponCompletion("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
    } else {
        if (!rowBinlogEnabled) {
            throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is "
                    + "required for this connector to work properly. Change the MySQL configuration to use a "
                    + "row-level binlog and restart the connector.");
        }
        readers.add(binlogReader);
    }
} else {
    if (!rowBinlogEnabled) {
        throw new ConnectException(
                "The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
    }
    // We're going to start by reading the binlog ...
    readers.add(binlogReader);
}

// And finally initialize and start the chain of readers ...
this.readers.initialize();
this.readers.start();
```
First, the roles of the two readers: 1) BinlogReader streams the latest binlog events; 2) SnapshotReader reads a full snapshot of the existing data.

If startWithSnapshot is true and the mode is not initial_only, a SnapshotReader followed by a BinlogReader is added to readers. If startWithSnapshot is true and the mode is initial_only, a SnapshotReader and a BlockingReader are added; the BlockingReader blocks the task from proceeding further (because the user configured initial_only mode). If startWithSnapshot is false, only a BinlogReader is added.
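The chaining behavior can be sketched with a minimal stand-in for ChainedReader. These types are simplified stand-ins for illustration, not Debezium's real classes; the key idea is that the chain polls the current reader until it signals completion with null, then falls through to the next one, which is how the snapshot phase hands over to binlog streaming:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class ReaderChainSketch {
    /** Simplified reader: returns batches until exhausted, then null. */
    interface Reader { List<String> poll(); }

    /** Serves fixed records one batch at a time, then signals completion with null. */
    static Reader fixed(String... records) {
        Queue<String> q = new LinkedList<>(List.of(records));
        return () -> q.isEmpty() ? null : List.of(q.poll());
    }

    /** Polls readers in order, moving on when the current one completes. */
    static class Chained implements Reader {
        private final Queue<Reader> chain = new LinkedList<>();
        void add(Reader r) { chain.add(r); }
        @Override public List<String> poll() {
            while (!chain.isEmpty()) {
                List<String> batch = chain.peek().poll();
                if (batch != null) return batch;
                chain.poll(); // current reader is done; fall through to the next one
            }
            return null; // whole chain done
        }
    }

    public static void main(String[] args) {
        Chained readers = new Chained();
        readers.add(fixed("snapshot-row-1", "snapshot-row-2")); // stand-in for SnapshotReader
        readers.add(fixed("binlog-event-1"));                   // stand-in for BinlogReader

        List<String> all = new ArrayList<>();
        for (List<String> batch = readers.poll(); batch != null; batch = readers.poll()) {
            all.addAll(batch);
        }
        System.out.println(all); // [snapshot-row-1, snapshot-row-2, binlog-event-1]
    }
}
```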
The poll method

Here is the MySqlConnectorTask.poll code:
```java
@Override
public List<SourceRecord> poll() throws InterruptedException {
    Reader currentReader = readers;
    if (currentReader == null) {
        return null;
    }
    PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
    try {
        logger.trace("Polling for events");
        return currentReader.poll();
    } finally {
        prevLoggingContext.restore();
    }
}
```
currentReader.poll() in turn calls AbstractReader.poll:
```java
@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Before we do anything else, determine if there was a failure and throw that exception ...
    failureException = this.failure.get();
    if (failureException != null) {
        // In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
        // will then explicitly stop the connector task. Most likely, however, the reader that threw
        // the exception will have already stopped itself and will generate no additional records.
        // Regardless, there may be records on the queue that will never be consumed.
        throw failureException;
    }

    // this reader has been stopped before it reached the success or failed end state, so clean up and abort
    if (!running.get()) {
        cleanupResources();
        throw new InterruptedException("Reader was stopped while polling");
    }

    logger.trace("Polling for next batch of records");
    List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
    while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
        // No records are available even though the snapshot has not yet completed, so sleep for a bit ...
        metronome.pause();

        // Check for failure after waking up ...
        failureException = this.failure.get();
        if (failureException != null) throw failureException;
    }
    if (batch.isEmpty() && success.get() && records.isEmpty()) {
        // We found no records but the operation completed successfully, so we're done
        this.running.set(false);
        cleanupResources();
        return null;
    }
    pollComplete(batch);
    logger.trace("Completed batch of {} records", batch.size());
    return batch;
}
```
Note the batch variable here: it holds the records that poll ultimately returns, and this list is filled from records, a BlockingQueue. records itself is filled in enqueueRecord:
```java
protected void enqueueRecord(SourceRecord record) throws InterruptedException {
    if (record != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Enqueuing source record: {}", record);
        }
        this.records.put(record);
    }
}
```
enqueueRecord is called by SnapshotReader or BinlogReader; these two readers are responsible for collecting and parsing the data. At this point, the process by which Kafka Connect calls poll to pull data is clear.
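The hand-off between the reader thread (enqueueRecord, which calls put) and the poll side (which calls drainTo) is a standard BlockingQueue producer/consumer pattern; a minimal single-threaded sketch of that mechanism, with event names invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueHandoffSketch {
    public static void main(String[] args) throws InterruptedException {
        int maxBatchSize = 2;
        BlockingQueue<String> records = new ArrayBlockingQueue<>(16);

        // Producer side: what enqueueRecord does on the reader thread.
        records.put("event-1");
        records.put("event-2");
        records.put("event-3");

        // Consumer side: what AbstractReader.poll does -- drain up to maxBatchSize
        // records into the batch that is returned to Kafka Connect.
        List<String> batch = new ArrayList<>(maxBatchSize);
        records.drainTo(batch, maxBatchSize);
        System.out.println(batch);          // [event-1, event-2]
        System.out.println(records.size()); // 1 record left for the next poll
    }
}
```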