您的位置:首页 > 理论基础 > 计算机网络

golang实现tcp接入服务器

2014-07-19 11:08 375 查看
接入服务器和后端业务服务其维持tcp连接,多个前端请求通过接入服务器访问后端业务服务器,接入服务器可以方便增加路由功能,维护多个业务服务器,根据消息ID路由到具体的业务服务器。



项目目录如下

simplelotus
  src
    lotus
      main.go
    lotuslib
      tcplotus.go
    test
      tcpclient.go
      tcpserver.go
  install


install源码如下:

#!/usr/bin/env bash

if [ ! -f install ]; then
echo 'install must be run within its container folder' 1>&2
exit 1
fi

CURDIR=`pwd`
OLDGOPATH="$GOPATH"
export GOPATH="$CURDIR"

gofmt -w src

go install lotus

export GOPATH="$OLDGOPATH"

echo 'finished'


main.go

package main

import (
"lotuslib"
)

const (
ip   = "0.0.0.0"
port = 1987
)

func main() {
tcplotus.TcpLotusMain(ip, port)
}


tcplotus.go(和上游维持tcp连接)

package tcplotus

import (
"encoding/json"
"log"
"net"
"strconv"
"time"
)

const (
proxy_timeout = 5
proxy_server  = "127.0.0.1:1988"
msg_length    = 1024
)

type Request struct {
reqId      int
reqContent string
rspChan    chan<- string // writeonly chan
}

//store request map
var requestMap map[int]*Request

type Clienter struct {
client  net.Conn
isAlive bool
SendStr chan *Request
RecvStr chan string
}

func (c *Clienter) Connect() bool {
if c.isAlive {
return true
} else {
var err error
c.client, err = net.Dial("tcp", proxy_server)
if err != nil {
return false
}
c.isAlive = true
log.Println("connect to " + proxy_server)
}
return true
}

//send msg to upstream server
func ProxySendLoop(c *Clienter) {

//store reqId and reqContent
senddata := make(map[string]string)
for {
if !c.isAlive {
time.Sleep(1 * time.Second)
c.Connect()
}
if c.isAlive {
req := <-c.SendStr

//construct request json string
senddata["reqId"] = strconv.Itoa(req.reqId)
senddata["reqContent"] = req.reqContent
sendjson, err := json.Marshal(senddata)
if err != nil {
continue
}

_, err = c.client.Write([]byte(sendjson))
if err != nil {
c.RecvStr <- string("proxy server close...")
c.client.Close()
c.isAlive = false
log.Println("disconnect from " + proxy_server)
continue
}
//log.Println("Write to proxy server: " + string(sendjson))
}
}
}

//recv msg from upstream server
func ProxyRecvLoop(c *Clienter) {
buf := make([]byte, msg_length)
recvdata := make(map[string]string, 2)
for {
if !c.isAlive {
time.Sleep(1 * time.Second)
c.Connect()
}
if c.isAlive {
n, err := c.client.Read(buf)
if err != nil {
c.client.Close()
c.isAlive = false
log.Println("disconnect from " + proxy_server)
continue
}
//log.Println("Read from proxy server: " + string(buf[0:n]))

if err := json.Unmarshal(buf[0:n], &recvdata); err == nil {
reqidstr := recvdata["reqId"]
if reqid, err := strconv.Atoi(reqidstr); err == nil {
req, ok := requestMap[reqid]
if !ok {
continue
}
req.rspChan <- recvdata["resContent"]
}
continue
}
}
}
}

//one handle per request
func handle(conn *net.TCPConn, id int, tc *Clienter) {

data := make([]byte, msg_length)
handleProxy := make(chan string)
request := &Request{reqId: id, rspChan: handleProxy}

requestMap[id] = request
for {
n, err := conn.Read(data)
if err != nil {
log.Println("disconnect from " + conn.RemoteAddr().String())
conn.Close()
delete(requestMap, id)
return
}
request.reqContent = string(data[0:n])
//send to proxy
select {

case tc.SendStr <- request:
case <-time.After(proxy_timeout * time.Second):
//proxyChan <- &Request{cancel: true, reqId: id}
_, err = conn.Write([]byte("proxy server send timeout."))
if err != nil {
conn.Close()
delete(requestMap, id)
return
}
continue
}

//read from proxy
select {
case rspContent := <-handleProxy:
_, err := conn.Write([]byte(rspContent))
if err != nil {
conn.Close()
delete(requestMap, id)
return
}
case <-time.After(proxy_timeout * time.Second):
_, err = conn.Write([]byte("proxy server recv timeout."))
if err != nil {
conn.Close()
delete(requestMap, id)
return
}
continue
}
}
}

