Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
143 lines
3.4 KiB
143 lines
3.4 KiB
package couchbase |
|
|
|
import ( |
|
"github.com/couchbase/gomemcached/client" |
|
"github.com/couchbase/goutils/logging" |
|
"sync" |
|
"time" |
|
) |
|
|
|
// Bounds of the delay between reconnection attempts made by TapFeed.run.
const (
	// initialRetryInterval is the delay used for the first retry and after
	// every successful connection.
	initialRetryInterval = time.Second

	// maximumRetryInterval caps the delay, which doubles after each retry.
	maximumRetryInterval = 30 * time.Second
)
|
|
|
// A TapFeed streams mutation events from a bucket. |
|
// |
|
// Events from the bucket can be read from the channel 'C'. Remember |
|
// to call Close() on it when you're done, unless its channel has |
|
// closed itself already. |
|
type TapFeed struct { |
|
C <-chan memcached.TapEvent |
|
|
|
bucket *Bucket |
|
args *memcached.TapArguments |
|
nodeFeeds []*memcached.TapFeed // The TAP feeds of the individual nodes |
|
output chan memcached.TapEvent // Same as C but writeably-typed |
|
wg sync.WaitGroup |
|
quit chan bool |
|
} |
|
|
|
// StartTapFeed creates and starts a new Tap feed |
|
func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) { |
|
if args == nil { |
|
defaultArgs := memcached.DefaultTapArguments() |
|
args = &defaultArgs |
|
} |
|
|
|
feed := &TapFeed{ |
|
bucket: b, |
|
args: args, |
|
output: make(chan memcached.TapEvent, 10), |
|
quit: make(chan bool), |
|
} |
|
|
|
go feed.run() |
|
|
|
feed.C = feed.output |
|
return feed, nil |
|
} |
|
|
|
// Goroutine that runs the feed |
|
func (feed *TapFeed) run() { |
|
retryInterval := initialRetryInterval |
|
bucketOK := true |
|
for { |
|
// Connect to the TAP feed of each server node: |
|
if bucketOK { |
|
killSwitch, err := feed.connectToNodes() |
|
if err == nil { |
|
// Run until one of the sub-feeds fails: |
|
select { |
|
case <-killSwitch: |
|
case <-feed.quit: |
|
return |
|
} |
|
feed.closeNodeFeeds() |
|
retryInterval = initialRetryInterval |
|
} |
|
} |
|
|
|
// On error, try to refresh the bucket in case the list of nodes changed: |
|
logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v", |
|
feed.bucket.Name, retryInterval) |
|
err := feed.bucket.Refresh() |
|
bucketOK = err == nil |
|
|
|
select { |
|
case <-time.After(retryInterval): |
|
case <-feed.quit: |
|
return |
|
} |
|
if retryInterval *= 2; retryInterval > maximumRetryInterval { |
|
retryInterval = maximumRetryInterval |
|
} |
|
} |
|
} |
|
|
|
func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) { |
|
killSwitch = make(chan bool) |
|
for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) { |
|
var singleFeed *memcached.TapFeed |
|
singleFeed, err = serverConn.StartTapFeed(feed.args) |
|
if err != nil { |
|
logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err) |
|
feed.closeNodeFeeds() |
|
return |
|
} |
|
feed.nodeFeeds = append(feed.nodeFeeds, singleFeed) |
|
go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host) |
|
feed.wg.Add(1) |
|
} |
|
return |
|
} |
|
|
|
// Goroutine that forwards Tap events from a single node's feed to the aggregate feed. |
|
func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) { |
|
defer feed.wg.Done() |
|
for { |
|
select { |
|
case event, ok := <-singleFeed.C: |
|
if !ok { |
|
if singleFeed.Error != nil { |
|
logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error) |
|
} |
|
killSwitch <- true |
|
return |
|
} |
|
feed.output <- event |
|
case <-feed.quit: |
|
return |
|
} |
|
} |
|
} |
|
|
|
func (feed *TapFeed) closeNodeFeeds() { |
|
for _, f := range feed.nodeFeeds { |
|
f.Close() |
|
} |
|
feed.nodeFeeds = nil |
|
} |
|
|
|
// Close a Tap feed. |
|
func (feed *TapFeed) Close() error { |
|
select { |
|
case <-feed.quit: |
|
return nil |
|
default: |
|
} |
|
|
|
feed.closeNodeFeeds() |
|
close(feed.quit) |
|
feed.wg.Wait() |
|
close(feed.output) |
|
return nil |
|
}
|
|
|