Skip to content

Commit

Permalink
add convenience methods for most common crud actions
Browse files Browse the repository at this point in the history
  • Loading branch information
ThiagoTrabach committed Aug 3, 2023
1 parent 1a4ddb2 commit cb8303d
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions fhir_utils/healthcare_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class HealthcareApi:
"""
This class provides methods for interacting with the Google Healthcare API.
This class provides the basic methods for interacting with the Google Healthcare API.
Based on examples from:
https://github.com/GoogleCloudPlatform/python-docs-samples/tree/3aa00a7549571b3a6ce8333d857226011e74a9be/healthcare/api-client/v1/fhir
https://github.com/GoogleCloudPlatform/python-docs-samples/tree/3aa00a7549571b3a6ce8333d857226011e74a9be/healthcare/api-client/v1beta1/fhir
Expand Down Expand Up @@ -58,12 +58,12 @@ def read(self, dataset_id: str, fhir_store_id: str, resource: str, resource_id:
print(f"Got contents of {resource} resource with ID {resource_id}:\n")

return {'response': response.status_code, 'payload': return_payload}



def read_lastupdated(self, dataset_id: str, fhir_store_id: str, resource: str, since: dict) -> dict:
"""Read the entries from a resource in the FHIR store that were last updated after a specific date and time (format: '2023-08-01T03:00:00.000Z')"""
def read_conditional(self, dataset_id: str, fhir_store_id: str, resource: str, condition: str) -> dict:
"""Read the entries from a resource in the FHIR store that were last updated after a specific date and time"""
resource_path = f"{self.url}/datasets/{dataset_id}/fhirStores/{fhir_store_id}/fhir/{resource}"
resource_path += f"?_lastUpdated=gt{since}"
resource_path += f"?{condition}"

response = self.session.get(resource_path, headers=self.header)
response.raise_for_status()
Expand All @@ -85,4 +85,19 @@ def delete(self, dataset_id: str, fhir_store_id: str, resource: str, resource_id
print(f"Deleted {resource} resource with ID {resource_id}.")

return {'response': response.status_code}



class FastCRUD(HealthcareApi):
"""The FastCRUD class is a subclass of HealthcareApi that provides additional convenience methods for most common CRUD actions."""
def __init__(self, project_id, location):
super().__init__(project_id, location)

def read_lastupdated(self, dataset_id: str, fhir_store_id: str, resource: str, since: str) -> dict:
"""Read the entries from a resource in the FHIR store that were last updated after a specific date and time"""
parameter = f"_lastUpdated=gt{since}"
return self.read_conditional(dataset_id, fhir_store_id, resource, parameter)

def read_patient(self, dataset_id: str, fhir_store_id: str, cpf: str):
"""Read the entries from Patient resource in the FHIR store based on tax id (CPF)"""
parameter = f"identifier=https://rnds-fhir.saude.gov.br/NamingSystem/cpf|{cpf}"
return self.read_conditional(dataset_id, fhir_store_id, 'Patient', parameter)

0 comments on commit cb8303d

Please sign in to comment.