3333import com .google .cloud .bigquery .storage .v1 .StreamWriter .SingleConnectionOrConnectionPool .Kind ;
3434import com .google .common .annotations .VisibleForTesting ;
3535import com .google .common .base .Preconditions ;
36+ import com .google .common .cache .Cache ;
37+ import com .google .common .cache .CacheBuilder ;
3638import com .google .errorprone .annotations .CanIgnoreReturnValue ;
3739import com .google .protobuf .ByteString ;
3840import io .grpc .Status ;
4850import java .util .Map .Entry ;
4951import java .util .Objects ;
5052import java .util .UUID ;
53+ import java .util .concurrent .Callable ;
5154import java .util .concurrent .ConcurrentHashMap ;
55+ import java .util .concurrent .ExecutionException ;
5256import java .util .concurrent .TimeUnit ;
5357import java .util .concurrent .atomic .AtomicBoolean ;
5458import java .util .concurrent .locks .Lock ;
@@ -75,8 +79,15 @@ public class StreamWriter implements AutoCloseable {
7579 private static Pattern streamPatternDefaultStream = Pattern .compile (defaultStreamMatching );
7680
7781 // Cache of location info for a given dataset.
78- private static Map < String , String > projectAndDatasetToLocation = new ConcurrentHashMap <>();
82+ private static long LOCATION_CACHE_EXPIRE_MILLIS = 10 * 60 * 1000 ; // 10 minutes
7983
84+ private static Cache <String , String > allocateProjectLocationCache () {
85+ return CacheBuilder .newBuilder ()
86+ .expireAfterWrite (LOCATION_CACHE_EXPIRE_MILLIS , TimeUnit .MILLISECONDS )
87+ .build ();
88+ }
89+
90+ private static Cache <String , String > projectAndDatasetToLocation = allocateProjectLocationCache ();
8091 /*
8192 * The identifier of stream to write to.
8293 */
@@ -287,26 +298,33 @@ private StreamWriter(Builder builder) throws IOException {
287298 if (location == null || location .isEmpty ()) {
288299 // Location is not passed in, try to fetch from RPC
289300 String datasetAndProjectName = extractDatasetAndProjectName (builder .streamName );
290- location =
291- projectAndDatasetToLocation .computeIfAbsent (
292- datasetAndProjectName ,
293- (key ) -> {
294- GetWriteStreamRequest writeStreamRequest =
295- GetWriteStreamRequest .newBuilder ()
296- .setName (this .getStreamName ())
297- .setView (WriteStreamView .BASIC )
298- .build ();
299-
300- WriteStream writeStream = client .getWriteStream (writeStreamRequest );
301- TableSchema writeStreamTableSchema = writeStream .getTableSchema ();
302- String fetchedLocation = writeStream .getLocation ();
303- log .info (
304- String .format (
305- "Fetched location %s for stream name %s, extracted project and dataset"
306- + " name: %s\" " ,
307- fetchedLocation , streamName , datasetAndProjectName ));
308- return fetchedLocation ;
309- });
301+ try {
302+ location =
303+ projectAndDatasetToLocation .get (
304+ datasetAndProjectName ,
305+ new Callable <String >() {
306+ @ Override
307+ public String call () throws Exception {
308+ GetWriteStreamRequest writeStreamRequest =
309+ GetWriteStreamRequest .newBuilder ()
310+ .setName (getStreamName ())
311+ .setView (WriteStreamView .BASIC )
312+ .build ();
313+
314+ WriteStream writeStream = client .getWriteStream (writeStreamRequest );
315+ TableSchema writeStreamTableSchema = writeStream .getTableSchema ();
316+ String fetchedLocation = writeStream .getLocation ();
317+ log .info (
318+ String .format (
319+ "Fetched location %s for stream name %s, extracted project and dataset"
320+ + " name: %s\" " ,
321+ fetchedLocation , streamName , datasetAndProjectName ));
322+ return fetchedLocation ;
323+ }
324+ });
325+ } catch (ExecutionException e ) {
326+ throw new IllegalStateException (e .getCause ());
327+ }
310328 if (location .isEmpty ()) {
311329 throw new IllegalStateException (
312330 String .format (
@@ -371,6 +389,12 @@ static boolean isDefaultStream(String streamName) {
371389 return streamMatcher .find ();
372390 }
373391
392+ @ VisibleForTesting
393+ static void recreateProjectLocationCache (long durationExpireMillis ) {
394+ LOCATION_CACHE_EXPIRE_MILLIS = durationExpireMillis ;
395+ projectAndDatasetToLocation = allocateProjectLocationCache ();
396+ }
397+
374398 String getFullTraceId () {
375399 return fullTraceId ;
376400 }
0 commit comments