Skip to content

smithy-aws-core

aio

protocols

RestJsonClientProtocol

Bases: HttpBindingClientProtocol

An implementation of the aws.protocols#restJson1 protocol.

Source code in packages/smithy-aws-core/src/smithy_aws_core/aio/protocols.py
class RestJsonClientProtocol(HttpBindingClientProtocol):
    """An implementation of the aws.protocols#restJson1 protocol."""

    _id: Final = RestJson1Trait.id
    _contentType: Final = "application/json"
    _error_identifier: Final = AWSErrorIdentifier()

    def __init__(self, service_schema: Schema) -> None:
        """Initialize a RestJsonClientProtocol.

        :param service: The schema for the service to interact with.
        """
        self._codec: Final = JSONCodec(
            document_class=AWSJSONDocument,
            default_namespace=service_schema.id.namespace,
            default_timestamp_format=TimestampFormat.EPOCH_SECONDS,
        )

    @property
    def id(self) -> ShapeID:
        return self._id

    @property
    def payload_codec(self) -> Codec:
        return self._codec

    @property
    def content_type(self) -> str:
        return self._contentType

    @property
    def error_identifier(self) -> HTTPErrorIdentifier:
        return self._error_identifier

    def create_event_publisher[
        OperationInput: SerializeableShape,
        OperationOutput: DeserializeableShape,
        Event: SerializeableShape,
    ](
        self,
        *,
        operation: "APIOperation[OperationInput, OperationOutput]",
        request: HTTPRequest,
        event_type: "TypeForm[Event]",
        context: TypedProperties,
        auth_scheme: AuthScheme[Any, Any, Any, Any] | None = None,
    ) -> EventPublisher[Event]:
        _assert_event_stream_capable()
        signing_config: SigningConfig | None = None
        if auth_scheme is not None:
            event_signer = auth_scheme.event_signer(request=request)
            if event_signer is not None:
                signing_config = SigningConfig(
                    signer=event_signer,
                    signing_properties=auth_scheme.signer_properties(context=context),
                    identity_resolver=auth_scheme.identity_resolver(context=context),
                    identity_properties=auth_scheme.identity_properties(
                        context=context
                    ),
                )

        # The HTTP body must be an async writeable. The HTTP serializers are responsible
        # for ensuring this.
        body = request.body
        if not isinstance(body, AsyncWriter) or not iscoroutinefunction(body.write):
            raise UnsupportedStreamError(
                "Input streams require an async write function, but none was present "
                "on the serialized HTTP request."
            )

        return AWSEventPublisher[Event](
            payload_codec=self.payload_codec,
            async_writer=body,
            signing_config=signing_config,
        )

    def create_event_receiver[
        OperationInput: SerializeableShape,
        OperationOutput: DeserializeableShape,
        Event: DeserializeableShape,
    ](
        self,
        *,
        operation: "APIOperation[OperationInput, OperationOutput]",
        request: HTTPRequest,
        response: HTTPResponse,
        event_type: "TypeForm[Event]",
        event_deserializer: Callable[[ShapeDeserializer], Event],
        context: TypedProperties,
    ) -> EventReceiver[Event]:
        _assert_event_stream_capable()
        return AWSEventReceiver(
            payload_codec=self.payload_codec,
            source=AsyncBytesReader(response.body),
            deserializer=event_deserializer,
        )
__init__(service_schema)

Initialize a RestJsonClientProtocol.

:param service: The schema for the service to interact with.

Source code in packages/smithy-aws-core/src/smithy_aws_core/aio/protocols.py
def __init__(self, service_schema: Schema) -> None:
    """Initialize a RestJsonClientProtocol.

    :param service: The schema for the service to interact with.
    """
    self._codec: Final = JSONCodec(
        document_class=AWSJSONDocument,
        default_namespace=service_schema.id.namespace,
        default_timestamp_format=TimestampFormat.EPOCH_SECONDS,
    )

auth

SigV4AuthScheme

Bases: AuthScheme[HTTPRequest, AWSCredentialsIdentity, AWSIdentityProperties, SigV4SigningProperties]

SigV4 AuthScheme.

