package redis

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"net"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/internal"
	"github.com/go-redis/redis/internal/hashtag"
	"github.com/go-redis/redis/internal/pool"
	"github.com/go-redis/redis/internal/proto"
	"github.com/go-redis/redis/internal/singleflight"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 8 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful for manually creating a cluster of standalone Redis servers
	// and load-balancing read/write operations between masters and slaves.
	// It can use a service like ZooKeeper to maintain configuration information
	// and Cluster.ReloadState to manually trigger state reloading.
	ClusterSlots func() ([]ClusterSlot, error)

	// The following options are copied from the Options struct.

	OnConnect func(*Conn) error

	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}

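// Example (a hedged sketch, not part of the original file): a minimal
// ClusterClient setup using the options above. The addresses are
// illustrative placeholders.
//
//	client := redis.NewClusterClient(&redis.ClusterOptions{
//		Addrs:    []string{"host1:6379", "host2:6379", "host3:6379"},
//		ReadOnly: true,
//	})
//	if err := client.Ping().Err(); err != nil {
//		// handle connectivity error
//	}

// init normalizes option values: -1 disables a setting, 0 selects the
// default, and any other value is kept as-is.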
func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 8
	}

	if opt.RouteByLatency || opt.RouteRandomly {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.NumCPU()
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}
}

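// clientOptions derives per-node client Options from the cluster options.
// Idle checking is disabled on individual nodes because the cluster client
// runs its own reaper (see ClusterClient.reaper).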
func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		OnConnect: opt.OnConnect,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,
		Password:        opt.Password,
		readOnly:        opt.ReadOnly,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: disableIdleCheck,

		TLSConfig: opt.TLSConfig,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	loading    uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

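// updateLatency pings the node several times and stores a smoothed
// round-trip time: each new probe is averaged with the running value, so
// more recent probes carry more weight.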
func (n *clusterNode) updateLatency() {
	const probes = 10

	var latency uint32
	for i := 0; i < probes; i++ {
		start := time.Now()
		n.Client.Ping()
		probe := uint32(time.Since(start) / time.Microsecond)
		latency = (latency + probe) / 2
	}
	atomic.StoreUint32(&n.latency, latency)
}

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsLoading() {
	atomic.StoreUint32(&n.loading, uint32(time.Now().Unix()))
}

func (n *clusterNode) Loading() bool {
	const minute = int64(time.Minute / time.Second)

	loading := atomic.LoadUint32(&n.loading)
	if loading == 0 {
		return false
	}
	if time.Now().Unix()-int64(loading) < minute {
		return true
	}
	atomic.StoreUint32(&n.loading, 0)
	return false
}

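// Generation tracks which cluster state a node was last seen in; nodes
// whose generation falls behind are garbage collected (see clusterNodes.GC).
// SetGeneration uses a CAS loop so the value only ever moves forward under
// concurrent updates.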
func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu           sync.RWMutex
	allAddrs     []string
	allNodes     map[string]*clusterNode
	clusterAddrs []string
	closed       bool

	nodeCreateGroup singleflight.Group

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		allAddrs: opt.Addrs,
		allNodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.allNodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.allNodes = nil
	c.clusterAddrs = nil

	return firstErr
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string
	c.mu.RLock()
	closed := c.closed
	if !closed {
		if len(c.clusterAddrs) > 0 {
			addrs = c.clusterAddrs
		} else {
			addrs = c.allAddrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	var collected []*clusterNode
	c.mu.Lock()
	for addr, node := range c.allNodes {
		if node.Generation() >= generation {
			continue
		}

		c.clusterAddrs = remove(c.clusterAddrs, addr)
		delete(c.allNodes, addr)
		collected = append(collected, node)
	}
	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.allNodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

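// GetOrCreate returns the node for addr, creating it if needed. Concurrent
// calls for the same addr are collapsed through a singleflight group so at
// most one client is created; a duplicate that loses the race is closed.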
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
	node, err := c.Get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
		node := newClusterNode(c.opt, addr)
		return node, nil
	})

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.allNodes[addr]
	if ok {
		_ = v.(*clusterNode).Close()
		return node, err
	}
	node = v.(*clusterNode)

	c.allAddrs = appendIfNotExists(c.allAddrs, addr)
	if err == nil {
		c.clusterAddrs = append(c.clusterAddrs, addr)
	}
	c.allNodes[addr] = node

	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.allNodes))
	for _, node := range c.allNodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.GetOrCreate(addrs[n])
}

//------------------------------------------------------------------------------

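// clusterSlot is one contiguous slot range and the nodes serving it. The
// ranges are kept sorted by start so slotNodes can locate a slot's owner
// with a binary search (sort.Search).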
type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {
	return len(p)
}

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)
			}

			node, err := c.nodes.GetOrCreate(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return nodeAddr
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return nodeAddr
	}

	if !nodeIP.IsLoopback() {
		return nodeAddr
	}

	// Use the non-loopback origin host together with the node's port.
	return net.JoinHostPort(originHost, nodePort)
}

func isLoopback(host string) bool {
	ip := net.ParseIP(host)
	if ip == nil {
		return true
	}
	return ip.IsLoopback()
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Loading() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Loading() {
				break
			}
		}
		return slave, nil
	}
}

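// slotClosestNode picks the node with the lowest measured latency for the
// slot, skipping nodes that are loading; a candidate must beat the current
// best by more than the threshold to replace it.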
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	const threshold = time.Millisecond

	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Loading() {
			continue
		}
		if node == nil || node.Latency()-n.Latency() > threshold {
			node = n
		}
	}
	return node, nil
}

func (c *clusterState) slotRandomNode(slot int) *clusterNode {
	nodes := c.slotNodes(slot)
	n := rand.Intn(len(nodes))
	return nodes[n]
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot
	})
	if i >= len(c.slots) {
		return nil
	}
	x := c.slots[i]
	if slot >= x.start && slot <= x.end {
		return x.nodes
	}
	return nil
}

func (c *clusterState) IsConsistent() bool {
	if c.nodes.opt.ClusterSlots != nil {
		return true
	}
	return len(c.Masters) <= len(c.Slaves)
}

//------------------------------------------------------------------------------

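// clusterStateHolder caches the current cluster state in an atomic.Value.
// Readers get the cached state without locking; LazyReload refreshes it in
// the background, at most one reload at a time.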
type clusterStateHolder struct {
	load func() (*clusterState, error)

	state atomic.Value

	firstErrMu sync.RWMutex
	firstErr   error

	reloading uint32 // atomic
}

func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload() (*clusterState, error) {
	state, err := c.reload()
	if err != nil {
		return nil, err
	}
	if !state.IsConsistent() {
		time.AfterFunc(time.Second, c.LazyReload)
	}
	return state, nil
}

func (c *clusterStateHolder) reload() (*clusterState, error) {
	state, err := c.load()
	if err != nil {
		c.firstErrMu.Lock()
		if c.firstErr == nil {
			c.firstErr = err
		}
		c.firstErrMu.Unlock()
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		for {
			state, err := c.reload()
			if err != nil {
				return
			}
			time.Sleep(100 * time.Millisecond)
			if state.IsConsistent() {
				return
			}
		}
	}()
}

func (c *clusterStateHolder) Get() (*clusterState, error) {
	v := c.state.Load()
	if v != nil {
		state := v.(*clusterState)
		if time.Since(state.createdAt) > time.Minute {
			c.LazyReload()
		}
		return state, nil
	}

	c.firstErrMu.RLock()
	err := c.firstErr
	c.firstErrMu.RUnlock()
	if err != nil {
		return nil, err
	}

	return nil, errors.New("redis: cluster has no state")
}

func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
	state, err := c.Reload()
	if err == nil {
		return state, nil
	}
	return c.Get()
}

//------------------------------------------------------------------------------

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	cmdable

	ctx context.Context

	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder
	cmdsInfoCache *cmdsInfoCache

	process           func(Cmder) error
	processPipeline   func([]Cmder) error
	processTxPipeline func([]Cmder) error
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		opt:   opt,
		nodes: newClusterNodes(opt),
	}
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)

	c.process = c.defaultProcess
	c.processPipeline = c.defaultProcessPipeline
	c.processTxPipeline = c.defaultProcessTxPipeline

	c.init()

	_, _ = c.state.Reload()
	_, _ = c.cmdsInfoCache.Get()

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}

// ReloadState reloads cluster state. It calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState() error {
	_, err := c.state.Reload()
	return err
}

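// Example (a hedged sketch, not part of the original file): using the
// ClusterSlots option to treat standalone servers as a one-range "cluster",
// with ReloadState to pick up topology changes. Addresses are placeholders.
//
//	client := redis.NewClusterClient(&redis.ClusterOptions{
//		ClusterSlots: func() ([]redis.ClusterSlot, error) {
//			return []redis.ClusterSlot{{
//				Start: 0,
//				End:   16383,
//				Nodes: []redis.ClusterNode{
//					{Addr: "master:6379"},
//					{Addr: "slave:6379"},
//				},
//			}}, nil
//		},
//	})
//	// After the topology changes:
//	_ = client.ReloadState()
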
func (c *ClusterClient) init() {
	c.cmdable.setProcessor(c.Process)
}

func (c *ClusterClient) Context() context.Context {
	if c.ctx != nil {
		return c.ctx
	}
	return context.Background()
}

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	c2 := c.copy()
	c2.ctx = ctx
	return c2
}

func (c *ClusterClient) copy() *ClusterClient {
	cp := *c
	cp.init()
	return &cp
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) {
	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error
	for _, addr := range addrs {
		node, err := c.nodes.Get(addr)
		if err != nil {
			return nil, err
		}
		if node == nil {
			continue
		}

		info, err := node.Client.Command().Result()
		if err == nil {
			return info, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}
	return nil, firstErr
}

func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get()
	if err != nil {
		return nil
	}

	info := cmdsInfo[name]
	if info == nil {
		internal.Logf("info for cmd=%s not found", name)
	}
	return info
}

func cmdSlot(cmd Cmder, pos int) int {
	if pos == 0 {
		return hashtag.RandomSlot()
	}
	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
	cmdInfo := c.cmdInfo(cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
	state, err := c.state.Get()
	if err != nil {
		return 0, nil, err
	}

	cmdInfo := c.cmdInfo(cmd.Name())
	slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))

	if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
		if c.opt.RouteByLatency {
			node, err := state.slotClosestNode(slot)
			return slot, node, err
		}

		if c.opt.RouteRandomly {
			node := state.slotRandomNode(slot)
			return slot, node, nil
		}

		node, err := state.slotSlaveNode(slot)
		return slot, node, err
	}

	node, err := state.slotMasterNode(slot)
	return slot, node, err
}

func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
	state, err := c.state.Get()
	if err != nil {
		return nil, err
	}

	nodes := state.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

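// Watch requires all keys to map to the same hash slot. A hedged sketch
// (not part of the original file) using a common hash tag so the keys
// share a slot; key names are placeholders.
//
//	err := client.Watch(func(tx *redis.Tx) error {
//		n, err := tx.Get("{user1}.balance").Int64()
//		if err != nil && err != redis.Nil {
//			return err
//		}
//		_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
//			pipe.Set("{user1}.balance", n+1, 0)
//			return nil
//		})
//		return err
//	}, "{user1}.balance")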
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
			return err
		}
	}

	node, err := c.slotMasterNode(slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		err = node.Client.Watch(fn, keys...)
		if err == nil {
			break
		}

		if internal.IsRetryableError(err, true) {
			c.state.LazyReload()
			continue
		}

		moved, ask, addr := internal.IsMovedError(err)
		if moved || ask {
			c.state.LazyReload()
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if err == pool.ErrClosed {
			node, err = c.slotMasterNode(slot)
			if err != nil {
				return err
			}
			continue
		}

		return err
	}

	return err
}

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(args ...interface{}) *Cmd {
	cmd := NewCmd(args...)
	c.Process(cmd)
	return cmd
}

func (c *ClusterClient) WrapProcess(
	fn func(oldProcess func(Cmder) error) func(Cmder) error,
) {
	c.process = fn(c.process)
}

func (c *ClusterClient) Process(cmd Cmder) error {
	return c.process(cmd)
}

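// defaultProcess routes a command to the node owning its slot and retries
// on failure: MOVED and ASK redirects follow the returned address (ASK via
// an ASKING-prefixed pipeline), a slave reporting LOADING is marked so
// routing avoids it, and other retryable errors trigger a lazy state
// reload, retrying the same node first and then a random one.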
func (c *ClusterClient) defaultProcess(cmd Cmder) error {
	var node *clusterNode
	var ask bool
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		if node == nil {
			var err error
			_, node, err = c.cmdSlotAndNode(cmd)
			if err != nil {
				cmd.setErr(err)
				break
			}
		}

		var err error
		if ask {
			pipe := node.Client.Pipeline()
			_ = pipe.Process(NewCmd("ASKING"))
			_ = pipe.Process(cmd)
			_, err = pipe.Exec()
			_ = pipe.Close()
			ask = false
		} else {
			err = node.Client.Process(cmd)
		}

		// If there is no error - we are done.
		if err == nil {
			break
		}

		// If slave is loading - read from master.
		if c.opt.ReadOnly && internal.IsLoadingError(err) {
			node.MarkAsLoading()
			continue
		}

		if internal.IsRetryableError(err, true) {
			c.state.LazyReload()

			// First retry the same node.
			if attempt == 0 {
				continue
			}

			// Second try random node.
			node, err = c.nodes.Random()
			if err != nil {
				break
			}
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = internal.IsMovedError(err)
		if moved || ask {
			c.state.LazyReload()

			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				break
			}
			continue
		}

		if err == pool.ErrClosed {
			node = nil
			continue
		}

		break
	}

	return cmd.Err()
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get()
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

func (c *ClusterClient) loadState() (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots()
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error
	for _, addr := range addrs {
		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots().Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}

	return nil, firstErr
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logf("ReapStaleConns failed: %s", err)
			}
		}
	}
}

func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)
}

func (c *ClusterClient) WrapProcessPipeline(
	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
	c.processPipeline = fn(c.processPipeline)
}

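// defaultProcessPipeline groups the queued commands by owning node, writes
// each group over a single connection, and collects commands that failed
// with a redirect into failedCmds for the next attempt; it stops once no
// commands remain or MaxRedirects is exhausted.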
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
	cmdsMap, err := c.mapCmdsByNode(cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		failedCmds := make(map[*clusterNode][]Cmder)

		for node, cmds := range cmdsMap {
			cn, err := node.Client.getConn()
			if err != nil {
				if err == pool.ErrClosed {
					c.remapCmds(cmds, failedCmds)
				} else {
					setCmdsErr(cmds, err)
				}
				continue
			}

			err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
			if err == nil || internal.IsRedisError(err) {
				node.Client.connPool.Put(cn)
			} else {
				node.Client.connPool.Remove(cn)
			}
		}

		if len(failedCmds) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
	state, err := c.state.Get()
	if err != nil {
		setCmdsErr(cmds, err)
		return nil, err
	}

	cmdsMap := make(map[*clusterNode][]Cmder)
	cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
	for _, cmd := range cmds {
		var node *clusterNode
		var err error
		if cmdsAreReadOnly {
			_, node, err = c.cmdSlotAndNode(cmd)
		} else {
			slot := c.cmdSlot(cmd)
			node, err = state.slotMasterNode(slot)
		}
		if err != nil {
			return nil, err
		}
		cmdsMap[node] = append(cmdsMap[node], cmd)
	}
	return cmdsMap, nil
}

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
	remappedCmds, err := c.mapCmdsByNode(cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return
	}

	for node, cmds := range remappedCmds {
		failedCmds[node] = cmds
	}
}

func (c *ClusterClient) pipelineProcessCmds(
	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmd(wr, cmds...)
	})
	if err != nil {
		setCmdsErr(cmds, err)
		failedCmds[node] = cmds
		return err
	}

	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
		return c.pipelineReadCmds(rd, cmds, failedCmds)
	})
	return err
}

func (c *ClusterClient) pipelineReadCmds(
	rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		if err == nil {
			continue
		}

		if c.checkMovedErr(cmd, err, failedCmds) {
			continue
		}

		if internal.IsRedisError(err) {
			continue
		}

		return err
	}
	return nil
}

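// checkMovedErr requeues a command that failed with a MOVED or ASK
// redirect onto the target node; for ASK the command is preceded by an
// ASKING command. It reports whether the error was a redirect.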
func (c *ClusterClient) checkMovedErr(
	cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
) bool {
	moved, ask, addr := internal.IsMovedError(err)

	if moved {
		c.state.LazyReload()

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			return false
		}

		failedCmds[node] = append(failedCmds[node], cmd)
		return true
	}

	if ask {
		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			return false
		}

		failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
		return true
	}

	return false
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processTxPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(fn)
}

func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
	state, err := c.state.Get()
	if err != nil {
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				time.Sleep(c.retryBackoff(attempt))
			}

			failedCmds := make(map[*clusterNode][]Cmder)

			for node, cmds := range cmdsMap {
				cn, err := node.Client.getConn()
				if err != nil {
					if err == pool.ErrClosed {
						c.remapCmds(cmds, failedCmds)
					} else {
						setCmdsErr(cmds, err)
					}
					continue
				}

				err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
				if err == nil || internal.IsRedisError(err) {
					node.Client.connPool.Put(cn)
				} else {
					node.Client.connPool.Remove(cn)
				}
			}

			if len(failedCmds) == 0 {
				break
			}
			cmdsMap = failedCmds
		}
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) txPipelineProcessCmds(
	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return txPipelineWriteMulti(wr, cmds)
	})
	if err != nil {
		setCmdsErr(cmds, err)
		failedCmds[node] = cmds
		return err
	}

	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
		err := c.txPipelineReadQueued(rd, cmds, failedCmds)
		if err != nil {
			setCmdsErr(cmds, err)
			return err
		}
		return pipelineReadCmds(rd, cmds)
	})
	return err
}

func (c *ClusterClient) txPipelineReadQueued(
	rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
	// Parse queued replies.
	var statusCmd StatusCmd
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)
		if err == nil {
			continue
		}

		if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) {
			continue
		}

		return err
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		err := proto.ParseErrorReply(line)
		for _, cmd := range cmds {
			if !c.checkMovedErr(cmd, err, failedCmds) {
				break
			}
		}
		return err
	case proto.ArrayReply:
		// ok
	default:
		err := fmt.Errorf("redis: expected '*', but got line %q", line)
		return err
	}

	return nil
}

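// pubSub returns a PubSub bound lazily to a single node: the first
// (re)connect picks the master for the slot of the first channel, or a
// random node when no channel is given.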
func (c *ClusterClient) pubSub(channels []string) *PubSub {
	var node *clusterNode
	pubsub := &PubSub{
		opt: c.opt.clientOptions(),

		newConn: func(channels []string) (*pool.Conn, error) {
			if node == nil {
				var slot int
				if len(channels) > 0 {
					slot = hashtag.Slot(channels[0])
				} else {
					slot = -1
				}

				masterNode, err := c.slotMasterNode(slot)
				if err != nil {
					return nil, err
				}
				node = masterNode
			}
			return node.Client.newConn()
		},
		closeConn: func(cn *pool.Conn) error {
			return node.Client.connPool.CloseConn(cn)
		},
	}
	pubsub.init()
	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
	pubsub := c.pubSub(channels)
	if len(channels) > 0 {
		_ = pubsub.Subscribe(channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
	pubsub := c.pubSub(channels)
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(channels...)
	}
	return pubsub
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

func remove(ss []string, es ...string) []string {
	if len(es) == 0 {
		return ss[:0]
	}
	for _, e := range es {
		for i, s := range ss {
			if s == e {
				ss = append(ss[:i], ss[i+1:]...)
				break
			}
		}
	}
	return ss
}