-
Notifications
You must be signed in to change notification settings - Fork 5.1k
[Storage] [WebJobs] LogScan new blobs between the start of polling and the LMT of the last LogScan #53767
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[Storage] [WebJobs] LogScan new blobs between the start of polling and the LMT of the last LogScan #53767
Changes from 4 commits
d1a86fb
a098b2c
540ec53
aaed34c
c68a7b4
e122815
20e3c52
c930734
61d855e
3ed42ec
b294bb7
62ddef9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |||||
| ### Breaking Changes | ||||||
|
|
||||||
| ### Bugs Fixed | ||||||
| - Fixed a bug where polling for new blobs fails to detect newly created blobs when the List Blobs/Get Blobs operation requires multiple requests. If a blob is added or modified after LogScan begins, and is listed during the later portion of the listing, any blobs changed between the listing start time and the last modified timestamp of blobs found later will not be flagged as new in the subsequent LogScan. | ||||||
|
||||||
| - Fixed a bug where polling for new blobs fails to detect newly created blobs when the List Blobs/Get Blobs operation requires multiple requests. If a blob is added or modified after LogScan begins, and is listed during the later portion of the listing, any blobs changed between the listing start time and the last modified timestamp of blobs found later will not be flagged as new in the subsequent LogScan. | |
| - Fixed a bug where polling for new blobs fails to detect blobs modified between the start of a multi-page blob listing and the last modified timestamp of blobs found in later pages. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this fix details best how this issue is exposed. But if others would rather see it shortened to this, I can do that.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |
| using Azure.Storage.Blobs; | ||
| using Azure.Storage.Blobs.Models; | ||
| using Azure.Storage.Blobs.Specialized; | ||
| using Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Tests; | ||
| using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; | ||
| using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests; | ||
| using Microsoft.Azure.WebJobs.Host.Executors; | ||
|
|
@@ -370,7 +371,7 @@ public async Task ExecuteAsync_UpdatesScanInfo_WithEarliestFailure() | |
| int testScanBlobLimitPerPoll = 6; | ||
|
|
||
| // we'll introduce multiple errors to make sure we take the earliest timestamp | ||
| DateTime earliestErrorTime = DateTime.UtcNow; | ||
| DateTimeOffset earliestErrorTime = DateTimeOffset.UtcNow.AddHours(-1); | ||
|
|
||
| var container = _blobContainerMock.Object; | ||
|
|
||
|
|
@@ -410,6 +411,91 @@ public async Task ExecuteAsync_UpdatesScanInfo_WithEarliestFailure() | |
| Times.Exactly(2)); | ||
| } | ||
|
|
||
| [Test] | ||
| public async Task ExecuteAsync_PollNewBlobsAsync_ContinuationTokenUpdatedBlobs() | ||
| { | ||
| // Arrange | ||
| int testScanBlobLimitPerPoll = 3; | ||
| IBlobListenerStrategy product = new ScanBlobScanLogHybridPollingStrategy(new TestBlobScanInfoManager(), _exceptionHandler, NullLogger<BlobListener>.Instance); | ||
| LambdaBlobTriggerExecutor executor = new LambdaBlobTriggerExecutor(); | ||
| typeof(ScanBlobScanLogHybridPollingStrategy) | ||
| .GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic) | ||
| .SetValue(product, testScanBlobLimitPerPoll); | ||
|
|
||
| // Setup container to have multiple GetBlobsAsync calls to simulate continuation tokens | ||
| Uri uri = new Uri("https://fakeaccount.blob.core.windows.net/fakecontainer2"); | ||
| Mock<BlobContainerClient> containerMock = new Mock<BlobContainerClient>(uri, null); | ||
| containerMock.Setup(x => x.Uri).Returns(uri); | ||
| containerMock.Setup(x => x.Name).Returns(ContainerName); | ||
| containerMock.Setup(x => x.AccountName).Returns(AccountName); | ||
|
|
||
| // Create first pages of blob to list from | ||
amnguye marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| List<BlobItem> blobItems = new List<BlobItem>(); | ||
| List<string> expectedNames = new List<string>(); | ||
| for (int i = 0; i < 5; i++) | ||
| { | ||
| DateTimeOffset lastModified = DateTimeOffset.UtcNow.AddMinutes(-10 * i); | ||
| expectedNames.Add(CreateBlobAndUploadToContainer(containerMock, blobItems, lastModified: lastModified)); | ||
| } | ||
| // Create second page | ||
| List<BlobItem> blobItemsPage2 = new List<BlobItem>(); | ||
| for (int i = 0; i < 3; i++) | ||
| { | ||
| DateTimeOffset lastModified = DateTimeOffset.UtcNow.AddMinutes(-5 * i); | ||
| expectedNames.Add(CreateBlobAndUploadToContainer(containerMock, blobItemsPage2, lastModified: lastModified)); | ||
| } | ||
|
|
||
| // Add at least one blob that has a LastModifiedTime that goes beyond the start time of polling | ||
| DateTimeOffset lastModifiedAfterStartPolling = DateTimeOffset.UtcNow.AddSeconds(5); | ||
| string blobNameWithLmtAfterStartedPolling = CreateBlobAndUploadToContainer(containerMock, blobItemsPage2, lastModified: lastModifiedAfterStartPolling); | ||
|
|
||
| // Update all the blobs in the second listing, that way they get detected again in the second polling | ||
| List<BlobItem> blobItemsUpdated = new List<BlobItem>(); | ||
| List<string> secondSetExpectedNames = new List<string>(); | ||
| for (int i = 0; i < 4; i++) | ||
| { | ||
| // Create LastModified to be after the LMT of the blob that was beyond the start of the polling time to test if blobs created after that time will also be detected | ||
| secondSetExpectedNames.Add(CreateBlobAndUploadToContainer(containerMock, blobItemsUpdated, lastModified: lastModifiedAfterStartPolling.AddSeconds(-2))); | ||
| } | ||
| // Add blob with LMT after polling started to the second polling expected names. | ||
| secondSetExpectedNames.Add(blobNameWithLmtAfterStartedPolling); | ||
|
|
||
| // Set up GetBlobsAsync to return pages with contination token for each page, but not at the end of each polling | ||
amnguye marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| containerMock.SetupSequence(x => x.GetBlobsAsync(It.IsAny<BlobTraits>(), It.IsAny<BlobStates>(), It.IsAny<string>(), It.IsAny<CancellationToken>())) | ||
| // First polling | ||
| .Returns(() => | ||
| { | ||
| return new TestAsyncPageableWithContinuationToken(blobItems, true); | ||
| }) | ||
| .Returns(() => | ||
| { | ||
| return new TestAsyncPageableWithContinuationToken(blobItemsPage2, false); | ||
| }) | ||
| // Second polling | ||
| .Returns(() => | ||
| { | ||
| return new TestAsyncPageableWithContinuationToken(blobItemsUpdated, true); | ||
| }) | ||
| .Returns(() => | ||
| { | ||
| return new TestAsyncPageableWithContinuationToken(blobItemsPage2, false); | ||
| }); | ||
|
|
||
| // Register the container to initialize _scanInfo | ||
| await product.RegisterAsync(_blobClientMock.Object, containerMock.Object, executor, CancellationToken.None); | ||
|
|
||
| // Act / Assert - First Polling | ||
| RunExecuteWithMultiPollingInterval(expectedNames, product, executor, blobItems.Count); | ||
|
|
||
| // Wait 5 seconds to ensure that the blob with LMT after polling started is detected as a new blob. | ||
| Thread.Sleep(5000); | ||
|
||
|
|
||
| // Act / Assert - Second Polling | ||
| // We expect all the blobs we updated above to be detected and the blob that was created after the first polling started that wasn't detected | ||
| // to be now detected in this polling. | ||
| RunExecuteWithMultiPollingInterval(secondSetExpectedNames, product, executor, blobItemsUpdated.Count); | ||
| } | ||
|
|
||
| private void RunExecuterWithExpectedBlobsInternal(IDictionary<string, int> blobNameMap, IBlobListenerStrategy product, LambdaBlobTriggerExecutor executor, int expectedCount) | ||
| { | ||
| if (blobNameMap.Count == 0) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using Azure; | ||
| using Azure.Storage.Blobs.Models; | ||
| using Moq; | ||
| using System.Collections.Generic; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Tests | ||
| { | ||
| public class TestAsyncPageableWithContinuationToken : AsyncPageable<BlobItem> | ||
| { | ||
| private readonly List<BlobItem> _page; | ||
| private readonly bool _returnsContinuationToken; | ||
|
|
||
| public TestAsyncPageableWithContinuationToken(List<BlobItem> page, bool returnsContinuationToken) | ||
| { | ||
| _page = page; | ||
| _returnsContinuationToken = returnsContinuationToken; | ||
| } | ||
|
|
||
| public override async IAsyncEnumerable<Page<BlobItem>> AsPages(string continuationToken = null, int? pageSizeHint = null) | ||
| { | ||
| string mockContinuationToken = System.Guid.NewGuid().ToString(); | ||
| yield return Page<BlobItem>.FromValues( | ||
| _page.AsReadOnly(), | ||
| _returnsContinuationToken ? mockContinuationToken : null, | ||
| Mock.Of<Response>()); | ||
| // Simulate async page boundary | ||
| await Task.Yield(); | ||
| } | ||
amnguye marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not my favorite changelog message, any rewording suggestions is welcomed here.