http urls monitor.

pubsub.go 9.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. package redis
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/go-redis/redis/internal"
  7. "github.com/go-redis/redis/internal/pool"
  8. "github.com/go-redis/redis/internal/proto"
  9. )
  10. // PubSub implements Pub/Sub commands bas described in
  11. // http://redis.io/topics/pubsub. Message receiving is NOT safe
  12. // for concurrent use by multiple goroutines.
  13. //
  14. // PubSub automatically reconnects to Redis Server and resubscribes
  15. // to the channels in case of network errors.
  16. type PubSub struct {
  17. opt *Options
  18. newConn func([]string) (*pool.Conn, error)
  19. closeConn func(*pool.Conn) error
  20. mu sync.Mutex
  21. cn *pool.Conn
  22. channels map[string]struct{}
  23. patterns map[string]struct{}
  24. closed bool
  25. exit chan struct{}
  26. cmd *Cmd
  27. chOnce sync.Once
  28. ch chan *Message
  29. ping chan struct{}
  30. }
  31. func (c *PubSub) init() {
  32. c.exit = make(chan struct{})
  33. }
  34. func (c *PubSub) conn() (*pool.Conn, error) {
  35. c.mu.Lock()
  36. cn, err := c._conn(nil)
  37. c.mu.Unlock()
  38. return cn, err
  39. }
  40. func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
  41. if c.closed {
  42. return nil, pool.ErrClosed
  43. }
  44. if c.cn != nil {
  45. return c.cn, nil
  46. }
  47. channels := mapKeys(c.channels)
  48. channels = append(channels, newChannels...)
  49. cn, err := c.newConn(channels)
  50. if err != nil {
  51. return nil, err
  52. }
  53. if err := c.resubscribe(cn); err != nil {
  54. _ = c.closeConn(cn)
  55. return nil, err
  56. }
  57. c.cn = cn
  58. return cn, nil
  59. }
  60. func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error {
  61. return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
  62. return writeCmd(wr, cmd)
  63. })
  64. }
  65. func (c *PubSub) resubscribe(cn *pool.Conn) error {
  66. var firstErr error
  67. if len(c.channels) > 0 {
  68. err := c._subscribe(cn, "subscribe", mapKeys(c.channels))
  69. if err != nil && firstErr == nil {
  70. firstErr = err
  71. }
  72. }
  73. if len(c.patterns) > 0 {
  74. err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns))
  75. if err != nil && firstErr == nil {
  76. firstErr = err
  77. }
  78. }
  79. return firstErr
  80. }
  81. func mapKeys(m map[string]struct{}) []string {
  82. s := make([]string, len(m))
  83. i := 0
  84. for k := range m {
  85. s[i] = k
  86. i++
  87. }
  88. return s
  89. }
  90. func (c *PubSub) _subscribe(
  91. cn *pool.Conn, redisCmd string, channels []string,
  92. ) error {
  93. args := make([]interface{}, 0, 1+len(channels))
  94. args = append(args, redisCmd)
  95. for _, channel := range channels {
  96. args = append(args, channel)
  97. }
  98. cmd := NewSliceCmd(args...)
  99. return c.writeCmd(cn, cmd)
  100. }
  101. func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
  102. c.mu.Lock()
  103. c._releaseConn(cn, err, allowTimeout)
  104. c.mu.Unlock()
  105. }
  106. func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
  107. if c.cn != cn {
  108. return
  109. }
  110. if internal.IsBadConn(err, allowTimeout) {
  111. c._reconnect(err)
  112. }
  113. }
  114. func (c *PubSub) _reconnect(reason error) {
  115. _ = c._closeTheCn(reason)
  116. _, _ = c._conn(nil)
  117. }
  118. func (c *PubSub) _closeTheCn(reason error) error {
  119. if c.cn == nil {
  120. return nil
  121. }
  122. if !c.closed {
  123. internal.Logf("redis: discarding bad PubSub connection: %s", reason)
  124. }
  125. err := c.closeConn(c.cn)
  126. c.cn = nil
  127. return err
  128. }
  129. func (c *PubSub) Close() error {
  130. c.mu.Lock()
  131. defer c.mu.Unlock()
  132. if c.closed {
  133. return pool.ErrClosed
  134. }
  135. c.closed = true
  136. close(c.exit)
  137. err := c._closeTheCn(pool.ErrClosed)
  138. return err
  139. }
  140. // Subscribe the client to the specified channels. It returns
  141. // empty subscription if there are no channels.
  142. func (c *PubSub) Subscribe(channels ...string) error {
  143. c.mu.Lock()
  144. defer c.mu.Unlock()
  145. err := c.subscribe("subscribe", channels...)
  146. if c.channels == nil {
  147. c.channels = make(map[string]struct{})
  148. }
  149. for _, s := range channels {
  150. c.channels[s] = struct{}{}
  151. }
  152. return err
  153. }
  154. // PSubscribe the client to the given patterns. It returns
  155. // empty subscription if there are no patterns.
  156. func (c *PubSub) PSubscribe(patterns ...string) error {
  157. c.mu.Lock()
  158. defer c.mu.Unlock()
  159. err := c.subscribe("psubscribe", patterns...)
  160. if c.patterns == nil {
  161. c.patterns = make(map[string]struct{})
  162. }
  163. for _, s := range patterns {
  164. c.patterns[s] = struct{}{}
  165. }
  166. return err
  167. }
  168. // Unsubscribe the client from the given channels, or from all of
  169. // them if none is given.
  170. func (c *PubSub) Unsubscribe(channels ...string) error {
  171. c.mu.Lock()
  172. defer c.mu.Unlock()
  173. for _, channel := range channels {
  174. delete(c.channels, channel)
  175. }
  176. err := c.subscribe("unsubscribe", channels...)
  177. return err
  178. }
  179. // PUnsubscribe the client from the given patterns, or from all of
  180. // them if none is given.
  181. func (c *PubSub) PUnsubscribe(patterns ...string) error {
  182. c.mu.Lock()
  183. defer c.mu.Unlock()
  184. for _, pattern := range patterns {
  185. delete(c.patterns, pattern)
  186. }
  187. err := c.subscribe("punsubscribe", patterns...)
  188. return err
  189. }
  190. func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
  191. cn, err := c._conn(channels)
  192. if err != nil {
  193. return err
  194. }
  195. err = c._subscribe(cn, redisCmd, channels)
  196. c._releaseConn(cn, err, false)
  197. return err
  198. }
  199. func (c *PubSub) Ping(payload ...string) error {
  200. args := []interface{}{"ping"}
  201. if len(payload) == 1 {
  202. args = append(args, payload[0])
  203. }
  204. cmd := NewCmd(args...)
  205. cn, err := c.conn()
  206. if err != nil {
  207. return err
  208. }
  209. err = c.writeCmd(cn, cmd)
  210. c.releaseConn(cn, err, false)
  211. return err
  212. }
  213. // Subscription received after a successful subscription to channel.
  214. type Subscription struct {
  215. // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
  216. Kind string
  217. // Channel name we have subscribed to.
  218. Channel string
  219. // Number of channels we are currently subscribed to.
  220. Count int
  221. }
  222. func (m *Subscription) String() string {
  223. return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
  224. }
  225. // Message received as result of a PUBLISH command issued by another client.
  226. type Message struct {
  227. Channel string
  228. Pattern string
  229. Payload string
  230. }
  231. func (m *Message) String() string {
  232. return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
  233. }
  234. // Pong received as result of a PING command issued by another client.
  235. type Pong struct {
  236. Payload string
  237. }
  238. func (p *Pong) String() string {
  239. if p.Payload != "" {
  240. return fmt.Sprintf("Pong<%s>", p.Payload)
  241. }
  242. return "Pong"
  243. }
  244. func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
  245. switch reply := reply.(type) {
  246. case string:
  247. return &Pong{
  248. Payload: reply,
  249. }, nil
  250. case []interface{}:
  251. switch kind := reply[0].(string); kind {
  252. case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
  253. return &Subscription{
  254. Kind: kind,
  255. Channel: reply[1].(string),
  256. Count: int(reply[2].(int64)),
  257. }, nil
  258. case "message":
  259. return &Message{
  260. Channel: reply[1].(string),
  261. Payload: reply[2].(string),
  262. }, nil
  263. case "pmessage":
  264. return &Message{
  265. Pattern: reply[1].(string),
  266. Channel: reply[2].(string),
  267. Payload: reply[3].(string),
  268. }, nil
  269. case "pong":
  270. return &Pong{
  271. Payload: reply[1].(string),
  272. }, nil
  273. default:
  274. return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
  275. }
  276. default:
  277. return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
  278. }
  279. }
  280. // ReceiveTimeout acts like Receive but returns an error if message
  281. // is not received in time. This is low-level API and in most cases
  282. // Channel should be used instead.
  283. func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
  284. if c.cmd == nil {
  285. c.cmd = NewCmd()
  286. }
  287. cn, err := c.conn()
  288. if err != nil {
  289. return nil, err
  290. }
  291. err = cn.WithReader(timeout, func(rd *proto.Reader) error {
  292. return c.cmd.readReply(rd)
  293. })
  294. c.releaseConn(cn, err, timeout > 0)
  295. if err != nil {
  296. return nil, err
  297. }
  298. return c.newMessage(c.cmd.Val())
  299. }
  300. // Receive returns a message as a Subscription, Message, Pong or error.
  301. // See PubSub example for details. This is low-level API and in most cases
  302. // Channel should be used instead.
  303. func (c *PubSub) Receive() (interface{}, error) {
  304. return c.ReceiveTimeout(0)
  305. }
  306. // ReceiveMessage returns a Message or error ignoring Subscription and Pong
  307. // messages. This is low-level API and in most cases Channel should be used
  308. // instead.
  309. func (c *PubSub) ReceiveMessage() (*Message, error) {
  310. for {
  311. msg, err := c.Receive()
  312. if err != nil {
  313. return nil, err
  314. }
  315. switch msg := msg.(type) {
  316. case *Subscription:
  317. // Ignore.
  318. case *Pong:
  319. // Ignore.
  320. case *Message:
  321. return msg, nil
  322. default:
  323. err := fmt.Errorf("redis: unknown message: %T", msg)
  324. return nil, err
  325. }
  326. }
  327. }
  328. // Channel returns a Go channel for concurrently receiving messages.
  329. // It periodically sends Ping messages to test connection health.
  330. // The channel is closed with PubSub. Receive* APIs can not be used
  331. // after channel is created.
  332. func (c *PubSub) Channel() <-chan *Message {
  333. c.chOnce.Do(c.initChannel)
  334. return c.ch
  335. }
  336. func (c *PubSub) initChannel() {
  337. c.ch = make(chan *Message, 100)
  338. c.ping = make(chan struct{}, 10)
  339. go func() {
  340. var errCount int
  341. for {
  342. msg, err := c.Receive()
  343. if err != nil {
  344. if err == pool.ErrClosed {
  345. close(c.ch)
  346. return
  347. }
  348. if errCount > 0 {
  349. time.Sleep(c.retryBackoff(errCount))
  350. }
  351. errCount++
  352. continue
  353. }
  354. errCount = 0
  355. // Any message is as good as a ping.
  356. select {
  357. case c.ping <- struct{}{}:
  358. default:
  359. }
  360. switch msg := msg.(type) {
  361. case *Subscription:
  362. // Ignore.
  363. case *Pong:
  364. // Ignore.
  365. case *Message:
  366. c.ch <- msg
  367. default:
  368. internal.Logf("redis: unknown message: %T", msg)
  369. }
  370. }
  371. }()
  372. go func() {
  373. const timeout = 5 * time.Second
  374. timer := time.NewTimer(timeout)
  375. timer.Stop()
  376. healthy := true
  377. var pingErr error
  378. for {
  379. timer.Reset(timeout)
  380. select {
  381. case <-c.ping:
  382. healthy = true
  383. if !timer.Stop() {
  384. <-timer.C
  385. }
  386. case <-timer.C:
  387. pingErr = c.Ping()
  388. if healthy {
  389. healthy = false
  390. } else {
  391. c.mu.Lock()
  392. c._reconnect(pingErr)
  393. c.mu.Unlock()
  394. }
  395. case <-c.exit:
  396. return
  397. }
  398. }
  399. }()
  400. }
  401. func (c *PubSub) retryBackoff(attempt int) time.Duration {
  402. return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
  403. }