另客网go项目公用的代码库

client.go 2.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package hooks
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/sirupsen/logrus"
  11. "github.com/wpajqz/brpc/export"
  12. )
  13. const (
  14. timeout = 5 * time.Second
  15. retryInterval = 100 * time.Microsecond
  16. )
  17. const (
  18. notifyPath = "/v1/notify"
  19. )
  20. var (
  21. defaultClient *Client
  22. once = &sync.Once{}
  23. )
  24. type Client struct{ tcpClient *export.Client }
  25. func (c *Client) Notify(req *NotifyRequest) (*NotifyResponse, error) {
  26. if err := req.Check(); err != nil {
  27. return nil, err
  28. }
  29. param, err := json.Marshal(req)
  30. if err != nil {
  31. return nil, err
  32. }
  33. var resp NotifyResponse
  34. err = c.tcpClient.SyncSend(notifyPath, param, &RequestStatusCallback{
  35. Success: func(header, body []byte) {
  36. err = json.Unmarshal(body, &resp)
  37. },
  38. Error: func(code int, message string) {
  39. err = errors.New(message)
  40. },
  41. })
  42. if err != nil {
  43. return nil, err
  44. }
  45. return &resp, err
  46. }
  47. func NewClient(server string, port int) *Client {
  48. once.Do(func() {
  49. address := strings.Join([]string{server, strconv.Itoa(port)}, ":")
  50. c, err := export.NewClient(address, &ReadyStateCallback{})
  51. if err != nil {
  52. panic("connect link faild")
  53. }
  54. defaultClient = &Client{tcpClient: c}
  55. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  56. defer cancel()
  57. for {
  58. select {
  59. case <-time.After(retryInterval):
  60. if defaultClient.tcpClient.GetReadyState() == export.OPEN {
  61. return
  62. }
  63. case <-ctx.Done():
  64. panic("connect link timeout ...")
  65. }
  66. }
  67. })
  68. return defaultClient
  69. }
  70. func GetClient() *Client {
  71. return defaultClient
  72. }
  73. type (
  74. NotifyRequest struct {
  75. Project string `json:"project"`
  76. Maintainers []string `json:"maintainers"`
  77. Message string `json:"message"`
  78. Time string `json:"time"`
  79. }
  80. NotifyResponse struct {
  81. }
  82. )
  83. func (ar *NotifyRequest) Check() error {
  84. return nil
  85. }
  86. type (
  87. ReadyStateCallback struct{}
  88. RequestStatusCallback struct {
  89. Start func()
  90. End func()
  91. Success func(header, body []byte)
  92. Error func(code int, message string)
  93. }
  94. )
  95. func (readyStateCallback *ReadyStateCallback) OnOpen() {
  96. logrus.Info("open link socket connection")
  97. }
  98. func (readyStateCallback *ReadyStateCallback) OnClose() {
  99. logrus.Info("close link socket connection")
  100. }
  101. func (readyStateCallback *ReadyStateCallback) OnError(err string) {
  102. logrus.Infof("error:%s", err)
  103. }
  104. func (r RequestStatusCallback) OnStart() {
  105. if r.Start != nil {
  106. r.Start()
  107. }
  108. }
  109. func (r RequestStatusCallback) OnSuccess(header, body []byte) {
  110. if r.Success != nil {
  111. r.Success(header, body)
  112. }
  113. }
  114. func (r RequestStatusCallback) OnError(code int, message string) {
  115. if r.Error != nil {
  116. r.Error(code, message)
  117. }
  118. }
  119. func (r RequestStatusCallback) OnEnd() {
  120. if r.End != nil {
  121. r.End()
  122. }
  123. }