diff --git a/tests/functional/object/mcg/test_bucket_policy.py b/tests/functional/object/mcg/test_bucket_policy.py index e35e0bde54a..af303ca8c78 100644 --- a/tests/functional/object/mcg/test_bucket_policy.py +++ b/tests/functional/object/mcg/test_bucket_policy.py @@ -850,6 +850,212 @@ def test_bucket_policy_verify_invalid_scenarios( f"{e.response} received invalid error code {response.error['Code']}" ) + @tier2 + @pytest.mark.polarion_id("OCS-5764") + @pytest.mark.bugzilla("2222487") + @pytest.mark.parametrize( + argnames="not_policy", + argvalues=[ + pytest.param( + "NotPrincipal", + ), + pytest.param( + "NotAction", + ), + pytest.param( + "NotResource", + ), + ], + ) + def test_bucket_policy_with_not_policy(self, mcg_obj, bucket_factory, not_policy): + """ + Test "NotPrincipal", "NotAction" and "NotResource" policy in Noobaa for bucket policy + + """ + + # create the bucket + obc = bucket_factory(interface="OC", amount=1)[0] + obc_obj = OBC(obc.name) + logger.info(f"Bucket created {obc_obj.obc_name}") + + # apply the appropriate policy based on not_policy param + if not_policy == "NotPrincipal": + + # create new noobaa account + account = "new-user-" + str(uuid.uuid4().hex) + user = NoobaaAccount( + mcg=mcg_obj, + name=account, + email=f"{account}@email.com", + ) + logger.info(f"Created new user: {user.account_name}") + + # upload to the bucket using the account used for creating obc + s3_put_object( + s3_obj=obc_obj, + bucketname=obc_obj.obc_name, + object_key="Random-1.txt", + data="Random data", + ) + logger.info( + f"Successfully uploaded object using user {obc_obj.obc_account}" + ) + + # apply the bucket policy that will let any user + # to upload object to the bucket + bucket_policy_gen = gen_bucket_policy( + user_list="*", + actions_list=["PutObject"], + resources_list=[f'{obc_obj.obc_name}/{"*"}'], + ) + bucket_policy = json.dumps(bucket_policy_gen) + put_bucket_policy(mcg_obj, obc_obj.obc_name, bucket_policy) + logger.info(f"Successfully applied bucket policy: {bucket_policy}") + + # test that upload object works with the new user + s3_put_object( + s3_obj=user, + bucketname=obc_obj.obc_name, + object_key="Random-2.txt", + data="some data", + ) + logger.info( + f"Successfully uploaded object using new user {user.account_name}" + ) + + # apply NotPrincipal bucket policy on the bucket + # which will deny any user except the original user + # who created the bucket + bucket_policy_gen = gen_bucket_policy( + user_list=obc_obj.obc_account, + actions_list=["PutObject"], + effect="Deny", + resources_list=[f'{obc_obj.obc_name}/{"*"}'], + ) + bucket_policy_gen["Statement"][0].pop("Principal") + bucket_policy_gen["Statement"][0]["NotPrincipal"] = { + "AWS": obc_obj.obc_account + } + logger.info(bucket_policy_gen) + bucket_policy = json.dumps(bucket_policy_gen) + put_bucket_policy(mcg_obj, obc_obj.obc_name, bucket_policy) + logger.info( + f"Successfully applied NotPrincipal bucket policy: {bucket_policy}" + ) + + # upload object using the new-user + # expected a failure because of AccessDenied error + try: + s3_put_object( + s3_obj=user, + bucketname=obc_obj.obc_name, + object_key="Random-3.txt", + data="Random data", + ) + except boto3exception.ClientError as e: + assert "AccessDenied" in e.args[0], f"Failed unexpctedly, {e.args[0]}" + logger.info(f"Failed as expected, {e.args[0]}") + else: + assert False, "Passed unexpectedly" + + elif not_policy == "NotAction": + + # upload object to the bucket + s3_put_object( + s3_obj=obc_obj, + bucketname=obc_obj.obc_name, + object_key="Random-1.txt", + data="Random data", + ) + logger.info("Successfully uploaded object to the bucket") + + # apply NotAction policy to the bucket + # make sure every action is denied except the + # GetObject + bucket_policy_gen = gen_bucket_policy( + user_list="*", + actions_list=["GetObject"], + effect="Deny", + resources_list=[f'{obc_obj.obc_name}/{"*"}'], + ) + not_action = bucket_policy_gen["Statement"][0].get("Action") + bucket_policy_gen["Statement"][0].pop("Action") + bucket_policy_gen["Statement"][0]["NotAction"] = not_action + bucket_policy = json.dumps(bucket_policy_gen) + put_bucket_policy(mcg_obj, obc_obj.obc_name, bucket_policy) + logger.info(f"Successfully applied bucket policy: {bucket_policy}") + + time.sleep(60) + response = s3_get_object( + s3_obj=obc_obj, bucketname=obc_obj.obc_name, object_key="Random-1.txt" + ) + logger.info(f"Get object response: {response}") + + # try PutObject and its expected to fail + try: + s3_put_object( + s3_obj=obc_obj, + bucketname=obc_obj.obc_name, + object_key="Random-2.txt", + data="Random data", + ) + except boto3exception.ClientError as e: + assert "AccessDenied" in e.args[0], f"Failed unexpctedly, {e.args[0]}" + logger.info(f"Failed as expected, {e.args[0]}") + else: + assert False, "Passed unexpectedly" + + elif not_policy == "NotResource": + prefix_1 = "first" + prefix_2 = "second" + + # upload object to prefix_1 + # make sure it works + s3_put_object( + s3_obj=obc_obj, + bucketname=f"{obc_obj.obc_name}", + object_key=f"{prefix_1}/Random-1.txt", + data="Random data", + ) + logger.info(f"Sucessfully uploaded object to prefix {prefix_1}") + + # apply NotResource policy where upload is restricted + # to every prefix except prefix_2 + bucket_policy_gen = gen_bucket_policy( + user_list="*", + actions_list=["PutObject"], + effect="Deny", + resources_list=[f"{obc_obj.obc_name}/{prefix_2}/*"], + ) + not_resource = bucket_policy_gen["Statement"][0].get("Resource") + bucket_policy_gen["Statement"][0].pop("Resource") + bucket_policy_gen["Statement"][0]["NotResource"] = not_resource + logger.info(bucket_policy_gen) + bucket_policy = json.dumps(bucket_policy_gen) + put_bucket_policy(mcg_obj, obc_obj.obc_name, bucket_policy) + logger.info(f"Successfully applied {bucket_policy}") + + s3_put_object( + s3_obj=obc_obj, + bucketname=f"{obc_obj.obc_name}", + object_key=f"{prefix_2}/Random-2.txt", + data="Random data", + ) + logger.info(f"Successfully uploaded object to prefix {prefix_2}") + + try: + s3_put_object( + s3_obj=obc_obj, + bucketname=obc_obj.obc_name, + object_key=f"{prefix_1}/Random-3.txt", + data="Random data", + ) + except boto3exception.ClientError as e: + logger.info(f"Failed as expected, {e.args[0]}") + assert "AccessDenied" in e.args[0], f"Failed unexpctedly, {e.args[0]}" + else: + assert False, "Passed unexpectedly" + @pytest.mark.polarion_id("OCS-2451") @pytest.mark.bugzilla("1893163") @skipif_ocs_version("<4.6")