Skip to content

Commit

Permalink
bug: Priorityqueue: Yet another queue_depth metric fix
Browse files Browse the repository at this point in the history
Inside the priorityqueues `spin` we call the metrics `add` if an item
becomes ready so that the `queue_depth` metric gets incremented. To
avoid doing this multiple times for the same item, we track the key in a
map and remove it there when we hand the item out.

If an item gets added without `RequeueAfter` that is already on the
queue but with a `RequeueAfter` we also call the metrics `add` - But if
we already did that in `spin` we will count the item twice.
  • Loading branch information
alvaroaleman authored and k8s-infra-cherrypick-robot committed Jan 23, 2025
1 parent 791b6c9 commit 64cb665
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/priorityqueue/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ func (m *defaultQueueMetrics[T]) get(item T) {
return
}

m.depth.Dec()

m.mapLock.Lock()
defer m.mapLock.Unlock()

m.depth.Dec()

m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(m.sinceInSeconds(startTime))
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
}

if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
if readyAt == nil {
if readyAt == nil && !w.becameReady.Has(key) {
w.metrics.add(key)
}
item.ReadyAt = readyAt
Expand Down
42 changes: 42 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,48 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(q.Len()).To(Equal(1))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(1))
metrics.mu.Unlock()

// Get the item to ensure the codepath in
// `spin` for the metrics is passed by so
// that this starts failing if it incorrectly
// calls `metrics.add` again.
item, _ := q.Get()
Expect(item).To(Equal("foo"))
Expect(q.Len()).To(Equal(0))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(0))
metrics.mu.Unlock()
})

It("Updates metrics correctly for an item whose requeueAfter expired that gets added again without requeueAfter", func() {
q, metrics := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{After: 50 * time.Millisecond}, "foo")
time.Sleep(100 * time.Millisecond)

Expect(q.Len()).To(Equal(1))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(1))
metrics.mu.Unlock()

q.AddWithOpts(AddOpts{}, "foo")
Expect(q.Len()).To(Equal(1))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(1))
metrics.mu.Unlock()

// Get the item to ensure the codepath in
// `spin` for the metrics is passed by so
// that this starts failing if it incorrectly
// calls `metrics.add` again.
item, _ := q.Get()
Expect(item).To(Equal("foo"))
Expect(q.Len()).To(Equal(0))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(0))
metrics.mu.Unlock()
})
})

Expand Down

0 comments on commit 64cb665

Please sign in to comment.