
[debezium source code analysis] How MySqlConnectorTask starts up and polls data

2018-03-13 21:41
The poll method of MySqlConnectorTask fetches the data, which is then stored in Kafka.

The start method

Let's first look at the MySqlConnectorTask.start method. Here is part of the code:

...
this.taskContext.start();
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
...


As shown above, start() calls taskContext.start(), which in turn calls MySqlSchema.start(). MySqlSchema.start() calls DatabaseHistory.start(). The default DatabaseHistory implementation is KafkaDatabaseHistory, and KafkaDatabaseHistory.start() initializes a KafkaProducer instance.

Continuing with the MySqlConnectorTask.start method:

// Get the offsets for our partition ...
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition());
if (offsets != null) {
    // Set the position in our source info ...
    source.setOffset(offsets);
    logger.info("Found existing offset: {}", offsets);

    // Before anything else, recover the database history to the specified binlog coordinates ...
    taskContext.loadHistory(source);

    if (source.isSnapshotInEffect()) {
        // The last offset was an incomplete snapshot that we cannot recover from...
        if (taskContext.isSnapshotNeverAllowed()) {
            // No snapshots are allowed
            String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
                    + "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
            throw new ConnectException(msg);
        }
        // Otherwise, restart a new snapshot ...
        startWithSnapshot = true;
        logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
    } else {
        // No snapshot was in effect, so we should just start reading from the binlog ...
        startWithSnapshot = false;

        // But check to see if the server still has those binlog coordinates ...
        if (!isBinlogAvailable()) {
            if (!taskContext.isSnapshotAllowedWhenNeeded()) {
                String msg = "The connector is trying to read binlog starting at " + source + ", but this is no longer "
                        + "available on the server. Reconfigure the connector to use a snapshot when needed.";
                throw new ConnectException(msg);
            }
            startWithSnapshot = true;
        }
    }

} else {
    // We have no recorded offsets ...
    if (taskContext.isSnapshotNeverAllowed()) {
        // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
        // full history of the database.
        logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
        source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog
        taskContext.initializeHistory();

        // Look to see what the first available binlog file is called, and whether it looks like binlog files have
        // been purged. If so, then output a warning ...
        String earliestBinlogFilename = earliestBinlogFilename();
        if (earliestBinlogFilename == null) {
            logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
        } else if (!earliestBinlogFilename.endsWith("00001")) {
            logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
        }
    } else {
        // We are allowed to use snapshots, and that is the best way to start ...
        startWithSnapshot = true;
        // The snapshot will determine if GTIDs are set
        logger.info("Found no existing offset, so preparing to perform a snapshot");
        // The snapshot will also initialize history ...
    }
}


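First, the offsets != null branch. The recorded offset is restored into source and the database history is loaded. If that offset shows that the previous run stopped in the middle of a snapshot (source.isSnapshotInEffect()), the task throws a ConnectException when snapshots are never allowed; otherwise it sets startWithSnapshot to true and takes a new snapshot. If no snapshot was in effect, startWithSnapshot is set to false. However, if the recorded binlog position is no longer available on the server and the connector is not configured to snapshot when needed, an exception is thrown; if snapshotting when needed is allowed, startWithSnapshot is set to true.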

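Now the offsets == null branch. A null offsets means that no offsets have been recorded, that is, no Debezium task with the same name has run before. If the user-supplied configuration does not allow snapshots, consumption starts from the beginning of the binlog (position 0). The task then calls earliestBinlogFilename() to get the name of the earliest binlog file, and logs a warning if no binlog is available or if that name does not end with 00001, which suggests that some binlogs may have been purged.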

protected String earliestBinlogFilename() {
    // Accumulate the available binlog filenames ...
    List<String> logNames = new ArrayList<>();
    try {
        logger.info("Checking all known binlogs from MySQL");
        taskContext.jdbc().query("SHOW BINARY LOGS", rs -> {
            while (rs.next()) {
                logNames.add(rs.getString(1));
            }
        });
    } catch (SQLException e) {
        throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e);
    }

    if (logNames.isEmpty()) return null;
    return logNames.get(0);
}
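The check that start applies to the result of earliestBinlogFilename() can be isolated as a small pure function. The following is only a sketch: the class BinlogPurgeHint, the enum Hint and the sample file names are hypothetical and are not part of Debezium; only the endsWith("00001") test is taken from the code above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not a Debezium class: mirrors the warning logic in start().
public class BinlogPurgeHint {

    enum Hint { NO_BINLOG, POSSIBLY_PURGED, LOOKS_COMPLETE }

    // logNames plays the role of the first column returned by SHOW BINARY LOGS.
    static Hint inspect(List<String> logNames) {
        if (logNames.isEmpty()) {
            return Hint.NO_BINLOG;              // start() warns that no binlog is available
        }
        String earliest = logNames.get(0);      // same choice as earliestBinlogFilename()
        return earliest.endsWith("00001")
                ? Hint.LOOKS_COMPLETE           // first file is still present
                : Hint.POSSIBLY_PURGED;         // start() warns that binlogs may have been purged
    }

    public static void main(String[] args) {
        // The file names below are assumed examples.
        System.out.println(inspect(Arrays.asList("mysql-bin.000001", "mysql-bin.000002")));
        System.out.println(inspect(Arrays.asList("mysql-bin.000042")));
        System.out.println(inspect(Collections.emptyList()));
    }
}
```

Note that this is only a heuristic: it produces a warning, and the task still starts reading from the beginning of whatever binlog remains.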


If the user allows snapshots, startWithSnapshot is set to true.
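The branches of start discussed so far can be condensed into one decision function, which makes the combinations easier to check. This is a minimal sketch under stated assumptions: StartupDecision, Mode and decide are hypothetical names, the boolean parameters stand in for the taskContext and source calls in the real code, and IllegalStateException replaces ConnectException so that the example has no Kafka dependency.

```java
// Hypothetical sketch, not Debezium code: the start-up decision as a pure function.
public class StartupDecision {

    enum Mode { SNAPSHOT, BINLOG, BINLOG_FROM_BEGINNING }

    static Mode decide(boolean hasOffset,
                       boolean snapshotInEffect,
                       boolean binlogAvailable,
                       boolean snapshotNeverAllowed,
                       boolean snapshotAllowedWhenNeeded) {
        if (hasOffset) {
            if (snapshotInEffect) {
                // The previous run stopped in the middle of a snapshot.
                if (snapshotNeverAllowed) {
                    throw new IllegalStateException("incomplete snapshot, but snapshots are never allowed");
                }
                return Mode.SNAPSHOT;
            }
            if (!binlogAvailable) {
                // The recorded binlog position is gone from the server.
                if (!snapshotAllowedWhenNeeded) {
                    throw new IllegalStateException("binlog position unavailable and snapshot-when-needed is off");
                }
                return Mode.SNAPSHOT;
            }
            return Mode.BINLOG;
        }
        // No recorded offsets: snapshot if allowed, otherwise read the binlog from the start.
        return snapshotNeverAllowed ? Mode.BINLOG_FROM_BEGINNING : Mode.SNAPSHOT;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, true, false, false)); // first run, snapshots allowed
        System.out.println(decide(false, false, true, true, false));  // first run, snapshots never allowed
        System.out.println(decide(true, false, true, false, false));  // resume from recorded offset
    }
}
```

In the real method the SNAPSHOT outcome corresponds to startWithSnapshot = true, and both BINLOG outcomes correspond to startWithSnapshot = false.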

Below is the rest of MySqlConnectorTask.start:

// Check whether the row-level binlog is enabled ...
final boolean rowBinlogEnabled = isRowBinlogEnabled();

// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
readers = new ChainedReader();
readers.uponCompletion(this::completeReaders);
BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
    // We're supposed to start with a snapshot, so set that up ...
    SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
    snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking());
    if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();
    readers.add(snapshotReader);

    if (taskContext.isInitialSnapshotOnly()) {
        logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
        readers.add(new BlockingReader("blocker"));
        readers.uponCompletion("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
    } else {
        if (!rowBinlogEnabled) {
            throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is "
                    + "required for this connector to work properly. Change the MySQL configuration to use a "
                    + "row-level binlog and restart the connector.");
        }
        readers.add(binlogReader);
    }
} else {
    if (!rowBinlogEnabled) {
        throw new ConnectException(
                "The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
    }
    // We're going to start by reading the binlog ...
    readers.add(binlogReader);
}

// And finally initialize and start the chain of readers ...
this.readers.initialize();
this.readers.start();


First, the roles of the two Readers: 1) BinlogReader subscribes to the latest binlog data; 2) SnapshotReader subscribes to the full data set.

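If startWithSnapshot is true and the mode is not initial_only, a SnapshotReader and then a BinlogReader are added to readers. If startWithSnapshot is true and the mode is initial_only, a SnapshotReader and a BlockingReader are added; the latter blocks the task from proceeding (because the user configured initial_only mode). If startWithSnapshot is false, only the BinlogReader is added. Whenever a BinlogReader is about to be added, the task first checks that the row-level binlog is enabled and throws a ConnectException if it is not.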
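The way the reader chain is assembled can be illustrated without any Debezium classes. The following sketch uses plain strings in place of reader instances; ReaderChainSketch and chain are hypothetical names, and IllegalStateException stands in for ConnectException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Debezium code: which readers end up in the chain, and in which order.
public class ReaderChainSketch {

    static List<String> chain(boolean startWithSnapshot, boolean initialOnly, boolean rowBinlogEnabled) {
        List<String> readers = new ArrayList<>();
        if (startWithSnapshot) {
            readers.add("snapshot");            // the snapshot always runs first
            if (initialOnly) {
                readers.add("blocker");         // nothing runs after the snapshot
                return readers;
            }
        }
        // Every remaining path ends with the binlog reader, which needs a row-level binlog.
        if (!rowBinlogEnabled) {
            throw new IllegalStateException("row-level binlog is required");
        }
        readers.add("binlog");
        return readers;
    }

    public static void main(String[] args) {
        System.out.println(chain(true, false, true));   // snapshot followed by binlog
        System.out.println(chain(true, true, false));   // snapshot only; binlog format is not checked
        System.out.println(chain(false, false, true));  // binlog only
    }
}
```

One detail this makes visible: in initial_only mode the row-level binlog check is never reached, so a snapshot-only connector can start against a server without row-level binlog.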

The poll method

Below is the code of MySqlConnectorTask.poll:

@Override
public List<SourceRecord> poll() throws InterruptedException {
    Reader currentReader = readers;
    if (currentReader == null) {
        return null;
    }
    PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
    try {
        logger.trace("Polling for events");
        return currentReader.poll();
    } finally {
        prevLoggingContext.restore();
    }
}


currentReader.poll() in turn calls the AbstractReader.poll method:

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Before we do anything else, determine if there was a failure and throw that exception ...
    failureException = this.failure.get();
    if (failureException != null) {
        // In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
        // will then explicitly stop the connector task. Most likely, however, the reader that threw
        // the exception will have already stopped itself and will generate no additional records.
        // Regardless, there may be records on the queue that will never be consumed.
        throw failureException;
    }

    // this reader has been stopped before it reached the success or failed end state, so clean up and abort
    if (!running.get()) {
        cleanupResources();
        throw new InterruptedException( "Reader was stopped while polling" );
    }

    logger.trace("Polling for next batch of records");
    List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
    while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
        // No records are available even though the snapshot has not yet completed, so sleep for a bit ...
        metronome.pause();

        // Check for failure after waking up ...
        failureException = this.failure.get();
        if (failureException != null) throw failureException;
    }

    if (batch.isEmpty() && success.get() && records.isEmpty()) {
        // We found no records but the operation completed successfully, so we're done
        this.running.set(false);
        cleanupResources();
        return null;
    }
    pollComplete(batch);
    logger.trace("Completed batch of {} records", batch.size());
    return batch;
}


Note batch here: it is the list of fetched records that is eventually returned. It is filled from records, a BlockingQueue, through drainTo. records itself is filled by enqueueRecord:

protected void enqueueRecord(SourceRecord record) throws InterruptedException {
    if (record != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Enqueuing source record: {}", record);
        }
        this.records.put(record);
    }
}


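This method is called by SnapshotReader and BinlogReader; these two Readers are mainly responsible for collecting and parsing the data. At this point, the process by which Kafka Connect calls poll to pull data should be clear.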
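The hand-off between enqueueRecord and poll is a standard producer/consumer pattern built on a BlockingQueue. The sketch below reproduces only that pattern. It makes several assumptions: QueueHandoffSketch is a hypothetical class, records are plain strings instead of SourceRecord, the queue is a LinkedBlockingQueue with an arbitrary capacity, a fixed Thread.sleep replaces metronome.pause(), and the failure and success handling of the real AbstractReader.poll is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Debezium code: a reader thread enqueues, the task thread drains in batches.
public class QueueHandoffSketch {

    private final BlockingQueue<String> records = new LinkedBlockingQueue<>(1024); // capacity is an assumption
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final int maxBatchSize;

    QueueHandoffSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Producer side: blocks when the queue is full, which applies back-pressure to the reader.
    void enqueue(String record) throws InterruptedException {
        if (record != null) {
            records.put(record);
        }
    }

    // Consumer side: waits until at least one record is available, then returns up to maxBatchSize.
    List<String> poll() throws InterruptedException {
        List<String> batch = new ArrayList<>(maxBatchSize);
        while (running.get() && records.drainTo(batch, maxBatchSize) == 0) {
            Thread.sleep(10); // stand-in for metronome.pause()
        }
        return batch;
    }

    void stop() {
        running.set(false);
    }

    public static void main(String[] args) throws InterruptedException {
        QueueHandoffSketch reader = new QueueHandoffSketch(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    reader.enqueue("event-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int seen = 0;
        while (seen < 5) {
            List<String> batch = reader.poll(); // batch sizes depend on thread timing
            seen += batch.size();
            System.out.println(batch);
        }
        producer.join();
    }
}
```

Because drainTo never removes more than maxBatchSize elements, each call to poll returns a bounded batch, and the bounded queue keeps the reader from running arbitrarily far ahead of Kafka Connect.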