Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2295 lines
70 KiB
2295 lines
70 KiB
// Copyright 2012-present Oliver Eilhard. All rights reserved. |
|
// Use of this source code is governed by a MIT-license. |
|
// See http://olivere.mit-license.org/license.txt for details. |
|
|
|
package elastic |
|
|
|
import ( |
|
"bytes" |
|
"context" |
|
"encoding/json" |
|
"fmt" |
|
"log" |
|
"net/http" |
|
"net/http/httputil" |
|
"net/url" |
|
"os" |
|
"runtime" |
|
"strings" |
|
"sync" |
|
"time" |
|
|
|
"github.com/pkg/errors" |
|
|
|
"github.com/olivere/elastic/v7/config" |
|
) |
|
|
|
const ( |
|
// Version is the current version of Elastic. |
|
Version = "7.0.25" |
|
|
|
// DefaultURL is the default endpoint of Elasticsearch on the local machine. |
|
// It is used e.g. when initializing a new Client without a specific URL. |
|
DefaultURL = "http://127.0.0.1:9200" |
|
|
|
// DefaultScheme is the default protocol scheme to use when sniffing |
|
// the Elasticsearch cluster. |
|
DefaultScheme = "http" |
|
|
|
// DefaultHealthcheckEnabled specifies if healthchecks are enabled by default. |
|
DefaultHealthcheckEnabled = true |
|
|
|
// DefaultHealthcheckTimeoutStartup is the time the healthcheck waits |
|
// for a response from Elasticsearch on startup, i.e. when creating a |
|
// client. After the client is started, a shorter timeout is commonly used |
|
// (its default is specified in DefaultHealthcheckTimeout). |
|
DefaultHealthcheckTimeoutStartup = 5 * time.Second |
|
|
|
// DefaultHealthcheckTimeout specifies the time a running client waits for |
|
// a response from Elasticsearch. Notice that the healthcheck timeout |
|
// when a client is created is larger by default (see DefaultHealthcheckTimeoutStartup). |
|
DefaultHealthcheckTimeout = 1 * time.Second |
|
|
|
// DefaultHealthcheckInterval is the default interval between |
|
// two health checks of the nodes in the cluster. |
|
DefaultHealthcheckInterval = 60 * time.Second |
|
|
|
// DefaultSnifferEnabled specifies if the sniffer is enabled by default. |
|
DefaultSnifferEnabled = true |
|
|
|
// DefaultSnifferInterval is the interval between two sniffing procedures, |
|
// i.e. the lookup of all nodes in the cluster and their addition/removal |
|
// from the list of actual connections. |
|
DefaultSnifferInterval = 15 * time.Minute |
|
|
|
// DefaultSnifferTimeoutStartup is the default timeout for the sniffing |
|
// process that is initiated while creating a new client. For subsequent |
|
// sniffing processes, DefaultSnifferTimeout is used (by default). |
|
DefaultSnifferTimeoutStartup = 5 * time.Second |
|
|
|
// DefaultSnifferTimeout is the default timeout after which the |
|
// sniffing process times out. Notice that for the initial sniffing |
|
// process, DefaultSnifferTimeoutStartup is used. |
|
DefaultSnifferTimeout = 2 * time.Second |
|
|
|
// DefaultSendGetBodyAs is the HTTP method to use when elastic is sending |
|
// a GET request with a body. |
|
DefaultSendGetBodyAs = "GET" |
|
|
|
// DefaultGzipEnabled specifies if gzip compression is enabled by default. |
|
DefaultGzipEnabled = false |
|
|
|
// off is used to disable timeouts. |
|
off = -1 * time.Second |
|
) |
|
|
|
var ( |
|
// ErrNoClient is raised when no Elasticsearch node is available. |
|
ErrNoClient = errors.New("no Elasticsearch node available") |
|
|
|
// ErrRetry is raised when a request cannot be executed after the configured |
|
// number of retries. |
|
ErrRetry = errors.New("cannot connect after several retries") |
|
|
|
// ErrTimeout is raised when a request timed out, e.g. when WaitForStatus |
|
// didn't return in time. |
|
ErrTimeout = errors.New("timeout") |
|
|
|
// noRetries is a retrier that does not retry. |
|
noRetries = NewStopRetrier() |
|
|
|
// noDeprecationLog is a no-op for logging deprecations. |
|
noDeprecationLog = func(*http.Request, *http.Response) {} |
|
) |
|
|
|
// Doer is an interface to perform HTTP requests. |
|
// It can be used for mocking. |
|
type Doer interface { |
|
Do(*http.Request) (*http.Response, error) |
|
} |
|
|
|
// ClientOptionFunc is a function that configures a Client. |
|
// It is used in NewClient. |
|
type ClientOptionFunc func(*Client) error |
|
|
|
// Client is an Elasticsearch client. Create one by calling NewClient. |
|
type Client struct { |
|
c Doer // e.g. a net/*http.Client to use for requests |
|
|
|
connsMu sync.RWMutex // connsMu guards the next block |
|
conns []*conn // all connections |
|
cindex int // index into conns |
|
|
|
mu sync.RWMutex // guards the next block |
|
urls []string // set of URLs passed initially to the client |
|
running bool // true if the client's background processes are running |
|
errorlog Logger // error log for critical messages |
|
infolog Logger // information log for e.g. response times |
|
tracelog Logger // trace log for debugging |
|
deprecationlog func(*http.Request, *http.Response) |
|
scheme string // http or https |
|
healthcheckEnabled bool // healthchecks enabled or disabled |
|
healthcheckTimeoutStartup time.Duration // time the healthcheck waits for a response from Elasticsearch on startup |
|
healthcheckTimeout time.Duration // time the healthcheck waits for a response from Elasticsearch |
|
healthcheckInterval time.Duration // interval between healthchecks |
|
healthcheckStop chan bool // notify healthchecker to stop, and notify back |
|
snifferEnabled bool // sniffer enabled or disabled |
|
snifferTimeoutStartup time.Duration // time the sniffer waits for a response from nodes info API on startup |
|
snifferTimeout time.Duration // time the sniffer waits for a response from nodes info API |
|
snifferInterval time.Duration // interval between sniffing |
|
snifferCallback SnifferCallback // callback to modify the sniffing decision |
|
snifferStop chan bool // notify sniffer to stop, and notify back |
|
decoder Decoder // used to decode data sent from Elasticsearch |
|
basicAuthUsername string // username for HTTP Basic Auth |
|
basicAuthPassword string // password for HTTP Basic Auth |
|
sendGetBodyAs string // override for when sending a GET with a body |
|
gzipEnabled bool // gzip compression enabled or disabled (default) |
|
requiredPlugins []string // list of required plugins |
|
retrier Retrier // strategy for retries |
|
retryStatusCodes []int // HTTP status codes where to retry automatically (with retrier) |
|
headers http.Header // a list of default headers to add to each request |
|
} |
|
|
|
// NewClient creates a new client to work with Elasticsearch. |
|
// |
|
// NewClient, by default, is meant to be long-lived and shared across |
|
// your application. If you need a short-lived client, e.g. for request-scope, |
|
// consider using NewSimpleClient instead. |
|
// |
|
// The caller can configure the new client by passing configuration options |
|
// to the func. |
|
// |
|
// Example: |
|
// |
|
// client, err := elastic.NewClient( |
|
// elastic.SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"), |
|
// elastic.SetBasicAuth("user", "secret")) |
|
// |
|
// If no URL is configured, Elastic uses DefaultURL by default. |
|
// |
|
// If the sniffer is enabled (the default), the new client then sniffes |
|
// the cluster via the Nodes Info API |
|
// (see https://www.elastic.co/guide/en/elasticsearch/reference/7.0/cluster-nodes-info.html#cluster-nodes-info). |
|
// It uses the URLs specified by the caller. The caller is responsible |
|
// to only pass a list of URLs of nodes that belong to the same cluster. |
|
// This sniffing process is run on startup and periodically. |
|
// Use SnifferInterval to set the interval between two sniffs (default is |
|
// 15 minutes). In other words: By default, the client will find new nodes |
|
// in the cluster and remove those that are no longer available every |
|
// 15 minutes. Disable the sniffer by passing SetSniff(false) to NewClient. |
|
// |
|
// The list of nodes found in the sniffing process will be used to make |
|
// connections to the REST API of Elasticsearch. These nodes are also |
|
// periodically checked in a shorter time frame. This process is called |
|
// a health check. By default, a health check is done every 60 seconds. |
|
// You can set a shorter or longer interval by SetHealthcheckInterval. |
|
// Disabling health checks is not recommended, but can be done by |
|
// SetHealthcheck(false). |
|
// |
|
// Connections are automatically marked as dead or healthy while |
|
// making requests to Elasticsearch. When a request fails, Elastic will |
|
// call into the Retry strategy which can be specified with SetRetry. |
|
// The Retry strategy is also responsible for handling backoff i.e. the time |
|
// to wait before starting the next request. There are various standard |
|
// backoff implementations, e.g. ExponentialBackoff or SimpleBackoff. |
|
// Retries are disabled by default. |
|
// |
|
// If no HttpClient is configured, then http.DefaultClient is used. |
|
// You can use your own http.Client with some http.Transport for |
|
// advanced scenarios. |
|
// |
|
// An error is also returned when some configuration option is invalid or |
|
// the new client cannot sniff the cluster (if enabled). |
|
func NewClient(options ...ClientOptionFunc) (*Client, error) { |
|
return DialContext(context.Background(), options...) |
|
} |
|
|
|
// NewClientFromConfig initializes a client from a configuration. |
|
func NewClientFromConfig(cfg *config.Config) (*Client, error) { |
|
options, err := configToOptions(cfg) |
|
if err != nil { |
|
return nil, err |
|
} |
|
return DialContext(context.Background(), options...) |
|
} |
|
|
|
// NewSimpleClient creates a new short-lived Client that can be used in |
|
// use cases where you need e.g. one client per request. |
|
// |
|
// While NewClient by default sets up e.g. periodic health checks |
|
// and sniffing for new nodes in separate goroutines, NewSimpleClient does |
|
// not and is meant as a simple replacement where you don't need all the |
|
// heavy lifting of NewClient. |
|
// |
|
// NewSimpleClient does the following by default: First, all health checks |
|
// are disabled, including timeouts and periodic checks. Second, sniffing |
|
// is disabled, including timeouts and periodic checks. The number of retries |
|
// is set to 1. NewSimpleClient also does not start any goroutines. |
|
// |
|
// Notice that you can still override settings by passing additional options, |
|
// just like with NewClient. |
|
func NewSimpleClient(options ...ClientOptionFunc) (*Client, error) { |
|
c := &Client{ |
|
c: http.DefaultClient, |
|
conns: make([]*conn, 0), |
|
cindex: -1, |
|
scheme: DefaultScheme, |
|
decoder: &DefaultDecoder{}, |
|
healthcheckEnabled: false, |
|
healthcheckTimeoutStartup: off, |
|
healthcheckTimeout: off, |
|
healthcheckInterval: off, |
|
healthcheckStop: make(chan bool), |
|
snifferEnabled: false, |
|
snifferTimeoutStartup: off, |
|
snifferTimeout: off, |
|
snifferInterval: off, |
|
snifferCallback: nopSnifferCallback, |
|
snifferStop: make(chan bool), |
|
sendGetBodyAs: DefaultSendGetBodyAs, |
|
gzipEnabled: DefaultGzipEnabled, |
|
retrier: noRetries, // no retries by default |
|
retryStatusCodes: nil, // no automatic retries for specific HTTP status codes |
|
deprecationlog: noDeprecationLog, |
|
} |
|
|
|
// Run the options on it |
|
for _, option := range options { |
|
if err := option(c); err != nil { |
|
return nil, err |
|
} |
|
} |
|
|
|
// Use a default URL and normalize them |
|
if len(c.urls) == 0 { |
|
c.urls = []string{DefaultURL} |
|
} |
|
c.urls = canonicalize(c.urls...) |
|
|
|
// If the URLs have auth info, use them here as an alternative to SetBasicAuth |
|
if c.basicAuthUsername == "" && c.basicAuthPassword == "" { |
|
for _, urlStr := range c.urls { |
|
u, err := url.Parse(urlStr) |
|
if err == nil && u.User != nil { |
|
c.basicAuthUsername = u.User.Username() |
|
c.basicAuthPassword, _ = u.User.Password() |
|
break |
|
} |
|
} |
|
} |
|
|
|
for _, url := range c.urls { |
|
c.conns = append(c.conns, newConn(url, url)) |
|
} |
|
|
|
// Ensure that we have at least one connection available |
|
if err := c.mustActiveConn(); err != nil { |
|
return nil, err |
|
} |
|
|
|
// Check the required plugins |
|
for _, plugin := range c.requiredPlugins { |
|
found, err := c.HasPlugin(plugin) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if !found { |
|
return nil, fmt.Errorf("elastic: plugin %s not found", plugin) |
|
} |
|
} |
|
|
|
c.mu.Lock() |
|
c.running = true |
|
c.mu.Unlock() |
|
|
|
return c, nil |
|
} |
|
|
|
// Dial will call DialContext with a background context. |
|
func Dial(options ...ClientOptionFunc) (*Client, error) { |
|
return DialContext(context.Background(), options...) |
|
} |
|
|
|
// DialContext will connect to Elasticsearch, just like NewClient does. |
|
// |
|
// The context is honoured in terms of e.g. cancellation. |
|
func DialContext(ctx context.Context, options ...ClientOptionFunc) (*Client, error) { |
|
// Set up the client |
|
c := &Client{ |
|
c: http.DefaultClient, |
|
conns: make([]*conn, 0), |
|
cindex: -1, |
|
scheme: DefaultScheme, |
|
decoder: &DefaultDecoder{}, |
|
healthcheckEnabled: DefaultHealthcheckEnabled, |
|
healthcheckTimeoutStartup: DefaultHealthcheckTimeoutStartup, |
|
healthcheckTimeout: DefaultHealthcheckTimeout, |
|
healthcheckInterval: DefaultHealthcheckInterval, |
|
healthcheckStop: make(chan bool), |
|
snifferEnabled: DefaultSnifferEnabled, |
|
snifferTimeoutStartup: DefaultSnifferTimeoutStartup, |
|
snifferTimeout: DefaultSnifferTimeout, |
|
snifferInterval: DefaultSnifferInterval, |
|
snifferCallback: nopSnifferCallback, |
|
snifferStop: make(chan bool), |
|
sendGetBodyAs: DefaultSendGetBodyAs, |
|
gzipEnabled: DefaultGzipEnabled, |
|
retrier: noRetries, // no retries by default |
|
retryStatusCodes: nil, // no automatic retries for specific HTTP status codes |
|
deprecationlog: noDeprecationLog, |
|
} |
|
|
|
// Run the options on it |
|
for _, option := range options { |
|
if err := option(c); err != nil { |
|
return nil, err |
|
} |
|
} |
|
|
|
// Use a default URL and normalize them |
|
if len(c.urls) == 0 { |
|
c.urls = []string{DefaultURL} |
|
} |
|
c.urls = canonicalize(c.urls...) |
|
|
|
// If the URLs have auth info, use them here as an alternative to SetBasicAuth |
|
if c.basicAuthUsername == "" && c.basicAuthPassword == "" { |
|
for _, urlStr := range c.urls { |
|
u, err := url.Parse(urlStr) |
|
if err == nil && u.User != nil { |
|
c.basicAuthUsername = u.User.Username() |
|
c.basicAuthPassword, _ = u.User.Password() |
|
break |
|
} |
|
} |
|
} |
|
|
|
// Check if we can make a request to any of the specified URLs |
|
if c.healthcheckEnabled { |
|
if err := c.startupHealthcheck(ctx, c.healthcheckTimeoutStartup); err != nil { |
|
return nil, err |
|
} |
|
} |
|
|
|
if c.snifferEnabled { |
|
// Sniff the cluster initially |
|
if err := c.sniff(ctx, c.snifferTimeoutStartup); err != nil { |
|
return nil, err |
|
} |
|
} else { |
|
// Do not sniff the cluster initially. Use the provided URLs instead. |
|
for _, url := range c.urls { |
|
c.conns = append(c.conns, newConn(url, url)) |
|
} |
|
} |
|
|
|
if c.healthcheckEnabled { |
|
// Perform an initial health check |
|
c.healthcheck(ctx, c.healthcheckTimeoutStartup, true) |
|
} |
|
// Ensure that we have at least one connection available |
|
if err := c.mustActiveConn(); err != nil { |
|
return nil, err |
|
} |
|
|
|
// Check the required plugins |
|
for _, plugin := range c.requiredPlugins { |
|
found, err := c.HasPlugin(plugin) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if !found { |
|
return nil, fmt.Errorf("elastic: plugin %s not found", plugin) |
|
} |
|
} |
|
|
|
if c.snifferEnabled { |
|
go c.sniffer() // periodically update cluster information |
|
} |
|
if c.healthcheckEnabled { |
|
go c.healthchecker() // start goroutine periodically ping all nodes of the cluster |
|
} |
|
|
|
c.mu.Lock() |
|
c.running = true |
|
c.mu.Unlock() |
|
|
|
return c, nil |
|
} |
|
|
|
// DialWithConfig will use the configuration settings parsed from config package |
|
// to connect to Elasticsearch. |
|
// |
|
// The context is honoured in terms of e.g. cancellation. |
|
func DialWithConfig(ctx context.Context, cfg *config.Config) (*Client, error) { |
|
options, err := configToOptions(cfg) |
|
if err != nil { |
|
return nil, err |
|
} |
|
return DialContext(ctx, options...) |
|
} |
|
|
|
func configToOptions(cfg *config.Config) ([]ClientOptionFunc, error) { |
|
var options []ClientOptionFunc |
|
if cfg != nil { |
|
if cfg.URL != "" { |
|
options = append(options, SetURL(cfg.URL)) |
|
} |
|
if cfg.Errorlog != "" { |
|
f, err := os.OpenFile(cfg.Errorlog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) |
|
if err != nil { |
|
return nil, errors.Wrap(err, "unable to initialize error log") |
|
} |
|
l := log.New(f, "", 0) |
|
options = append(options, SetErrorLog(l)) |
|
} |
|
if cfg.Tracelog != "" { |
|
f, err := os.OpenFile(cfg.Tracelog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) |
|
if err != nil { |
|
return nil, errors.Wrap(err, "unable to initialize trace log") |
|
} |
|
l := log.New(f, "", 0) |
|
options = append(options, SetTraceLog(l)) |
|
} |
|
if cfg.Infolog != "" { |
|
f, err := os.OpenFile(cfg.Infolog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) |
|
if err != nil { |
|
return nil, errors.Wrap(err, "unable to initialize info log") |
|
} |
|
l := log.New(f, "", 0) |
|
options = append(options, SetInfoLog(l)) |
|
} |
|
if cfg.Username != "" || cfg.Password != "" { |
|
options = append(options, SetBasicAuth(cfg.Username, cfg.Password)) |
|
} |
|
if cfg.Sniff != nil { |
|
options = append(options, SetSniff(*cfg.Sniff)) |
|
} |
|
if cfg.Healthcheck != nil { |
|
options = append(options, SetHealthcheck(*cfg.Healthcheck)) |
|
} |
|
} |
|
return options, nil |
|
} |
|
|
|
// SetHttpClient can be used to specify the http.Client to use when making |
|
// HTTP requests to Elasticsearch. |
|
func SetHttpClient(httpClient Doer) ClientOptionFunc { |
|
return func(c *Client) error { |
|
if httpClient != nil { |
|
c.c = httpClient |
|
} else { |
|
c.c = http.DefaultClient |
|
} |
|
return nil |
|
} |
|
} |
|
|
|
// SetBasicAuth can be used to specify the HTTP Basic Auth credentials to |
|
// use when making HTTP requests to Elasticsearch. |
|
func SetBasicAuth(username, password string) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.basicAuthUsername = username |
|
c.basicAuthPassword = password |
|
return nil |
|
} |
|
} |
|
|
|
// SetURL defines the URL endpoints of the Elasticsearch nodes. Notice that |
|
// when sniffing is enabled, these URLs are used to initially sniff the |
|
// cluster on startup. |
|
func SetURL(urls ...string) ClientOptionFunc { |
|
return func(c *Client) error { |
|
switch len(urls) { |
|
case 0: |
|
c.urls = []string{DefaultURL} |
|
default: |
|
c.urls = urls |
|
} |
|
// Check URLs |
|
for _, urlStr := range c.urls { |
|
if _, err := url.Parse(urlStr); err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
} |
|
} |
|
|
|
// SetScheme sets the HTTP scheme to look for when sniffing (http or https). |
|
// This is http by default. |
|
func SetScheme(scheme string) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.scheme = scheme |
|
return nil |
|
} |
|
} |
|
|
|
// SetSniff enables or disables the sniffer (enabled by default). |
|
func SetSniff(enabled bool) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.snifferEnabled = enabled |
|
return nil |
|
} |
|
} |
|
|
|
// SetSnifferTimeoutStartup sets the timeout for the sniffer that is used |
|
// when creating a new client. The default is 5 seconds. Notice that the |
|
// timeout being used for subsequent sniffing processes is set with |
|
// SetSnifferTimeout. |
|
func SetSnifferTimeoutStartup(timeout time.Duration) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.snifferTimeoutStartup = timeout |
|
return nil |
|
} |
|
} |
|
|
|
// SetSnifferTimeout sets the timeout for the sniffer that finds the |
|
// nodes in a cluster. The default is 2 seconds. Notice that the timeout |
|
// used when creating a new client on startup is usually greater and can |
|
// be set with SetSnifferTimeoutStartup. |
|
func SetSnifferTimeout(timeout time.Duration) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.snifferTimeout = timeout |
|
return nil |
|
} |
|
} |
|
|
|
// SetSnifferInterval sets the interval between two sniffing processes. |
|
// The default interval is 15 minutes. |
|
func SetSnifferInterval(interval time.Duration) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.snifferInterval = interval |
|
return nil |
|
} |
|
} |
|
|
|
// SnifferCallback defines the protocol for sniffing decisions. |
|
type SnifferCallback func(*NodesInfoNode) bool |
|
|
|
// nopSnifferCallback is the default sniffer callback: It accepts |
|
// all nodes the sniffer finds. |
|
var nopSnifferCallback = func(*NodesInfoNode) bool { return true } |
|
|
|
// SetSnifferCallback allows the caller to modify sniffer decisions. |
|
// When setting the callback, the given SnifferCallback is called for |
|
// each (healthy) node found during the sniffing process. |
|
// If the callback returns false, the node is ignored: No requests |
|
// are routed to it. |
|
func SetSnifferCallback(f SnifferCallback) ClientOptionFunc { |
|
return func(c *Client) error { |
|
if f != nil { |
|
c.snifferCallback = f |
|
} |
|
return nil |
|
} |
|
} |
|
|
|
// SetHealthcheck enables or disables healthchecks (enabled by default). |
|
func SetHealthcheck(enabled bool) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.healthcheckEnabled = enabled |
|
return nil |
|
} |
|
} |
|
|
|
// SetHealthcheckTimeoutStartup sets the timeout for the initial health check. |
|
// The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup). |
|
// Notice that timeouts for subsequent health checks can be modified with |
|
// SetHealthcheckTimeout. |
|
func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.healthcheckTimeoutStartup = timeout |
|
return nil |
|
} |
|
} |
|
|
|
// SetHealthcheckTimeout sets the timeout for periodic health checks. |
|
// The default timeout is 1 second (see DefaultHealthcheckTimeout). |
|
// Notice that a different (usually larger) timeout is used for the initial |
|
// healthcheck, which is initiated while creating a new client. |
|
// The startup timeout can be modified with SetHealthcheckTimeoutStartup. |
|
func SetHealthcheckTimeout(timeout time.Duration) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.healthcheckTimeout = timeout |
|
return nil |
|
} |
|
} |
|
|
|
// SetHealthcheckInterval sets the interval between two health checks. |
|
// The default interval is 60 seconds. |
|
func SetHealthcheckInterval(interval time.Duration) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.healthcheckInterval = interval |
|
return nil |
|
} |
|
} |
|
|
|
// SetMaxRetries sets the maximum number of retries before giving up when |
|
// performing a HTTP request to Elasticsearch. |
|
// |
|
// Deprecated: Replace with a Retry implementation. |
|
func SetMaxRetries(maxRetries int) ClientOptionFunc { |
|
return func(c *Client) error { |
|
if maxRetries < 0 { |
|
return errors.New("MaxRetries must be greater than or equal to 0") |
|
} else if maxRetries == 0 { |
|
c.retrier = noRetries |
|
} else { |
|
// Create a Retrier that will wait for 100ms (+/- jitter) between requests. |
|
// This resembles the old behavior with maxRetries. |
|
ticks := make([]int, maxRetries) |
|
for i := 0; i < len(ticks); i++ { |
|
ticks[i] = 100 |
|
} |
|
backoff := NewSimpleBackoff(ticks...) |
|
c.retrier = NewBackoffRetrier(backoff) |
|
} |
|
return nil |
|
} |
|
} |
|
|
|
// SetGzip enables or disables gzip compression (disabled by default). |
|
func SetGzip(enabled bool) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.gzipEnabled = enabled |
|
return nil |
|
} |
|
} |
|
|
|
// SetDecoder sets the Decoder to use when decoding data from Elasticsearch. |
|
// DefaultDecoder is used by default. |
|
func SetDecoder(decoder Decoder) ClientOptionFunc { |
|
return func(c *Client) error { |
|
if decoder != nil { |
|
c.decoder = decoder |
|
} else { |
|
c.decoder = &DefaultDecoder{} |
|
} |
|
return nil |
|
} |
|
} |
|
|
|
// SetRequiredPlugins can be used to indicate that some plugins are required |
|
// before a Client will be created. |
|
func SetRequiredPlugins(plugins ...string) ClientOptionFunc { |
|
return func(c *Client) error { |
|
if c.requiredPlugins == nil { |
|
c.requiredPlugins = make([]string, 0) |
|
} |
|
c.requiredPlugins = append(c.requiredPlugins, plugins...) |
|
return nil |
|
} |
|
} |
|
|
|
// SetErrorLog sets the logger for critical messages like nodes joining |
|
// or leaving the cluster or failing requests. It is nil by default. |
|
func SetErrorLog(logger Logger) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.errorlog = logger |
|
return nil |
|
} |
|
} |
|
|
|
// SetInfoLog sets the logger for informational messages, e.g. requests |
|
// and their response times. It is nil by default. |
|
func SetInfoLog(logger Logger) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.infolog = logger |
|
return nil |
|
} |
|
} |
|
|
|
// SetTraceLog specifies the log.Logger to use for output of HTTP requests |
|
// and responses which is helpful during debugging. It is nil by default. |
|
func SetTraceLog(logger Logger) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.tracelog = logger |
|
return nil |
|
} |
|
} |
|
|
|
// SetSendGetBodyAs specifies the HTTP method to use when sending a GET request |
|
// with a body. It is GET by default. |
|
func SetSendGetBodyAs(httpMethod string) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.sendGetBodyAs = httpMethod |
|
return nil |
|
} |
|
} |
|
|
|
// SetRetrier specifies the retry strategy that handles errors during |
|
// HTTP request/response with Elasticsearch. |
|
func SetRetrier(retrier Retrier) ClientOptionFunc { |
|
return func(c *Client) error { |
|
if retrier == nil { |
|
retrier = noRetries // no retries by default |
|
} |
|
c.retrier = retrier |
|
return nil |
|
} |
|
} |
|
|
|
// SetRetryStatusCodes specifies the HTTP status codes where the client |
|
// will retry automatically. Notice that retries call the specified retrier, |
|
// so calling SetRetryStatusCodes without setting a Retrier won't do anything |
|
// for retries. |
|
func SetRetryStatusCodes(statusCodes ...int) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.retryStatusCodes = statusCodes |
|
return nil |
|
} |
|
} |
|
|
|
// SetHeaders adds a list of default HTTP headers that will be added to |
|
// each requests executed by PerformRequest. |
|
func SetHeaders(headers http.Header) ClientOptionFunc { |
|
return func(c *Client) error { |
|
c.headers = headers |
|
return nil |
|
} |
|
} |
|
|
|
// String returns a string representation of the client status. |
|
func (c *Client) String() string { |
|
c.connsMu.Lock() |
|
conns := c.conns |
|
c.connsMu.Unlock() |
|
|
|
var buf bytes.Buffer |
|
for i, conn := range conns { |
|
if i > 0 { |
|
buf.WriteString(", ") |
|
} |
|
buf.WriteString(conn.String()) |
|
} |
|
return buf.String() |
|
} |
|
|
|
// IsRunning returns true if the background processes of the client are |
|
// running, false otherwise. |
|
func (c *Client) IsRunning() bool { |
|
c.mu.RLock() |
|
defer c.mu.RUnlock() |
|
return c.running |
|
} |
|
|
|
// Start starts the background processes like sniffing the cluster and |
|
// periodic health checks. You don't need to run Start when creating a |
|
// client with NewClient; the background processes are run by default. |
|
// |
|
// If the background processes are already running, this is a no-op. |
|
func (c *Client) Start() { |
|
c.mu.RLock() |
|
if c.running { |
|
c.mu.RUnlock() |
|
return |
|
} |
|
c.mu.RUnlock() |
|
|
|
if c.snifferEnabled { |
|
go c.sniffer() |
|
} |
|
if c.healthcheckEnabled { |
|
go c.healthchecker() |
|
} |
|
|
|
c.mu.Lock() |
|
c.running = true |
|
c.mu.Unlock() |
|
|
|
c.infof("elastic: client started") |
|
} |
|
|
|
// Stop stops the background processes that the client is running, |
|
// i.e. sniffing the cluster periodically and running health checks |
|
// on the nodes. |
|
// |
|
// If the background processes are not running, this is a no-op. |
|
func (c *Client) Stop() { |
|
c.mu.RLock() |
|
if !c.running { |
|
c.mu.RUnlock() |
|
return |
|
} |
|
c.mu.RUnlock() |
|
|
|
if c.healthcheckEnabled { |
|
c.healthcheckStop <- true |
|
<-c.healthcheckStop |
|
} |
|
|
|
if c.snifferEnabled { |
|
c.snifferStop <- true |
|
<-c.snifferStop |
|
} |
|
|
|
c.mu.Lock() |
|
c.running = false |
|
c.mu.Unlock() |
|
|
|
c.infof("elastic: client stopped") |
|
} |
|
|
|
// errorf logs to the error log. |
|
func (c *Client) errorf(format string, args ...interface{}) { |
|
if c.errorlog != nil { |
|
c.errorlog.Printf(format, args...) |
|
} |
|
} |
|
|
|
// infof logs informational messages. |
|
func (c *Client) infof(format string, args ...interface{}) { |
|
if c.infolog != nil { |
|
c.infolog.Printf(format, args...) |
|
} |
|
} |
|
|
|
// tracef logs to the trace log. |
|
func (c *Client) tracef(format string, args ...interface{}) { |
|
if c.tracelog != nil { |
|
c.tracelog.Printf(format, args...) |
|
} |
|
} |
|
|
|
// dumpRequest dumps the given HTTP request to the trace log. |
|
func (c *Client) dumpRequest(r *http.Request) { |
|
if c.tracelog != nil { |
|
out, err := httputil.DumpRequestOut(r, true) |
|
if err == nil { |
|
c.tracef("%s\n", string(out)) |
|
} |
|
} |
|
} |
|
|
|
// dumpResponse dumps the given HTTP response to the trace log. |
|
func (c *Client) dumpResponse(resp *http.Response) { |
|
if c.tracelog != nil { |
|
out, err := httputil.DumpResponse(resp, true) |
|
if err == nil { |
|
c.tracef("%s\n", string(out)) |
|
} |
|
} |
|
} |
|
|
|
// sniffer periodically runs sniff. |
|
func (c *Client) sniffer() { |
|
c.mu.RLock() |
|
timeout := c.snifferTimeout |
|
interval := c.snifferInterval |
|
c.mu.RUnlock() |
|
|
|
ticker := time.NewTicker(interval) |
|
defer ticker.Stop() |
|
|
|
for { |
|
select { |
|
case <-c.snifferStop: |
|
// we are asked to stop, so we signal back that we're stopping now |
|
c.snifferStop <- true |
|
return |
|
case <-ticker.C: |
|
c.sniff(context.Background(), timeout) |
|
} |
|
} |
|
} |
|
|
|
// sniff uses the Node Info API to return the list of nodes in the cluster. |
|
// It uses the list of URLs passed on startup plus the list of URLs found |
|
// by the preceding sniffing process (if sniffing is enabled). |
|
// |
|
// If sniffing is disabled, this is a no-op. |
|
func (c *Client) sniff(parentCtx context.Context, timeout time.Duration) error { |
|
c.mu.RLock() |
|
if !c.snifferEnabled { |
|
c.mu.RUnlock() |
|
return nil |
|
} |
|
|
|
// Use all available URLs provided to sniff the cluster. |
|
var urls []string |
|
urlsMap := make(map[string]bool) |
|
|
|
// Add all URLs provided on startup |
|
for _, url := range c.urls { |
|
urlsMap[url] = true |
|
urls = append(urls, url) |
|
} |
|
c.mu.RUnlock() |
|
|
|
// Add all URLs found by sniffing |
|
c.connsMu.RLock() |
|
for _, conn := range c.conns { |
|
if !conn.IsDead() { |
|
url := conn.URL() |
|
if _, found := urlsMap[url]; !found { |
|
urls = append(urls, url) |
|
} |
|
} |
|
} |
|
c.connsMu.RUnlock() |
|
|
|
if len(urls) == 0 { |
|
return errors.Wrap(ErrNoClient, "no URLs found") |
|
} |
|
|
|
// Start sniffing on all found URLs |
|
ch := make(chan []*conn, len(urls)) |
|
|
|
ctx, cancel := context.WithTimeout(parentCtx, timeout) |
|
defer cancel() |
|
|
|
for _, url := range urls { |
|
go func(url string) { ch <- c.sniffNode(ctx, url) }(url) |
|
} |
|
|
|
// Wait for the results to come back, or the process times out. |
|
for { |
|
select { |
|
case conns := <-ch: |
|
if len(conns) > 0 { |
|
c.updateConns(conns) |
|
return nil |
|
} |
|
case <-ctx.Done(): |
|
if err := ctx.Err(); err != nil { |
|
switch { |
|
case IsContextErr(err): |
|
return err |
|
} |
|
return errors.Wrapf(ErrNoClient, "sniff timeout: %v", err) |
|
} |
|
// We get here if no cluster responds in time |
|
return errors.Wrap(ErrNoClient, "sniff timeout") |
|
} |
|
} |
|
} |
|
|
|
// sniffNode sniffs a single node. This method is run as a goroutine |
|
// in sniff. If successful, it returns the list of node URLs extracted |
|
// from the result of calling Nodes Info API. Otherwise, an empty array |
|
// is returned. |
|
func (c *Client) sniffNode(ctx context.Context, url string) []*conn { |
|
var nodes []*conn |
|
|
|
// Call the Nodes Info API at /_nodes/http |
|
req, err := NewRequest("GET", url+"/_nodes/http") |
|
if err != nil { |
|
return nodes |
|
} |
|
|
|
c.mu.RLock() |
|
if c.basicAuthUsername != "" || c.basicAuthPassword != "" { |
|
req.SetBasicAuth(c.basicAuthUsername, c.basicAuthPassword) |
|
} |
|
c.mu.RUnlock() |
|
|
|
if req.Header.Get("User-Agent") == "" { |
|
req.Header.Add("User-Agent", "elastic/"+Version+" ("+runtime.GOOS+"-"+runtime.GOARCH+")") |
|
} |
|
|
|
res, err := c.c.Do((*http.Request)(req).WithContext(ctx)) |
|
if err != nil { |
|
return nodes |
|
} |
|
defer res.Body.Close() |
|
|
|
var info NodesInfoResponse |
|
if err := json.NewDecoder(res.Body).Decode(&info); err == nil { |
|
if len(info.Nodes) > 0 { |
|
for nodeID, node := range info.Nodes { |
|
if c.snifferCallback(node) { |
|
if node.HTTP != nil && len(node.HTTP.PublishAddress) > 0 { |
|
url := c.extractHostname(c.scheme, node.HTTP.PublishAddress) |
|
if url != "" { |
|
nodes = append(nodes, newConn(nodeID, url)) |
|
} |
|
} |
|
} |
|
} |
|
} |
|
} |
|
return nodes |
|
} |
|
|
|
// extractHostname returns the URL from the http.publish_address setting. |
|
func (c *Client) extractHostname(scheme, address string) string { |
|
var ( |
|
host string |
|
port string |
|
|
|
addrs = strings.Split(address, "/") |
|
ports = strings.Split(address, ":") |
|
) |
|
|
|
if len(addrs) > 1 { |
|
host = addrs[0] |
|
} else { |
|
host = strings.Split(addrs[0], ":")[0] |
|
} |
|
port = ports[len(ports)-1] |
|
|
|
return fmt.Sprintf("%s://%s:%s", scheme, host, port) |
|
} |
|
|
|
// updateConns updates the clients' connections with new information |
|
// gather by a sniff operation. |
|
func (c *Client) updateConns(conns []*conn) { |
|
c.connsMu.Lock() |
|
|
|
// Build up new connections: |
|
// If we find an existing connection, use that (including no. of failures etc.). |
|
// If we find a new connection, add it. |
|
var newConns []*conn |
|
for _, conn := range conns { |
|
var found bool |
|
for _, oldConn := range c.conns { |
|
// Notice that e.g. in a Kubernetes cluster the NodeID might be |
|
// stable while the URL has changed. |
|
if oldConn.NodeID() == conn.NodeID() && oldConn.URL() == conn.URL() { |
|
// Take over the old connection |
|
newConns = append(newConns, oldConn) |
|
found = true |
|
break |
|
} |
|
} |
|
if !found { |
|
// New connection didn't exist, so add it to our list of new conns. |
|
c.infof("elastic: %s joined the cluster", conn.URL()) |
|
newConns = append(newConns, conn) |
|
} |
|
} |
|
|
|
c.conns = newConns |
|
c.cindex = -1 |
|
c.connsMu.Unlock() |
|
} |
|
|
|
// healthchecker periodically runs healthcheck. |
|
func (c *Client) healthchecker() { |
|
c.mu.RLock() |
|
timeout := c.healthcheckTimeout |
|
interval := c.healthcheckInterval |
|
c.mu.RUnlock() |
|
|
|
ticker := time.NewTicker(interval) |
|
defer ticker.Stop() |
|
|
|
for { |
|
select { |
|
case <-c.healthcheckStop: |
|
// we are asked to stop, so we signal back that we're stopping now |
|
c.healthcheckStop <- true |
|
return |
|
case <-ticker.C: |
|
c.healthcheck(context.Background(), timeout, false) |
|
} |
|
} |
|
} |
|
|
|
// healthcheck does a health check on all nodes in the cluster. Depending on |
|
// the node state, it marks connections as dead, sets them alive etc. |
|
// If healthchecks are disabled and force is false, this is a no-op. |
|
// The timeout specifies how long to wait for a response from Elasticsearch. |
|
func (c *Client) healthcheck(parentCtx context.Context, timeout time.Duration, force bool) { |
|
c.mu.RLock() |
|
if !c.healthcheckEnabled && !force { |
|
c.mu.RUnlock() |
|
return |
|
} |
|
headers := c.headers |
|
basicAuth := c.basicAuthUsername != "" || c.basicAuthPassword != "" |
|
basicAuthUsername := c.basicAuthUsername |
|
basicAuthPassword := c.basicAuthPassword |
|
c.mu.RUnlock() |
|
|
|
c.connsMu.RLock() |
|
conns := c.conns |
|
c.connsMu.RUnlock() |
|
|
|
for _, conn := range conns { |
|
// Run the HEAD request against ES with a timeout |
|
ctx, cancel := context.WithTimeout(parentCtx, timeout) |
|
defer cancel() |
|
|
|
// Goroutine executes the HTTP request, returns an error and sets status |
|
var status int |
|
errc := make(chan error, 1) |
|
go func(url string) { |
|
req, err := NewRequest("HEAD", url) |
|
if err != nil { |
|
errc <- err |
|
return |
|
} |
|
if basicAuth { |
|
req.SetBasicAuth(basicAuthUsername, basicAuthPassword) |
|
} |
|
if len(headers) > 0 { |
|
for key, values := range headers { |
|
for _, v := range values { |
|
req.Header.Add(key, v) |
|
} |
|
} |
|
} |
|
if req.Header.Get("User-Agent") == "" { |
|
req.Header.Add("User-Agent", "elastic/"+Version+" ("+runtime.GOOS+"-"+runtime.GOARCH+")") |
|
} |
|
res, err := c.c.Do((*http.Request)(req).WithContext(ctx)) |
|
if res != nil { |
|
status = res.StatusCode |
|
if res.Body != nil { |
|
res.Body.Close() |
|
} |
|
} |
|
errc <- err |
|
}(conn.URL()) |
|
|
|
// Wait for the Goroutine (or its timeout) |
|
select { |
|
case <-ctx.Done(): // timeout |
|
c.errorf("elastic: %s is dead", conn.URL()) |
|
conn.MarkAsDead() |
|
case err := <-errc: |
|
if err != nil { |
|
c.errorf("elastic: %s is dead", conn.URL()) |
|
conn.MarkAsDead() |
|
break |
|
} |
|
if status >= 200 && status < 300 { |
|
conn.MarkAsAlive() |
|
} else { |
|
conn.MarkAsDead() |
|
c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status) |
|
} |
|
} |
|
} |
|
} |
|
|
|
// startupHealthcheck is used at startup to check if the server is available |
|
// at all. |
|
func (c *Client) startupHealthcheck(parentCtx context.Context, timeout time.Duration) error { |
|
c.mu.Lock() |
|
urls := c.urls |
|
headers := c.headers |
|
basicAuth := c.basicAuthUsername != "" || c.basicAuthPassword != "" |
|
basicAuthUsername := c.basicAuthUsername |
|
basicAuthPassword := c.basicAuthPassword |
|
c.mu.Unlock() |
|
|
|
// If we don't get a connection after "timeout", we bail. |
|
var lastErr error |
|
start := time.Now() |
|
done := false |
|
for !done { |
|
for _, url := range urls { |
|
req, err := http.NewRequest("HEAD", url, nil) |
|
if err != nil { |
|
return err |
|
} |
|
if basicAuth { |
|
req.SetBasicAuth(basicAuthUsername, basicAuthPassword) |
|
} |
|
if len(headers) > 0 { |
|
for key, values := range headers { |
|
for _, v := range values { |
|
req.Header.Add(key, v) |
|
} |
|
} |
|
} |
|
ctx, cancel := context.WithTimeout(parentCtx, timeout) |
|
defer cancel() |
|
req = req.WithContext(ctx) |
|
res, err := c.c.Do(req) |
|
if err != nil { |
|
lastErr = err |
|
} else if res.StatusCode >= 200 && res.StatusCode < 300 { |
|
return nil |
|
} else if res.StatusCode == http.StatusUnauthorized { |
|
lastErr = &Error{Status: res.StatusCode} |
|
} |
|
} |
|
select { |
|
case <-parentCtx.Done(): |
|
lastErr = parentCtx.Err() |
|
done = true |
|
case <-time.After(1 * time.Second): |
|
if time.Since(start) > timeout { |
|
done = true |
|
} |
|
} |
|
} |
|
if lastErr != nil { |
|
if IsContextErr(lastErr) || IsUnauthorized(lastErr) { |
|
return lastErr |
|
} |
|
return errors.Wrapf(ErrNoClient, "health check timeout: %v", lastErr) |
|
} |
|
return errors.Wrap(ErrNoClient, "health check timeout") |
|
} |
|
|
|
// next returns the next available connection, or ErrNoClient. |
|
func (c *Client) next() (*conn, error) { |
|
// We do round-robin here. |
|
// TODO(oe) This should be a pluggable strategy, like the Selector in the official clients. |
|
c.connsMu.Lock() |
|
defer c.connsMu.Unlock() |
|
|
|
i := 0 |
|
numConns := len(c.conns) |
|
for { |
|
i++ |
|
if i > numConns { |
|
break // we visited all conns: they all seem to be dead |
|
} |
|
c.cindex++ |
|
if c.cindex >= numConns { |
|
c.cindex = 0 |
|
} |
|
conn := c.conns[c.cindex] |
|
if !conn.IsDead() { |
|
return conn, nil |
|
} |
|
} |
|
|
|
// We have a deadlock here: All nodes are marked as dead. |
|
// If sniffing is disabled, connections will never be marked alive again. |
|
// So we are marking them as alive--if sniffing is disabled. |
|
// They'll then be picked up in the next call to PerformRequest. |
|
if !c.snifferEnabled { |
|
c.errorf("elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns)) |
|
for _, conn := range c.conns { |
|
conn.MarkAsAlive() |
|
} |
|
} |
|
|
|
// We tried hard, but there is no node available |
|
return nil, errors.Wrap(ErrNoClient, "no available connection") |
|
} |
|
|
|
// mustActiveConn returns nil if there is an active connection, |
|
// otherwise ErrNoClient is returned. |
|
func (c *Client) mustActiveConn() error { |
|
c.connsMu.Lock() |
|
defer c.connsMu.Unlock() |
|
|
|
for _, c := range c.conns { |
|
if !c.IsDead() { |
|
return nil |
|
} |
|
} |
|
return errors.Wrap(ErrNoClient, "no active connection found") |
|
} |
|
|
|
// -- PerformRequest -- |
|
|
|
// PerformRequestOptions must be passed into PerformRequest. |
|
type PerformRequestOptions struct { |
|
Method string |
|
Path string |
|
Params url.Values |
|
Body interface{} |
|
ContentType string |
|
IgnoreErrors []int |
|
Retrier Retrier |
|
RetryStatusCodes []int |
|
Headers http.Header |
|
MaxResponseSize int64 |
|
Stream bool |
|
} |
|
|
|
// PerformRequest does a HTTP request to Elasticsearch. |
|
// It returns a response (which might be nil) and an error on failure. |
|
// |
|
// Optionally, a list of HTTP error codes to ignore can be passed. |
|
// This is necessary for services that expect e.g. HTTP status 404 as a |
|
// valid outcome (Exists, IndicesExists, IndicesTypeExists). |
|
// |
|
// If Stream is set, the returned BodyReader field must be closed, even |
|
// if PerformRequest returns an error. |
|
func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) (*Response, error) { |
|
start := time.Now().UTC() |
|
|
|
c.mu.RLock() |
|
timeout := c.healthcheckTimeout |
|
basicAuth := c.basicAuthUsername != "" || c.basicAuthPassword != "" |
|
basicAuthUsername := c.basicAuthUsername |
|
basicAuthPassword := c.basicAuthPassword |
|
sendGetBodyAs := c.sendGetBodyAs |
|
gzipEnabled := c.gzipEnabled |
|
healthcheckEnabled := c.healthcheckEnabled |
|
retrier := c.retrier |
|
if opt.Retrier != nil { |
|
retrier = opt.Retrier |
|
} |
|
retryStatusCodes := c.retryStatusCodes |
|
if opt.RetryStatusCodes != nil { |
|
retryStatusCodes = opt.RetryStatusCodes |
|
} |
|
defaultHeaders := c.headers |
|
c.mu.RUnlock() |
|
|
|
// retry returns true if statusCode indicates the request is to be retried |
|
retry := func(statusCode int) bool { |
|
for _, code := range retryStatusCodes { |
|
if code == statusCode { |
|
return true |
|
} |
|
} |
|
return false |
|
} |
|
|
|
var err error |
|
var conn *conn |
|
var req *Request |
|
var resp *Response |
|
var retried bool |
|
var n int |
|
|
|
// Change method if sendGetBodyAs is specified. |
|
if opt.Method == "GET" && opt.Body != nil && sendGetBodyAs != "GET" { |
|
opt.Method = sendGetBodyAs |
|
} |
|
|
|
for { |
|
pathWithParams := opt.Path |
|
if len(opt.Params) > 0 { |
|
pathWithParams += "?" + opt.Params.Encode() |
|
} |
|
|
|
// Get a connection |
|
conn, err = c.next() |
|
if errors.Cause(err) == ErrNoClient { |
|
n++ |
|
if !retried { |
|
// Force a healtcheck as all connections seem to be dead. |
|
c.healthcheck(ctx, timeout, false) |
|
if healthcheckEnabled { |
|
retried = true |
|
continue |
|
} |
|
} |
|
wait, ok, rerr := retrier.Retry(ctx, n, nil, nil, err) |
|
if rerr != nil { |
|
return nil, rerr |
|
} |
|
if !ok { |
|
return nil, err |
|
} |
|
retried = true |
|
time.Sleep(wait) |
|
continue // try again |
|
} |
|
if err != nil { |
|
c.errorf("elastic: cannot get connection from pool") |
|
return nil, err |
|
} |
|
|
|
req, err = NewRequest(opt.Method, conn.URL()+pathWithParams) |
|
if err != nil { |
|
c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err) |
|
return nil, err |
|
} |
|
if basicAuth { |
|
req.SetBasicAuth(basicAuthUsername, basicAuthPassword) |
|
} |
|
if opt.ContentType != "" { |
|
req.Header.Set("Content-Type", opt.ContentType) |
|
} |
|
for key, value := range opt.Headers { |
|
for _, v := range value { |
|
req.Header.Add(key, v) |
|
} |
|
} |
|
if len(defaultHeaders) > 0 { |
|
for key, value := range defaultHeaders { |
|
for _, v := range value { |
|
req.Header.Add(key, v) |
|
} |
|
} |
|
} |
|
if req.Header.Get("User-Agent") == "" { |
|
req.Header.Set("User-Agent", "elastic/"+Version+" ("+runtime.GOOS+"-"+runtime.GOARCH+")") |
|
} |
|
|
|
// Set body |
|
if opt.Body != nil { |
|
err = req.SetBody(opt.Body, gzipEnabled) |
|
if err != nil { |
|
c.errorf("elastic: couldn't set body %+v for request: %v", opt.Body, err) |
|
return nil, err |
|
} |
|
} |
|
|
|
// Tracing |
|
c.dumpRequest((*http.Request)(req)) |
|
|
|
// Get response |
|
res, err := c.c.Do((*http.Request)(req).WithContext(ctx)) |
|
if IsContextErr(err) { |
|
// Proceed, but don't mark the node as dead |
|
return nil, err |
|
} |
|
if err != nil { |
|
n++ |
|
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err) |
|
if rerr != nil { |
|
c.errorf("elastic: %s is dead", conn.URL()) |
|
conn.MarkAsDead() |
|
return nil, rerr |
|
} |
|
if !ok { |
|
c.errorf("elastic: %s is dead", conn.URL()) |
|
conn.MarkAsDead() |
|
return nil, err |
|
} |
|
retried = true |
|
time.Sleep(wait) |
|
continue // try again |
|
} |
|
if retry(res.StatusCode) { |
|
n++ |
|
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err) |
|
if rerr != nil { |
|
c.errorf("elastic: %s is dead", conn.URL()) |
|
conn.MarkAsDead() |
|
return nil, rerr |
|
} |
|
if ok { |
|
// retry |
|
retried = true |
|
time.Sleep(wait) |
|
continue // try again |
|
} |
|
} |
|
|
|
if !opt.Stream { |
|
defer res.Body.Close() |
|
} |
|
|
|
// Tracing |
|
c.dumpResponse(res) |
|
|
|
// Log deprecation warnings as errors |
|
if len(res.Header["Warning"]) > 0 { |
|
c.deprecationlog((*http.Request)(req), res) |
|
for _, warning := range res.Header["Warning"] { |
|
c.errorf("Deprecation warning: %s", warning) |
|
} |
|
} |
|
|
|
// Check for errors |
|
if err := checkResponse((*http.Request)(req), res, opt.IgnoreErrors...); err != nil { |
|
// No retry if request succeeded |
|
// We still try to return a response. |
|
resp, _ = c.newResponse(res, opt.MaxResponseSize, opt.Stream) |
|
return resp, err |
|
} |
|
|
|
// We successfully made a request with this connection |
|
conn.MarkAsHealthy() |
|
|
|
resp, err = c.newResponse(res, opt.MaxResponseSize, opt.Stream) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
break |
|
} |
|
|
|
duration := time.Now().UTC().Sub(start) |
|
c.infof("%s %s [status:%d, request:%.3fs]", |
|
strings.ToUpper(opt.Method), |
|
req.URL, |
|
resp.StatusCode, |
|
float64(int64(duration/time.Millisecond))/1000) |
|
|
|
return resp, nil |
|
} |
|
|
|
// -- Document APIs -- |
|
|
|
// Index a document. |
|
func (c *Client) Index() *IndexService { |
|
return NewIndexService(c) |
|
} |
|
|
|
// Get a document. |
|
func (c *Client) Get() *GetService { |
|
return NewGetService(c) |
|
} |
|
|
|
// MultiGet retrieves multiple documents in one roundtrip. |
|
func (c *Client) MultiGet() *MgetService { |
|
return NewMgetService(c) |
|
} |
|
|
|
// Mget retrieves multiple documents in one roundtrip. |
|
func (c *Client) Mget() *MgetService { |
|
return NewMgetService(c) |
|
} |
|
|
|
// Delete a document. |
|
func (c *Client) Delete() *DeleteService { |
|
return NewDeleteService(c) |
|
} |
|
|
|
// DeleteByQuery deletes documents as found by a query. |
|
func (c *Client) DeleteByQuery(indices ...string) *DeleteByQueryService { |
|
return NewDeleteByQueryService(c).Index(indices...) |
|
} |
|
|
|
// Update a document. |
|
func (c *Client) Update() *UpdateService { |
|
return NewUpdateService(c) |
|
} |
|
|
|
// UpdateByQuery performs an update on a set of documents. |
|
func (c *Client) UpdateByQuery(indices ...string) *UpdateByQueryService { |
|
return NewUpdateByQueryService(c).Index(indices...) |
|
} |
|
|
|
// Bulk is the entry point to mass insert/update/delete documents. |
|
func (c *Client) Bulk() *BulkService { |
|
return NewBulkService(c) |
|
} |
|
|
|
// BulkProcessor allows setting up a concurrent processor of bulk requests. |
|
func (c *Client) BulkProcessor() *BulkProcessorService { |
|
return NewBulkProcessorService(c) |
|
} |
|
|
|
// Reindex copies data from a source index into a destination index. |
|
// |
|
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-reindex.html |
|
// for details on the Reindex API. |
|
func (c *Client) Reindex() *ReindexService { |
|
return NewReindexService(c) |
|
} |
|
|
|
// TermVectors returns information and statistics on terms in the fields |
|
// of a particular document. |
|
func (c *Client) TermVectors(index string) *TermvectorsService { |
|
builder := NewTermvectorsService(c) |
|
builder = builder.Index(index) |
|
return builder |
|
} |
|
|
|
// MultiTermVectors returns information and statistics on terms in the fields |
|
// of multiple documents. |
|
func (c *Client) MultiTermVectors() *MultiTermvectorService { |
|
return NewMultiTermvectorService(c) |
|
} |
|
|
|
// -- Search APIs -- |
|
|
|
// Search is the entry point for searches. |
|
func (c *Client) Search(indices ...string) *SearchService { |
|
return NewSearchService(c).Index(indices...) |
|
} |
|
|
|
// MultiSearch is the entry point for multi searches. |
|
func (c *Client) MultiSearch() *MultiSearchService { |
|
return NewMultiSearchService(c) |
|
} |
|
|
|
// Count documents. |
|
func (c *Client) Count(indices ...string) *CountService { |
|
return NewCountService(c).Index(indices...) |
|
} |
|
|
|
// Explain computes a score explanation for a query and a specific document. |
|
func (c *Client) Explain(index, typ, id string) *ExplainService { |
|
return NewExplainService(c).Index(index).Type(typ).Id(id) |
|
} |
|
|
|
// TODO Search Template |
|
// TODO Search Exists API |
|
|
|
// Validate allows a user to validate a potentially expensive query without executing it. |
|
func (c *Client) Validate(indices ...string) *ValidateService { |
|
return NewValidateService(c).Index(indices...) |
|
} |
|
|
|
// SearchShards returns statistical information about nodes and shards. |
|
func (c *Client) SearchShards(indices ...string) *SearchShardsService { |
|
return NewSearchShardsService(c).Index(indices...) |
|
} |
|
|
|
// FieldCaps returns statistical information about fields in indices. |
|
func (c *Client) FieldCaps(indices ...string) *FieldCapsService { |
|
return NewFieldCapsService(c).Index(indices...) |
|
} |
|
|
|
// Exists checks if a document exists. |
|
func (c *Client) Exists() *ExistsService { |
|
return NewExistsService(c) |
|
} |
|
|
|
// Scroll through documents. Use this to efficiently scroll through results |
|
// while returning the results to a client. |
|
func (c *Client) Scroll(indices ...string) *ScrollService { |
|
return NewScrollService(c).Index(indices...) |
|
} |
|
|
|
// ClearScroll can be used to clear search contexts manually. |
|
func (c *Client) ClearScroll(scrollIds ...string) *ClearScrollService { |
|
return NewClearScrollService(c).ScrollId(scrollIds...) |
|
} |
|
|
|
// OpenPointInTime opens a new Point in Time. |
|
func (c *Client) OpenPointInTime(indices ...string) *OpenPointInTimeService { |
|
return NewOpenPointInTimeService(c).Index(indices...) |
|
} |
|
|
|
// ClosePointInTime closes an existing Point in Time. |
|
func (c *Client) ClosePointInTime(id string) *ClosePointInTimeService { |
|
return NewClosePointInTimeService(c).ID(id) |
|
} |
|
|
|
// -- Indices APIs -- |
|
|
|
// CreateIndex returns a service to create a new index. |
|
func (c *Client) CreateIndex(name string) *IndicesCreateService { |
|
return NewIndicesCreateService(c).Index(name) |
|
} |
|
|
|
// DeleteIndex returns a service to delete an index. |
|
func (c *Client) DeleteIndex(indices ...string) *IndicesDeleteService { |
|
return NewIndicesDeleteService(c).Index(indices) |
|
} |
|
|
|
// IndexExists allows to check if an index exists. |
|
func (c *Client) IndexExists(indices ...string) *IndicesExistsService { |
|
return NewIndicesExistsService(c).Index(indices) |
|
} |
|
|
|
// ShrinkIndex returns a service to shrink one index into another. |
|
func (c *Client) ShrinkIndex(source, target string) *IndicesShrinkService { |
|
return NewIndicesShrinkService(c).Source(source).Target(target) |
|
} |
|
|
|
// RolloverIndex rolls an alias over to a new index when the existing index |
|
// is considered to be too large or too old. |
|
func (c *Client) RolloverIndex(alias string) *IndicesRolloverService { |
|
return NewIndicesRolloverService(c).Alias(alias) |
|
} |
|
|
|
// IndexStats provides statistics on different operations happining |
|
// in one or more indices. |
|
func (c *Client) IndexStats(indices ...string) *IndicesStatsService { |
|
return NewIndicesStatsService(c).Index(indices...) |
|
} |
|
|
|
// OpenIndex opens an index. |
|
func (c *Client) OpenIndex(name string) *IndicesOpenService { |
|
return NewIndicesOpenService(c).Index(name) |
|
} |
|
|
|
// CloseIndex closes an index. |
|
func (c *Client) CloseIndex(name string) *IndicesCloseService { |
|
return NewIndicesCloseService(c).Index(name) |
|
} |
|
|
|
// FreezeIndex freezes an index. |
|
func (c *Client) FreezeIndex(name string) *IndicesFreezeService { |
|
return NewIndicesFreezeService(c).Index(name) |
|
} |
|
|
|
// UnfreezeIndex unfreezes an index. |
|
func (c *Client) UnfreezeIndex(name string) *IndicesUnfreezeService { |
|
return NewIndicesUnfreezeService(c).Index(name) |
|
} |
|
|
|
// IndexGet retrieves information about one or more indices. |
|
// IndexGet is only available for Elasticsearch 1.4 or later. |
|
func (c *Client) IndexGet(indices ...string) *IndicesGetService { |
|
return NewIndicesGetService(c).Index(indices...) |
|
} |
|
|
|
// IndexGetSettings retrieves settings of all, one or more indices. |
|
func (c *Client) IndexGetSettings(indices ...string) *IndicesGetSettingsService { |
|
return NewIndicesGetSettingsService(c).Index(indices...) |
|
} |
|
|
|
// IndexPutSettings sets settings for all, one or more indices. |
|
func (c *Client) IndexPutSettings(indices ...string) *IndicesPutSettingsService { |
|
return NewIndicesPutSettingsService(c).Index(indices...) |
|
} |
|
|
|
// IndexSegments retrieves low level segment information for all, one or more indices. |
|
func (c *Client) IndexSegments(indices ...string) *IndicesSegmentsService { |
|
return NewIndicesSegmentsService(c).Index(indices...) |
|
} |
|
|
|
// IndexAnalyze performs the analysis process on a text and returns the |
|
// token breakdown of the text. |
|
func (c *Client) IndexAnalyze() *IndicesAnalyzeService { |
|
return NewIndicesAnalyzeService(c) |
|
} |
|
|
|
// Forcemerge optimizes one or more indices. |
|
// It replaces the deprecated Optimize API. |
|
func (c *Client) Forcemerge(indices ...string) *IndicesForcemergeService { |
|
return NewIndicesForcemergeService(c).Index(indices...) |
|
} |
|
|
|
// Refresh asks Elasticsearch to refresh one or more indices. |
|
func (c *Client) Refresh(indices ...string) *RefreshService { |
|
return NewRefreshService(c).Index(indices...) |
|
} |
|
|
|
// Flush asks Elasticsearch to free memory from the index and |
|
// flush data to disk. |
|
func (c *Client) Flush(indices ...string) *IndicesFlushService { |
|
return NewIndicesFlushService(c).Index(indices...) |
|
} |
|
|
|
// SyncedFlush performs a synced flush. |
|
// |
|
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/indices-synced-flush.html |
|
// for more details on synched flushes and how they differ from a normal |
|
// Flush. |
|
func (c *Client) SyncedFlush(indices ...string) *IndicesSyncedFlushService { |
|
return NewIndicesSyncedFlushService(c).Index(indices...) |
|
} |
|
|
|
// ClearCache clears caches for one or more indices. |
|
func (c *Client) ClearCache(indices ...string) *IndicesClearCacheService { |
|
return NewIndicesClearCacheService(c).Index(indices...) |
|
} |
|
|
|
// Alias enables the caller to add and/or remove aliases. |
|
func (c *Client) Alias() *AliasService { |
|
return NewAliasService(c) |
|
} |
|
|
|
// Aliases returns aliases by index name(s). |
|
func (c *Client) Aliases() *AliasesService { |
|
return NewAliasesService(c) |
|
} |
|
|
|
// -- Legacy templates -- |
|
|
|
// IndexGetTemplate gets an index template (v1/legacy version before 7.8). |
|
// |
|
// This service implements the legacy version of index templates as described |
|
// in https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-templates-v1.html. |
|
// |
|
// See e.g. IndexPutIndexTemplate and IndexPutComponentTemplate for the new version(s). |
|
func (c *Client) IndexGetTemplate(names ...string) *IndicesGetTemplateService { |
|
return NewIndicesGetTemplateService(c).Name(names...) |
|
} |
|
|
|
// IndexTemplateExists gets check if an index template exists (v1/legacy version before 7.8). |
|
// |
|
// This service implements the legacy version of index templates as described |
|
// in https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-templates-v1.html. |
|
// |
|
// See e.g. IndexPutIndexTemplate and IndexPutComponentTemplate for the new version(s). |
|
func (c *Client) IndexTemplateExists(name string) *IndicesExistsTemplateService { |
|
return NewIndicesExistsTemplateService(c).Name(name) |
|
} |
|
|
|
// IndexPutTemplate creates or updates an index template (v1/legacy version before 7.8). |
|
// |
|
// This service implements the legacy version of index templates as described |
|
// in https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-templates-v1.html. |
|
// |
|
// See e.g. IndexPutIndexTemplate and IndexPutComponentTemplate for the new version(s). |
|
func (c *Client) IndexPutTemplate(name string) *IndicesPutTemplateService { |
|
return NewIndicesPutTemplateService(c).Name(name) |
|
} |
|
|
|
// IndexDeleteTemplate deletes an index template (v1/legacy version before 7.8). |
|
// |
|
// This service implements the legacy version of index templates as described |
|
// in https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-templates-v1.html. |
|
// |
|
// See e.g. IndexPutIndexTemplate and IndexPutComponentTemplate for the new version(s). |
|
func (c *Client) IndexDeleteTemplate(name string) *IndicesDeleteTemplateService { |
|
return NewIndicesDeleteTemplateService(c).Name(name) |
|
} |
|
|
|
// -- Index templates -- |
|
|
|
// IndexPutIndexTemplate creates or updates an index template (new version after 7.8). |
|
// |
|
// This service implements the new version of index templates as described |
|
// on https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-put-template.html. |
|
// |
|
// See e.g. IndexPutTemplate for the v1/legacy version. |
|
func (c *Client) IndexPutIndexTemplate(name string) *IndicesPutIndexTemplateService { |
|
return NewIndicesPutIndexTemplateService(c).Name(name) |
|
} |
|
|
|
// IndexGetIndexTemplate returns an index template (new version after 7.8). |
|
// |
|
// This service implements the new version of index templates as described |
|
// on https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-get-template.html. |
|
// |
|
// See e.g. IndexPutTemplate for the v1/legacy version. |
|
func (c *Client) IndexGetIndexTemplate(name string) *IndicesGetIndexTemplateService { |
|
return NewIndicesGetIndexTemplateService(c).Name(name) |
|
} |
|
|
|
// IndexDeleteIndexTemplate deletes an index template (new version after 7.8). |
|
// |
|
// This service implements the new version of index templates as described |
|
// on https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-delete-template.html. |
|
// |
|
// See e.g. IndexPutTemplate for the v1/legacy version. |
|
func (c *Client) IndexDeleteIndexTemplate(name string) *IndicesDeleteIndexTemplateService { |
|
return NewIndicesDeleteIndexTemplateService(c).Name(name) |
|
} |
|
|
|
// -- Component templates -- |
|
|
|
// IndexPutComponentTemplate creates or updates a component template (available since 7.8). |
|
// |
|
// This service implements the component templates as described |
|
// on https://www.elastic.co/guide/en/elasticsearch/reference/7.10/indices-component-template.html. |
|
func (c *Client) IndexPutComponentTemplate(name string) *IndicesPutComponentTemplateService { |
|
return NewIndicesPutComponentTemplateService(c).Name(name) |
|
} |
|
|
|
// IndexGetComponentTemplate returns a component template (available since 7.8). |
|
// |
|
// This service implements the component templates as described |
|
// on https://www.elastic.co/guide/en/elasticsearch/reference/7.10/getting-component-templates.html. |
|
func (c *Client) IndexGetComponentTemplate(name string) *IndicesGetComponentTemplateService { |
|
return NewIndicesGetComponentTemplateService(c).Name(name) |
|
} |
|
|
|
// IndexDeleteComponentTemplate deletes a component template (available since 7.8). |
|
// |
|
// This service implements the component templates as described |
|
// on https://www.elastic.co/guide/en/elasticsearch/reference/7.10/indices-delete-component-template.html. |
|
func (c *Client) IndexDeleteComponentTemplate(name string) *IndicesDeleteComponentTemplateService { |
|
return NewIndicesDeleteComponentTemplateService(c).Name(name) |
|
} |
|
|
|
// GetMapping gets a mapping. |
|
func (c *Client) GetMapping() *IndicesGetMappingService { |
|
return NewIndicesGetMappingService(c) |
|
} |
|
|
|
// PutMapping registers a mapping. |
|
func (c *Client) PutMapping() *IndicesPutMappingService { |
|
return NewIndicesPutMappingService(c) |
|
} |
|
|
|
// GetFieldMapping gets mapping for fields. |
|
func (c *Client) GetFieldMapping() *IndicesGetFieldMappingService { |
|
return NewIndicesGetFieldMappingService(c) |
|
} |
|
|
|
// -- cat APIs -- |
|
|
|
// TODO cat master |
|
// TODO cat nodes |
|
// TODO cat pending tasks |
|
// TODO cat plugins |
|
// TODO cat recovery |
|
// TODO cat thread pool |
|
// TODO cat shards |
|
// TODO cat segments |
|
|
|
// CatFielddata returns information about the amount of heap memory currently used by the field data cache. |
|
func (c *Client) CatFielddata() *CatFielddataService { |
|
return NewCatFielddataService(c) |
|
} |
|
|
|
// CatAliases returns information about aliases. |
|
func (c *Client) CatAliases() *CatAliasesService { |
|
return NewCatAliasesService(c) |
|
} |
|
|
|
// CatAllocation returns information about the allocation across nodes. |
|
func (c *Client) CatAllocation() *CatAllocationService { |
|
return NewCatAllocationService(c) |
|
} |
|
|
|
// CatCount returns document counts for indices. |
|
func (c *Client) CatCount() *CatCountService { |
|
return NewCatCountService(c) |
|
} |
|
|
|
// CatHealth returns information about cluster health. |
|
func (c *Client) CatHealth() *CatHealthService { |
|
return NewCatHealthService(c) |
|
} |
|
|
|
// CatIndices returns information about indices. |
|
func (c *Client) CatIndices() *CatIndicesService { |
|
return NewCatIndicesService(c) |
|
} |
|
|
|
// CatShards returns information about shards. |
|
func (c *Client) CatShards() *CatShardsService { |
|
return NewCatShardsService(c) |
|
} |
|
|
|
// -- Ingest APIs -- |
|
|
|
// IngestPutPipeline adds pipelines and updates existing pipelines in |
|
// the cluster. |
|
func (c *Client) IngestPutPipeline(id string) *IngestPutPipelineService { |
|
return NewIngestPutPipelineService(c).Id(id) |
|
} |
|
|
|
// IngestGetPipeline returns pipelines based on ID. |
|
func (c *Client) IngestGetPipeline(ids ...string) *IngestGetPipelineService { |
|
return NewIngestGetPipelineService(c).Id(ids...) |
|
} |
|
|
|
// IngestDeletePipeline deletes a pipeline by ID. |
|
func (c *Client) IngestDeletePipeline(id string) *IngestDeletePipelineService { |
|
return NewIngestDeletePipelineService(c).Id(id) |
|
} |
|
|
|
// IngestSimulatePipeline executes a specific pipeline against the set of |
|
// documents provided in the body of the request. |
|
func (c *Client) IngestSimulatePipeline() *IngestSimulatePipelineService { |
|
return NewIngestSimulatePipelineService(c) |
|
} |
|
|
|
// -- Cluster APIs -- |
|
|
|
// ClusterHealth retrieves the health of the cluster. |
|
func (c *Client) ClusterHealth() *ClusterHealthService { |
|
return NewClusterHealthService(c) |
|
} |
|
|
|
// ClusterReroute allows for manual changes to the allocation of |
|
// individual shards in the cluster. |
|
func (c *Client) ClusterReroute() *ClusterRerouteService { |
|
return NewClusterRerouteService(c) |
|
} |
|
|
|
// ClusterState retrieves the state of the cluster. |
|
func (c *Client) ClusterState() *ClusterStateService { |
|
return NewClusterStateService(c) |
|
} |
|
|
|
// ClusterStats retrieves cluster statistics. |
|
func (c *Client) ClusterStats() *ClusterStatsService { |
|
return NewClusterStatsService(c) |
|
} |
|
|
|
// NodesInfo retrieves one or more or all of the cluster nodes information. |
|
func (c *Client) NodesInfo() *NodesInfoService { |
|
return NewNodesInfoService(c) |
|
} |
|
|
|
// NodesStats retrieves one or more or all of the cluster nodes statistics. |
|
func (c *Client) NodesStats() *NodesStatsService { |
|
return NewNodesStatsService(c) |
|
} |
|
|
|
// TasksCancel cancels tasks running on the specified nodes. |
|
func (c *Client) TasksCancel() *TasksCancelService { |
|
return NewTasksCancelService(c) |
|
} |
|
|
|
// TasksList retrieves the list of tasks running on the specified nodes. |
|
func (c *Client) TasksList() *TasksListService { |
|
return NewTasksListService(c) |
|
} |
|
|
|
// TasksGetTask retrieves a task running on the cluster. |
|
func (c *Client) TasksGetTask() *TasksGetTaskService { |
|
return NewTasksGetTaskService(c) |
|
} |
|
|
|
// TODO Pending cluster tasks |
|
// TODO Cluster Reroute |
|
// TODO Cluster Update Settings |
|
// TODO Nodes Stats |
|
// TODO Nodes hot_threads |
|
|
|
// -- Snapshot and Restore -- |
|
|
|
// SnapshotStatus returns information about the status of a snapshot. |
|
func (c *Client) SnapshotStatus() *SnapshotStatusService { |
|
return NewSnapshotStatusService(c) |
|
} |
|
|
|
// SnapshotCreate creates a snapshot. |
|
func (c *Client) SnapshotCreate(repository string, snapshot string) *SnapshotCreateService { |
|
return NewSnapshotCreateService(c).Repository(repository).Snapshot(snapshot) |
|
} |
|
|
|
// SnapshotCreateRepository creates or updates a snapshot repository. |
|
func (c *Client) SnapshotCreateRepository(repository string) *SnapshotCreateRepositoryService { |
|
return NewSnapshotCreateRepositoryService(c).Repository(repository) |
|
} |
|
|
|
// SnapshotDelete deletes a snapshot in a snapshot repository. |
|
func (c *Client) SnapshotDelete(repository string, snapshot string) *SnapshotDeleteService { |
|
return NewSnapshotDeleteService(c).Repository(repository).Snapshot(snapshot) |
|
} |
|
|
|
// SnapshotDeleteRepository deletes a snapshot repository. |
|
func (c *Client) SnapshotDeleteRepository(repositories ...string) *SnapshotDeleteRepositoryService { |
|
return NewSnapshotDeleteRepositoryService(c).Repository(repositories...) |
|
} |
|
|
|
// SnapshotGetRepository gets a snapshot repository. |
|
func (c *Client) SnapshotGetRepository(repositories ...string) *SnapshotGetRepositoryService { |
|
return NewSnapshotGetRepositoryService(c).Repository(repositories...) |
|
} |
|
|
|
// SnapshotGet lists snapshot for a repository. |
|
func (c *Client) SnapshotGet(repository string) *SnapshotGetService { |
|
return NewSnapshotGetService(c).Repository(repository) |
|
} |
|
|
|
// SnapshotVerifyRepository verifies a snapshot repository. |
|
func (c *Client) SnapshotVerifyRepository(repository string) *SnapshotVerifyRepositoryService { |
|
return NewSnapshotVerifyRepositoryService(c).Repository(repository) |
|
} |
|
|
|
// SnapshotRestore restores the specified indices from a given snapshot |
|
func (c *Client) SnapshotRestore(repository string, snapshot string) *SnapshotRestoreService { |
|
return NewSnapshotRestoreService(c).Repository(repository).Snapshot(snapshot) |
|
} |
|
|
|
// -- Scripting APIs -- |
|
|
|
// GetScript reads a stored script in Elasticsearch. |
|
// Use PutScript for storing a script. |
|
func (c *Client) GetScript() *GetScriptService { |
|
return NewGetScriptService(c) |
|
} |
|
|
|
// PutScript allows saving a stored script in Elasticsearch. |
|
func (c *Client) PutScript() *PutScriptService { |
|
return NewPutScriptService(c) |
|
} |
|
|
|
// DeleteScript allows removing a stored script from Elasticsearch. |
|
func (c *Client) DeleteScript() *DeleteScriptService { |
|
return NewDeleteScriptService(c) |
|
} |
|
|
|
// -- X-Pack General -- |
|
|
|
// XPackInfo gets information on the xpack plugins enabled on the cluster |
|
|
|
func (c *Client) XPackInfo() *XPackInfoService { |
|
return NewXPackInfoService(c) |
|
} |
|
|
|
// -- X-Pack Async Search -- |
|
|
|
// XPackAsyncSearchSubmit starts an asynchronous search. |
|
func (c *Client) XPackAsyncSearchSubmit() *XPackAsyncSearchSubmit { |
|
return NewXPackAsyncSearchSubmit(c) |
|
} |
|
|
|
// XPackAsyncSearchGet retrieves the outcome of an asynchronous search. |
|
func (c *Client) XPackAsyncSearchGet() *XPackAsyncSearchGet { |
|
return NewXPackAsyncSearchGet(c) |
|
} |
|
|
|
// XPackAsyncSearchDelete deletes an asynchronous search. |
|
func (c *Client) XPackAsyncSearchDelete() *XPackAsyncSearchDelete { |
|
return NewXPackAsyncSearchDelete(c) |
|
} |
|
|
|
// -- X-Pack Index Lifecycle Management -- |
|
|
|
// XPackIlmPutLifecycle adds or modifies an ilm policy. |
|
func (c *Client) XPackIlmPutLifecycle() *XPackIlmPutLifecycleService { |
|
return NewXPackIlmPutLifecycleService(c) |
|
} |
|
|
|
// XPackIlmGettLifecycle gets an ilm policy. |
|
func (c *Client) XPackIlmGetLifecycle() *XPackIlmGetLifecycleService { |
|
return NewXPackIlmGetLifecycleService(c) |
|
} |
|
|
|
// XPackIlmDeleteLifecycle deletes an ilm policy. |
|
func (c *Client) XPackIlmDeleteLifecycle() *XPackIlmDeleteLifecycleService { |
|
return NewXPackIlmDeleteLifecycleService(c) |
|
} |
|
|
|
// -- X-Pack Security -- |
|
|
|
// XPackSecurityGetRoleMapping gets a role mapping. |
|
func (c *Client) XPackSecurityGetRoleMapping(roleMappingName string) *XPackSecurityGetRoleMappingService { |
|
return NewXPackSecurityGetRoleMappingService(c).Name(roleMappingName) |
|
} |
|
|
|
// XPackSecurityPutRoleMapping adds a role mapping. |
|
func (c *Client) XPackSecurityPutRoleMapping(roleMappingName string) *XPackSecurityPutRoleMappingService { |
|
return NewXPackSecurityPutRoleMappingService(c).Name(roleMappingName) |
|
} |
|
|
|
// XPackSecurityDeleteRoleMapping deletes a role mapping. |
|
func (c *Client) XPackSecurityDeleteRoleMapping(roleMappingName string) *XPackSecurityDeleteRoleMappingService { |
|
return NewXPackSecurityDeleteRoleMappingService(c).Name(roleMappingName) |
|
} |
|
|
|
// XPackSecurityGetRole gets a role. |
|
func (c *Client) XPackSecurityGetRole(roleName string) *XPackSecurityGetRoleService { |
|
return NewXPackSecurityGetRoleService(c).Name(roleName) |
|
} |
|
|
|
// XPackSecurityPutRole adds a role. |
|
func (c *Client) XPackSecurityPutRole(roleName string) *XPackSecurityPutRoleService { |
|
return NewXPackSecurityPutRoleService(c).Name(roleName) |
|
} |
|
|
|
// XPackSecurityDeleteRole deletes a role. |
|
func (c *Client) XPackSecurityDeleteRole(roleName string) *XPackSecurityDeleteRoleService { |
|
return NewXPackSecurityDeleteRoleService(c).Name(roleName) |
|
} |
|
|
|
// TODO: Clear role cache API |
|
// https://www.elastic.co/guide/en/elasticsearch/reference/7.0/security-api-clear-role-cache.html |
|
|
|
// XPackSecurityChangePassword changes the password of users in the native realm. |
|
func (c *Client) XPackSecurityChangePassword(username string) *XPackSecurityChangePasswordService { |
|
return NewXPackSecurityChangePasswordService(c).Username(username) |
|
} |
|
|
|
// XPackSecurityGetUser gets details about one or more users. |
|
func (c *Client) XPackSecurityGetUser(usernames ...string) *XPackSecurityGetUserService { |
|
return NewXPackSecurityGetUserService(c).Usernames(usernames...) |
|
} |
|
|
|
// XPackSecurityPutUser adds or updates a user. |
|
func (c *Client) XPackSecurityPutUser(username string) *XPackSecurityPutUserService { |
|
return NewXPackSecurityPutUserService(c).Username(username) |
|
} |
|
|
|
// XPackSecurityEnableUser enables a user. |
|
func (c *Client) XPackSecurityEnableUser(username string) *XPackSecurityEnableUserService { |
|
return NewXPackSecurityEnableUserService(c).Username(username) |
|
} |
|
|
|
// XPackSecurityDisableUser disables a user. |
|
func (c *Client) XPackSecurityDisableUser(username string) *XPackSecurityDisableUserService { |
|
return NewXPackSecurityDisableUserService(c).Username(username) |
|
} |
|
|
|
// XPackSecurityDeleteUser deletes a user. |
|
func (c *Client) XPackSecurityDeleteUser(username string) *XPackSecurityDeleteUserService { |
|
return NewXPackSecurityDeleteUserService(c).Username(username) |
|
} |
|
|
|
// -- X-Pack Watcher -- |
|
|
|
// XPackWatchPut adds a watch. |
|
func (c *Client) XPackWatchPut(watchId string) *XPackWatcherPutWatchService { |
|
return NewXPackWatcherPutWatchService(c).Id(watchId) |
|
} |
|
|
|
// XPackWatchGet gets a watch. |
|
func (c *Client) XPackWatchGet(watchId string) *XPackWatcherGetWatchService { |
|
return NewXPackWatcherGetWatchService(c).Id(watchId) |
|
} |
|
|
|
// XPackWatchDelete deletes a watch. |
|
func (c *Client) XPackWatchDelete(watchId string) *XPackWatcherDeleteWatchService { |
|
return NewXPackWatcherDeleteWatchService(c).Id(watchId) |
|
} |
|
|
|
// XPackWatchExecute executes a watch. |
|
func (c *Client) XPackWatchExecute() *XPackWatcherExecuteWatchService { |
|
return NewXPackWatcherExecuteWatchService(c) |
|
} |
|
|
|
// XPackWatchAck acknowledging a watch. |
|
func (c *Client) XPackWatchAck(watchId string) *XPackWatcherAckWatchService { |
|
return NewXPackWatcherAckWatchService(c).WatchId(watchId) |
|
} |
|
|
|
// XPackWatchActivate activates a watch. |
|
func (c *Client) XPackWatchActivate(watchId string) *XPackWatcherActivateWatchService { |
|
return NewXPackWatcherActivateWatchService(c).WatchId(watchId) |
|
} |
|
|
|
// XPackWatchDeactivate deactivates a watch. |
|
func (c *Client) XPackWatchDeactivate(watchId string) *XPackWatcherDeactivateWatchService { |
|
return NewXPackWatcherDeactivateWatchService(c).WatchId(watchId) |
|
} |
|
|
|
// XPackWatchStats returns the current Watcher metrics. |
|
func (c *Client) XPackWatchStats() *XPackWatcherStatsService { |
|
return NewXPackWatcherStatsService(c) |
|
} |
|
|
|
// XPackWatchStart starts a watch. |
|
func (c *Client) XPackWatchStart() *XPackWatcherStartService { |
|
return NewXPackWatcherStartService(c) |
|
} |
|
|
|
// XPackWatchStop stops a watch. |
|
func (c *Client) XPackWatchStop() *XPackWatcherStopService { |
|
return NewXPackWatcherStopService(c) |
|
} |
|
|
|
// -- Helpers and shortcuts -- |
|
|
|
// ElasticsearchVersion returns the version number of Elasticsearch |
|
// running on the given URL. |
|
func (c *Client) ElasticsearchVersion(url string) (string, error) { |
|
res, _, err := c.Ping(url).Do(context.Background()) |
|
if err != nil { |
|
return "", err |
|
} |
|
return res.Version.Number, nil |
|
} |
|
|
|
// IndexNames returns the names of all indices in the cluster. |
|
func (c *Client) IndexNames() ([]string, error) { |
|
res, err := c.IndexGetSettings().Index("_all").Do(context.Background()) |
|
if err != nil { |
|
return nil, err |
|
} |
|
var names []string |
|
for name := range res { |
|
names = append(names, name) |
|
} |
|
return names, nil |
|
} |
|
|
|
// Ping checks if a given node in a cluster exists and (optionally) |
|
// returns some basic information about the Elasticsearch server, |
|
// e.g. the Elasticsearch version number. |
|
// |
|
// Notice that you need to specify a URL here explicitly. |
|
func (c *Client) Ping(url string) *PingService { |
|
return NewPingService(c).URL(url) |
|
} |
|
|
|
// WaitForStatus waits for the cluster to have the given status. |
|
// This is a shortcut method for the ClusterHealth service. |
|
// |
|
// WaitForStatus waits for the specified timeout, e.g. "10s". |
|
// If the cluster will have the given state within the timeout, nil is returned. |
|
// If the request timed out, ErrTimeout is returned. |
|
func (c *Client) WaitForStatus(status string, timeout string) error { |
|
health, err := c.ClusterHealth().WaitForStatus(status).Timeout(timeout).Do(context.Background()) |
|
if err != nil { |
|
return err |
|
} |
|
if health.TimedOut { |
|
return ErrTimeout |
|
} |
|
return nil |
|
} |
|
|
|
// WaitForGreenStatus waits for the cluster to have the "green" status. |
|
// See WaitForStatus for more details. |
|
func (c *Client) WaitForGreenStatus(timeout string) error { |
|
return c.WaitForStatus("green", timeout) |
|
} |
|
|
|
// WaitForYellowStatus waits for the cluster to have the "yellow" status. |
|
// See WaitForStatus for more details. |
|
func (c *Client) WaitForYellowStatus(timeout string) error { |
|
return c.WaitForStatus("yellow", timeout) |
|
}
|
|
|