|
|
|
@ -37,7 +37,8 @@ type delayedStarter struct {
|
|
|
|
|
name string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) { |
|
|
|
|
// setInternal must be called with the lock locked.
|
|
|
|
|
func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error { |
|
|
|
|
var ctx context.Context |
|
|
|
|
var cancel context.CancelFunc |
|
|
|
|
if q.timeout > 0 { |
|
|
|
@ -56,8 +57,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
|
|
|
|
|
for q.internal == nil { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
q.lock.Unlock() |
|
|
|
|
log.Fatal("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name) |
|
|
|
|
return fmt.Errorf("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name) |
|
|
|
|
default: |
|
|
|
|
queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar) |
|
|
|
|
if err == nil { |
|
|
|
@ -70,16 +70,21 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
|
|
|
|
|
} |
|
|
|
|
i++ |
|
|
|
|
if q.maxAttempts > 0 && i > q.maxAttempts { |
|
|
|
|
q.lock.Unlock() |
|
|
|
|
log.Fatal("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) |
|
|
|
|
return fmt.Errorf("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) |
|
|
|
|
} |
|
|
|
|
sleepTime := 100 * time.Millisecond |
|
|
|
|
if q.timeout > 0 && q.maxAttempts > 0 { |
|
|
|
|
sleepTime = (q.timeout - 200*time.Millisecond) / time.Duration(q.maxAttempts) |
|
|
|
|
} |
|
|
|
|
time.Sleep(sleepTime) |
|
|
|
|
t := time.NewTimer(sleepTime) |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
t.Stop() |
|
|
|
|
case <-t.C: |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// WrappedQueue wraps a delayed starting queue
|
|
|
|
@ -151,7 +156,12 @@ func (q *WrappedQueue) Push(data Data) error {
|
|
|
|
|
func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) { |
|
|
|
|
q.lock.Lock() |
|
|
|
|
if q.internal == nil { |
|
|
|
|
q.setInternal(atShutdown, q.handle, q.exemplar) |
|
|
|
|
err := q.setInternal(atShutdown, q.handle, q.exemplar) |
|
|
|
|
q.lock.Unlock() |
|
|
|
|
if err != nil { |
|
|
|
|
log.Fatal("Unable to set the internal queue for %s Error: %v", q.Name(), err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
go func() { |
|
|
|
|
for data := range q.channel { |
|
|
|
|
_ = q.internal.Push(data) |
|
|
|
|