Skip to content

Commit

Permalink
bucket notifications - validate notifications conf on change (gh issu…
Browse files Browse the repository at this point in the history
…e 8649)

Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz committed Jan 15, 2025
1 parent a5b95e7 commit c634b40
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 21 deletions.
13 changes: 3 additions & 10 deletions src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async function get_bucket_status(data) {
* @param {Object} data
* @returns { Promise<{ code: typeof ManageCLIResponse.BucketUpdated, detail: Object }>}
*/
async function update_bucket(data, user_input) {
async function update_bucket(data) {
const cur_name = data.name;
const new_name = data.new_name;
const name_update = is_name_update(data);
Expand All @@ -210,13 +210,6 @@ async function update_bucket(data, user_input) {

let parsed_bucket_data;

if (user_input.notifications) {
//notifications are tested before they can be updated
const test_notif_err = await notifications_util.test_notifications(data, config_fs.connect_dir_path);
if (test_notif_err) {
throw_cli_error(ManageCLIError.InvalidArgument, "Failed to update notifications", test_notif_err);
}
}
if (name_update) {
parsed_bucket_data = await config_fs.create_bucket_config_file({ ...data, name: new_name });
await config_fs.delete_bucket_config_file(cur_name);
Expand Down Expand Up @@ -276,15 +269,15 @@ async function delete_bucket(data, force) {
*/
async function bucket_management(action, user_input) {
const data = action === ACTIONS.LIST ? undefined : await fetch_bucket_data(action, user_input);
await manage_nsfs_validations.validate_bucket_args(config_fs, data, action);
await manage_nsfs_validations.validate_bucket_args(config_fs, data, action, user_input);

let response = {};
if (action === ACTIONS.ADD) {
response = await add_bucket(data);
} else if (action === ACTIONS.STATUS) {
response = await get_bucket_status(data);
} else if (action === ACTIONS.UPDATE) {
response = await update_bucket(data, user_input);
response = await update_bucket(data);
} else if (action === ACTIONS.DELETE) {
const force = get_boolean_or_string_value(user_input.force);
response = await delete_bucket(data, force);
Expand Down
22 changes: 22 additions & 0 deletions src/endpoint/s3/ops/s3_put_bucket_notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
'use strict';

const S3Error = require('../s3_errors').S3Error;
const notif_util = require('../../../util/notifications_util');
const config = require('../../../../config')
const {ConfigFS} = require('../../../sdk/config_fs')

/**
* http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
Expand All @@ -22,6 +25,25 @@ async function put_bucket_notification(req) {
delete conf.Topic;
}

console.log("req.object_sdk.nsfs_config_root =", req.object_sdk.nsfs_config_root);

//test new notifications.
//if test notification fails, fail the put op
let config_fs = null;
if (req.object_sdk.nsfs_config_root) {
config_fs = new ConfigFS(req.object_sdk.nsfs_config_root);
}
const err = await notif_util.test_notifications(topic_configuration,
config_fs?.connections_dir_path);
if (err) {
throw new S3Error({
code: 'InvalidArgument',
message: JSON.stringify(err.message),
http_code: 400,
detail: err.toString()
});
}

const reply = await req.object_sdk.put_bucket_notification({
bucket_name: req.params.bucket,
notifications: topic_configuration
Expand Down
13 changes: 12 additions & 1 deletion src/manage_nsfs/manage_nsfs_validations.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { TYPES, ACTIONS, VALID_OPTIONS, OPTION_TYPE, FROM_FILE, BOOLEAN_STRING_VA
GLACIER_ACTIONS, LIST_UNSETABLE_OPTIONS, ANONYMOUS, DIAGNOSE_ACTIONS, UPGRADE_ACTIONS } = require('../manage_nsfs/manage_nsfs_constants');
const iam_utils = require('../endpoint/iam/iam_utils');
const { check_root_account_owns_user } = require('../nc/nc_utils');
const notifications_util = require('../util/notifications_util');

/////////////////////////////
//// GENERAL VALIDATIONS ////
Expand Down Expand Up @@ -384,8 +385,9 @@ async function check_new_access_key_exists(config_fs, action, data) {
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @param {object} data
* @param {string} action
* @param {object} user_input
*/
async function validate_bucket_args(config_fs, data, action) {
async function validate_bucket_args(config_fs, data, action, user_input) {
if (action === ACTIONS.ADD || action === ACTIONS.UPDATE) {
if (action === ACTIONS.ADD) native_fs_utils.validate_bucket_creation({ name: data.name });
if ((action === ACTIONS.UPDATE) && (data.new_name !== undefined)) native_fs_utils.validate_bucket_creation({ name: data.new_name });
Expand Down Expand Up @@ -439,6 +441,15 @@ async function validate_bucket_args(config_fs, data, action) {
}
}
}

//if there's a change to the bucket's notifications, we need to test them
//if one of the specified notifications fail, we need to fail the user's request
if (user_input.notifications) {
const test_notif_err = await notifications_util.test_notifications(user_input.notifications, config_fs.connections_dir_path);
if (test_notif_err) {
throw_cli_error(ManageCLIError.InvalidArgument, "Failed to update notifications", test_notif_err);
}
}
}

/////////////////////////////
Expand Down
36 changes: 26 additions & 10 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class HttpNotificator {
return;
}
dbg.error("Notify err =", err);
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(JSON.stringify(notif), err).then(resolve);
});
req.on('timeout', () => {
dbg.error("Notify timeout");
Expand Down Expand Up @@ -249,7 +249,7 @@ class KafkaNotificator {
Date.now(),
(err, offset) => {
if (err) {
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(JSON.stringify(notif), err).then(resolve);
} else {
resolve();
}
Expand Down Expand Up @@ -300,19 +300,35 @@ function get_connection(connect) {
}


async function test_notifications(bucket, connect_files_dir) {
async function test_notifications(notifs, connect_files_dir) {
const notificator = new Notificator({connect_files_dir});
for (const notif of bucket.notifications) {
const connect = await notificator.parse_connect_file(notif.connect);
dbg.log1("testing notif", notif);
for (const notif of notifs) {
let connect;
let connection;
let failure = false;
let notif_failure;
try {
const connection = get_connection(connect);
connect = await notificator.parse_connect_file(notif.topic[0]);
connection = get_connection(connect);
await connection.connect();
await connection.promise_notify({notif: "test notification"}, async err => err);
connection.destroy();
await connection.promise_notify({notif: "test notification"}, async (notif_cb, err_cb) => {
failure = true;
notif_failure = err_cb;
});
if (failure) {
if (notif_failure) {
throw notif_failure;
}
//no error was thrown during notify, throw generic error
throw new Error();
}
} catch (err) {
dbg.error("Connection failed for", connect);
dbg.error("Connection failed for", notif, ", connect =", connect, ", err = ", err);
return err;
} finally {
if (connection) {
connection.destroy();
}
}
}
}
Expand Down

0 comments on commit c634b40

Please sign in to comment.