您的位置:首页 > 理论基础 > 计算机网络

【OVS2.5.0源码分析】sFlow实现分析(1)

2016-09-09 22:33 573 查看
sFlow实现可以分成4个部分:1)配置;2)流表生成(生成datapath的流表);3)datapath中的处理(已经分析);4)sFlow处理过程。本篇分析sFlow的配置过程。

1、bridge_configure_sflow函数

/* Set sFlow configuration on 'br'. */
static void
bridge_configure_sflow(struct bridge *br, int *sflow_bridge_number)
{
const struct ovsrec_sflow *cfg = br->cfg->sflow;
struct ovsrec_controller **controllers;
struct ofproto_sflow_options oso;
size_t n_controllers;
size_t i;

if (!cfg) {
ofproto_set_sflow(br->ofproto, NULL);
return;
}

memset(&oso, 0, sizeof oso);

sset_init(&oso.targets);
sset_add_array(&oso.targets, cfg->targets, cfg->n_targets); //sFlow统计信息接收器

oso.sampling_rate = SFL_DEFAULT_SAMPLING_RATE;
if (cfg->sampling) {
oso.sampling_rate = *cfg->sampling; //采样频率
}

oso.polling_interval = SFL_DEFAULT_POLLING_INTERVAL;
if (cfg->polling) {
oso.polling_interval = *cfg->polling;
}

oso.header_len = SFL_DEFAULT_HEADER_SIZE;
if (cfg->header) {
oso.header_len = *cfg->header;
}

oso.sub_id = (*sflow_bridge_number)++;
oso.agent_device = cfg->agent; //agent设备,可以提取本地IP地址

oso.control_ip = NULL;
n_controllers = bridge_get_controllers(br, &controllers);
for (i = 0; i < n_controllers; i++) {
if (controllers[i]->local_ip) {
oso.control_ip = controllers[i]->local_ip; //控制IP,作为本地IP使用
break;
}
}
ofproto_set_sflow(br->ofproto, &oso); //配置sflow

sset_destroy(&oso.targets);
}

2、ofproto_set_sflow函数

