Skip to content

Commit

Permalink
Merge pull request #253 from romeokienzler/main
Browse files Browse the repository at this point in the history
move to new cos connection scheme
  • Loading branch information
romeokienzler committed Mar 28, 2024
2 parents 8a9d88e + 467ec44 commit dcaaf32
Showing 1 changed file with 24 additions and 91 deletions.
115 changes: 24 additions & 91 deletions component-library/util/util-cos.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"metadata": {},
"outputs": [],
"source": [
"!pip install aiobotocore botocore s3fs"
"!pip install aiobotocore botocore s3fs claimed-c3"
]
},
{
Expand All @@ -71,7 +71,8 @@
"import re\n",
"import s3fs\n",
"import sys\n",
"import glob"
"import glob\n",
"from c3.operator_utils import explode_connection_string"
]
},
{
Expand All @@ -90,26 +91,14 @@
},
"outputs": [],
"source": [
"# access key id\n",
"access_key_id = os.environ.get('access_key_id')\n",
"\n",
"# secret access key\n",
"secret_access_key = os.environ.get('secret_access_key')\n",
"# cos_connection in format: [cos|s3]://access_key_id:secret_access_key@endpoint/bucket/path\n",
"cos_connection = os.environ.get('cos_connection')\n",
"(access_key_id, secret_access_key, endpoint, cos_path) = explode_connection_string(cos_connection)\n",
"\n",
"# cos/s3 endpoint\n",
"endpoint = os.environ.get('endpoint')\n",
"\n",
"# cos bucket name\n",
"bucket_name = os.environ.get('bucket_name')\n",
"\n",
"# path\n",
"path = os.environ.get('path','')\n",
"\n",
"# source in case of uploads\n",
"source = os.environ.get('source', '')\n",
"\n",
"# target in case of downloads\n",
"target = os.environ.get('target', '')\n",
"# local_path for uploads, downloads, sync\n",
"local_path = os.environ.get('local_path')\n",
"\n",
"# recursive\n",
"recursive = bool(os.environ.get('recursive','False'))\n",
Expand All @@ -121,62 +110,6 @@
"log_level = os.environ.get('log_level', 'INFO')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "22c51c11-7c47-4877-92a3-88e071506012",
"metadata": {
"papermill": {
"duration": 0.01535,
"end_time": "2022-10-26T08:27:04.474938",
"exception": false,
"start_time": "2022-10-26T08:27:04.459588",
"status": "completed"
},
"tags": []
},
"outputs": [],
"source": [
"root = logging.getLogger()\n",
"root.setLevel(log_level)\n",
"\n",
"handler = logging.StreamHandler(sys.stdout)\n",
"handler.setLevel(log_level)\n",
"formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')\n",
"handler.setFormatter(formatter)\n",
"root.addHandler(handler)\n",
"\n",
"\n",
"parameters = list(\n",
" map(lambda s: re.sub('$', '\"', s),\n",
" map(\n",
" lambda s: s.replace('=', '=\"'),\n",
" filter(\n",
" lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\\/A-Za-z0-9]*', s)),\n",
" sys.argv\n",
" )\n",
" )))\n",
"\n",
"logging.info('Logging parameters: ' + ''.join(parameters))\n",
"for parameter in parameters:\n",
" logging.info('Parameter: ' + parameter)\n",
" exec(parameter)\n",
"\n",
"recursive = bool(recursive)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "efd9f14b",
"metadata": {},
"outputs": [],
"source": [
"def print_list(l):\n",
" for file in l:\n",
" print(file)"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -201,30 +134,30 @@
")\n",
"\n",
"if operation == 'mkdir':\n",
" s3.mkdir(bucket_name+path)\n",
" s3.mkdir(cos_path)\n",
"elif operation == 'ls':\n",
" print_list(s3.ls(bucket_name+path))\n",
" print(s3.ls(cos_path))\n",
"elif operation == 'find':\n",
" print_list(s3.find(bucket_name+path))\n",
" print(s3.find(cos_path))\n",
"elif operation == 'put':\n",
" print(s3.put(source,bucket_name+path, recursive=recursive))\n",
" print(s3.put(local_path,cos_path, recursive=recursive))\n",
"elif operation == 'sync_to_cos':\n",
" for file in glob.glob(source, recursive=recursive):\n",
" for file in glob.glob(local_path, recursive=recursive):\n",
" logging.info(f'processing {file}')\n",
" if s3.exists(bucket_name+file):\n",
" if s3.exists(cos_path+file):\n",
" logging.info(f'exists {file}')\n",
" logging.debug(f's3.info {s3.info(bucket_name+file)}')\n",
" if s3.info(bucket_name+file)['size'] != os.path.getsize(file):\n",
" logging.debug(f's3.info {s3.info(cos_path+file)}')\n",
" if s3.info(cos_path+file)['size'] != os.path.getsize(file):\n",
" logging.info(f'uploading {file}')\n",
" s3.put(file, bucket_name+file)\n",
" s3.put(file, cos_path+file)\n",
" else:\n",
" logging.info(f'skipping {file}')\n",
" else:\n",
" logging.info(f'uploading {file}')\n",
" s3.put(file, bucket_name+file)\n",
" s3.put(file, cos_path+file)\n",
"elif operation == 'sync_to_local':\n",
" for full_path in s3.glob(bucket_name+path):\n",
" local_full_path = target+full_path\n",
" for full_path in s3.glob(cos_path):\n",
" local_full_path = local_path+full_path\n",
" logging.info(f'processing {full_path}')\n",
" if s3.info(full_path)['type'] == 'directory':\n",
" logging.debug(f'skipping directory {full_path}')\n",
Expand All @@ -241,11 +174,11 @@
" logging.info(f'downloading {full_path} to {local_full_path}')\n",
" s3.get(full_path, local_full_path)\n",
"elif operation == 'get':\n",
" s3.get(bucket_name+path, target, recursive=recursive)\n",
" s3.get(cos_path, local_path, recursive=recursive)\n",
"elif operation == 'rm':\n",
" s3.rm(bucket_name+path, recursive=recursive)\n",
" s3.rm(cos_path, recursive=recursive)\n",
"elif operation == 'glob':\n",
" print_list(s3.glob(bucket_name+path))\n",
" print(s3.glob(cos_path))\n",
"else:\n",
"    logging.error(f'operation unknown {operation}')"
]
Expand All @@ -267,7 +200,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
"version": "3.11.7"
},
"papermill": {
"default_parameters": {},
Expand Down

0 comments on commit dcaaf32

Please sign in to comment.