Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move to new cos connection scheme #253

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 24 additions & 91 deletions component-library/util/util-cos.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"metadata": {},
"outputs": [],
"source": [
"!pip install aiobotocore botocore s3fs"
"!pip install aiobotocore botocore s3fs claimed-c3"
]
},
{
Expand All @@ -71,7 +71,8 @@
"import re\n",
"import s3fs\n",
"import sys\n",
"import glob"
"import glob\n",
"from c3.operator_utils import explode_connection_string"
]
},
{
Expand All @@ -90,26 +91,14 @@
},
"outputs": [],
"source": [
"# access key id\n",
"access_key_id = os.environ.get('access_key_id')\n",
"\n",
"# secret access key\n",
"secret_access_key = os.environ.get('secret_access_key')\n",
"# cos_connection in format: [cos|s3]://access_key_id:secret_access_key@endpoint/bucket/path\n",
"cos_connection = os.environ.get('cos_connection')\n",
"(access_key_id, secret_access_key, endpoint, cos_path) = explode_connection_string(cos_connection)\n",
"\n",
"# cos/s3 endpoint\n",
"endpoint = os.environ.get('endpoint')\n",
"\n",
"# cos bucket name\n",
"bucket_name = os.environ.get('bucket_name')\n",
"\n",
"# path\n",
"path = os.environ.get('path','')\n",
"\n",
"# source in case of uploads\n",
"source = os.environ.get('source', '')\n",
"\n",
"# target in case of downloads\n",
"target = os.environ.get('target', '')\n",
"# local_path for uploads, downloads, sync\n",
"local_path = os.environ.get('local_path')\n",
"\n",
"# recursive\n",
"recursive = bool(os.environ.get('recursive','False'))\n",
Expand All @@ -121,62 +110,6 @@
"log_level = os.environ.get('log_level', 'INFO')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "22c51c11-7c47-4877-92a3-88e071506012",
"metadata": {
"papermill": {
"duration": 0.01535,
"end_time": "2022-10-26T08:27:04.474938",
"exception": false,
"start_time": "2022-10-26T08:27:04.459588",
"status": "completed"
},
"tags": []
},
"outputs": [],
"source": [
"root = logging.getLogger()\n",
"root.setLevel(log_level)\n",
"\n",
"handler = logging.StreamHandler(sys.stdout)\n",
"handler.setLevel(log_level)\n",
"formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')\n",
"handler.setFormatter(formatter)\n",
"root.addHandler(handler)\n",
"\n",
"\n",
"parameters = list(\n",
" map(lambda s: re.sub('$', '\"', s),\n",
" map(\n",
" lambda s: s.replace('=', '=\"'),\n",
" filter(\n",
" lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\\/A-Za-z0-9]*', s)),\n",
" sys.argv\n",
" )\n",
" )))\n",
"\n",
"logging.info('Logging parameters: ' + ''.join(parameters))\n",
"for parameter in parameters:\n",
" logging.info('Parameter: ' + parameter)\n",
" exec(parameter)\n",
"\n",
"recursive = bool(recursive)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "efd9f14b",
"metadata": {},
"outputs": [],
"source": [
"def print_list(l):\n",
" for file in l:\n",
" print(file)"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -201,30 +134,30 @@
")\n",
"\n",
"if operation == 'mkdir':\n",
" s3.mkdir(bucket_name+path)\n",
" s3.mkdir(cos_path)\n",
"elif operation == 'ls':\n",
" print_list(s3.ls(bucket_name+path))\n",
" print(s3.ls(cos_path))\n",
"elif operation == 'find':\n",
" print_list(s3.find(bucket_name+path))\n",
" print(s3.find(cos_path))\n",
"elif operation == 'put':\n",
" print(s3.put(source,bucket_name+path, recursive=recursive))\n",
" print(s3.put(local_path,cos_path, recursive=recursive))\n",
"elif operation == 'sync_to_cos':\n",
" for file in glob.glob(source, recursive=recursive):\n",
" for file in glob.glob(local_path, recursive=recursive):\n",
" logging.info(f'processing {file}')\n",
" if s3.exists(bucket_name+file):\n",
" if s3.exists(cos_path+file):\n",
" logging.info(f'exists {file}')\n",
" logging.debug(f's3.info {s3.info(bucket_name+file)}')\n",
" if s3.info(bucket_name+file)['size'] != os.path.getsize(file):\n",
" logging.debug(f's3.info {s3.info(cos_path+file)}')\n",
" if s3.info(cos_path+file)['size'] != os.path.getsize(file):\n",
" logging.info(f'uploading {file}')\n",
" s3.put(file, bucket_name+file)\n",
" s3.put(file, cos_path+file)\n",
" else:\n",
" logging.info(f'skipping {file}')\n",
" else:\n",
" logging.info(f'uploading {file}')\n",
" s3.put(file, bucket_name+file)\n",
" s3.put(file, cos_path+file)\n",
"elif operation == 'sync_to_local':\n",
" for full_path in s3.glob(bucket_name+path):\n",
" local_full_path = target+full_path\n",
" for full_path in s3.glob(cos_path):\n",
" local_full_path = local_path+full_path\n",
" logging.info(f'processing {full_path}')\n",
" if s3.info(full_path)['type'] == 'directory':\n",
" logging.debug(f'skipping directory {full_path}')\n",
Expand All @@ -241,11 +174,11 @@
" logging.info(f'downloading {full_path} to {local_full_path}')\n",
" s3.get(full_path, local_full_path)\n",
"elif operation == 'get':\n",
" s3.get(bucket_name+path, target, recursive=recursive)\n",
" s3.get(cos_path, local_path, recursive=recursive)\n",
"elif operation == 'rm':\n",
" s3.rm(bucket_name+path, recursive=recursive)\n",
" s3.rm(cos_path, recursive=recursive)\n",
"elif operation == 'glob':\n",
" print_list(s3.glob(bucket_name+path))\n",
" print(s3.glob(cos_path))\n",
"else:\n",
" logging.error(f'operation unkonwn {operation}')"
]
Expand All @@ -267,7 +200,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
"version": "3.11.7"
},
"papermill": {
"default_parameters": {},
Expand Down
Loading