Skip to content

Commit

Permalink
Merge pull request #8344 from romayalon/romy-warp-concurrency-directo…
Browse files Browse the repository at this point in the history
…ries-issues

NC | Warp concurrent directories creation/deletion
  • Loading branch information
romayalon committed Sep 15, 2024
2 parents 4ece070 + aa1d857 commit 0e8c024
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 10 deletions.
1 change: 1 addition & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ config.NSFS_BUF_POOL_WARNING_TIMEOUT = 2 * 60 * 1000;
config.NSFS_SEM_WARNING_TIMEOUT = 10 * 60 * 1000;
// number of rename retries in case of deleted destination directory
config.NSFS_RENAME_RETRIES = 10;
config.NSFS_MKDIR_PATH_RETRIES = 3;

config.NSFS_VERSIONING_ENABLED = true;
config.NSFS_UPDATE_ISSUES_REPORT_ENABLED = true;
Expand Down
6 changes: 3 additions & 3 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ class NamespaceFS {
const same_inode = params.copy_source && copy_res === copy_status_enum.SAME_INODE;
const is_dir_content = this._is_directory_content(file_path, params.key);

let stat = await target_file.stat(fs_context);
const stat = await target_file.stat(fs_context);
this._verify_encryption(params.encryption, this._get_encryption_info(stat));

// handle xattr
Expand Down Expand Up @@ -1335,15 +1335,14 @@ class NamespaceFS {

if (!same_inode && !part_upload) {
await this._move_to_dest(fs_context, upload_path, file_path, target_file, open_mode, params.key);
if (config.NSFS_TRIGGER_FSYNC) await nb_native().fs.fsync(fs_context, path.dirname(file_path));
}

// when object is a dir, xattr are set on the folder itself and the content is in .folder file
if (is_dir_content) {
if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr);
await this._assign_dir_content_to_xattr(fs_context, fs_xattr, { ...params, size: stat.size }, copy_xattr);
}
stat = await nb_native().fs.stat(fs_context, file_path);
stat.xattr = { ...stat.xattr, ...fs_xattr };
const upload_info = this._get_upload_info(stat, fs_xattr && fs_xattr[XATTR_VERSION_ID]);
return upload_info;
}
Expand Down Expand Up @@ -1396,6 +1395,7 @@ class NamespaceFS {
} else {
await this._move_to_dest_version(fs_context, source_path, dest_path, target_file, key, open_mode);
}
if (config.NSFS_TRIGGER_FSYNC) await nb_native().fs.fsync(fs_context, path.dirname(dest_path));
break;
} catch (err) {
retries -= 1;
Expand Down
72 changes: 72 additions & 0 deletions src/test/unit_tests/jest_tests/test_nsfs_concurrency.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/* Copyright (C) 2016 NooBaa */
'use strict';

const path = require('path');
const P = require('../../../util/promise');
const fs_utils = require('../../../util/fs_utils');
const NamespaceFS = require('../../../sdk/namespace_fs');
const buffer_utils = require('../../../util/buffer_utils');
const { TMP_PATH } = require('../../system_tests/test_utils');
const { crypto_random_string } = require('../../../util/string_utils');
const endpoint_stats_collector = require('../../../sdk/endpoint_stats_collector');

function make_dummy_object_sdk(nsfs_config, uid, gid) {
return {
requesting_account: {
nsfs_account_config: nsfs_config && {
uid: uid || process.getuid(),
gid: gid || process.getgid(),
backend: '',
}
},
abort_controller: new AbortController(),
throw_if_aborted() {
if (this.abort_controller.signal.aborted) throw new Error('request aborted signal');
}
};
}

const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
describe('test nsfs concurrency', () => {
const tmp_fs_path = path.join(TMP_PATH, 'test_nsfs_concurrency');

const nsfs = new NamespaceFS({
bucket_path: tmp_fs_path,
bucket_id: '1',
namespace_resource_id: undefined,
access_mode: undefined,
versioning: 'DISABLED',
force_md5_etag: false,
stats: endpoint_stats_collector.instance(),
});

beforeEach(async () => {
await fs_utils.create_fresh_path(tmp_fs_path);
});

afterEach(async () => {
await fs_utils.folder_delete(tmp_fs_path);
});

it('multiple puts of the same nested key', async () => {
const bucket = 'bucket1';
const key = 'dir1/key1';
const res_etags = [];
for (let i = 0; i < 15; i++) {
const random_data = Buffer.from(String(crypto_random_string(7)));
const body = buffer_utils.buffer_to_read_stream(random_data);
nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
.catch(err => {
console.log('put the same key error - ', err);
throw err;
}).then(res => {
console.log('upload res', res);
res_etags.push(res.etag);
});
await nsfs.delete_object({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK).catch(err => console.log('delete the same key error - ', err));

}
await P.delay(5000);
expect(res_etags).toHaveLength(15);
}, 6000);
});
28 changes: 21 additions & 7 deletions src/util/native_fs_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,29 @@ async function _generate_unique_path(fs_context, tmp_dir_path) {
// opens open_path on POSIX, and on GPFS it will open open_path parent folder
async function open_file(fs_context, bucket_path, open_path, open_mode = config.NSFS_OPEN_READ_MODE,
file_permissions = config.BASE_MODE_FILE) {
let retries = config.NSFS_MKDIR_PATH_RETRIES;

const dir_path = path.dirname(open_path);
if ((open_mode === 'wt' || open_mode === 'w') && dir_path !== bucket_path) {
dbg.log1(`NamespaceFS._open_file: mode=${open_mode} creating dirs`, open_path, bucket_path);
await _make_path_dirs(open_path, fs_context);
}
dbg.log1(`NamespaceFS._open_file: mode=${open_mode}`, open_path);
// for 'wt' open the tmpfile with the parent dir path
const actual_open_path = open_mode === 'wt' ? dir_path : open_path;
return nb_native().fs.open(fs_context, actual_open_path, open_mode, get_umasked_mode(file_permissions));
const should_create_path_dirs = (open_mode === 'wt' || open_mode === 'w') && dir_path !== bucket_path;
for (;;) {
try {
if (should_create_path_dirs) {
dbg.log1(`NamespaceFS._open_file: mode=${open_mode} creating dirs`, open_path, bucket_path);
await _make_path_dirs(open_path, fs_context);
}
dbg.log1(`NamespaceFS._open_file: mode=${open_mode}`, open_path);
// for 'wt' open the tmpfile with the parent dir path
const fd = await nb_native().fs.open(fs_context, actual_open_path, open_mode, get_umasked_mode(file_permissions));
return fd;
} catch (err) {
dbg.warn(`native_fs_utils.open_file Retrying retries=${retries} mode=${open_mode} open_path=${open_path} dir_path=${dir_path} actual_open_path=${actual_open_path}`, err);
if (err.code !== 'ENOENT') throw err;
// case of concurrennt deletion of the dir_path
if (retries <= 0 || !should_create_path_dirs) throw err;
retries -= 1;
}
}
}

/**
Expand Down

0 comments on commit 0e8c024

Please sign in to comment.