
Commit db9bbf6

fix: make location cache expire after 10 minutes
1 parent 0749f3f · commit db9bbf6

2 files changed (+108, -21 lines)

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 45 additions & 21 deletions
```diff
@@ -33,6 +33,8 @@
 import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import com.google.protobuf.ByteString;
 import io.grpc.Status;
@@ -48,7 +50,9 @@
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
@@ -75,8 +79,15 @@ public class StreamWriter implements AutoCloseable {
   private static Pattern streamPatternDefaultStream = Pattern.compile(defaultStreamMatching);
 
   // Cache of location info for a given dataset.
-  private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();
+  private static long LOCATION_CACHE_EXPIRE_MILLIS = 10 * 60 * 1000; // 10 minutes
 
+  private static Cache<String, String> allocateProjectLocationCache() {
+    return CacheBuilder.newBuilder()
+        .expireAfterWrite(LOCATION_CACHE_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
+        .build();
+  }
+
+  private static Cache<String, String> projectAndDatasetToLocation = allocateProjectLocationCache();
   /*
    * The identifier of stream to write to.
    */
@@ -287,26 +298,33 @@ private StreamWriter(Builder builder) throws IOException {
     if (location == null || location.isEmpty()) {
       // Location is not passed in, try to fetch from RPC
       String datasetAndProjectName = extractDatasetAndProjectName(builder.streamName);
-      location =
-          projectAndDatasetToLocation.computeIfAbsent(
-              datasetAndProjectName,
-              (key) -> {
-                GetWriteStreamRequest writeStreamRequest =
-                    GetWriteStreamRequest.newBuilder()
-                        .setName(this.getStreamName())
-                        .setView(WriteStreamView.BASIC)
-                        .build();
-
-                WriteStream writeStream = client.getWriteStream(writeStreamRequest);
-                TableSchema writeStreamTableSchema = writeStream.getTableSchema();
-                String fetchedLocation = writeStream.getLocation();
-                log.info(
-                    String.format(
-                        "Fetched location %s for stream name %s, extracted project and dataset"
-                            + " name: %s\"",
-                        fetchedLocation, streamName, datasetAndProjectName));
-                return fetchedLocation;
-              });
+      try {
+        location =
+            projectAndDatasetToLocation.get(
+                datasetAndProjectName,
+                new Callable<String>() {
+                  @Override
+                  public String call() throws Exception {
+                    GetWriteStreamRequest writeStreamRequest =
+                        GetWriteStreamRequest.newBuilder()
+                            .setName(getStreamName())
+                            .setView(WriteStreamView.BASIC)
+                            .build();
+
+                    WriteStream writeStream = client.getWriteStream(writeStreamRequest);
+                    TableSchema writeStreamTableSchema = writeStream.getTableSchema();
+                    String fetchedLocation = writeStream.getLocation();
+                    log.info(
+                        String.format(
+                            "Fetched location %s for stream name %s, extracted project and dataset"
+                                + " name: %s\"",
+                            fetchedLocation, streamName, datasetAndProjectName));
+                    return fetchedLocation;
+                  }
+                });
+      } catch (ExecutionException e) {
+        throw new IllegalStateException(e.getCause());
+      }
       if (location.isEmpty()) {
         throw new IllegalStateException(
             String.format(
@@ -371,6 +389,12 @@ static boolean isDefaultStream(String streamName) {
     return streamMatcher.find();
   }
 
+  @VisibleForTesting
+  static void recreateProjectLocationCache(long durationExpireMillis) {
+    LOCATION_CACHE_EXPIRE_MILLIS = durationExpireMillis;
+    projectAndDatasetToLocation = allocateProjectLocationCache();
+  }
+
   String getFullTraceId() {
     return fullTraceId;
   }
```
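
For context, the core of the change is swapping the unbounded ConcurrentHashMap for a Guava Cache built with CacheBuilder.expireAfterWrite, and moving the lookup into Cache.get(key, loader) so the getWriteStream RPC runs only on a cache miss. Below is a minimal, self-contained sketch of that pattern; the names LocationCacheSketch, lookupLocation, and fetchLocationOverRpc are illustrative only and are not part of this commit or the client library.

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class LocationCacheSketch {
  // Entries become eligible for eviction 10 minutes after they were written.
  private static final Cache<String, String> locationCache =
      CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();

  static String lookupLocation(final String projectAndDataset) {
    try {
      // get(key, loader) runs the Callable only on a miss and stores its result under the key.
      return locationCache.get(
          projectAndDataset,
          new Callable<String>() {
            @Override
            public String call() {
              // Placeholder for the real RPC (client.getWriteStream in the actual change).
              return fetchLocationOverRpc(projectAndDataset);
            }
          });
    } catch (ExecutionException e) {
      // Cache.get wraps loader failures in ExecutionException; unwrap and rethrow the cause,
      // mirroring what the commit does.
      throw new IllegalStateException(e.getCause());
    }
  }

  private static String fetchLocationOverRpc(String projectAndDataset) {
    return "us-central1"; // stand-in value for the sketch
  }
}
```

As in the diff above, any exception thrown by the loader surfaces as an ExecutionException, which is why the constructor now wraps the lookup in a try/catch and rethrows the cause.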

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 63 additions & 0 deletions
```diff
@@ -56,6 +56,7 @@
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.Int64Value;
+import com.google.protobuf.Timestamp;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import io.opentelemetry.api.common.Attributes;
@@ -2551,4 +2552,66 @@ public void testGetDefaultStreamName() {
     assertEquals(
         "projects/projectId/datasets/datasetId/tables/tableId/_default", actualDefaultName);
   }
+
+  @Test
+  public void testLocationCacheIsHit() throws Exception {
+    WriteStream expectedResponse =
+        WriteStream.newBuilder()
+            .setName(WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]").toString())
+            .setCreateTime(Timestamp.newBuilder().build())
+            .setCommitTime(Timestamp.newBuilder().build())
+            .setTableSchema(TableSchema.newBuilder().build())
+            .setLocation("oklahoma")
+            .build();
+    testBigQueryWrite.addResponse(expectedResponse);
+
+    // first stream will result in call to getWriteStream for location lookup
+    StreamWriter writer1 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setEnableConnectionPool(true)
+            .build();
+
+    // second stream will hit the location cache
+    StreamWriter writer2 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setEnableConnectionPool(true)
+            .build();
+    assertEquals(1, testBigQueryWrite.getWriteStreamRequests().size());
+  }
+
+  @Test
+  public void testLocationCacheExpires() throws Exception {
+    // force cache to expire in 1000 millis
+    StreamWriter.recreateProjectLocationCache(1000);
+    WriteStream expectedResponse =
+        WriteStream.newBuilder()
+            .setName(WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]").toString())
+            .setCreateTime(Timestamp.newBuilder().build())
+            .setCommitTime(Timestamp.newBuilder().build())
+            .setTableSchema(TableSchema.newBuilder().build())
+            .setLocation("oklahoma")
+            .build();
+    testBigQueryWrite.addResponse(expectedResponse);
+    testBigQueryWrite.addResponse(expectedResponse);
+
+    // first stream will result in call to getWriteStream for location lookup
+    StreamWriter writer1 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setEnableConnectionPool(true)
+            .build();
+
+    // force cache to expire
+    TimeUnit.SECONDS.sleep(2);
+
+    // second stream will result in call to getWriteStream for location lookup
+    StreamWriter writer2 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setEnableConnectionPool(true)
+            .build();
+    assertEquals(2, testBigQueryWrite.getWriteStreamRequests().size());
+  }
 }
```
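
A side note on the expiry test: it shrinks the TTL to 1000 ms via recreateProjectLocationCache and then sleeps for two seconds, which works but ties the test to wall-clock time. An alternative the commit does not use is to inject a Guava Ticker into CacheBuilder so a test can advance the cache's clock deterministically. A minimal sketch under that assumption, with hypothetical names (ManualTicker, ManualTickerCacheSketch):

```java
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ManualTickerCacheSketch {
  // A Ticker whose clock only moves when the test says so.
  static final class ManualTicker extends Ticker {
    private final AtomicLong nanos = new AtomicLong();

    @Override
    public long read() {
      return nanos.get();
    }

    void advance(long duration, TimeUnit unit) {
      nanos.addAndGet(unit.toNanos(duration));
    }
  }

  public static void main(String[] args) {
    ManualTicker ticker = new ManualTicker();
    Cache<String, String> cache =
        CacheBuilder.newBuilder()
            .ticker(ticker)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();

    cache.put("project/dataset", "oklahoma");
    System.out.println(cache.getIfPresent("project/dataset")); // "oklahoma"

    // Move the injected clock past the TTL; no real time passes.
    ticker.advance(11, TimeUnit.MINUTES);
    System.out.println(cache.getIfPresent("project/dataset")); // null, entry expired
  }
}
```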
