Skip to content

Complete GCP namespace support #7836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions src/api/account_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,9 @@ module.exports = {
},
endpoint_type: {
$ref: 'common_api#/definitions/endpoint_type'
},
gcp_hmac_key: {
$ref: 'common_api#/definitions/gcp_hmac_key'
}

}
Expand Down
11 changes: 11 additions & 0 deletions src/api/common_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,17 @@ module.exports = {
}
},

// Both halves of a GCP HMAC key are wrapped as SensitiveString.
gcp_access_id: { wrapper: SensitiveString },
gcp_secret_key: { wrapper: SensitiveString },

// HMAC key pair attached to a GCP connection; neither property is marked as required.
gcp_hmac_key: {
type: 'object',
properties: {
access_id: { $ref: '#/definitions/gcp_access_id' },
secret_key: { $ref: '#/definitions/gcp_secret_key' },
}
},

ip_range: {
type: 'object',
required: ['start', 'end'],
Expand Down
1 change: 1 addition & 0 deletions src/api/pool_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ module.exports = {
region: {
type: 'string'
},
gcp_hmac_key: { $ref: 'common_api#/definitions/gcp_hmac_key' },
}
},

Expand Down
2 changes: 1 addition & 1 deletion src/endpoint/s3/ops/s3_put_object_tagging.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async function put_object_tagging(req, res) {
tagging: tag_set,
version_id
});
if (reply.version_id) res.setHeader('x-amz-version-id', reply.version_id);
if (reply?.version_id) res.setHeader('x-amz-version-id', reply.version_id);
}

