Skip to content

bucket notifications - facilitate notif conf to override connection #8932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions docs/NooBaaNonContainerized/NooBaaCLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ The `account add` command is used to create a new account with customizable opti
```sh
noobaa-cli account add --name <account_name> --uid <uid> --gid <gid> [--user]
[--new_buckets_path][--access_key][--secret_key][--fs_backend]
[--allow_bucket_creation][--force_md5_etag][--anonymous][--from_file][--iam_operate_on_root_account]
[--allow_bucket_creation][--force_md5_etag][--anonymous][--from_file][--iam_operate_on_root_account][--default_connection]
```
#### Flags -
- `name` (Required)
Expand Down Expand Up @@ -135,6 +135,10 @@ noobaa-cli account add --name <account_name> --uid <uid> --gid <gid> [--user]
- Type: Boolean
- Description: Specifies if the account allowed to create root accounts using the IAM API (the default behavior is to create of IAM accounts). See - [IAM - Root Accounts Manager](./../design/iam.md#root-accounts-manager).

- `default_connection`
- Type: String
- Description: A default account for Kafka external servers. See bucket-notifications.md.

### Update Account

The `account update` command is used to update an existing account with customizable options.
Expand All @@ -143,7 +147,7 @@ The `account update` command is used to update an existing account with customiz
```sh
noobaa-cli account update --name <account_name> [--new_name][--uid][--gid][--user]
[--new_buckets_path][--access_key][--secret_key][--regenerate][--fs_backend]
[--allow_bucket_creation][--force_md5_etag][--anonymous][--iam_operate_on_root_account]
[--allow_bucket_creation][--force_md5_etag][--anonymous][--iam_operate_on_root_account][--default_connection]
```
#### Flags -
- `name` (Required)
Expand Down Expand Up @@ -207,6 +211,10 @@ noobaa-cli account update --name <account_name> [--new_name][--uid][--gid][--use
- Type: Boolean
- Description: Specifies if the account allowed to create root accounts using the IAM API (the default behavior is to create of IAM accounts). See - [IAM - Root Accounts Manager](./../design/iam.md#root-accounts-manager).

- `default_connection`
- Type: String
- Description: A default account for Kafka external servers. See bucket-notifications.md.

### Account Status

The `account status` command is used to print the status of the account.
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,8 @@ async function fetch_account_data(action, user_input) {
gid: user_input.user ? undefined : user_input.gid,
new_buckets_path: user_input.new_buckets_path,
fs_backend: user_input.fs_backend ? String(user_input.fs_backend) : config.NSFS_NC_STORAGE_BACKEND
}
},
default_connection: user_input.default_connection === undefined ? undefined : String(user_input.default_connection)
};
if (action === ACTIONS.UPDATE || action === ACTIONS.DELETE) {
// @ts-ignore
Expand Down
25 changes: 25 additions & 0 deletions src/endpoint/s3/ops/s3_put_bucket_notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,38 @@ const notif_util = require('../../../util/notifications_util');
async function put_bucket_notification(req) {

let topic_configuration = req.body.NotificationConfiguration?.TopicConfiguration;
const default_connection = req.object_sdk.requesting_account.default_connection;

//adapt to db shcema
if (topic_configuration) {
for (const conf of topic_configuration) {
conf.id = conf.Id;
conf.event = conf.Event;
conf.topic = conf.Topic;
//handle Kafka's topic synax, if present
if (conf.Topic && conf.Topic.length > 0 && conf.Topic[0].startsWith('kafka:::topic/')) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't it be also good to have a validation on the cli level. that you wouldn't be able to configure default_connection of this format?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand.
Account's default_connection is just a string. It doesn't have a "special" syntax (as opposed to conf.Topic's optional "kafka:::topic" syntax).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, if it doesn't have a "special" syntax, its fine

//kafka_topic_parts is, by index:
//kafka_topic_parts[0] = 'kafka:::topic'
//kafka_topic_parts[1] = connection, optional
//kafka_topic_parts[2] = Kafka topic, mandatory
const kafka_topic_parts = conf.Topic[0].split('/');
if (kafka_topic_parts.length !== 3) {
throw new S3Error({
code: 'InvalidArgument',
message: "kafka:::topic is invalid. Must be of syntax: kafka:::topic:/connection/topic",
http_code: 400,
detail: conf.Topic[0]
});
}
//connection is optionally kafka_topic_parts[1], default to account's default_connection
let connection = default_connection;
if (typeof kafka_topic_parts[1] === 'string' && kafka_topic_parts[1].length > 0) {
connection = kafka_topic_parts[1];
}
const topic = kafka_topic_parts[2];
//write the full Topic string with the connection
conf.topic = ['kafka:::topic/' + connection + "/" + topic];
}
delete conf.Id;
delete conf.Event;
delete conf.Topic;
Expand Down
5 changes: 3 additions & 2 deletions src/manage_nsfs/manage_nsfs_constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ const FROM_FILE = 'from_file';
const ANONYMOUS = 'anonymous';

const VALID_OPTIONS_ACCOUNT = {
'add': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', FROM_FILE, ...CLI_MUTUAL_OPTIONS]),
'update': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', 'new_name', 'regenerate', ...CLI_MUTUAL_OPTIONS]),
'add': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', 'default_connection', FROM_FILE, ...CLI_MUTUAL_OPTIONS]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this part of the account configuration. shouldn't it be per bucket?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently in Marc's project the user and connection creation would be done by one admin user (with manage_nsfs cli), while buckets and their notif conf would be done by other (not-admin) users (with s3 rest api). So the admin would be able to do a once-per-system users and connection creation (including user's default connection), and then the other users wouldn't need to know about connections at all.

'update': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', 'new_name', 'regenerate', 'default_connection', ...CLI_MUTUAL_OPTIONS]),
'delete': new Set(['name', ...CLI_MUTUAL_OPTIONS]),
'list': new Set(['wide', 'show_secrets', 'gid', 'uid', 'user', 'name', 'access_key', ...CLI_MUTUAL_OPTIONS]),
'status': new Set(['name', 'access_key', 'show_secrets', ...CLI_MUTUAL_OPTIONS]),
Expand Down Expand Up @@ -142,6 +142,7 @@ const OPTION_TYPE = {
ips: 'string',
force: 'boolean',
anonymous: 'boolean',
default_connection: 'string',
// health options
deployment_type: 'string',
all_account_details: 'boolean',
Expand Down
3 changes: 3 additions & 0 deletions src/server/system_services/schemas/nsfs_account_schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,8 @@ module.exports = {
}
}]
},
default_connection: {
type: 'string'
}
}
};
14 changes: 14 additions & 0 deletions src/test/unit_tests/jest_tests/test_nc_account_cli.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,20 @@ describe('manage nsfs cli account flow', () => {
expect(JSON.parse(res_list).response.reply.map(item => item.name))
.toEqual(expect.arrayContaining([name]));
});

it('cli account add - default connection', async function() {
const action = ACTIONS.ADD;
const { type, name, new_buckets_path, uid, gid } = defaults;
const default_connection = 'defcon';
const account_options = { config_root, name, new_buckets_path, uid, gid, default_connection };
await fs_utils.create_fresh_path(new_buckets_path);
await fs_utils.file_must_exist(new_buckets_path);
await set_path_permissions_and_owner(new_buckets_path, account_options, 0o700);
await exec_manage_cli(type, action, account_options);
const account = await config_fs.get_account_by_name(name, config_fs_account_options);
assert_account(account, account_options, false);
expect(account.default_connection).toBe(default_connection);
});
});

