Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
228 lines
5.3 KiB
228 lines
5.3 KiB
package couchbase |
|
|
|
import ( |
|
"encoding/json" |
|
"fmt" |
|
"github.com/couchbase/goutils/logging" |
|
"io" |
|
"io/ioutil" |
|
"math/rand" |
|
"net" |
|
"net/http" |
|
"time" |
|
"unsafe" |
|
) |
|
|
|
// Bucket auto-updater gets the latest version of the bucket config from
// the server. If the configuration has changed then update the local
// bucket information. If the bucket has been deleted then notify anyone
// who is holding a reference to this bucket.
|
|
|
// Tunables for the bucket updater.
const (
	// MAX_RETRY_COUNT is the number of consecutive failures after which
	// UpdateBucket2 gives up and returns an error.
	MAX_RETRY_COUNT = 5

	// DISCONNECT_PERIOD is a duration of 120 seconds.
	// NOTE(review): not referenced in this part of the file; confirm its
	// consumer before changing it.
	DISCONNECT_PERIOD = 120 * time.Second
)
|
|
|
type NotifyFn func(bucket string, err error) |
|
type StreamingFn func(bucket *Bucket) |
|
|
|
// HTTP plumbing shared by all bucket updaters.
var (
	// updaterTransport dials with a 30 second connect timeout and a 30 second
	// TCP keepalive so that half-closed sockets are detected. Proxy settings
	// are taken from the environment.
	updaterTransport http.RoundTripper = &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		Dial: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).Dial,
	}

	// updaterHTTPClient is the client used for streaming configuration
	// requests; it sends everything through updaterTransport.
	updaterHTTPClient = &http.Client{Transport: updaterTransport}
)
|
|
|
func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) { |
|
|
|
var err error |
|
var res *http.Response |
|
|
|
for i := 0; i < HTTP_MAX_RETRY; i++ { |
|
res, err = updaterHTTPClient.Do(req) |
|
if err != nil && isHttpConnError(err) { |
|
continue |
|
} |
|
break |
|
} |
|
|
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return res, err |
|
} |
|
|
|
func (b *Bucket) RunBucketUpdater(notify NotifyFn) { |
|
b.RunBucketUpdater2(nil, notify) |
|
} |
|
|
|
func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) { |
|
go func() { |
|
err := b.UpdateBucket2(streamingFn) |
|
if err != nil { |
|
if notify != nil { |
|
notify(b.GetName(), err) |
|
} |
|
logging.Errorf(" Bucket Updater exited with err %v", err) |
|
} |
|
}() |
|
} |
|
|
|
func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) { |
|
if !bucketLocked { |
|
b.Lock() |
|
defer b.Unlock() |
|
} |
|
old := b.connPools |
|
b.connPools = unsafe.Pointer(&with) |
|
if old != nil { |
|
for _, pool := range *(*[]*connectionPool)(old) { |
|
if pool != nil && pool.inUse == false { |
|
pool.Close() |
|
} |
|
} |
|
} |
|
return |
|
} |
|
|
|
func (b *Bucket) UpdateBucket() error { |
|
return b.UpdateBucket2(nil) |
|
} |
|
|
|
func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error { |
|
var failures int |
|
var returnErr error |
|
var poolServices PoolServices |
|
|
|
for { |
|
|
|
if failures == MAX_RETRY_COUNT { |
|
logging.Errorf(" Maximum failures reached. Exiting loop...") |
|
return fmt.Errorf("Max failures reached. Last Error %v", returnErr) |
|
} |
|
|
|
nodes := b.Nodes() |
|
if len(nodes) < 1 { |
|
return fmt.Errorf("No healthy nodes found") |
|
} |
|
|
|
startNode := rand.Intn(len(nodes)) |
|
node := nodes[(startNode)%len(nodes)] |
|
|
|
streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreaming/%s", node.Hostname, uriAdj(b.GetName())) |
|
logging.Infof(" Trying with %s", streamUrl) |
|
req, err := http.NewRequest("GET", streamUrl, nil) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
// Lock here to avoid having pool closed under us. |
|
b.RLock() |
|
err = maybeAddAuth(req, b.pool.client.ah) |
|
b.RUnlock() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
res, err := doHTTPRequestForUpdate(req) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
if res.StatusCode != 200 { |
|
bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) |
|
logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod) |
|
res.Body.Close() |
|
returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod) |
|
failures++ |
|
continue |
|
} |
|
|
|
dec := json.NewDecoder(res.Body) |
|
|
|
tmpb := &Bucket{} |
|
for { |
|
|
|
err := dec.Decode(&tmpb) |
|
if err != nil { |
|
returnErr = err |
|
res.Body.Close() |
|
break |
|
} |
|
|
|
// if we got here, reset failure count |
|
failures = 0 |
|
|
|
if b.pool.client.tlsConfig != nil { |
|
poolServices, err = b.pool.client.GetPoolServices("default") |
|
if err != nil { |
|
returnErr = err |
|
res.Body.Close() |
|
break |
|
} |
|
} |
|
|
|
b.Lock() |
|
|
|
// mark all the old connection pools for deletion |
|
pools := b.getConnPools(true /* already locked */) |
|
for _, pool := range pools { |
|
if pool != nil { |
|
pool.inUse = false |
|
} |
|
} |
|
|
|
newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList)) |
|
for i := range newcps { |
|
// get the old connection pool and check if it is still valid |
|
pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */) |
|
if pool != nil && pool.inUse == false && pool.tlsConfig == b.pool.client.tlsConfig { |
|
// if the hostname and index is unchanged then reuse this pool |
|
newcps[i] = pool |
|
pool.inUse = true |
|
continue |
|
} |
|
// else create a new pool |
|
var encrypted bool |
|
hostport := tmpb.VBSMJson.ServerList[i] |
|
if b.pool.client.tlsConfig != nil { |
|
hostport, encrypted, err = MapKVtoSSL(hostport, &poolServices) |
|
if err != nil { |
|
b.Unlock() |
|
return err |
|
} |
|
} |
|
if b.ah != nil { |
|
newcps[i] = newConnectionPool(hostport, |
|
b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted) |
|
|
|
} else { |
|
newcps[i] = newConnectionPool(hostport, |
|
b.authHandler(true /* bucket already locked */), |
|
false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted) |
|
} |
|
} |
|
|
|
b.replaceConnPools2(newcps, true /* bucket already locked */) |
|
|
|
tmpb.ah = b.ah |
|
b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson) |
|
b.nodeList = unsafe.Pointer(&tmpb.NodesJSON) |
|
b.Unlock() |
|
|
|
if streamingFn != nil { |
|
streamingFn(tmpb) |
|
} |
|
logging.Debugf("Got new configuration for bucket %s", b.GetName()) |
|
|
|
} |
|
// we are here because of an error |
|
failures++ |
|
continue |
|
|
|
} |
|
return nil |
|
}
|
|
|