|
|
@@ -6,6 +6,7 @@ package queue
 
 import (
 	"context"
+	"sync"
 	"time"
 )
 
@@ -19,6 +20,7 @@ type PersistableChannelQueueConfiguration struct {
 	QueueLength int
 	Timeout     time.Duration
 	MaxAttempts int
+	Workers     int
 }
 
 // PersistableChannelQueue wraps a channel queue and level queue together
@@ -40,14 +42,17 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 	batchChannelQueue, err := NewBatchedChannelQueue(handle, BatchedChannelQueueConfiguration{
 		QueueLength: config.QueueLength,
 		BatchLength: config.BatchLength,
+		Workers:     config.Workers,
 	}, exemplar)
 	if err != nil {
 		return nil, err
 	}
 
+	// the level backend only needs one worker to catch up with the previously dropped work
 	levelCfg := LevelQueueConfiguration{
 		DataDir:     config.DataDir,
 		BatchLength: config.BatchLength,
+		Workers:     1,
 	}
 
 	levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
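The constructor now forwards the new Workers value to the in-memory batched channel queue, while the level (on-disk) backend stays pinned to a single worker, since it only has to drain items persisted by a previous run. Below is a minimal caller-side sketch of constructing the queue with the new field; it assumes HandlerFunc is the variadic handler type defined elsewhere in the queue package (not shown in this diff), and the exemplar and all field values are illustrative only.

```go
// Hypothetical usage sketch only: the handler body, the exemplar, and the
// concrete values are assumptions, not code from this change.
handle := func(data ...Data) {
	for _, d := range data {
		_ = d // process one queued item
	}
}

q, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
	DataDir:     "queues/common", // hypothetical directory for the level backend
	QueueLength: 20,
	BatchLength: 20,
	Timeout:     5 * time.Second,
	MaxAttempts: 10,
	Workers:     4, // new: number of workers draining the in-memory channel queue
}, "")
if err != nil {
	// handle construction failure
}
_ = q
```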
|
|
@@ -100,6 +105,19 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
 
 	// Just run the level queue - we shut it down later
 	go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < p.workers; i++ {
+		wg.Add(1)
+		go func() {
+			p.worker()
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
+func (p *PersistableChannelQueue) worker() {
 	delay := time.Millisecond * 300
 	var datas = make([]Data, 0, p.batchLength)
 loop:
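Run now fans the handler out across p.workers goroutines and blocks on a WaitGroup, and the batching logic that previously lived inline in Run moves into the new worker method (its body continues past the end of this hunk). The standalone sketch below illustrates the same fan-out-plus-batching pattern in isolation; the channel, batch size, and flush delay are assumptions made to keep the example self-contained, not the queue's actual internals.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Self-contained illustration of the pattern above: start a fixed number of
// workers, each collecting items from a shared channel into a batch that is
// flushed when full or after a short delay. Names and values are illustrative.
func main() {
	const (
		workers     = 4
		batchLength = 20
	)
	delay := 300 * time.Millisecond
	dataChan := make(chan string, 100)

	handle := func(batch ...string) {
		fmt.Printf("handling batch of %d items\n", len(batch))
	}

	wg := sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			batch := make([]string, 0, batchLength)
			for {
				select {
				case item, ok := <-dataChan:
					if !ok {
						// channel closed: flush whatever is left and exit
						if len(batch) > 0 {
							handle(batch...)
						}
						return
					}
					batch = append(batch, item)
					if len(batch) >= batchLength {
						handle(batch...)
						batch = batch[:0]
					}
				case <-time.After(delay):
					// flush a partial batch if nothing arrived for a while
					if len(batch) > 0 {
						handle(batch...)
						batch = batch[:0]
					}
				}
			}
		}()
	}

	for i := 0; i < 50; i++ {
		dataChan <- fmt.Sprintf("item-%d", i)
	}
	close(dataChan)
	wg.Wait()
}
```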
|
|
|