您的位置:首页 > 产品设计 > UI/UE

mosquitto-0.15-client-pub_client.c源码分析

2012-07-30 10:05 579 查看
/*
Copyright (c) 2009-2012 Roger Light <roger@atchoo.org>
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of mosquitto nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <unistd.h>
#else
#include <process.h>
#include <winsock2.h>
#define snprintf sprintf_s
#endif

#include <mosquitto.h>

/* Where the payload to publish comes from (stored in `mode`). */
#define MSGMODE_NONE        0   /* nothing selected yet */
#define MSGMODE_CMD         1   /* -m : payload given on the command line */
#define MSGMODE_STDIN_LINE  2   /* -l : one message per line read from stdin */
#define MSGMODE_STDIN_FILE  3   /* -s : all of stdin sent as a single message */
#define MSGMODE_FILE        4   /* -f : contents of a file */
#define MSGMODE_NULL        5   /* -n : zero length message */

/* Connection progress (stored in `status`). */
#define STATUS_CONNECTING     0
#define STATUS_CONNACK_RECVD  1

/* Message selection; `mode` starts as MSGMODE_NONE. */
static int mode = MSGMODE_NONE;
static char *topic = NULL;
static char *message = NULL;
static long msglen = 0;
static int qos = 0;
static int retain = 0;

/* Credentials taken from the command line. */
static char *username = NULL;
static char *password = NULL;

/* Session state shared between main() and the callbacks. */
static int status = STATUS_CONNECTING;
static uint16_t mid_sent = 0;
static bool connected = true;
static bool disconnect_sent = false;

/* Set by --quiet: suppresses the error output guarded by `if(!quiet)`. */
static bool quiet = false;

/*
 * Connect callback, registered in main() with mosquitto_connect_callback_set().
 *
 * obj    - user data pointer; used here as the struct mosquitto client handle.
 * result - 0 when the connection was accepted; any other value is treated as
 *          a refusal (1-5 have dedicated messages, everything else prints
 *          "unknown reason").
 *
 * On success:
 *   -m / -f / -s : publish the prepared `message` of `msglen` bytes,
 *   -n           : publish a zero length payload,
 *   -l           : only record STATUS_CONNACK_RECVD; main() publishes lines.
 * If the publish call reports an error, a diagnostic is printed (unless
 * --quiet) and the client is disconnected.
 */
void my_connect_callback(void *obj, int result)
{
    struct mosquitto *client = obj;
    const char *text = NULL;
    int pub_rc = MOSQ_ERR_SUCCESS;

    if(result){
        /* Connection refused: report why, then leave. */
        if(quiet){
            return;
        }
        if(result == 1){
            text = "Connection Refused: unacceptable protocol version\n";
        }else if(result == 2){
            text = "Connection Refused: identifier rejected\n";
        }else if(result == 3){
            text = "Connection Refused: broker unavailable\n";
        }else if(result == 4){
            text = "Connection Refused: bad user name or password\n";
        }else if(result == 5){
            text = "Connection Refused: not authorised\n";
        }else{
            text = "Connection Refused: unknown reason\n";
        }
        fprintf(stderr, "%s", text);
        return;
    }

    if(mode == MSGMODE_STDIN_LINE){
        status = STATUS_CONNACK_RECVD;
    }else if(mode == MSGMODE_NULL){
        pub_rc = mosquitto_publish(client, &mid_sent, topic, 0, NULL, qos, retain);
    }else if(mode == MSGMODE_CMD || mode == MSGMODE_FILE || mode == MSGMODE_STDIN_FILE){
        pub_rc = mosquitto_publish(client, &mid_sent, topic, msglen, (uint8_t *)message, qos, retain);
    }

    if(!pub_rc){
        return;
    }

    /* Publish failed: explain (error codes without a message stay silent). */
    if(!quiet){
        if(pub_rc == MOSQ_ERR_INVAL){
            text = "Error: Invalid input. Does your topic contain '+' or '#'?\n";
        }else if(pub_rc == MOSQ_ERR_NOMEM){
            text = "Error: Out of memory when trying to publish message.\n";
        }else if(pub_rc == MOSQ_ERR_NO_CONN){
            text = "Error: Client not connected when trying to publish.\n";
        }else if(pub_rc == MOSQ_ERR_PROTOCOL){
            text = "Error: Protocol error when communicating with broker.\n";
        }else if(pub_rc == MOSQ_ERR_PAYLOAD_SIZE){
            text = "Error: Message payload is too large.\n";
        }
        if(text){
            fprintf(stderr, "%s", text);
        }
    }
    mosquitto_disconnect(client);
}