Source code in packages/smithy-aws-core/src/smithy_aws_core/auth/sigv4.py
class SigV4AuthScheme(
    AuthScheme[
        HTTPRequest,
        AWSCredentialsIdentity,
        AWSIdentityProperties,
        SigV4SigningProperties,
    ]
):
    """SigV4 AuthScheme."""

    scheme_id = SigV4Trait.id
    _signer: SigV4Signer

    def __init__(
        self,
        *,
        service: str,
        signer: SigV4Signer | None = None,
    ) -> None:
        """Constructor.

        :param identity_resolver: The identity resolver to extract the api key identity.
        :param signer: The signer used to sign the request.
        """
        # TODO: There are type mismatches in the signature of the "sign" method.
        # The issues seems to be that it's not using protocols in its signature
        self._signer = signer or AsyncSigV4Signer()  # type: ignore
        self._service = service

    def identity_properties(
        self, *, context: _TypedProperties
    ) -> AWSIdentityProperties:
        config = context[AWS_IDENTITY_CONFIG]
        return {
            "access_key_id": config.aws_access_key_id,
            "secret_access_key": config.aws_secret_access_key,
            "session_token": config.aws_session_token,
        }

    def identity_resolver(self, *, context: _TypedProperties) -> AWSCredentialsResolver:
        config = context.get(SIGV4_CONFIG)
        if config is None or config.aws_credentials_identity_resolver is None:
            raise SmithyIdentityError(
                "Attempted to use SigV4 auth, but aws_credentials_identity_resolver was not "
                "set on the config."
            )
        return config.aws_credentials_identity_resolver

    def signer_properties(self, *, context: _TypedProperties) -> SigV4SigningProperties:
        config = context.get(SIGV4_CONFIG)
        if config is None or config.region is None:
            raise SmithyIdentityError(
                "Attempted to use SigV4 auth, but region was not set on the config."
            )
        return {
            "region": config.region,
            "service": self._service,
        }

    def signer(self) -> SigV4Signer:
        return self._signer

    def event_signer(
        self, *, request: HTTPRequest
    ) -> EventSigner[AWSCredentialsIdentity, SigV4SigningProperties] | None:
        if not HAS_EVENT_STREAM:
            return None

        auth_value = request.fields["Authorization"].as_string()
        signature: str = re.split("Signature=", auth_value)[-1]

        return AsyncEventSigner(
            initial_signature=signature.encode("utf-8"),
            event_encoder_cls=EventHeaderEncoder,
        )

    @classmethod
    def from_trait(cls, trait: SigV4Trait, /) -> Self:
        return cls(service=trait.name)

__init__(*, service, signer=None)

Constructor.

:param identity_resolver: The identity resolver to extract the api key identity. :param signer: The signer used to sign the request.

Source code in packages/smithy-aws-core/src/smithy_aws_core/auth/sigv4.py
def __init__(
    self,
    *,
    service: str,
    signer: SigV4Signer | None = None,
) -> None:
    """Constructor.

    :param identity_resolver: The identity resolver to extract the api key identity.
    :param signer: The signer used to sign the request.
    """
    # TODO: There are type mismatches in the signature of the "sign" method.
    # The issues seems to be that it's not using protocols in its signature
    self._signer = signer or AsyncSigV4Signer()  # type: ignore
    self._service = service

sigv4

SigV4AuthScheme

Bases: AuthScheme[HTTPRequest, AWSCredentialsIdentity, AWSIdentityProperties, SigV4SigningProperties]

SigV4 AuthScheme.

Source code in packages/smithy-aws-core/src/smithy_aws_core/auth/sigv4.py
class SigV4AuthScheme(
    AuthScheme[
        HTTPRequest,
        AWSCredentialsIdentity,
        AWSIdentityProperties,
        SigV4SigningProperties,
    ]
):
    """SigV4 AuthScheme."""

    scheme_id = SigV4Trait.id
    _signer: SigV4Signer

    def __init__(
        self,
        *,
        service: str,
        signer: SigV4Signer | None = None,
    ) -> None:
        """Constructor.

        :param identity_resolver: The identity resolver to extract the api key identity.
        :param signer: The signer used to sign the request.
        """
        # TODO: There are type mismatches in the signature of the "sign" method.
        # The issues seems to be that it's not using protocols in its signature
        self._signer = signer or AsyncSigV4Signer()  # type: ignore
        self._service = service

    def identity_properties(
        self, *, context: _TypedProperties
    ) -> AWSIdentityProperties:
        config = context[AWS_IDENTITY_CONFIG]
        return {
            "access_key_id": config.aws_access_key_id,
            "secret_access_key": config.aws_secret_access_key,
            "session_token": config.aws_session_token,
        }

    def identity_resolver(self, *, context: _TypedProperties) -> AWSCredentialsResolver:
        config = context.get(SIGV4_CONFIG)
        if config is None or config.aws_credentials_identity_resolver is None:
            raise SmithyIdentityError(
                "Attempted to use SigV4 auth, but aws_credentials_identity_resolver was not "
                "set on the config."
            )
        return config.aws_credentials_identity_resolver

    def signer_properties(self, *, context: _TypedProperties) -> SigV4SigningProperties:
        config = context.get(SIGV4_CONFIG)
        if config is None or config.region is None:
            raise SmithyIdentityError(
                "Attempted to use SigV4 auth, but region was not set on the config."
            )
        return {
            "region": config.region,
            "service": self._service,
        }

    def signer(self) -> SigV4Signer:
        return self._signer

    def event_signer(
        self, *, request: HTTPRequest
    ) -> EventSigner[AWSCredentialsIdentity, SigV4SigningProperties] | None:
        if not HAS_EVENT_STREAM:
            return None

        auth_value = request.fields["Authorization"].as_string()
        signature: str = re.split("Signature=", auth_value)[-1]

        return AsyncEventSigner(
            initial_signature=signature.encode("utf-8"),
            event_encoder_cls=EventHeaderEncoder,
        )

    @classmethod
    def from_trait(cls, trait: SigV4Trait, /) -> Self:
        return cls(service=trait.name)
