Skip to content

Commit

Permalink
Abstract DatabaseAdmin, admin standard utility conversion/methods + t…
Browse files Browse the repository at this point in the history
…ests thereof (#268)

* abstract DatabaseAdmin class

* eq, copy, set_caller etc - standard methods to all admin classes
  • Loading branch information
hemidactylus authored Mar 27, 2024
1 parent 510088e commit a569a38
Show file tree
Hide file tree
Showing 4 changed files with 525 additions and 5 deletions.
240 changes: 237 additions & 3 deletions astrapy/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import re
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from dataclasses import dataclass

Expand Down Expand Up @@ -63,7 +64,11 @@ def __init__(self) -> None:
TEST = "test"


database_id_finder = re.compile(
database_id_matcher = re.compile(
"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)

api_endpoint_parser = re.compile(
"https://"
"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
"-"
Expand Down Expand Up @@ -116,7 +121,7 @@ def parse_api_endpoint(api_endpoint: str) -> Optional[ParsedAPIEndpoint]:
The parsed ParsedAPIEndpoint. If parsing fails, return None.
"""

match = database_id_finder.match(api_endpoint)
match = api_endpoint_parser.match(api_endpoint)
if match and match.groups():
d_id, d_re, d_en_x = match.groups()
return ParsedAPIEndpoint(
Expand Down Expand Up @@ -327,6 +332,103 @@ def __repr__(self) -> str:
env_desc = f', environment="{self.environment}"'
return f'{self.__class__.__name__}("{self.token[:12]}..."{env_desc})'

def __eq__(self, other: Any) -> bool:
if isinstance(other, AstraDBAdmin):
return all(
[
self.token == other.token,
self.environment == other.environment,
self.dev_ops_url == other.dev_ops_url,
self.dev_ops_url == other.dev_ops_url,
self._caller_name == other._caller_name,
self._caller_version == other._caller_version,
self._dev_ops_url == other._dev_ops_url,
self._dev_ops_api_version == other._dev_ops_api_version,
self._astra_db_ops == other._astra_db_ops,
]
)
else:
return False

def _copy(
self,
*,
token: Optional[str] = None,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
dev_ops_url: Optional[str] = None,
dev_ops_api_version: Optional[str] = None,
) -> AstraDBAdmin:
return AstraDBAdmin(
token=token or self.token,
environment=environment or self.environment,
caller_name=caller_name or self._caller_name,
caller_version=caller_version or self._caller_version,
dev_ops_url=dev_ops_url or self._dev_ops_url,
dev_ops_api_version=dev_ops_api_version or self._dev_ops_api_version,
)

def with_options(
self,
*,
token: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> AstraDBAdmin:
"""
Create a clone of this AstraDBAdmin with some changed attributes.
Args:
token: an Access Token to the database. Example: `"AstraCS:xyz..."`.
caller_name: name of the application, or framework, on behalf of which
the Data API and DevOps API calls are performed. This ends up in
the request user-agent.
caller_version: version of the caller.
Returns:
a new AstraDBAdmin instance.
Example:
>>> another_astra_db_admin = my_astra_db_admin.with_options(
... caller_name="caller_identity",
... caller_version="1.2.0",
... )
"""

return self._copy(
token=token,
caller_name=caller_name,
caller_version=caller_version,
)

def set_caller(
self,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> None:
"""
Set a new identity for the application/framework on behalf of which
the DevOps API calls will be performed (the "caller").
New objects spawned from this client afterwards will inherit the new settings.
Args:
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
Example:
>>> my_astra_db_admin.set_caller(
... caller_name="the_caller",
... caller_version="0.1.0",
... )
"""

self._caller_name = caller_name
self._caller_version = caller_version
self._astra_db_ops.set_caller(caller_name, caller_version)

@ops_recast_method_sync
def list_databases(
self,
Expand Down Expand Up @@ -717,7 +819,45 @@ def get_async_database(
).to_async()


class AstraDBDatabaseAdmin:
class DatabaseAdmin(ABC):
"""
An abstract class defining the interface for a database admin object.
This supports generic namespace crud, as well as spawning databases,
without committing to a specific database architecture (e.g. Astra DB).
"""

@abstractmethod
def list_namespaces(self, *pargs: Any, **kwargs: Any) -> List[str]:
"""Get a list of namespaces for the database."""
...

@abstractmethod
def create_namespace(self, name: str, *pargs: Any, **kwargs: Any) -> Dict[str, Any]:
"""
Create a namespace in the database, returning {'ok': 1} if successful.
"""
...

@abstractmethod
def drop_namespace(self, name: str, *pargs: Any, **kwargs: Any) -> Dict[str, Any]:
"""
Drop (delete) a namespace from the database,
returning {'ok': 1} if successful.
"""
...

@abstractmethod
def get_database(self, *pargs: Any, **kwargs: Any) -> Database:
"""Get a Database object from this database admin."""
...

@abstractmethod
def get_async_database(self, *pargs: Any, **kwargs: Any) -> AsyncDatabase:
"""Get an AsyncDatabase object from this database admin."""
...


class AstraDBDatabaseAdmin(DatabaseAdmin):
"""
An "admin" object, able to perform administrative tasks at the namespaces level
(i.e. within a certani database), such as creating/listing/dropping namespaces.
Expand Down Expand Up @@ -790,6 +930,100 @@ def __repr__(self) -> str:
f'"{self.token[:12]}..."{env_desc})'
)

def __eq__(self, other: Any) -> bool:
if isinstance(other, AstraDBDatabaseAdmin):
return all(
[
self.id == other.id,
self.token == other.token,
self.environment == other.environment,
self._astra_db_admin == other._astra_db_admin,
]
)
else:
return False

def _copy(
self,
id: Optional[str] = None,
token: Optional[str] = None,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
dev_ops_url: Optional[str] = None,
dev_ops_api_version: Optional[str] = None,
) -> AstraDBDatabaseAdmin:
return AstraDBDatabaseAdmin(
id=id or self.id,
token=token or self.token,
environment=environment or self.environment,
caller_name=caller_name or self._astra_db_admin._caller_name,
caller_version=caller_version or self._astra_db_admin._caller_version,
dev_ops_url=dev_ops_url or self._astra_db_admin._dev_ops_url,
dev_ops_api_version=dev_ops_api_version
or self._astra_db_admin._dev_ops_api_version,
)

def with_options(
self,
*,
id: Optional[str] = None,
token: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> AstraDBDatabaseAdmin:
"""
Create a clone of this AstraDBDatabaseAdmin with some changed attributes.
Args:
id: e. g. "01234567-89ab-cdef-0123-456789abcdef".
token: an Access Token to the database. Example: `"AstraCS:xyz..."`.
caller_name: name of the application, or framework, on behalf of which
the Data API and DevOps API calls are performed. This ends up in
the request user-agent.
caller_version: version of the caller.
Returns:
a new AstraDBDatabaseAdmin instance.
Example:
>>> admin_for_my_other_db = admin_for_my_db.with_options(
... id="abababab-0101-2323-4545-6789abcdef01",
... )
"""

return self._copy(
id=id,
token=token,
caller_name=caller_name,
caller_version=caller_version,
)

def set_caller(
self,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> None:
"""
Set a new identity for the application/framework on behalf of which
the DevOps API calls will be performed (the "caller").
New objects spawned from this client afterwards will inherit the new settings.
Args:
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
Example:
>>> admin_for_my_db.set_caller(
... caller_name="the_caller",
... caller_version="0.1.0",
... )
"""

self._astra_db_admin.set_caller(caller_name, caller_version)

@staticmethod
def from_astra_db_admin(
id: str, *, astra_db_admin: AstraDBAdmin
Expand Down
98 changes: 98 additions & 0 deletions astrapy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

from __future__ import annotations

import re
from typing import Any, Dict, Optional, TYPE_CHECKING

from astrapy.admin import (
Environment,
api_endpoint_parser,
build_api_endpoint,
database_id_matcher,
fetch_raw_database_info_from_id_token,
parse_api_endpoint,
)
Expand Down Expand Up @@ -85,6 +88,101 @@ def __repr__(self) -> str:
env_desc = f', environment="{self.environment}"'
return f'{self.__class__.__name__}("{self.token[:12]}..."{env_desc})'

def __eq__(self, other: Any) -> bool:
if isinstance(other, DataAPIClient):
return all(
[
self.token == other.token,
self.environment == other.environment,
self._caller_name == other._caller_name,
self._caller_version == other._caller_version,
]
)
else:
return False

def __getitem__(self, database_id_or_api_endpoint: str) -> Database:
if re.match(database_id_matcher, database_id_or_api_endpoint):
return self.get_database(database_id_or_api_endpoint)
elif re.match(api_endpoint_parser, database_id_or_api_endpoint):
return self.get_database_by_api_endpoint(database_id_or_api_endpoint)
else:
raise ValueError(
"The provided input does not look like either a database ID "
f"or an API endpoint ('{database_id_or_api_endpoint}')."
)

def _copy(
self,
*,
token: Optional[str] = None,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> DataAPIClient:
return DataAPIClient(
token=token or self.token,
environment=environment or self.environment,
caller_name=caller_name or self._caller_name,
caller_version=caller_version or self._caller_version,
)

def with_options(
self,
*,
token: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> DataAPIClient:
"""
Create a clone of this DataAPIClient with some changed attributes.
Args:
token: an Access Token to the database. Example: `"AstraCS:xyz..."`.
caller_name: name of the application, or framework, on behalf of which
the Data API and DevOps API calls are performed. This ends up in
the request user-agent.
caller_version: version of the caller.
Returns:
a new DataAPIClient instance.
Example:
>>> another_client = my_client.with_options(
... caller_name="caller_identity",
... caller_version="1.2.0",
... )
"""

return self._copy(
token=token,
caller_name=caller_name,
caller_version=caller_version,
)

def set_caller(
self,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> None:
"""
Set a new identity for the application/framework on behalf of which
the API calls will be performed (the "caller").
New objects spawned from this client afterwards will inherit the new settings.
Args:
caller_name: name of the application, or framework, on behalf of which
the API API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
Example:
>>> my_client.set_caller(caller_name="the_caller", caller_version="0.1.0")
"""

self._caller_name = caller_name
self._caller_version = caller_version

def get_database(
self,
id: str,
Expand Down
Loading

0 comments on commit a569a38

Please sign in to comment.