Standardize AWS credential names (apache#922)
* first try

* glue + s3

* add dynamo

* change from aws. to client.

* add missing label

* change to PropertyUtil.get_first_property_value

* add unit tests for glue and dynamodb

* add unit tests for fileio

* add doc

* revise doc

* resolve review comments

* adopt kevinjqliu's suggestions
HonahX committed Jul 19, 2024
1 parent 4282d2f commit d0bfb4a
Showing 13 changed files with 487 additions and 79 deletions.
194 changes: 139 additions & 55 deletions mkdocs/docs/configuration.md

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions pyiceberg/catalog/__init__.py
@@ -66,6 +66,7 @@
RecursiveDict,
)
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.deprecated import deprecated

if TYPE_CHECKING:
import pyarrow as pa
@@ -100,6 +101,21 @@
re.X,
)

DEPRECATED_PROFILE_NAME = "profile_name"
DEPRECATED_REGION = "region_name"
DEPRECATED_BOTOCORE_SESSION = "botocore_session"
DEPRECATED_ACCESS_KEY_ID = "aws_access_key_id"
DEPRECATED_SECRET_ACCESS_KEY = "aws_secret_access_key"
DEPRECATED_SESSION_TOKEN = "aws_session_token"
DEPRECATED_PROPERTY_NAMES = {
DEPRECATED_PROFILE_NAME,
DEPRECATED_REGION,
DEPRECATED_BOTOCORE_SESSION,
DEPRECATED_ACCESS_KEY_ID,
DEPRECATED_SECRET_ACCESS_KEY,
DEPRECATED_SESSION_TOKEN,
}


class CatalogType(Enum):
REST = "rest"
@@ -692,6 +708,17 @@ def __repr__(self) -> str:


class MetastoreCatalog(Catalog, ABC):
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)

for property_name in DEPRECATED_PROPERTY_NAMES:
if self.properties.get(property_name):
deprecated(
deprecated_in="0.7.0",
removed_in="0.8.0",
help_message=f"The property {property_name} is deprecated. Please use properties that start with client., glue., and dynamodb. instead",
)(lambda: None)()

def create_table_transaction(
self,
identifier: Union[str, Identifier],
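To illustrate the new check in `MetastoreCatalog.__init__`: constructing any AWS-backed catalog with one of the legacy bare property names should now surface a deprecation warning while still honoring the value. A minimal sketch, assuming `boto3.Session` is mocked (so no real AWS profile named "my-profile" is needed) and that the `deprecated` helper emits a standard `DeprecationWarning`:

```python
import warnings
from unittest import mock

from pyiceberg.catalog.glue import GlueCatalog

# Mock boto3 so the catalog can be built without a real AWS profile.
with mock.patch("boto3.Session"):
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        GlueCatalog("demo", **{"profile_name": "my-profile"})  # legacy, deprecated key

# The legacy key still works, but a deprecation warning is raised on construction.
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```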
35 changes: 28 additions & 7 deletions pyiceberg/catalog/dynamodb.py
@@ -29,6 +29,12 @@
import boto3

from pyiceberg.catalog import (
DEPRECATED_ACCESS_KEY_ID,
DEPRECATED_BOTOCORE_SESSION,
DEPRECATED_PROFILE_NAME,
DEPRECATED_REGION,
DEPRECATED_SECRET_ACCESS_KEY,
DEPRECATED_SESSION_TOKEN,
ICEBERG,
METADATA_LOCATION,
PREVIOUS_METADATA_LOCATION,
@@ -47,7 +53,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
@@ -78,17 +84,32 @@
ACTIVE = "ACTIVE"
ITEM = "Item"

DYNAMODB_PROFILE_NAME = "dynamodb.profile-name"
DYNAMODB_REGION = "dynamodb.region"
DYNAMODB_ACCESS_KEY_ID = "dynamodb.access-key-id"
DYNAMODB_SECRET_ACCESS_KEY = "dynamodb.secret-access-key"
DYNAMODB_SESSION_TOKEN = "dynamodb.session-token"


class DynamoDbCatalog(MetastoreCatalog):
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)

from pyiceberg.table import PropertyUtil

session = boto3.Session(
profile_name=properties.get("profile_name"),
region_name=properties.get("region_name"),
botocore_session=properties.get("botocore_session"),
aws_access_key_id=properties.get("aws_access_key_id"),
aws_secret_access_key=properties.get("aws_secret_access_key"),
aws_session_token=properties.get("aws_session_token"),
profile_name=PropertyUtil.get_first_property_value(properties, DYNAMODB_PROFILE_NAME, DEPRECATED_PROFILE_NAME),
region_name=PropertyUtil.get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION, DEPRECATED_REGION),
botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
aws_access_key_id=PropertyUtil.get_first_property_value(
properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, DEPRECATED_ACCESS_KEY_ID
),
aws_secret_access_key=PropertyUtil.get_first_property_value(
properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, DEPRECATED_SECRET_ACCESS_KEY
),
aws_session_token=PropertyUtil.get_first_property_value(
properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN, DEPRECATED_SESSION_TOKEN
),
)
self.dynamodb = session.client(DYNAMODB_CLIENT)
self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
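The resolution order for the DynamoDB client is therefore `dynamodb.*`, then the unified `client.*` keys, then the deprecated bare names. A small sketch of that precedence using `PropertyUtil.get_first_property_value` (key names are the ones from the diff above, values are placeholders; assumes `boto3` is installed since importing the dynamodb catalog module requires it):

```python
from pyiceberg.catalog.dynamodb import DYNAMODB_REGION
from pyiceberg.io import AWS_REGION
from pyiceberg.table import PropertyUtil

properties = {
    "dynamodb.region": "eu-west-1",  # service-specific key
    "client.region": "us-east-1",    # unified key
    "region_name": "ap-south-1",     # deprecated bare name
}

# The service-specific key wins over both the unified and the deprecated names.
region = PropertyUtil.get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION, "region_name")
assert region == "eu-west-1"
```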
33 changes: 27 additions & 6 deletions pyiceberg/catalog/glue.py
@@ -39,6 +39,12 @@
)

from pyiceberg.catalog import (
DEPRECATED_ACCESS_KEY_ID,
DEPRECATED_BOTOCORE_SESSION,
DEPRECATED_PROFILE_NAME,
DEPRECATED_REGION,
DEPRECATED_SECRET_ACCESS_KEY,
DEPRECATED_SESSION_TOKEN,
EXTERNAL_TABLE,
ICEBERG,
LOCATION,
@@ -58,6 +64,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
@@ -117,6 +124,12 @@
ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
ICEBERG_FIELD_CURRENT = "iceberg.field.current"

GLUE_PROFILE_NAME = "glue.profile-name"
GLUE_REGION = "glue.region"
GLUE_ACCESS_KEY_ID = "glue.access-key-id"
GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key"
GLUE_SESSION_TOKEN = "glue.session-token"


def _construct_parameters(
metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
@@ -285,13 +298,21 @@ class GlueCatalog(MetastoreCatalog):
def __init__(self, name: str, **properties: Any):
super().__init__(name, **properties)

from pyiceberg.table import PropertyUtil

session = boto3.Session(
profile_name=properties.get("profile_name"),
region_name=properties.get("region_name"),
botocore_session=properties.get("botocore_session"),
aws_access_key_id=properties.get("aws_access_key_id"),
aws_secret_access_key=properties.get("aws_secret_access_key"),
aws_session_token=properties.get("aws_session_token"),
profile_name=PropertyUtil.get_first_property_value(properties, GLUE_PROFILE_NAME, DEPRECATED_PROFILE_NAME),
region_name=PropertyUtil.get_first_property_value(properties, GLUE_REGION, AWS_REGION, DEPRECATED_REGION),
botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
aws_access_key_id=PropertyUtil.get_first_property_value(
properties, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, DEPRECATED_ACCESS_KEY_ID
),
aws_secret_access_key=PropertyUtil.get_first_property_value(
properties, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, DEPRECATED_SECRET_ACCESS_KEY
),
aws_session_token=PropertyUtil.get_first_property_value(
properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN, DEPRECATED_SESSION_TOKEN
),
)
self.glue: GlueClient = session.client("glue", endpoint_url=properties.get(GLUE_CATALOG_ENDPOINT))

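Mirroring the new unit tests, the Glue path can be exercised with a mocked `boto3.Session` to check which credentials end up on the session. A sketch with placeholder values, assuming no other AWS-related properties are set:

```python
from unittest import mock

from pyiceberg.catalog.glue import GlueCatalog

properties = {
    "glue.region": "us-west-2",       # overrides client.region for the Glue client
    "client.region": "us-east-1",
    "client.access-key-id": "key-id",
    "client.secret-access-key": "secret",
}

with mock.patch("boto3.Session") as mock_session:
    GlueCatalog("glue", **properties)

# The glue.* key takes precedence for the region; the unified client.* keys
# supply the credentials because no glue.* overrides were given for them.
mock_session.assert_called_with(
    profile_name=None,
    region_name="us-west-2",
    botocore_session=None,
    aws_access_key_id="key-id",
    aws_secret_access_key="secret",
    aws_session_token=None,
)
```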
4 changes: 4 additions & 0 deletions pyiceberg/io/__init__.py
@@ -46,6 +46,10 @@

logger = logging.getLogger(__name__)

AWS_REGION = "client.region"
AWS_ACCESS_KEY_ID = "client.access-key-id"
AWS_SECRET_ACCESS_KEY = "client.secret-access-key"
AWS_SESSION_TOKEN = "client.session-token"
S3_ENDPOINT = "s3.endpoint"
S3_ACCESS_KEY_ID = "s3.access-key-id"
S3_SECRET_ACCESS_KEY = "s3.secret-access-key"
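With the unified `client.*` keys defined in `pyiceberg.io`, a single credential block can drive both the catalog client and the S3 FileIO, while service-specific keys remain available as overrides. A hypothetical configuration sketch (placeholder values; assumes `boto3` is installed and relies on catalog construction not calling AWS, so the snippet builds a catalog object without network access):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",
    **{
        "type": "glue",
        # Unified credentials, shared by the Glue client and the S3 FileIO:
        "client.region": "us-east-1",
        "client.access-key-id": "<access-key-id>",
        "client.secret-access-key": "<secret-access-key>",
        # Service-specific keys still take precedence when both are present:
        "glue.region": "us-east-1",
    },
)
```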
14 changes: 10 additions & 4 deletions pyiceberg/io/fsspec.py
@@ -46,6 +46,10 @@
ADLFS_CONNECTION_STRING,
ADLFS_SAS_TOKEN,
ADLFS_TENANT_ID,
AWS_ACCESS_KEY_ID,
AWS_REGION,
AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN,
GCS_ACCESS,
GCS_CACHE_TIMEOUT,
GCS_CONSISTENCY,
@@ -114,12 +118,14 @@ def _file(_: Properties) -> LocalFileSystem:
def _s3(properties: Properties) -> AbstractFileSystem:
from s3fs import S3FileSystem

from pyiceberg.table import PropertyUtil

client_kwargs = {
"endpoint_url": properties.get(S3_ENDPOINT),
"aws_access_key_id": properties.get(S3_ACCESS_KEY_ID),
"aws_secret_access_key": properties.get(S3_SECRET_ACCESS_KEY),
"aws_session_token": properties.get(S3_SESSION_TOKEN),
"region_name": properties.get(S3_REGION),
"aws_access_key_id": PropertyUtil.get_first_property_value(properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
"aws_secret_access_key": PropertyUtil.get_first_property_value(properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
"aws_session_token": PropertyUtil.get_first_property_value(properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
"region_name": PropertyUtil.get_first_property_value(properties, S3_REGION, AWS_REGION),
}
config_kwargs = {}
register_events: Dict[str, Callable[[Properties], None]] = {}
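For the fsspec-based S3 filesystem the `s3.*` keys keep their old meaning; the unified `client.*` keys only apply when the corresponding `s3.*` key is absent. A small sketch of that fallback, using the same constants imported above:

```python
from pyiceberg.io import AWS_REGION, S3_REGION
from pyiceberg.table import PropertyUtil

# No s3.region is configured, so the unified client.region is used instead.
properties = {"client.region": "us-east-1"}
assert PropertyUtil.get_first_property_value(properties, S3_REGION, AWS_REGION) == "us-east-1"

# An explicit s3.region still wins over client.region.
properties = {"s3.region": "eu-central-1", "client.region": "us-east-1"}
assert PropertyUtil.get_first_property_value(properties, S3_REGION, AWS_REGION) == "eu-central-1"
```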
14 changes: 10 additions & 4 deletions pyiceberg/io/pyarrow.py
@@ -87,6 +87,10 @@
)
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
from pyiceberg.io import (
AWS_ACCESS_KEY_ID,
AWS_REGION,
AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN,
GCS_DEFAULT_LOCATION,
GCS_ENDPOINT,
GCS_TOKEN,
@@ -345,12 +349,14 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
if scheme in {"s3", "s3a", "s3n"}:
from pyarrow.fs import S3FileSystem

from pyiceberg.table import PropertyUtil

client_kwargs: Dict[str, Any] = {
"endpoint_override": self.properties.get(S3_ENDPOINT),
"access_key": self.properties.get(S3_ACCESS_KEY_ID),
"secret_key": self.properties.get(S3_SECRET_ACCESS_KEY),
"session_token": self.properties.get(S3_SESSION_TOKEN),
"region": self.properties.get(S3_REGION),
"access_key": PropertyUtil.get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
"secret_key": PropertyUtil.get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
"session_token": PropertyUtil.get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
"region": PropertyUtil.get_first_property_value(self.properties, S3_REGION, AWS_REGION),
}

if proxy_uri := self.properties.get(S3_PROXY_URI):
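The PyArrow path can be checked the same way the new FileIO unit tests do: patch `pyarrow.fs.S3FileSystem` and inspect the keyword arguments it receives. A sketch with placeholder credentials; since the exact set of keyword arguments may grow across versions, only the relevant ones are inspected:

```python
from unittest import mock

from pyiceberg.io.pyarrow import PyArrowFileIO

properties = {
    "client.region": "us-east-1",
    "client.access-key-id": "admin",
    "client.secret-access-key": "password",
}

# Patch the pyarrow S3 filesystem so nothing touches AWS.
with mock.patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
    PyArrowFileIO(properties=properties).new_input("s3://warehouse/data/file.parquet")

# The unified client.* values are forwarded because no s3.* overrides are present.
kwargs = mock_s3fs.call_args.kwargs
assert kwargs["region"] == "us-east-1"
assert kwargs["access_key"] == "admin"
assert kwargs["secret_key"] == "password"
```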
7 changes: 7 additions & 0 deletions pyiceberg/table/__init__.py
@@ -253,6 +253,13 @@ def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool:
return value.lower() == "true"
return default

@staticmethod
def get_first_property_value(properties: Properties, *property_names: str) -> Optional[Any]:
for property_name in property_names:
if property_value := properties.get(property_name):
return property_value
return None


class Transaction:
_table: Table
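One subtlety of the new helper worth noting: because it checks truthiness rather than key presence, an explicitly empty value falls through to the next candidate key. A minimal illustration:

```python
from pyiceberg.table import PropertyUtil

properties = {"s3.region": "", "client.region": "us-west-2"}

# "" is falsy, so the helper keeps looking and returns the client.region value.
assert PropertyUtil.get_first_property_value(properties, "s3.region", "client.region") == "us-west-2"

# When no candidate key has a truthy value, the helper returns None.
assert PropertyUtil.get_first_property_value(properties, "glue.region") is None
```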
62 changes: 61 additions & 1 deletion tests/catalog/test_dynamodb.py
@@ -42,7 +42,13 @@
TableAlreadyExistsError,
)
from pyiceberg.schema import Schema
from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX
from pyiceberg.typedef import Properties
from tests.conftest import (
BUCKET_NAME,
DEPRECATED_AWS_SESSION_PROPERTIES,
TABLE_METADATA_LOCATION_REGEX,
UNIFIED_AWS_SESSION_PROPERTIES,
)


@mock_aws
@@ -579,6 +585,60 @@ def test_passing_provided_profile() -> None:
assert test_catalog.dynamodb is mock_session().client()


@mock_aws
def test_passing_dynamodb_session_properties() -> None:
session_properties: Properties = {
"dynamodb.access-key-id": "dynamodb.access-key-id",
"dynamodb.secret-access-key": "dynamodb.secret-access-key",
"dynamodb.profile-name": "dynamodb.profile-name",
"dynamodb.region": "dynamodb.region",
"dynamodb.session-token": "dynamodb.session-token",
**UNIFIED_AWS_SESSION_PROPERTIES,
**DEPRECATED_AWS_SESSION_PROPERTIES,
}

with mock.patch("boto3.Session") as mock_session:
mock_client = mock.Mock()
mock_session.return_value.client.return_value = mock_client
mock_client.describe_table.return_value = {"Table": {"TableStatus": "ACTIVE"}}
test_catalog = DynamoDbCatalog("dynamodb", **session_properties)

mock_session.assert_called_with(
aws_access_key_id="dynamodb.access-key-id",
aws_secret_access_key="dynamodb.secret-access-key",
aws_session_token="dynamodb.session-token",
region_name="dynamodb.region",
profile_name="dynamodb.profile-name",
botocore_session=None,
)
assert test_catalog.dynamodb is mock_session().client()


@mock_aws
def test_passing_unified_session_properties_to_dynamodb() -> None:
session_properties: Properties = {
"dynamodb.profile-name": "dynamodb.profile-name",
**UNIFIED_AWS_SESSION_PROPERTIES,
**DEPRECATED_AWS_SESSION_PROPERTIES,
}

with mock.patch("boto3.Session") as mock_session:
mock_client = mock.Mock()
mock_session.return_value.client.return_value = mock_client
mock_client.describe_table.return_value = {"Table": {"TableStatus": "ACTIVE"}}
test_catalog = DynamoDbCatalog("dynamodb", **session_properties)

mock_session.assert_called_with(
aws_access_key_id="client.access-key-id",
aws_secret_access_key="client.secret-access-key",
aws_session_token="client.session-token",
region_name="client.region",
profile_name="dynamodb.profile-name",
botocore_session=None,
)
assert test_catalog.dynamodb is mock_session().client()


@mock_aws
def test_table_exists(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str