123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- package redis
-
- import (
- "sync"
-
- "github.com/go-redis/redis/internal/pool"
- )
-
- type pipelineExecer func([]Cmder) error
-
- type Pipeliner interface {
- StatefulCmdable
- Process(cmd Cmder) error
- Close() error
- Discard() error
- Exec() ([]Cmder, error)
- }
-
- var _ Pipeliner = (*Pipeline)(nil)
-
- // Pipeline implements pipelining as described in
- // http://redis.io/topics/pipelining. It's safe for concurrent use
- // by multiple goroutines.
- type Pipeline struct {
- statefulCmdable
-
- exec pipelineExecer
-
- mu sync.Mutex
- cmds []Cmder
- closed bool
- }
-
- // Process queues the cmd for later execution.
- func (c *Pipeline) Process(cmd Cmder) error {
- c.mu.Lock()
- c.cmds = append(c.cmds, cmd)
- c.mu.Unlock()
- return nil
- }
-
- // Close closes the pipeline, releasing any open resources.
- func (c *Pipeline) Close() error {
- c.mu.Lock()
- c.discard()
- c.closed = true
- c.mu.Unlock()
- return nil
- }
-
- // Discard resets the pipeline and discards queued commands.
- func (c *Pipeline) Discard() error {
- c.mu.Lock()
- err := c.discard()
- c.mu.Unlock()
- return err
- }
-
- func (c *Pipeline) discard() error {
- if c.closed {
- return pool.ErrClosed
- }
- c.cmds = c.cmds[:0]
- return nil
- }
-
- // Exec executes all previously queued commands using one
- // client-server roundtrip.
- //
- // Exec always returns list of commands and error of the first failed
- // command if any.
- func (c *Pipeline) Exec() ([]Cmder, error) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if c.closed {
- return nil, pool.ErrClosed
- }
-
- if len(c.cmds) == 0 {
- return nil, nil
- }
-
- cmds := c.cmds
- c.cmds = nil
-
- return cmds, c.exec(cmds)
- }
-
- func (c *Pipeline) pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- if err := fn(c); err != nil {
- return nil, err
- }
- cmds, err := c.Exec()
- _ = c.Close()
- return cmds, err
- }
-
- func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.pipelined(fn)
- }
-
- func (c *Pipeline) Pipeline() Pipeliner {
- return c
- }
-
- func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
- return c.pipelined(fn)
- }
-
- func (c *Pipeline) TxPipeline() Pipeliner {
- return c
- }
|