您的位置:首页 > 大数据

Avro 多reocrds(multi-records)

2018-01-12 15:53 183 查看

需求描述

最近有一个需求,要在同一个filed下放下可选的数据,需要用到Avro的Union和多个record。。

但是在网上搜索的时候并没有找到好的解决方案。。。

解决方案

好吧,不多说,直接上代码,如果有avro基础就能看懂

不同的record

{"type": "record",
"name": "Ethernet",
"fields": [
{"name": "Length",  "type":   "int"  },
{"name": "Time",  "type":   "long"  },
{"name": "SrcMAC",  "type":   "string"  },
{"name": "DstMAC",  "type":   "string"  },
{"name": "NetworkProtocol",  "type":   "string"  }
]
}

{"type": "record",
"name": "IP",
"fields": [
{"name": "IHL",  "type":   "int"  },
{"name": "TOS",  "type":   "int"  },
{"name": "Id",  "type":   "int"  },
{"name": "Flags",  "type":   "string"  },
{"name": "FragOffset",  "type":   "int"  },
{"name": "TTL",  "type":   "int"  },
{"name": "SrcIP",  "type":   "string"  },
{"name": "DstIP",  "type":   "string"  }
]
}

{"type": "record",
"name": "ICMP",
"fields": [
{"name": "Id",  "type":   "int"  },
{"name": "Seq",  "type":   "int"  },
{"name": "Checksum",  "type":   "int"  },
{"name": "type",  "type":   "string"  }
]
}

{"type": "record",
"name": "TCP",
"fields": [
{"name": "TransportProtol",  "type":   "string"  },
{"name": "SrcPort",  "type":   "int"  },
{"name": "DstPort",  "type":   "int"  },
{"name": "Seq",  "type":   "long"  },
{"name": "Ack",  "type":   "long"  },
{"name": "FIN",  "type":   "boolean"  },
{"name": "SYN",  "type":   "boolean"  },
{"name": "RST",  "type":   "boolean"  },
{"name": "PSH",  "type":   "boolean"  },
{"name": "ACK",  "type":   "boolean"  },
{"name": "URG",  "type":   "boolean"  },
{"name": "ECE",  "type":   "boolean"  },
{"name": "CWR",  "type":   "boolean"  },
{"name": "NS",  "type":   "boolean"  },
{"name": "Window",  "type":   "int"  },
{"name": "CheckSum",  "type":   "int"  }
]
}

{"type": "record",
"name": "UDP",
"fields": [
{"name": "SrcPort",  "type":   "int"  },
{"name": "DstPort",  "type":   "int"  },
{"name": "Length",  "type":   "int"  },
{"name": "Checksum",  "type":   "int"  },
{"name": "NetworkProtocol",  "type":   "string"  }
]
}


使用Avro的Union和multi-record

{
"type": "record",
"name": "data",
"fields": [
{
"name": "NetworkAccessLayer",
"type": {
"type": "record",
"name": "Ethernet",
"fields": [
{
"name": "Length",
"type": "int"
},
{
"name": "Time",
"type": "long"
},
{
"name": "SrcMAC",
"type": "string"
},
{
"name": "DstMAC",
"type": "string"
},
{
"name": "NetworkProtocol",
"type": "string"
}
]
}
},
{
"name": "InternetLayer",
"type": [
"null",
{
"type": "record",
"name": "IP",
"fields": [
{
"name": "IHL",
"type": "int"
},
{
"name": "TOS",
"type": "int"
},
{
"name": "Id",
"type": "int"
},
{
"name": "Flags",
"type": "string"
},
{
"name": "FragOffset",
"type": "int"
},
{
"name": "TTL",
"type": "int"
},
{
"name": "SrcIP",
"type": "string"
},
{
"name": "DstIP",
"type": "string"
}
]
},
{
"type": "record",
"name": "ICMP",
"fields": [
{
"name": "Id",
"type": "int"
},
{
"name": "Seq",
"type": "int"
},
{
"name": "Checksum",
"type": "int"
},
{
"name": "type",
"type": "string"
}
]
}
]
},
{
"name": "TransportLayer",
"type": [
"null",
{
"type": "record",
"name": "TCP",
"fields": [
{
"name": "ApplicationProtocol",
"type": "string"
},
{
"name": "SrcPort",
"type": "int"
},
{
"name": "DstPort",
"type": "int"
},
{
"name": "Seq",
"type": "long"
},
{
"name": "Ack",
"type": "long"
},
{
"name": "FIN",
"type": "boolean"
},
{
"name": "SYN",
"type": "boolean"
},
{
"name": "RST",
"type": "boolean"
},
{
"name": "PSH",
"type": "boolean"
},
{
"name": "ACK",
"type": "boolean"
},
{
"name": "URG",
"type": "boolean"
},
{
"name": "ECE",
"type": "boolean"
},
{
"name": "CWR",
"type": "boolean"
},
{
"name": "NS",
"type": "boolean"
},
{
"name": "Window",
"type": "int"
},
{
"name": "CheckSum",
"type": "int"
}
]
},
{
"type": "record",
"name": "UDP",
"fields": [
{
"name": "SrcPort",
"type": "int"
},
{
"name": "DstPort",
"type": "int"
},
{
"name": "Length",
"type": "int"
},
{
"name": "CheckSum",
"type": "int"
},
{
"name": "ApplicationProtocol",
"type": "string"
}
]
}
]
}
]
}


有点太长了啊,如果嫌麻烦直接看下面的总结

总结

准确来说使用了Avro的Union,在type中放入多个record的Union

原理如下

//最简单Union的例子
["null","string","int"]


这么用,注意是golang的代码(暂时没有些其他语言解释的想法,有问题可以私信我,不过估计这个偏门的问题,也没人关注)

buf, err := codec.TextFromNative(nil, goavro.Union("string", "some string"))
if err != nil {
fmt.Println(err)
}
fmt.Println(string(buf))
// Output: {"string":"some string"}


注意

要注意namespace的问题

在我的schema的定义中删除掉了所有的namespace,如果需要加入namespace,最好让所有的record处于同样的namespace下,否则会报错。

补充

补充两个坑,如果不加入namespace可能在用java进行反序列化时出错。主要是因为找不到类名造成

那么怎么做呢

1. 加入namesapce

"namespace":"com.lee"


2.对于所有的子record的输入要用全名称,否则会出错。

data["TransportLayer"] = goavro.Union("com.lee.UDP", GetUDPData(&packet))


这样就不会出错了。不过一般也不会用到这么复杂的schema。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息