HTTP URLs monitor.

export.go 8.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. package export
  2. import (
  3. "bytes"
  4. "errors"
  5. "hash/crc32"
  6. "io"
  7. "net"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/wpajqz/linker"
  13. "github.com/wpajqz/linker/plugins"
  14. )
  // Package-level constants.
  const (
  	// MaxPayload equals 1 MiB (1024 * 1024). It is presumably the default
  	// packet-size limit; the visible code does not reference it, so confirm
  	// against the callers.
  	MaxPayload = 1 << 20

  	// Version is the version string exposed by this package.
  	Version = "1.0"
  )
  // Connection ready states stored in Client.readyState.
  const (
  	// CONNECTING: the connection has not been opened yet.
  	CONNECTING = iota
  	// OPEN: the connection is open and ready for communication.
  	OPEN
  	// CLOSING: the connection is in the process of closing.
  	CLOSING
  	// CLOSED: the connection is closed, or could not be established.
  	CLOSED
  )
  // Handler is the callback type stored in the client's handler container.
  type Handler interface {
  	// Handle is invoked with the header and body bytes of a message.
  	Handle(header []byte, body []byte)
  }
  // RequestStatusCallback receives the lifecycle events of a single request
  // issued through Ping, SyncSend or AsyncSend.
  type RequestStatusCallback interface {
  	// OnSuccess is called with the response header and body when the
  	// response carries no "code" property.
  	OnSuccess(header []byte, body []byte)
  	// OnError is called with the numeric "code" property and the "message"
  	// property when the response carries a non-empty "code".
  	OnError(status int, message string)
  	// OnStart is called by SyncSend and AsyncSend before the request is sent.
  	OnStart()
  	// OnEnd is called once the response has been handled.
  	OnEnd()
  }
  // ReadyStateCallback is notified by the client's connect loop whenever the
  // connection state changes.
  type ReadyStateCallback interface {
  	// OnOpen is called after a connection has been established.
  	OnOpen()
  	// OnClose is called when the connection ended with io.EOF.
  	OnClose()
  	// OnError is called with the error text for any other failure.
  	OnError(err string)
  }
  39. type Client struct {
  40. readyStateCallback ReadyStateCallback
  41. readyState int
  42. mutex *sync.Mutex
  43. rwMutex *sync.RWMutex
  44. timeout, retryInterval time.Duration
  45. handlerContainer sync.Map
  46. packet chan linker.Packet
  47. maxPayload int32
  48. request, response struct {
  49. Header, Body []byte
  50. }
  51. }
  // handlerFunc adapts an ordinary function to the Handler interface.
  type handlerFunc func(header, body []byte)

  // Handle calls the wrapped function with the given header and body.
  func (fn handlerFunc) Handle(header, body []byte) {
  	fn(header, body)
  }
  56. func NewClient(server string, port int, readyStateCallback ReadyStateCallback) *Client {
  57. c := &Client{
  58. readyState: CONNECTING,
  59. mutex: new(sync.Mutex),
  60. rwMutex: new(sync.RWMutex),
  61. retryInterval: 5 * time.Second,
  62. packet: make(chan linker.Packet, 1024),
  63. handlerContainer: sync.Map{},
  64. }
  65. if readyStateCallback != nil {
  66. c.readyStateCallback = readyStateCallback
  67. }
  68. go c.connect(server, port)
  69. return c
  70. }
  71. // 获取链接运行状态
  72. func (c *Client) GetReadyState() int {
  73. return c.readyState
  74. }
  75. // 心跳处理,客户端与服务端保持长连接
  76. func (c *Client) Ping(interval int64, param []byte, callback RequestStatusCallback) error {
  77. if callback == nil {
  78. return errors.New("callback can't be nil")
  79. }
  80. if c.readyState != OPEN {
  81. return errors.New("ping getsockopt: connection refuse")
  82. }
  83. sequence := time.Now().UnixNano()
  84. listener := int64(linker.OPERATOR_HEARTBEAT) + sequence
  85. c.handlerContainer.Store(listener, handlerFunc(func(header, body []byte) {
  86. code := c.GetResponseProperty("code")
  87. if code != "" {
  88. message := c.GetResponseProperty("message")
  89. if callback.OnError != nil {
  90. v, _ := strconv.Atoi(code)
  91. callback.OnError(v, message)
  92. }
  93. } else {
  94. if callback.OnSuccess != nil {
  95. callback.OnSuccess(header, body)
  96. }
  97. if callback.OnEnd != nil {
  98. callback.OnEnd()
  99. }
  100. }
  101. }))
  102. // 建立连接以后就发送心跳包建立会话信息,后面的定期发送
  103. p, err := linker.NewPacket(linker.OPERATOR_HEARTBEAT, sequence, c.request.Header, param, []linker.PacketPlugin{
  104. &plugins.Encryption{},
  105. })
  106. if err != nil {
  107. return err
  108. }
  109. c.packet <- p
  110. ticker := time.NewTicker(time.Duration(interval) * time.Second)
  111. for {
  112. select {
  113. case <-ticker.C:
  114. if c.readyState != OPEN {
  115. return nil
  116. }
  117. c.packet <- p
  118. }
  119. }
  120. }
  121. // 向服务端发送请求,同步处理服务端返回结果
  122. func (c *Client) SyncSend(operator string, param []byte, callback RequestStatusCallback) error {
  123. if callback == nil {
  124. return errors.New("callback can't be nil")
  125. }
  126. if c.readyState != OPEN {
  127. return errors.New("SyncSend getsockopt: connection refuse")
  128. }
  129. nType := crc32.ChecksumIEEE([]byte(operator))
  130. sequence := time.Now().UnixNano()
  131. listener := int64(nType) + sequence
  132. // 对数据请求的返回状态进行处理,同步阻塞处理机制
  133. c.mutex.Lock()
  134. quit := make(chan bool)
  135. if callback.OnStart != nil {
  136. callback.OnStart()
  137. }
  138. c.handlerContainer.Store(listener, handlerFunc(func(header, body []byte) {
  139. code := c.GetResponseProperty("code")
  140. if code != "" {
  141. message := c.GetResponseProperty("message")
  142. if callback.OnError != nil {
  143. v, _ := strconv.Atoi(code)
  144. callback.OnError(v, message)
  145. }
  146. } else {
  147. if callback.OnSuccess != nil {
  148. callback.OnSuccess(header, body)
  149. }
  150. }
  151. if callback.OnEnd != nil {
  152. callback.OnEnd()
  153. }
  154. c.handlerContainer.Delete(listener)
  155. quit <- true
  156. }))
  157. p, err := linker.NewPacket(nType, sequence, c.request.Header, param, []linker.PacketPlugin{
  158. &plugins.Encryption{},
  159. })
  160. if err != nil {
  161. return err
  162. }
  163. c.packet <- p
  164. <-quit
  165. c.mutex.Unlock()
  166. return nil
  167. }
  168. // 向服务端发送请求,异步处理服务端返回结果
  169. func (c *Client) AsyncSend(operator string, param []byte, callback RequestStatusCallback) error {
  170. if callback == nil {
  171. return errors.New("callback can't be nil")
  172. }
  173. if c.readyState != OPEN {
  174. return errors.New("AsyncSend getsockopt: connection refuse")
  175. }
  176. nType := crc32.ChecksumIEEE([]byte(operator))
  177. sequence := time.Now().UnixNano()
  178. listener := int64(nType) + sequence
  179. if callback.OnStart != nil {
  180. callback.OnStart()
  181. }
  182. c.handlerContainer.Store(listener, handlerFunc(func(header, body []byte) {
  183. code := c.GetResponseProperty("code")
  184. if code != "" {
  185. message := c.GetResponseProperty("message")
  186. if callback.OnError != nil {
  187. v, _ := strconv.Atoi(code)
  188. callback.OnError(v, message)
  189. }
  190. } else {
  191. if callback.OnSuccess != nil {
  192. callback.OnSuccess(header, body)
  193. }
  194. }
  195. if callback.OnEnd != nil {
  196. callback.OnEnd()
  197. }
  198. c.handlerContainer.Delete(listener)
  199. }))
  200. p, err := linker.NewPacket(nType, sequence, c.request.Header, param, []linker.PacketPlugin{
  201. &plugins.Encryption{},
  202. })
  203. if err != nil {
  204. return err
  205. }
  206. c.packet <- p
  207. return nil
  208. }
  209. // 设置可处理的数据包的最大长度
  210. func (c *Client) SetMaxPayload(maxPayload int32) {
  211. c.maxPayload = maxPayload
  212. }
  213. // 添加事件监听器
  214. func (c *Client) AddMessageListener(listener string, callback Handler) {
  215. c.handlerContainer.Store(int64(crc32.ChecksumIEEE([]byte(listener))), callback)
  216. }
  217. // 移除事件监听器
  218. func (c *Client) RemoveMessageListener(listener string) {
  219. c.handlerContainer.Delete(int64(crc32.ChecksumIEEE([]byte(listener))))
  220. }
  221. // 设置请求属性
  222. func (c *Client) SetRequestProperty(key, value string) {
  223. v := c.GetRequestProperty(key)
  224. if v != "" {
  225. c.request.Header = bytes.Trim(c.request.Header, key+"="+value+";")
  226. }
  227. c.request.Header = append(c.request.Header, []byte(key+"="+value+";")...)
  228. }
  229. // 获取请求属性
  230. func (c *Client) GetRequestProperty(key string) string {
  231. values := strings.Split(string(c.request.Header), ";")
  232. for _, value := range values {
  233. kv := strings.Split(value, "=")
  234. if kv[0] == key {
  235. return kv[1]
  236. }
  237. }
  238. return ""
  239. }
  240. // 获取响应属性
  241. func (c *Client) GetResponseProperty(key string) string {
  242. values := strings.Split(string(c.response.Header), ";")
  243. for _, value := range values {
  244. kv := strings.Split(value, "=")
  245. if kv[0] == key {
  246. return kv[1]
  247. }
  248. }
  249. return ""
  250. }
  251. // 设置响应属性
  252. func (c *Client) SetResponseProperty(key, value string) {
  253. v := c.GetResponseProperty(key)
  254. if v != "" {
  255. c.response.Header = bytes.Trim(c.response.Header, key+"="+value+";")
  256. }
  257. c.response.Header = append(c.response.Header, []byte(key+"="+value+";")...)
  258. }
  259. // 设置断线重连的间隔时间, 单位s
  260. func (c *Client) SetRetryInterval(interval int) {
  261. c.retryInterval = time.Duration(interval) * time.Second
  262. }
  263. // 设置服务端默认超时时间, 单位s
  264. func (c *Client) SetTimeout(timeout int) {
  265. c.timeout = time.Duration(timeout) * time.Second
  266. }
  267. func (c *Client) connect(server string, port int) {
  268. // 检测conn的状态,断线以后进行重连操作
  269. address := strings.Join([]string{server, strconv.Itoa(port)}, ":")
  270. conn, err := net.Dial("tcp", address)
  271. for {
  272. if err != nil {
  273. c.readyState = CLOSED
  274. if err == io.EOF {
  275. if c.readyStateCallback.OnClose != nil {
  276. c.readyStateCallback.OnClose()
  277. }
  278. } else {
  279. if c.readyStateCallback.OnError != nil {
  280. c.readyStateCallback.OnError(err.Error())
  281. }
  282. }
  283. time.Sleep(c.retryInterval) // 重连失败以后休息一会再干活
  284. conn, err = net.Dial("tcp", address)
  285. } else {
  286. quit := make(chan bool, 1)
  287. go func(conn net.Conn) {
  288. err = c.handleConnection(conn)
  289. if err != nil {
  290. quit <- true
  291. }
  292. }(conn)
  293. c.readyState = OPEN
  294. if c.readyStateCallback.OnOpen != nil {
  295. c.readyStateCallback.OnOpen()
  296. }
  297. <-quit
  298. }
  299. }
  300. }