__init__(*, service, signer=None)

Constructor.

:param identity_resolver: The identity resolver to extract the api key identity. :param signer: The signer used to sign the request.

Source code in packages/smithy-aws-core/src/smithy_aws_core/auth/sigv4.py
def __init__(
    self,
    *,
    service: str,
    signer: SigV4Signer | None = None,
) -> None:
    """Constructor.

    :param identity_resolver: The identity resolver to extract the api key identity.
    :param signer: The signer used to sign the request.
    """
    # TODO: There are type mismatches in the signature of the "sign" method.
    # The issues seems to be that it's not using protocols in its signature
    self._signer = signer or AsyncSigV4Signer()  # type: ignore
    self._service = service

endpoints

REGIONAL_ENDPOINT_CONFIG = PropertyKey(key='config', value_type=RegionalEndpointConfig) module-attribute

Endpoint config for services with standard regional endpoints.

RegionalEndpointConfig

Bases: StaticEndpointConfig

Endpoint config for services with standard regional endpoints.

Source code in packages/smithy-aws-core/src/smithy_aws_core/endpoints/__init__.py
class RegionalEndpointConfig(StaticEndpointConfig):
    """Endpoint config for services with standard regional endpoints."""

    region: str | None
    """The AWS region to address the request to."""

region instance-attribute

The AWS region to address the request to.

standard_regional

StandardRegionalEndpointsResolver

Bases: EndpointResolver

Resolves endpoints for services with standard regional endpoints.

Source code in packages/smithy-aws-core/src/smithy_aws_core/endpoints/standard_regional.py
class StandardRegionalEndpointsResolver(EndpointResolver):
    """Resolves endpoints for services with standard regional endpoints."""

    def __init__(self, endpoint_prefix: str = "bedrock-runtime"):
        self._endpoint_prefix = endpoint_prefix

    async def resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Endpoint:
        if (static_uri := resolve_static_uri(params)) is not None:
            return Endpoint(uri=static_uri)

        region_config = params.context.get(REGIONAL_ENDPOINT_CONFIG)
        if region_config is not None and region_config.region is not None:
            # TODO: use dns suffix determined from partition metadata
            dns_suffix = "amazonaws.com"
            hostname = f"{self._endpoint_prefix}.{region_config.region}.{dns_suffix}"

            return Endpoint(uri=URI(host=hostname))

        raise EndpointResolutionError(
            "Unable to resolve endpoint - either endpoint_url or region are required."
        )

identity

AWSCredentialsIdentity dataclass

Bases: Identity

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/components.py
@dataclass(kw_only=True)
class AWSCredentialsIdentity(Identity):
    access_key_id: str
    """A unique identifier for an AWS user or role."""

    secret_access_key: str
    """A secret key used in conjunction with the access key ID to authenticate
    programmatic access to AWS services."""

    session_token: str | None = None
    """A temporary token used to specify the current session for the supplied
    credentials."""

    expiration: datetime | None = None
    """The expiration time of the identity.

    If time zone is provided, it is updated to UTC. The value must always be in UTC.
    """

    account_id: str | None = None
    """The AWS account's ID."""

access_key_id instance-attribute

A unique identifier for an AWS user or role.

account_id = None class-attribute instance-attribute

The AWS account's ID.

expiration = None class-attribute instance-attribute

The expiration time of the identity.

If time zone is provided, it is updated to UTC. The value must always be in UTC.

secret_access_key instance-attribute

A secret key used in conjunction with the access key ID to authenticate programmatic access to AWS services.

session_token = None class-attribute instance-attribute

A temporary token used to specify the current session for the supplied credentials.

ContainerCredentialsResolver

Bases: IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]

