Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
146 lines
3.7 KiB
146 lines
3.7 KiB
package rupture |
|
|
|
import ( |
|
"fmt" |
|
"hash/fnv" |
|
"path/filepath" |
|
"strconv" |
|
|
|
"github.com/blevesearch/bleve" |
|
"github.com/blevesearch/bleve/document" |
|
"github.com/blevesearch/bleve/mapping" |
|
) |
|
|
|
// ShardedIndex an index that is built onto of multiple underlying bleve |
|
// indices (i.e. shards). Similar to bleve's index aliases, some methods may |
|
// not be supported. |
|
type ShardedIndex interface { |
|
bleve.Index |
|
shards() []bleve.Index |
|
} |
|
|
|
// a type alias for bleve.Index, so that the anonymous field of |
|
// shardedIndex does not conflict with the Index(..) method. |
|
type bleveIndex bleve.Index |
|
|
|
type shardedIndex struct { |
|
bleveIndex |
|
indices []bleve.Index |
|
} |
|
|
|
// hash maps id to a shard number in [0, n) by taking the 64-bit FNV-1 hash of
// id modulo n. n must be positive; n == 0 panics with an integer division by
// zero.
func hash(id string, n int) uint64 {
	hasher := fnv.New64()
	// hash.Hash's Write is documented to never return an error, so the result
	// is discarded explicitly.
	_, _ = hasher.Write([]byte(id))
	return hasher.Sum64() % uint64(n)
}
|
|
|
// childIndexerPath returns the path of shard i: the entry of rootPath that is
// named after the decimal representation of i.
func childIndexerPath(rootPath string, i int) string {
	shardDir := strconv.Itoa(i)
	return filepath.Join(rootPath, shardDir)
}
|
|
|
// NewShardedIndex creates a sharded index at the specified path, with the |
|
// specified mapping and number of shards. |
|
func NewShardedIndex(path string, mapping mapping.IndexMapping, numShards int) (ShardedIndex, error) { |
|
if numShards <= 0 { |
|
return nil, fmt.Errorf("Invalid number of shards: %d", numShards) |
|
} |
|
err := writeJSON(shardedIndexMetadataPath(path), &shardedIndexMetadata{NumShards: numShards}) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
s := &shardedIndex{ |
|
indices: make([]bleve.Index, numShards), |
|
} |
|
for i := 0; i < numShards; i++ { |
|
s.indices[i], err = bleve.New(childIndexerPath(path, i), mapping) |
|
if err != nil { |
|
return nil, err |
|
} |
|
} |
|
s.bleveIndex = bleve.NewIndexAlias(s.indices...) |
|
return s, nil |
|
} |
|
|
|
// OpenShardedIndex opens a sharded index at the specified path. |
|
func OpenShardedIndex(path string) (ShardedIndex, error) { |
|
var meta shardedIndexMetadata |
|
var err error |
|
if err = readJSON(shardedIndexMetadataPath(path), &meta); err != nil { |
|
return nil, err |
|
} |
|
|
|
s := &shardedIndex{ |
|
indices: make([]bleve.Index, meta.NumShards), |
|
} |
|
for i := 0; i < meta.NumShards; i++ { |
|
s.indices[i], err = bleve.Open(childIndexerPath(path, i)) |
|
if err != nil { |
|
return nil, err |
|
} |
|
} |
|
s.bleveIndex = bleve.NewIndexAlias(s.indices...) |
|
return s, nil |
|
} |
|
|
|
func (s *shardedIndex) Index(id string, data interface{}) error { |
|
return s.indices[hash(id, len(s.indices))].Index(id, data) |
|
} |
|
|
|
func (s *shardedIndex) Delete(id string) error { |
|
return s.indices[hash(id, len(s.indices))].Delete(id) |
|
} |
|
|
|
func (s *shardedIndex) Document(id string) (*document.Document, error) { |
|
return s.indices[hash(id, len(s.indices))].Document(id) |
|
} |
|
|
|
func (s *shardedIndex) Close() error { |
|
if err := s.bleveIndex.Close(); err != nil { |
|
return err |
|
} |
|
for _, index := range s.indices { |
|
if err := index.Close(); err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (s *shardedIndex) shards() []bleve.Index { |
|
return s.indices |
|
} |
|
|
|
type shardedIndexFlushingBatch struct { |
|
batches []*singleIndexFlushingBatch |
|
} |
|
|
|
// NewShardedFlushingBatch creates a flushing batch with the specified batch |
|
// size for the specified sharded index. |
|
func NewShardedFlushingBatch(index ShardedIndex, maxBatchSize int) FlushingBatch { |
|
indices := index.shards() |
|
b := &shardedIndexFlushingBatch{ |
|
batches: make([]*singleIndexFlushingBatch, len(indices)), |
|
} |
|
for i, index := range indices { |
|
b.batches[i] = newFlushingBatch(index, maxBatchSize) |
|
} |
|
return b |
|
} |
|
|
|
func (b *shardedIndexFlushingBatch) Index(id string, data interface{}) error { |
|
return b.batches[hash(id, len(b.batches))].Index(id, data) |
|
} |
|
|
|
func (b *shardedIndexFlushingBatch) Delete(id string) error { |
|
return b.batches[hash(id, len(b.batches))].Delete(id) |
|
} |
|
|
|
func (b *shardedIndexFlushingBatch) Flush() error { |
|
for _, batch := range b.batches { |
|
if err := batch.Flush(); err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
}
|
|
|