module.exports = {
Expand Down
188 changes: 175 additions & 13 deletions src/sdk/namespace_gcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,21 @@ const util = require('util');
const P = require('../util/promise');
const stream_utils = require('../util/stream_utils');
const dbg = require('../util/debug_module')(__filename);
const s3_utils = require('../endpoint/s3/s3_utils');
const S3Error = require('../endpoint/s3/s3_errors').S3Error;
// we use this wrapper to set a custom user agent
const GoogleCloudStorage = require('../util/google_storage_wrap');
const {
AbortMultipartUploadCommand,
CompleteMultipartUploadCommand,
CreateMultipartUploadCommand,
ListPartsCommand,
S3Client,
UploadPartCommand,
UploadPartCopyCommand,
} = require('@aws-sdk/client-s3');

const { fix_error_object } = require('./noobaa_s3_client/noobaa_s3_client');

/**
* @implements {nb.Namespace}
Expand All @@ -25,6 +37,10 @@ class NamespaceGCP {
* private_key: string,
* access_mode: string,
* stats: import('./endpoint_stats_collector').EndpointStatsCollector,
* hmac_key: {
* access_id: string,
* secret_key: string,
* }
* }} params
*/
constructor({
Expand All @@ -35,6 +51,7 @@ class NamespaceGCP {
private_key,
access_mode,
stats,
hmac_key,
}) {
this.namespace_resource_id = namespace_resource_id;
this.project_id = project_id;
Expand All @@ -47,6 +64,20 @@ class NamespaceGCP {
private_key: this.private_key,
}
});
/* An S3 client is needed for multipart upload since GCP only supports multipart uploads via the S3-compatible XML API
* https://cloud.google.com/storage/docs/multipart-uploads
* This is also the reason an HMAC key is generated as part of `add_external_connection` - since the standard GCP credentials
* cannot be used in conjunction with the S3 client.
*/
this.s3_client = new S3Client({
endpoint: 'https://storage.googleapis.com',
region: 'auto', //https://cloud.google.com/storage/docs/aws-simple-migration#storage-list-buckets-s3-python
credentials: {
accessKeyId: hmac_key.access_id,
secretAccessKey: hmac_key.secret_key,
},
});

this.bucket = target_bucket;
this.access_mode = access_mode;
this.stats = stats;
Expand Down Expand Up @@ -191,7 +222,7 @@ class NamespaceGCP {
read_stream.on('response', () => {
let count = 1;
const count_stream = stream_utils.get_tap_stream(data => {
this.stats_collector.update_namespace_write_stats({
this.stats.update_namespace_write_stats({
namespace_resource_id: this.namespace_resource_id,
bucket_name: params.bucket,
size: data.length,
Expand Down Expand Up @@ -297,27 +328,135 @@ class NamespaceGCP {

async create_object_upload(params, object_sdk) {
dbg.log0('NamespaceGCP.create_object_upload:', this.bucket, inspect(params));
throw new S3Error(S3Error.NotImplemented);
const tagging = params.tagging && params.tagging.map(tag => tag.key + '=' + tag.value).join('&');

const res = await this.s3_client.send(
new CreateMultipartUploadCommand({
Bucket: this.bucket,
Key: params.key,
ContentType: params.content_type,
StorageClass: params.storage_class,
Metadata: params.xattr,
Tagging: tagging
}));

dbg.log0('NamespaceGCP.create_object_upload:', this.bucket, inspect(params), 'res', inspect(res));
return { obj_id: res.UploadId };
}

async upload_multipart(params, object_sdk) {
dbg.log0('NamespaceGCP.upload_multipart:', this.bucket, inspect(params));
throw new S3Error(S3Error.NotImplemented);
let etag;
let res;
if (params.copy_source) {
const { copy_source, copy_source_range } = s3_utils.format_copy_source(params.copy_source);

res = await this.s3_client.send(
new UploadPartCopyCommand({
Bucket: this.bucket,
Key: params.key,
UploadId: params.obj_id,
PartNumber: params.num,
CopySource: copy_source,
CopySourceRange: copy_source_range,
}));
etag = s3_utils.parse_etag(res.CopyPartResult.ETag);
return { etag };
}

let count = 1;
const count_stream = stream_utils.get_tap_stream(data => {
this.stats?.update_namespace_write_stats({
namespace_resource_id: this.namespace_resource_id,
size: data.length,
count
});
// clear count for next updates
count = 0;
});
try {
res = await this.s3_client.send(
new UploadPartCommand({
Bucket: this.bucket,
Key: params.key,
UploadId: params.obj_id,
PartNumber: params.num,
Body: params.source_stream.pipe(count_stream),
ContentMD5: params.md5_b64,
ContentLength: params.size,
}));
} catch (err) {
fix_error_object(err);
object_sdk.rpc_client.pool.update_issues_report({
namespace_resource_id: this.namespace_resource_id,
error_code: String(err.code),
time: Date.now(),
});
throw err;
}
etag = s3_utils.parse_etag(res.ETag);

dbg.log0('NamespaceGCP.upload_multipart:', this.bucket, inspect(params), 'res', inspect(res));
return { etag };
}

async list_multiparts(params, object_sdk) {
dbg.log0('NamespaceGCP.list_multiparts:', this.bucket, inspect(params));
throw new S3Error(S3Error.NotImplemented);

const res = await this.s3_client.send(
new ListPartsCommand({
Bucket: this.bucket,
Key: params.key,
UploadId: params.obj_id,
MaxParts: params.max,
PartNumberMarker: params.num_marker,
}));

dbg.log0('NamespaceGCP.list_multiparts:', this.bucket, inspect(params), 'res', inspect(res));
return {
is_truncated: res.IsTruncated,
next_num_marker: res.NextPartNumberMarker,
multiparts: _.map(res.Parts, p => ({
num: p.PartNumber,
size: p.Size,
etag: s3_utils.parse_etag(p.ETag),
last_modified: p.LastModified,
}))
};
}

async complete_object_upload(params, object_sdk) {
dbg.log0('NamespaceGCP.complete_object_upload:', this.bucket, inspect(params));
throw new S3Error(S3Error.NotImplemented);

const res = await this.s3_client.send(
new CompleteMultipartUploadCommand({
Bucket: this.bucket,
Key: params.key,
UploadId: params.obj_id,
MultipartUpload: {
Parts: _.map(params.multiparts, p => ({
PartNumber: p.num,
ETag: `"${p.etag}"`,
}))
}
}));

dbg.log0('NamespaceGCP.complete_object_upload:', this.bucket, inspect(params), 'res', inspect(res));
const etag = s3_utils.parse_etag(res.ETag);
return { etag, version_id: res.VersionId };
}

async abort_object_upload(params, object_sdk) {
dbg.log0('NamespaceGCP.abort_object_upload:', this.bucket, inspect(params));
throw new S3Error(S3Error.NotImplemented);

const res = await this.s3_client.send(
new AbortMultipartUploadCommand({
Bucket: this.bucket,
Key: params.key,
UploadId: params.obj_id,
}));

dbg.log0('NamespaceGCP.abort_object_upload:', this.bucket, inspect(params), 'res', inspect(res));
}

//////////
Expand All @@ -332,12 +471,13 @@ class NamespaceGCP {
*/
async get_object_acl(params, object_sdk) {
dbg.log0('NamespaceGCP.get_object_acl:', this.bucket, inspect(params));
throw new S3Error(S3Error.NotImplemented);
await this.read_object_md(params, object_sdk);
return s3_utils.DEFAULT_OBJECT_ACL;
}

async put_object_acl(params, object_sdk) {
dbg.log0('NamespaceGCP.put_object_acl:', this.bucket, inspect(params));
throw new S3Error(S3Error.NotImplemented);
await this.read_object_md(params, object_sdk);
}

///////////////////
Expand All @@ -363,8 +503,8 @@ class NamespaceGCP {

const res = await P.map_with_concurrency(10, params.objects, obj =>
this.gcs.bucket(this.bucket).file(obj.key).delete()
.then(() => ({}))
.catch(err => ({ err_code: err.code, err_message: err.errors[0].reason || 'InternalError' })));
.then(() => ({}))
.catch(err => ({ err_code: err.code, err_message: err.errors[0].reason || 'InternalError' })));

dbg.log1('NamespaceGCP.delete_multiple_objects:', this.bucket, inspect(params), 'res', inspect(res));

Expand All @@ -378,13 +518,35 @@ class NamespaceGCP {
////////////////////

async get_object_tagging(params, object_sdk) {
throw new Error('TODO');
dbg.log0('NamespaceGCP.get_object_tagging:', this.bucket, inspect(params));
const obj_tags = (await this.read_object_md(params, object_sdk)).xattr;
// Converting tag dictionary to array of key-value object pairs
const tags = Object.entries(obj_tags).map(([key, value]) => ({ key, value }));
return {
tagging: tags
};
}
async delete_object_tagging(params, object_sdk) {
throw new Error('TODO');
dbg.log0('NamespaceGCP.delete_object_tagging:', this.bucket, inspect(params));
try {
// Set an empty metadata object to remove all tags
const res = await this.gcs.bucket(this.bucket).file(params.key).setMetadata({});
dbg.log0('NamespaceGCP.delete_object_tagging:', this.bucket, inspect(params), 'res', inspect(res));
} catch (err) {
dbg.error('NamespaceGCP.delete_object_tagging error:', err);
}
}
async put_object_tagging(params, object_sdk) {
throw new Error('TODO');
dbg.log0('NamespaceGCP.put_object_tagging:', this.bucket, inspect(params));
try {
// Convert the array of key-value object pairs to a dictionary
const tags_to_put = Object.fromEntries(params.tagging.map(tag => ([tag.key, tag.value])));
const res = await this.gcs.bucket(this.bucket).file(params.key).setMetadata({ metadata: tags_to_put });
dbg.log0('NamespaceGCP.put_object_tagging:', this.bucket, inspect(params), 'res', inspect(res));
} catch (err) {
dbg.error('NamespaceGCP.put_object_tagging error:', err);
}

}

///////////////////
Expand Down
3 changes: 3 additions & 0 deletions src/sdk/object_sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ class ObjectSDK {
/**
* @returns {nb.Namespace}
*/
// resource contains the values of namespace_resource_extended_info
_setup_single_namespace({ resource: r, path: p }, bucket_id, options) {

if (r.endpoint_type === 'NOOBAA') {
Expand Down Expand Up @@ -502,6 +503,8 @@ class ObjectSDK {
private_key,
access_mode: r.access_mode,
stats: this.stats,
hmac_key: { access_id: r.gcp_hmac_key.access_id.unwrap(),
secret_key: r.gcp_hmac_key.secret_key.unwrap() }
});
}
if (r.fs_root_path || r.fs_root_path === '') {
Expand Down
20 changes: 20 additions & 0 deletions src/server/system_services/account_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,26 @@ async function add_external_connection(req) {
};
}

// If the connection is for Google, generate an HMAC key for S3-compatible actions (e.g. multipart uploads)
if (info.endpoint_type === 'GOOGLE') {
dbg.log0('add_external_connection: creating HMAC key for Google connection');
const key_file = JSON.parse(req.rpc_params.secret.unwrap());
const credentials = _.pick(key_file, 'client_email', 'private_key');
const gs_client = new GoogleStorage({ credentials, projectId: key_file.project_id });
try {
const [hmac_key, secret] = await gs_client.createHmacKey(credentials.client_email);
info.gcp_hmac_key = {
access_id: hmac_key.metadata.accessId,
secret_key: system_store.master_key_manager.encrypt_sensitive_string_with_master_key_id(
new SensitiveString(secret), req.account.master_key_id._id)
};
} catch (err) {
// The most likely reason is that the storage account already has 10 existing HMAC keys, which is the limit
dbg.error('add_external_connection: failed to create HMAC key for Google connection', err);
throw new RpcError('INTERNAL_ERROR', 'Failed to create HMAC key for Google connection');
}
}

info.cp_code = req.rpc_params.cp_code || undefined;
info.auth_method = req.rpc_params.auth_method || config.DEFAULT_S3_AUTH_METHOD[info.endpoint_type] || undefined;
info = _.omitBy(info, _.isUndefined);
Expand Down
14 changes: 14 additions & 0 deletions src/server/system_services/master_key_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ class MasterKeysManager {
decipher: crypto.createDecipheriv(m_key.cipher_type, m_key.cipher_key, m_key.cipher_iv)
}, undefined);
}
if (keys.gcp_hmac_key?.secret_key) {
keys.gcp_hmac_key.secret_key = await this.secret_keys_cache.get_with_cache({
encrypted_value: keys.gcp_hmac_key.secret_key,
decipher: crypto.createDecipheriv(m_key.cipher_type, m_key.cipher_key, m_key.cipher_iv)
}, undefined);

}
}
}
}
Expand Down Expand Up @@ -371,6 +378,13 @@ class MasterKeysManager {
master_key_id: ns_resource.account.master_key_id._id
}, undefined);
}
if (ns_resource.connection.gcp_hmac_key?.secret_key) {
ns_resource.connection.gcp_hmac_key.secret_key = await this.secret_keys_cache.get_with_cache({
encrypted_value: ns_resource.connection.gcp_hmac_key.secret_key.unwrap(),
undefined,
master_key_id: ns_resource.account.master_key_id._id
}, undefined);
}
}
}
}
Expand Down
Loading
Loading