/*
 * Disconnect callback, registered in main() with
 * mosquitto_disconnect_callback_set().
 *
 * Clears the file-scope `connected` flag, which ends the do/while loop in
 * main(). The user data pointer is not used.
 */
void my_disconnect_callback(void *obj)
{
    (void)obj;
    connected = false;
}

/*
 * Publish callback, registered in main() with mosquitto_publish_callback_set().
 *
 * obj - user data pointer; used here as the struct mosquitto client handle.
 * mid - message id of the published message (not used).
 *
 * In every mode except -l (MSGMODE_STDIN_LINE) only one message is sent, so
 * the first call requests a disconnect. `disconnect_sent` makes sure the
 * request is issued only once.
 */
void my_publish_callback(void *obj, uint16_t mid)
{
    struct mosquitto *client = obj;

    if(mode == MSGMODE_STDIN_LINE || disconnect_sent){
        return;
    }
    mosquitto_disconnect(client);
    disconnect_sent = true;
}

/*
 * Handle -s: read all of stdin until end-of-file and store it as the single
 * message to publish.
 *
 * Sets `mode` to MSGMODE_STDIN_FILE. On success `message` points to a heap
 * buffer holding the input and `msglen` is its length in bytes.
 *
 * Returns 0 on success, 1 on failure (out of memory, read error, or empty
 * input). On out-of-memory and read errors the partially filled buffer is
 * released and `message` is reset to NULL.
 */
int load_stdin(void)
{
    char chunk[1024];
    size_t got;
    long total = 0;
    char *grown;

    mode = MSGMODE_STDIN_FILE;

    for(;;){
        got = fread(chunk, 1, sizeof(chunk), stdin);
        if(got > 0){
            /* Grow through a temporary so the old buffer is not lost when
             * realloc fails; a zero-byte request is never issued. */
            grown = realloc(message, (size_t)total + got);
            if(!grown){
                free(message);
                message = NULL;
                if(!quiet) fprintf(stderr, "Error: Out of memory.\n");
                return 1;
            }
            message = grown;
            memcpy(&(message[total]), chunk, got);
            total += (long)got;
        }
        /* A short read means end-of-file or a read error; stop either way
         * instead of looping forever on a stream stuck in the error state. */
        if(got < sizeof(chunk)){
            break;
        }
    }

    if(ferror(stdin)){
        free(message);
        message = NULL;
        if(!quiet) fprintf(stderr, "Error: Unable to read from stdin.\n");
        return 1;
    }

    msglen = total;
    if(!msglen){
        if(!quiet) fprintf(stderr, "Error: Zero length input.\n");
        return 1;
    }

    return 0;
}

/*
 * Handle -f: load the whole file `filename` into memory as the message to
 * publish.
 *
 * Sets `mode` to MSGMODE_FILE once the file has been opened. On success
 * `message` points to a heap buffer with the file contents and `msglen` is
 * the file size in bytes.
 *
 * Returns 0 on success, 1 on failure (file cannot be opened, sized or read,
 * is empty, is larger than 268,435,455 bytes, or memory is exhausted).
 */
