您的位置:首页 > 其它

FastDFS源码分析之tracker协议分析

2016-06-12 16:00 507 查看
本篇博客主要讲解fastdfs中的tracker协议。

fastdfs主要是存储文件,直接把整个文件存储到磁盘上,所以,简单直接。但是也有很大的局限性。

因此,fastdfs对文件的目录设置和存储是最为核心的。

为什么这么突然地讲解这些?因为我已经看了一段时间的fastdfs,主要结构都已经搞得比较清晰了。因此,这篇文章,我就主要以tracker这一部分的协议来分析。

其他具体介绍tracker的请百度。我就不介绍了,我就直接从

[cpp] view
plain copy







int tracker_deal_task(struct fast_task_info *pTask)

这个方法开始对每个case分析。

1、storage心跳协议

[cpp] view
plain copy







case TRACKER_PROTO_CMD_STORAGE_BEAT:

TRACKER_CHECK_LOGINED(pTask)

result = tracker_deal_storage_beat(pTask);

break;

自然,该协议是从storage层发送给tracker层的数据包,

[cpp] view
plain copy







#define TRACKER_PROTO_CMD_STORAGE_BEAT 83 //storage heart beat

那么,storage主要是做了什么:

storage在启动的时候,会开启一个线程,该线程为

[cpp] view
plain copy







static void *tracker_report_thread_entrance(void *arg)

该函数主要是根据配置连接它所在组对应的tracker,并做一些事情,这里有个while循环,代码如下

[cpp] view
plain copy







current_time = g_current_time;

if (current_time - last_beat_time >= \

g_heart_beat_interval)

{

if (tracker_heart_beat(pTrackerServer, \

&stat_chg_sync_count, \

&bServerPortChanged) != 0)

{

break;

}

也就是每隔g_heart_beat_interval秒(默认配置为30秒)发送一次心跳,心跳包的主要数据是包头和当前storage的状态信息,

[cpp] view
plain copy







char out_buff[sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)];

[cpp] view
plain copy







/*
 * struct for network transferring
 *
 * Layout of the storage statistics block. The heart beat buffer is sized as
 * sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff), so this struct is the
 * body that follows the header in that packet.
 * Every field is a fixed 8-byte char array; field order defines the layout,
 * so fields must not be reordered.
 * NOTE(review): presumably each field carries a 64-bit integer encoded with
 * long2buff -- confirm against the code that fills this struct.
 */

typedef struct

{

/* operation counters, each as a total / success pair */

char sz_total_upload_count[8];

char sz_success_upload_count[8];

char sz_total_append_count[8];

char sz_success_append_count[8];

char sz_total_modify_count[8];

char sz_success_modify_count[8];

char sz_total_truncate_count[8];

char sz_success_truncate_count[8];

char sz_total_set_meta_count[8];

char sz_success_set_meta_count[8];

char sz_total_delete_count[8];

char sz_success_delete_count[8];

char sz_total_download_count[8];

char sz_success_download_count[8];

char sz_total_get_meta_count[8];

char sz_success_get_meta_count[8];

char sz_total_create_link_count[8];

char sz_success_create_link_count[8];

char sz_total_delete_link_count[8];

char sz_success_delete_link_count[8];

/* byte counters, each as a total / success pair */

char sz_total_upload_bytes[8];

char sz_success_upload_bytes[8];

char sz_total_append_bytes[8];

char sz_success_append_bytes[8];

char sz_total_modify_bytes[8];

char sz_success_modify_bytes[8];

char sz_total_download_bytes[8];

char sz_success_download_bytes[8];

char sz_total_sync_in_bytes[8];

char sz_success_sync_in_bytes[8];

char sz_total_sync_out_bytes[8];

char sz_success_sync_out_bytes[8];

/* file open / read / write counters, each as a total / success pair */

char sz_total_file_open_count[8];

char sz_success_file_open_count[8];

char sz_total_file_read_count[8];

char sz_success_file_read_count[8];

char sz_total_file_write_count[8];

char sz_success_file_write_count[8];

/* NOTE(review): the last_* fields look like timestamps -- confirm */

char sz_last_source_update[8];

char sz_last_sync_update[8];

char sz_last_synced_timestamp[8];

char sz_last_heart_beat_time[8];

} FDFSStorageStatBuff;

tracker主要是做了什么呢?

对其进行解包,然后把保存在本地的storage信息保存到文件中,调用

