http urls monitor.

tcp.go 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package linker
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "runtime"
  9. "time"
  10. "github.com/wpajqz/linker/utils/convert"
  11. )
  12. func (s *Server) handleTCPConnection(conn *net.TCPConn) error {
  13. ctx := &ContextTcp{common: common{Context: context.Background()}, Conn: conn}
  14. if s.constructHandler != nil {
  15. s.constructHandler.Handle(ctx)
  16. }
  17. defer func() {
  18. if s.destructHandler != nil {
  19. s.destructHandler.Handle(ctx)
  20. }
  21. conn.Close()
  22. }()
  23. if s.config.ReadBufferSize > 0 {
  24. conn.SetReadBuffer(s.config.ReadBufferSize)
  25. }
  26. if s.config.WriteBufferSize > 0 {
  27. conn.SetWriteBuffer(s.config.WriteBufferSize)
  28. }
  29. var (
  30. bType = make([]byte, 4)
  31. bSequence = make([]byte, 8)
  32. bHeaderLength = make([]byte, 4)
  33. bBodyLength = make([]byte, 4)
  34. sequence int64
  35. headerLength uint32
  36. bodyLength uint32
  37. )
  38. for {
  39. if s.config.Timeout != 0 {
  40. conn.SetDeadline(time.Now().Add(s.config.Timeout))
  41. }
  42. if _, err := io.ReadFull(conn, bType); err != nil {
  43. return err
  44. }
  45. if _, err := io.ReadFull(conn, bSequence); err != nil {
  46. return err
  47. }
  48. if _, err := io.ReadFull(conn, bHeaderLength); err != nil {
  49. return err
  50. }
  51. if _, err := io.ReadFull(conn, bBodyLength); err != nil {
  52. return err
  53. }
  54. sequence = convert.BytesToInt64(bSequence)
  55. headerLength = convert.BytesToUint32(bHeaderLength)
  56. bodyLength = convert.BytesToUint32(bBodyLength)
  57. pacLen := headerLength + bodyLength + uint32(20)
  58. if pacLen > s.config.MaxPayload {
  59. _, file, line, _ := runtime.Caller(1)
  60. return SystemError{time.Now(), file, line, "packet larger than MaxPayload"}
  61. }
  62. header := make([]byte, headerLength)
  63. if _, err := io.ReadFull(conn, header); err != nil {
  64. return err
  65. }
  66. body := make([]byte, bodyLength)
  67. if _, err := io.ReadFull(conn, body); err != nil {
  68. return err
  69. }
  70. rp, err := NewPacket(convert.BytesToUint32(bType), sequence, header, body, s.config.PluginForPacketReceiver)
  71. if err != nil {
  72. return err
  73. }
  74. ctx = NewContextTcp(ctx.Context, conn, rp.Operator, rp.Sequence, rp.Header, rp.Body, s.config)
  75. go s.handleTCPPacket(ctx, rp)
  76. }
  77. }
  78. func (s *Server) handleTCPPacket(ctx Context, rp Packet) {
  79. defer func() {
  80. if r := recover(); r != nil {
  81. if s.errorHandler != nil {
  82. buf := make([]byte, 1<<12)
  83. n := runtime.Stack(buf, false)
  84. s.errorHandler(errors.New(string(buf[:n])))
  85. }
  86. }
  87. }()
  88. if rp.Operator == OPERATOR_HEARTBEAT {
  89. if s.pingHandler != nil {
  90. s.pingHandler.Handle(ctx)
  91. }
  92. ctx.Success(nil)
  93. }
  94. handler, ok := s.router.handlerContainer[rp.Operator]
  95. if !ok {
  96. ctx.Error(StatusInternalServerError, "server don't register your request.")
  97. }
  98. if rm, ok := s.router.routerMiddleware[rp.Operator]; ok {
  99. for _, v := range rm {
  100. ctx = v.Handle(ctx)
  101. }
  102. }
  103. for _, v := range s.router.middleware {
  104. ctx = v.Handle(ctx)
  105. if tm, ok := v.(TerminateMiddleware); ok {
  106. tm.Terminate(ctx)
  107. }
  108. }
  109. handler.Handle(ctx)
  110. ctx.Success(nil) // If it don't call the function of Success or Error, deal it by default
  111. }
  112. // RunTCP 开始运行Tcp服务
  113. func (s *Server) RunTCP(name, address string) error {
  114. tcpAddr, err := net.ResolveTCPAddr(name, address)
  115. if err != nil {
  116. return err
  117. }
  118. listener, err := net.ListenTCP(name, tcpAddr)
  119. if err != nil {
  120. return err
  121. }
  122. defer listener.Close()
  123. fmt.Printf("tcp server running on %s\n", address)
  124. for {
  125. conn, err := listener.AcceptTCP()
  126. if err != nil {
  127. continue
  128. }
  129. go s.handleTCPConnection(conn)
  130. }
  131. }