您的位置:首页 > 理论基础 > 计算机网络

基于Rsync算法的简单云盘实现(下)

2016-06-01 00:00 483 查看
摘要: 在上一篇博客的基础上,本次将与大家分享基于网络编程实现的简单云盘。如果你还没有看过上一篇博客,请先阅读上一篇博客后再来阅读此篇。

1、首先先介绍下简单的网络编程,需要编写一个服务端类(Server)和一个客户端类(Client),服务端可以接受多个客户端的连接。将客户端与服务端之间发送接收的消息封装为Msg类。代码如下:

//服务端

public class Server {

static boolean flag = true;

public static void main(String[] args){

try {

ServerSocket server = new ServerSocket(9528);

System.out.println("开始启动服务器");

while(flag){

//阻塞的IO

Socket socket = server.accept();

System.out.println("[Client Request]:"+socket.getRemoteSocketAddress().toString());

//可以读或写对象

ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());

ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

//单线程

Msg msg = (Msg)ois.readObject();

System.out.println(msg.getContent());

ois.close();

oos.close();

}

server.close();

} catch (Exception e) {

e.printStackTrace();

}

}

}

//客户端

public class Client {

public static void main(String[] args) {

try {

Socket socket = new Socket("127.0.0.1", 9528);

//可以读或写对象

ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());

//单线程

Msg msg = new Msg();

msg.setContent("Hello");

oos.writeObject(msg);

ois.close();

oos.close();

//监测所有文件状态变化

} catch (Exception e) {

e.printStackTrace();

}

}

}

/**

* 消息

* @author ZD

*/

public class Msg implements Serializable {

private static final long serialVersionUID = 1L;

private String content; //消息内容

public String getContent() {

return content;

}

public void setContent(String content) {

this.content = content;

}

}

运行结果如下:



以上是单线程。为了提高处理文件的效率,需改用为多线程。较为简单就不展示了。

2、解析:

现在我们需要将客户端即本地源文件上传至服务端,首先我们需将源文件分块计算文件的弱校验和与强校验和,并且设置块号,然后将消息(Msg类)传入服务端,所以此处还需要为消息添加消息类型(MsgType类),服务端再根据消息类型做相应回应。

消息类型如下:

public class MsgType {

public static final int Req_MetaData = 1; //请求元数据

public static final int Req_Patch = 2; //补丁

public static final int Req_NewFile = 3; //新文件

public static final int SUCCESS = 99;

public static final int ERROR = -1;

}

/**

* 消息

* @author ZD

*/

public class Msg implements Serializable {

private static final long serialVersionUID = 1L;

private String content;

private int msgType; //消息类型

private String filePath; //文件路径

private boolean isDir = false; //是否是目录

private boolean isNew = false; //服务器是否存在

private Map<Long, List<Chunk>> metaData; //服务器文件元数据

private Patch patch; //补丁

private byte[] datas; //如果是新文件全部放这里

... //省略set、get方法

}

客户端需要一个阻塞队列来保存消息,然后根据接收到服务端传来的Msg类型做相应处理。具体类型不同操作请看代码,如下:

//客户端

public class Client {

// 阻塞队列

public static BlockingQueue<Msg> msgQueue = new LinkedBlockingQueue<>();

public static void main(String[] args) {

try {

Socket socket = new Socket("127.0.0.1", 9528);

// 可以读或写对象

ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());

ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());

new ClientWriteThread(oos).start();

new ClientReadThread(ois).start();

// 文件固定

Msg msg = new Msg();

msg.setMsgType(MsgType.Req_MetaData);

msg.setFilePath("Hello.txt"); msgQueue.put(msg);

} catch (Exception e) {

e.printStackTrace();

}

}

}

其中ClientReadThread类为客户端读线程,若接收到MsgType为获取元数据时,需要创建补丁文件并将此消息加入队列中发送至服务端;若为服务端不存在该文件,即此文件为新文件时,需要将该文件的数据放至消息中并告诉服务端该消息为新文件类型;若为成功即代表该操作成功。为了方便观察,其中添加了一些输出语句。具体实现如下:

/**

* 客户端读线程

* @author ZD

*/