[cpp] view
plain copy







status = tracker_save_storages();

调用

[cpp] view
plain copy







tracker_mem_active_store_server(pClientInfo->pGroup, \

pClientInfo->pStorage);

如果group中还没有这个存储服务器,就将其插入到group中。

最后调用

[cpp] view
plain copy







static int tracker_check_and_sync(struct fast_task_info *pTask, \

const int status)

检查相应的改变状态,并将其同步等。(需要再详细看看)

2、报告相应同步时间

[cpp] view
plain copy







#define TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT 89 //report src last synced time as dest server

同样在storage的report线程执行

[cpp] view
plain copy







if (sync_time_chg_count != g_sync_change_count && \

current_time - last_sync_report_time >= \

g_heart_beat_interval)

{

if (tracker_report_sync_timestamp( \

pTrackerServer, &bServerPortChanged)!=0)

{

break;

}

sync_time_chg_count = g_sync_change_count;

last_sync_report_time = current_time;

}

具体的数据包为

[cpp] view
plain copy







pEnd = g_storage_servers + g_storage_count;

for (pServer=g_storage_servers; pServer<pEnd; pServer++)

{

memcpy(p, pServer->server.id, FDFS_STORAGE_ID_MAX_SIZE);

p += FDFS_STORAGE_ID_MAX_SIZE;

int2buff(pServer->last_sync_src_timestamp, p);

p += 4;

}

也就是遍历当前进程记录的本组所有storage服务器,把它们的id和上次同步的时间戳发送给tracker服务器。

然后tracker的服务器存储结构为

[cpp] view
plain copy







pClientInfo->pGroup->last_sync_timestamps \

[src_index][dest_index] = sync_timestamp;

dest_index 值为当前连接的storage在其所在组中的索引值

[cpp] view
plain copy







dest_index = tracker_mem_get_storage_index(pClientInfo->pGroup,

pClientInfo->pStorage);

if (dest_index < 0 || dest_index >= pClientInfo->pGroup->count)

{

status = 0;

break;

}

因为本连接的storage是固定不变的,而src_index就是本组其他storage的索引,

首先通过id(ip地址)找到具体的storage,然后再通过指针找到索引位置,

最后,调用

[cpp] view
plain copy







if (++g_storage_sync_time_chg_count % \

TRACKER_SYNC_TO_FILE_FREQ == 0)

{

status = tracker_save_sync_timestamps();

}

else

{

status = 0;

}

} while (0);

return tracker_check_and_sync(pTask, status);

定时保存文件和检查等

3、上报磁盘情况

#define TRACKER_PROTO_CMD_STORAGE_REPORT_DISK_USAGE 84 //report disk usage

同样线程定时调用,

[cpp] view
plain copy







if (current_time - last_df_report_time >= \

g_stat_report_interval)

{

if (tracker_report_df_stat(pTrackerServer, \

&bServerPortChanged) != 0)

{

break;

}

last_df_report_time = current_time;

}

同样上报这些数据

[cpp] view
plain copy







for (i=0; i<g_fdfs_store_paths.count; i++)

{

if (statvfs(g_fdfs_store_paths.paths[i], &sbuf) != 0)

{

logError("file: "__FILE__", line: %d, " \

"call statfs fail, errno: %d, error info: %s.",\

__LINE__, errno, STRERROR(errno));

if (pBuff != out_buff)

{

free(pBuff);

}

return errno != 0 ? errno : EACCES;

}

g_path_space_list[i].total_mb = ((int64_t)(sbuf.f_blocks) * \

sbuf.f_frsize) / FDFS_ONE_MB;

g_path_space_list[i].free_mb = ((int64_t)(sbuf.f_bavail) * \

sbuf.f_frsize) / FDFS_ONE_MB;

long2buff(g_path_space_list[i].total_mb, pStatBuff->sz_total_mb);

long2buff(g_path_space_list[i].free_mb, pStatBuff->sz_free_mb);

pStatBuff++;

}

tracker这边存储在

int64_t *path_total_mbs; //total disk storage in MB

int64_t *path_free_mbs; //free disk storage in MB

这里

[cpp] view
plain copy







path_total_mbs[i] = buff2long(pStatBuff->sz_total_mb);

path_free_mbs[i] = buff2long(pStatBuff->sz_free_mb);

pClientInfo->pStorage->total_mb += path_total_mbs[i];