func TcpLotusMain(ip string, port int) {
//start tcp server
listen, err := net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP(ip), port, ""})
if err != nil {
log.Fatalln("listen port error")
return
}
log.Println("start tcp server " + ip + " " + strconv.Itoa(port))
defer listen.Close()

//start proxy connect and loop
var tc Clienter
tc.SendStr = make(chan *Request, 1000)
tc.RecvStr = make(chan string)
tc.Connect()
go ProxySendLoop(&tc)
go ProxyRecvLoop(&tc)

//listen new request
requestMap = make(map[int]*Request)
var id int = 0
for {

conn, err := listen.AcceptTCP()
if err != nil {
log.Println("receive connection failed")
continue
}
id++
log.Println("connected from " + conn.RemoteAddr().String())
go handle(conn, id, &tc)

}
}


测试代码如下:

tcpserver.go

package main

import (
"encoding/json"
"fmt"
"net"
)

const (
msg_length = 1024
)

func Echo(c net.Conn) {
data := make([]byte, msg_length)
defer c.Close()

var recvdata map[string]string
recvdata = make(map[string]string, 2)
var senddata map[string]string
senddata = make(map[string]string, 2)

for {
n, err := c.Read(data)
if err != nil {
fmt.Printf("read message from lotus failed")
return
}

if err := json.Unmarshal(data[0:n], &recvdata); err == nil {
senddata["reqId"] = recvdata["reqId"]
senddata["resContent"] = "Hello " + recvdata["reqContent"]

sendjson, err := json.Marshal(senddata)
_, err = c.Write([]byte(sendjson))
if err != nil {
fmt.Printf("disconnect from lotus server")
return
}
}
}
}

func main() {
fmt.Printf("Server is ready...\n")
l, err := net.Listen("tcp", ":1988")
if err != nil {
fmt.Printf("Failure to listen: %s\n", err.Error())
}

for {
if c, err := l.Accept(); err == nil {
go Echo(c) //new thread
}
}
}


tcpclient.go

package main

import (
"bufio"
"fmt"
"net"
"os"
"time"
)

type Clienter struct {
client  net.Conn
isAlive bool
SendStr chan string
RecvStr chan string
}

func (c *Clienter) Connect() bool {
if c.isAlive {
return true
} else {
var err error
c.client, err = net.Dial("tcp", "127.0.0.1:1987")
if err != nil {
fmt.Printf("Failure to connet:%s\n", err.Error())
return false
}
c.isAlive = true
}
return true
}

func (c *Clienter) Echo() {
line := <-c.SendStr
c.client.Write([]byte(line))
buf := make([]byte, 1024)
n, err := c.client.Read(buf)
if err != nil {
c.RecvStr <- string("Server close...")
c.client.Close()
c.isAlive = false
return
}
time.Sleep(1 * time.Second)
c.RecvStr <- string(buf[0:n])
}

func Work(tc *Clienter) {
if !tc.isAlive {
if tc.Connect() {
tc.Echo()
} else {
<-tc.SendStr
tc.RecvStr <- string("Server close...")
}
} else {
tc.Echo()
}
}
func main() {
var tc Clienter
tc.SendStr = make(chan string)
tc.RecvStr = make(chan string)
if !tc.Connect() {
return
}
r := bufio.NewReader(os.Stdin)
for {
switch line, ok := r.ReadString('\n'); true {
case ok != nil:
fmt.Printf("bye bye!\n")
return
default:
go Work(&tc)
tc.SendStr <- line
s := <-tc.RecvStr
fmt.Printf("back:%s\n", s)
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: