package network import ( "World/conf" pb "World/pb" "bytes" "common/beegomap" "common/logger" "encoding/binary" "errors" "io" "net" "sync" "time" _ "time" "github.com/golang/protobuf/proto" ) const ( SOCKET_RECV_BUFFER_LEN = 4096 Min_Message_Size = 16 Max_Message_Size = 4096 ) type MessageItem struct { Header pb.MessageHeader BodyBytes []byte } type SocketInfo struct { conn net.Conn seq uint32 MsgCh chan *MessageItem expire int64 } var ( m_conn_map *beegomap.BeeMap // = make(map[uint32]SocketInfo) //playerid -- socket conn m_locker *sync.RWMutex ) func init() { m_conn_map = beegomap.NewBeeMap() m_locker = new(sync.RWMutex) } func CheckErr(err error) { if err != nil { panic(err) } } func StartRecycle() { for { m_locker.Lock() items := m_conn_map.Items() now := time.Now().Unix() for uid, vv := range items { sock := vv.(*SocketInfo) if sock.expire > 0 && now > sock.expire { close(sock.MsgCh) m_conn_map.Delete(uid) logger.Info("recycle socket seq:%v uid:%v", sock.seq, uid) break } } m_locker.Unlock() time.Sleep(time.Second * 20) } } func StartSocketServer() { go StartRecycle() // log.Println("StartSocketServer") addr := conf.GetSocketListenAddr() netListen, err := net.Listen("tcp", addr) CheckErr(err) defer netListen.Close() logger.Info("listen ok on :%v", addr) var seq uint32 for { conn, err := netListen.Accept() if err != nil { continue } seq++ logger.Info("got tcp connection from %v seq:%v", conn.RemoteAddr().String(), seq) go handleConnection(seq, conn) } } func handleConnection(seq uint32, conn net.Conn) { //todo: buffer长度参数化 var buffer_len uint32 var conn_uid uint32 conn_uid = 0 buffer := make([]byte, 0, 4096) buffer_len = 0 tmp := make([]byte, 1024) defer conn.Close() var end pb.NoticeConnectionClose do_end := false for { n, err := conn.Read(tmp) if err != nil { if err != io.EOF { end.Reason = 3 logger.Notic("read socket error:%v uid:%v seq:%v", err, conn_uid, seq) } else { logger.Info("client closed connection uid:%v seq:%v", conn_uid, seq) //todo: 通知到逻辑中? end.Reason = 1 } do_end = true break } buffer = append(buffer, tmp[:n]...) buffer_len += uint32(n) for buffer_len >= uint32(Min_Message_Size) { el, err, uid := handleBuffer(conn, buffer, buffer_len, seq) if err != nil { logger.Info("handler buffer error:%v", err) break } else { buffer_len -= el buffer = buffer[el:] } if conn_uid == 0 { conn_uid = uid //first } } } if do_end { if buffer_len > 0 { //clear rest imcomplete data buffer_len = 0 buffer = buffer[:0] } if conn_uid > 0 { var header pb.MessageHeader header.PlayerID = conn_uid header.Seq = seq header.MsgID = uint16(pb.MSGID_MsgID_ConnClose_Notice) tmp, _ := proto.Marshal(&end) item := new(MessageItem) item.Header = header item.BodyBytes = append(item.BodyBytes, tmp...) m_locker.RLock() vv := m_conn_map.Get(conn_uid) if vv != nil { sock := vv.(*SocketInfo) sock.MsgCh <- item m_locker.RUnlock() logger.Notic("send player close message to player channel uid:%v seq:%v", conn_uid, seq) } else { m_locker.RUnlock() } //HandleRawData(conn, header, tmp) } //check end destroy m_locker.Lock() vv := m_conn_map.Get(conn_uid) if vv != nil { sock := vv.(*SocketInfo) if sock.seq == seq { sock.expire = time.Now().Unix() + 30 //close(sock.MsgCh) //m_conn_map.Delete(conn_uid) m_locker.Unlock() logger.Notic("destroy player socket info uid:%v seq:%v", conn_uid, seq) } else { m_locker.Unlock() logger.Notic("skip destroy player socket info uid:%v seq:%v conn seq:%v", conn_uid, seq, sock.seq) } } else { m_locker.Unlock() } } } func handleBuffer(conn net.Conn, buffer []byte, size uint32, seq uint32) (eat_len uint32, err error, uid uint32) { var header pb.MessageHeader rd := bytes.NewReader(buffer[0:Min_Message_Size]) binary.Read(rd, binary.BigEndian, &header) uid = header.PlayerID if header.PackageLen > Max_Message_Size { logger.Info(" got invalid message size:%v", header.PackageLen) //todo: reset connection or skip this message? eat_len = size err = nil return } if uint32(header.PackageLen) > size { eat_len = 0 err = errors.New("not enough") return } if MsgIDExist(header.MsgID) == false { logger.Info("message id:%v %v not register", header.MsgID, pb.MSGID(header.MsgID).String()) eat_len = size err = nil return } //CheckAddPlayerConn(header.PlayerID, conn) CheckAddPlayerConn(header.PlayerID, header.MsgID, conn, seq) if header.MsgID == uint16(pb.MSGID_MsgID_Logon_Request) { header.Seq = seq m_locker.RLock() vv := m_conn_map.Get(header.PlayerID) if vv != nil { m_locker.RUnlock() logger.Notic("send player login message to player channel uid:%v seq:%v", header.PlayerID, seq) sock := vv.(*SocketInfo) item := new(MessageItem) item.Header = header item.BodyBytes = append(item.BodyBytes, buffer[Min_Message_Size:header.PackageLen]...) sock.MsgCh <- item } else { m_locker.RUnlock() } } else { HandleRawData(conn, header, buffer[Min_Message_Size:header.PackageLen]) } eat_len = uint32(header.PackageLen) err = nil return } func CheckAddPlayerConn(uid uint32, msgid uint16, conn net.Conn, seq uint32) { if msgid != uint16(pb.MSGID_MsgID_Logon_Request) { return } m_locker.Lock() defer m_locker.Unlock() vv := m_conn_map.Get(uid) if vv == nil { sock := new(SocketInfo) sock.conn = conn sock.seq = seq sock.MsgCh = make(chan *MessageItem, 32) m_conn_map.Set(uid, sock) logger.Notic("new socket info uid:%v seq:%v", uid, seq) go playerRecvLoop(uid, sock) } else { //清除之前的socket conn old, _ := vv.(*SocketInfo) old.conn = conn logger.Notic("replace socket info uid:%v seq:%v old seq:%v", uid, seq, old.seq) old.seq = seq m_conn_map.Replace(uid, old) } } func playerRecvLoop(uid uint32, sock *SocketInfo) { for item := range sock.MsgCh { logger.Info("login-close msg queue uid:%v sock seq:%v msgid:%+v msg seq:%v", uid, sock.seq, pb.MSGID(item.Header.MsgID), item.Header.Seq) HandleRawData(sock.conn, item.Header, item.BodyBytes) } logger.Notic("recv routine end uid :%v seq:%v", uid, sock.seq) }