Resolves AWS Credentials from container credential sources.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/container.py
class ContainerCredentialsResolver(
    IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]
):
    """Resolves AWS Credentials from container credential sources."""

    ENV_VAR = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"
    ENV_VAR_FULL = "AWS_CONTAINER_CREDENTIALS_FULL_URI"
    ENV_VAR_AUTH_TOKEN = "AWS_CONTAINER_AUTHORIZATION_TOKEN"  # noqa: S105
    ENV_VAR_AUTH_TOKEN_FILE = "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE"  # noqa: S105

    def __init__(
        self,
        http_client: HTTPClient,
        config: ContainerCredentialsConfig | None = None,
    ):
        self._http_client = http_client
        self._config = config or ContainerCredentialsConfig()
        self._client = ContainerMetadataClient(http_client, self._config)
        self._credentials = None

    async def _resolve_uri_from_env(self) -> URI:
        if self.ENV_VAR in os.environ:
            return URI(
                scheme="http",
                host=_CONTAINER_METADATA_IP,
                path=os.environ[self.ENV_VAR],
            )
        elif self.ENV_VAR_FULL in os.environ:
            parsed = urlparse(os.environ[self.ENV_VAR_FULL])
            return URI(
                scheme=parsed.scheme,
                host=parsed.hostname or "",
                port=parsed.port,
                path=parsed.path,
            )
        else:
            raise SmithyIdentityError(
                f"Neither {self.ENV_VAR} or {self.ENV_VAR_FULL} environment "
                "variables are set. Unable to resolve credentials."
            )

    async def _resolve_fields_from_env(self) -> Fields:
        fields = Fields()
        if self.ENV_VAR_AUTH_TOKEN_FILE in os.environ:
            try:
                filename = os.environ[self.ENV_VAR_AUTH_TOKEN_FILE]
                auth_token = await asyncio.to_thread(self._read_file, filename)
            except (FileNotFoundError, PermissionError) as e:
                raise SmithyIdentityError(
                    f"Unable to open {os.environ[self.ENV_VAR_AUTH_TOKEN_FILE]}."
                ) from e

            fields.set_field(Field(name="Authorization", values=[auth_token]))
        elif self.ENV_VAR_AUTH_TOKEN in os.environ:
            auth_token = os.environ[self.ENV_VAR_AUTH_TOKEN]
            fields.set_field(Field(name="Authorization", values=[auth_token]))

        return fields

    def _read_file(self, filename: str) -> str:
        with open(filename) as f:
            try:
                return f.read().strip()
            except UnicodeDecodeError as e:
                raise SmithyIdentityError(
                    f"Unable to read valid utf-8 bytes from {filename}."
                ) from e

    async def get_identity(
        self, *, properties: AWSIdentityProperties
    ) -> AWSCredentialsIdentity:
        if (
            self._credentials is not None
            and self._credentials.expiration
            and datetime.now(UTC) < self._credentials.expiration
        ):
            return self._credentials

        uri = await self._resolve_uri_from_env()
        fields = await self._resolve_fields_from_env()
        creds = await self._client.get_credentials(uri, fields)

        access_key_id = creds.get("AccessKeyId")
        secret_access_key = creds.get("SecretAccessKey")
        session_token = creds.get("Token")
        expiration = creds.get("Expiration")
        account_id = creds.get("AccountId")

        if isinstance(expiration, str):
            expiration = datetime.fromisoformat(expiration).replace(tzinfo=UTC)

        if access_key_id is None or secret_access_key is None:
            raise SmithyIdentityError(
                "AccessKeyId and SecretAccessKey are required for container credentials"
            )

        self._credentials = AWSCredentialsIdentity(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            expiration=expiration,
            account_id=account_id,
        )
        return self._credentials

EnvironmentCredentialsResolver

Bases: IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]

Resolves AWS Credentials from system environment variables.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/environment.py
class EnvironmentCredentialsResolver(
    IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]
):
    """Resolves AWS Credentials from system environment variables."""

    def __init__(self):
        self._credentials = None

    async def get_identity(
        self, *, properties: AWSIdentityProperties
    ) -> AWSCredentialsIdentity:
        if self._credentials is not None:
            return self._credentials

        access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
        secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        session_token = os.getenv("AWS_SESSION_TOKEN")
        account_id = os.getenv("AWS_ACCOUNT_ID")

        if access_key_id is None or secret_access_key is None:
            raise SmithyIdentityError(
                "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are required"
            )

        self._credentials = AWSCredentialsIdentity(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            account_id=account_id,
        )

        return self._credentials

IMDSCredentialsResolver

Bases: IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]

Resolves AWS Credentials from an EC2 Instance Metadata Service (IMDS) client.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py
class IMDSCredentialsResolver(
    IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]
):
    """Resolves AWS Credentials from an EC2 Instance Metadata Service (IMDS) client."""

    _METADATA_PATH_BASE = "/latest/meta-data/iam/security-credentials"

    def __init__(self, http_client: HTTPClient, config: Config | None = None):
        # TODO: Respect IMDS specific config values from aws shared config file and environment.
        self._http_client = http_client
        self._ec2_metadata_client = EC2Metadata(http_client=http_client, config=config)
        self._config = config or Config()
        self._credentials = None
        self._profile_name = self._config.ec2_instance_profile_name

    async def get_identity(
        self, *, properties: AWSIdentityProperties
    ) -> AWSCredentialsIdentity:
        if (
            self._credentials is not None
            and self._credentials.expiration
            and datetime.now(UTC) < self._credentials.expiration
        ):
            return self._credentials

        profile = self._profile_name
        if profile is None:
            profile = await self._ec2_metadata_client.get(path=self._METADATA_PATH_BASE)

        creds_str = await self._ec2_metadata_client.get(
            path=f"{self._METADATA_PATH_BASE}/{profile}"
        )
        creds = json.loads(creds_str)

        access_key_id = creds.get("AccessKeyId")
        secret_access_key = creds.get("SecretAccessKey")
        session_token = creds.get("Token")
        account_id = creds.get("AccountId")
        expiration = creds.get("Expiration")
        if expiration is not None:
            expiration = datetime.fromisoformat(expiration).replace(tzinfo=UTC)

        if access_key_id is None or secret_access_key is None:
            raise SmithyIdentityError("AccessKeyId and SecretAccessKey are required")

        self._credentials = AWSCredentialsIdentity(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            expiration=expiration,
            account_id=account_id,
        )
        return self._credentials

