HTTP URLs monitor.

sentinel.go 7.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package redis
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "net"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/go-redis/redis/internal"
  10. "github.com/go-redis/redis/internal/pool"
  11. )
  12. //------------------------------------------------------------------------------
  13. // FailoverOptions are used to configure a failover client and should
  14. // be passed to NewFailoverClient.
  15. type FailoverOptions struct {
  16. // The master name.
  17. MasterName string
  18. // A seed list of host:port addresses of sentinel nodes.
  19. SentinelAddrs []string
  20. // Following options are copied from Options struct.
  21. OnConnect func(*Conn) error
  22. Password string
  23. DB int
  24. MaxRetries int
  25. MinRetryBackoff time.Duration
  26. MaxRetryBackoff time.Duration
  27. DialTimeout time.Duration
  28. ReadTimeout time.Duration
  29. WriteTimeout time.Duration
  30. PoolSize int
  31. MinIdleConns int
  32. MaxConnAge time.Duration
  33. PoolTimeout time.Duration
  34. IdleTimeout time.Duration
  35. IdleCheckFrequency time.Duration
  36. TLSConfig *tls.Config
  37. }
  38. func (opt *FailoverOptions) options() *Options {
  39. return &Options{
  40. Addr: "FailoverClient",
  41. OnConnect: opt.OnConnect,
  42. DB: opt.DB,
  43. Password: opt.Password,
  44. MaxRetries: opt.MaxRetries,
  45. DialTimeout: opt.DialTimeout,
  46. ReadTimeout: opt.ReadTimeout,
  47. WriteTimeout: opt.WriteTimeout,
  48. PoolSize: opt.PoolSize,
  49. PoolTimeout: opt.PoolTimeout,
  50. IdleTimeout: opt.IdleTimeout,
  51. IdleCheckFrequency: opt.IdleCheckFrequency,
  52. TLSConfig: opt.TLSConfig,
  53. }
  54. }
  55. // NewFailoverClient returns a Redis client that uses Redis Sentinel
  56. // for automatic failover. It's safe for concurrent use by multiple
  57. // goroutines.
  58. func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
  59. opt := failoverOpt.options()
  60. opt.init()
  61. failover := &sentinelFailover{
  62. masterName: failoverOpt.MasterName,
  63. sentinelAddrs: failoverOpt.SentinelAddrs,
  64. opt: opt,
  65. }
  66. c := Client{
  67. baseClient: baseClient{
  68. opt: opt,
  69. connPool: failover.Pool(),
  70. onClose: func() error {
  71. return failover.Close()
  72. },
  73. },
  74. }
  75. c.baseClient.init()
  76. c.cmdable.setProcessor(c.Process)
  77. return &c
  78. }
  79. //------------------------------------------------------------------------------
  80. type SentinelClient struct {
  81. baseClient
  82. }
  83. func NewSentinelClient(opt *Options) *SentinelClient {
  84. opt.init()
  85. c := &SentinelClient{
  86. baseClient: baseClient{
  87. opt: opt,
  88. connPool: newConnPool(opt),
  89. },
  90. }
  91. c.baseClient.init()
  92. return c
  93. }
  94. func (c *SentinelClient) PubSub() *PubSub {
  95. pubsub := &PubSub{
  96. opt: c.opt,
  97. newConn: func(channels []string) (*pool.Conn, error) {
  98. return c.newConn()
  99. },
  100. closeConn: c.connPool.CloseConn,
  101. }
  102. pubsub.init()
  103. return pubsub
  104. }
  105. func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
  106. cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
  107. c.Process(cmd)
  108. return cmd
  109. }
  110. func (c *SentinelClient) Sentinels(name string) *SliceCmd {
  111. cmd := NewSliceCmd("SENTINEL", "sentinels", name)
  112. c.Process(cmd)
  113. return cmd
  114. }
  115. type sentinelFailover struct {
  116. sentinelAddrs []string
  117. opt *Options
  118. pool *pool.ConnPool
  119. poolOnce sync.Once
  120. mu sync.RWMutex
  121. masterName string
  122. _masterAddr string
  123. sentinel *SentinelClient
  124. }
  125. func (d *sentinelFailover) Close() error {
  126. return d.resetSentinel()
  127. }
  128. func (d *sentinelFailover) Pool() *pool.ConnPool {
  129. d.poolOnce.Do(func() {
  130. d.opt.Dialer = d.dial
  131. d.pool = newConnPool(d.opt)
  132. })
  133. return d.pool
  134. }
  135. func (d *sentinelFailover) dial() (net.Conn, error) {
  136. addr, err := d.MasterAddr()
  137. if err != nil {
  138. return nil, err
  139. }
  140. return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
  141. }
  142. func (d *sentinelFailover) MasterAddr() (string, error) {
  143. d.mu.Lock()
  144. defer d.mu.Unlock()
  145. addr, err := d.masterAddr()
  146. if err != nil {
  147. return "", err
  148. }
  149. d._switchMaster(addr)
  150. return addr, nil
  151. }
  152. func (d *sentinelFailover) masterAddr() (string, error) {
  153. // Try last working sentinel.
  154. if d.sentinel != nil {
  155. addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
  156. if err == nil {
  157. addr := net.JoinHostPort(addr[0], addr[1])
  158. return addr, nil
  159. }
  160. internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
  161. d.masterName, err)
  162. d._resetSentinel()
  163. }
  164. for i, sentinelAddr := range d.sentinelAddrs {
  165. sentinel := NewSentinelClient(&Options{
  166. Addr: sentinelAddr,
  167. DialTimeout: d.opt.DialTimeout,
  168. ReadTimeout: d.opt.ReadTimeout,
  169. WriteTimeout: d.opt.WriteTimeout,
  170. PoolSize: d.opt.PoolSize,
  171. PoolTimeout: d.opt.PoolTimeout,
  172. IdleTimeout: d.opt.IdleTimeout,
  173. })
  174. masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
  175. if err != nil {
  176. internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
  177. d.masterName, err)
  178. sentinel.Close()
  179. continue
  180. }
  181. // Push working sentinel to the top.
  182. d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
  183. d.setSentinel(sentinel)
  184. addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
  185. return addr, nil
  186. }
  187. return "", errors.New("redis: all sentinels are unreachable")
  188. }
  189. func (c *sentinelFailover) switchMaster(addr string) {
  190. c.mu.Lock()
  191. c._switchMaster(addr)
  192. c.mu.Unlock()
  193. }
  194. func (c *sentinelFailover) _switchMaster(addr string) {
  195. if c._masterAddr == addr {
  196. return
  197. }
  198. internal.Logf("sentinel: new master=%q addr=%q",
  199. c.masterName, addr)
  200. _ = c.Pool().Filter(func(cn *pool.Conn) bool {
  201. return cn.RemoteAddr().String() != addr
  202. })
  203. c._masterAddr = addr
  204. }
  205. func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
  206. d.discoverSentinels(sentinel)
  207. d.sentinel = sentinel
  208. go d.listen(sentinel)
  209. }
  210. func (d *sentinelFailover) resetSentinel() error {
  211. var err error
  212. d.mu.Lock()
  213. if d.sentinel != nil {
  214. err = d._resetSentinel()
  215. }
  216. d.mu.Unlock()
  217. return err
  218. }
  219. func (d *sentinelFailover) _resetSentinel() error {
  220. err := d.sentinel.Close()
  221. d.sentinel = nil
  222. return err
  223. }
  224. func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
  225. sentinels, err := sentinel.Sentinels(d.masterName).Result()
  226. if err != nil {
  227. internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
  228. return
  229. }
  230. for _, sentinel := range sentinels {
  231. vals := sentinel.([]interface{})
  232. for i := 0; i < len(vals); i += 2 {
  233. key := vals[i].(string)
  234. if key == "name" {
  235. sentinelAddr := vals[i+1].(string)
  236. if !contains(d.sentinelAddrs, sentinelAddr) {
  237. internal.Logf(
  238. "sentinel: discovered new sentinel=%q for master=%q",
  239. sentinelAddr, d.masterName,
  240. )
  241. d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
  242. }
  243. }
  244. }
  245. }
  246. }
  247. func (d *sentinelFailover) listen(sentinel *SentinelClient) {
  248. pubsub := sentinel.PubSub()
  249. defer pubsub.Close()
  250. err := pubsub.Subscribe("+switch-master")
  251. if err != nil {
  252. internal.Logf("sentinel: Subscribe failed: %s", err)
  253. d.resetSentinel()
  254. return
  255. }
  256. for {
  257. msg, err := pubsub.ReceiveMessage()
  258. if err != nil {
  259. if err == pool.ErrClosed {
  260. d.resetSentinel()
  261. return
  262. }
  263. internal.Logf("sentinel: ReceiveMessage failed: %s", err)
  264. continue
  265. }
  266. switch msg.Channel {
  267. case "+switch-master":
  268. parts := strings.Split(msg.Payload, " ")
  269. if parts[0] != d.masterName {
  270. internal.Logf("sentinel: ignore addr for master=%q", parts[0])
  271. continue
  272. }
  273. addr := net.JoinHostPort(parts[3], parts[4])
  274. d.switchMaster(addr)
  275. }
  276. }
  277. }
  // contains reports whether str is one of the elements of slice. A nil or
  // empty slice contains nothing.
  func contains(slice []string, str string) bool {
  	for i := 0; i < len(slice); i++ {
  		if slice[i] == str {
  			return true
  		}
  	}
  	return false
  }