Skip to content

Commit

Permalink
tool: BlobDownloader
Browse files Browse the repository at this point in the history
  • Loading branch information
LiliDeng committed Mar 15, 2024
1 parent d9e34d6 commit 3f4d57e
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 22 deletions.
56 changes: 34 additions & 22 deletions lisa/sut_orchestrator/azure/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,36 +1232,44 @@ def generate_sas_token(


def get_blob_service_client(
credential: Any,
subscription_id: str,
cloud: Cloud,
account_name: str,
resource_group_name: str,
credential: Optional[Any] = None,
subscription_id: Optional[str] = None,
cloud: Optional[Cloud] = None,
account_name: Optional[str] = None,
resource_group_name: Optional[str] = None,
connection_string: Optional[str] = None,
) -> BlobServiceClient:
"""
Create a Azure Storage container if it does not exist.
"""
shared_key_credential = get_storage_credential(
credential=credential,
subscription_id=subscription_id,
cloud=cloud,
account_name=account_name,
resource_group_name=resource_group_name,
)
blob_service_client = BlobServiceClient(
f"https://{account_name}.blob.{cloud.suffixes.storage_endpoint}",
shared_key_credential,
)
if connection_string:
blob_service_client = BlobServiceClient.from_connection_string(
connection_string
)
else:
shared_key_credential = get_storage_credential(
credential=credential,
subscription_id=subscription_id,
cloud=cloud,
account_name=account_name,
resource_group_name=resource_group_name,
)
blob_service_client = BlobServiceClient(
f"https://{account_name}.blob.{cloud.suffixes.storage_endpoint}",
shared_key_credential,
)
return blob_service_client


def get_or_create_storage_container(
credential: Any,
subscription_id: str,
cloud: Cloud,
account_name: str,
container_name: str,
resource_group_name: str,
credential: Optional[Any] = None,
subscription_id: Optional[str] = None,
cloud: Optional[Cloud] = None,
account_name: Optional[str] = None,
resource_group_name: Optional[str] = None,
connection_string: Optional[str] = None,
create_if_not_exist: bool = True,
) -> ContainerClient:
"""
Create a Azure Storage container if it does not exist.
Expand All @@ -1272,10 +1280,14 @@ def get_or_create_storage_container(
cloud,
account_name,
resource_group_name,
connection_string,
)
container_client = blob_service_client.get_container_client(container_name)
if not container_client.exists():
container_client.create_container()
if create_if_not_exist:
container_client.create_container()
else:
raise LisaException(f"container {container_name} does not exist.")
return container_client


Expand Down
89 changes: 89 additions & 0 deletions lisa/sut_orchestrator/azure/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Dict, List, Optional, Tuple, Type

from assertpy import assert_that
from msrestazure.azure_cloud import Cloud # type: ignore

from lisa.base_tools import Cat, Wget
from lisa.executable import Tool
Expand Down Expand Up @@ -579,3 +580,91 @@ def get_pool_records(self, pool_id: int, force_run: bool = False) -> Dict[str, s
records[content_split[i]] = content_split[i + 1]

return records


class BlobDownloader(Tool):
@property
def command(self) -> str:
return "echo blob_downloader"

@property
def can_install(self) -> bool:
return False

def _check_exists(self) -> bool:
return True

def download_blob(
self,
account_name: str,
container_name: str,
blob_name: str,
credential: Optional[Any] = None,
subscription_id: Optional[str] = None,
cloud: Optional[Cloud] = None,
resource_group_name: Optional[str] = None,
connection_string: Optional[str] = None,
) -> PurePath:
from lisa import constants
from lisa.sut_orchestrator.azure.common import get_or_create_storage_container

working_path = constants.RUN_LOCAL_WORKING_PATH
working_path.mkdir(parents=True, exist_ok=True)
file_path = working_path / blob_name
container_client = get_or_create_storage_container(
container_name,
credential,
subscription_id,
cloud,
account_name,
resource_group_name,
connection_string,
False,
)
blob_client = container_client.get_blob_client(blob_name)
if not blob_client.exists():
raise LisaException(
f"Blob {blob_name} not found in container {container_name}"
)
try:
blob_data = blob_client.download_blob()
with open(file_path, "wb") as f:
f.write(blob_data.readall())
self._log.debug("Blob downloaded successfully.")
except Exception as e:
raise LisaException("An error occurred during blob download.") from e
return file_path

def list_blobs(
self,
account_name: str,
container_name: str,
credential: Optional[Any] = None,
subscription_id: Optional[str] = None,
cloud: Optional[Cloud] = None,
resource_group_name: Optional[str] = None,
connection_string: Optional[str] = None,
blob_name_pattern: str = "",
) -> List[str]:
blob_names_list: List[str] = []
filtered_blobs: List[str] = []
from lisa.sut_orchestrator.azure.common import get_or_create_storage_container

container_client = get_or_create_storage_container(
container_name,
credential,
subscription_id,
cloud,
account_name,
resource_group_name,
connection_string,
False,
)
blob_list = container_client.list_blobs()
for blob in blob_list:
blob_names_list.append(blob.name) # type: ignore
if blob_name_pattern:
pattern = re.compile(blob_name_pattern, re.M)
filtered_blobs = [blob for blob in blob_names_list if pattern.match(blob)]
return filtered_blobs
return blob_names_list

0 comments on commit 3f4d57e

Please sign in to comment.