Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
796 lines
22 KiB
796 lines
22 KiB
package themis |
|
|
|
import ( |
|
"fmt" |
|
"sync" |
|
"time" |
|
|
|
"github.com/juju/errors" |
|
"github.com/ngaut/log" |
|
"github.com/pingcap/go-hbase" |
|
"github.com/pingcap/go-themis/oracle" |
|
) |
|
|
|
type TxnConfig struct { |
|
ConcurrentPrewriteAndCommit bool |
|
WaitSecondaryCommit bool |
|
TTLInMs uint64 |
|
MaxRowsInOneTxn int |
|
// options below is for debugging and testing |
|
brokenPrewriteSecondaryTest bool |
|
brokenPrewriteSecondaryAndRollbackTest bool |
|
brokenCommitPrimaryTest bool |
|
brokenCommitSecondaryTest bool |
|
} |
|
|
|
var defaultTxnConf = TxnConfig{ |
|
ConcurrentPrewriteAndCommit: true, |
|
WaitSecondaryCommit: false, |
|
MaxRowsInOneTxn: 50000, |
|
TTLInMs: 5 * 1000, // default txn TTL: 5s |
|
brokenPrewriteSecondaryTest: false, |
|
brokenPrewriteSecondaryAndRollbackTest: false, |
|
brokenCommitPrimaryTest: false, |
|
brokenCommitSecondaryTest: false, |
|
} |
|
|
|
type themisTxn struct { |
|
client hbase.HBaseClient |
|
rpc *themisRPC |
|
lockCleaner LockManager |
|
oracle oracle.Oracle |
|
mutationCache *columnMutationCache |
|
startTs uint64 |
|
commitTs uint64 |
|
primaryRow *rowMutation |
|
primary *hbase.ColumnCoordinate |
|
secondaryRows []*rowMutation |
|
secondary []*hbase.ColumnCoordinate |
|
primaryRowOffset int |
|
singleRowTxn bool |
|
secondaryLockBytes []byte |
|
conf TxnConfig |
|
hooks *txnHook |
|
} |
|
|
|
var _ Txn = (*themisTxn)(nil) |
|
|
|
var ( |
|
// ErrSimulated is used when maybe rollback occurs error too. |
|
ErrSimulated = errors.New("simulated error") |
|
maxCleanLockRetryCount = 30 |
|
pauseTime = 300 * time.Millisecond |
|
) |
|
|
|
func NewTxn(c hbase.HBaseClient, oracle oracle.Oracle) (Txn, error) { |
|
return NewTxnWithConf(c, defaultTxnConf, oracle) |
|
} |
|
|
|
func NewTxnWithConf(c hbase.HBaseClient, conf TxnConfig, oracle oracle.Oracle) (Txn, error) { |
|
var err error |
|
txn := &themisTxn{ |
|
client: c, |
|
mutationCache: newColumnMutationCache(), |
|
oracle: oracle, |
|
primaryRowOffset: -1, |
|
conf: conf, |
|
rpc: newThemisRPC(c, oracle, conf), |
|
hooks: newHook(), |
|
} |
|
txn.startTs, err = txn.oracle.GetTimestamp() |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
txn.lockCleaner = newThemisLockManager(txn.rpc, c) |
|
return txn, nil |
|
} |
|
|
|
func (txn *themisTxn) setHook(hooks *txnHook) { |
|
txn.hooks = hooks |
|
} |
|
|
|
func (txn *themisTxn) Gets(tbl string, gets []*hbase.Get) ([]*hbase.ResultRow, error) { |
|
results, err := txn.rpc.themisBatchGet([]byte(tbl), gets, txn.startTs, false) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
var ret []*hbase.ResultRow |
|
hasLock := false |
|
for _, r := range results { |
|
// if this row is locked, try clean lock and get again |
|
if isLockResult(r) { |
|
hasLock = true |
|
err = txn.constructLockAndClean([]byte(tbl), r.SortedColumns) |
|
if err != nil { |
|
// TODO if it's a conflict error, it means this lock |
|
// isn't expired, maybe we can retry or return partial results. |
|
return nil, errors.Trace(err) |
|
} |
|
} |
|
// it's OK, because themisBatchGet doesn't return nil value. |
|
ret = append(ret, r) |
|
} |
|
if hasLock { |
|
// after we cleaned locks, try to get again. |
|
ret, err = txn.rpc.themisBatchGet([]byte(tbl), gets, txn.startTs, true) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
} |
|
return ret, nil |
|
} |
|
|
|
func (txn *themisTxn) Get(tbl string, g *hbase.Get) (*hbase.ResultRow, error) { |
|
r, err := txn.rpc.themisGet([]byte(tbl), g, txn.startTs, false) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
// contain locks, try to clean and get again |
|
if r != nil && isLockResult(r) { |
|
r, err = txn.tryToCleanLockAndGetAgain([]byte(tbl), g, r.SortedColumns) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
} |
|
return r, nil |
|
} |
|
|
|
func (txn *themisTxn) Put(tbl string, p *hbase.Put) { |
|
// add mutation to buffer |
|
for _, e := range getEntriesFromPut(p) { |
|
txn.mutationCache.addMutation([]byte(tbl), p.Row, e.Column, e.typ, e.value, false) |
|
} |
|
} |
|
|
|
func (txn *themisTxn) Delete(tbl string, p *hbase.Delete) error { |
|
entries, err := getEntriesFromDel(p) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
for _, e := range entries { |
|
txn.mutationCache.addMutation([]byte(tbl), p.Row, e.Column, e.typ, e.value, false) |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) Commit() error { |
|
if txn.mutationCache.getMutationCount() == 0 { |
|
return nil |
|
} |
|
if txn.mutationCache.getRowCount() > txn.conf.MaxRowsInOneTxn { |
|
return ErrTooManyRows |
|
} |
|
|
|
txn.selectPrimaryAndSecondaries() |
|
err := txn.prewritePrimary() |
|
if err != nil { |
|
// no need to check wrong region here, hbase client will retry when |
|
// occurs single row NotInRegion error. |
|
log.Error(errors.ErrorStack(err)) |
|
// it's safe to retry, because this transaction is not committed. |
|
return ErrRetryable |
|
} |
|
|
|
err = txn.prewriteSecondary() |
|
if err != nil { |
|
if isWrongRegionErr(err) { |
|
log.Warn("region info outdated") |
|
// reset hbase client buffered region info |
|
txn.client.CleanAllRegionCache() |
|
} |
|
return ErrRetryable |
|
} |
|
|
|
txn.commitTs, err = txn.oracle.GetTimestamp() |
|
if err != nil { |
|
log.Error(errors.ErrorStack(err)) |
|
return ErrRetryable |
|
} |
|
err = txn.commitPrimary() |
|
if err != nil { |
|
// commit primary error, rollback |
|
log.Error("commit primary row failed", txn.startTs, err) |
|
txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow) |
|
txn.rollbackSecondaryRow(len(txn.secondaryRows) - 1) |
|
return ErrRetryable |
|
} |
|
txn.commitSecondary() |
|
log.Debug("themis txn commit successfully", txn.startTs, txn.commitTs) |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) commitSecondary() { |
|
if bypass, _, _ := txn.hooks.beforeCommitSecondary(txn, nil); !bypass { |
|
return |
|
} |
|
if txn.conf.brokenCommitSecondaryTest { |
|
txn.brokenCommitSecondary() |
|
return |
|
} |
|
if txn.conf.ConcurrentPrewriteAndCommit { |
|
txn.batchCommitSecondary(txn.conf.WaitSecondaryCommit) |
|
} else { |
|
txn.commitSecondarySync() |
|
} |
|
} |
|
|
|
func (txn *themisTxn) commitSecondarySync() { |
|
for _, r := range txn.secondaryRows { |
|
err := txn.rpc.commitSecondaryRow(r.tbl, r.row, r.mutationList(false), txn.startTs, txn.commitTs) |
|
if err != nil { |
|
// fail of secondary commit will not stop the commits of next |
|
// secondaries |
|
log.Warning(err) |
|
} |
|
} |
|
} |
|
|
|
func (txn *themisTxn) batchCommitSecondary(wait bool) error { |
|
//will batch commit all rows in a region |
|
rsRowMap, err := txn.groupByRegion() |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
|
|
wg := sync.WaitGroup{} |
|
for _, regionRowMap := range rsRowMap { |
|
wg.Add(1) |
|
_, firstRowM := getFirstEntity(regionRowMap) |
|
go func(cli *themisRPC, tbl string, rMap map[string]*rowMutation, startTs, commitTs uint64) { |
|
defer wg.Done() |
|
err := cli.batchCommitSecondaryRows([]byte(tbl), rMap, startTs, commitTs) |
|
if err != nil { |
|
// fail of secondary commit will not stop the commits of next |
|
// secondaries |
|
if isWrongRegionErr(err) { |
|
txn.client.CleanAllRegionCache() |
|
log.Warn("region info outdated when committing secondary rows, don't panic") |
|
} |
|
} |
|
}(txn.rpc, string(firstRowM.tbl), regionRowMap, txn.startTs, txn.commitTs) |
|
} |
|
if wait { |
|
wg.Wait() |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) groupByRegion() (map[string]map[string]*rowMutation, error) { |
|
rsRowMap := make(map[string]map[string]*rowMutation) |
|
for _, rm := range txn.secondaryRows { |
|
region, err := txn.client.LocateRegion(rm.tbl, rm.row, true) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
key := getBatchGroupKey(region, string(rm.tbl)) |
|
if _, exists := rsRowMap[key]; !exists { |
|
rsRowMap[key] = map[string]*rowMutation{} |
|
} |
|
rsRowMap[key][string(rm.row)] = rm |
|
} |
|
return rsRowMap, nil |
|
} |
|
|
|
func (txn *themisTxn) commitPrimary() error { |
|
if txn.conf.brokenCommitPrimaryTest { |
|
return txn.brokenCommitPrimary() |
|
} |
|
return txn.rpc.commitRow(txn.primary.Table, txn.primary.Row, |
|
txn.primaryRow.mutationList(false), |
|
txn.startTs, txn.commitTs, txn.primaryRowOffset) |
|
} |
|
|
|
func (txn *themisTxn) selectPrimaryAndSecondaries() { |
|
txn.secondary = nil |
|
for tblName, rowMutations := range txn.mutationCache.mutations { |
|
for _, rowMutation := range rowMutations { |
|
row := rowMutation.row |
|
findPrimaryInRow := false |
|
for i, mutation := range rowMutation.mutationList(true) { |
|
colcord := hbase.NewColumnCoordinate([]byte(tblName), row, mutation.Family, mutation.Qual) |
|
// set the first column as primary if primary is not set by user |
|
if txn.primaryRowOffset == -1 && |
|
(txn.primary == nil || txn.primary.Equal(colcord)) { |
|
txn.primary = colcord |
|
txn.primaryRowOffset = i |
|
txn.primaryRow = rowMutation |
|
findPrimaryInRow = true |
|
} else { |
|
txn.secondary = append(txn.secondary, colcord) |
|
} |
|
} |
|
if !findPrimaryInRow { |
|
txn.secondaryRows = append(txn.secondaryRows, rowMutation) |
|
} |
|
} |
|
} |
|
|
|
// hook for test |
|
if bypass, _, _ := txn.hooks.afterChoosePrimaryAndSecondary(txn, nil); !bypass { |
|
return |
|
} |
|
|
|
if len(txn.secondaryRows) == 0 { |
|
txn.singleRowTxn = true |
|
} |
|
// construct secondary lock |
|
secondaryLock := txn.constructSecondaryLock(hbase.TypePut) |
|
if secondaryLock != nil { |
|
txn.secondaryLockBytes = secondaryLock.Encode() |
|
} else { |
|
txn.secondaryLockBytes = nil |
|
} |
|
} |
|
|
|
func (txn *themisTxn) constructSecondaryLock(typ hbase.Type) *themisSecondaryLock { |
|
if txn.primaryRow.getSize() <= 1 && len(txn.secondaryRows) == 0 { |
|
return nil |
|
} |
|
l := newThemisSecondaryLock() |
|
l.primaryCoordinate = txn.primary |
|
l.ts = txn.startTs |
|
// TODO set client addr |
|
return l |
|
} |
|
|
|
func (txn *themisTxn) constructPrimaryLock() *themisPrimaryLock { |
|
l := newThemisPrimaryLock() |
|
l.typ = txn.primaryRow.getType(txn.primary.Column) |
|
l.ts = txn.startTs |
|
for _, c := range txn.secondary { |
|
l.addSecondary(c, txn.mutationCache.getMutation(c).typ) |
|
} |
|
return l |
|
} |
|
|
|
func (txn *themisTxn) constructLockAndClean(tbl []byte, lockKvs []*hbase.Kv) error { |
|
locks, err := getLocksFromResults([]byte(tbl), lockKvs, txn.rpc) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
for _, lock := range locks { |
|
err := txn.cleanLockWithRetry(lock) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) tryToCleanLockAndGetAgain(tbl []byte, g *hbase.Get, lockKvs []*hbase.Kv) (*hbase.ResultRow, error) { |
|
// try to clean locks |
|
err := txn.constructLockAndClean(tbl, lockKvs) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
// get again, ignore lock |
|
r, err := txn.rpc.themisGet([]byte(tbl), g, txn.startTs, true) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
return r, nil |
|
} |
|
|
|
func (txn *themisTxn) commitSecondaryAndCleanLock(lock *themisSecondaryLock, commitTs uint64) error { |
|
cc := lock.Coordinate() |
|
mutation := &columnMutation{ |
|
Column: &cc.Column, |
|
mutationValuePair: &mutationValuePair{ |
|
typ: lock.typ, |
|
}, |
|
} |
|
err := txn.rpc.commitSecondaryRow(cc.Table, cc.Row, |
|
[]*columnMutation{mutation}, lock.Timestamp(), commitTs) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) cleanLockWithRetry(lock Lock) error { |
|
for i := 0; i < maxCleanLockRetryCount; i++ { |
|
if exists, err := txn.lockCleaner.IsLockExists(lock.Coordinate(), 0, lock.Timestamp()); err != nil || !exists { |
|
return errors.Trace(err) |
|
} |
|
log.Warnf("lock exists txn: %v lock-txn: %v row: %q", txn.startTs, lock.Timestamp(), lock.Coordinate().Row) |
|
// try clean lock |
|
err := txn.tryToCleanLock(lock) |
|
if errorEqual(err, ErrLockNotExpired) { |
|
log.Warn("sleep a while, and retry clean lock", txn.startTs) |
|
// TODO(dongxu) use cleverer retry sleep time interval |
|
time.Sleep(pauseTime) |
|
continue |
|
} else if err != nil { |
|
return errors.Trace(err) |
|
} |
|
// lock cleaned successfully |
|
return nil |
|
} |
|
return ErrCleanLockFailed |
|
} |
|
|
|
func (txn *themisTxn) tryToCleanLock(lock Lock) error { |
|
// if it's secondary lock, first we'll check if its primary lock has been released. |
|
if lock.Role() == RoleSecondary { |
|
// get primary lock |
|
pl := lock.Primary() |
|
// check primary lock is exists |
|
exists, err := txn.lockCleaner.IsLockExists(pl.Coordinate(), 0, pl.Timestamp()) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
if !exists { |
|
// primary row is committed, commit this row |
|
cc := pl.Coordinate() |
|
commitTs, err := txn.lockCleaner.GetCommitTimestamp(cc, pl.Timestamp()) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
if commitTs > 0 { |
|
// if this transction has been committed |
|
log.Info("txn has been committed, ts:", commitTs, "prewriteTs:", pl.Timestamp()) |
|
// commit secondary rows |
|
err := txn.commitSecondaryAndCleanLock(lock.(*themisSecondaryLock), commitTs) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
return nil |
|
} |
|
} |
|
} |
|
expired, err := txn.rpc.checkAndSetLockIsExpired(lock) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
// only clean expired lock |
|
if expired { |
|
// try to clean primary lock |
|
pl := lock.Primary() |
|
commitTs, cleanedLock, err := txn.lockCleaner.CleanLock(pl.Coordinate(), pl.Timestamp()) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
if cleanedLock != nil { |
|
pl = cleanedLock |
|
} |
|
log.Info("try clean secondary locks", pl.Timestamp()) |
|
// clean secondary locks |
|
// erase lock and data if commitTs is 0; otherwise, commit it. |
|
for k, v := range pl.(*themisPrimaryLock).secondaries { |
|
cc := &hbase.ColumnCoordinate{} |
|
if err = cc.ParseFromString(k); err != nil { |
|
return errors.Trace(err) |
|
} |
|
if commitTs == 0 { |
|
// commitTs == 0, means clean primary lock successfully |
|
// expire trx havn't committed yet, we must delete lock and |
|
// dirty data |
|
err = txn.lockCleaner.EraseLockAndData(cc, pl.Timestamp()) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
} else { |
|
// primary row is committed, so we must commit other |
|
// secondary rows |
|
mutation := &columnMutation{ |
|
Column: &cc.Column, |
|
mutationValuePair: &mutationValuePair{ |
|
typ: v, |
|
}, |
|
} |
|
err = txn.rpc.commitSecondaryRow(cc.Table, cc.Row, |
|
[]*columnMutation{mutation}, pl.Timestamp(), commitTs) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
} |
|
} |
|
} else { |
|
return ErrLockNotExpired |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) batchPrewriteSecondaryRowsWithLockClean(tbl []byte, rowMs map[string]*rowMutation) error { |
|
locks, err := txn.batchPrewriteSecondaryRows(tbl, rowMs) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
|
|
// lock clean |
|
if locks != nil && len(locks) > 0 { |
|
// hook for test |
|
if bypass, _, err := txn.hooks.onSecondaryOccursLock(txn, locks); !bypass { |
|
return errors.Trace(err) |
|
} |
|
// try one more time after clean lock successfully |
|
for _, lock := range locks { |
|
err = txn.cleanLockWithRetry(lock) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
|
|
// prewrite all secondary rows |
|
locks, err = txn.batchPrewriteSecondaryRows(tbl, rowMs) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
if len(locks) > 0 { |
|
for _, l := range locks { |
|
log.Errorf("can't clean lock, column:%q; conflict lock: %+v, lock ts: %d", l.Coordinate(), l, l.Timestamp()) |
|
} |
|
return ErrRetryable |
|
} |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) prewriteRowWithLockClean(tbl []byte, mutation *rowMutation, containPrimary bool) error { |
|
lock, err := txn.prewriteRow(tbl, mutation, containPrimary) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
// lock clean |
|
if lock != nil { |
|
// hook for test |
|
if bypass, _, err := txn.hooks.beforePrewriteLockClean(txn, lock); !bypass { |
|
return errors.Trace(err) |
|
} |
|
err = txn.cleanLockWithRetry(lock) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
// try one more time after clean lock successfully |
|
lock, err = txn.prewriteRow(tbl, mutation, containPrimary) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
if lock != nil { |
|
log.Errorf("can't clean lock, column:%q; conflict lock: %+v, lock ts: %d", lock.Coordinate(), lock, lock.Timestamp()) |
|
return ErrRetryable |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) batchPrewriteSecondaryRows(tbl []byte, rowMs map[string]*rowMutation) (map[string]Lock, error) { |
|
return txn.rpc.batchPrewriteSecondaryRows(tbl, rowMs, txn.startTs, txn.secondaryLockBytes) |
|
} |
|
|
|
func (txn *themisTxn) prewriteRow(tbl []byte, mutation *rowMutation, containPrimary bool) (Lock, error) { |
|
// hook for test |
|
if bypass, ret, err := txn.hooks.onPrewriteRow(txn, []interface{}{mutation, containPrimary}); !bypass { |
|
return ret.(Lock), errors.Trace(err) |
|
} |
|
if containPrimary { |
|
// try to get lock |
|
return txn.rpc.prewriteRow(tbl, mutation.row, |
|
mutation.mutationList(true), |
|
txn.startTs, |
|
txn.constructPrimaryLock().Encode(), |
|
txn.secondaryLockBytes, txn.primaryRowOffset) |
|
} |
|
return txn.rpc.prewriteSecondaryRow(tbl, mutation.row, |
|
mutation.mutationList(true), |
|
txn.startTs, |
|
txn.secondaryLockBytes) |
|
} |
|
|
|
func (txn *themisTxn) prewritePrimary() error { |
|
// hook for test |
|
if bypass, _, err := txn.hooks.beforePrewritePrimary(txn, nil); !bypass { |
|
return err |
|
} |
|
err := txn.prewriteRowWithLockClean(txn.primary.Table, txn.primaryRow, true) |
|
if err != nil { |
|
log.Debugf("prewrite primary %v %q failed: %v", txn.startTs, txn.primaryRow.row, err.Error()) |
|
return errors.Trace(err) |
|
} |
|
log.Debugf("prewrite primary %v %q successfully", txn.startTs, txn.primaryRow.row) |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) prewriteSecondary() error { |
|
// hook for test |
|
if bypass, _, err := txn.hooks.beforePrewriteSecondary(txn, nil); !bypass { |
|
return err |
|
} |
|
if txn.conf.brokenPrewriteSecondaryTest { |
|
return txn.brokenPrewriteSecondary() |
|
} |
|
if txn.conf.ConcurrentPrewriteAndCommit { |
|
return txn.batchPrewriteSecondaries() |
|
} |
|
return txn.prewriteSecondarySync() |
|
} |
|
|
|
func (txn *themisTxn) prewriteSecondarySync() error { |
|
for i, mu := range txn.secondaryRows { |
|
err := txn.prewriteRowWithLockClean(mu.tbl, mu, false) |
|
if err != nil { |
|
// rollback |
|
txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow) |
|
txn.rollbackSecondaryRow(i) |
|
return errors.Trace(err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// just for test |
|
func (txn *themisTxn) brokenCommitPrimary() error { |
|
// do nothing |
|
log.Warn("Simulating primary commit failed") |
|
return nil |
|
} |
|
|
|
// just for test |
|
func (txn *themisTxn) brokenCommitSecondary() { |
|
// do nothing |
|
log.Warn("Simulating secondary commit failed") |
|
} |
|
|
|
func (txn *themisTxn) brokenPrewriteSecondary() error { |
|
log.Warn("Simulating prewrite secondary failed") |
|
for i, rm := range txn.secondaryRows { |
|
if i == len(txn.secondary)-1 { |
|
if !txn.conf.brokenPrewriteSecondaryAndRollbackTest { |
|
// simulating prewrite failed, need rollback |
|
txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow) |
|
txn.rollbackSecondaryRow(i) |
|
} |
|
// maybe rollback occurs error too |
|
return ErrSimulated |
|
} |
|
txn.prewriteRowWithLockClean(rm.tbl, rm, false) |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) batchPrewriteSecondaries() error { |
|
wg := sync.WaitGroup{} |
|
//will batch prewrite all rows in a region |
|
rsRowMap, err := txn.groupByRegion() |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
|
|
errChan := make(chan error, len(rsRowMap)) |
|
defer close(errChan) |
|
successChan := make(chan map[string]*rowMutation, len(rsRowMap)) |
|
defer close(successChan) |
|
|
|
for _, regionRowMap := range rsRowMap { |
|
wg.Add(1) |
|
_, firstRowM := getFirstEntity(regionRowMap) |
|
go func(tbl []byte, rMap map[string]*rowMutation) { |
|
defer wg.Done() |
|
err := txn.batchPrewriteSecondaryRowsWithLockClean(tbl, rMap) |
|
if err != nil { |
|
errChan <- err |
|
} else { |
|
successChan <- rMap |
|
} |
|
}(firstRowM.tbl, regionRowMap) |
|
} |
|
wg.Wait() |
|
|
|
if len(errChan) != 0 { |
|
// occur error, clean success prewrite mutations |
|
log.Warnf("batch prewrite secondary rows error, rolling back %d %d", len(successChan), txn.startTs) |
|
txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow) |
|
L: |
|
for { |
|
select { |
|
case succMutMap := <-successChan: |
|
{ |
|
for _, rowMut := range succMutMap { |
|
txn.rollbackRow(rowMut.tbl, rowMut) |
|
} |
|
} |
|
default: |
|
break L |
|
} |
|
} |
|
|
|
err := <-errChan |
|
if err != nil { |
|
log.Error("batch prewrite secondary rows error, txn:", txn.startTs, err) |
|
} |
|
return errors.Trace(err) |
|
} |
|
return nil |
|
} |
|
|
|
func getFirstEntity(rowMap map[string]*rowMutation) (string, *rowMutation) { |
|
for row, rowM := range rowMap { |
|
return row, rowM |
|
} |
|
return "", nil |
|
} |
|
|
|
func getBatchGroupKey(rInfo *hbase.RegionInfo, tblName string) string { |
|
return rInfo.Server + "_" + rInfo.Name |
|
} |
|
|
|
func (txn *themisTxn) rollbackRow(tbl []byte, mutation *rowMutation) error { |
|
l := fmt.Sprintf("\nrolling back %q %d {\n", mutation.row, txn.startTs) |
|
for _, v := range mutation.getColumns() { |
|
l += fmt.Sprintf("\t%s:%s\n", string(v.Family), string(v.Qual)) |
|
} |
|
l += "}\n" |
|
log.Warn(l) |
|
for _, col := range mutation.getColumns() { |
|
cc := &hbase.ColumnCoordinate{ |
|
Table: tbl, |
|
Row: mutation.row, |
|
Column: col, |
|
} |
|
err := txn.lockCleaner.EraseLockAndData(cc, txn.startTs) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) rollbackSecondaryRow(successIndex int) error { |
|
for i := successIndex; i >= 0; i-- { |
|
r := txn.secondaryRows[i] |
|
err := txn.rollbackRow(r.tbl, r) |
|
if err != nil { |
|
return errors.Trace(err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (txn *themisTxn) GetScanner(tbl []byte, startKey, endKey []byte, batchSize int) *ThemisScanner { |
|
scanner := newThemisScanner(tbl, txn, batchSize, txn.client) |
|
if startKey != nil { |
|
scanner.setStartRow(startKey) |
|
} |
|
if endKey != nil { |
|
scanner.setStopRow(endKey) |
|
} |
|
return scanner |
|
} |
|
|
|
func (txn *themisTxn) Release() { |
|
txn.primary = nil |
|
txn.primaryRow = nil |
|
txn.secondary = nil |
|
txn.secondaryRows = nil |
|
txn.startTs = 0 |
|
txn.commitTs = 0 |
|
} |
|
|
|
func (txn *themisTxn) String() string { |
|
return fmt.Sprintf("%d", txn.startTs) |
|
} |
|
|
|
func (txn *themisTxn) GetCommitTS() uint64 { |
|
return txn.commitTs |
|
} |
|
|
|
func (txn *themisTxn) GetStartTS() uint64 { |
|
return txn.startTs |
|
} |
|
|
|
func (txn *themisTxn) LockRow(tbl string, rowkey []byte) error { |
|
g := hbase.NewGet(rowkey) |
|
r, err := txn.Get(tbl, g) |
|
if err != nil { |
|
log.Warnf("get row error, table:%s, row:%q, error:%v", tbl, rowkey, err) |
|
return errors.Trace(err) |
|
} |
|
if r == nil { |
|
log.Warnf("has not data to lock, table:%s, row:%q", tbl, rowkey) |
|
return nil |
|
} |
|
for _, v := range r.Columns { |
|
txn.mutationCache.addMutation([]byte(tbl), rowkey, &v.Column, hbase.TypeMinimum, nil, true) |
|
} |
|
return nil |
|
}
|
|
|