Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
270 lines
6.6 KiB
270 lines
6.6 KiB
// Copyright 2015 PingCAP, Inc. |
|
// |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package domain |
|
|
|
import ( |
|
"sync" |
|
"sync/atomic" |
|
"time" |
|
|
|
"github.com/juju/errors" |
|
"github.com/ngaut/log" |
|
"github.com/pingcap/tidb/ddl" |
|
"github.com/pingcap/tidb/infoschema" |
|
"github.com/pingcap/tidb/kv" |
|
"github.com/pingcap/tidb/meta" |
|
"github.com/pingcap/tidb/model" |
|
"github.com/pingcap/tidb/sessionctx/variable" |
|
"github.com/pingcap/tidb/store/localstore" |
|
"github.com/pingcap/tidb/terror" |
|
) |
|
|
|
// ddlLastReloadSchemaTS is the key under which Stats reports the timestamp
// (in seconds) recorded by the last successful schema reload.
var ddlLastReloadSchemaTS = "ddl_last_reload_schema_ts"
|
|
|
// Domain represents a storage space. Different domains can use the same database name. |
|
// Multiple domains can be used in parallel without synchronization. |
|
type Domain struct { |
|
store kv.Storage |
|
infoHandle *infoschema.Handle |
|
ddl ddl.DDL |
|
leaseCh chan time.Duration |
|
// nano seconds |
|
lastLeaseTS int64 |
|
m sync.Mutex |
|
} |
|
|
|
func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) { |
|
m := meta.NewMeta(txn) |
|
schemaMetaVersion, err := m.GetSchemaVersion() |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
|
|
info := do.infoHandle.Get() |
|
if info != nil && schemaMetaVersion <= info.SchemaMetaVersion() { |
|
// info may be changed by other txn, so here its version may be bigger than schema version, |
|
// so we don't need to reload. |
|
log.Debugf("[ddl] schema version is still %d, no need reload", schemaMetaVersion) |
|
return nil |
|
} |
|
|
|
schemas, err := m.ListDatabases() |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
|
|
for _, di := range schemas { |
|
if di.State != model.StatePublic { |
|
// schema is not public, can't be used outside. |
|
continue |
|
} |
|
|
|
tables, err1 := m.ListTables(di.ID) |
|
if err1 != nil { |
|
return errors.Trace(err1) |
|
} |
|
|
|
di.Tables = make([]*model.TableInfo, 0, len(tables)) |
|
for _, tbl := range tables { |
|
if tbl.State != model.StatePublic { |
|
// schema is not public, can't be used outsiee. |
|
continue |
|
} |
|
di.Tables = append(di.Tables, tbl) |
|
} |
|
} |
|
|
|
log.Infof("[ddl] loadInfoSchema %d", schemaMetaVersion) |
|
err = do.infoHandle.Set(schemas, schemaMetaVersion) |
|
return errors.Trace(err) |
|
} |
|
|
|
// InfoSchema gets information schema from domain. |
|
func (do *Domain) InfoSchema() infoschema.InfoSchema { |
|
// try reload if possible. |
|
do.tryReload() |
|
return do.infoHandle.Get() |
|
} |
|
|
|
// DDL gets DDL from domain. |
|
func (do *Domain) DDL() ddl.DDL { |
|
return do.ddl |
|
} |
|
|
|
// Store gets KV store from domain. |
|
func (do *Domain) Store() kv.Storage { |
|
return do.store |
|
} |
|
|
|
// SetLease will reset the lease time for online DDL change. |
|
func (do *Domain) SetLease(lease time.Duration) { |
|
do.leaseCh <- lease |
|
|
|
// let ddl to reset lease too. |
|
do.ddl.SetLease(lease) |
|
} |
|
|
|
// Stats returns the domain statistic. |
|
func (do *Domain) Stats() (map[string]interface{}, error) { |
|
m := make(map[string]interface{}) |
|
m[ddlLastReloadSchemaTS] = atomic.LoadInt64(&do.lastLeaseTS) / 1e9 |
|
|
|
return m, nil |
|
} |
|
|
|
// GetScope gets the status variables scope. |
|
func (do *Domain) GetScope(status string) variable.ScopeFlag { |
|
// Now domain status variables scope are all default scope. |
|
return variable.DefaultScopeFlag |
|
} |
|
|
|
func (do *Domain) tryReload() { |
|
// if we don't have update the schema for a long time > lease, we must force reloading it. |
|
// Although we try to reload schema every lease time in a goroutine, sometimes it may not |
|
// run accurately, e.g, the machine has a very high load, running the ticker is delayed. |
|
last := atomic.LoadInt64(&do.lastLeaseTS) |
|
lease := do.ddl.GetLease() |
|
|
|
// if lease is 0, we use the local store, so no need to reload. |
|
if lease > 0 && time.Now().UnixNano()-last > lease.Nanoseconds() { |
|
do.mustReload() |
|
} |
|
} |
|
|
|
const minReloadTimeout = 20 * time.Second |
|
|
|
func (do *Domain) reload() error { |
|
// lock here for only once at same time. |
|
do.m.Lock() |
|
defer do.m.Unlock() |
|
|
|
timeout := do.ddl.GetLease() / 2 |
|
if timeout < minReloadTimeout { |
|
timeout = minReloadTimeout |
|
} |
|
|
|
done := make(chan error, 1) |
|
go func() { |
|
var err error |
|
|
|
for { |
|
err = kv.RunInNewTxn(do.store, false, do.loadInfoSchema) |
|
// if err is db closed, we will return it directly, otherwise, we will |
|
// check reloading again. |
|
if terror.ErrorEqual(err, localstore.ErrDBClosed) { |
|
break |
|
} |
|
|
|
if err != nil { |
|
log.Errorf("[ddl] load schema err %v, retry again", errors.ErrorStack(err)) |
|
// TODO: use a backoff algorithm. |
|
time.Sleep(500 * time.Millisecond) |
|
continue |
|
} |
|
|
|
atomic.StoreInt64(&do.lastLeaseTS, time.Now().UnixNano()) |
|
break |
|
} |
|
|
|
done <- err |
|
}() |
|
|
|
select { |
|
case err := <-done: |
|
return errors.Trace(err) |
|
case <-time.After(timeout): |
|
return errors.New("reload schema timeout") |
|
} |
|
} |
|
|
|
func (do *Domain) mustReload() { |
|
// if reload error, we will terminate whole program to guarantee data safe. |
|
err := do.reload() |
|
if err != nil { |
|
log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err)) |
|
} |
|
} |
|
|
|
// check schema every 300 seconds default. |
|
const defaultLoadTime = 300 * time.Second |
|
|
|
func (do *Domain) loadSchemaInLoop(lease time.Duration) { |
|
if lease <= 0 { |
|
lease = defaultLoadTime |
|
} |
|
|
|
ticker := time.NewTicker(lease) |
|
defer ticker.Stop() |
|
|
|
for { |
|
select { |
|
case <-ticker.C: |
|
err := do.reload() |
|
// we may close store in test, but the domain load schema loop is still checking, |
|
// so we can't panic for ErrDBClosed and just return here. |
|
if terror.ErrorEqual(err, localstore.ErrDBClosed) { |
|
return |
|
} else if err != nil { |
|
log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err)) |
|
} |
|
case newLease := <-do.leaseCh: |
|
if newLease <= 0 { |
|
newLease = defaultLoadTime |
|
} |
|
|
|
if lease == newLease { |
|
// nothing to do |
|
continue |
|
} |
|
|
|
lease = newLease |
|
// reset ticker too. |
|
ticker.Stop() |
|
ticker = time.NewTicker(lease) |
|
} |
|
} |
|
} |
|
|
|
type ddlCallback struct { |
|
ddl.BaseCallback |
|
do *Domain |
|
} |
|
|
|
func (c *ddlCallback) OnChanged(err error) error { |
|
if err != nil { |
|
return err |
|
} |
|
log.Warnf("[ddl] on DDL change") |
|
|
|
c.do.mustReload() |
|
return nil |
|
} |
|
|
|
// NewDomain creates a new domain. |
|
func NewDomain(store kv.Storage, lease time.Duration) (d *Domain, err error) { |
|
d = &Domain{ |
|
store: store, |
|
leaseCh: make(chan time.Duration, 1), |
|
} |
|
|
|
d.infoHandle = infoschema.NewHandle(d.store) |
|
d.ddl = ddl.NewDDL(d.store, d.infoHandle, &ddlCallback{do: d}, lease) |
|
d.mustReload() |
|
|
|
variable.RegisterStatistics(d) |
|
|
|
go d.loadSchemaInLoop(lease) |
|
|
|
return d, nil |
|
}
|
|
|