int
ofproto_set_sflow(struct ofproto *ofproto,
const struct ofproto_sflow_options *oso)
{
if (oso && sset_is_empty(&oso->targets)) {
oso = NULL;
}

if (ofproto->ofproto_class->set_sflow) {
return ofproto->ofproto_class->set_sflow(ofproto, oso); //配置sFlow,实际为set_sflow函数
} else {
return oso ? EOPNOTSUPP : 0;
}
}3、set_sflow函数
static int
set_sflow(struct ofproto *ofproto_,
const struct ofproto_sflow_options *sflow_options)
{
struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
struct dpif_sflow *ds = ofproto->sflow;

if (sflow_options) {
uint32_t old_probability = ds ? dpif_sflow_get_probability(ds) : 0;
if (!ds) {
struct ofport_dpif *ofport;

ds = ofproto->sflow = dpif_sflow_create(); //如果ds不存在,则新建dpif_sflow对象
HMAP_FOR_EACH (ofport, up.hmap_node, &ofproto->up.ports) {
dpif_sflow_add_port(ds, &ofport->up, ofport->odp_port); //sflow添加端口,第一次创建ds时会进入该流程
}
}
dpif_sflow_set_options(ds, sflow_options); //设置sflow信息
if (dpif_sflow_get_probability(ds) != old_probability) {
ofproto->backer->need_revalidate = REV_RECONFIGURE;
}
} else {
if (ds) {
dpif_sflow_unref(ds);
ofproto->backer->need_revalidate = REV_RECONFIGURE;
ofproto->sflow = NULL;
}
}
return 0;
}4、dpif_sflow_set_options函数
void
dpif_sflow_set_options(struct dpif_sflow *ds,
const struct ofproto_sflow_options *options)
OVS_EXCLUDED(mutex)
{
struct dpif_sflow_port *dsp;
bool options_changed;
SFLReceiver *receiver;
SFLAddress agentIP;
time_t now;
SFLDataSource_instance dsi;
uint32_t dsIndex;
SFLSampler *sampler;
SFLPoller *poller;

ovs_mutex_lock(&mutex);
if (sset_is_empty(&options->targets) || !options->sampling_rate) {
/* No point in doing any work if there are no targets or nothing to
* sample. */
dpif_sflow_clear__(ds);
goto out;
}

options_changed = (!ds->options
|| !ofproto_sflow_options_equal(options, ds->options));

/* Configure collectors if options have changed or if we're shortchanged in
* collectors (which indicates that opening one or more of the configured
* collectors failed, so that we should retry). */
if (options_changed
|| collectors_count(ds->collectors) < sset_count(&options->targets)) {
collectors_destroy(ds->collectors);
collectors_create(&options->targets, SFL_DEFAULT_COLLECTOR_PORT, //创建collector,即sflow接收器,最终会通过collector对象发送报文
&ds->collectors);
if (ds->collectors == NULL) {
VLOG_WARN_RL(&rl, "no collectors could be initialized, "
"sFlow disabled");
dpif_sflow_clear__(ds);
goto out;
}
}

/* Choose agent IP address and agent device (if not yet setup) */
if (!sflow_choose_agent_address(options->agent_device, //设置agent地址,优先选择agent_device
&options->targets,
options->control_ip, &agentIP)) {
dpif_sflow_clear__(ds);
goto out;
}

/* Avoid reconfiguring if options didn't change. */
if (!options_changed) {
goto out;
}
ofproto_sflow_options_destroy(ds->options);
ds->options = ofproto_sflow_options_clone(options);

/* Create agent. */
VLOG_INFO("creating sFlow agent %d", options->sub_id);
if (ds->sflow_agent) {
sflow_global_counters_subid_clear(ds->sflow_agent->subId);
sfl_agent_release(ds->sflow_agent);
}
ds->sflow_agent = xcalloc(1, sizeof *ds->sflow_agent); //创建sflow agent对象
now = time_wall();
sfl_agent_init(ds->sflow_agent,
&agentIP,
options->sub_id,
now, /* Boot time. */
now, /* Current time. */
ds, /* Pointer supplied to callbacks. */
sflow_agent_alloc_cb,
sflow_agent_free_cb,
sflow_agent_error_cb,
sflow_agent_send_packet_cb);

receiver = sfl_agent_addReceiver(ds->sflow_agent); //创建receiver对象
sfl_receiver_set_sFlowRcvrOwner(receiver, "Open vSwitch sFlow");
sfl_receiver_set_sFlowRcvrTimeout(receiver, 0xffffffff);

/* Set the sampling_rate down in the datapath. */
ds->probability = MAX(1, UINT32_MAX / ds->options->sampling_rate);

/* Add a single sampler for the bridge. This appears as a PHYSICAL_ENTITY
because it is associated with the hypervisor, and interacts with the server
hardware directly. The sub_id is used to distinguish this sampler from
others on other bridges within the same agent. */
dsIndex = 1000 + options->sub_id;
SFL_DS_SET(dsi, SFL_DSCLASS_PHYSICAL_ENTITY, dsIndex, 0);
sampler = sfl_agent_addSampler(ds->sflow_agent, &dsi); //创建Sampler对象
sfl_sampler_set_sFlowFsPacketSamplingRate(sampler, ds->options->sampling_rate);
sfl_sampler_set_sFlowFsMaximumHeaderSize(sampler, ds->options->header_len);
sfl_sampler_set_sFlowFsReceiver(sampl
4000
er, RECEIVER_INDEX); //Sampler设置receiver对象

/* Add a counter poller for the bridge so we can use it to send
global counters such as datapath cache hit/miss stats. */
poller = sfl_agent_addPoller(ds->sflow_agent, &dsi, ds,
sflow_agent_get_global_counters);
sfl_poller_set_sFlowCpInterval(poller, ds->options->polling_interval);
sfl_poller_set_sFlowCpReceiver(poller, RECEIVER_INDEX); //Poller设置receiver对象

/* Add pollers for the currently known ifindex-ports */
HMAP_FOR_EACH (dsp, hmap_node, &ds->ports) {
if (SFL_DS_INDEX(dsp->dsi)) {
dpif_sflow_add_poller(ds, dsp);
}
}

out:
ovs_mutex_unlock(&mutex);
}
5、collectors_create函数
int
collectors_create(const struct sset *targets, uint16_t default_port,
struct collectors **collectorsp)
{
struct collectors *c;
const char *name;
int retval = 0;

c = xmalloc(sizeof *c);
c->fds = xmalloc(sizeof *c->fds * sset_count(targets));
c->n_fds = 0;
SSET_FOR_EACH (name, targets) {
int error;
int fd;

error = inet_open_active(SOCK_DGRAM, name, default_port, NULL, &fd, 0); //创建sock对象,用于发送数据
if (fd >= 0) {
c->fds[c->n_fds++] = fd;
} else {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);

VLOG_WARN_RL(&rl, "couldn't open connection to collector %s (%s)",
name, ovs_strerror(error));
if (!retval) {
retval = error;
}
}
}

if (c->n_fds) {
*collectorsp = c;
} else {
collectors_destroy(c);
*collectorsp = NULL;
}

return retval;
}
sflow主要的数据结构如下:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息