describe('cli update account', () => {
Expand Down
4 changes: 2 additions & 2 deletions src/test/unit_tests/test_notifications.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const http_connect_path = path.join(tmp_connect, http_connect_filename);
//content of connect file, will be written to a file in before()
const http_connect = {
agent_request_object: {"host": "localhost", "port": 9998, "timeout": 1500},
request_options_object: {"auth": "amit:passw", "timeout": 1500},
request_options_object: {"auth": "amit:passw", "timeout": 1500, "path": "/default"},
notification_protocol: 'http',
name: 'http_notif'
};
Expand Down Expand Up @@ -240,7 +240,7 @@ mocha.describe('notifications', function() {
});

const step_wait = 100;
async function notify_await_result({bucket_name, event_name, etag, key, timeout = undefined}) {
async function notify_await_result({bucket_name, event_name, etag, key, url = "/default", timeout = undefined}) {

//remember expected result here so server could compare it to actual result later
expected_bucket = bucket_name;
Expand Down
16 changes: 15 additions & 1 deletion src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,16 @@ class Notificator {
}
}

async parse_connect_file(connect_filename, decrypt = false) {
async parse_connect_file(connection_name, decrypt = false) {
let connect;
let connect_filename = connection_name;
let kafka_topic_from_connection_name;
if (connection_name.startsWith("kafka:::topic/")) {
const connection_parts = connection_name.split('/');
connect_filename = connection_parts[1];
kafka_topic_from_connection_name = connection_parts.length > 1 && connection_parts[2];
}

if (this.nc_config_fs) {
connect = await this.nc_config_fs.get_connection_by_name(connect_filename);
} else {
Expand All @@ -203,6 +211,11 @@ class Notificator {
connect.request_options_object.auth, connect.master_key_id);
}
load_files(connect);

//use the kafka topic, if it was present in connection_name
if (kafka_topic_from_connection_name) {
connect.topic = kafka_topic_from_connection_name;
}
return connect;
}
}
Expand Down Expand Up @@ -362,6 +375,7 @@ async function test_notifications(notifs, nc_config_dir, req) {
let notif_failure;
try {
connect = await notificator.parse_connect_file(notif.topic[0]);
dbg.log0(`effective connect for notif ${notif.id[0]} is`, connect);
connection = get_connection(connect);
await connection.connect();
await connection.promise_notify(compose_notification_test(req), async (notif_cb, err_cb, err) => {
Expand Down