Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
291 lines
6.4 KiB
291 lines
6.4 KiB
package redis |
|
|
|
import ( |
|
"errors" |
|
"log" |
|
"net" |
|
"strings" |
|
"sync" |
|
"time" |
|
) |
|
|
|
//------------------------------------------------------------------------------ |
|
|
|
type FailoverOptions struct { |
|
MasterName string |
|
SentinelAddrs []string |
|
|
|
Password string |
|
DB int64 |
|
|
|
PoolSize int |
|
|
|
DialTimeout time.Duration |
|
ReadTimeout time.Duration |
|
WriteTimeout time.Duration |
|
IdleTimeout time.Duration |
|
} |
|
|
|
func (opt *FailoverOptions) getPoolSize() int { |
|
if opt.PoolSize == 0 { |
|
return 10 |
|
} |
|
return opt.PoolSize |
|
} |
|
|
|
func (opt *FailoverOptions) getDialTimeout() time.Duration { |
|
if opt.DialTimeout == 0 { |
|
return 5 * time.Second |
|
} |
|
return opt.DialTimeout |
|
} |
|
|
|
func (opt *FailoverOptions) options() *options { |
|
return &options{ |
|
DB: opt.DB, |
|
Password: opt.Password, |
|
|
|
DialTimeout: opt.getDialTimeout(), |
|
ReadTimeout: opt.ReadTimeout, |
|
WriteTimeout: opt.WriteTimeout, |
|
|
|
PoolSize: opt.getPoolSize(), |
|
IdleTimeout: opt.IdleTimeout, |
|
} |
|
} |
|
|
|
func NewFailoverClient(failoverOpt *FailoverOptions) *Client { |
|
opt := failoverOpt.options() |
|
failover := &sentinelFailover{ |
|
masterName: failoverOpt.MasterName, |
|
sentinelAddrs: failoverOpt.SentinelAddrs, |
|
|
|
opt: opt, |
|
} |
|
return &Client{ |
|
baseClient: &baseClient{ |
|
opt: opt, |
|
connPool: failover.Pool(), |
|
}, |
|
} |
|
} |
|
|
|
//------------------------------------------------------------------------------ |
|
|
|
type sentinelClient struct { |
|
*baseClient |
|
} |
|
|
|
func newSentinel(clOpt *Options) *sentinelClient { |
|
opt := clOpt.options() |
|
opt.Password = "" |
|
opt.DB = 0 |
|
dialer := func() (net.Conn, error) { |
|
return net.DialTimeout("tcp", clOpt.Addr, opt.DialTimeout) |
|
} |
|
return &sentinelClient{ |
|
baseClient: &baseClient{ |
|
opt: opt, |
|
connPool: newConnPool(newConnFunc(dialer), opt), |
|
}, |
|
} |
|
} |
|
|
|
func (c *sentinelClient) PubSub() *PubSub { |
|
return &PubSub{ |
|
baseClient: &baseClient{ |
|
opt: c.opt, |
|
connPool: newSingleConnPool(c.connPool, false), |
|
}, |
|
} |
|
} |
|
|
|
func (c *sentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { |
|
cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name) |
|
c.Process(cmd) |
|
return cmd |
|
} |
|
|
|
func (c *sentinelClient) Sentinels(name string) *SliceCmd { |
|
cmd := NewSliceCmd("SENTINEL", "sentinels", name) |
|
c.Process(cmd) |
|
return cmd |
|
} |
|
|
|
type sentinelFailover struct { |
|
masterName string |
|
sentinelAddrs []string |
|
|
|
opt *options |
|
|
|
pool pool |
|
poolOnce sync.Once |
|
|
|
lock sync.RWMutex |
|
_sentinel *sentinelClient |
|
} |
|
|
|
func (d *sentinelFailover) dial() (net.Conn, error) { |
|
addr, err := d.MasterAddr() |
|
if err != nil { |
|
return nil, err |
|
} |
|
return net.DialTimeout("tcp", addr, d.opt.DialTimeout) |
|
} |
|
|
|
func (d *sentinelFailover) Pool() pool { |
|
d.poolOnce.Do(func() { |
|
d.pool = newConnPool(newConnFunc(d.dial), d.opt) |
|
}) |
|
return d.pool |
|
} |
|
|
|
func (d *sentinelFailover) MasterAddr() (string, error) { |
|
defer d.lock.Unlock() |
|
d.lock.Lock() |
|
|
|
// Try last working sentinel. |
|
if d._sentinel != nil { |
|
addr, err := d._sentinel.GetMasterAddrByName(d.masterName).Result() |
|
if err != nil { |
|
log.Printf("redis-sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err) |
|
d.resetSentinel() |
|
} else { |
|
addr := net.JoinHostPort(addr[0], addr[1]) |
|
log.Printf("redis-sentinel: %q addr is %s", d.masterName, addr) |
|
return addr, nil |
|
} |
|
} |
|
|
|
for i, sentinelAddr := range d.sentinelAddrs { |
|
sentinel := newSentinel(&Options{ |
|
Addr: sentinelAddr, |
|
|
|
DB: d.opt.DB, |
|
Password: d.opt.Password, |
|
|
|
DialTimeout: d.opt.DialTimeout, |
|
ReadTimeout: d.opt.ReadTimeout, |
|
WriteTimeout: d.opt.WriteTimeout, |
|
|
|
PoolSize: d.opt.PoolSize, |
|
IdleTimeout: d.opt.IdleTimeout, |
|
}) |
|
masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result() |
|
if err != nil { |
|
log.Printf("redis-sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err) |
|
sentinel.Close() |
|
continue |
|
} |
|
|
|
// Push working sentinel to the top. |
|
d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0] |
|
|
|
d.setSentinel(sentinel) |
|
addr := net.JoinHostPort(masterAddr[0], masterAddr[1]) |
|
log.Printf("redis-sentinel: %q addr is %s", d.masterName, addr) |
|
return addr, nil |
|
} |
|
|
|
return "", errors.New("redis: all sentinels are unreachable") |
|
} |
|
|
|
func (d *sentinelFailover) setSentinel(sentinel *sentinelClient) { |
|
d.discoverSentinels(sentinel) |
|
d._sentinel = sentinel |
|
go d.listen() |
|
} |
|
|
|
func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) { |
|
sentinels, err := sentinel.Sentinels(d.masterName).Result() |
|
if err != nil { |
|
log.Printf("redis-sentinel: Sentinels %q failed: %s", d.masterName, err) |
|
return |
|
} |
|
for _, sentinel := range sentinels { |
|
vals := sentinel.([]interface{}) |
|
for i := 0; i < len(vals); i += 2 { |
|
key := vals[i].(string) |
|
if key == "name" { |
|
sentinelAddr := vals[i+1].(string) |
|
if !contains(d.sentinelAddrs, sentinelAddr) { |
|
log.Printf( |
|
"redis-sentinel: discovered new %q sentinel: %s", |
|
d.masterName, sentinelAddr, |
|
) |
|
d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr) |
|
} |
|
} |
|
} |
|
} |
|
} |
|
|
|
func (d *sentinelFailover) listen() { |
|
var pubsub *PubSub |
|
for { |
|
if pubsub == nil { |
|
pubsub = d._sentinel.PubSub() |
|
if err := pubsub.Subscribe("+switch-master"); err != nil { |
|
log.Printf("redis-sentinel: Subscribe failed: %s", err) |
|
d.lock.Lock() |
|
d.resetSentinel() |
|
d.lock.Unlock() |
|
return |
|
} |
|
} |
|
|
|
msgIface, err := pubsub.Receive() |
|
if err != nil { |
|
log.Printf("redis-sentinel: Receive failed: %s", err) |
|
pubsub.Close() |
|
return |
|
} |
|
|
|
switch msg := msgIface.(type) { |
|
case *Message: |
|
switch msg.Channel { |
|
case "+switch-master": |
|
parts := strings.Split(msg.Payload, " ") |
|
if parts[0] != d.masterName { |
|
log.Printf("redis-sentinel: ignore new %s addr", parts[0]) |
|
continue |
|
} |
|
addr := net.JoinHostPort(parts[3], parts[4]) |
|
log.Printf( |
|
"redis-sentinel: new %q addr is %s", |
|
d.masterName, addr, |
|
) |
|
d.pool.Filter(func(cn *conn) bool { |
|
if cn.RemoteAddr().String() != addr { |
|
log.Printf( |
|
"redis-sentinel: closing connection to old master %s", |
|
cn.RemoteAddr(), |
|
) |
|
return false |
|
} |
|
return true |
|
}) |
|
default: |
|
log.Printf("redis-sentinel: unsupported message: %s", msg) |
|
} |
|
case *Subscription: |
|
// Ignore. |
|
default: |
|
log.Printf("redis-sentinel: unsupported message: %s", msgIface) |
|
} |
|
} |
|
} |
|
|
|
func (d *sentinelFailover) resetSentinel() { |
|
d._sentinel.Close() |
|
d._sentinel = nil |
|
} |
|
|
|
func contains(slice []string, str string) bool { |
|
for _, s := range slice { |
|
if s == str { |
|
return true |
|
} |
|
} |
|
return false |
|
}
|
|
|