diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index 909dde07219..be0ffa578d1 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -20,7 +20,7 @@ use crate::client::retry::RetryExt; use crate::client::token::{TemporaryToken, TokenCache}; use crate::client::TokenProvider; use crate::util::hmac_sha256; -use crate::{Result, RetryConfig}; +use crate::{CredentialProvider, Result, RetryConfig}; use async_trait::async_trait; use bytes::Buf; use chrono::{DateTime, Utc}; @@ -542,6 +542,49 @@ async fn web_identity( }) } +/// Credentials sourced from a task IAM role +/// +/// +#[derive(Debug)] +pub struct TaskCredentialProvider { + pub url: String, + pub retry: RetryConfig, + pub client: Client, + pub cache: TokenCache>, +} + +#[async_trait] +impl CredentialProvider for TaskCredentialProvider { + type Credential = AwsCredential; + + async fn get_credential(&self) -> Result> { + self.cache + .get_or_insert_with(|| task_credential(&self.client, &self.retry, &self.url)) + .await + .map_err(|source| crate::Error::Generic { + store: STORE, + source, + }) + } +} + +/// +async fn task_credential( + client: &Client, + retry: &RetryConfig, + url: &str, +) -> Result>, StdError> { + let creds: InstanceCredentials = + client.get(url).send_retry(retry).await?.json().await?; + + let now = Utc::now(); + let ttl = (creds.expiration - now).to_std().unwrap_or_default(); + Ok(TemporaryToken { + token: Arc::new(creds.into()), + expiry: Some(Instant::now() + ttl), + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 8de4b7c6afa..8a486f98679 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -46,7 +46,9 @@ use url::Url; pub use crate::aws::checksum::Checksum; use crate::aws::client::{S3Client, S3Config}; -use crate::aws::credential::{InstanceCredentialProvider, WebIdentityProvider}; +use crate::aws::credential::{ + InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider, +}; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::{ @@ -87,9 +89,6 @@ pub use credential::{AwsAuthorizer, AwsCredential}; /// Default metadata endpoint static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254"; -/// ECS metadata endpoint -static ECS_METADATA_ENDPOINT: &str = "http://169.254.170.2"; - /// A specialized `Error` for object store-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -399,6 +398,8 @@ pub struct AmazonS3Builder { checksum_algorithm: Option>, /// Metadata endpoint, see metadata_endpoint: Option, + /// Container credentials URL, see + container_credentials_relative_uri: Option, /// Client options client_options: ClientOptions, /// Credentials @@ -529,6 +530,11 @@ pub enum AmazonS3ConfigKey { /// - `metadata_endpoint` MetadataEndpoint, + /// Set the container credentials relative URI + /// + /// + ContainerCredentialsRelativeUri, + /// Client options Client(ClientConfigKey), } @@ -548,6 +554,9 @@ impl AsRef for AmazonS3ConfigKey { Self::MetadataEndpoint => "aws_metadata_endpoint", Self::UnsignedPayload => "aws_unsigned_payload", Self::Checksum => "aws_checksum_algorithm", + Self::ContainerCredentialsRelativeUri => { + "aws_container_credentials_relative_uri" + } Self::Client(opt) => opt.as_ref(), } } @@ -578,6 +587,9 @@ impl FromStr for AmazonS3ConfigKey { "aws_metadata_endpoint" | "metadata_endpoint" => Ok(Self::MetadataEndpoint), "aws_unsigned_payload" | "unsigned_payload" => Ok(Self::UnsignedPayload), "aws_checksum_algorithm" | "checksum_algorithm" => Ok(Self::Checksum), + "aws_container_credentials_relative_uri" => { + Ok(Self::ContainerCredentialsRelativeUri) + } // Backwards compatibility "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)), _ => match s.parse() { @@ -625,15 +637,6 @@ impl AmazonS3Builder { } } - // This env var is set in ECS - // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html - if let Ok(metadata_relative_uri) = - std::env::var("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") - { - builder.metadata_endpoint = - Some(format!("{ECS_METADATA_ENDPOINT}{metadata_relative_uri}")); - } - builder } @@ -691,6 +694,9 @@ impl AmazonS3Builder { AmazonS3ConfigKey::Checksum => { self.checksum_algorithm = Some(ConfigValue::Deferred(value.into())) } + AmazonS3ConfigKey::ContainerCredentialsRelativeUri => { + self.container_credentials_relative_uri = Some(value.into()) + } AmazonS3ConfigKey::Client(key) => { self.client_options = self.client_options.with_config(key, value) } @@ -758,6 +764,9 @@ impl AmazonS3Builder { self.checksum_algorithm.as_ref().map(ToString::to_string) } AmazonS3ConfigKey::Client(key) => self.client_options.get_config_value(key), + AmazonS3ConfigKey::ContainerCredentialsRelativeUri => { + self.container_credentials_relative_uri.clone() + } } } @@ -999,6 +1008,15 @@ impl AmazonS3Builder { client, self.retry_config.clone(), )) as _ + } else if let Some(uri) = self.container_credentials_relative_uri { + info!("Using Task credential provider"); + Arc::new(TaskCredentialProvider { + url: format!("http://169.254.170.2{uri}"), + retry: self.retry_config.clone(), + // The instance metadata endpoint is access over HTTP + client: self.client_options.clone().with_allow_http(true).client()?, + cache: Default::default(), + }) as _ } else { info!("Using Instance credential provider"); @@ -1199,9 +1217,10 @@ mod tests { assert_eq!(builder.endpoint.unwrap(), aws_endpoint); assert_eq!(builder.token.unwrap(), aws_session_token); - let metadata_uri = - format!("{ECS_METADATA_ENDPOINT}{container_creds_relative_uri}"); - assert_eq!(builder.metadata_endpoint.unwrap(), metadata_uri); + assert_eq!( + builder.container_credentials_relative_uri.unwrap(), + container_creds_relative_uri + ); assert_eq!( builder.checksum_algorithm.unwrap().get().unwrap(), Checksum::SHA256