public class ClientReadThread extends Thread {

private ObjectInputStream ois;

private String clientPath="F:/rsyncTemp/local/";

public ClientReadThread(ObjectInputStream ois) {

this.ois = ois;

}

@Override

public void run() {

try {

while(true){

Msg msg = (Msg) ois.readObject();

//System.out.println(msg.getContent());

switch(msg.getMsgType()){

case MsgType.Req_MetaData:

doReqMetaData(msg);

break;

case MsgType.Req_NewFile:

doNewFile(msg);

break;

case MsgType.SUCCESS:

doSuccess(msg);

default:

break;

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

private void doNewFile(Msg msg) throws IOException, InterruptedException {

System.out.println("[Client]:"+msg.getFilePath()+"服务器端不存在");

String filePath = clientPath+msg.getFilePath();

File file = new File(filePath);

byte[] datas = new byte[(int)file.length()]; //文件不是很大的时候

FileInputStream fis = new FileInputStream(file);

fis.read(datas);

fis.close();

Msg reMsg = new Msg();

reMsg.setMsgType(MsgType.Req_NewFile);

reMsg.setFilePath(msg.getFilePath());

reMsg.setDatas(datas);

Client.msgQueue.put(reMsg);

}

private void doSuccess(Msg msg) {

System.out.println("[Client]:"+msg.getFilePath()+"同步成功");

}

private void doReqMetaData(Msg msg) throws Exception {

System.out.println("[Client]:"+msg.getFilePath()+"获取元数据成功");

String filePath = clientPath+msg.getFilePath();

//打补丁

Patch patch = RsyncUtil.createPatch(new File(filePath), msg.getMetaData());

Msg patchMsg = new Msg();

patchMsg.setMsgType(MsgType.Req_Patch);

patchMsg.setPatch(patch);

patchMsg.setFilePath(msg.getFilePath());

Client.msgQueue.put(patchMsg);

}

}

ClientWriteThread类为客户端写线程,具体实现如下:

/**

* 客户端写线程

* @author ZD

*/

public class ClientWriteThread extends Thread {

private ObjectOutputStream oos;

public ClientWriteThread(ObjectOutputStream oos) {

this.oos = oos;

}

@Override

public void run() {

while(true){

try {

Msg msg = Client.msgQueue.take();

oos.writeObject(msg);

} catch (InterruptedException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

服务端也是根据接收到客户端的Msg做相应处理,若接收到的为请求元数据,先查看文件是否是目录,若为目录则需要查看目录是否存在,不存在则创建;若不为目录,则会先查看文件是否存在,存在则计算文件的CheckSum(调用上一篇博客中提到的RsyncUtil工具类),并且计算每块的强校验和与弱校验和,然后将元数据写给客户端。若不存在则告知客户端服务端不存在该文件;若为打补丁类型,则将客户端与服务端不同部分同步至服务端,更新文件;若为新文件类型,则将新文件同步至服务端。

啰啰嗦嗦半天,可能你还是没看懂,那就直接上代码吧!如下:

/**

* 服务器端的线程

* @author ZD

*/

public class ServerThread extends Thread{

private ObjectInputStream ois;

private ObjectOutputStream oos;

private String serverPath="F:/rsyncTemp/remote/";

public ServerThread(ObjectInputStream ois, ObjectOutputStream oos) {

this.ois = ois;

this.oos = oos;

}

@Override

public void run() {

try {

//需要不停的获取消息

while(true){

Msg msg = (Msg)ois.readObject();

switch(msg.getMsgType()){

case MsgType.Req_MetaData:

if(msg.isDir()){

doDir(msg);

}else{

doReqMetaData(msg);

}

break;

case MsgType.Req_NewFile:

doNewFile(msg);

break;

case MsgType.Req_Patch:

doPatch(msg);

break;

default:

break;

}

}

} catch (ClassNotFoundException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

} catch (Exception e) {

e.printStackTrace();

}

}

private void doNewFile(Msg msg) throws IOException {

String filePath = serverPath+msg.getFilePath();

File file = new File(filePath);

FileOutputStream fos = new FileOutputStream(file);

fos.write(msg.getDatas());

fos.close();

Msg reMsg = new Msg();

reMsg.setMsgType(MsgType.SUCCESS);

reMsg.setFilePath(msg.getFilePath());

oos.writeObject(reMsg);

}

//专处理目录的

private void doDir(Msg msg) throws IOException {

String filePath = serverPath+msg.getFilePath();

File file = new File(filePath);

if(!file.exists()){

file.mkdir();

}

Msg reMsg = new Msg();

reMsg.setMsgType(MsgType.SUCCESS);

reMsg.setFilePath(msg.getFilePath());

oos.writeObject(reMsg);

}

private void doPatch(Msg msg) throws Exception {

System.out.println("[Server]:"+msg.getFilePath()+"开始打补丁");

String filePath = serverPath+msg.getFilePath();

Patch patch = msg.getPatch();

RsyncUtil.applyPatch(patch, filePath);

Msg reMsg = new Msg();

reMsg.setMsgType(MsgType.SUCCESS);

reMsg.setFilePath(msg.getFilePath());

oos.writeObject(reMsg);

}

private void doReqMetaData(Msg msg) throws IOException{

System.out.println("[Server]:"+msg.getFilePath()+"开始请求元数据");

String filePath = serverPath+msg.getFilePath();

File file = new File(filePath);

Map<Long, List<Chunk>> metaData = new HashMap<>();

if(file.exists()){

metaData = RsyncUtil.calcCheckSumFromFile(filePath);

}else{

msg.setMsgType(MsgType.Req_NewFile);

msg.setNew(true);

}

msg.setMetaData(metaData);

oos.writeObject(msg);

}

}

现在差不多就完成了一个简单云盘,只是现在只能针对指定的单个文件,且输出的调试语句也很生硬。下面就给出一个监测的工具类,不仅可以对多个文件操作,还可以实现监测指定路径下所有文件的变化情况。(其实是老师写的~~~抱歉、具体实现没有特别研究,如果你有兴趣可以研究研究哦~~~~)。代码如下:

/**

* 文件监控回调函数

* @author: 0xC000005

*/

public interface WatchCallBack {

public void create(String path);

public void modify(String path);

public void rename(String from , String to);

public void delete(String path);

}

/**

* 回调实现

* @author: 0xC000005

*/

public class WatchCallBackImpl implements WatchCallBack{

private String clientPath="F:/rsyncTemp/local/";

public WatchCallBackImpl() {

}

@Override

public void create(String path) {

System.out.println("创建文件 "+path);

}

@Override

public void modify(String path) { //主要修改该方法

System.out.println("修改文件 "+path);

Msg msg = new Msg();

File file = new File(clientPath+path);

if(file.isDirectory()){

msg.setDir(true);

}

msg.setType(MsgType.Req_MetaData);

msg.setFilePath(path);

try {

Client.msgQueue.put(msg);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

public void rename( String from , String to ){

System.out.println("重命名文件 "+from + " --> "+to);

}

@Override

public void delete(String path) {

}

}

/**

* 磁盘监听器,用于监听目录变化

* @author: 0xC000005

*/

public class Watcher {

@SuppressWarnings("unchecked")

public static boolean listenChanges(String fileName,WatchCallBack callback) throws Exception{

File file = new File(fileName);

if( file.exists() && file.isDirectory() ){

System.out.println("同步服务开始监听 !");

Path path = Bootstrapper.newPath(file);

WatchService ws = Bootstrapper.newWatchService();

path.register(ws,new WatchEvent.Kind<?>[]{

StandardWatchEventKind.ENTRY_CREATE,

StandardWatchEventKind.ENTRY_MODIFY,

StandardWatchEventKind.ENTRY_DELETE,

ExtendedWatchEventKind.ENTRY_RENAME_FROM,

ExtendedWatchEventKind.ENTRY_RENAME_TO},

ExtendedWatchEventModifier.FILE_TREE);

WatchKey watch = null;

boolean flag = true;

while(flag){

watch = ws.take();

List<WatchEvent<?>> events = watch.pollEvents();

watch.reset();

String from ="";

String to ="";

for(WatchEvent<?> event : events){

Kind<Path> kind = (Kind<Path>) event.kind();

PathImpl context = (PathImpl) event.context();

String filePath = context.getFile().getPath();

if( kind.equals(StandardWatchEventKind.ENTRY_MODIFY) ){

System.out.println("修改: "+filePath);

callback.modify(filePath);

}else if( kind.equals(StandardWatchEventKind.ENTRY_CREATE) ){

System.out.println("创建: "+filePath);

callback.create(filePath);

}else if( kind.equals(StandardWatchEventKind.ENTRY_DELETE) ){

System.out.println("删除: "+filePath);

callback.delete(filePath);

}else if( kind.equals(ExtendedWatchEventKind.ENTRY_RENAME_FROM) ){

System.out.println("重命名From: "+filePath);

from = filePath;

}else if( kind.equals(ExtendedWatchEventKind.ENTRY_RENAME_TO) ){

System.out.println("重命名To: "+filePath);

to = filePath;

callback.rename(from, to);

}

}

}

return true;

}else{

System.err.println("文件不存在或不是目录");

return false;

}

}

}

写在最后:若有错误,望纠正。希望我的分析能给大家带来收获。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息