StaticCredentialsResolver

Bases: IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]

Resolve Static AWS Credentials.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/static.py
class StaticCredentialsResolver(
    IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]
):
    """Resolve Static AWS Credentials."""

    async def get_identity(
        self, *, properties: AWSIdentityProperties
    ) -> AWSCredentialsIdentity:
        access_key_id = properties.get("access_key_id")
        secret_access_key = properties.get("secret_access_key")
        if access_key_id is not None and secret_access_key is not None:
            return AWSCredentialsIdentity(
                access_key_id=access_key_id,
                secret_access_key=secret_access_key,
                session_token=properties.get("session_token"),
            )
        raise SmithyIdentityError(
            "Attempted to resolve AWS crendentials from config, but credentials weren't configured."
        )

chain

create_default_chain(http_client)

Creates the default AWS credential provider chain.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/chain.py
def create_default_chain(http_client: HTTPClient) -> AWSCredentialsResolver:
    """Creates the default AWS credential provider chain."""
    return ChainedIdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties](
        resolvers=(
            StaticCredentialsResolver(),
            EnvironmentCredentialsResolver(),
            IMDSCredentialsResolver(http_client=http_client),
        )
    )

components

AWSCredentialsIdentity dataclass

Bases: Identity

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/components.py
@dataclass(kw_only=True)
class AWSCredentialsIdentity(Identity):
    access_key_id: str
    """A unique identifier for an AWS user or role."""

    secret_access_key: str
    """A secret key used in conjunction with the access key ID to authenticate
    programmatic access to AWS services."""

    session_token: str | None = None
    """A temporary token used to specify the current session for the supplied
    credentials."""

    expiration: datetime | None = None
    """The expiration time of the identity.

    If time zone is provided, it is updated to UTC. The value must always be in UTC.
    """

    account_id: str | None = None
    """The AWS account's ID."""
access_key_id instance-attribute

A unique identifier for an AWS user or role.

account_id = None class-attribute instance-attribute

The AWS account's ID.

expiration = None class-attribute instance-attribute

The expiration time of the identity.

If time zone is provided, it is updated to UTC. The value must always be in UTC.

secret_access_key instance-attribute

A secret key used in conjunction with the access key ID to authenticate programmatic access to AWS services.

session_token = None class-attribute instance-attribute

A temporary token used to specify the current session for the supplied credentials.

container

ContainerCredentialsConfig dataclass

Configuration for container credential retrieval operations.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/container.py
@dataclass
class ContainerCredentialsConfig:
    """Configuration for container credential retrieval operations."""

    timeout: int = _DEFAULT_TIMEOUT
    retries: int = _DEFAULT_RETRIES

ContainerCredentialsResolver

Bases: IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]

Resolves AWS Credentials from container credential sources.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/container.py
class ContainerCredentialsResolver(
    IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]
):
    """Resolves AWS Credentials from container credential sources."""

    ENV_VAR = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"
    ENV_VAR_FULL = "AWS_CONTAINER_CREDENTIALS_FULL_URI"
    ENV_VAR_AUTH_TOKEN = "AWS_CONTAINER_AUTHORIZATION_TOKEN"  # noqa: S105
    ENV_VAR_AUTH_TOKEN_FILE = "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE"  # noqa: S105

    def __init__(
        self,
        http_client: HTTPClient,
        config: ContainerCredentialsConfig | None = None,
    ):
        self._http_client = http_client
        self._config = config or ContainerCredentialsConfig()
        self._client = ContainerMetadataClient(http_client, self._config)
        self._credentials = None

    async def _resolve_uri_from_env(self) -> URI:
        if self.ENV_VAR in os.environ:
            return URI(
                scheme="http",
                host=_CONTAINER_METADATA_IP,
                path=os.environ[self.ENV_VAR],
            )
        elif self.ENV_VAR_FULL in os.environ:
            parsed = urlparse(os.environ[self.ENV_VAR_FULL])
            return URI(
                scheme=parsed.scheme,
                host=parsed.hostname or "",
                port=parsed.port,
                path=parsed.path,
            )
        else:
            raise SmithyIdentityError(
                f"Neither {self.ENV_VAR} or {self.ENV_VAR_FULL} environment "
                "variables are set. Unable to resolve credentials."
            )

    async def _resolve_fields_from_env(self) -> Fields:
        fields = Fields()
        if self.ENV_VAR_AUTH_TOKEN_FILE in os.environ:
            try:
                filename = os.environ[self.ENV_VAR_AUTH_TOKEN_FILE]
                auth_token = await asyncio.to_thread(self._read_file, filename)
            except (FileNotFoundError, PermissionError) as e:
                raise SmithyIdentityError(
                    f"Unable to open {os.environ[self.ENV_VAR_AUTH_TOKEN_FILE]}."
                ) from e

            fields.set_field(Field(name="Authorization", values=[auth_token]))
        elif self.ENV_VAR_AUTH_TOKEN in os.environ:
            auth_token = os.environ[self.ENV_VAR_AUTH_TOKEN]
            fields.set_field(Field(name="Authorization", values=[auth_token]))

        return fields

    def _read_file(self, filename: str) -> str:
        with open(filename) as f:
            try:
                return f.read().strip()
            except UnicodeDecodeError as e:
                raise SmithyIdentityError(
                    f"Unable to read valid utf-8 bytes from {filename}."
                ) from e

    async def get_identity(
        self, *, properties: AWSIdentityProperties
    ) -> AWSCredentialsIdentity:
        if (
            self._credentials is not None
            and self._credentials.expiration
            and datetime.now(UTC) < self._credentials.expiration
        ):
            return self._credentials

        uri = await self._resolve_uri_from_env()
        fields = await self._resolve_fields_from_env()
        creds = await self._client.get_credentials(uri, fields)

        access_key_id = creds.get("AccessKeyId")
        secret_access_key = creds.get("SecretAccessKey")
        session_token = creds.get("Token")
        expiration = creds.get("Expiration")
        account_id = creds.get("AccountId")

        if isinstance(expiration, str):
            expiration = datetime.fromisoformat(expiration).replace(tzinfo=UTC)

        if access_key_id is None or secret_access_key is None:
            raise SmithyIdentityError(
                "AccessKeyId and SecretAccessKey are required for container credentials"
            )

        self._credentials = AWSCredentialsIdentity(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            expiration=expiration,
            account_id=account_id,
        )
        return self._credentials

