Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions google-cloud-bigtable/conformance/known_failures.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
TestCheckAndMutateRow_Generic_CloseClient
TestCheckAndMutateRow_Generic_DeadlineExceeded
TestExecuteQuery_EmptyResponse
TestExecuteQuery_SingleSimpleRow
TestExecuteQuery_*
TestMutateRow_Generic_DeadlineExceeded
TestMutateRow_Generic_CloseClient
TestMutateRows_Generic_DeadlineExceeded
TestMutateRows_Retry_ExponentialBackoff
TestMutateRows_Generic_CloseClient
TestMutateRows_Retry_WithRoutingCookie
TestMutateRows_Retry_WithRetryInfo
TestReadModifyWriteRow_Generic_CloseClient
TestReadModifyWriteRow_Generic_DeadlineExceeded
TestReadRow_Generic_DeadlineExceeded
Expand Down
63 changes: 46 additions & 17 deletions google-cloud-bigtable/lib/google/cloud/bigtable/rows_mutator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class RowsMutator
RETRY_LIMIT = 3

# @private
# The prefix for routing cookies. Used to dynamically find cookie
# headers in metadata.
COOKIE_KEY_PREFIX = "x-goog-cbt-cookie"

#
# Creates a mutate rows instance.
#
Expand All @@ -57,18 +61,20 @@ def initialize table, entries
#
def apply_mutations
@req_entries = @entries.map(&:to_grpc)
statuses = mutate_rows @req_entries
statuses, delay, cookies = mutate_rows @req_entries

# Collects retryable mutations indices.
indices = statuses.each_with_object [] do |e, r|
r << e.index if @entries[e.index].retryable? && RETRYABLE_CODES[e.status.code]
end

return statuses if indices.empty?

(RETRY_LIMIT - 1).times do
RETRY_LIMIT.times do
break if indices.empty?
indices = retry_entries statuses, indices

sleep delay if delay

indices, delay, cookies = retry_entries statuses, indices, cookies
end

statuses
Expand All @@ -80,32 +86,55 @@ def apply_mutations
# Mutates rows.
#
# @param entries [Array<Google::Cloud::Bigtable::MutationEntry>]
# @return [Array<Google::Cloud::Bigtable::V2::MutateRowsResponse::Entry>]
# @param cookies [Hash]
# @return [Array<Google::Cloud::Bigtable::V2::MutateRowsResponse::Entry>, Float|nil, Hash]
#
def mutate_rows entries
response = @table.service.mutate_rows @table.path, entries, app_profile_id: @table.app_profile_id
response.each_with_object [] do |res, statuses|
statuses.concat res.entries
def mutate_rows entries, cookies = {}
call_options = Gapic::CallOptions.new(metadata: cookies) unless cookies.empty?

response = @table.service.mutate_rows(
@table.path,
entries,
app_profile_id: @table.app_profile_id,
call_options: call_options
)
[response.flat_map(&:entries), nil, cookies]
rescue GRPC::BadStatus => e
info = e.status_details.find { |d| d.is_a? Google::Rpc::RetryInfo }
delay = if info&.retry_delay
info.retry_delay.seconds + (info.retry_delay.nanos / 1_000_000_000.0)
end

cookies.merge!(e.metadata.select { |k, _| k.start_with? COOKIE_KEY_PREFIX })

status = Google::Rpc::Status.new code: e.code, message: e.message
statuses = entries.map.with_index do |_, i|
Google::Cloud::Bigtable::V2::MutateRowsResponse::Entry.new(
index: i,
status: status
)
end
[statuses, delay, cookies]
end

##
# Collects failed entries, retries mutation, and updates status.
#
# @param statuses [Array<Google::Cloud::Bigtable::V2::MutateRowsResponse::Entry>]
# @param indices [Array<Integer>]
# Retry entries position mapping list
# @return [Array<Integer>]
# New list of failed entries positions
# @param cookies [Hash]
# @return [Array<Integer>, Float|nil, Hash]
#
def retry_entries statuses, indices
def retry_entries statuses, indices, cookies
entries = indices.map { |i| @req_entries[i] }
retry_statuses = mutate_rows entries
retry_statuses, delay, cookies = mutate_rows entries, cookies