pClientInfo->pStorage->free_mb += path_free_mbs[i];

4、storage服务器加入到tracker

[cpp] view
plain copy







#define TRACKER_PROTO_CMD_STORAGE_JOIN 81

storage线程同样在该处调用

[cpp] view
plain copy







if (tracker_report_join(pTrackerServer, tracker_index, \

sync_old_done) != 0)

{

sleep(g_heart_beat_interval);

continue;

}

发送的包体数据包为:

[cpp] view
plain copy







/*
 * Fixed-size body of the storage join packet (TRACKER_PROTO_CMD_STORAGE_JOIN)
 * as it is laid out in the send buffer after TrackerHeader.
 * Numeric values are carried in FDFS_PROTO_PKG_LEN_SIZE-byte char arrays
 * rather than native integers.
 * The sender appends one FDFS_PROTO_IP_PORT_SIZE-wide "ip:port" entry per
 * tracker server directly after this struct.
 * Field order defines the layout, so fields must not be reordered.
 */
typedef struct

{

char group_name[FDFS_GROUP_NAME_MAX_LEN+1];

char storage_port[FDFS_PROTO_PKG_LEN_SIZE];

char storage_http_port[FDFS_PROTO_PKG_LEN_SIZE];

char store_path_count[FDFS_PROTO_PKG_LEN_SIZE];

char subdir_count_per_path[FDFS_PROTO_PKG_LEN_SIZE];

char upload_priority[FDFS_PROTO_PKG_LEN_SIZE];

char join_time[FDFS_PROTO_PKG_LEN_SIZE]; //storage join timestamp

char up_time[FDFS_PROTO_PKG_LEN_SIZE]; //storage service started timestamp

char version[FDFS_VERSION_SIZE]; //storage version

char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];

char init_flag;

signed char status;

char tracker_count[FDFS_PROTO_PKG_LEN_SIZE]; //all tracker server count

} TrackerStorageJoinBody;

当赋值完成后,在其后边加入

[cpp] view
plain copy







p = out_buff + sizeof(TrackerHeader) + sizeof(TrackerStorageJoinBody);

pServerEnd = g_tracker_group.servers + g_tracker_group.server_count;

for (pServer=g_tracker_group.servers; pServer<pServerEnd; pServer++)

{

/*

if (strcmp(pServer->ip_addr, pTrackerServer->ip_addr) == 0 && \

pServer->port == pTrackerServer->port)

{

continue;

}

tracker_count++;

*/

sprintf(p, "%s:%d", pServer->ip_addr, pServer->port);

p += FDFS_PROTO_IP_PORT_SIZE;

}

加入所有tracker的服务器信息格式为ip:port

tracker 服务器接收

[cpp] view
plain copy







case TRACKER_PROTO_CMD_STORAGE_JOIN:

result = tracker_deal_storage_join(pTask);

break;

获取到的相关信息存储到

[cpp] view
plain copy







/*
 * Tracker-side, decoded form of the storage join information: the numeric
 * fields are native ints, and the tracker server list is held in a fixed
 * array of FDFS_MAX_TRACKERS ConnectionInfo entries.
 * NOTE(review): presumably only the first tracker_count entries of
 * tracker_servers are meaningful -- confirm against the code that fills it.
 */
typedef struct

{

int storage_port;

int storage_http_port;

int store_path_count;

int subdir_count_per_path;

int upload_priority;

int join_time; //storage join timestamp (create timestamp)

int up_time; //storage service started timestamp

char version[FDFS_VERSION_SIZE]; //storage version

char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];

char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];

char init_flag;

signed char status;

int tracker_count;

ConnectionInfo tracker_servers[FDFS_MAX_TRACKERS];

} FDFSStorageJoinBody;

这些结构体内

同时插入本地内存

result = tracker_mem_add_group_and_storage(pClientInfo, \

pTask->client_ip, &joinBody, true);

同时把同步源storage的id通过响应包返回

[cpp] view
plain copy







pJoinBodyResp = (TrackerStorageJoinBodyResp *)(pTask->data + \

sizeof(TrackerHeader));

memset(pJoinBodyResp, 0, sizeof(TrackerStorageJoinBodyResp));

if (pClientInfo->pStorage->psync_src_server != NULL)

{

strcpy(pJoinBodyResp->src_id, \

pClientInfo->pStorage->psync_src_server->id);

}

5、报告存储状态

[cpp] view
plain copy







