fastDFS遇到的并发问题recv cmd: 0 is not correct, expect cmd: 100
2018-01-16 20:28
671 查看
此贴是我对一篇帖子FastDFS并发会有bug,其实我也不太信?进行的一些测试和我自己也发现的相关问题与解决方案。
最开始我也按照帖子进行了测试,结果发现确实存在并发问题(下面代码改为单线程是木有问题的)。以下列出我尝试的几种情况:
先列出我进行测试的有main函数的FastConcurrence类:
①把trackerClient,trackerServer,storageServer,storageClient设为全局变量。在类加载的时候,就进行了初始化,关键代码如下:
果不其然报错了:
②我在调用storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[] meta_list)前一句再storageClient = new StorageClient(trackerServer, storageServer);
关键代码如下:
好了,出现了和上面①一样的问题。
我发现,其实这样本质没有变,还是去改了全局storageClient 。应该new个新的,所以产生了③。
③我在调用storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[]
meta_list)前一句再StorageClient storageClient = new StorageClient(trackerServer, storageServer); 就是原帖最后说明的解决办法。
我还是报错,和前面一样的异常。原因还没有来得及找,不过我采用锁是没有问题的。
④在调用方法的时候采用锁就能解决。例如这个方法我加了锁就没有问题了。
一路顺畅。不过③为什么报错了,我还要研究下。
补:对于③我找到问题了。
源码下载下来,然后我就查StorageClient类,发现在do_upload_file方法中,有一段代码:
bUploadSlave = ((group_name != null && group_name.length() > 0) &&
(master_filename != null && master_filename.length() > 0) &&
(prefix_name != null));
if (bUploadSlave)
{
bNewConnection = this.newUpdatableStorageConnection(group_name, master_filename);
}
else
{
bNewConnection = this.newWritableStorageConnection(group_name);
}其中的
protected boolean newWritableStorageConnection(String group_name) throws IOException, MyException
{
if (this.storageServer != null)
{
return false;
}
else
{
TrackerClient tracker = new TrackerClient();this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
if (this.storageServer == null)
{
throw new MyException("getStoreStorage fail, errno code: " + tracker.getErrorCode());
}
return true;
}
}而这个方法里面的
trackerServer.close();
所以后面线程都被关闭了,就报IO异常,只要在方法③中new StorageClient前面再加一句:
TrackerServer trackerServer = trackerClient.getConnection();就解决问题了。
最开始我也按照帖子进行了测试,结果发现确实存在并发问题(下面代码改为单线程是木有问题的)。以下列出我尝试的几种情况:
先列出我进行测试的有main函数的FastConcurrence类:
public class FastConcurrence { private static int poolSize=2;//定义线程个数 public static void main(String[] args) throws InterruptedException { latchTest(); } private static void latchTest() throws InterruptedException { final CountDownLatch start = new CountDownLatch(1); final CountDownLatch end = new CountDownLatch(poolSize); ExecutorService exce = Executors.newFixedThreadPool(poolSize); for (int i = 0; i < poolSize; i++) { Runnable run = new Runnable() { @Override public void run() { try { start.await(); testLoad(); } catch (Exception e) { e.printStackTrace(); } finally { end.countDown(); } } }; exce.submit(run); } start.countDown(); end.await(); exce.shutdown(); } private static void testLoad() throws Exception { String filePath="D:\\fastDFS\\ques.png"; File content=new File(filePath); TestFileManager test=new TestFileManager(); FastDFSFile file =test.getFastFile(content,"png"); for (int i=0;i<10;i++){ FileManager.upload(file);//里面封装了我转文件为byte[]和上传文件方法storageClient.upload_file } System.out.println("完成一个线程!"); } }
①把trackerClient,trackerServer,storageServer,storageClient设为全局变量。在类加载的时候,就进行了初始化,关键代码如下:
private static TrackerClient trackerClient; private static TrackerServer trackerServer; private static StorageServer storageServer; private static StorageClient storageClient; static { try { // 初始化文件资源 ClientGlobal.init("C:\\Users\\jianbo\\Downloads\\FastDFS\\conf\\client.conf"); trackerClient = new TrackerClient(); trackerServer = trackerClient.getConnection(); //有并发问题,所以勿重用storageClient storageClient = new StorageClient(trackerServer, storageServer); } catch (Exception e) { logger.error(logger, e); } }
果不其然报错了:
java.net.SocketException: socket closed at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(Unknown Source) at java.net.SocketInputStream.read(Unknown Source) at java.net.SocketInputStream.read(Unknown Source) at org.csource.fastdfs.ProtoCommon.recvPackage(ProtoCommon.java:263) at org.csource.fastdfs.TrackerClient.getStoreStorage(TrackerClient.java:143) at org.csource.fastdfs.StorageClient.newWritableStorageConnection(StorageClient.java:1938) at org.csource.fastdfs.StorageClient.do_upload_file(StorageClient.java:703) at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:208) at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:226) at fastdfs.FileManager.upload(FileManager.java:76) at fastdfs.FastConcurrence.testLoad(FastConcurrence.java:48) at fastdfs.FastConcurrence.access$0(FastConcurrence.java:42) at fastdfs.FastConcurrence$1.run(FastConcurrence.java:27) at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) java.io.IOException: recv cmd: 0 is not correct, expect cmd: 100 at org.csource.fastdfs.ProtoCommon.recvHeader(ProtoCommon.java:219) at org.csource.fastdfs.ProtoCommon.recvPackage(ProtoCommon.java:250) at org.csource.fastdfs.TrackerClient.getStoreStorage(TrackerClient.java:143) at org.csource.fastdfs.StorageClient.newWritableStorageConnection(StorageClient.java:1938) at org.csource.fastdfs.StorageClient.do_upload_file(StorageClient.java:703) at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:208) at org.csource.fastdfs.StorageClient.upload_file(StorageClient.java:226) at fastdfs.FileManager.upload(FileManager.java:76) at fastdfs.FastConcurrence.testLoad(FastConcurrence.java:48) at fastdfs.FastConcurrence.access$0(FastConcurrence.java:42) at fastdfs.FastConcurrence$1.run(FastConcurrence.java:27) at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)
②我在调用storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[] meta_list)前一句再storageClient = new StorageClient(trackerServer, storageServer);
关键代码如下:
public static String[] upload(FastDFSFile file) { try { storageClient = new StorageClient(trackerServer, storageServer);//新加的 uploadResults = storageClient.upload_file(file.getContent(), file.getExt(), meta_list); } catch (Exception e) { logger.error("Exception when uploadind the file:" + file.getName(), e); } //省略一部分代码 return uploadResults; }
好了,出现了和上面①一样的问题。
我发现,其实这样本质没有变,还是去改了全局storageClient 。应该new个新的,所以产生了③。
③我在调用storageClient.upload_file(byte[] file_buff, String file_ext_name, NameValuePair[]
meta_list)前一句再StorageClient storageClient = new StorageClient(trackerServer, storageServer); 就是原帖最后说明的解决办法。
public static String[] upload(FastDFSFile file) { try { StorageClient storageClient = new StorageClient(trackerServer, storageServer);//新加的 uploadResults = storageClient.upload_file(file.getContent(), file.getExt(), meta_list); } catch (Exception e) { logger.error("Exception when uploadind the file:" + file.getName(), e); } //省略一部分代码 return uploadResults; }
我还是报错,和前面一样的异常。原因还没有来得及找,不过我采用锁是没有问题的。
④在调用方法的时候采用锁就能解决。例如这个方法我加了锁就没有问题了。
private synchronized static void testLoad() throws Exception { String filePath="D:\\fastDFS\\ques.png"; File content=new File(filePath); TestFileManager test=new TestFileManager(); FastDFSFile file =test.getFastFile(content,"png"); for (int i=0;i<10;i++){ FileManager.upload(file); } System.out.println(" 4000 完成一个线程!"); }
一路顺畅。不过③为什么报错了,我还要研究下。
补:对于③我找到问题了。
源码下载下来,然后我就查StorageClient类,发现在do_upload_file方法中,有一段代码:
bUploadSlave = ((group_name != null && group_name.length() > 0) &&
(master_filename != null && master_filename.length() > 0) &&
(prefix_name != null));
if (bUploadSlave)
{
bNewConnection = this.newUpdatableStorageConnection(group_name, master_filename);
}
else
{
bNewConnection = this.newWritableStorageConnection(group_name);
}其中的
this.newWritableStorageConnection(group_name);具体方法是:
protected boolean newWritableStorageConnection(String group_name) throws IOException, MyException
{
if (this.storageServer != null)
{
return false;
}
else
{
TrackerClient tracker = new TrackerClient();this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
if (this.storageServer == null)
{
throw new MyException("getStoreStorage fail, errno code: " + tracker.getErrorCode());
}
return true;
}
}而这个方法里面的
this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);找进去发现每次trackerServer用完了,就关闭连接了。
trackerServer.close();
所以后面线程都被关闭了,就报IO异常,只要在方法③中new StorageClient前面再加一句:
TrackerServer trackerServer = trackerClient.getConnection();就解决问题了。
this.storageServer = tracker.getStoreStorage(this.trackerServer, group_name);
相关文章推荐
- MyEclipse中用Tomcat部署项目时遇到的问题2:Document base D:/Tomcat 5.5/server/webapps/ admin does not exist or is not a readable directory
- 《Django学习》遇到的问题:“TypeError:'str' object is not callable"
- (上一篇的问题解决后又遇到的新问题)在eclipse中导入工程后运行任意文件出现"The selection is not within a valid module"
- 提交appstore遇到的问题 ITMS-90239:”Invalid Signature.Code object is not signed at all.The binary at path
- Cocopods 更新后遇到 这样的问题 The dependency xxxxxxxx is not used in any concrete target
- web项目浏览器打开遇到问题:HTTP Status 503 - This application is not currently available
- Android Studio开启虚拟机遇到HAX kernel module is not installed问题
- 在ubuntu10.10中安装VMWare Tool,遇到The path " " is not valid 的问题
- quartz使用时遇到的问题Bean property 'xxx' is not writable or has an invalid setter method. Did you mean 'xxx
- mysql 5.7会遇到 [Err] 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause ...的问题
- Linux遇到的问题(一)Ubuntu报“xxx is not in the sudoers file.This incident will be reported” 错误解决方法
- jQ中遇到的小问题-----Jquery调用出现ReferenceError: $ is not defined
- 遇到问题---MongoDB--$not和$and搭配使用报错---operator $not is not allowed around criteria chain element: { "$an
- VB遇到的问题【运行EXE时出现“VB未预期的错误”】【Component ‘RICHTX32.0CX’not correctly registered: file is.....”】
- sharepoint webpart开发中遇到的问题 This type of page is not served.
- DWR初次使用遇到的问题:Servlet dwr-invoker is not available
- Spring mvc 上传图片遇到The current request is not a multipart request的问题
- 遇到问题---Hadoop---java.io.IOException: NameNode is not formatted
- JUNIT遇到问题An instrumention test runner is not specified
- Mysql5.6 + Python 3.3遇到问题“ImportError: No module named 'mysql.connector'; mysql is not a package”