diff --git a/.vscode/launch.json b/.vscode/launch.json index 8e6be0aa..29b4097f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -60,7 +60,7 @@ "--timeout", "7200"], "console": "integratedTerminal" }, - { + { "name": "Python Debugger: hdf_import", "type": "debugpy", "request": "launch", @@ -74,6 +74,21 @@ "--loglevel", "DEBUG" ], "console": "integratedTerminal" + }, + { + "name": "Python Debugger: hdf_import (prompt DS)", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/aerospike/hdf_import.py", + "cwd": "${workspaceFolder}/aerospike", + "args": [ + "--hdf", "${input:enterDataset}", + "--concurrency", "5000", + "--idxdrop", + "--logfile", "./hdfimport.log", + "--loglevel", "DEBUG" + ], + "console": "integratedTerminal" }, { "name": "Python Debugger: hdf_import LB", @@ -132,13 +147,13 @@ "console": "integratedTerminal" }, { - "name": "Python Debugger: hdf_query (check)", + "name": "Python Debugger: hdf_query (check prompt)", "type": "debugpy", "request": "launch", "program": "${workspaceFolder}/aerospike/hdf_query.py", "cwd": "${workspaceFolder}/aerospike", "args": [ - "--dataset", "random-xs-20-angular", + "--hdf", "${input:enterDataset}", "--logfile", "./hdfquery.log", "--check", "-r", "10" @@ -174,5 +189,35 @@ ], "console": "integratedTerminal" }, - ] + { + "name": "Python Debugger: hdf_create_dataset (prompt)", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/aerospike/hdf_create_dataset.py", + "cwd": "${workspaceFolder}/aerospike", + "args": [ + "--hosts", "localhost:3000", + "--hdf", "${input:enterHDFFile}", + "--logfile", "./hdfcreatedataset.log", + "--indexname", "HDF-data_Idx", + + ], + "justMyCode": false, + "console": "integratedTerminal" + }, + ], + "inputs": [ + { + "id": "enterDataset", + "type": "promptString", + "description": "Enter Dataset", + "default": "random-xs-20-angular" + }, + { + "id": "enterHDFFile", + "type": "promptString", + "description": "Enter HDF Path", + "default": "test" + } + ] } \ No newline at end of file diff --git a/aerospike/README.md b/aerospike/README.md index 31dc1e49..87a743ad 100644 --- a/aerospike/README.md +++ b/aerospike/README.md @@ -1,405 +1,527 @@ -# hdf_import.py - -This module will import a generated ANN HDF dataset. If the dataset doesn’t locally exist, it will be imported from ANN repro. All datasets are cached in the “./data” folder. - -You can obtain a list of arguments by running: - -``` -python hdf_import.py –help -``` - -Below is a review of the argument: - -\-h, --help - -Show this help message and exit - -\-d DS, --dataset DS - -The ANN dataset (DS) to load training points from (default: glove-100-angular) - -\-c N, --concurrency N - -The maximum number of concurrent tasks (N) used to population the index. - -“N’ Values are: - -- \< 0 – All records are upserted, concurrently waiting for the upsert confirmation once all upserts are submitted -- 0 -- Disable Population - If the index doesn’t existence, it is created. The “wait for index completion” is still being performed. -- 1 -- One record is upserted and confirmed at a time (sync) -- \> 1 -- The number of records upserted and confirmed, concurrently (async) - - After population occurs, the module will go into “wait for index completion” mode by default. When this occurs, the module will wait until all populated index records are merged into the Aerospike DB. - - (default: 500) - -\--idxdrop - -If the Vector Index existence, it will be dropped. Otherwise, it is updated. 
(default: False) - -\--idxnowait - -Waiting for index completion is disabled. The module will continue without waiting for the index records to -be merged into the Aerospike DB (default: False) - -\-E EVT, --exhaustedevt EVT - -This determines how the Resource Exhausted event is handled. -This event occurs with the Vector Server merge queue is filled and cannot process any additional -population requests. - -“EVT” Values are: - -- \< 0 – All population events are stopped and will not resume until the index merger queue is cleared. This is done by “waiting for index completion” to occur. Once the queue is cleared, the population will be restarted. -- 0 -- Disable event handling (just re-throws the exception) -- \>= 1 – All population events are stopped, and the module will wait for “EVT” seconds. Once the interval is reached, the population will be restarted. - -(default: -1) - -\-m RECS, --maxrecs RECS - -Determines the maximum number of records to populated. A value of -1 (default) all records in -the HDF dataset are populated. - -(default: -1) - -\-p port, --vectorport port - -The Vector Server Port (default: 5000) - -\-a HOST, --host HOST - -The Vector Server’s IP Address or Host Name (default: localhost) - -\-A HOST:PORT [HOST:PORT ...], --hosts HOST:PORT [HOST:PORT ...] - -A list of host and optional port. Each pair is separated by a space. -Example: 'hosta:5000' or 'hostb' (default: [localhost:5000]) - -If provided, each population request is distributed over the list of hosts. - -Note: if this is provided, “—host” and “—port” arguments are ignored. - -\-l, --vectorloadbalancer - -Use Vector's Load Balancer. - -Note: if “—hosts” argument is used, only the first host in the list is used (reminding hosts are ignored) - -(default: False) - -\-T, --vectortls - -Use TLS to connect to the Vector Server - -(default: False) - -\-n NS, --namespace NS - -The Aerospike Namespace -(default: test) - -\-N NS, --idxnamespace NS - -Aerospike Namespace where the vector index will be located. -Defaults to the value of “—namespace”. - -\-s SET, --setname SET - -The Aerospike Set Name -(default: HDF-data) - -\-I IDX, --idxname IDX - -The Vector Index Name. -Defaults to the Set Name (--setname) with the suffix of '_idx' - -\-g, --generatedetailsetname - -Generates a unique Set Name (--setname) based on distance type, dimensions, index params, etc. -(default: False) - -\-b BIN, --vectorbinname BIN - -The Aerospike Bin Name where the vector is stored -(default: “HDF_embedding”) - -\-D DIST, --distancetype DIST - -The Vector's Index Distance Type as defined by the Vector Phyton API. -The default is to select the index type based on the ANN dataset - -\-P PARM, --indexparams PARM - -The Vector's Index Params (HnswParams) as defined by the Vector Phyton API. -(default: {"m": 16, "ef_construction": 100, "ef": 100}) - -\-L LOG, --logfile LOG - -The logging file path, if provided. -The default is to stdout. - -\--loglevel LEVEL - -The Logging level (default: INFO) - -\--driverloglevel DLEVEL - -The Vector Phyton Driver's Logging level (default: NOTSET) - -\--prometheus PORT - -The Prometheus Port (default: 9464) - -\--prometheushb SECS - -Prometheus heartbeat in secs. 
The heartbeat updates common information to Prometheus -(default: 5 seconds) - -\--exitdelay wait - -Upon exist, the module will sleep ensuring all Prometheus events are captured -(default: 20) - -# hdf_query.py - -This module will query using the ANN neighbor query vector defined in the ANN dataset that was downloaded and populated using [hdf_import.py](#hdf_importpy). - -You can obtain a list of arguments by running: - -``` -python hdf_import.py –help -``` - -Below is a review of the argument: - -\-h, --help - -Show this help message and exit - -\-d DS, --dataset DS - -The ANN dataset (DS) to load training points from (default: glove-100-angular) - -\-r RUNS, --runs RUNS - -The number of times the query requests will run based on the ANN dataset. -For example: If the ANN dataset request 1,000 queries and if this value is 10; The total number of query requests will be 10,000 (1,000 \* 10). -(default: 10) - -\--limit NEEIGHBORS - -The number of neighbors to return from each query request. If this value is less than or equal to 0, the dataset's neighbor result array length will be used. -(default: -1) - -\--parallel - -Each “run” is conducted concurrently -(default: False) - -\--check - -Each query result is checked to determine if the result is correct. -{default False) - -\-p port, --vectorport port - -The Vector Server Port (default: 5000) - -\-a HOST, --host HOST - -The Vector Server’s IP Address or Host Name (default: localhost) - -\-A HOST:PORT [HOST:PORT ...], --hosts HOST:PORT [HOST:PORT ...] - -A list of host and optional port. Each pair is separated by a space. -Example: 'hosta:5000' or 'hostb' (default: [localhost:5000]) - -If provided, each query request is distributed over the list of hosts. - -Note: if this is provided, “—host” and “—port” arguments are ignored. - -\-l, --vectorloadbalancer - -Use Vector's Load Balancer. - -Note: if “—hosts” argument is used, only the first host in the list is used (reminding hosts are ignored) - -(default: False) - -\-T, --vectortls - -Use TLS to connect to the Vector Server - -(default: False) - -\-N NS, --idxnamespace NS - -Aerospike Namespace where the vector index will be located. -Defaults to the value of “—namespace”. - -\-s SET, --setname SET - -The Aerospike Set Name -(default: HDF-data) - -\-I IDX, --idxname IDX - -The Vector Index Name. -Defaults to the Set Name (--setname) with the suffix of '_idx' - -\-g, --generatedetailsetname - -Generates a unique Set Name (--setname) based on distance type, dimensions, index params, etc. -(default: False) - -\-S PARM, --searchparams PARM - -The Vector's Search Params (HnswParams) as defined by the Vector Phyton API. -Defaults to --indexparams - -\--metric TYPE - -Determines how recall is calculated. Defaults to “KNN”. Possible values are: - -- knn (default) -- epsilon -- Epsilon 0.01 Recall -- largeepsilon -- Epsilon 0.1 Recall -- rel -- Relative Error - -\-L LOG, --logfile LOG - -The logging file path, if provided. -The default is to stdout. - -\--loglevel LEVEL - -The Logging level (default: INFO) - -\--driverloglevel DLEVEL - -The Vector Phyton Driver's Logging level (default: NOTSET) - -\--prometheus PORT - -The Prometheus Port (default: 9464) - -\--prometheushb SECS - -Prometheus heartbeat in secs. The heartbeat updates common information to Prometheus -(default: 5 seconds) - -\--exitdelay wait - -Upon exist, the module will sleep ensuring all Prometheus events are captured -(default: 20) - -# Prometheus - -The module outputs certain meters to Prometheus. 
They are: - -- `aerospike.hdf.heartbeat This event is defined as a gauge. It provides information about the status of the module including the following attributes:` - - `"ns” – Aerospike Namespace` - - `"set” – Aerospike Set ` - - `"idxns” – Aerospike Vector Index Namespace` - - `"idx" – The Aerospike Vector Index Name ` - - `"idxbin" – The Vector’s Bin Name` - - `"idxdist" – The Vector’s API Distance Type` - - `"dims" – Vector’s dimensions` - - `"poprecs" – The number of records in the ANN dataset that will be populated` - - `"queries": The total number of queries used for this dataset ` - - `“querynbrlmt” – The number of possible neighbors to return for the query` - - `“queryruns” – The number of query runs. Each run will perform a set of define queries. The number is defined in “queries”. ` - - `"querycurrun" – The current query run ` - - `"querymetric – The recall method used` - - `"querymetricvalue" – The recall calculated value based on recall method` - - `"dataset” – The ANN dataset` - - `"paused" – The run status. Values are:` - - `Waiting – waiting for index completion` - - `Paused – paused due to resource exhausted` - - `Running - Populating` - - `Idle – before population, after, or after waiting…` - - `Done – Population/Wait done` - - `"action” – If importing (populating) or querying` - - "remainingRecs" – The current number of records that have not been populated - - "remainingquerynbrs” - The total number of queries (includes all runs) that have not been executed -- `aerospike.hdf.populate Current record rate that have been upserted. Defined as a counter. Attributes:` - - `"type" -- upsert` - - `"ns" -- Namespace` - - `"set" – Set Name` -- `aerospike.hdf.query Current query rate. Defined as a counter. Attributes:` - - `"ns" -- Namespace` - - `"` `idx" – Index Name` - - `“run” – The run associated with this query` -- `aerospike.hdf.exception Current exception rate. Defined as a counter. Attributes:` - - `"exception_type" – Type of exception` - - `"handled_by_user" – if handled by user code` - - `"ns" -- Namespace` - - `"set" – Set` - - `“idx” – Index name` -- `aerospike.hdf.waitidxcompletion Current number of waiting for index merge completions being conducted. Defined as a counter. Attributes:` - - `"ns" – Index Namespace` - - `"idx" – Index Name` -- `aerospike.hdf.dropidxtime The amount of time to perform an index drop. Defined as a histogram. Attributes:` - - `"ns" – Index Namepsace` - - `"idx" – Index Name` - -# Supported Datasets - -- deep-image-96-angular -- fashion-mnist-784-euclidean -- gist-960-euclidean -- glove-25-angular -- glove-50-angular -- glove-100-angular -- glove-200-angular -- mnist-784-euclidean -- random-xs-20-euclidean -- random-s-100-euclidean -- random-xs-20-angular -- random-s-100-angular -- random-xs-16-hamming -- random-s-128-hamming -- random-l-256-hamming -- random-s-jaccard -- random-l-jaccard -- sift-128-euclidean -- nytimes-256-angular -- nytimes-16-angular -- sift-256-hamming -- kosarak-jaccard -- movielens1m-jaccard -- movielens10m-jaccard -- movielens20m-jaccard -- dbpedia-openai-100k-angular -- dbpedia-openai-200k-angular -- dbpedia-openai-300k-angular -- dbpedia-openai-400k-angular -- dbpedia-openai-500k-angular -- dbpedia-openai-600k-angular -- dbpedia-openai-700k-angular -- dbpedia-openai-800k-angular -- dbpedia-openai-900k-angular -- dbpedia-openai-1000k-angular - -## Possible additional sites (can be more) - -Any new dataset needs to be defined to mini-ANN. 
- -- -- -- - - -## Dashboard - -You can import the dashboard from [here](https://github.com/aerospike-community/ann-benchmarks/blob/main/aerospike/AerospikeHDFDashboard.json) (AerospikeHDFDashboard.json). - -### hdf-import dashboard example - -![dashboard import](./readme-HDFImportDashboard.png) - -### hdf-query dashboard example - -![dashboard query](./readme-HDFQueryDashboard.png) +# hdf_import.py + +This module will import a generated ANN HDF dataset. If the dataset doesn’t locally exist, it will be imported from ANN repro. All datasets are cached in the “./data” folder. + +You can obtain a list of arguments by running: + +``` +python hdf_import.py –help +``` + +Below is a review of the argument: + +\-h, --help + +Show this help message and exit + +\-d DS, --dataset DS + +The ANN dataset (DS) to load training points from (default: glove-100-angular) + +\--hdf HDFFILE + +A HDF file that can be an ANN HDF file, or one created by “[hdf_create_dataset.py](#hdf_create_datasetpy)”. + +You can provide a path to this HDF file. If a path is not provided, the “data” folder is assumed. + +\-c N, --concurrency N + +The maximum number of concurrent tasks (N) used to population the index. + +“N’ Values are: + +- \< 0 – All records are upserted, concurrently waiting for the upsert confirmation once all upserts are submitted +- 0 -- Disable Population + If the index doesn’t existence, it is created. The “wait for index completion” is still being performed. +- 1 -- One record is upserted and confirmed at a time (sync) +- \> 1 -- The number of records upserted and confirmed, concurrently (async) + + After population occurs, the module will go into “wait for index completion” mode by default. When this occurs, the module will wait until all populated index records are merged into the Aerospike DB. + + (default: 500) + +\--idxdrop + +If the Vector Index existence, it will be dropped. Otherwise, it is updated. (default: False) + +\--idxnowait + +Waiting for index completion is disabled. The module will continue without waiting for the index records to +be merged into the Aerospike DB (default: False) + +\-E EVT, --exhaustedevt EVT + +This determines how the Resource Exhausted event is handled. +This event occurs with the Vector Server merge queue is filled and cannot process any additional +population requests. + +“EVT” Values are: + +- \< 0 – All population events are stopped and will not resume until the index merger queue is cleared. This is done by “waiting for index completion” to occur. Once the queue is cleared, the population will be restarted. +- 0 -- Disable event handling (just re-throws the exception) +- \>= 1 – All population events are stopped, and the module will wait for “EVT” seconds. Once the interval is reached, the population will be restarted. + +(default: -1) + +\-m RECS, --maxrecs RECS + +Determines the maximum number of records to populated. A value of -1 (default) all records in +the HDF dataset are populated. + +(default: -1) + +\-p port, --vectorport port + +The Vector Server Port (default: 5000) + +\-a HOST, --host HOST + +The Vector Server’s IP Address or Host Name (default: localhost) + +\-A HOST:PORT [HOST:PORT ...], --hosts HOST:PORT [HOST:PORT ...] + +A list of host and optional port. Each pair is separated by a space. +Example: 'hosta:5000' or 'hostb' (default: [localhost:5000]) + +If provided, each population request is distributed over the list of hosts. + +Note: if this is provided, “—host” and “—port” arguments are ignored. 
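+
+For example, a hypothetical invocation (host names are placeholders) that loads a local HDF file and spreads the population across two Vector servers might look like:
+
+```
+python hdf_import.py --hdf random-xs-20-angular -A vector1:5000 vector2:5000 --concurrency 1000
+```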
+ +\-l, --vectorloadbalancer + +Use Vector's Load Balancer. + +Note: if “—hosts” argument is used, only the first host in the list is used (reminding hosts are ignored) + +(default: False) + +\-T, --vectortls + +Use TLS to connect to the Vector Server + +(default: False) + +\-n NS, --namespace NS + +The Aerospike Namespace +(default: test) + +\-N NS, --idxnamespace NS + +Aerospike Namespace where the vector index will be located. +Defaults to the value of “—namespace”. + +\-s SET, --setname SET + +The Aerospike Set Name +(default: HDF-data) + +\-I IDX, --idxname IDX + +The Vector Index Name. +Defaults to the Set Name (--setname) with the suffix of '_idx' + +\-g, --generatedetailsetname + +Generates a unique Set Name (--setname) based on distance type, dimensions, index params, etc. +(default: False) + +\-b BIN, --vectorbinname BIN + +The Aerospike Bin Name where the vector is stored +(default: “HDF_embedding”) + +\-D DIST, --distancetype DIST + +The Vector's Index Distance Type as defined by the Vector Phyton API. +The default is to select the index type based on the ANN dataset + +\-P PARM, --indexparams PARM + +The Vector's Index Params (HnswParams) as defined by the Vector Phyton API. +(default: {"m": 16, "ef_construction": 100, "ef": 100}) + +\-L LOG, --logfile LOG + +The logging file path, if provided. +The default is to stdout. + +\--loglevel LEVEL + +The Logging level (default: INFO) + +\--driverloglevel DLEVEL + +The Vector Phyton Driver's Logging level (default: NOTSET) + +\--prometheus PORT + +The Prometheus Port (default: 9464) + +\--prometheushb SECS + +Prometheus heartbeat in secs. The heartbeat updates common information to Prometheus +(default: 5 seconds) + +\--exitdelay wait + +Upon exist, the module will sleep ensuring all Prometheus events are captured +(default: 20) + +# hdf_query.py + +This module will query using the ANN neighbor query vector defined in the ANN dataset that was downloaded and populated using [hdf_import.py](#hdf_importpy). + +You can obtain a list of arguments by running: + +``` +python hdf_import.py –help +``` + +Below is a review of the argument: + +\-h, --help + +Show this help message and exit + +\-d DS, --dataset DS + +The ANN dataset (DS) to load training points from (default: glove-100-angular) + +\--hdf HDFFILE + +A HDF file that can be an ANN HDF file, or one created by “[hdf_create_dataset.py](#hdf_create_datasetpy)”. + +You can provide a path to this HDF file. If a path is not provided, the “data” folder is assumed. + +\-r RUNS, --runs RUNS + +The number of times the query requests will run based on the ANN dataset. +For example: If the ANN dataset request 1,000 queries and if this value is 10; The total number of query requests will be 10,000 (1,000 \* 10). +(default: 10) + +\--limit NEEIGHBORS + +The number of neighbors to return from each query request. If this value is less than or equal to 0, the dataset's neighbor result array length will be used. (default: -1) + +\--parallel + +Each “run” is conducted concurrently +(default: False) + +\--check + +Each query result is checked to determine if the result is correct. +{default False) + +\-p port, --vectorport port + +The Vector Server Port (default: 5000) + +\-a HOST, --host HOST + +The Vector Server’s IP Address or Host Name (default: localhost) + +\-A HOST:PORT [HOST:PORT ...], --hosts HOST:PORT [HOST:PORT ...] + +A list of host and optional port. Each pair is separated by a space. 
+Example: 'hosta:5000' or 'hostb' (default: [localhost:5000]) + +If provided, each query request is distributed over the list of hosts. + +Note: if this is provided, “—host” and “—port” arguments are ignored. + +\-l, --vectorloadbalancer + +Use Vector's Load Balancer. + +Note: if “—hosts” argument is used, only the first host in the list is used (reminding hosts are ignored) + +(default: False) + +\-T, --vectortls + +Use TLS to connect to the Vector Server + +(default: False) + +\-N NS, --idxnamespace NS + +Aerospike Namespace where the vector index will be located. +Defaults to the value of “—namespace”. + +\-s SET, --setname SET + +The Aerospike Set Name +(default: HDF-data) + +\-I IDX, --idxname IDX + +The Vector Index Name. +Defaults to the Set Name (--setname) with the suffix of '_idx' + +\-g, --generatedetailsetname + +Generates a unique Set Name (--setname) based on distance type, dimensions, index params, etc. +(default: False) + +\-S PARM, --searchparams PARM + +The Vector's Search Params (HnswParams) as defined by the Vector Phyton API. +Defaults to --indexparams + +\--metric TYPE + +Determines how recall is calculated. Defaults to “KNN”. Possible values are: + +- knn (default) +- epsilon -- Epsilon 0.01 Recall +- largeepsilon -- Epsilon 0.1 Recall +- rel -- Relative Error + +\-L LOG, --logfile LOG + +The logging file path, if provided. +The default is to stdout. + +\--loglevel LEVEL + +The Logging level (default: INFO) + +\--driverloglevel DLEVEL + +The Vector Phyton Driver's Logging level (default: NOTSET) + +\--prometheus PORT + +The Prometheus Port (default: 9464) + +\--prometheushb SECS + +Prometheus heartbeat in secs. The heartbeat updates common information to Prometheus +(default: 5 seconds) + +\--exitdelay wait + +Upon exist, the module will sleep ensuring all Prometheus events are captured +(default: 20) + +# hdf_create_dataset.py + +This module creates an HDF file from an existing vector dataset. This dataset can be an ANN or a user defined dataset. + +\-h, --help show this help message and exit + +\-idx INDEXNAME, --indexname INDEXNAME + +Vector's Index Name. +Required + +\--hdf HDFFILE A HDF file that will be created in the 'data' folder by default. + +You can provide a path, if for a different folder. +Required + +\-a HOST:PORT [HOST:PORT ...], --hosts HOST:PORT [HOST:PORT ...] + +A list of Aerospike host and optional ports (defaults to 3000). Example: + +'hosta:3000' or 'hostb' (default: ['localhost:3000']) + +\--policies POLICIES Aerospike connection policies + +(default: {"read": {"total_timeout": + +1000}}) + +\-A HOST:PORT [HOST:PORT ...], --vectorhosts HOST:PORT [HOST:PORT ...] + +A list of Aerospike Vector host and optional ports (defaults to 5000). + +Example: 'hosta:5000' or 'hostb' (default: ['localhost:5000']) + +\-l, --vectorloadbalancer + +Use Vector's DB Load Balancer (default: False) + +\-T, --vectortls Use TLS to connect to the Vector DB Server (default: False) + +\-idxns INDEXNAME, --indexnamespace INDEXNAME + +Vector's Index Namespace. + +(default test) + +\-S PARM, --searchparams PARM + +The Vector's Search Params (HnswParams) as a Json value used to obtain the neighbors + +\-pk BINNAME, --pkbinname BINNAME + +The Bin Name that represents the Primary Key for a record. If not provided the Aerospike PK will try to be used, if the PK value is returned. + +If the Aerospike PK is not a value (digest), PK array will not be part of the HDF dataset. 
(default: \_proximus_uk_) + +\-n NEIGHBORS, --neighbors NEIGHBORS + +The number of neighbors to return from the query. (default: 100) + +\--testsize VALUE + +If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. + +If int, represents the absolute number of test samples. + +If None, the value is set to the complement of the train size. If \`\`trainsize\`\` is also None, it will be set to 0.25 + +(default: 0.1) + +\--trainsize VALUE + +If float, should be between 0.0 and 1.0 and represent the proportion on the dataset to include in the train split. + +If int, represents absolute number of train samples. If None, the value is automatically set to the complement of the test size. + +(default: None) + +\--randomstate VALUE + +Controls the shuffling applied to the data before applying the split. Pass an int for reproducible output across multiple function calls. + +Can be an int, RandomState instance or None. + +Using a default of 1 as defined in the ANN benchmark. See this [link](https://scikit-learn.org/dev/modules/generated/sklearn.model_selection.tr) for more information. + +(default: 1) + +\--usetrainingds + +Creates the training dataset based on the actual vectors from the DB. This will use the Bruteforce/k-nn method to calculate the neighbors. The default is to use all vector records in the DB and a sampling is taken to conduct the searches using the Aerospike implementation. + +(default: False) + +\--metric TYPE + +Which metric to use to calculate Recall. + +(default: k-nn) + +\-L LOG, --logfile LOG + +The logging file path. The default is no logging to a file. + +(default: None) + +\--loglevel LEVEL + +The Logging level + +(default: INFO) + +# Prometheus + +The module outputs certain meters to Prometheus. They are: + +- `aerospike.hdf.heartbeat This event is defined as a gauge. It provides information about the status of the module including the following attributes:` + - `"ns” – Aerospike Namespace` + - `"set” – Aerospike Set ` + - `"idxns” – Aerospike Vector Index Namespace` + - `"idx" – The Aerospike Vector Index Name ` + - `"idxbin" – The Vector’s Bin Name` + - `"idxdist" – The Vector’s API Distance Type` + - `"dims" – Vector’s dimensions` + - `"poprecs" – The number of records in the ANN dataset that will be populated` + - `"queries": The total number of queries used for this dataset ` + - `“querynbrlmt” – The number of possible neighbors to return for the query` + - `“queryruns” – The number of query runs. Each run will perform a set of define queries. The number is defined in “queries”. ` + - `"querycurrun" – The current query run ` + - `"querymetric – The recall method used` + - `"querymetricvalue" – The recall calculated value based on recall method` + - `"dataset” – The ANN dataset` + - `"paused" – The run status. Values are:` + - `Waiting – waiting for index completion` + - `Paused – paused due to resource exhausted` + - `Running - Populating` + - `Idle – before population, after, or after waiting…` + - `Done – Population/Wait done` + - `"action” – If importing (populating) or querying` + - "remainingRecs" – The current number of records that have not been populated + - "remainingquerynbrs” - The total number of queries (includes all runs) that have not been executed +- `aerospike.hdf.populate Current record rate that have been upserted. Defined as a counter. Attributes:` + - `"type" -- upsert` + - `"ns" -- Namespace` + - `"set" – Set Name` +- `aerospike.hdf.query Current query rate. Defined as a counter. 
Attributes:` + - `"ns" -- Namespace` + - `"` `idx" – Index Name` + - `“run” – The run associated with this query` +- `aerospike.hdf.exception Current exception rate. Defined as a counter. Attributes:` + - `"exception_type" – Type of exception` + - `"handled_by_user" – if handled by user code` + - `"ns" -- Namespace` + - `"set" – Set` + - `“idx” – Index name` +- `aerospike.hdf.waitidxcompletion Current number of waiting for index merge completions being conducted. Defined as a counter. Attributes:` + - `"ns" – Index Namespace` + - `"idx" – Index Name` +- `aerospike.hdf.dropidxtime The amount of time to perform an index drop. Defined as a histogram. Attributes:` + - `"ns" – Index Namepsace` + - `"idx" – Index Name` + +# Supported Datasets + +- deep-image-96-angular +- fashion-mnist-784-euclidean +- gist-960-euclidean +- glove-25-angular +- glove-50-angular +- glove-100-angular +- glove-200-angular +- mnist-784-euclidean +- random-xs-20-euclidean +- random-s-100-euclidean +- random-xs-20-angular +- random-s-100-angular +- random-xs-16-hamming +- random-s-128-hamming +- random-l-256-hamming +- random-s-jaccard +- random-l-jaccard +- sift-128-euclidean +- nytimes-256-angular +- nytimes-16-angular +- sift-256-hamming +- kosarak-jaccard +- movielens1m-jaccard +- movielens10m-jaccard +- movielens20m-jaccard +- dbpedia-openai-100k-angular +- dbpedia-openai-200k-angular +- dbpedia-openai-300k-angular +- dbpedia-openai-400k-angular +- dbpedia-openai-500k-angular +- dbpedia-openai-600k-angular +- dbpedia-openai-700k-angular +- dbpedia-openai-800k-angular +- dbpedia-openai-900k-angular +- dbpedia-openai-1000k-angular + +## Possible additional sites (can be more) + +Any new dataset needs to be defined to mini-ANN. + +- +- +- + +## Dashboard + +You can import the dashboard from [here](https://github.com/aerospike-community/ann-benchmarks/blob/main/aerospike/AerospikeHDFDashboard.json) (AerospikeHDFDashboard.json). + +### hdf-import dashboard example + +![dashboard import](./readme-HDFImportDashboard.png) + +### hdf-query dashboard example + +![dashboard query](./readme-HDFQueryDashboard.png) diff --git a/aerospike/aerospikedataset.py b/aerospike/aerospikedataset.py new file mode 100644 index 00000000..563166de --- /dev/null +++ b/aerospike/aerospikedataset.py @@ -0,0 +1,522 @@ +import asyncio +import numpy as np +import time +import json +import argparse +import logging +import aerospike + +from logging import _nameToLevel as LogLevels +from typing import Iterable, List, Union, Any, Tuple +from importlib.metadata import version + +from aerospike_vector_search import types as vectorTypes +from aerospike_vector_search.aio import AdminClient as vectorASyncAdminClient, Client as vectorASyncClient +from metrics import all_metrics as METRICS, DummyMetric + +logger = logging.getLogger(__name__) +logFileHandler = None + +class AerospikeDS(): + + @staticmethod + def parse_arguments(parser: argparse.ArgumentParser) -> None: + ''' + Adds the arguments required to create a dataset from a Aerospike vector set. + ''' + + parser.add_argument( + '-a', '--hosts', + metavar="HOST:PORT", + nargs='+', + help="A list of Aerospike host and optional ports (defaults to 3000). 
Example: 'hosta:3000' or 'hostb'", + default=['localhost:3000'], + ) + parser.add_argument( + '--policies', + metavar="POLICIES", + type=json.loads, + help="Aerospike connection policies", + default='{"read": {"total_timeout": 1000}}' + ) + parser.add_argument( + '-A', '--vectorhosts', + metavar="HOST:PORT", + nargs='+', + help="A list of Aerospike Vector host and optional ports (defaults to 5000). Example: 'hosta:5000' or 'hostb'", + default=['localhost:5000'], + ) + parser.add_argument( + '-l', "--vectorloadbalancer", + help="Use Vector's DB Load Balancer", + action='store_true' + ) + parser.add_argument( + '-T', "--vectortls", + help="Use TLS to connect to the Vector DB Server", + action='store_true' + ) + parser.add_argument( + "--hdf", + metavar="HDFFILE", + help="A HDF file that will be created in the 'data' folder by default", + default=None, + type=str, + required=True, + ) + parser.add_argument( + '-idxns', "--indexnamespace", + metavar="INDEXNAME", + help="Vector's Index Namespace.", + required=False, + default="test", + type=str + ) + parser.add_argument( + '-idx', "--indexname", + metavar="INDEXNAME", + help="Vector's Index Name", + required=True + ) + parser.add_argument( + '-S', "--searchparams", + metavar="PARM", + type=json.loads, + help="The Vector's Search Params (HnswParams), as Json, used to obtained the neighbors", + default=None + ) + parser.add_argument( + '-pk', "--pkbinname", + metavar="BINNAME", + type=str, + help=''' + The Bin Name that represents the Primary Key for a record. + If not provided the Aerospike PK will try to be used if the PK value is returned. + If the Aerospike PK is not a value (digest), PK array will not be part of the HDF dataset (None). + ''', + default="_proximus_uk_" + ) + parser.add_argument( + "-n", "--neighbors", + metavar="NEIGHBORS", + type=int, + help="The number of neighbors to return from the query.", + default=100, + ) + parser.add_argument( + "--testsize", + metavar="VALUE", + help=''' + If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. + If int, represents the absolute number of test samples. + If None, the value is set to the complement of the train size. + If ``trainsize`` is also None, it will be set to 0.25. + ''', + type=Union[float,int,None], + default=0.1, + ) + parser.add_argument( + "--trainsize", + metavar="VALUE", + help=''' + If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the train split. + If int, represents the absolute number of train samples. + If None, the value is automatically set to the complement of the test size. + ''', + type=Union[float,int,None], + default=None, + ) + parser.add_argument( + "--randomstate", + metavar="VALUE", + help=''' + Controls the shuffling applied to the data before applying the split. + Pass an int for reproducible output across multiple function calls. + See :term:`Glossary `. + Can be a int, RandomState instance or None. + Using a default of 1 as defined in the ANN benchmark. + See https://scikit-learn.org/dev/modules/generated/sklearn.model_selection.train_test_split.html#sklearn.model_selection.train_test_split + ''', + default=1, + ) + parser.add_argument( + "--usetrainingds", + help=''' + Creates the training dataset based on the actual vectors from the DB. + This will use the Bruteforce/k-nn method to calculate the neighbors. + The defualt is to use all vector records in the DB and a sampling is taken to conduct the searches using the Aerospike implementation. 
+ ''', + action='store_true' + ) + parser.add_argument( + "--metric", + metavar="TYPE", + help="Which metric to use to calculate Recall.", + default="k-nn", + type=str, + choices=METRICS.keys(), + ) + parser.add_argument( + '-L', "--logfile", + metavar="LOG", + help="The logging file path. Default is no logging to a file.", + default=None, + ) + parser.add_argument( + "--loglevel", + metavar="LEVEL", + help="The Logging level", + default="INFO", + choices=LogLevels.keys(), + ) + + def __init__(self, runtimeArgs: argparse.Namespace) -> None: + from datasets import get_dataset_fn + + self._hosts = [] + for pos, host in enumerate(runtimeArgs.hosts): + parts = host.split(':') + if len(parts) == 1: + self._hosts.append((parts[0], 3000)) + elif len(parts) == 2: + self._hosts.append((parts[0],int(parts[1]))) + self._vector_hosts : List[vectorTypes.HostPort] = [] + for pos, host in enumerate(runtimeArgs.vectorhosts): + parts = host.split(':') + if len(parts) == 1: + self._vector_hosts.append(vectorTypes.HostPort(host=host,port=5000,is_tls=bool(runtimeArgs.vectortls))) + elif len(parts) == 2: + self._vector_hosts.append(vectorTypes.HostPort(host=parts[0],port=int(parts[1]),is_tls=bool(runtimeArgs.vectortls))) + + self._as_namespace : str = None + self._as_set : str = None + self._as_pkbinname : str = runtimeArgs.pkbinname + self._vector_lb : bool = runtimeArgs.vectorloadbalancer + self._hdf_path, self._ann_dataset = get_dataset_fn(runtimeArgs.hdf) + self._vector_namespace : str = runtimeArgs.indexnamespace + self._vector_name : str = runtimeArgs.indexname + self._as_vectorbinname : str = None + self._vector_searchparams = None if runtimeArgs.searchparams is None else AerospikeDS.set_hnsw_params_attrs(vectorTypes.HnswSearchParams(), runtimeArgs.searchparams) + self._vector_distance : vectorTypes.VectorDistanceMetric = None + self._vector_hnsw : dict = None + self._vector_dimensions : int = None + self._vector_neighbors : int = runtimeArgs.neighbors + self._vector_trainsize = runtimeArgs.trainsize + self._vector_testsize = runtimeArgs.testsize + self._vector_randomstate = runtimeArgs.randomstate + self._vector_usetrainingds = runtimeArgs.usetrainingds + self._vector_metric = METRICS[runtimeArgs.metric] + + self._logging_init(runtimeArgs, logger) + + self._as_clientconfig = { + 'hosts': self._hosts, + 'policies': runtimeArgs.policies, + } + + def _logging_init(self, runtimeArgs: argparse.Namespace, logger: logging.Logger) -> None: + + global logFileHandler + + self._logFilePath = runtimeArgs.logfile + self._logLevel = runtimeArgs.loglevel + self._logger = logger + self._loggingEnabled = False + + if self._logFilePath is not None and self._logFilePath and self._logLevel != "NOTSET": + print(f"Logging to file {self._logFilePath}") + if logFileHandler is None: + logFileHandler = logging.FileHandler(self._logFilePath, "w") + logFormatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + logFileHandler.setFormatter(logFormatter) + self._logger.addHandler(logFileHandler) + self._logger.setLevel(logging.getLevelName(self._logLevel)) + self._logFileHandler = logFileHandler + self._loggingEnabled = True + self._logger.info(f'Start Aerospike: Metric: {self}') + self._logger.info(f" aerospike: {version('aerospike')}") + self._logger.info(f" aerospike-vector-search: {version('aerospike_vector_search')}") + #self._logger.info(f"Prometheus HTTP Server: {self._prometheus_http_server[0].server_address}") + #self._logger.info(f" Metrics Name: {self._meter.name}") + 
self._logger.info(f"Arguments: {runtimeArgs}") + + def flush_log(self) -> None: + if(self._logger.handlers is not None): + for handler in self._logger.handlers: + handler.flush() + + def print_log(self, msg :str, logLevel :int = logging.INFO) -> None: + if self._loggingEnabled: + self._logger.log(level=logLevel, msg=msg) + if logLevel == logging.INFO: + print(msg + f', Time: {time.strftime("%Y-%m-%d %H:%M:%S")}') + elif logLevel == logging.WARN or logLevel == logging.ERROR or logLevel == logging.CRITICAL: + levelName = "" if logLevel == logging.INFO else f" {logging.getLevelName(logLevel)}: " + print(levelName + msg + f', Time: {time.strftime("%Y-%m-%d %H:%M:%S")}') + else: + levelName = "" if logLevel == logging.INFO else f" {logging.getLevelName(logLevel)}: " + print(levelName + msg + f', Time: {time.strftime("%Y-%m-%d %H:%M:%S")}') + + @staticmethod + def _vector_hostport_str(vectorhosts : List[vectorTypes.HostPort]) -> str: + return ",".join(f"{hp.host}:{hp.port}" for hp in vectorhosts) + + @staticmethod + def set_hnsw_params_attrs(__obj :object, __dict: dict) -> object: + for key in __dict: + if key == 'batching_params': + setattr( + __obj, + key, + AerospikeDS.set_hnsw_params_attrs( + vectorTypes.HnswBatchingParams(), + __dict[key].asdict() + ) + ) + else: + setattr(__obj, key, __dict[key]) + return __obj + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + self.flush_log() + + async def index_exist(self, adminClient: vectorASyncAdminClient) -> Union[dict, None]: + existingIndexes = await adminClient.index_list() + if len(existingIndexes) == 0: + return None + return [(index if index["id"]["namespace"] == self._vector_namespace + and index["id"]["name"] == self._vector_name else None) + for index in existingIndexes][0] + + async def populate_vector_info(self) -> None: + ''' + Updates AerospikeDS instance with attributes about the vector index. 
+ ''' + from baseaerospike import _distanceNameToAerospikeType as DISTANCETYPES + + self.print_log(f'Updating instance with Aerospike Vector Idx attributes for {self._vector_namespace}.{self._vector_name} on {AerospikeDS._vector_hostport_str(self._vector_hosts)}') + + async with vectorASyncAdminClient(seeds=self._vector_hosts, + listener_name=None, + is_loadbalancer=self._vector_lb + ) as adminClient: + + idxAttribs = await self.index_exist(adminClient) + if idxAttribs is None: + self.print_log(f'populate_vector_info: Vector Index: {self._vector_namespace}.{self._vector_name}, not found') + raise FileNotFoundError(f"Vector Index {self._vector_namespace}.{self._vector_name} not found") + + self._as_namespace : str = idxAttribs['storage']['namespace'] + self._as_set : str = idxAttribs['setFilter'] + self._as_vectorbinname : str = idxAttribs["field"] + self._vector_distance : vectorTypes.VectorDistanceMetric = vectorTypes.VectorDistanceMetric[idxAttribs["vectorDistanceMetric"]] + self._vector_hnsw : dict = idxAttribs['hnsw_params'] + self._vector_dimensions : int = idxAttribs['dimensions'] + self._vector_ann_distance : str = next((anndisttype for anndisttype, disttype in DISTANCETYPES.items() if disttype == self._vector_distance)) + + async def Generate_hdf_dataset(self) -> str: + import h5py + from string import digits + + self.print_log(f"Creating HDF dataset {self._hdf_path}") + + pkarray = await self.get_pk_from_as_set() + (train, + sampletrain, + test, + neighbors, + distances, + metricresult) = await self.get_vectors(pkarray) + + if self._vector_usetrainingds: + train = sampletrain + + with h5py.File(self._hdf_path, "w") as f: + f.attrs["type"] = "dense" + f.attrs["distance"] = self._vector_ann_distance + f.attrs["dimension"] = len(train[0]) + f.attrs["point_type"] = train[0].dtype.name.rstrip(digits) + if metricresult is not None: + f.attrs["recall"] = metricresult + f.attrs["recallmethod"] = self._vector_metric["type"] + print(f"train size: {train.shape[0]} * {train.shape[1]}") + print(f"test size: {test.shape[0]} * {test.shape[1]}") + f.create_dataset("train", data=train) + f.create_dataset("test", data=test) + f.create_dataset("neighbors", data=neighbors) + if distances is not None: + f.create_dataset("distances", data=distances) + if not self._vector_usetrainingds: + f.create_dataset("primarykeys", data=pkarray) + hdfpath = f.filename + self.print_log(f"Created HDF dataset '{hdfpath}'") + + return hdfpath + + async def get_pk_from_as_set(self) -> List: + ''' + Read records from the Aerospike DB and return an array of the PKs. 
+        '''
+        self.print_log(f"Opening connection to Aerospike DB Cluster {self._as_clientconfig}")
+
+        pkarray = []
+
+        client = aerospike.client(self._as_clientconfig).connect()
+        try:
+            self.print_log(f"Connected to Aerospike DB Cluster {client.info_random_node('version')}")
+
+            self._logger.debug(f"Creating Query on {self._as_namespace}.{self._as_set} for PK {'PK' if self._as_pkbinname is None else self._as_pkbinname}")
+
+            query = client.query(self._as_namespace, self._as_set)
+            if self._as_pkbinname is not None:
+                query.select(self._as_pkbinname)
+            records = query.results()
+
+            self.print_log(f"Aerospike Query record number: {len(records)}")
+
+            for record in records:
+                key, _, bins = record
+                pkvalue = None
+                if self._as_pkbinname is None:
+                    # Fall back to the record's key tuple: use the PK value when returned, otherwise the digest
+                    _, _, primaryKeyValue, digest = key
+                    if primaryKeyValue is not None:
+                        pkvalue = primaryKeyValue
+                    else:
+                        pkvalue = digest
+                elif self._as_pkbinname in bins:
+                    pkvalue = bins[self._as_pkbinname]
+
+                pkarray.append(pkvalue)
+
+        finally:
+            self._logger.debug("closing connection to DB")
+            client.close()
+
+        if len(pkarray) == 0 or all(p is None for p in pkarray):
+            # No usable primary keys were found; signal this to the caller
+            pkarray = None
+
+        self.print_log(f"Closed connection to Aerospike DB Cluster {self._as_clientconfig} with PK array of {0 if pkarray is None else len(pkarray)}")
+
+        return pkarray
+
+    async def calculate_knn_neighbor_distance(self, train: np.ndarray, test: np.ndarray) -> Tuple[np.ndarray[np.ndarray], np.ndarray[np.ndarray]]:
+        '''
+        Determines the neighbors and associated distances from the training and test datasets
+        using the Bruteforce/k-nn method.
+        Returns the neighbors and distances.
+        '''
+        from bruteforce import BruteForceBLAS
+
+        self.print_log(f"Determining neighbors/distances from training and test datasets")
+        bf = BruteForceBLAS(self._vector_ann_distance,
+                            precision=train.dtype)
+        bf.fit(train)
+
+        neighbors = []
+        distances = []
+
+        for i, x in enumerate(test):
+            if i % 1000 == 0:
+                print(f"BruteForce (k-nn) {i}/{len(test)}...")
+
+            # Query the model and sort results by distance
+            res = list(bf.query_with_distances(x, self._vector_neighbors))
+            res.sort(key=lambda t: t[-1])
+
+            # Save neighbors indices and distances
+            neighbors.append(np.array([idx for idx, _ in res]))
+            distances.append(np.array([dist for _, dist in res]))
+
+        self.print_log(f"Determined neighbors ({len(neighbors)})/distances ({len(distances)})")
+
+        return np.array(neighbors), np.array(distances)
+
+    async def get_vectors(self, pkarray : List) -> Tuple[np.ndarray[np.ndarray], np.ndarray[np.ndarray], np.ndarray[np.ndarray], np.ndarray[np.ndarray], np.ndarray[np.ndarray], Union[float, int, None]]:
+        '''
+        Gets the corresponding vectors based on the pkarray.
+ It returns: + training, + calculated training (based on smapling of all vector records), + test (used to perform searches), + neighbors (result from the test dataset), + distances (only provided when neighbors are calculated via k-nn method) + metricvalue (recal) value (can be None) + ''' + from sklearn.model_selection import train_test_split as sklearn_train_test_split + + self.print_log(f'get_vectors based on PKs ({len(pkarray)})') + + async with vectorASyncClient(seeds=self._vector_hosts, + is_loadbalancer=self._vector_lb + ) as client: + self.print_log(f'Opened connection to Vectors on {AerospikeDS._vector_hostport_str(self._vector_hosts)} for set {self._as_namespace}.{self._as_set}') + vectors = [] + + for pk in pkarray: + record = await client.get(namespace=self._as_namespace, + key=pk, + field_names=[self._as_vectorbinname], + set_name=self._as_set) + vectors.append(np.array(record.fields[self._as_vectorbinname])) + + vectors = np.array(vectors) + self.print_log(f"Splitting {len(vectors)}*{self._vector_dimensions} into train/test with sizes Test:{self._vector_testsize}, Train: {self._vector_trainsize}, Random State: {self._vector_randomstate}") + + X_train, X_test = sklearn_train_test_split(vectors, + test_size=self._vector_testsize, + train_size=self._vector_trainsize, + random_state=self._vector_randomstate) + + distanceds : np.array[np.array] = None + neighborsds : np.array[np.array] = None + metricvalue = None + + if self._vector_usetrainingds: + self.print_log(f"Split vector DS into training {X_train.shape} and test {X_test.shape} DSs. Using Brunte ", logging.WARN) + neighborsds, distanceds = await self.calculate_knn_neighbor_distance(X_test, X_train) + else: + self.print_log(f"Using vector orginal DS {vectors.shape} and test DS {X_test.shape} (Training DS, which is not used, is {X_train.shape}).") + neighborsds = await self.conduct_search(client, X_test) + + metricfunc = None if self._vector_metric is None else self._vector_metric["function"] + if metricfunc is not None: + try: + metricvalue = metricfunc(X_test, X_test, DummyMetric(), 0, len(X_test[0])) + self.print_log(f"{self._vector_metric['type']} Recall/Metric Value: {metricvalue}") + except Exception as e: + self.print_log(f"Recall/Metric caculation failed with '{e}'", logging.ERROR) + + zeronbrs = [len(neighbor) == 0 for neighbor in neighborsds].count(True) + if zeronbrs > 0: + self.print_log(f"Found {zeronbrs} neighbors in resulting Search.", logging.WARN) + + return vectors, X_train, X_test, neighborsds, distanceds, metricvalue + + async def conduct_search(self, client:vectorASyncClient, testds : np.ndarray[np.ndarray]) -> np.ndarray: + + self.print_log(f"Performing Search using Test DS {testds.shape}") + neighborsds = [] + for searchitem in testds: + neighbors = await self.search_vector(client, searchitem.tolist()) + if len(neighbors) > 0: + result_ids = [neighbor.key.key for neighbor in neighbors] + neighborsds.append(np.array(result_ids)) + else: + self._logger.debug(f"Found zero neighbors in resulting Search.", logging.WARN) + neighborsds.append(np.empty()) + + neighborsds = np.array(neighborsds) + self.print_log(f"Search Completed resulting in a neighbors DS {neighborsds.shape}") + + return neighborsds + + async def search_vector(self, client:vectorASyncClient, query:List[float|bool]) -> List[vectorTypes.Neighbor]: + + return await client.vector_search(namespace=self._vector_namespace, + index_name=self._vector_name, + query=query, + limit=self._vector_neighbors, + search_params=self._vector_searchparams) diff 
--git a/aerospike/aerospikehdf.py b/aerospike/aerospikehdf.py index 857d0ea9..c7a77b6a 100644 --- a/aerospike/aerospikehdf.py +++ b/aerospike/aerospikehdf.py @@ -14,7 +14,7 @@ from aerospike_vector_search.shared.proto_generated.types_pb2_grpc import grpc as vectorResultCodes from baseaerospike import BaseAerospike, _distanceNameToAerospikeType as DistanceMaps, OperationActions -from datasets import DATASETS, load_and_transform_dataset +from datasets import DATASETS, load_and_transform_dataset, get_dataset_fn from metrics import all_metrics as METRICS, DummyMetric logger = logging.getLogger(__name__) @@ -34,7 +34,14 @@ def parse_arguments_population(parser: argparse.ArgumentParser) -> None: metavar="DS", help="the dataset to load training points from", default="glove-100-angular", - choices=DATASETS.keys(), + choices=DATASETS.keys(), + ) + parser.add_argument( + "--hdf", + metavar="HDFFILE", + help="A HDF file that will be the dataset to load training points from... Defaults to 'data' folder", + default=None, + type=str ) parser.add_argument( '-c', "--concurrency", @@ -93,10 +100,17 @@ def parse_arguments_query(parser: argparse.ArgumentParser) -> None: parser.add_argument( '-d', "--dataset", metavar="DS", - help="the dataset to load training points from", + help="the dataset to load the search points from", default="glove-100-angular", choices=DATASETS.keys(), ) + parser.add_argument( + "--hdf", + metavar="HDFFILE", + help="A HDF file that will be the dataset to load search points from... Defaults to 'data' folder", + default=None, + type=str + ) parser.add_argument( '-r', "--runs", metavar="RUNS", @@ -125,7 +139,7 @@ def parse_arguments_query(parser: argparse.ArgumentParser) -> None: "--metric", metavar="TYPE", help="Which metric to use to calculate Recall", - default="knn", + default="k-nn", type=str, choices=METRICS.keys(), ) @@ -144,6 +158,11 @@ def __init__(self, runtimeArgs: argparse.Namespace, actions: OperationActions): self._neighbors : Union[np.ndarray, List[np.ndarray]] = None self._dataset = None self._pausePuts : bool = False + self._pks : Union[np.ndarray, List[np.ndarray]] = None + self._hdf_file : str = None + + if runtimeArgs.hdf is not None: + self._hdf_file, self._datasetname = get_dataset_fn(runtimeArgs.hdf) if OperationActions.POPULATION in actions: self._idx_drop = runtimeArgs.idxdrop @@ -170,7 +189,13 @@ async def get_dataset(self) -> None: self.print_log(f'get_dataset: {self}') - self._trainarray, self._queryarray, self._neighbors, distance, self._dataset, self._dimensions = load_and_transform_dataset(self._datasetname) + (self._trainarray, + self._queryarray, + self._neighbors, + distance, + self._dataset, + self._dimensions, + self._pks) = load_and_transform_dataset(self._datasetname, self._hdf_file) if self._idx_distance is None or not self._idx_distance: self._idx_distance = DistanceMaps.get(distance.lower()) @@ -192,7 +217,10 @@ async def get_dataset(self) -> None: self._remainingrecs = 0 self._remainingquerynbrs = 0 - + + if self._dataset.attrs.get("recall", None) is not None: + self.print_log(f"Precalculated Recall value found {self._dataset.attrs['recall']}") + self.prometheus_status(0) self.print_log(f'get_dataset Exit: {self}, Train Array: {len(self._trainarray)}, Query Array: {len(self._queryarray)}, Distance: {distance}, Dimensions: {self._dimensions}') @@ -291,9 +319,11 @@ async def _resourceexhaused_handler(self, key: int, embedding, i: int, client: v else: await self._put_wait_sleep_handler(key, embedding, i, client, logLevel) - async def 
put_vector(self, key: int, embedding, i: int, client: vectorASyncClient, retry: bool = False) -> None: + async def put_vector(self, key, embedding, i: int, client: vectorASyncClient, retry: bool = False) -> None: try: try: + if type(key).__module__ == np.__name__: + key = key.item() await client.upsert(namespace=self._namespace, set_name=self._setName, key=key, @@ -368,8 +398,11 @@ async def populate(self) -> None: taskPuts = [] i = 1 self._populate_counter.add(0, {"type": "upsert","ns":self._namespace,"set":self._setName}) - + usePKValues : bool = self._pks is not None + for key, embedding in enumerate(self._trainarray): + if usePKValues: + key = self._pks[key] if self._pausePuts: loopTimes = 0 await asyncio.gather(*taskPuts) @@ -403,10 +436,11 @@ async def populate(self) -> None: taskPuts.clear() print('Index Put Counter [%d]\r'%i, end="") - if self._idx_maxrecs >= 0 and i >= self._idx_maxrecs: - break i += 1 + if self._idx_maxrecs >= 0 and i > self._idx_maxrecs: + break + i -= 1 logger.debug(f"Waiting for Put Tasks (finial {len(taskPuts)}) to Complete at {i}") await asyncio.gather(*taskPuts) self._populate_counter.add(len(taskPuts), {"type": "upsert","ns":self._namespace,"set":self._setName}) @@ -466,6 +500,7 @@ async def query(self) -> None: queries.append(resultnbrs) if metricfunc is not None: self._query_metric_value = metricfunc(self._neighbors, resultnbrs, DummyMetric(), i-1, len(resultnbrs[0])) + self._logger.info(f"Run: {i}, Neighbors: {len(resultnbrs)}, {self._query_metric["type"]}: {self._query_metric_value}") i += 1 results = await asyncio.gather(*taskPuts) diff --git a/aerospike/datasets.py b/aerospike/datasets.py index 97550022..0e571840 100644 --- a/aerospike/datasets.py +++ b/aerospike/datasets.py @@ -21,7 +21,7 @@ def download(source_url: str, destination_path: str) -> None: urlretrieve(source_url, destination_path) -def get_dataset_fn(dataset_name: str) -> str: +def get_dataset_fn(dataset_name: str) -> Tuple[str,str]: """ Returns the full file path for a given dataset name in the data directory. @@ -29,14 +29,26 @@ def get_dataset_fn(dataset_name: str) -> str: dataset_name (str): The name of the dataset. Returns: - str: The full file path of the dataset. + str: The full file path of the dataset and the dataset name. """ if not os.path.exists("data"): os.mkdir("data") - return os.path.join("data", f"{dataset_name}.hdf5") - + + filename, fileext = os.path.splitext(dataset_name) + filenamewext : str = dataset_name + + if fileext is None or not fileext: + filenamewext = f"{filename}.hdf5" + + if (filenamewext[0] == os.path.sep + or filenamewext.startswith(f"data{os.path.sep}") + or filenamewext.startswith(f".{os.path.sep}")): + splitpath = os.path.split(filename) + return filenamewext, splitpath[1] + + return os.path.join("data", filenamewext), filename -def get_dataset(dataset_name: str) -> Tuple[h5py.File, int]: +def get_dataset(dataset_name: str, hdfpath : str = None) -> Tuple[h5py.File, int]: """ Fetches a dataset by downloading it from a known URL or creating it locally if it's not already present. The dataset file is then opened for reading, @@ -49,16 +61,25 @@ def get_dataset(dataset_name: str) -> Tuple[h5py.File, int]: Tuple[h5py.File, int]: A tuple containing the opened HDF5 file object and the dimension of the dataset. 
""" - hdf5_filename = get_dataset_fn(dataset_name) - try: - dataset_url = f"https://ann-benchmarks.com/{dataset_name}.hdf5" - download(dataset_url, hdf5_filename) - except: - print(f"Cannot download {dataset_url}") - if dataset_name in DATASETS: - print("Creating dataset locally") - DATASETS[dataset_name](hdf5_filename) - + if hdfpath is None: + hdf5_filename, dataset_name = get_dataset_fn(dataset_name) + else: + hdf5_filename = hdfpath + + if dataset_name in DATASETS.keys(): + try: + dataset_url = f"https://ann-benchmarks.com/{dataset_name}.hdf5" + download(dataset_url, hdf5_filename) + except: + print(f"Cannot download {dataset_url}") + if dataset_name in DATASETS: + print("Creating dataset locally") + DATASETS[dataset_name](hdf5_filename) + elif os.path.isfile(hdf5_filename): + DATASETS.update({dataset_name:hdf5_filename}) + else: + raise FileNotFoundError(f"HDF File '{hdf5_filename}' doesn't exist.") + hdf5_file = h5py.File(hdf5_filename, "r") # here for backward compatibility, to ensure old datasets can still be used with newer versions @@ -66,13 +87,14 @@ def get_dataset(dataset_name: str) -> Tuple[h5py.File, int]: dimension = int(hdf5_file.attrs["dimension"]) if "dimension" in hdf5_file.attrs else len(hdf5_file["train"][0]) return hdf5_file, dimension -def load_and_transform_dataset(dataset_name: str) -> Tuple[ +def load_and_transform_dataset(dataset_name: str, hdfpath : str = None) -> Tuple[ Union[numpy.ndarray, List[numpy.ndarray]], Union[numpy.ndarray, List[numpy.ndarray]], Union[numpy.ndarray, List[numpy.ndarray]], str, h5py.File, - int]: + int, + Union[numpy.ndarray, List[numpy.ndarray]]]: """Loads and transforms the dataset. Args: @@ -80,8 +102,8 @@ def load_and_transform_dataset(dataset_name: str) -> Tuple[ Returns: Tuple: Transformed datasets. 
- """ - D, dimension = get_dataset(dataset_name) + """ + D, dimension = get_dataset(dataset_name, hdfpath) X_train = numpy.array(D["train"]) X_test = numpy.array(D["test"]) distance = D.attrs["distance"] @@ -89,8 +111,10 @@ def load_and_transform_dataset(dataset_name: str) -> Tuple[ print(f"Got a train set of size ({X_train.shape[0]} * {dimension})") print(f"Got {len(X_test)} queries") - train, test, neighbors = dataset_transform(D) - return train, test, neighbors, distance, D, dimension + train, test, neighbors, primarykeys = dataset_transform(D) + if primarykeys is not None and primarykeys.dtype == numpy.dtype('O'): + primarykeys = None + return train, test, neighbors, distance, D, dimension, primarykeys def dataset_transform(dataset: h5py.Dataset) -> Tuple[Union[numpy.ndarray, List[numpy.ndarray]], Union[numpy.ndarray, List[numpy.ndarray]], Union[numpy.ndarray, List[numpy.ndarray]]]: """ @@ -111,7 +135,8 @@ def dataset_transform(dataset: h5py.Dataset) -> Tuple[Union[numpy.ndarray, List[ return ( numpy.array(dataset["train"]), numpy.array(dataset["test"]), - numpy.array(dataset.get("neighbors")) + numpy.array(dataset.get("neighbors")), + numpy.array(dataset.get("primarykeys")) ) # we store the dataset as a list of integers, accompanied by a list of lengths in hdf5 @@ -119,7 +144,8 @@ def dataset_transform(dataset: h5py.Dataset) -> Tuple[Union[numpy.ndarray, List[ return ( convert_sparse_to_list(dataset["train"], dataset["size_train"]), convert_sparse_to_list(dataset["test"], dataset["size_test"]), - numpy.array(dataset.get("neighbors")) if dataset.get("size_neighbors") is None else convert_sparse_to_list(dataset["neighbors"], dataset["size_neighbors"]) + numpy.array(dataset.get("neighbors")) if dataset.get("size_neighbors") is None else convert_sparse_to_list(dataset["neighbors"], dataset["size_neighbors"]), + None ) def write_output(train: numpy.ndarray, test: numpy.ndarray, fn: str, distance: str, point_type: str = "float", count: int = 100) -> None: @@ -250,7 +276,7 @@ def train_test_split(X: numpy.ndarray, test_size: int = 10000, dimension: int = """ from sklearn.model_selection import train_test_split as sklearn_train_test_split - dimension = dimension if not None else X.shape[1] + dimension = dimension if dimension is not None else X.shape[1] print(f"Splitting {X.shape[0]}*{dimension} into train/test") return sklearn_train_test_split(X, test_size=test_size, random_state=1) diff --git a/aerospike/hdf_create_dataset.py b/aerospike/hdf_create_dataset.py new file mode 100644 index 00000000..5f3b6560 --- /dev/null +++ b/aerospike/hdf_create_dataset.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +import asyncio +import argparse + +from aerospikedataset import AerospikeDS + + +def parse_arguments() -> argparse.Namespace: + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + AerospikeDS.parse_arguments(parser) + + args = parser.parse_args() + + return args + +async def main_loop(args : argparse.Namespace) -> None: + + async with AerospikeDS(args) as asInstance: + await asInstance.populate_vector_info() + await asInstance.Generate_hdf_dataset() + +if __name__ == "__main__": + args = parse_arguments() + + asyncio.run(main_loop(args)) + + diff --git a/aerospike/metrics.py b/aerospike/metrics.py index a728be8b..9eb2170b 100644 --- a/aerospike/metrics.py +++ b/aerospike/metrics.py @@ -119,9 +119,9 @@ def create_group(self, name): return self.d[name] all_metrics = { - "knn": { + "k-nn": { "type":"knn", - "description": "Recall", + "description": "k-NN 
Recall", "function": lambda true_distances, run_distances, metrics, times, distance_count: knn( true_distances, run_distances, distance_count, metrics ).attrs[ diff --git a/aerospike/requirements.txt b/aerospike/requirements.txt index e8cf0cdf..53e34582 100644 --- a/aerospike/requirements.txt +++ b/aerospike/requirements.txt @@ -1,3 +1,6 @@ +h5py +numpy +aerospike aerospike-vector-search opentelemetry-api opentelemetry-sdk