您的位置:首页 > 其它

RocketMQ源码解析-PullConsumer取消息(2)

2017-10-30 22:46 751 查看
如果在调用DefaultMQPullConsumer的pull方法的时候添加了pullcallback参数,那么就会调用DefaultMQPullConsumerImpl的pullAsyncImpl()方法进行异步发送。

private void pullAsyncImpl(//
final MessageQueue mq,//
final String subExpression,//
final long offset,//
final int maxNums,//
final PullCallback pullCallback,//
final boolean block,//
final long timeout) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();

if (null == mq) {
throw new MQClientException("mq is null", null);
}

if (offset < 0) {
throw new MQClientException("offset < 0", null);
}

if (maxNums <= 0) {
throw new MQClientException("maxNums <= 0", null);
}

if (null == pullCallback) {
throw new MQClientException("pullCallback is null", null);
}

this.subscriptionAutomatically(mq.getTopic());

try {
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

final SubscriptionData subscriptionData;
try {
subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}

long timeoutMillis =
block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

this.pullAPIWrapper.pullKernelImpl(//
mq, // 1
subscriptionData.getSubString(), // 2
0L, // 3
offset, // 4
maxNums, // 5
sysFlag, // 6
0, // 7
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
timeoutMillis, // 9
CommunicationMode.ASYNC, // 10
new PullCallback() {

@Override
public void onException(Throwable e) {
pullCallback.onException(e);
}

@Override
public void onSuccess(PullResult pullResult) {
pullCallback.onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper
.processPullResult(mq, pullResult, subscriptionData));
}
});
} catch (MQBrokerException e) {
throw new MQClientException("pullAsync unknow exception", e);
}
}


与同步发送的方法区别最大之处在于,异步发送的时候会要求传入一个实现了pullCallBack接口的参数,可以看到pullCallBack接口的组成。

public void onSuccess(final PullResult pullResult);

public void onException(final Throwable e);


可以很清楚的看到实现了这个接口对于提交的请求的成功接收与异常处理明确提出了实现的要求,用以处理异步发送的请求在接收到之后的处理。在PullAPIWrapper传递参数的时候,会将pullCallBack外面在包一层pullCallBack。

而异步处理在PullAPIWrapper完成了requestHeader的封装之后,将会交给客户端实例的MQClientAPIImpl来继续发送取消息的要求。

public PullResult pullMessage(//
final String addr,//
final PullMessageRequestHeader requestHeader,//
final long timeoutMillis,//
final CommunicationMode communicationMode,//
final PullCallback pullCallback//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}

return null;
}


在这里可以清楚见到,异步请求比同步请求多了一个pullCallBack参数,用来处理异步接受消息接收的操作。

private void pullMessageAsync(//
final String addr,// 1
final RemotingCommand request,//
final long timeoutMillis,//
final PullCallback pullCallback//
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
}
catch (Exception e) {
pullCallback.onException(e);
}
}
else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause()));
}
else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause()));
}
else {
pullCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause()));
}
}
}
});
}

可以清楚见到,在提交给netty客户端的实现了新的类继承了InvokeCallBack接口,
b575
在接口定义的operationComplete()方法中清楚地在完成了对拉取消息的请求进行处理(此处与同步发送方式一样)之后,根据拉取消息的成功与否,异常出现与否调用onSuccess()方法或者onException()方法实现自己在pullCallBack当中对相应拉取消息结果的处理。

那么究竟,是如何在netty客户端实现的呢?

@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis,
InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
// test the channel writable or not
if (!channel.isWritable()) {
throw new RemotingTooMuchRequestException(String.format(
"the channel[%s] is not writable now", channel.toString()));
}

try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
}
else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}

在NettyRemotingClient里,首先,会给传进来的Broker地址尝试寻找相应的通道。首先会尝试在map里寻找地址与ChannelWrapper的键值对,如果没有寻找到,则会尝试从新建立一个通道。ChannelWrapper作为NettyRemotingClient的内部类,用来包装ChannelFuture来维护通道对象。

private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}

// 进入临界区后,不能有阻塞操作,网络连接采用异步方式
if (this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection = false;
cw = this.channelTables.get(addr);
if (cw != null) {
// channel正常
if (cw.isOK()) {
return cw.getChannel();
}
// 正在连接,退出锁等待
else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
}
// 说明连接不成功
else {
this.channelTables.remove(addr);
createNewConnection = true;
}
}
// ChannelWrapper不存在
else {
createNewConnection = true;
}

if (createNewConnection) {
ChannelFuture channelFuture =
this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
}
}
catch (Exception e) {
log.error("createChannel: create channel exception", e);
}
finally {
this.lockChannelTables.unlock();
}
}
else {
log.warn("createChannel: try to lock channel table, but timeout, {}ms", LockTimeoutMillis);
}

if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
log.info("createChannel: connect remote host[{}] success, {}", addr,
channelFuture.toString());
return cw.getChannel();
}
else {
log.warn(
"createChannel: connect remote host[" + addr + "] failed, "
+ channelFuture.toString(), channelFuture.cause());
}
}
else {
log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr,
this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString());
}
}

return null;
}

在这里会尝试与地址进行连接建立通道。

 

在得到通道之后,在调用rpc钩子来调用InvokeAsyncImpl()方法来继续发送。

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

final ResponseFuture responseFuture =
new ResponseFuture(request.getOpaque(), timeoutMillis, invokeCallback, once);
this.responseTable.put(request.getOpaque(), responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
else {
responseFuture.setSendRequestOK(false);
}

responseFuture.putResponse(null);
responseTable.remove(request.getOpaque());
try {
responseFuture.executeInvokeCallback();
}
catch (Throwable e) {
plog.warn("excute callback in writeAndFlush addListener, and callback throw", e);
}
finally {
responseFuture.release();
}

plog.warn("send a request command to channel <{}> failed.",
RemotingHelper.parseChannelRemoteAddr(channel));
plog.warn(request.toString());
}
});
}
catch (Exception e) {
responseFuture.release();
plog.warn(
"send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel)
+ "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
}
else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
}
else {
String info =
String
.format(
"invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
timeoutMillis,//
this.semaphoreAsync.getQueueLength(),//
this.semaphoreAsync.availablePermits()//
);
plog.warn(info);
plog.warn(request.toString());
throw new RemotingTimeoutException(info);
}
}
}


在这里,对已经处理过pullCallBack的InvokeCallBack进行进一步的封装。在这里,对于消息的异步请求的回答,被封装成了ResponseFuture。
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
private final int opaque;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final CountDownLatch countDownLatch = new CountDownLatch(1);

// 保证信号量至多至少只被释放一次
private final SemaphoreReleaseOnlyOnce once;

// 保证回调的callback方法至多至少只被执行一次
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);

这里用来封装的ResponseFuture完成了用户对于消息接收的具体操作的封装,以及对于消息被处理操作的线程安全处理。

由此可见,ResponseFuture又被封装在了netty给出的ChannelFutureListener当中,实现了operationComplete方法来完成对于消息异步请求接受的处理。

在ChannelFutureListener中,调用ResponseFuture的executeInvokeCallBack()方法,来具体进行对于response的异步操作,这里response并没有直接调用自己的实际操作方法,而是先尝试给atomicBoolean类进行cas操作,用以完成请求的单次操作。

ChannelFutureListener作为监听器,加入在了这个Channel当中作为监听器等待消息的异步处理。

取消息不支持oneway。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: