您的位置:首页 > 其它

fastDFS遇到的并发问题recv cmd: 0 is not correct, expect cmd: 100

2018-01-16 20:28 671 查看
此贴是我对一篇帖子FastDFS并发会有bug,其实我也不太信?进行的一些测试和我自己也发现的相关问题与解决方案。

最开始我也按照帖子进行了测试,结果发现确实存在并发问题(下面代码改为单线程是木有问题的)。以下列出我尝试的几种情况:

先列出我进行测试的有main函数的FastConcurrence类:

public class FastConcurrence {
private static int poolSize=2;//定义线程个数
public static void main(String[] args) throws InterruptedException {
latchTest();
}
private static void latchTest() throws InterruptedException {
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(poolSize);
ExecutorService exce = Executors.newFixedThreadPool(poolSize);
for (int i = 0; i < poolSize; i++) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
start.await();
testLoad();
} catch (Exception e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
};
exce.submit(run);
}
start.countDown();
end.await();
exce.shutdown();
}

private static void testLoad() throws Exception {
String filePath="D:\\fastDFS\\ques.png";
File content=new File(filePath);
TestFileManager test=new TestFileManager();
FastDFSFile file =test.getFastFile(content,"png");
for (int i=0;i<10;i++){                                                                                  FileManager.upload(file);//里面封装了我转文件为byte[]和上传文件方法storageClient.upload_file
}
System.out.println("完成一个线程!");
}
}

①把trackerClient,trackerServer,storageServer,storageClient设为全局变量。在类加载的时候,就进行了初始化,关键代码如下:

private static TrackerClient  trackerClient;
private static TrackerServer  trackerServer;
private static StorageServer  storageServer;
private static StorageClient  storageClient;

static {
try {
// 初始化文件资源
ClientGlobal.init("C:\\Users\\jianbo\\Downloads\\FastDFS\\conf\\client.conf");
trackerClient = new TrackerClient();
trackerServer = trackerClient.getConnection();
//有并发问题,所以勿重用storageClient
storageClient = new StorageClient(trackerServer, storageServer);
} catch (Exception e) {
logger.error(logger,  e);
}
}

果不其然报错了:

java.net.SocketException: socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at org.csource.fastdfs.ProtoCommon.recvPackage(ProtoCommon.java:263)
at org.csource.fastdfs.TrackerClient.getStoreStorage(TrackerClient.java:143)
at org.csource.fastdfs.StorageClient.newWritableStorageConnection(StorageClient.java:1938)
at org.csource.fastdfs.StorageClient.do_upload_file(StorageClient.java:703)
at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:208)
at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:226)
at fastdfs.FileManager.upload(FileManager.java:76)
at fastdfs.FastConcurrence.testLoad(FastConcurrence.java:48)
at fastdfs.FastConcurrence.access$0(FastConcurrence.java:42)
at fastdfs.FastConcurrence$1.run(FastConcurrence.java:27)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
java.io.IOException: recv cmd: 0 is not correct, expect cmd: 100
at org.csource.fastdfs.ProtoCommon.recvHeader(ProtoCommon.java:219)
at org.csource.fastdfs.ProtoCommon.recvPackage(ProtoCommon.java:250)
at org.csource.fastdfs.TrackerClient.getStoreStorage(TrackerClient.java:143)
at org.csource.fastdfs.StorageClient.newWritableStorageConnection(StorageClient.java:1938)
at org.csource.fastdfs.StorageClient.do_upload_file(StorageClient.java:703)
at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:208)
at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:226)
at fastdfs.FileManager.upload(FileManager.java:76)
at fastdfs.FastConcurrence.testLoad(FastConcurrence.java:48)
at fastdfs.FastConcurrence.access$0(FastConcurrence.java:42)
at fastdfs.FastConcurrence$1.run(FastConcurrence.java:27)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

②我在调用storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[] meta_list)前一句再storageClient = new StorageClient(trackerServer, storageServer); 
关键代码如下:

public static String[] upload(FastDFSFile file) {
try {
storageClient = new StorageClient(trackerServer, storageServer);//新加的
uploadResults = storageClient.upload_file(file.getContent(), file.getExt(), meta_list);
} catch (Exception e) {
logger.error("Exception when uploadind the file:" + file.getName(), e);
}
//省略一部分代码
return uploadResults;
}

好了,出现了和上面①一样的问题。
我发现,其实这样本质没有变,还是去改了全局storageClient 。应该new个新的,所以产生了③。

③我在调用storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[]
meta_list)前一句再StorageClient storageClient = new StorageClient(trackerServer, storageServer); 就是原帖最后说明的解决办法。

public static String[] upload(FastDFSFile file) {
try {
StorageClient storageClient = new StorageClient(trackerServer, storageServer);//新加的
uploadResults = storageClient.upload_file(file.getContent(), file.getExt(), meta_list);
} catch (Exception e) {
logger.error("Exception when uploadind the file:" + file.getName(), e);
}
//省略一部分代码
return uploadResults;
}

我还是报错,和前面一样的异常。原因还没有来得及找,不过我采用锁是没有问题的。
④在调用方法的时候采用锁就能解决。例如这个方法我加了锁就没有问题了。

private synchronized static void testLoad() throws Exception {
String filePath="D:\\fastDFS\\ques.png";
File content=new File(filePath);
TestFileManager test=new TestFileManager();
FastDFSFile file =test.getFastFile(content,"png");
for (int i=0;i<10;i++){
FileManager.upload(file);
}
System.out.println("
4000
完成一个线程!");
}

一路顺畅。不过③为什么报错了,我还要研究下。

补:对于③我找到问题了。

源码下载下来,然后我就查StorageClient类,发现在do_upload_file方法中,有一段代码:

bUploadSlave = ((group_name != null && group_name.length() > 0) &&
(master_filename != null && master_filename.length() > 0) &&
(prefix_name != null));
if (bUploadSlave)
{
bNewConnection = this.newUpdatableStorageConnection(group_name, master_filename);
}
else
{
bNewConnection = this.newWritableStorageConnection(group_name);
}其中的
this.newWritableStorageConnection(group_name);
具体方法是:
protected boolean newWritableStorageConnection(String group_name) throws IOException, MyException
{
if (this.storageServer != null)
{
return false;
}
else
{
TrackerClient tracker = new TrackerClient();this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
if (this.storageServer == null)
{
throw new MyException("getStoreStorage fail, errno code: " + tracker.getErrorCode());
}
return true;
}
}而这个方法里面的

this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
找进去发现每次trackerServer用完了,就关闭连接了。
trackerServer.close();

所以后面线程都被关闭了,就报IO异常,只要在方法③中new StorageClient前面再加一句:

TrackerServer trackerServer = trackerClient.getConnection();就解决问题了。

this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  fastDFS 并发 多线程
相关文章推荐