ContainerMetadataClient

Client for remote credential retrieval in Container environments like ECS/EKS.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/container.py
class ContainerMetadataClient:
    """Client for remote credential retrieval in Container environments like ECS/EKS."""

    def __init__(self, http_client: HTTPClient, config: ContainerCredentialsConfig):
        self._http_client = http_client
        self._config = config

    def _validate_allowed_url(self, uri: URI) -> None:
        if self._is_loopback(uri.host):
            return

        if not self._is_allowed_container_metadata_host(uri.host):
            raise SmithyIdentityError(
                f"Unsupported host '{uri.host}'. "
                f"Can only retrieve metadata from a loopback address or "
                f"one of: {', '.join(_CONTAINER_METADATA_ALLOWED_HOSTS)}"
            )

    async def get_credentials(self, uri: URI, fields: Fields) -> dict[str, str]:
        self._validate_allowed_url(uri)
        fields.set_field(Field(name="Accept", values=["application/json"]))

        attempts = 0
        last_exc = None
        while attempts < self._config.retries:
            try:
                request = HTTPRequest(
                    method="GET",
                    destination=uri,
                    fields=fields,
                )
                response: HTTPResponse = await self._http_client.send(request)
                body = await response.consume_body_async()
                if response.status != 200:
                    raise SmithyIdentityError(
                        f"Container metadata service returned {response.status}: "
                        f"{body.decode('utf-8')}"
                    )
                try:
                    return json.loads(body.decode("utf-8"))
                except Exception as e:
                    raise SmithyIdentityError(
                        f"Unable to parse JSON from container metadata: {body.decode('utf-8')}"
                    ) from e
            except Exception as e:
                last_exc = e
                await asyncio.sleep(_SLEEP_SECONDS)
                attempts += 1

        raise SmithyIdentityError(
            f"Failed to retrieve container metadata after {self._config.retries} attempt(s)"
        ) from last_exc

    def _is_loopback(self, hostname: str) -> bool:
        try:
            return ipaddress.ip_address(hostname).is_loopback
        except ValueError:
            return False

    def _is_allowed_container_metadata_host(self, hostname: str) -> bool:
        return hostname in _CONTAINER_METADATA_ALLOWED_HOSTS

environment

EnvironmentCredentialsResolver

Bases: IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]

Resolves AWS Credentials from system environment variables.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/environment.py
class EnvironmentCredentialsResolver(
    IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]
):
    """Resolves AWS Credentials from system environment variables."""

    def __init__(self):
        self._credentials = None

    async def get_identity(
        self, *, properties: AWSIdentityProperties
    ) -> AWSCredentialsIdentity:
        if self._credentials is not None:
            return self._credentials

        access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
        secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        session_token = os.getenv("AWS_SESSION_TOKEN")
        account_id = os.getenv("AWS_ACCOUNT_ID")

        if access_key_id is None or secret_access_key is None:
            raise SmithyIdentityError(
                "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are required"
            )

        self._credentials = AWSCredentialsIdentity(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            account_id=account_id,
        )

        return self._credentials

imds

Config dataclass

