另客网go项目公用的代码库

connection.go 2.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package export
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strconv"
  8. "time"
  9. "github.com/wpajqz/linker"
  10. "github.com/wpajqz/linker/plugins"
  11. "github.com/wpajqz/linker/utils/convert"
  12. )
  13. // 处理客户端连接
  14. func (c *Client) handleConnection(conn net.Conn) (err error) {
  15. ctx, cancel := context.WithCancel(context.Background())
  16. defer func(cancel context.CancelFunc) { cancel() }(cancel)
  17. q := make(chan bool, 2)
  18. go func(conn net.Conn) {
  19. err = c.handleSendPackets(ctx, conn)
  20. if err != nil {
  21. q <- true
  22. }
  23. }(conn)
  24. go func(conn net.Conn) {
  25. err = c.handleReceivedPackets(conn)
  26. if err != nil {
  27. q <- true
  28. }
  29. }(conn)
  30. <-q
  31. return
  32. }
  33. // 对发送的数据包进行处理
  34. func (c *Client) handleSendPackets(ctx context.Context, conn net.Conn) error {
  35. for {
  36. select {
  37. case p := <-c.packet:
  38. _, err := conn.Write(p.Bytes())
  39. if err != nil {
  40. return err
  41. }
  42. if c.timeout != 0 {
  43. conn.SetWriteDeadline(time.Now().Add(c.timeout))
  44. }
  45. case <-ctx.Done():
  46. return nil
  47. }
  48. }
  49. }
  50. // 对接收到的数据包进行处理
  51. func (c *Client) handleReceivedPackets(conn net.Conn) error {
  52. var (
  53. bType = make([]byte, 4)
  54. bSequence = make([]byte, 8)
  55. bHeaderLength = make([]byte, 4)
  56. bBodyLength = make([]byte, 4)
  57. sequence int64
  58. headerLength uint32
  59. bodyLength uint32
  60. pacLen uint32
  61. )
  62. for {
  63. if c.timeout != 0 {
  64. conn.SetReadDeadline(time.Now().Add(c.timeout))
  65. }
  66. if n, err := io.ReadFull(conn, bType); err != nil && n != 4 {
  67. return err
  68. }
  69. if n, err := io.ReadFull(conn, bSequence); err != nil && n != 8 {
  70. return err
  71. }
  72. if n, err := io.ReadFull(conn, bHeaderLength); err != nil && n != 4 {
  73. return err
  74. }
  75. if n, err := io.ReadFull(conn, bBodyLength); err != nil && n != 4 {
  76. return err
  77. }
  78. nType := convert.BytesToUint32(bType)
  79. sequence = convert.BytesToInt64(bSequence)
  80. headerLength = convert.BytesToUint32(bHeaderLength)
  81. bodyLength = convert.BytesToUint32(bBodyLength)
  82. pacLen = headerLength + bodyLength + 20
  83. if pacLen > MaxPayload {
  84. return fmt.Errorf("the packet is big than %v", strconv.Itoa(MaxPayload))
  85. }
  86. header := make([]byte, headerLength)
  87. if n, err := io.ReadFull(conn, header); err != nil && n != int(headerLength) {
  88. return err
  89. }
  90. body := make([]byte, bodyLength)
  91. if n, err := io.ReadFull(conn, body); err != nil && n != int(bodyLength) {
  92. return err
  93. }
  94. receive, err := linker.NewPacket(nType, sequence, header, body, []linker.PacketPlugin{
  95. &plugins.Decryption{},
  96. })
  97. if err != nil {
  98. return err
  99. }
  100. c.response.Header = receive.Header
  101. c.response.Body = receive.Body
  102. operator := int64(nType) + sequence
  103. if handler, ok := c.handlerContainer.Load(operator); ok {
  104. if v, ok := handler.(Handler); ok {
  105. v.Handle(receive.Header, receive.Body)
  106. }
  107. }
  108. }
  109. }