cache: Add on-demand progress notification from cache#21545
cache: Add on-demand progress notification from cache#21545akstron wants to merge 1 commit intoetcd-io:mainfrom
Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: akstron The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Hi @akstron. Thanks for your PR. I'm waiting for a etcd-io member to verify that this patch is reasonable to test. If it is, they should reply with Regular contributors should join the org to skip this step. Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
8adc2a8 to
3f10b5f
Compare
cache/demux_test.go
Outdated
| d.BroadcastProgress() | ||
|
|
||
| resps1 := readResponses(t, w1, 1) | ||
| require.Truef(t, resps1[0].IsProgressNotify(), "expected progress notify") |
There was a problem hiding this comment.
nit: assert len(resps1) == 1 or whatever is the size we expect
There was a problem hiding this comment.
readResponses is a test utility, which will always return the number of responses it is asked to read. If it can't it will timeout and the test will fail. So, we don't have to add assert len(resps1) == 1
cache/demux_test.go
Outdated
| require.Truef(t, resps2[0].IsProgressNotify(), "expected progress notify") | ||
| require.Equal(t, int64(10), resps2[0].Header.Revision) | ||
|
|
||
| d.maxRev = 0 |
There was a problem hiding this comment.
Nit: Validating different scenario, like if progress is sent on unsychronized demux, should be a separate test.
tests/integration/cache_test.go
Outdated
| for i := 0; i < 3; i++ { | ||
| select { | ||
| case resp := <-watchCh: | ||
| if resp.Canceled { | ||
| t.Fatalf("unexpected canceled response: %v", resp.CancelReason) | ||
| } |
There was a problem hiding this comment.
There is no guarantee that 3 events will be put into 3 responses, this might work locally but will flake on CI. It's up to demux to group it as it pleases. Instead of reading from channel 3 times, we should collect responses until the aggregated event count get to 3.
|
|
||
| ctx := t.Context() | ||
|
|
||
| c, err := cache.New(client, "/foo") |
There was a problem hiding this comment.
For future would be good to also test if the progress notification progresses if writes were done outside of prefix. This would require to also propagate progress request to watch under the cache. It would require some rate limiting so we don't have multiple watchers on cache requesting progress that is demultiplexed on one watch.
There was a problem hiding this comment.
Makes sense. This would require sending progress request to the etcd server. Should we just implement that instead of this? But this would also mean that each progress request will now make a network call.
Signed-off-by: Alok Kumar Singh <dev.alok.singh123@gmail.com>
3f10b5f to
bfc3ffe
Compare
Add Cache.RequestProgress() method that mirrors the etcd Watcher.RequestProgress API but is served entirely from the cache. It broadcasts a progress notification at the cache's current revision to all active local watchers.
Also, used AI for writing tests.
Part of #19371