Configuration for EC2Metadata.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py
@dataclass(init=False)
class Config:
    """Configuration for EC2Metadata."""

    _HOST_MAPPING = MappingProxyType(
        {"IPv4": "169.254.169.254", "IPv6": "[fd00:ec2::254]"}
    )
    _MIN_TTL = 5
    _MAX_TTL = 21600

    retry_strategy: RetryStrategy
    endpoint_uri: URI
    endpoint_mode: Literal["IPv4", "IPv6"]
    token_ttl: int

    def __init__(
        self,
        *,
        retry_strategy: RetryStrategy | None = None,
        endpoint_uri: URI | None = None,
        endpoint_mode: Literal["IPv4", "IPv6"] = "IPv4",
        token_ttl: int = _MAX_TTL,
        ec2_instance_profile_name: str | None = None,
    ):
        #  TODO: Implement retries.
        self.retry_strategy = retry_strategy or SimpleRetryStrategy(max_attempts=3)
        self.endpoint_mode = endpoint_mode
        self.endpoint_uri = self._resolve_endpoint(endpoint_uri, endpoint_mode)
        self.token_ttl = self._validate_token_ttl(token_ttl)
        self.ec2_instance_profile_name = ec2_instance_profile_name

    def _validate_token_ttl(self, ttl: int) -> int:
        if not self._MIN_TTL <= ttl <= self._MAX_TTL:
            raise ValueError(
                f"Token TTL must be between {self._MIN_TTL} and {self._MAX_TTL} seconds."
            )
        return ttl

    def _resolve_endpoint(
        self, endpoint_uri: URI | None, endpoint_mode: Literal["IPv4", "IPv6"]
    ) -> URI:
        if endpoint_uri is not None:
            return endpoint_uri

        return URI(
            scheme="http",
            host=self._HOST_MAPPING.get(endpoint_mode, self._HOST_MAPPING["IPv4"]),
            port=80,
        )

IMDSCredentialsResolver

Bases: IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]

Resolves AWS Credentials from an EC2 Instance Metadata Service (IMDS) client.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py
class IMDSCredentialsResolver(
    IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]
):
    """Resolves AWS Credentials from an EC2 Instance Metadata Service (IMDS) client."""

    _METADATA_PATH_BASE = "/latest/meta-data/iam/security-credentials"

    def __init__(self, http_client: HTTPClient, config: Config | None = None):
        # TODO: Respect IMDS specific config values from aws shared config file and environment.
        self._http_client = http_client
        self._ec2_metadata_client = EC2Metadata(http_client=http_client, config=config)
        self._config = config or Config()
        self._credentials = None
        self._profile_name = self._config.ec2_instance_profile_name

    async def get_identity(
        self, *, properties: AWSIdentityProperties
    ) -> AWSCredentialsIdentity:
        if (
            self._credentials is not None
            and self._credentials.expiration
            and datetime.now(UTC) < self._credentials.expiration
        ):
            return self._credentials

        profile = self._profile_name
        if profile is None:
            profile = await self._ec2_metadata_client.get(path=self._METADATA_PATH_BASE)

        creds_str = await self._ec2_metadata_client.get(
            path=f"{self._METADATA_PATH_BASE}/{profile}"
        )
        creds = json.loads(creds_str)

        access_key_id = creds.get("AccessKeyId")
        secret_access_key = creds.get("SecretAccessKey")
        session_token = creds.get("Token")
        account_id = creds.get("AccountId")
        expiration = creds.get("Expiration")
        if expiration is not None:
            expiration = datetime.fromisoformat(expiration).replace(tzinfo=UTC)

        if access_key_id is None or secret_access_key is None:
            raise SmithyIdentityError("AccessKeyId and SecretAccessKey are required")

        self._credentials = AWSCredentialsIdentity(
            access_key_id=access_key_id,
            secret_access_key=secret_access_key,
            session_token=session_token,
            expiration=expiration,
            account_id=account_id,
        )
        return self._credentials

Token

Represents an IMDSv2 session token with a value and method for checking expiration.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py
class Token:
    """Represents an IMDSv2 session token with a value and method for checking
    expiration."""

    def __init__(self, value: str, ttl: int):
        self._value = value
        self._ttl = ttl
        self._created_time = datetime.now()

    def is_expired(self) -> bool:
        return datetime.now() - self._created_time >= timedelta(seconds=self._ttl)

    @property
    def value(self) -> str:
        return self._value

TokenCache

Holds the token needed to fetch instance metadata.

