diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index 6dbabd7d20..11dd19a41e 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -483,7 +483,7 @@ class NamespaceFS { } /** - * @param {nb.ObjectSDK} object_sdk + * @param {nb.ObjectSDK} object_sdk * @returns {nb.NativeFSContext} */ prepare_fs_context(object_sdk) { @@ -1090,7 +1090,9 @@ class NamespaceFS { // end the stream res.end(); - await stream_utils.wait_finished(res, { signal: object_sdk.abort_controller.signal }); + // in case of transform streams such as ChunkFS there is also a readable part. since we expect write stream + // and don't care about the readable part, set readable: false + await stream_utils.wait_finished(res, { readable: false, signal: object_sdk.abort_controller.signal }); object_sdk.throw_if_aborted(); dbg.log0('NamespaceFS: read_object_stream completed file', file_path, { @@ -1209,9 +1211,7 @@ class NamespaceFS { } if (copy_res) { - if (copy_res === copy_status_enum.FALLBACK) { - params.copy_source.nsfs_copy_fallback(); - } else { + if (copy_res !== copy_status_enum.FALLBACK) { // open file after copy link/same inode should use read open mode open_mode = config.NSFS_OPEN_READ_MODE; if (copy_res === copy_status_enum.SAME_INODE) open_path = file_path; @@ -1294,10 +1294,8 @@ class NamespaceFS { const stat = await target_file.stat(fs_context); this._verify_encryption(params.encryption, this._get_encryption_info(stat)); - // handle xattr - // assign user xattr on non copy / copy with xattr_copy header provided const copy_xattr = params.copy_source && params.xattr_copy; - let fs_xattr = copy_xattr ? undefined : to_fs_xattr(params.xattr); + let fs_xattr = to_fs_xattr(params.xattr); // assign noobaa internal xattr - content type, md5, versioning xattr if (params.content_type) { @@ -1339,7 +1337,6 @@ class NamespaceFS { // when object is a dir, xattr are set on the folder itself and the content is in .folder file if (is_dir_content) { - if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr); await this._assign_dir_content_to_xattr(fs_context, fs_xattr, { ...params, size: stat.size }, copy_xattr); } stat.xattr = { ...stat.xattr, ...fs_xattr }; @@ -1351,12 +1348,11 @@ class NamespaceFS { await native_fs_utils._make_path_dirs(file_path, fs_context); const copy_xattr = params.copy_source && params.xattr_copy; - let fs_xattr = copy_xattr ? {} : to_fs_xattr(params.xattr) || {}; + let fs_xattr = to_fs_xattr(params.xattr) || {}; if (params.content_type) { fs_xattr = fs_xattr || {}; fs_xattr[XATTR_CONTENT_TYPE] = params.content_type; } - if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr); await this._assign_dir_content_to_xattr(fs_context, fs_xattr, params, copy_xattr); // when .folder exist and it's no upload flow - .folder should be deleted if it exists @@ -1372,13 +1368,6 @@ class NamespaceFS { return upload_info; } - async _get_copy_source_xattr(params, fs_context, fs_xattr) { - const is_source_dir = params.copy_source.key.endsWith('/'); - const source_file_md_path = await this._find_version_path(fs_context, params.copy_source, is_source_dir); - const source_stat = await nb_native().fs.stat(fs_context, source_file_md_path); - return { ...source_stat.xattr, ...fs_xattr }; - } - // move to dest GPFS (wt) / POSIX (w / undefined) - non part upload async _move_to_dest(fs_context, source_path, dest_path, target_file, open_mode, key) { let retries = config.NSFS_RENAME_RETRIES; @@ -1511,7 +1500,7 @@ class NamespaceFS { // Can be finetuned further on if needed and inserting the Semaphore logic inside // Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream) async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) { - const { source_stream } = params; + const { source_stream, copy_source } = params; try { // Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy const md5_enabled = this._is_force_md5_enabled(object_sdk); @@ -1526,8 +1515,14 @@ class NamespaceFS { large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size }); chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1)); - await stream_utils.pipeline([source_stream, chunk_fs]); - await stream_utils.wait_finished(chunk_fs); + if (copy_source) { + await this.read_object_stream(copy_source, object_sdk, chunk_fs); + } else if (params.source_params) { + await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs); + } else { + await stream_utils.pipeline([source_stream, chunk_fs]); + await stream_utils.wait_finished(chunk_fs); + } return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes }; } catch (error) { dbg.error('_upload_stream had error: ', error); @@ -1813,6 +1808,7 @@ class NamespaceFS { upload_params.params.xattr = create_params_parsed.xattr; upload_params.params.storage_class = create_params_parsed.storage_class; upload_params.digest = MD5Async && (((await MD5Async.digest()).toString('hex')) + '-' + multiparts.length); + upload_params.params.content_type = create_params_parsed.content_type; const upload_info = await this._finish_upload(upload_params); diff --git a/src/sdk/object_sdk.js b/src/sdk/object_sdk.js index 594c72fd33..03189af0e2 100644 --- a/src/sdk/object_sdk.js +++ b/src/sdk/object_sdk.js @@ -106,8 +106,8 @@ class ObjectSDK { * in order to handle aborting requests gracefully. The `abort_controller` member will * be used to signal async flows that abort was detected. * @see {@link https://nodejs.org/docs/latest/api/globals.html#class-abortcontroller} - * @param {import('http').IncomingMessage} req - * @param {import('http').ServerResponse} res + * @param {import('http').IncomingMessage} req + * @param {import('http').ServerResponse} res */ setup_abort_controller(req, res) { res.once('error', err => { @@ -158,7 +158,7 @@ class ObjectSDK { } /** - * @param {string} name + * @param {string} name * @returns {Promise} */ async _get_bucket_namespace(name) { @@ -268,7 +268,7 @@ class ObjectSDK { return Boolean(fs_root_path || fs_root_path === ''); } - // validates requests for non nsfs buckets from accounts which are nsfs_only + // validates requests for non nsfs buckets from accounts which are nsfs_only has_non_nsfs_bucket_access(account, ns) { dbg.log1('validate_non_nsfs_bucket: ', account, ns?.write_resource?.resource); if (!account) return false; @@ -524,7 +524,7 @@ class ObjectSDK { /** * Calls the op and report time and error to stats collector. * on_success can be added to update read/write stats (but on_success shouln't throw) - * + * * @template T * @param {{ * op_name: string; @@ -642,7 +642,9 @@ class ObjectSDK { params.content_type = source_md.content_type; } try { - if (params.xattr) params.xattr = _.omitBy(params.xattr, (val, name) => name.startsWith('noobaa-namespace')); + //omitBy iterates all xattr calling startsWith on them. this can include symbols such as XATTR_SORT_SYMBOL. + //in that case startsWith will not apply + if (params.xattr) params.xattr = _.omitBy(params.xattr, (val, name) => name.startsWith?.('noobaa-namespace')); } catch (e) { dbg.log3("Got an error while trying to omitBy param.xattr:", params.xattr, "error:", e); } @@ -658,12 +660,6 @@ class ObjectSDK { params.copy_source.bucket = actual_source_ns.get_bucket(bucket); params.copy_source.obj_id = source_md.obj_id; params.copy_source.version_id = source_md.version_id; - if (source_ns instanceof NamespaceFS) { - params.copy_source.nsfs_copy_fallback = () => { - this._populate_nsfs_copy_fallback({ source_params, source_ns, params }); - params.copy_source = null; - }; - } } else { // source cannot be copied directly (different plaforms, accounts, etc.) // set the source_stream to read from the copy source @@ -671,6 +667,7 @@ class ObjectSDK { source_params.object_md = source_md; source_params.obj_id = source_md.obj_id; source_params.version_id = source_md.version_id; + source_params.bucket = actual_source_ns.get_bucket(bucket); // param size is needed when doing an upload. Can be overrided during ranged writes params.size = source_md.size; @@ -684,7 +681,13 @@ class ObjectSDK { // if the source namespace is NSFS then we need to pass the read_object_stream the read_stream if (source_ns instanceof NamespaceFS) { - this._populate_nsfs_copy_fallback({ source_params, source_ns, params }); + if (target_ns instanceof NamespaceFS) { + params.source_ns = actual_source_ns; + params.source_params = source_params; + } else { + //this._populate_nsfs_copy_fallback({ source_params, source_ns, params }); + throw new Error('TODO fix _populate_nsfs_copy_fallback'); + } } else { params.source_stream = await source_ns.read_object_stream(source_params, this); } @@ -701,9 +704,9 @@ class ObjectSDK { } } - // nsfs copy_object & server side copy consisted of link and a fallback to + // nsfs copy_object & server side copy consisted of link and a fallback to // read stream and then upload stream - // nsfs copy object when can't server side copy - fallback directly + // nsfs copy object when can't server side copy - fallback directly _populate_nsfs_copy_fallback({ source_ns, params, source_params }) { const read_stream = new stream.PassThrough(); source_ns.read_object_stream(source_params, this, read_stream) diff --git a/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt b/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt index 4c24acc10d..6191e5aad6 100644 --- a/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt +++ b/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt @@ -153,11 +153,7 @@ s3tests_boto3/functional/test_s3.py::test_put_object_ifmatch_failed s3tests_boto3/functional/test_s3.py::test_put_object_ifnonmatch_failed s3tests_boto3/functional/test_s3.py::test_put_object_ifnonmatch_overwrite_existed_failed s3tests_boto3/functional/test_s3.py::test_object_raw_authenticated_bucket_gone -s3tests_boto3/functional/test_s3.py::test_object_copy_to_itself_with_metadata s3tests_boto3/functional/test_s3.py::test_object_copy_canned_acl -s3tests_boto3/functional/test_s3.py::test_object_copy_retaining_metadata -s3tests_boto3/functional/test_s3.py::test_object_copy_replacing_metadata -s3tests_boto3/functional/test_s3.py::test_object_copy_versioning_multipart_upload s3tests_boto3/functional/test_s3.py::test_multipart_upload_missing_part s3tests_boto3/functional/test_s3.py::test_multipart_upload_incorrect_etag s3tests_boto3/functional/test_s3.py::test_atomic_dual_conditional_write_1mb diff --git a/src/test/unit_tests/test_namespace_fs.js b/src/test/unit_tests/test_namespace_fs.js index ecabff5009..d334586f6b 100644 --- a/src/test/unit_tests/test_namespace_fs.js +++ b/src/test/unit_tests/test_namespace_fs.js @@ -1582,12 +1582,14 @@ mocha.describe('namespace_fs copy object', function() { assert.deepStrictEqual(xattr, { ...add_user_prefix(read_md_res.xattr), [XATTR_DIR_CONTENT]: `${read_md_res.size}` }); assert.equal(stream_content_type, read_md_res.content_type); + const copy_source = { bucket: upload_bkt, key: key1 }; await ns_tmp.upload_object({ bucket: upload_bkt, key: key2, - copy_source: { bucket: upload_bkt, key: key1 }, + copy_source: copy_source, size: 100, - xattr_copy: true + xattr_copy: true, + xattr: await _get_source_copy_xattr(copy_source, ns_tmp, dummy_object_sdk) }, dummy_object_sdk); const file_path2 = ns_tmp_bucket_path + '/' + key2; xattr = await get_xattr(file_path2); @@ -1622,12 +1624,14 @@ mocha.describe('namespace_fs copy object', function() { assert.deepStrictEqual(xattr, { ...add_user_prefix(read_md_res.xattr) }); assert.equal(stream_content_type, read_md_res.content_type); + const copy_source = { bucket: upload_bkt, key: src_key }; await ns_tmp.upload_object({ bucket: upload_bkt, key: dst_key, - copy_source: { bucket: upload_bkt, key: src_key }, + copy_source: copy_source, size: 100, xattr_copy: true, + xattr: await _get_source_copy_xattr(copy_source, ns_tmp, dummy_object_sdk) }, dummy_object_sdk); const file_path2 = ns_tmp_bucket_path + '/' + dst_key; xattr = await get_xattr(file_path2); @@ -1663,12 +1667,14 @@ mocha.describe('namespace_fs copy object', function() { assert.deepStrictEqual(xattr, { ...add_user_prefix(read_md_res.xattr) }); assert.equal(stream_content_type, read_md_res.content_type); + const copy_source = { bucket: upload_bkt, key: src_key }; await ns_tmp.upload_object({ bucket: upload_bkt, key: dst_key, - copy_source: { bucket: upload_bkt, key: src_key }, + copy_source: copy_source, size: 0, - xattr_copy: true + xattr_copy: true, + xattr: await _get_source_copy_xattr(copy_source, ns_tmp, dummy_object_sdk) }, dummy_object_sdk); const file_path2 = ns_tmp_bucket_path + '/' + dst_key; xattr = await get_xattr(file_path2); @@ -1694,6 +1700,16 @@ mocha.describe('namespace_fs copy object', function() { }); +//simulates object_sdk.fix_copy_source_params filtering of source xattr for copy object tests +async function _get_source_copy_xattr(copy_source, source_ns, object_sdk) { + const read_md_res = await source_ns.read_object_md({ + bucket: copy_source.bucket, + key: copy_source.key + }, object_sdk); + const res = _.omitBy(read_md_res.xattr, (val, name) => name.startsWith?.('noobaa-namespace')); + return res; +} + async function list_objects(ns, bucket, delimiter, prefix, dummy_object_sdk) { const res = await ns.list_objects({ bucket: bucket,