int load_file(const char *filename)
{
    long pos;
    long size;
    size_t rlen;
    FILE *fptr = NULL;

    fptr = fopen(filename, "rb");   /* binary, read-only */
    if(!fptr){
        if(!quiet) fprintf(stderr, "Error: Unable to open file \"%s\".\n", filename);
        return 1;
    }
    mode = MSGMODE_FILE;

    /* The offset of the end of the file is its length. Both calls can fail
     * (e.g. on a non-seekable stream); a negative ftell() result must not be
     * used as an allocation size. */
    if(fseek(fptr, 0, SEEK_END) != 0 || (size = ftell(fptr)) < 0){
        fclose(fptr);
        if(!quiet) fprintf(stderr, "Error: Unable to determine size of file \"%s\".\n", filename);
        return 1;
    }
    msglen = size;
    if(msglen > 268435455){     /* limit checked by the original code */
        fclose(fptr);
        if(!quiet) fprintf(stderr, "Error: File \"%s\" is too large (>268,435,455 bytes).\n", filename);
        return 1;
    }
    if(msglen == 0){
        fclose(fptr);
        if(!quiet) fprintf(stderr, "Error: File \"%s\" is empty.\n", filename);
        return 1;
    }
    if(fseek(fptr, 0, SEEK_SET) != 0){
        fclose(fptr);
        if(!quiet) fprintf(stderr, "Error: Unable to read file \"%s\".\n", filename);
        return 1;
    }
    message = malloc((size_t)msglen);
    if(!message){
        fclose(fptr);
        if(!quiet) fprintf(stderr, "Error: Out of memory.\n");
        return 1;
    }

    /* Read the file contents into message. */
    pos = 0;
    while(pos < msglen){
        rlen = fread(&(message[pos]), sizeof(char), (size_t)(msglen-pos), fptr);
        if(rlen == 0){
            /* Error or unexpected end-of-file: no progress is possible, so
             * give up rather than spin forever. */
            fclose(fptr);
            free(message);
            message = NULL;
            if(!quiet) fprintf(stderr, "Error: Unable to read file \"%s\".\n", filename);
            return 1;
        }
        pos += (long)rlen;
    }
    fclose(fptr);
    return 0;
}

/*
 * Print the command line help for mosquitto_pub to stdout.
 *
 * The synopsis lists -s together with the other message sources
 * (-f, -l, -n, -m), since main() accepts it as one of the mutually
 * exclusive ways of supplying the message.
 */
void print_usage(void)
{
    printf("mosquitto_pub is a simple mqtt client that will publish a message on a single topic and exit.\n\n");
    printf("Usage: mosquitto_pub [-h host] [-p port] [-q qos] [-r] {-f file | -l | -n | -m message | -s} -t topic\n");
    printf("                     [-i id] [-I id_prefix]\n");
    printf("                     [-d] [--quiet]\n");
    printf("                     [-u username [-P password]]\n");
    printf("                     [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]\n\n");
    printf(" -d : enable debug messages.\n");
    printf(" -f : send the contents of a file as the message.\n");
    printf(" -h : mqtt host to connect to. Defaults to localhost.\n");
    printf(" -i : id to use for this client. Defaults to mosquitto_pub_ appended with the process id.\n");
    printf(" -I : define the client id as id_prefix appended with the process id. Useful for when the\n");
    printf("      broker is using the clientid_prefixes option.\n");
    printf(" -l : read messages from stdin, sending a separate message for each line.\n");
    printf(" -m : message payload to send.\n");
    printf(" -n : send a null (zero length) message.\n");
    printf(" -p : network port to connect to. Defaults to 1883.\n");
    printf(" -q : quality of service level to use for all messages. Defaults to 0.\n");
    printf(" -r : message should be retained.\n");
    printf(" -s : read message from stdin, sending the entire input as a message.\n");
    printf(" -t : mqtt topic to publish to.\n");
    printf(" -u : provide a username (requires MQTT 3.1 broker)\n");
    printf(" -P : provide a password (requires MQTT 3.1 broker)\n");
    printf(" --quiet : don't print error messages.\n");
    printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n");
    printf("                  unexpected disconnection. If not given and will-topic is set, a zero\n");
    printf("                  length message will be sent.\n");
    printf(" --will-qos : QoS level for the client Will.\n");
    printf(" --will-retain : if given, make the client Will retained.\n");
    printf(" --will-topic : the topic on which to publish the client Will.\n");
    printf("\nSee http://mosquitto.org/ for more information.\n\n");
}

/*
 * Entry point of the mosquitto_pub client.
 *
 * Parses the command line, builds a client id when none was given, creates a
 * mosquitto client, connects to the broker and runs the network loop until
 * the client is disconnected or the loop reports an error.
 *
 * Returns 1 for command line and set-up errors, the mosquitto_connect()
 * result if connecting fails, otherwise the last mosquitto_loop() result.
 */
