Skip to content

Commit f1e6bd3

Browse files
committed
bucket notifications - allow kafka topic in TopicArn, default connection
Signed-off-by: Amit Prinz Setter <[email protected]>
1 parent c7fa146 commit f1e6bd3

File tree

5 files changed

+35
-4
lines changed

5 files changed

+35
-4
lines changed

src/cmd/manage_nsfs.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,8 @@ async function fetch_account_data(action, user_input) {
422422
gid: user_input.user ? undefined : user_input.gid,
423423
new_buckets_path: user_input.new_buckets_path,
424424
fs_backend: user_input.fs_backend ? String(user_input.fs_backend) : config.NSFS_NC_STORAGE_BACKEND
425-
}
425+
},
426+
default_connection: user_input.default_connection === undefined ? undefined : String(user_input.default_connection)
426427
};
427428
if (action === ACTIONS.UPDATE || action === ACTIONS.DELETE) {
428429
// @ts-ignore

src/endpoint/s3/ops/s3_put_bucket_notification.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,38 @@ async function put_bucket_notification(req) {
1111

1212
let topic_configuration = req.body.NotificationConfiguration?.TopicConfiguration;
1313

14+
console.log("AAAAAAAAAAAAAAAAAAAAAA req.object_sdk.requesting_account = ", req.object_sdk.requesting_account);
15+
16+
const default_connection = req.object_sdk.requesting_account.default_connection;
17+
console.log("AAAAAAAAAAAAAAAAAAAAAAAAAAA2 default_connection = ", default_connection);
18+
1419
//adapt to db shcema
1520
if (topic_configuration) {
1621
for (const conf of topic_configuration) {
1722
conf.id = conf.Id;
1823
conf.event = conf.Event;
1924
conf.topic = conf.Topic;
25+
//if Topic is not specified, default to account's default connection
26+
if (conf.Topic && conf.Topic.length > 0 && conf.Topic[0].startsWith('kafka:::topic/')) {
27+
const kafka_topic_parts = conf.Topic[0].split('/');
28+
console.log("AAAAAAAAAAAAAAAAAAAAAAAA5 kafka_topic_parts", kafka_topic_parts);
29+
if (kafka_topic_parts.length != 3) {
30+
throw new S3Error({
31+
code: 'InvalidArgument',
32+
message: "kafka:::topic is invalid. Must be of syntax: kafka:::topic:/connection/topic",
33+
http_code: 400,
34+
detail: conf.Topic[0]
35+
});
36+
}
37+
let connection = default_connection;
38+
if (typeof kafka_topic_parts[1] === 'string' && kafka_topic_parts[1].length > 0) {
39+
connection = kafka_topic_parts[1];
40+
}
41+
const topic = kafka_topic_parts[2];
42+
conf.topic = ['kafka:::topic/' + connection + "/" + topic];
43+
}
44+
console.log("AAAAAAAAAAAAAAAAAAAA3 conf.Topic = ", conf.Topic);
45+
console.log("AAAAAAAAAAAAAAA4 conf.topic = ", conf.topic);
2046
delete conf.Id;
2147
delete conf.Event;
2248
delete conf.Topic;

src/manage_nsfs/manage_nsfs_constants.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ const FROM_FILE = 'from_file';
4646
const ANONYMOUS = 'anonymous';
4747

4848
const VALID_OPTIONS_ACCOUNT = {
49-
'add': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', FROM_FILE, ...CLI_MUTUAL_OPTIONS]),
50-
'update': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', 'new_name', 'regenerate', ...CLI_MUTUAL_OPTIONS]),
49+
'add': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', 'default_connection', FROM_FILE, ...CLI_MUTUAL_OPTIONS]),
50+
'update': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', 'new_name', 'regenerate', 'default_connection', ...CLI_MUTUAL_OPTIONS]),
5151
'delete': new Set(['name', ...CLI_MUTUAL_OPTIONS]),
5252
'list': new Set(['wide', 'show_secrets', 'gid', 'uid', 'user', 'name', 'access_key', ...CLI_MUTUAL_OPTIONS]),
5353
'status': new Set(['name', 'access_key', 'show_secrets', ...CLI_MUTUAL_OPTIONS]),
@@ -142,6 +142,7 @@ const OPTION_TYPE = {
142142
ips: 'string',
143143
force: 'boolean',
144144
anonymous: 'boolean',
145+
default_connection: 'string',
145146
// health options
146147
deployment_type: 'string',
147148
all_account_details: 'boolean',

src/server/system_services/schemas/nsfs_account_schema.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,8 @@ module.exports = {
101101
}
102102
}]
103103
},
104+
default_connection: {
105+
type: 'string'
106+
}
104107
}
105108
};

src/util/notifications_util.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ class Notificator {
193193
if (connection_name.startsWith("kafka:::topic/")) {
194194
const connection_parts = connection_name.split('/');
195195
connect_filename = connection_parts[1];
196-
kafka_topic_from_connection_name = connection_parts.length > 2 && connection_parts[3];
196+
kafka_topic_from_connection_name = connection_parts.length > 1 && connection_parts[2];
197197
}
198198

199199
if (this.nc_config_fs) {

0 commit comments

Comments
 (0)