retry_statuses.each_with_object [] do |e, next_indices|
next_indices << indices[e.index] if RETRYABLE_CODES[e.status.code]
statuses[indices[e.index]].status = e.status
next_indices = retry_statuses.each_with_object [] do |e, list|
next_index = indices[e.index]
statuses[next_index].status = e.status
list << next_index if RETRYABLE_CODES[e.status.code]
end
[next_indices, delay, cookies]
end
end
end
Expand Down
15 changes: 7 additions & 8 deletions google-cloud-bigtable/lib/google/cloud/bigtable/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -689,14 +689,13 @@ def mutate_row table_name, row_key, mutations, app_profile_id: nil
)
end

def mutate_rows table_name, entries, app_profile_id: nil
client(table_name, app_profile_id).mutate_rows(
**{
table_name: table_name,
app_profile_id: app_profile_id,
entries: entries
}.compact
)
def mutate_rows table_name, entries, app_profile_id: nil, call_options: nil
request = {
table_name: table_name,
app_profile_id: app_profile_id,
entries: entries
}.compact
client(table_name, app_profile_id).mutate_rows request, call_options
end

def check_and_mutate_row table_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@
}]
)

mock.expect :mutate_rows, [res],
req = {
table_name: table_path(instance_id, table_id),
entries: [mutation_entry_grpc],
app_profile_id: app_profile_id
}
mock.expect :mutate_rows, [res], [req, nil]

entry = Google::Cloud::Bigtable::MutationEntry.new(row_key)
entry.set_cell(family, qualifier, cell_value, timestamp: timestamp)
Expand Down Expand Up @@ -100,7 +102,12 @@
}]
)
mock = Minitest::Mock.new
mock.expect :mutate_rows, [res], table_name: table_path(instance_id, table_id), entries: [entry], app_profile_id: app_profile_id
req = {
table_name: table_path(instance_id, table_id),
app_profile_id: app_profile_id,
entries: [entry]
}
mock.expect :mutate_rows, [res], [req, nil]

bigtable.service.mocked_client = mock

Expand All @@ -119,11 +126,12 @@
mock.verify
end

it "retry for failed mutation with 3 times" do
it "retries a failed mutation 3 times" do
req_entries = req_entries_grpc
retry_entries = [
req_entries,
[req_entries.last],
[req_entries.last],
[req_entries.last]
]

Expand All @@ -135,6 +143,9 @@
[
{ index: 0, status: { code: Google::Rpc::Code::DEADLINE_EXCEEDED, message: "failed" }}
],
[
{ index: 0, status: { code: Google::Rpc::Code::DEADLINE_EXCEEDED, message: "failed" }}
],
[
{ index: 0, status: { code: Google::Rpc::Code::DEADLINE_EXCEEDED, message: "failed" }}
]
Expand All @@ -152,10 +163,11 @@
req_retry_entries: retry_entries,
req_retry_response: retry_responses
)
def mock.mutate_rows request
def mock.mutate_rows request, call_options
t._(request[:table_name]).must_equal expected_table_path
t._(request[:entries]).must_equal req_retry_entries[self.retry_count]
t._(request[:app_profile_id]).must_equal expected_req_app_profile_id
t.assert_kind_of Gapic::CallOptions, call_options if call_options

res = req_retry_response[self.retry_count]
self.retry_count += 1
Expand All @@ -171,7 +183,7 @@ def mock.mutate_rows request
end
responses = table.mutate_rows(mutation_entries)

_(mock.retry_count).must_equal 3
_(mock.retry_count).must_equal 4
_(responses.length).must_equal 2
_(responses[0].index).must_equal 0
_(responses[0].status.code).must_equal Google::Rpc::Code::OK
Expand Down Expand Up @@ -219,10 +231,11 @@ def mock.mutate_rows request
req_retry_entries: retry_entries,
req_retry_response: retry_responses
)
def mock.mutate_rows request
def mock.mutate_rows request, call_options
t._(request[:table_name]).must_equal expected_table_path
t._(request[:entries]).must_equal req_retry_entries[self.retry_count]
t._(request[:app_profile_id]).must_equal expected_req_app_profile_id
t.assert_kind_of Gapic::CallOptions, call_options if call_options

res = req_retry_response[self.retry_count]
self.retry_count += 1
Expand Down
Loading