#define TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS 76 //report specified storage server status

storage服务器调用

[cpp] view
plain copy







int tracker_report_storage_status(ConnectionInfo *pTrackerServer, \

FDFSStorageBrief *briefServer)

内容主要是组名字

[cpp] view
plain copy







strcpy(out_buff + sizeof(TrackerHeader), g_group_name);

和简要信息

[cpp] view
plain copy







memcpy(out_buff + sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN, \

briefServer, sizeof(FDFSStorageBrief));

其结构体如下

[cpp] view
plain copy







/*
 * Brief description of one storage server: a status byte, a 4-byte port
 * buffer, the storage id and the IP address.
 * The report-status packet copies this struct raw with
 * memcpy(..., sizeof(FDFSStorageBrief)), so the field order and sizes are
 * part of the packet layout and must not change.
 * NOTE(review): port is presumably an int2buff-encoded integer -- confirm.
 */
typedef struct

{

char status;

char port[4];

char id[FDFS_STORAGE_ID_MAX_SIZE];

char ip_addr[IP_ADDRESS_SIZE];

} FDFSStorageBrief;

6、从tracker获取storage状态。

[cpp] view
plain copy







#define TRACKER_PROTO_CMD_STORAGE_GET_STATUS 71 //get storage status from tracker

该协议是由client发起

调用流程如下:

[cpp] view
plain copy







int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \

const char *group_name, const char *ip_addr, \

FDFSStorageBrief *pDestBuff)

int tracker_get_storage_max_status(TrackerServerGroup *pTrackerGroup, \

const char *group_name, const char *ip_addr, \

char *storage_id, int *status)

int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \

const char *group_name, const char *ip_addr, \

FDFSStorageBrief *pDestBuff)

获取自己的状态,

包体格式 组名 ip的字符串

tracker通过获取了相应的数据,查找到storage的信息

结构体为:

[cpp] view
plain copy







/*
 * Brief description of one storage server: a status byte, a 4-byte port
 * buffer, the storage id and the IP address.
 * tracker_get_storage_status() takes a pointer to this type (pDestBuff) as
 * its destination for the result.
 * NOTE(review): port is presumably an int2buff-encoded integer -- confirm.
 */
typedef struct

{

char status;

char port[4];

char id[FDFS_STORAGE_ID_MAX_SIZE];

char ip_addr[IP_ADDRESS_SIZE];

} FDFSStorageBrief;

赋值后,返回

7、通过tracker获取storageid

/* Protocol command code: get a storage server id from the tracker. */
#define TRACKER_PROTO_CMD_STORAGE_GET_SERVER_ID 70 //get storage server id from tracker

和上一个协议的请求一样,包体由 groupname+ip 组成。

tracker处理方法

[cpp] view
plain copy







static int tracker_deal_get_storage_id(struct fast_task_info *pTask)

tracker最后通过

[cpp] view
plain copy







FDFSStorageIdInfo *fdfs_get_storage_id_by_ip(const char *group_name, \

const char *pIpAddr)

{

FDFSStorageIdInfo target;

memset(&target, 0, sizeof(FDFSStorageIdInfo));

snprintf(target.group_name, sizeof(target.group_name), "%s", group_name);

snprintf(target.ip_addr, sizeof(target.ip_addr), "%s", pIpAddr);

return (FDFSStorageIdInfo *)bsearch(&target, g_storage_ids_by_ip, \

g_storage_id_count, sizeof(FDFSStorageIdInfo), \

fdfs_cmp_group_name_and_ip);

}

该方法获取了

[cpp] view
plain copy







FDFSStorageIdInfo

信息,然后赋值,返回。

8、通过tracker获取所有storage服务器

[cpp] view
plain copy







#define TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS 69 //get all storage ids from tracker

[cpp] view
plain copy







for (i=0; i<5; i++)

{

for (pGServer=pServerStart; pGServer<pServerEnd; pGServer++)

{

memcpy(pTServer, pGServer, sizeof(ConnectionInfo));

pTServer->sock = -1;

result = fdfs_get_storage_ids_from_tracker_server(pTServer);

if (result == 0)

{

return result;

}

}

if (pServerStart != pTrackerGroup->servers)

{

pServerStart = pTrackerGroup->servers;

}

sleep(1);

}

调用顺序

[cpp] view
plain copy







int storage_func_init(const char *filename, char *bind_addr, const int addr_size)

