Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
528 lines
11 KiB
528 lines
11 KiB
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> |
|
// All rights reserved. |
|
// |
|
// Use of this source code is governed by a BSD-style license that can be |
|
// found in the LICENSE file. |
|
|
|
package leveldb |
|
|
|
import ( |
|
"fmt" |
|
"sync/atomic" |
|
"unsafe" |
|
|
|
"github.com/syndtr/goleveldb/leveldb/iterator" |
|
"github.com/syndtr/goleveldb/leveldb/opt" |
|
"github.com/syndtr/goleveldb/leveldb/util" |
|
) |
|
|
|
type tSet struct { |
|
level int |
|
table *tFile |
|
} |
|
|
|
type version struct { |
|
s *session |
|
|
|
levels []tFiles |
|
|
|
// Level that should be compacted next and its compaction score. |
|
// Score < 1 means compaction is not strictly needed. These fields |
|
// are initialized by computeCompaction() |
|
cLevel int |
|
cScore float64 |
|
|
|
cSeek unsafe.Pointer |
|
|
|
closing bool |
|
ref int |
|
released bool |
|
} |
|
|
|
func newVersion(s *session) *version { |
|
return &version{s: s} |
|
} |
|
|
|
func (v *version) incref() { |
|
if v.released { |
|
panic("already released") |
|
} |
|
|
|
v.ref++ |
|
if v.ref == 1 { |
|
// Incr file ref. |
|
for _, tt := range v.levels { |
|
for _, t := range tt { |
|
v.s.addFileRef(t.fd, 1) |
|
} |
|
} |
|
} |
|
} |
|
|
|
func (v *version) releaseNB() { |
|
v.ref-- |
|
if v.ref > 0 { |
|
return |
|
} else if v.ref < 0 { |
|
panic("negative version ref") |
|
} |
|
|
|
for _, tt := range v.levels { |
|
for _, t := range tt { |
|
if v.s.addFileRef(t.fd, -1) == 0 { |
|
v.s.tops.remove(t) |
|
} |
|
} |
|
} |
|
|
|
v.released = true |
|
} |
|
|
|
func (v *version) release() { |
|
v.s.vmu.Lock() |
|
v.releaseNB() |
|
v.s.vmu.Unlock() |
|
} |
|
|
|
func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int, t *tFile) bool, lf func(level int) bool) { |
|
ukey := ikey.ukey() |
|
|
|
// Aux level. |
|
if aux != nil { |
|
for _, t := range aux { |
|
if t.overlaps(v.s.icmp, ukey, ukey) { |
|
if !f(-1, t) { |
|
return |
|
} |
|
} |
|
} |
|
|
|
if lf != nil && !lf(-1) { |
|
return |
|
} |
|
} |
|
|
|
// Walk tables level-by-level. |
|
for level, tables := range v.levels { |
|
if len(tables) == 0 { |
|
continue |
|
} |
|
|
|
if level == 0 { |
|
// Level-0 files may overlap each other. Find all files that |
|
// overlap ukey. |
|
for _, t := range tables { |
|
if t.overlaps(v.s.icmp, ukey, ukey) { |
|
if !f(level, t) { |
|
return |
|
} |
|
} |
|
} |
|
} else { |
|
if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) { |
|
t := tables[i] |
|
if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 { |
|
if !f(level, t) { |
|
return |
|
} |
|
} |
|
} |
|
} |
|
|
|
if lf != nil && !lf(level) { |
|
return |
|
} |
|
} |
|
} |
|
|
|
func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) { |
|
if v.closing { |
|
return nil, false, ErrClosed |
|
} |
|
|
|
ukey := ikey.ukey() |
|
|
|
var ( |
|
tset *tSet |
|
tseek bool |
|
|
|
// Level-0. |
|
zfound bool |
|
zseq uint64 |
|
zkt keyType |
|
zval []byte |
|
) |
|
|
|
err = ErrNotFound |
|
|
|
// Since entries never hop across level, finding key/value |
|
// in smaller level make later levels irrelevant. |
|
v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool { |
|
if level >= 0 && !tseek { |
|
if tset == nil { |
|
tset = &tSet{level, t} |
|
} else { |
|
tseek = true |
|
} |
|
} |
|
|
|
var ( |
|
fikey, fval []byte |
|
ferr error |
|
) |
|
if noValue { |
|
fikey, ferr = v.s.tops.findKey(t, ikey, ro) |
|
} else { |
|
fikey, fval, ferr = v.s.tops.find(t, ikey, ro) |
|
} |
|
|
|
switch ferr { |
|
case nil: |
|
case ErrNotFound: |
|
return true |
|
default: |
|
err = ferr |
|
return false |
|
} |
|
|
|
if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil { |
|
if v.s.icmp.uCompare(ukey, fukey) == 0 { |
|
// Level <= 0 may overlaps each-other. |
|
if level <= 0 { |
|
if fseq >= zseq { |
|
zfound = true |
|
zseq = fseq |
|
zkt = fkt |
|
zval = fval |
|
} |
|
} else { |
|
switch fkt { |
|
case keyTypeVal: |
|
value = fval |
|
err = nil |
|
case keyTypeDel: |
|
default: |
|
panic("leveldb: invalid internalKey type") |
|
} |
|
return false |
|
} |
|
} |
|
} else { |
|
err = fkerr |
|
return false |
|
} |
|
|
|
return true |
|
}, func(level int) bool { |
|
if zfound { |
|
switch zkt { |
|
case keyTypeVal: |
|
value = zval |
|
err = nil |
|
case keyTypeDel: |
|
default: |
|
panic("leveldb: invalid internalKey type") |
|
} |
|
return false |
|
} |
|
|
|
return true |
|
}) |
|
|
|
if tseek && tset.table.consumeSeek() <= 0 { |
|
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) |
|
} |
|
|
|
return |
|
} |
|
|
|
func (v *version) sampleSeek(ikey internalKey) (tcomp bool) { |
|
var tset *tSet |
|
|
|
v.walkOverlapping(nil, ikey, func(level int, t *tFile) bool { |
|
if tset == nil { |
|
tset = &tSet{level, t} |
|
return true |
|
} |
|
if tset.table.consumeSeek() <= 0 { |
|
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) |
|
} |
|
return false |
|
}, nil) |
|
|
|
return |
|
} |
|
|
|
func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) { |
|
strict := opt.GetStrict(v.s.o.Options, ro, opt.StrictReader) |
|
for level, tables := range v.levels { |
|
if level == 0 { |
|
// Merge all level zero files together since they may overlap. |
|
for _, t := range tables { |
|
its = append(its, v.s.tops.newIterator(t, slice, ro)) |
|
} |
|
} else if len(tables) != 0 { |
|
its = append(its, iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict)) |
|
} |
|
} |
|
return |
|
} |
|
|
|
func (v *version) newStaging() *versionStaging { |
|
return &versionStaging{base: v} |
|
} |
|
|
|
// Spawn a new version based on this version. |
|
func (v *version) spawn(r *sessionRecord) *version { |
|
staging := v.newStaging() |
|
staging.commit(r) |
|
return staging.finish() |
|
} |
|
|
|
func (v *version) fillRecord(r *sessionRecord) { |
|
for level, tables := range v.levels { |
|
for _, t := range tables { |
|
r.addTableFile(level, t) |
|
} |
|
} |
|
} |
|
|
|
func (v *version) tLen(level int) int { |
|
if level < len(v.levels) { |
|
return len(v.levels[level]) |
|
} |
|
return 0 |
|
} |
|
|
|
func (v *version) offsetOf(ikey internalKey) (n int64, err error) { |
|
for level, tables := range v.levels { |
|
for _, t := range tables { |
|
if v.s.icmp.Compare(t.imax, ikey) <= 0 { |
|
// Entire file is before "ikey", so just add the file size |
|
n += t.size |
|
} else if v.s.icmp.Compare(t.imin, ikey) > 0 { |
|
// Entire file is after "ikey", so ignore |
|
if level > 0 { |
|
// Files other than level 0 are sorted by meta->min, so |
|
// no further files in this level will contain data for |
|
// "ikey". |
|
break |
|
} |
|
} else { |
|
// "ikey" falls in the range for this table. Add the |
|
// approximate offset of "ikey" within the table. |
|
if m, err := v.s.tops.offsetOf(t, ikey); err == nil { |
|
n += m |
|
} else { |
|
return 0, err |
|
} |
|
} |
|
} |
|
} |
|
|
|
return |
|
} |
|
|
|
func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) { |
|
if maxLevel > 0 { |
|
if len(v.levels) == 0 { |
|
return maxLevel |
|
} |
|
if !v.levels[0].overlaps(v.s.icmp, umin, umax, true) { |
|
var overlaps tFiles |
|
for ; level < maxLevel; level++ { |
|
if pLevel := level + 1; pLevel >= len(v.levels) { |
|
return maxLevel |
|
} else if v.levels[pLevel].overlaps(v.s.icmp, umin, umax, false) { |
|
break |
|
} |
|
if gpLevel := level + 2; gpLevel < len(v.levels) { |
|
overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false) |
|
if overlaps.size() > int64(v.s.o.GetCompactionGPOverlaps(level)) { |
|
break |
|
} |
|
} |
|
} |
|
} |
|
} |
|
return |
|
} |
|
|
|
func (v *version) computeCompaction() { |
|
// Precomputed best level for next compaction |
|
bestLevel := int(-1) |
|
bestScore := float64(-1) |
|
|
|
statFiles := make([]int, len(v.levels)) |
|
statSizes := make([]string, len(v.levels)) |
|
statScore := make([]string, len(v.levels)) |
|
statTotSize := int64(0) |
|
|
|
for level, tables := range v.levels { |
|
var score float64 |
|
size := tables.size() |
|
if level == 0 { |
|
// We treat level-0 specially by bounding the number of files |
|
// instead of number of bytes for two reasons: |
|
// |
|
// (1) With larger write-buffer sizes, it is nice not to do too |
|
// many level-0 compaction. |
|
// |
|
// (2) The files in level-0 are merged on every read and |
|
// therefore we wish to avoid too many files when the individual |
|
// file size is small (perhaps because of a small write-buffer |
|
// setting, or very high compression ratios, or lots of |
|
// overwrites/deletions). |
|
score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger()) |
|
} else { |
|
score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level)) |
|
} |
|
|
|
if score > bestScore { |
|
bestLevel = level |
|
bestScore = score |
|
} |
|
|
|
statFiles[level] = len(tables) |
|
statSizes[level] = shortenb(int(size)) |
|
statScore[level] = fmt.Sprintf("%.2f", score) |
|
statTotSize += size |
|
} |
|
|
|
v.cLevel = bestLevel |
|
v.cScore = bestScore |
|
|
|
v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(int(statTotSize)), statSizes, statScore) |
|
} |
|
|
|
func (v *version) needCompaction() bool { |
|
return v.cScore >= 1 || atomic.LoadPointer(&v.cSeek) != nil |
|
} |
|
|
|
type tablesScratch struct { |
|
added map[int64]atRecord |
|
deleted map[int64]struct{} |
|
} |
|
|
|
type versionStaging struct { |
|
base *version |
|
levels []tablesScratch |
|
} |
|
|
|
func (p *versionStaging) getScratch(level int) *tablesScratch { |
|
if level >= len(p.levels) { |
|
newLevels := make([]tablesScratch, level+1) |
|
copy(newLevels, p.levels) |
|
p.levels = newLevels |
|
} |
|
return &(p.levels[level]) |
|
} |
|
|
|
func (p *versionStaging) commit(r *sessionRecord) { |
|
// Deleted tables. |
|
for _, r := range r.deletedTables { |
|
scratch := p.getScratch(r.level) |
|
if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 { |
|
if scratch.deleted == nil { |
|
scratch.deleted = make(map[int64]struct{}) |
|
} |
|
scratch.deleted[r.num] = struct{}{} |
|
} |
|
if scratch.added != nil { |
|
delete(scratch.added, r.num) |
|
} |
|
} |
|
|
|
// New tables. |
|
for _, r := range r.addedTables { |
|
scratch := p.getScratch(r.level) |
|
if scratch.added == nil { |
|
scratch.added = make(map[int64]atRecord) |
|
} |
|
scratch.added[r.num] = r |
|
if scratch.deleted != nil { |
|
delete(scratch.deleted, r.num) |
|
} |
|
} |
|
} |
|
|
|
func (p *versionStaging) finish() *version { |
|
// Build new version. |
|
nv := newVersion(p.base.s) |
|
numLevel := len(p.levels) |
|
if len(p.base.levels) > numLevel { |
|
numLevel = len(p.base.levels) |
|
} |
|
nv.levels = make([]tFiles, numLevel) |
|
for level := 0; level < numLevel; level++ { |
|
var baseTabels tFiles |
|
if level < len(p.base.levels) { |
|
baseTabels = p.base.levels[level] |
|
} |
|
|
|
if level < len(p.levels) { |
|
scratch := p.levels[level] |
|
|
|
var nt tFiles |
|
// Prealloc list if possible. |
|
if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 { |
|
nt = make(tFiles, 0, n) |
|
} |
|
|
|
// Base tables. |
|
for _, t := range baseTabels { |
|
if _, ok := scratch.deleted[t.fd.Num]; ok { |
|
continue |
|
} |
|
if _, ok := scratch.added[t.fd.Num]; ok { |
|
continue |
|
} |
|
nt = append(nt, t) |
|
} |
|
|
|
// New tables. |
|
for _, r := range scratch.added { |
|
nt = append(nt, tableFileFromRecord(r)) |
|
} |
|
|
|
if len(nt) != 0 { |
|
// Sort tables. |
|
if level == 0 { |
|
nt.sortByNum() |
|
} else { |
|
nt.sortByKey(p.base.s.icmp) |
|
} |
|
|
|
nv.levels[level] = nt |
|
} |
|
} else { |
|
nv.levels[level] = baseTabels |
|
} |
|
} |
|
|
|
// Trim levels. |
|
n := len(nv.levels) |
|
for ; n > 0 && nv.levels[n-1] == nil; n-- { |
|
} |
|
nv.levels = nv.levels[:n] |
|
|
|
// Compute compaction score for new version. |
|
nv.computeCompaction() |
|
|
|
return nv |
|
} |
|
|
|
type versionReleaser struct { |
|
v *version |
|
once bool |
|
} |
|
|
|
func (vr *versionReleaser) Release() { |
|
v := vr.v |
|
v.s.vmu.Lock() |
|
if !vr.once { |
|
v.releaseNB() |
|
vr.once = true |
|
} |
|
v.s.vmu.Unlock() |
|
}
|
|
|