In addition, it knows how to refresh itself.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py
class TokenCache:
    """Holds the token needed to fetch instance metadata.

    In addition, it knows how to refresh itself.
    """

    _TOKEN_PATH = "/latest/api/token"  # noqa: S105

    def __init__(self, http_client: HTTPClient, config: Config):
        self._http_client = http_client
        self._config = config
        self._base_uri = config.endpoint_uri
        self._refresh_lock = asyncio.Lock()
        self._token = None

    def _should_refresh(self) -> bool:
        return self._token is None or self._token.is_expired()

    async def _refresh(self) -> None:
        async with self._refresh_lock:
            if not self._should_refresh():
                return
            headers = Fields(
                [
                    _USER_AGENT_FIELD,
                    Field(
                        name="x-aws-ec2-metadata-token-ttl-seconds",
                        values=[str(self._config.token_ttl)],
                    ),
                ]
            )
            request = HTTPRequest(
                method="PUT",
                destination=URI(
                    scheme=self._base_uri.scheme,
                    host=self._base_uri.host,
                    port=self._base_uri.port,
                    path=self._TOKEN_PATH,
                ),
                fields=headers,
            )
            response = await self._http_client.send(request)
            token_value = await response.consume_body_async()
            self._token = Token(token_value.decode("utf-8"), self._config.token_ttl)

    async def get_token(self) -> Token:
        if self._should_refresh():
            await self._refresh()
        assert self._token is not None  # noqa: S101
        return self._token

static

StaticCredentialsResolver

Bases: IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]

Resolve Static AWS Credentials.

Source code in packages/smithy-aws-core/src/smithy_aws_core/identity/static.py
class StaticCredentialsResolver(
    IdentityResolver[AWSCredentialsIdentity, AWSIdentityProperties]
):
    """Resolve Static AWS Credentials."""

    async def get_identity(
        self, *, properties: AWSIdentityProperties
    ) -> AWSCredentialsIdentity:
        access_key_id = properties.get("access_key_id")
        secret_access_key = properties.get("secret_access_key")
        if access_key_id is not None and secret_access_key is not None:
            return AWSCredentialsIdentity(
                access_key_id=access_key_id,
                secret_access_key=secret_access_key,
                session_token=properties.get("session_token"),
            )
        raise SmithyIdentityError(
            "Attempted to resolve AWS crendentials from config, but credentials weren't configured."
        )

interceptors

user_agent

UserAgentInterceptor

Bases: Interceptor[Any, Any, Any, Any]

Adds AWS fields to the UserAgent.

Source code in packages/smithy-aws-core/src/smithy_aws_core/interceptors/user_agent.py
class UserAgentInterceptor(Interceptor[Any, Any, Any, Any]):
    """Adds AWS fields to the UserAgent."""

    def __init__(
        self,
        *,
        ua_suffix: str | None,
        ua_app_id: str | None,
        sdk_version: str,
        service_id: str,
    ) -> None:
        """Initialize the UserAgentInterceptor.

        :param ua_suffix: Additional suffix to be added to the UserAgent header.
        :param ua_app_id: User defined and opaque application ID to be added to the
            UserAgent header.
        :param sdk_version: SDK version to be added to the UserAgent header.
        :param service_id: ServiceId to be added to the UserAgent header.
        """
        super().__init__()
        self._ua_suffix = ua_suffix
        self._ua_app_id = ua_app_id
        self._sdk_version = sdk_version
        self._service_id = service_id

    def read_after_serialization(self, context: RequestContext[Any, Any]) -> None:
        if USER_AGENT in context.properties:
            user_agent = context.properties[USER_AGENT]
            user_agent.sdk_metadata = self._build_sdk_metadata()
            user_agent.api_metadata.append(
                UserAgentComponent("api", self._service_id, self._sdk_version)
            )

            if self._ua_app_id is not None:
                user_agent.additional_metadata.append(
                    UserAgentComponent("app", self._ua_app_id)
                )

            if self._ua_suffix is not None:
                user_agent.additional_metadata.append(
                    RawStringUserAgentComponent(self._ua_suffix)
                )

    def _build_sdk_metadata(self) -> list[UserAgentComponent]:
        return [
            UserAgentComponent(_USERAGENT_SDK_NAME, __version__),
            UserAgentComponent("md", "smithy-core", smithy_core.__version__),
            *self._crt_version(),
        ]

    def _crt_version(self) -> list[UserAgentComponent]:
        try:
            import awscrt

            return [UserAgentComponent("md", "awscrt", awscrt.__version__)]
        except (ImportError, AttributeError):
            return []
__init__(*, ua_suffix, ua_app_id, sdk_version, service_id)

Initialize the UserAgentInterceptor.

:param ua_suffix: Additional suffix to be added to the UserAgent header. :param ua_app_id: User defined and opaque application ID to be added to the UserAgent header. :param sdk_version: SDK version to be added to the UserAgent header. :param service_id: ServiceId to be added to the UserAgent header.

Source code in packages/smithy-aws-core/src/smithy_aws_core/interceptors/user_agent.py
def __init__(
    self,
    *,
    ua_suffix: str | None,
    ua_app_id: str | None,
    sdk_version: str,
    service_id: str,
) -> None:
    """Initialize the UserAgentInterceptor.

    :param ua_suffix: Additional suffix to be added to the UserAgent header.
    :param ua_app_id: User defined and opaque application ID to be added to the
        UserAgent header.
    :param sdk_version: SDK version to be added to the UserAgent header.
    :param service_id: ServiceId to be added to the UserAgent header.
    """
    super().__init__()
    self._ua_suffix = ua_suffix
    self._ua_app_id = ua_app_id
    self._sdk_version = sdk_version
    self._service_id = service_id