int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup)

int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)

该函数每秒钟调用一次(最多重试5次),每次遍历所有的tracker服务器

tracker服务器获取

[cpp] view
plain copy







case TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS:

result = tracker_deal_fetch_storage_ids(pTask);

break;

然后通过这种协议格式

[cpp] view
plain copy







返回的数据

pIdsStart = g_storage_ids_by_ip + start_index;

pIdsEnd = g_storage_ids_by_ip + g_storage_id_count;

for (pIdInfo = pIdsStart; pIdInfo < pIdsEnd; pIdInfo++)

{

if ((int)(p - pTask->data) > pTask->size - 64)

{

break;

}

p += sprintf(p, "%s %s %s\n", pIdInfo->id, \

pIdInfo->group_name, pIdInfo->ip_addr);

}

返回给请求者。

9、回复给新的storage

[cpp] view
plain copy







#define TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG 85 //repl new storage servers

storage服务器调用流程:

剩下的协议

[cpp] view
plain copy







static int tracker_merge_servers(ConnectionInfo *pTrackerServer, \

FDFSStorageBrief *briefServers, const int server_count)

[cpp] view
plain copy







case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE:

result = tracker_deal_service_query_fetch_update( \

pTask, pHeader->cmd);

break;

case TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE:

result = tracker_deal_service_query_fetch_update( \

pTask, pHeader->cmd);

break;

case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL:

result = tracker_deal_service_query_fetch_update( \

pTask, pHeader->cmd);

break;

case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE:

result = tracker_deal_service_query_storage( \

pTask, pHeader->cmd);

break;

case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE:

result = tracker_deal_service_query_storage( \

pTask, pHeader->cmd);

break;

case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL:

result = tracker_deal_service_query_storage( \

pTask, pHeader->cmd);

break;

case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL:

result = tracker_deal_service_query_storage( \

pTask, pHeader->cmd);

break;

case TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP:

result = tracker_deal_server_list_one_group(pTask);

break;

case TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS:

result = tracker_deal_server_list_all_groups(pTask);

break;

case TRACKER_PROTO_CMD_SERVER_LIST_STORAGE:

result = tracker_deal_server_list_group_storages(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ:

result = tracker_deal_storage_sync_src_req(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ:

TRACKER_CHECK_LOGINED(pTask)

result = tracker_deal_storage_sync_dest_req(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY:

result = tracker_deal_storage_sync_notify(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_QUERY:

result = tracker_deal_storage_sync_dest_query(pTask);

break;

case TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE:

result = tracker_deal_server_delete_storage(pTask);

break;

case TRACKER_PROTO_CMD_SERVER_SET_TRUNK_SERVER:

result = tracker_deal_server_set_trunk_server(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_REPORT_IP_CHANGED:

result = tracker_deal_storage_report_ip_changed(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_CHANGELOG_REQ:

result = tracker_deal_changelog_req(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_PARAMETER_REQ:

result = tracker_deal_parameter_req(pTask);

break;

case FDFS_PROTO_CMD_QUIT:

close(pTask->ev_read.ev_fd);

task_finish_clean_up(pTask);

return 0;

case FDFS_PROTO_CMD_ACTIVE_TEST:

result = tracker_deal_active_test(pTask);

break;

case TRACKER_PROTO_CMD_TRACKER_GET_STATUS:

result = tracker_deal_get_tracker_status(pTask);

break;

case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_START:

result = tracker_deal_get_sys_files_start(pTask);

break;

case TRACKER_PROTO_CMD_TRACKER_GET_ONE_SYS_FILE:

result = tracker_deal_get_one_sys_file(pTask);

break;

case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_END:

result = tracker_deal_get_sys_files_end(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID:

TRACKER_CHECK_LOGINED(pTask)

result = tracker_deal_report_trunk_fid(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_FETCH_TRUNK_FID:

TRACKER_CHECK_LOGINED(pTask)

result = tracker_deal_get_trunk_fid(pTask);

break;

case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FREE:

TRACKER_CHECK_LOGINED(pTask)

result = tracker_deal_report_trunk_free_space(pTask);

break;

case TRACKER_PROTO_CMD_TRACKER_PING_LEADER:

result = tracker_deal_ping_leader(pTask);

break;

case TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER:

result = tracker_deal_notify_next_leader(pTask);

break;

case TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER:

result = tracker_deal_commit_next_leader(pTask);

break;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: