socket_server.go 6.29 KB
package network

import (
	"World/conf"
	pb "World/pb"
	"bytes"
	"common/beegomap"
	"common/logger"
	"encoding/binary"
	"errors"
	"io"
	"net"
	"sync"
	"time"
	_ "time"

	"github.com/golang/protobuf/proto"
)

const (
	SOCKET_RECV_BUFFER_LEN = 4096
	Min_Message_Size       = 16
	Max_Message_Size       = 4096
)

type MessageItem struct {
	Header    pb.MessageHeader
	BodyBytes []byte
}

type SocketInfo struct {
	conn   net.Conn
	seq    uint32
	MsgCh  chan *MessageItem
	expire int64
}

var (
	m_conn_map *beegomap.BeeMap //     = make(map[uint32]SocketInfo) //playerid -- socket conn
	m_locker   *sync.RWMutex
)

func init() {
	m_conn_map = beegomap.NewBeeMap()
	m_locker = new(sync.RWMutex)
}

func CheckErr(err error) {
	if err != nil {
		panic(err)
	}
}

func StartRecycle() {
	for {
		m_locker.Lock()
		items := m_conn_map.Items()
		now := time.Now().Unix()
		for uid, vv := range items {
			sock := vv.(*SocketInfo)
			if sock.expire > 0 && now > sock.expire {
				close(sock.MsgCh)
				m_conn_map.Delete(uid)
				logger.Info("recycle socket seq:%v uid:%v", sock.seq, uid)
				break
			}
		}
		m_locker.Unlock()
		time.Sleep(time.Second * 20)
	}
}

func StartSocketServer() {
	go StartRecycle()
	//	log.Println("StartSocketServer")
	addr := conf.GetSocketListenAddr()
	netListen, err := net.Listen("tcp", addr)
	CheckErr(err)
	defer netListen.Close()

	logger.Info("listen ok on :%v", addr)

	var seq uint32
	for {
		conn, err := netListen.Accept()
		if err != nil {
			continue
		}

		seq++
		logger.Info("got tcp connection from %v seq:%v", conn.RemoteAddr().String(), seq)

		go handleConnection(seq, conn)
	}
}

func handleConnection(seq uint32, conn net.Conn) {
	//todo: buffer长度参数化
	var buffer_len uint32
	var conn_uid uint32
	conn_uid = 0

	buffer := make([]byte, 0, 4096)
	buffer_len = 0

	tmp := make([]byte, 1024)

	defer conn.Close()
	var end pb.NoticeConnectionClose
	do_end := false
	for {

		n, err := conn.Read(tmp)

		if err != nil {
			if err != io.EOF {
				end.Reason = 3
				logger.Notic("read socket error:%v uid:%v seq:%v", err, conn_uid, seq)
			} else {
				logger.Info("client closed connection uid:%v seq:%v", conn_uid, seq)
				//todo: 通知到逻辑中?
				end.Reason = 1
			}
			do_end = true
			break
		}

		buffer = append(buffer, tmp[:n]...)
		buffer_len += uint32(n)

		for buffer_len >= uint32(Min_Message_Size) {

			el, err, uid := handleBuffer(conn, buffer, buffer_len, seq)
			if err != nil {
				logger.Info("handler buffer error:%v", err)
				break
			} else {
				buffer_len -= el
				buffer = buffer[el:]
			}
			if conn_uid == 0 {
				conn_uid = uid
				//first
			}
		}
	}

	if do_end {
		if buffer_len > 0 {
			//clear rest imcomplete data
			buffer_len = 0
			buffer = buffer[:0]
		}

		if conn_uid > 0 {

			var header pb.MessageHeader
			header.PlayerID = conn_uid
			header.Seq = seq
			header.MsgID = uint16(pb.MSGID_MsgID_ConnClose_Notice)
			tmp, _ := proto.Marshal(&end)

			item := new(MessageItem)
			item.Header = header
			item.BodyBytes = append(item.BodyBytes, tmp...)

			m_locker.RLock()
			vv := m_conn_map.Get(conn_uid)
			if vv != nil {
				sock := vv.(*SocketInfo)
				sock.MsgCh <- item

				m_locker.RUnlock()

				logger.Notic("send player close message to player channel uid:%v seq:%v",
					conn_uid, seq)
			} else {
				m_locker.RUnlock()
			}
			//HandleRawData(conn, header, tmp)
		}

		//check end destroy
		m_locker.Lock()
		vv := m_conn_map.Get(conn_uid)
		if vv != nil {

			sock := vv.(*SocketInfo)
			if sock.seq == seq {
				sock.expire = time.Now().Unix() + 30
				//close(sock.MsgCh)
				//m_conn_map.Delete(conn_uid)

				m_locker.Unlock()

				logger.Notic("destroy player socket info uid:%v seq:%v",
					conn_uid, seq)
			} else {
				m_locker.Unlock()

				logger.Notic("skip destroy player socket info uid:%v seq:%v conn seq:%v",
					conn_uid, seq, sock.seq)
			}
		} else {
			m_locker.Unlock()
		}
	}
}

func handleBuffer(conn net.Conn, buffer []byte, size uint32, seq uint32) (eat_len uint32, err error, uid uint32) {
	var header pb.MessageHeader
	rd := bytes.NewReader(buffer[0:Min_Message_Size])
	binary.Read(rd, binary.BigEndian, &header)
	uid = header.PlayerID

	if header.PackageLen > Max_Message_Size {
		logger.Info(" got invalid message size:%v", header.PackageLen)
		//todo: reset connection or skip this message?
		eat_len = size
		err = nil
		return
	}

	if uint32(header.PackageLen) > size {
		eat_len = 0
		err = errors.New("not enough")
		return
	}

	if MsgIDExist(header.MsgID) == false {
		logger.Info("message id:%v %v not register", header.MsgID, pb.MSGID(header.MsgID).String())
		eat_len = size
		err = nil
		return
	}

	//CheckAddPlayerConn(header.PlayerID, conn)
	CheckAddPlayerConn(header.PlayerID, header.MsgID, conn, seq)
	if header.MsgID == uint16(pb.MSGID_MsgID_Logon_Request) {
		header.Seq = seq
		m_locker.RLock()
		vv := m_conn_map.Get(header.PlayerID)
		if vv != nil {
			m_locker.RUnlock()
			logger.Notic("send player login message to player channel uid:%v seq:%v",
				header.PlayerID, seq)

			sock := vv.(*SocketInfo)
			item := new(MessageItem)
			item.Header = header
			item.BodyBytes = append(item.BodyBytes, buffer[Min_Message_Size:header.PackageLen]...)
			sock.MsgCh <- item
		} else {
			m_locker.RUnlock()
		}
	} else {
		HandleRawData(conn, header, buffer[Min_Message_Size:header.PackageLen])
	}

	eat_len = uint32(header.PackageLen)
	err = nil
	return
}

func CheckAddPlayerConn(uid uint32, msgid uint16, conn net.Conn, seq uint32) {
	if msgid != uint16(pb.MSGID_MsgID_Logon_Request) {
		return
	}
	m_locker.Lock()
	defer m_locker.Unlock()

	vv := m_conn_map.Get(uid)
	if vv == nil {
		sock := new(SocketInfo)
		sock.conn = conn
		sock.seq = seq
		sock.MsgCh = make(chan *MessageItem, 32)
		m_conn_map.Set(uid, sock)
		logger.Notic("new socket info uid:%v seq:%v", uid, seq)
		go playerRecvLoop(uid, sock)
	} else {
		//清除之前的socket conn
		old, _ := vv.(*SocketInfo)
		old.conn = conn

		logger.Notic("replace socket info uid:%v seq:%v old seq:%v", uid, seq, old.seq)

		old.seq = seq
		m_conn_map.Replace(uid, old)
	}
}

func playerRecvLoop(uid uint32, sock *SocketInfo) {
	for item := range sock.MsgCh {
		logger.Info("login-close msg queue uid:%v sock seq:%v msgid:%+v msg seq:%v", uid, sock.seq, pb.MSGID(item.Header.MsgID), item.Header.Seq)
		HandleRawData(sock.conn, item.Header, item.BodyBytes)
	}
	logger.Notic("recv routine end  uid :%v seq:%v", uid, sock.seq)
}