int main(int argc, char *argv[])
{
    char *id = NULL;            /* client id (-i), or generated below */
    char *id_prefix = NULL;     /* client id prefix (-I) */
    int i;
    char *host = "localhost";   /* broker host, defaults to localhost */
    int port = 1883;            /* broker port, defaults to 1883 */
    int keepalive = 60;         /* passed to mosquitto_connect() */
    int opt;
    char buf[1024];             /* one input line in -l mode */
    size_t buf_len;
    bool debug = false;         /* -d: enable library log output */
    struct mosquitto *mosq = NULL;
    int rc;
    int rc2;
    char hostname[21];
    char err[1024];

    uint8_t *will_payload = NULL;
    long will_payloadlen = 0;
    int will_qos = 0;
    bool will_retain = false;
    char *will_topic = NULL;

    /* Parse the command line arguments. */
    for(i=1; i<argc; i++){
        if(!strcmp(argv[i], "-p") || !strcmp(argv[i], "--port")){    /* port number */
            if(i==argc-1){
                fprintf(stderr, "Error: -p argument given but no port specified.\n\n");
                print_usage();
                return 1;
            }else{
                port = atoi(argv[i+1]);
                if(port<1 || port>65535){
                    fprintf(stderr, "Error: Invalid port given: %d\n", port);
                    print_usage();
                    return 1;
                }
            }
            i++;
        }else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){    /* debug output */
            debug = true;
        }else if(!strcmp(argv[i], "-f") || !strcmp(argv[i], "--file")){    /* -f: message from a file */
            if(mode != MSGMODE_NONE){
                fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
                print_usage();
                return 1;
            }else if(i==argc-1){
                fprintf(stderr, "Error: -f argument given but no file specified.\n\n");
                print_usage();
                return 1;
            }else{
                if(load_file(argv[i+1])) return 1;
            }
            i++;
        }else if(!strcmp(argv[i], "-h") || !strcmp(argv[i], "--host")){    /* -h: broker host */
            if(i==argc-1){
                fprintf(stderr, "Error: -h argument given but no host specified.\n\n");
                print_usage();
                return 1;
            }else{
                host = argv[i+1];
            }
            i++;
        }else if(!strcmp(argv[i], "-i") || !strcmp(argv[i], "--id")){    /* -i: client id */
            if(id_prefix){
                fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n");
                print_usage();
                return 1;
            }
            if(i==argc-1){
                fprintf(stderr, "Error: -i argument given but no id specified.\n\n");
                print_usage();
                return 1;
            }else{
                id = argv[i+1];
            }
            i++;
        }else if(!strcmp(argv[i], "-I") || !strcmp(argv[i], "--id-prefix")){    /* -I: client id prefix */
            if(id){
                fprintf(stderr, "Error: -i and -I argument cannot be used together.\n\n");
                print_usage();
                return 1;
            }
            if(i==argc-1){
                fprintf(stderr, "Error: -I argument given but no id prefix specified.\n\n");
                print_usage();
                return 1;
            }else{
                id_prefix = argv[i+1];
            }
            i++;
        }else if(!strcmp(argv[i], "-l") || !strcmp(argv[i], "--stdin-line")){    /* -l: one message per stdin line */
            if(mode != MSGMODE_NONE){
                fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
                print_usage();
                return 1;
            }else{
                mode = MSGMODE_STDIN_LINE;
#ifndef WIN32
                /* Fetch the current file status flags of stdin and add
                 * O_NONBLOCK so reads in the main loop do not block. */
                opt = fcntl(fileno(stdin), F_GETFL, 0);
                if(opt == -1 || fcntl(fileno(stdin), F_SETFL, opt | O_NONBLOCK) == -1){
                    fprintf(stderr, "Error: Unable to set stdin to non-blocking.\n");
                    return 1;
                }
#endif
            }
        }else if(!strcmp(argv[i], "-m") || !strcmp(argv[i], "--message")){    /* -m: message text follows on the command line */
            if(mode != MSGMODE_NONE){
                fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
                print_usage();
                return 1;
            }else if(i==argc-1){
                fprintf(stderr, "Error: -m argument given but no message specified.\n\n");
                print_usage();
                return 1;
            }else{
                message = argv[i+1];
                msglen = strlen(message);
                mode = MSGMODE_CMD;
            }
            i++;
        }else if(!strcmp(argv[i], "-n") || !strcmp(argv[i], "--null-message")){    /* -n: send an empty message */
            if(mode != MSGMODE_NONE){
                fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
                print_usage();
                return 1;
            }else{
                mode = MSGMODE_NULL;
            }
        }else if(!strcmp(argv[i], "-q") || !strcmp(argv[i], "--qos")){    /* -q: quality of service, 0, 1 or 2 */
            if(i==argc-1){
                fprintf(stderr, "Error: -q argument given but no QoS specified.\n\n");
                print_usage();
                return 1;
            }else{
                qos = atoi(argv[i+1]);
                if(qos<0 || qos>2){
                    fprintf(stderr, "Error: Invalid QoS given: %d\n", qos);
                    print_usage();
                    return 1;
                }
            }
            i++;
        }else if(!strcmp(argv[i], "--quiet")){    /* --quiet: suppress error messages */
            quiet = true;
        }else if(!strcmp(argv[i], "-r") || !strcmp(argv[i], "--retain")){    /* -r: publish with the retain flag set */
            retain = 1;
        }else if(!strcmp(argv[i], "-s") || !strcmp(argv[i], "--stdin-file")){    /* -s: all of stdin up to EOF sent as one message */
            if(mode != MSGMODE_NONE){
                fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
                print_usage();
                return 1;
            }else{
                if(load_stdin()) return 1;
            }
        }else if(!strcmp(argv[i], "-t") || !strcmp(argv[i], "--topic")){    /* -t: topic to publish to (a single topic) */
            if(i==argc-1){
                fprintf(stderr, "Error: -t argument given but no topic specified.\n\n");
                print_usage();
                return 1;
            }else{
                topic = argv[i+1];
            }
            i++;
        }else if(!strcmp(argv[i], "-u") || !strcmp(argv[i], "--username")){    /* -u: username */
            if(i==argc-1){
                fprintf(stderr, "Error: -u argument given but no username specified.\n\n");
                print_usage();
                return 1;
            }else{
                username = argv[i+1];
            }
            i++;
        }else if(!strcmp(argv[i], "-P") || !strcmp(argv[i], "--pw")){    /* -P: password */
            if(i==argc-1){
                fprintf(stderr, "Error: -P argument given but no password specified.\n\n");
                print_usage();
                return 1;
            }else{
                password = argv[i+1];
            }
            i++;
        }else if(!strcmp(argv[i], "--will-payload")){
            if(i==argc-1){
                fprintf(stderr, "Error: --will-payload argument given but no will payload specified.\n\n");
                print_usage();
                return 1;
            }else{
                will_payload = (uint8_t *)argv[i+1];
                will_payloadlen = strlen((char *)will_payload);
            }
            i++;
        }else if(!strcmp(argv[i], "--will-qos")){
            if(i==argc-1){
                fprintf(stderr, "Error: --will-qos argument given but no will QoS specified.\n\n");
                print_usage();
                return 1;
            }else{
                will_qos = atoi(argv[i+1]);
                if(will_qos < 0 || will_qos > 2){
                    fprintf(stderr, "Error: Invalid will QoS %d.\n\n", will_qos);
                    return 1;
                }
            }
            i++;
        }else if(!strcmp(argv[i], "--will-retain")){
            will_retain = true;
        }else if(!strcmp(argv[i], "--will-topic")){
            if(i==argc-1){
                fprintf(stderr, "Error: --will-topic argument given but no will topic specified.\n\n");
                print_usage();
                return 1;
            }else{
                will_topic = argv[i+1];
            }
            i++;
        }else{
            fprintf(stderr, "Error: Unknown option '%s'.\n",argv[i]);
            print_usage();
            return 1;
        }
    }

    if(id_prefix){    /* client id = prefix followed by the process id */
        id = malloc(strlen(id_prefix)+10);
        if(!id){
            if(!quiet) fprintf(stderr, "Error: Out of memory.\n");
            return 1;
        }
        snprintf(id, strlen(id_prefix)+10, "%s%d", id_prefix, getpid());
    }else if(!id){    /* neither -i nor -I: build an id from pid and host name */
        id = malloc(30);
        if(!id){
            if(!quiet) fprintf(stderr, "Error: Out of memory.\n");
            return 1;
        }
        memset(hostname, 0, 21);
        gethostname(hostname, 20);
        /* The size argument limits the generated id to 22 characters. */
        snprintf(id, 23, "mosq_pub_%d_%s", getpid(), hostname);
    }

    if(!topic || mode == MSGMODE_NONE){
        fprintf(stderr, "Error: Both topic and message must be supplied.\n");
        print_usage();
        return 1;
    }

    if(will_payload && !will_topic){
        fprintf(stderr, "Error: Will payload given, but no will topic given.\n");
        print_usage();
        return 1;
    }
    if(will_retain && !will_topic){
        fprintf(stderr, "Error: Will retain given, but no will topic given.\n");
        print_usage();
        return 1;
    }
    if(password && !username){
        if(!quiet) fprintf(stderr, "Warning: Not using password since username not set.\n");
    }
    mosquitto_lib_init();    /* library initialisation, done before the other mosquitto calls */
    mosq = mosquitto_new(id, NULL);    /* create the client instance */
    if(!mosq){
        if(!quiet) fprintf(stderr, "Error: Out of memory.\n");
        return 1;
    }
    if(debug){    /* -d: send all log levels to stderr */
        mosquitto_log_init(mosq, MOSQ_LOG_DEBUG | MOSQ_LOG_ERR | MOSQ_LOG_WARNING
                | MOSQ_LOG_NOTICE | MOSQ_LOG_INFO, MOSQ_LOG_STDERR);
    }
    /* Configure the will; done before connecting. */
    if(will_topic && mosquitto_will_set(mosq, true, will_topic, will_payloadlen, will_payload, will_qos, will_retain)){
        if(!quiet) fprintf(stderr, "Error: Problem setting will.\n");
        return 1;
    }
    /* Set username and password. */
    if(username && mosquitto_username_pw_set(mosq, username, password)){
        if(!quiet) fprintf(stderr, "Error: Problem setting username and password.\n");
        return 1;
    }

    mosquitto_connect_callback_set(mosq, my_connect_callback);        /* handles the broker's reply to the connection attempt */
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);  /* clears `connected`, which ends the loop below */
    mosquitto_publish_callback_set(mosq, my_publish_callback);        /* called for a message started with mosquitto_publish() */

    rc = mosquitto_connect(mosq, host, port, keepalive, true);    /* connect to the MQTT broker */
    if(rc){
        /* Connecting failed. */
        if(!quiet){
            if(rc == MOSQ_ERR_ERRNO){
#ifndef WIN32
                /* NOTE(review): the GNU variant of strerror_r() may return a
                 * pointer instead of filling `err` - confirm which variant
                 * the build selects. */
                strerror_r(errno, err, 1024);
#else
                FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL);
#endif
                fprintf(stderr, "Error: %s\n", err);
            }else{
                fprintf(stderr, "Unable to connect (%d).\n", rc);
            }
        }
        return rc;
    }

    do{
        if(mode == MSGMODE_STDIN_LINE && status == STATUS_CONNACK_RECVD){
            if(fgets(buf, 1024, stdin)){
                /* Strip the trailing newline only when one is present: the
                 * last line before EOF and lines longer than the buffer come
                 * back without one, and their final character must be kept. */
                buf_len = strlen(buf);
                if(buf_len > 0 && buf[buf_len-1] == '\n'){
                    buf_len--;
                    buf[buf_len] = '\0';
                }
                rc2 = mosquitto_publish(mosq, &mid_sent, topic, buf_len, (uint8_t *)buf, qos, retain);
                if(rc2){
                    if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
                    mosquitto_disconnect(mosq);
                }
            }else if(feof(stdin) && disconnect_sent == false){
                mosquitto_disconnect(mosq);
                disconnect_sent = true;
            }
        }
        rc = mosquitto_loop(mosq, -1);
    }while(rc == MOSQ_ERR_SUCCESS && connected);

    /* `message` is heap allocated by load_file() (-f) and load_stdin() (-s);
     * in -m mode it points into argv and must not be freed. */
    if(mode == MSGMODE_FILE || mode == MSGMODE_STDIN_FILE){
        free(message);
    }
    mosquitto_destroy(mosq);        /* release the client instance */
    mosquitto_lib_cleanup();        /* release resources held by the library */
    return rc;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: