Skip to content

aws-sdk-signers

AWS SDK Signers provides stand-alone signing functionality for use with HTTP tools such as AioHTTP, Curl, Postman, Requests, urllib3, etc.

AsyncBytesReader

A file-like object with an async read method.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_io.py
class AsyncBytesReader:
    """A file-like object with an async read method.

    The wrapped source may be in-memory bytes, an object matching ``ByteStream`` or
    ``AsyncByteStream``, or an async iterable of byte chunks.
    """

    # BytesIO *is* a ByteStream, but mypy complains if it isn't here.
    _data: ByteStream | AsyncByteStream | AsyncIterable[bytes] | BytesIO | None
    _closed = False

    def __init__(self, data: StreamingBlob):
        """Initializes self.

        Data is read from the source on an as-needed basis. The only bytes held
        back are the surplus of a chunk that exceeded the size of a read request.

        :param data: The source data to read from.
        """
        # Surplus bytes from an iterable chunk that overshot the last read.
        self._remainder = b""
        # pylint: disable-next=isinstance-second-argument-not-valid-type
        if isinstance(data, bytes | bytearray):
            self._data = BytesIO(data)
        else:
            self._data = data

    async def read(self, size: int | None = -1) -> bytes:
        """Read a number of bytes from the stream.

        :param size: The maximum number of bytes to read. If less than 0, all bytes will
            be read. ``None`` is treated as -1.
        :returns: The bytes that were read.
        :raises ValueError: If the reader has been closed.
        """
        # Compare against None explicitly: a source that merely evaluates as falsy
        # is still a valid, open source.
        if self._closed or self._data is None:
            raise ValueError("I/O operation on closed file.")

        if size is None:
            size = -1

        if isinstance(self._data, ByteStream) and not iscoroutinefunction(  # type: ignore - TODO(pyright)
            self._data.read
        ):
            # Python's runtime_checkable can't actually tell the difference between
            # sync and async, so we have to check ourselves.
            return self._data.read(size)

        if isinstance(self._data, AsyncByteStream):  # type: ignore - TODO(pyright)
            return await self._data.read(size)

        return await self._read_from_iterable(
            cast(AsyncIterable[bytes], self._data), size
        )

    async def _read_from_iterable(
        self, iterator: AsyncIterable[bytes], size: int
    ) -> bytes:
        """Read up to ``size`` bytes from an async iterable of chunks.

        Bytes left over from a previous call are returned first. Any surplus from
        the last chunk consumed is kept for the next call.

        :param iterator: The async iterable to pull chunks from.
        :param size: The maximum number of bytes to return. If less than 0, the
            iterable is drained.
        """
        # This takes the iterator as an arg here just to avoid mypy complaints, since
        # we know it's an iterator where this is called.

        # Collect chunks and join once rather than repeatedly concatenating bytes.
        chunks = [self._remainder]
        if size < 0:
            async for element in iterator:
                chunks.append(element)
            self._remainder = b""
            return b"".join(chunks)

        buffered = len(self._remainder)
        # Only touch the source when the leftover bytes cannot satisfy the request.
        if buffered < size:
            async for element in iterator:
                chunks.append(element)
                buffered += len(element)
                if buffered >= size:
                    break

        result = b"".join(chunks)
        self._remainder = result[size:]
        return result[:size]

    def __aiter__(self) -> AsyncIterator[bytes]:
        """Iterate over the reader using the default chunk size."""
        return self.iter_chunks()

    def iter_chunks(
        self, chunk_size: int = _DEFAULT_CHUNK_SIZE
    ) -> AsyncIterator[bytes]:
        """Iterate over the reader in chunks of a given size.

        :param chunk_size: The maximum size of each chunk. If less than 0, the entire
            reader will be read into one chunk.
        """
        return _AsyncByteStreamIterator(self.read, chunk_size)

    def readable(self) -> bool:
        """Returns whether the stream is readable."""
        return True

    def writeable(self) -> bool:
        """Returns whether the stream is writeable."""
        return False

    def seekable(self) -> bool:
        """Returns whether the stream is seekable."""
        return False

    @property
    def closed(self) -> bool:
        """Returns whether the stream is closed."""
        return self._closed

    def close(self) -> None:
        """Closes the stream, as well as the underlying stream where possible.

        The reader is marked closed even if closing the underlying stream raises;
        the exception still propagates to the caller.
        """
        # NOTE(review): close() is invoked synchronously. If the underlying source
        # exposes a coroutine function as ``close``, the coroutine it returns is
        # not awaited here - confirm whether such sources need separate handling.
        try:
            if (close := getattr(self._data, "close", None)) is not None:
                close()
        finally:
            self._data = None
            self._closed = True

closed property

Returns whether the stream is closed.

__init__(data)

Initializes self.

Data is read from the source on an as-needed basis and is not buffered.

:param data: The source data to read from.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_io.py
def __init__(self, data: StreamingBlob):
    """Set up the reader around a source of bytes.

    Nothing is read here; the source is stored and consumed later.

    :param data: The source data to read from. ``bytes`` and ``bytearray`` values
        are wrapped in a ``BytesIO``; any other value is stored as given.
    """
    is_in_memory = isinstance(data, (bytes, bytearray))
    self._data = BytesIO(data) if is_in_memory else data
    self._remainder = b""

close()

Closes the stream, as well as the underlying stream where possible.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_io.py
def close(self) -> None:
    """Closes the stream, as well as the underlying stream where possible.

    The reader is marked closed even if closing the underlying stream raises;
    the exception still propagates to the caller.
    """
    # NOTE(review): close() is invoked synchronously. If the underlying source
    # exposes a coroutine function as ``close``, the coroutine it returns is not
    # awaited here - confirm whether such sources need separate handling.
    try:
        if (close := getattr(self._data, "close", None)) is not None:
            close()
    finally:
        self._data = None
        self._closed = True

iter_chunks(chunk_size=_DEFAULT_CHUNK_SIZE)

Iterate over the reader in chunks of a given size.

:param chunk_size: The maximum size of each chunk. If less than 0, the entire reader will be read into one chunk.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_io.py
def iter_chunks(
    self, chunk_size: int = _DEFAULT_CHUNK_SIZE
) -> AsyncIterator[bytes]:
    """Return an async iterator that yields the reader's contents chunk by chunk.

    :param chunk_size: The maximum size of each chunk. If less than 0, the entire
        reader will be read into one chunk.
    """
    # The iterator is handed this reader's bound ``read`` method and the size.
    chunk_iterator = _AsyncByteStreamIterator(self.read, chunk_size)
    return chunk_iterator

read(size=-1) async

Read a number of bytes from the stream.

:param size: The maximum number of bytes to read. If less than 0, all bytes will be read.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_io.py
async def read(self, size: int | None = -1) -> bytes:
    """Read a number of bytes from the stream.

    :param size: The maximum number of bytes to read. If less than 0, all bytes will
        be read.
    """
    if self._closed or not self._data:
        raise ValueError("I/O operation on closed file.")

    if size is None:
        size = -1

    if isinstance(self._data, ByteStream) and not iscoroutinefunction(  # type: ignore - TODO(pyright)
        self._data.read
    ):
        # Python's runtime_checkable can't actually tell the difference between
        # sync and async, so we have to check ourselves.
        return self._data.read(size)

    if isinstance(self._data, AsyncByteStream):  # type: ignore - TODO(pyright)
        return await self._data.read(size)

    return await self._read_from_iterable(
        cast(AsyncIterable[bytes], self._data), size
    )

readable()

Returns whether the stream is readable.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_io.py
def readable(self) -> bool:
    """Report whether the stream can be read from; this is always ``True``."""
    return True

seekable()

Returns whether the stream is seekable.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_io.py
def seekable(self) -> bool:
    """Report whether the stream supports seeking; this is always ``False``."""
    return False

writeable()

Returns whether the stream is writeable.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_io.py
def writeable(self) -> bool:
    """Report whether the stream can be written to; this is always ``False``."""
    return False

AsyncSigV4Signer

Request signer for applying the AWS Signature Version 4 algorithm.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
class AsyncSigV4Signer:
    """Request signer for applying the AWS Signature Version 4 algorithm."""

    async def sign(
        self,
        *,
        request: AWSRequest,
        identity: _AWSCredentialsIdentity,
        properties: SigV4SigningProperties,
    ) -> AWSRequest:
        """Generate and apply a SigV4 Signature to a copy of the supplied request.

        :param request: An AWSRequest to sign prior to sending to the service.
        :param identity: A set of credentials representing an AWS Identity or role
            capacity.
        :param properties: SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        """
        # Copy and prepopulate any missing values in the
        # supplied request and signing properties.

        await self._validate_identity(identity=identity)
        new_signing_properties = await self._normalize_signing_properties(
            signing_properties=properties
        )
        new_request = await self._generate_new_request(request=request)
        await self._apply_required_fields(
            request=new_request,
            signing_properties=new_signing_properties,
            identity=identity,
        )

        # Construct core signing components
        canonical_request = await self.canonical_request(
            signing_properties=properties,
            request=new_request,
        )
        string_to_sign = await self.string_to_sign(
            canonical_request=canonical_request,
            signing_properties=new_signing_properties,
        )
        signature = await self._signature(
            string_to_sign=string_to_sign,
            secret_key=identity.secret_access_key,
            signing_properties=new_signing_properties,
        )

        signing_fields = await self._normalize_signing_fields(request=new_request)
        credential_scope = await self._scope(signing_properties=new_signing_properties)
        credential = f"{identity.access_key_id}/{credential_scope}"
        authorization = await self.generate_authorization_field(
            credential=credential,
            signed_headers=list(signing_fields.keys()),
            signature=signature,
        )
        new_request.fields.set_field(authorization)
        return new_request

    async def generate_authorization_field(
        self, *, credential: str, signed_headers: list[str], signature: str
    ) -> Field:
        """Generate the `Authorization` field.

        :param credential:
            Credential scope string for generating the Authorization header.
            Defined as:
                <access_key>/<date>/<region>/<service>/<request_type>
        :param signed_headers:
            A list of the field names used in signing.
        :param signature:
            Final hash of the SigV4 signing algorithm generated from the
            canonical request and string to sign.
        """
        signed_headers_str = ";".join(signed_headers)
        auth_str = (
            f"AWS4-HMAC-SHA256 Credential={credential}, "
            f"SignedHeaders={signed_headers_str}, Signature={signature}"
        )
        return Field(name="Authorization", values=[auth_str])

    async def _signature(
        self,
        *,
        string_to_sign: str,
        secret_key: str,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        """Sign the string to sign.

        In SigV4, a signing key is created that is scoped to a specific region and
        service. The date, region, service and resulting signing key are individually
        hashed, then the composite hash is used to sign the string to sign.
        """

        # Components of Signing Key Calculation
        #
        # DateKey              = HMAC-SHA256("AWS4"+"<SecretAccessKey>", "<YYYYMMDD>")
        # DateRegionKey        = HMAC-SHA256(<DateKey>, "<aws-region>")
        # DateRegionServiceKey = HMAC-SHA256(<DateRegionKey>, "<aws-service>")
        # SigningKey = HMAC-SHA256(<DateRegionServiceKey>, "aws4_request")
        assert "date" in signing_properties
        k_date = await self._hash(
            key=f"AWS4{secret_key}".encode(), value=signing_properties["date"][0:8]
        )
        k_region = await self._hash(key=k_date, value=signing_properties["region"])
        k_service = await self._hash(key=k_region, value=signing_properties["service"])
        k_signing = await self._hash(key=k_service, value="aws4_request")
        final_hash = await self._hash(key=k_signing, value=string_to_sign)

        return final_hash.hex()

    async def _hash(self, key: bytes, value: str) -> bytes:
        return hmac.new(key=key, msg=value.encode(), digestmod=sha256).digest()

    async def _validate_identity(self, *, identity: _AWSCredentialsIdentity) -> None:
        """Perform runtime and expiration checks before attempting signing."""
        if not isinstance(identity, _AWSCredentialsIdentity):  # pyright: ignore
            raise ValueError(
                "Received unexpected value for identity parameter. Expected "
                f"AWSCredentialIdentity but received {type(identity)}."
            )
        elif identity.is_expired:
            raise ValueError(
                f"Provided identity expired at {identity.expiration}. Please "
                "refresh the credentials or update the expiration parameter."
            )

    async def _normalize_signing_properties(
        self, *, signing_properties: SigV4SigningProperties
    ) -> SigV4SigningProperties:
        # Create copy of signing properties to avoid mutating the original
        new_signing_properties = SigV4SigningProperties(**signing_properties)
        if "date" not in new_signing_properties:
            date_obj = datetime.datetime.now(datetime.UTC)
            new_signing_properties["date"] = date_obj.strftime(SIGV4_TIMESTAMP_FORMAT)
        return new_signing_properties

    async def _generate_new_request(self, *, request: AWSRequest) -> AWSRequest:
        return deepcopy(request)

    async def _apply_required_fields(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
        identity: _AWSCredentialsIdentity,
    ) -> None:
        # Apply required X-Amz-Date if neither X-Amz-Date nor Date are present.
        if "Date" not in request.fields and "X-Amz-Date" not in request.fields:
            assert "date" in signing_properties
            request.fields.set_field(
                Field(name="X-Amz-Date", values=[signing_properties["date"]])
            )
        # Apply required X-Amz-Security-Token if token present on identity
        if (
            "X-Amz-Security-Token" not in request.fields
            and identity.session_token is not None
        ):
            request.fields.set_field(
                Field(name="X-Amz-Security-Token", values=[identity.session_token])
            )

    async def canonical_request(
        self, *, signing_properties: SigV4SigningProperties, request: AWSRequest
    ) -> str:
        """The canonical request is a standardized string laying out the components used
        in the SigV4 signing algorithm. This is useful to quickly compare inputs to find
        signature mismatches and unintended variances.

        The SigV4 specification defines the canonical request to be:
            <HTTPMethod>\n
            <CanonicalURI>\n
            <CanonicalQueryString>\n
            <CanonicalHeaders>\n
            <SignedHeaders>\n
            <HashedPayload>

        :param signing_properties:
            SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        :param request:
            An AWSRequest to use for generating a SigV4 signature.
        """
        # We generate the payload first to ensure any field modifications
        # are in place before choosing the canonical fields.
        canonical_payload = await self._format_canonical_payload(
            request=request, signing_properties=signing_properties
        )
        canonical_path = await self._format_canonical_path(
            path=request.destination.path, signing_properties=signing_properties
        )
        canonical_query = await self._format_canonical_query(
            query=request.destination.query
        )
        normalized_fields = await self._normalize_signing_fields(request=request)
        canonical_fields = await self._format_canonical_fields(fields=normalized_fields)
        return (
            f"{request.method.upper()}\n"
            f"{canonical_path}\n"
            f"{canonical_query}\n"
            f"{canonical_fields}\n"
            f"{';'.join(normalized_fields)}\n"
            f"{canonical_payload}"
        )

    async def string_to_sign(
        self,
        *,
        canonical_request: str,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        """The string to sign is the second step of our signing algorithm which
        concatenates the formal identifier of our signing algorithm, the signing
        DateTime, the scope of our credentials, and a hash of our previously generated
        canonical request. This is another checkpoint that can be used to ensure we're
        constructing our signature as intended.

        The SigV4 specification defines the string to sign as:
            Algorithm \n
            RequestDateTime \n
            CredentialScope  \n
            HashedCanonicalRequest

        :param canonical_request:
            String generated from the `canonical_request` method.
        :param signing_properties:
            SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        """
        date = signing_properties.get("date")
        if date is None:
            raise MissingExpectedParameterException(
                "Cannot generate string_to_sign without a valid date "
                f"in your signing_properties. Current value: {date}"
            )
        scope = await self._scope(signing_properties=signing_properties)
        return (
            "AWS4-HMAC-SHA256\n"
            f"{date}\n"
            f"{scope}\n"
            f"{sha256(canonical_request.encode()).hexdigest()}"
        )

    async def _scope(self, signing_properties: SigV4SigningProperties) -> str:
        assert "date" in signing_properties
        formatted_date = signing_properties["date"][0:8]
        region = signing_properties["region"]
        service = signing_properties["service"]
        # Scope format: <YYYYMMDD>/<AWS Region>/<AWS Service>/aws4_request
        return f"{formatted_date}/{region}/{service}/aws4_request"

    async def _format_canonical_path(
        self, *, path: str | None, signing_properties: SigV4SigningProperties
    ) -> str:
        if path is None:
            path = "/"

        if signing_properties.get("uri_encode_path", True):
            normalized_path = _remove_dot_segments(path)
            return quote(string=normalized_path, safe="/")
        else:
            return _remove_dot_segments(path, remove_consecutive_slashes=False)

    async def _format_canonical_query(self, *, query: str | None) -> str:
        if query is None:
            return ""

        query_params = parse_qsl(qs=query)
        query_parts = (
            (quote(string=key, safe=""), quote(string=value, safe=""))
            for key, value in query_params
        )
        # key-value pairs must be in sorted order for their encoded forms.
        return "&".join(f"{key}={value}" for key, value in sorted(query_parts))

    async def _normalize_signing_fields(self, *, request: AWSRequest) -> dict[str, str]:
        normalized_fields = {
            field.name.lower(): field.as_string()
            for field in request.fields
            if self._is_signable_header(field.name.lower())
        }
        if "host" not in normalized_fields:
            normalized_fields["host"] = await self._normalize_host_field(
                uri=request.destination  # type: ignore - TODO(pyright)
            )

        return dict(sorted(normalized_fields.items()))

    def _is_signable_header(self, field_name: str):
        if field_name in HEADERS_EXCLUDED_FROM_SIGNING:
            return False
        return True

    async def _normalize_host_field(self, *, uri: URI) -> str:
        if uri.port is not None and DEFAULT_PORTS.get(uri.scheme) == uri.port:
            uri_dict = uri.to_dict()
            uri_dict.update({"port": None})
            uri = URI(**uri_dict)
        return uri.netloc

    async def _format_canonical_fields(self, *, fields: dict[str, str]) -> str:
        return "".join(
            f"{key}:{' '.join(value.split())}\n" for key, value in fields.items()
        )

    async def _should_sha256_sign_payload(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
    ) -> bool:
        # All insecure connections should be signed
        if request.destination.scheme != "https":
            return True

        return signing_properties.get("payload_signing_enabled", True)

    async def _format_canonical_payload(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        if (
            "X-Amz-Content-SHA256" in request.fields
            and len(request.fields["X-Amz-Content-SHA256"].values) == 1
        ):
            return request.fields["X-Amz-Content-SHA256"].values[0]

        if self._is_event_stream(request=request):
            request.fields.set_field(
                Field(name="X-Amz-Content-SHA256", values=[EVENT_STREAM_HASH])
            )
            return EVENT_STREAM_HASH

        payload_hash = await self._compute_payload_hash(
            request=request, signing_properties=signing_properties
        )
        if signing_properties.get("content_checksum_enabled", False):
            request.fields.set_field(
                Field(name="X-Amz-Content-SHA256", values=[payload_hash])
            )
        return payload_hash

    async def _compute_payload_hash(
        self, *, request: AWSRequest, signing_properties: SigV4SigningProperties
    ) -> str:
        if not await self._should_sha256_sign_payload(
            request=request, signing_properties=signing_properties
        ):
            return UNSIGNED_PAYLOAD

        body = request.body

        if body is None:
            return EMPTY_SHA256_HASH

        if not isinstance(body, AsyncIterable):
            raise TypeError(
                "A sync body was attached to an asynchronous signer. Please use "
                "SigV4Signer for sync AWSRequests or ensure your body is "
                "of type AsyncIterable[bytes]."
            )
        warnings.warn(
            "Payload signing is enabled. This may result in "
            "decreased performance for large request bodies.",
            AWSSDKWarning,
        )

        checksum = sha256()
        if self._seekable(body):
            position = body.tell()
            async for chunk in body:  # type: ignore
                checksum.update(chunk)  # type: ignore
            await body.seek(position)
        else:
            buffer = io.BytesIO()
            async for chunk in body:
                buffer.write(chunk)
                checksum.update(chunk)
            buffer.seek(0)
            request.body = AsyncBytesReader(buffer)
        return checksum.hexdigest()

    def _is_event_stream(self, request: AWSRequest) -> bool:
        if "Content-Type" not in request.fields:
            return False
        content_type = request.fields["Content-Type"].as_string()
        return content_type == EVENT_STREAM_CONTENT_TYPE

    def _seekable(self, body: AsyncIterable[bytes]) -> TypeGuard[AsyncSeekable]:
        if isinstance(body, ConditionallySeekable):
            return body.seekable()

        return isinstance(body, AsyncSeekable) and iscoroutinefunction(body.seek)

canonical_request(*, signing_properties, request) async

The canonical request is a standardized string laying out the components used in the SigV4 signing algorithm. This is useful to quickly compare inputs to find signature mismatches and unintended variances.

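The SigV4 specification defines the canonical request to be: `<HTTPMethod>\n<CanonicalURI>\n<CanonicalQueryString>\n<CanonicalHeaders>\n<SignedHeaders>\n<HashedPayload>`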

:param signing_properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date. :param request: An AWSRequest to use for generating a SigV4 signature.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
async def canonical_request(
    self, *, signing_properties: SigV4SigningProperties, request: AWSRequest
) -> str:
    """Build the canonical request string used by the SigV4 signing algorithm.

    The result lays out every signing input in a fixed order, which makes it a
    convenient artifact to compare when tracking down a signature mismatch.

    The SigV4 specification defines the canonical request to be:
        <HTTPMethod>\n
        <CanonicalURI>\n
        <CanonicalQueryString>\n
        <CanonicalHeaders>\n
        <SignedHeaders>\n
        <HashedPayload>

    :param signing_properties:
        SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :param request:
        An AWSRequest to use for generating a SigV4 signature.
    """
    destination = request.destination

    # The payload step runs first: it may modify the request fields, and those
    # changes must be in place before the fields to sign are selected.
    payload_part = await self._format_canonical_payload(
        request=request, signing_properties=signing_properties
    )
    path_part = await self._format_canonical_path(
        path=destination.path, signing_properties=signing_properties
    )
    query_part = await self._format_canonical_query(query=destination.query)
    signing_fields = await self._normalize_signing_fields(request=request)
    fields_part = await self._format_canonical_fields(fields=signing_fields)

    sections = [
        request.method.upper(),
        f"{path_part}",
        f"{query_part}",
        f"{fields_part}",
        ";".join(signing_fields),
        f"{payload_part}",
    ]
    return "\n".join(sections)

generate_authorization_field(*, credential, signed_headers, signature) async

Generate the Authorization field.

:param credential: Credential scope string for generating the Authorization header. Defined as: `<access_key>/<date>/<region>/<service>/<request_type>` :param signed_headers: A list of the field names used in signing. :param signature: Final hash of the SigV4 signing algorithm generated from the canonical request and string to sign.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
async def generate_authorization_field(
    self, *, credential: str, signed_headers: list[str], signature: str
) -> Field:
    """Assemble the `Authorization` field from its three components.

    :param credential:
        Credential scope string for generating the Authorization header.
        Defined as:
            <access_key>/<date>/<region>/<service>/<request_type>
    :param signed_headers:
        A list of the field names used in signing.
    :param signature:
        Final hash of the SigV4 signing algorithm generated from the
        canonical request and string to sign.
    """
    components = (
        f"Credential={credential}",
        f"SignedHeaders={';'.join(signed_headers)}",
        f"Signature={signature}",
    )
    # The algorithm identifier leads, followed by the comma-separated components.
    header_value = "AWS4-HMAC-SHA256 " + ", ".join(components)
    return Field(name="Authorization", values=[header_value])

sign(*, request, identity, properties) async

Generate and apply a SigV4 Signature to a copy of the supplied request.

:param request: An AWSRequest to sign prior to sending to the service. :param identity: A set of credentials representing an AWS Identity or role capacity. :param properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
async def sign(
    self,
    *,
    request: AWSRequest,
    identity: _AWSCredentialsIdentity,
    properties: SigV4SigningProperties,
) -> AWSRequest:
    """Generate and apply a SigV4 Signature to a copy of the supplied request.

    :param request: An AWSRequest to sign prior to sending to the service.
    :param identity: A set of credentials representing an AWS Identity or role
        capacity.
    :param properties: SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :returns: The request produced by ``_generate_new_request`` with the
        ``Authorization`` field set.
    """
    # Copy and prepopulate any missing values in the
    # supplied request and signing properties.

    await self._validate_identity(identity=identity)
    new_signing_properties = await self._normalize_signing_properties(
        signing_properties=properties
    )
    new_request = await self._generate_new_request(request=request)
    await self._apply_required_fields(
        request=new_request,
        signing_properties=new_signing_properties,
        identity=identity,
    )

    # Construct core signing components. Every step receives the normalized
    # properties rather than the caller's raw ``properties`` so they all see the
    # same values.
    canonical_request = await self.canonical_request(
        signing_properties=new_signing_properties,
        request=new_request,
    )
    string_to_sign = await self.string_to_sign(
        canonical_request=canonical_request,
        signing_properties=new_signing_properties,
    )
    signature = await self._signature(
        string_to_sign=string_to_sign,
        secret_key=identity.secret_access_key,
        signing_properties=new_signing_properties,
    )

    signing_fields = await self._normalize_signing_fields(request=new_request)
    credential_scope = await self._scope(signing_properties=new_signing_properties)
    credential = f"{identity.access_key_id}/{credential_scope}"
    authorization = await self.generate_authorization_field(
        credential=credential,
        signed_headers=list(signing_fields.keys()),
        signature=signature,
    )
    new_request.fields.set_field(authorization)
    return new_request

string_to_sign(*, canonical_request, signing_properties) async

The string to sign is the second step of our signing algorithm which concatenates the formal identifier of our signing algorithm, the signing DateTime, the scope of our credentials, and a hash of our previously generated canonical request. This is another checkpoint that can be used to ensure we're constructing our signature as intended.

The SigV4 specification defines the string to sign as

Algorithm

RequestDateTime

CredentialScope

HashedCanonicalRequest

:param canonical_request: String generated from the canonical_request method. :param signing_properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
async def string_to_sign(
    self,
    *,
    canonical_request: str,
    signing_properties: SigV4SigningProperties,
) -> str:
    """Assemble the SigV4 "string to sign" for a canonical request.

    This is the second step of the signing algorithm. The result is four
    newline-separated lines, in this order:

        1. the algorithm identifier (``AWS4-HMAC-SHA256``)
        2. the request date taken from ``signing_properties["date"]``
        3. the credential scope produced by ``self._scope``
        4. the hex SHA-256 digest of the encoded canonical request

    It is a useful checkpoint when debugging signature mismatches.

    :param canonical_request:
        String generated from the `canonical_request` method.
    :param signing_properties:
        SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :raises MissingExpectedParameterException:
        If ``signing_properties`` has no ``date`` entry (or it is ``None``).
    """
    timestamp = signing_properties.get("date")
    if timestamp is None:
        raise MissingExpectedParameterException(
            "Cannot generate string_to_sign without a valid date "
            f"in your signing_properties. Current value: {timestamp}"
        )
    credential_scope = await self._scope(signing_properties=signing_properties)
    request_digest = sha256(canonical_request.encode()).hexdigest()
    return f"AWS4-HMAC-SHA256\n{timestamp}\n{credential_scope}\n{request_digest}"

Field

Bases: Field

A name-value pair representing a single field in an HTTP Request or Response.

The kind will dictate metadata placement within an HTTP message.

All field names are case insensitive and case-variance must be treated as equivalent. Names may be normalized but should be preserved for accuracy during transmission.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
class Field(interfaces_http.Field):
    """A name-value pair representing a single field in an HTTP Request or Response.

    The kind will dictate metadata placement within an HTTP message.

    All field names are case insensitive and case-variance must be treated as
    equivalent. Names may be normalized but should be preserved for accuracy during
    transmission.
    """

    def __init__(
        self,
        *,
        name: str,
        values: Iterable[str] | None = None,
        kind: interfaces_http.FieldPosition = interfaces_http.FieldPosition.HEADER,
    ):
        """Initialize the field.

        :param name: The field name, stored exactly as given.
        :param values: Initial values. They are copied into a new list; ``None``
            yields an empty list.
        :param kind: Where the field is placed in the HTTP message. Defaults to
            ``FieldPosition.HEADER``.
        """
        self.name = name
        self.values: list[str] = list(values) if values is not None else []
        self.kind = kind

    def add(self, value: str) -> None:
        """Append a value to a field."""
        self.values.append(value)

    def set(self, values: list[str]) -> None:
        """Overwrite existing field values.

        The supplied list is copied (as ``__init__`` does) so that later mutation
        of the caller's list does not silently change this field.
        """
        self.values = list(values)

    def remove(self, value: str) -> None:
        """Remove all matching entries from list.

        The existing list object is updated in place; values that do not match are
        kept in their original order. Removing an absent value is a no-op.
        """
        self.values[:] = [existing for existing in self.values if existing != value]

    def as_string(self, delimiter: str = ",") -> str:
        """Get delimited string of all values. A bare comma (no trailing space) is
        used by default.

        If the ``Field`` has zero values, the empty string is returned. If the ``Field``
        has exactly one value, the value is returned unmodified.

        For ``Field``s with more than one value, each value is passed through
        ``quote_and_escape_field_value`` and the results are joined with
        ``delimiter``.

        :param delimiter: Separator placed between values of a multi-valued field.
        """
        value_count = len(self.values)
        if value_count == 0:
            return ""
        if value_count == 1:
            return self.values[0]
        return delimiter.join(quote_and_escape_field_value(val) for val in self.values)

    def as_tuples(self) -> list[tuple[str, str]]:
        """Get list of ``name``, ``value`` tuples where each tuple represents one
        value."""
        return [(self.name, val) for val in self.values]

    def __eq__(self, other: object) -> bool:
        """Name, values, and kind must match.

        Values order must match. Names are compared exactly (case-sensitively) and
        ``kind`` is compared by identity.
        """
        if not isinstance(other, Field):
            return False
        return (
            self.name == other.name
            and self.kind is other.kind
            and self.values == other.values
        )

    def __repr__(self) -> str:
        return f"Field(name={self.name!r}, value={self.values!r}, kind={self.kind!r})"

__eq__(other)

Name, values, and kind must match.

Values order must match.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def __eq__(self, other: object) -> bool:
    """Compare two fields for equality.

    Anything that is not a ``Field`` is unequal. Otherwise the names must be
    equal, the kinds must be the same object, and the value lists must be equal
    (including order).
    """
    if isinstance(other, Field):
        same_name = self.name == other.name
        return same_name and self.kind is other.kind and self.values == other.values
    return False

add(value)

Append a value to a field.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def add(self, value: str) -> None:
    """Add one more value to the end of this field's value list."""
    self.values.extend((value,))

as_string(delimiter=',')

Get delimited string of all values. A bare comma (with no trailing space) is used by default, matching the `delimiter=','` default in the signature.

If the Field has zero values, the empty string is returned. If the Field has exactly one value, the value is returned unmodified.

For Fields with more than one value, the values are joined by the delimiter. For such multi-valued Fields, any values that already contain commas or double quotes will be surrounded by double quotes. Within any values that get quoted, pre-existing double quotes and backslashes are escaped with a backslash.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def as_string(self, delimiter: str = ",") -> str:
    """Render all values of the field as one delimited string.

    The default delimiter is a bare comma (no trailing space).

    * No values: the empty string is returned.
    * Exactly one value: that value is returned unmodified.
    * Several values: each value is passed through
      ``quote_and_escape_field_value`` and the results are joined with
      ``delimiter``.

    :param delimiter: Separator placed between values of a multi-valued field.
    """
    values = self.values
    if len(values) > 1:
        return delimiter.join(quote_and_escape_field_value(item) for item in values)
    return values[0] if values else ""

as_tuples()

Get list of name, value tuples where each tuple represents one value.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def as_tuples(self) -> list[tuple[str, str]]:
    """Expand the field into ``(name, value)`` pairs, one pair per stored value,
    in the order the values are stored."""
    pairs: list[tuple[str, str]] = []
    for value in self.values:
        pairs.append((self.name, value))
    return pairs

remove(value)

Remove all matching entries from list.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def remove(self, value: str) -> None:
    """Drop every occurrence of ``value`` from the field's values.

    The existing list object is updated in place and the remaining values keep
    their order. Removing a value that is not present does nothing.
    """
    self.values[:] = [existing for existing in self.values if existing != value]

set(values)

Overwrite existing field values.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def set(self, values: list[str]) -> None:
    """Overwrite existing field values.

    The supplied list is copied so that later mutation of the caller's list does
    not silently change this field.

    :param values: The new values for the field, in order.
    """
    self.values = list(values)

Fields

Bases: Fields

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
class Fields(interfaces_http.Fields):
    def __init__(
        self,
        initial: Iterable[interfaces_http.Field] | None = None,
        *,
        encoding: str = "utf-8",
    ):
        """Collection of header and trailer entries mapped by name.

        :param initial: Initial list of ``Field`` objects. ``Field``s can also be added
        and later removed.
        :param encoding: The string encoding to be used when converting the ``Field``
        name and value from ``str`` to ``bytes`` for transmission.
        :raises ValueError: If two initial fields share the same normalized name.
        """
        fields = [] if initial is None else list(initial)
        names = [self._normalize_field_name(item.name) for item in fields]
        # Counter preserves first-seen order, so duplicates are reported in the
        # order they first appear in ``initial``.
        duplicates = [name for name, count in Counter(names).items() if count > 1]
        if duplicates:
            raise ValueError(
                "Field names of the initial list of fields must be unique. The "
                "following normalized field names appear more than once: "
                f"{', '.join(duplicates)}."
            )
        self.entries: OrderedDict[str, interfaces_http.Field] = OrderedDict(
            zip(names, fields)
        )
        self.encoding: str = encoding

    def set_field(self, field: interfaces_http.Field) -> None:
        """Store ``field`` under its own name (shorthand for ``__setitem__``)."""
        self[field.name] = field

    def __setitem__(self, name: str, field: interfaces_http.Field) -> None:
        """Set or override the entry for a Field name.

        :raises ValueError: If ``name`` and ``field.name`` differ after
            normalization.
        """
        key = self._normalize_field_name(name)
        field_key = self._normalize_field_name(field.name)
        if key == field_key:
            self.entries[key] = field
            return
        raise ValueError(
            f"Supplied key {name} does not match Field.name "
            f"provided: {field_key}"
        )

    def get(
        self, key: str, default: interfaces_http.Field | None = None
    ) -> interfaces_http.Field | None:
        """Return the entry for ``key``, or ``default`` when it is absent."""
        if key in self:
            return self[key]
        return default

    def __getitem__(self, name: str) -> interfaces_http.Field:
        """Retrieve the Field stored under the normalized form of ``name``."""
        return self.entries[self._normalize_field_name(name)]

    def __delitem__(self, name: str) -> None:
        """Delete the entry stored under the normalized form of ``name``."""
        del self.entries[self._normalize_field_name(name)]

    def get_by_type(
        self, kind: interfaces_http.FieldPosition
    ) -> list[interfaces_http.Field]:
        """Return every entry whose ``kind`` is the given position.

        Used to grab all headers or all trailers.
        """
        matches: list[interfaces_http.Field] = []
        for entry in self.entries.values():
            if entry.kind is kind:
                matches.append(entry)
        return matches

    def extend(self, other: interfaces_http.Fields) -> None:
        """Merge the entries of ``other`` into this collection.

        For each ``Field`` in ``other``: when its normalized name is already
        present, its values are appended to the existing entry; otherwise the
        ``Field`` object itself is stored as a new entry.
        """
        for incoming in other:
            try:
                existing = self[incoming.name]
                for extra in incoming.values:
                    existing.add(extra)
            except KeyError:
                self[incoming.name] = incoming

    def _normalize_field_name(self, name: str) -> str:
        """Lowercase a field name for use as a key in ``entries``."""
        return name.lower()

    def __eq__(self, other: object) -> bool:
        """Two collections are equal when both the encoding and the entries match.

        Entries must match in values and order.
        """
        if isinstance(other, Fields):
            return self.encoding == other.encoding and self.entries == other.entries
        return False

    def __iter__(self) -> Iterator[interfaces_http.Field]:
        for entry in self.entries.values():
            yield entry

    def __len__(self) -> int:
        return len(self.entries)

    def __repr__(self) -> str:
        return f"Fields({self.entries})"

    def __contains__(self, key: str) -> bool:
        normalized = self._normalize_field_name(key)
        return normalized in self.entries

__delitem__(name)

Delete entry from collection.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def __delitem__(self, name: str) -> None:
    """Delete the entry stored under the normalized form of ``name``."""
    del self.entries[self._normalize_field_name(name)]

__eq__(other)

Encoding must match.

Entries must match in values and order.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def __eq__(self, other: object) -> bool:
    """Two collections are equal when both the encoding and the entries match.

    Entries must match in values and order. Anything that is not a ``Fields``
    instance is unequal.
    """
    if isinstance(other, Fields):
        return self.encoding == other.encoding and self.entries == other.entries
    return False

__getitem__(name)

Retrieve Field entry.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def __getitem__(self, name: str) -> interfaces_http.Field:
    """Retrieve the Field stored under the normalized form of ``name``."""
    return self.entries[self._normalize_field_name(name)]

__init__(initial=None, *, encoding='utf-8')

Collection of header and trailer entries mapped by name.

Parameters:

- initial: Initial list of Field objects. Fields can also be added and later removed.
- encoding: The string encoding to be used when converting the Field name and value from str to bytes for transmission.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def __init__(
    self,
    initial: Iterable[interfaces_http.Field] | None = None,
    *,
    encoding: str = "utf-8",
):
    """Collection of header and trailer entries mapped by name.

    :param initial: Initial list of ``Field`` objects. ``Field``s can also be added
    and later removed.
    :param encoding: The string encoding to be used when converting the ``Field``
    name and value from ``str`` to ``bytes`` for transmission.
    :raises ValueError: If two initial fields share the same normalized name.
    """
    fields = [] if initial is None else list(initial)
    names = [self._normalize_field_name(item.name) for item in fields]
    # Counter preserves first-seen order, so duplicates are reported in the
    # order they first appear in ``initial``.
    duplicates = [name for name, count in Counter(names).items() if count > 1]
    if duplicates:
        raise ValueError(
            "Field names of the initial list of fields must be unique. The "
            "following normalized field names appear more than once: "
            f"{', '.join(duplicates)}."
        )
    self.entries: OrderedDict[str, interfaces_http.Field] = OrderedDict(
        zip(names, fields)
    )
    self.encoding: str = encoding

__setitem__(name, field)

Set or override entry for a Field name.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def __setitem__(self, name: str, field: interfaces_http.Field) -> None:
    """Set or override the entry for a Field name.

    :raises ValueError: If ``name`` and ``field.name`` differ after
        normalization.
    """
    key = self._normalize_field_name(name)
    field_key = self._normalize_field_name(field.name)
    if key == field_key:
        self.entries[key] = field
        return
    raise ValueError(
        f"Supplied key {name} does not match Field.name "
        f"provided: {field_key}"
    )

extend(other)

Merges entries of other into the current entries.

For every Field in the entries of other: If the normalized name already exists in the current entries, the values from other are appended. Otherwise, the Field is added to the list of entries.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def extend(self, other: interfaces_http.Fields) -> None:
    """Merge the entries of ``other`` into this collection.

    For each ``Field`` in ``other``: when its name is already present, its
    values are appended to the existing entry; otherwise the ``Field`` object
    itself is stored as a new entry.
    """
    for incoming in other:
        try:
            existing = self.__getitem__(incoming.name)
            for extra in incoming.values:
                existing.add(extra)
        except KeyError:
            self.__setitem__(incoming.name, incoming)

get_by_type(kind)

Helper function for retrieving specific types of fields.

Used to grab all headers or all trailers.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def get_by_type(
    self, kind: interfaces_http.FieldPosition
) -> list[interfaces_http.Field]:
    """Return every entry whose ``kind`` is the given position.

    Used to grab all headers or all trailers. Entries are returned in the order
    they are stored; ``kind`` is matched by identity.
    """
    matches: list[interfaces_http.Field] = []
    for entry in self.entries.values():
        if entry.kind is kind:
            matches.append(entry)
    return matches

set_field(field)

Alias for `__setitem__` that uses `field.name` as the entry key.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def set_field(self, field: interfaces_http.Field) -> None:
    """Store ``field`` under its own name (shorthand for ``__setitem__``)."""
    self[field.name] = field

SigV4Signer

Request signer for applying the AWS Signature Version 4 algorithm.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
class SigV4Signer:
    """Request signer for applying the AWS Signature Version 4 algorithm."""

    def sign(
        self,
        *,
        request: AWSRequest,
        identity: _AWSCredentialsIdentity,
        properties: SigV4SigningProperties,
    ) -> AWSRequest:
        """Generate and apply a SigV4 Signature to a copy of the supplied request.

        :param request: An AWSRequest to sign prior to sending to the service.
        :param identity: A set of credentials representing an AWS Identity or role
            capacity.
        :param properties: SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        """
        # Copy and prepopulate any missing values in the
        # supplied request and signing properties.
        self._validate_identity(identity=identity)
        new_signing_properties = self._normalize_signing_properties(
            signing_properties=properties
        )
        assert "date" in new_signing_properties

        new_request = self._generate_new_request(request=request)
        self._apply_required_fields(
            request=new_request,
            signing_properties=new_signing_properties,
            identity=identity,
        )

        # Construct core signing components
        canonical_request = self.canonical_request(
            signing_properties=new_signing_properties,
            request=new_request,
        )
        string_to_sign = self.string_to_sign(
            canonical_request=canonical_request,
            signing_properties=new_signing_properties,
        )
        signature = self._signature(
            string_to_sign=string_to_sign,
            secret_key=identity.secret_access_key,
            signing_properties=new_signing_properties,
        )

        signing_fields = self._normalize_signing_fields(request=new_request)
        credential_scope = self._scope(signing_properties=new_signing_properties)
        credential = f"{identity.access_key_id}/{credential_scope}"
        authorization = self.generate_authorization_field(
            credential=credential,
            signed_headers=list(signing_fields.keys()),
            signature=signature,
        )
        new_request.fields.set_field(authorization)

        return new_request

    def generate_authorization_field(
        self, *, credential: str, signed_headers: list[str], signature: str
    ) -> Field:
        """Generate the `Authorization` field.

        :param credential:
            Credential scope string for generating the Authorization header.
            Defined as:
                <access_key>/<date>/<region>/<service>/<request_type>
        :param signed_headers:
            A list of the field names used in signing.
        :param signature:
            Final hash of the SigV4 signing algorithm generated from the
            canonical request and string to sign.
        """
        signed_headers_str = ";".join(signed_headers)
        auth_str = (
            f"AWS4-HMAC-SHA256 Credential={credential}, "
            f"SignedHeaders={signed_headers_str}, Signature={signature}"
        )
        return Field(name="Authorization", values=[auth_str])

    def _signature(
        self,
        *,
        string_to_sign: str,
        secret_key: str,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        """Sign the string to sign.

        In SigV4, a signing key is created that is scoped to a specific region and
        service. The date, region, service and resulting signing key are individually
        hashed, then the composite hash is used to sign the string to sign.
        """

        # Components of Signing Key Calculation
        #
        # DateKey              = HMAC-SHA256("AWS4"+"<SecretAccessKey>", "<YYYYMMDD>")
        # DateRegionKey        = HMAC-SHA256(<DateKey>, "<aws-region>")
        # DateRegionServiceKey = HMAC-SHA256(<DateRegionKey>, "<aws-service>")
        # SigningKey = HMAC-SHA256(<DateRegionServiceKey>, "aws4_request")
        assert "date" in signing_properties
        k_date = self._hash(
            key=f"AWS4{secret_key}".encode(), value=signing_properties["date"][0:8]
        )
        k_region = self._hash(key=k_date, value=signing_properties["region"])
        k_service = self._hash(key=k_region, value=signing_properties["service"])
        k_signing = self._hash(key=k_service, value="aws4_request")

        return self._hash(key=k_signing, value=string_to_sign).hex()

    def _hash(self, key: bytes, value: str) -> bytes:
        return hmac.new(key=key, msg=value.encode(), digestmod=sha256).digest()

    def _validate_identity(self, *, identity: _AWSCredentialsIdentity) -> None:
        """Perform runtime and expiration checks before attempting signing."""
        if not isinstance(identity, _AWSCredentialsIdentity):  # pyright: ignore
            raise ValueError(
                "Received unexpected value for identity parameter. Expected "
                f"AWSCredentialIdentity but received {type(identity)}."
            )
        elif identity.is_expired:
            raise ValueError(
                f"Provided identity expired at {identity.expiration}. Please "
                "refresh the credentials or update the expiration parameter."
            )

    def _normalize_signing_properties(
        self, *, signing_properties: SigV4SigningProperties
    ) -> SigV4SigningProperties:
        # Create copy of signing properties to avoid mutating the original
        new_signing_properties = SigV4SigningProperties(**signing_properties)
        if "date" not in new_signing_properties:
            date_obj = datetime.datetime.now(datetime.UTC)
            new_signing_properties["date"] = date_obj.strftime(SIGV4_TIMESTAMP_FORMAT)
        return new_signing_properties

    def _generate_new_request(self, *, request: AWSRequest) -> AWSRequest:
        return deepcopy(request)

    def _apply_required_fields(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
        identity: _AWSCredentialsIdentity,
    ) -> None:
        # Apply required X-Amz-Date if neither X-Amz-Date nor Date are present.
        if "Date" not in request.fields and "X-Amz-Date" not in request.fields:
            assert "date" in signing_properties
            request.fields.set_field(
                Field(name="X-Amz-Date", values=[signing_properties["date"]])
            )
        # Apply required X-Amz-Security-Token if token present on identity
        if (
            "X-Amz-Security-Token" not in request.fields
            and identity.session_token is not None
        ):
            request.fields.set_field(
                Field(name="X-Amz-Security-Token", values=[identity.session_token])
            )

    def canonical_request(
        self, *, signing_properties: SigV4SigningProperties, request: AWSRequest
    ) -> str:
        """The canonical request is a standardized string laying out the components used
        in the SigV4 signing algorithm. This is useful to quickly compare inputs to find
        signature mismatches and unintended variances.

        The SigV4 specification defines the canonical request to be:
            <HTTPMethod>\n
            <CanonicalURI>\n
            <CanonicalQueryString>\n
            <CanonicalHeaders>\n
            <SignedHeaders>\n
            <HashedPayload>

        :param signing_properties:
            SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        :param request:
            An AWSRequest to use for generating a SigV4 signature.
        """
        # We generate the payload first to ensure any field modifications
        # are in place before choosing the canonical fields.
        canonical_payload = self._format_canonical_payload(
            request=request, signing_properties=signing_properties
        )
        canonical_path = self._format_canonical_path(
            path=request.destination.path, signing_properties=signing_properties
        )
        canonical_query = self._format_canonical_query(query=request.destination.query)
        normalized_fields = self._normalize_signing_fields(request=request)
        canonical_fields = self._format_canonical_fields(fields=normalized_fields)
        return (
            f"{request.method.upper()}\n"
            f"{canonical_path}\n"
            f"{canonical_query}\n"
            f"{canonical_fields}\n"
            f"{';'.join(normalized_fields)}\n"
            f"{canonical_payload}"
        )

    def string_to_sign(
        self,
        *,
        canonical_request: str,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        """The string to sign is the second step of our signing algorithm which
        concatenates the formal identifier of our signing algorithm, the signing
        DateTime, the scope of our credentials, and a hash of our previously generated
        canonical request. This is another checkpoint that can be used to ensure we're
        constructing our signature as intended.

        The SigV4 specification defines the string to sign as:
            Algorithm \n
            RequestDateTime \n
            CredentialScope  \n
            HashedCanonicalRequest

        :param canonical_request:
            String generated from the `canonical_request` method.
        :param signing_properties:
            SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        """
        date = signing_properties.get("date")
        if date is None:
            raise MissingExpectedParameterException(
                "Cannot generate string_to_sign without a valid date "
                f"in your signing_properties. Current value: {date}"
            )
        return (
            "AWS4-HMAC-SHA256\n"
            f"{date}\n"
            f"{self._scope(signing_properties=signing_properties)}\n"
            f"{sha256(canonical_request.encode()).hexdigest()}"
        )

    def _scope(self, signing_properties: SigV4SigningProperties) -> str:
        assert "date" in signing_properties
        formatted_date = signing_properties["date"][0:8]
        region = signing_properties["region"]
        service = signing_properties["service"]
        # Scope format: <YYYYMMDD>/<AWS Region>/<AWS Service>/aws4_request
        return f"{formatted_date}/{region}/{service}/aws4_request"

    def _format_canonical_path(
        self, *, path: str | None, signing_properties: SigV4SigningProperties
    ) -> str:
        if path is None:
            path = "/"

        if signing_properties.get("uri_encode_path", True):
            normalized_path = _remove_dot_segments(path)
            return quote(string=normalized_path, safe="/")
        else:
            return _remove_dot_segments(path, remove_consecutive_slashes=False)

    def _format_canonical_query(self, *, query: str | None) -> str:
        if query is None:
            return ""

        query_params = parse_qsl(qs=query)
        query_parts = (
            (quote(string=key, safe=""), quote(string=value, safe=""))
            for key, value in query_params
        )
        # key-value pairs must be in sorted order for their encoded forms.
        return "&".join(f"{key}={value}" for key, value in sorted(query_parts))

    def _normalize_signing_fields(self, *, request: AWSRequest) -> dict[str, str]:
        normalized_fields = {
            field.name.lower(): field.as_string()
            for field in request.fields
            if self._is_signable_header(field.name.lower())
        }
        if "host" not in normalized_fields:
            normalized_fields["host"] = self._normalize_host_field(
                uri=request.destination  # type: ignore - TODO(pyright)
            )

        return dict(sorted(normalized_fields.items()))

    def _is_signable_header(self, field_name: str):
        if field_name in HEADERS_EXCLUDED_FROM_SIGNING:
            return False
        return True

    def _normalize_host_field(self, *, uri: URI) -> str:
        if uri.port is not None and DEFAULT_PORTS.get(uri.scheme) == uri.port:
            uri_dict = uri.to_dict()
            uri_dict.update({"port": None})
            uri = URI(**uri_dict)
        return uri.netloc

    def _format_canonical_fields(self, *, fields: dict[str, str]) -> str:
        return "".join(
            f"{key}:{' '.join(value.split())}\n" for key, value in fields.items()
        )

    def _should_sha256_sign_payload(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
    ) -> bool:
        # All insecure connections should be signed
        if request.destination.scheme != "https":
            return True

        return signing_properties.get("payload_signing_enabled", True)

    def _format_canonical_payload(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        payload_hash = self._compute_payload_hash(
            request=request, signing_properties=signing_properties
        )
        if signing_properties.get("content_checksum_enabled", False):
            request.fields.set_field(
                Field(name="X-Amz-Content-SHA256", values=[payload_hash])
            )
        return payload_hash

    def _compute_payload_hash(
        self, *, request: AWSRequest, signing_properties: SigV4SigningProperties
    ) -> str:
        if not self._should_sha256_sign_payload(
            request=request, signing_properties=signing_properties
        ):
            return UNSIGNED_PAYLOAD

        body = request.body

        if body is None:
            return EMPTY_SHA256_HASH

        if not isinstance(body, Iterable):
            raise TypeError(
                "An async body was attached to a synchronous signer. Please use "
                "AsyncSigV4Signer for async AWSRequests or ensure your body is "
                "of type Iterable[bytes]."
            )

        warnings.warn(
            "Payload signing is enabled. This may result in "
            "decreased performance for large request bodies.",
            AWSSDKWarning,
        )

        checksum = sha256()
        if self._seekable(body):
            position = body.tell()
            for chunk in body:  # type: ignore
                checksum.update(chunk)  # type: ignore
            body.seek(position)
        else:
            buffer = io.BytesIO()
            for chunk in body:
                buffer.write(chunk)
                checksum.update(chunk)
            buffer.seek(0)
            request.body = buffer
        return checksum.hexdigest()

    def _seekable(self, body: Iterable[bytes]) -> TypeGuard[Seekable]:
        if isinstance(body, ConditionallySeekable):
            return body.seekable()

        return isinstance(body, Seekable)

canonical_request(*, signing_properties, request)

The canonical request is a standardized string laying out the components used in the SigV4 signing algorithm. This is useful to quickly compare inputs to find signature mismatches and unintended variances.

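The SigV4 specification defines the canonical request to be the following components, each separated by a newline: `<HTTPMethod>`, `<CanonicalURI>`, `<CanonicalQueryString>`, `<CanonicalHeaders>`, `<SignedHeaders>`, `<HashedPayload>`.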

Parameters:

- signing_properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.
- request: An AWSRequest to use for generating a SigV4 signature.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
def canonical_request(
    self, *, signing_properties: SigV4SigningProperties, request: AWSRequest
) -> str:
    """Build the SigV4 canonical request string for ``request``.

    The canonical request is a standardized, newline-delimited layout of the
    components fed into the SigV4 signing algorithm. Comparing it against another
    implementation's output is a quick way to locate signature mismatches and
    unintended variances.

    The lines of the result are, in order::

        <HTTPMethod>
        <CanonicalURI>
        <CanonicalQueryString>
        <CanonicalHeaders>
        <SignedHeaders>
        <HashedPayload>

    :param signing_properties:
        SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :param request:
        An AWSRequest to use for generating a SigV4 signature.
    :returns:
        The six components above joined by newline characters.
    """
    # The payload goes first: formatting it may modify fields, and those
    # modifications must be in place before the canonical fields are chosen.
    payload = self._format_canonical_payload(
        request=request, signing_properties=signing_properties
    )

    destination = request.destination
    path = self._format_canonical_path(
        path=destination.path, signing_properties=signing_properties
    )
    query = self._format_canonical_query(query=destination.query)

    signing_fields = self._normalize_signing_fields(request=request)
    fields = self._format_canonical_fields(fields=signing_fields)

    method = request.method.upper()
    signed_headers = ";".join(signing_fields)
    return f"{method}\n{path}\n{query}\n{fields}\n{signed_headers}\n{payload}"

generate_authorization_field(*, credential, signed_headers, signature)

Generate the Authorization field.

:param credential: Credential scope string for generating the Authorization header. Defined as: `<access_key>/<date>/<region>/<service>/<request_type>` :param signed_headers: A list of the field names used in signing. :param signature: Final hash of the SigV4 signing algorithm generated from the canonical request and string to sign.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
def generate_authorization_field(
    self, *, credential: str, signed_headers: list[str], signature: str
) -> Field:
    """Build the `Authorization` field carrying a SigV4 signature.

    :param credential:
        Credential scope string for generating the Authorization header.
        Defined as:
            <access_key>/<date>/<region>/<service>/<request_type>
    :param signed_headers:
        A list of the field names used in signing; they are joined with ``;``.
    :param signature:
        Final hash of the SigV4 signing algorithm generated from the
        canonical request and string to sign.
    :returns:
        A ``Field`` named ``Authorization`` whose single value has the form
        ``AWS4-HMAC-SHA256 Credential=..., SignedHeaders=..., Signature=...``.
    """
    header_value = ", ".join(
        (
            f"AWS4-HMAC-SHA256 Credential={credential}",
            f"SignedHeaders={';'.join(signed_headers)}",
            f"Signature={signature}",
        )
    )
    return Field(name="Authorization", values=[header_value])

sign(*, request, identity, properties)

Generate and apply a SigV4 Signature to a copy of the supplied request.

:param request: An AWSRequest to sign prior to sending to the service. :param identity: A set of credentials representing an AWS Identity or role capacity. :param properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
def sign(
    self,
    *,
    request: AWSRequest,
    identity: _AWSCredentialsIdentity,
    properties: SigV4SigningProperties,
) -> AWSRequest:
    """Sign a copy of the supplied request with SigV4 and return that copy.

    :param request: An AWSRequest to sign prior to sending to the service.
    :param identity: A set of credentials representing an AWS Identity or role
        capacity.
    :param properties: SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :returns: The request produced by ``_generate_new_request`` with the
        ``Authorization`` field set on it.
    """
    self._validate_identity(identity=identity)

    # Everything below operates on normalized properties and a fresh request
    # rather than on the caller's objects.
    normalized = self._normalize_signing_properties(signing_properties=properties)
    assert "date" in normalized

    signed_request = self._generate_new_request(request=request)
    self._apply_required_fields(
        request=signed_request,
        signing_properties=normalized,
        identity=identity,
    )

    # Signing pipeline: canonical request -> string to sign -> signature.
    canonical = self.canonical_request(
        signing_properties=normalized,
        request=signed_request,
    )
    to_sign = self.string_to_sign(
        canonical_request=canonical,
        signing_properties=normalized,
    )
    signature = self._signature(
        string_to_sign=to_sign,
        secret_key=identity.secret_access_key,
        signing_properties=normalized,
    )

    # Assemble the Authorization field and attach it to the new request.
    fields_to_sign = self._normalize_signing_fields(request=signed_request)
    scope = self._scope(signing_properties=normalized)
    signed_request.fields.set_field(
        self.generate_authorization_field(
            credential=f"{identity.access_key_id}/{scope}",
            signed_headers=list(fields_to_sign.keys()),
            signature=signature,
        )
    )
    return signed_request

string_to_sign(*, canonical_request, signing_properties)

The string to sign is the second step of our signing algorithm which concatenates the formal identifier of our signing algorithm, the signing DateTime, the scope of our credentials, and a hash of our previously generated canonical request. This is another checkpoint that can be used to ensure we're constructing our signature as intended.

The SigV4 specification defines the string to sign as

Algorithm

RequestDateTime

CredentialScope

HashedCanonicalRequest

:param canonical_request: String generated from the canonical_request method. :param signing_properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
def string_to_sign(
    self,
    *,
    canonical_request: str,
    signing_properties: SigV4SigningProperties,
) -> str:
    """Build the SigV4 string to sign from a canonical request.

    This is the second step of the signing algorithm. It joins the algorithm
    identifier, the signing date-time, the credential scope, and the SHA-256 hex
    digest of the canonical request, one per line. It is another checkpoint for
    verifying that the signature is being constructed as intended.

    The lines of the result are, in order::

        Algorithm
        RequestDateTime
        CredentialScope
        HashedCanonicalRequest

    :param canonical_request:
        String generated from the `canonical_request` method.
    :param signing_properties:
        SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :raises MissingExpectedParameterException:
        If ``signing_properties`` has no ``date`` (or it is ``None``).
    """
    if (date := signing_properties.get("date")) is None:
        raise MissingExpectedParameterException(
            "Cannot generate string_to_sign without a valid date "
            f"in your signing_properties. Current value: {date}"
        )

    scope = self._scope(signing_properties=signing_properties)
    digest = sha256(canonical_request.encode()).hexdigest()
    return f"AWS4-HMAC-SHA256\n{date}\n{scope}\n{digest}"

URI dataclass

Bases: URI

Universal Resource Identifier, target location for a :py:class:HTTPRequest.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
@dataclass(kw_only=True, frozen=True)
class URI(interfaces_http.URI):
    """Universal Resource Identifier, target location for a :py:class:`HTTPRequest`."""

    scheme: str = "https"
    """For example ``http`` or ``https``."""

    username: str | None = None
    """Username part of the userinfo URI component."""

    password: str | None = None
    """Password part of the userinfo URI component."""

    host: str
    """The hostname, for example ``amazonaws.com``."""

    port: int | None = None
    """An explicit port number."""

    path: str | None = None
    """Path component of the URI."""

    query: str | None = None
    """Query component of the URI as string."""

    fragment: str | None = None
    """Part of the URI specification, but may not be transmitted by a client."""

    @property
    def netloc(self) -> str:
        """Construct netloc string in format ``{username}:{password}@{host}:{port}``

        ``username``, ``password``, and ``port`` are only included if set. ``password``
        is ignored, unless ``username`` is also set.
        """
        return self._netloc

    # cached_property does NOT behave like property, it actually allows for setting.
    # Therefore we need a layer of indirection.
    @cached_property
    def _netloc(self) -> str:
        """Compute the netloc string once; ``netloc`` exposes it read-only."""
        userinfo = ""
        if self.username is not None:
            # The password is only emitted alongside a username.
            password = "" if self.password is None else f":{self.password}"
            userinfo = f"{self.username}{password}@"

        port = "" if self.port is None else f":{self.port}"
        return f"{userinfo}{self.host}{port}"

    def build(self) -> str:
        """Construct URI string representation.

        The components are passed to ``urlunparse`` as-is (no validation is
        performed here); a ``None`` path is treated as an empty string.

        :returns: A string of the form
            ``{scheme}://{username}:{password}@{host}:{port}{path}?{query}#{fragment}``
        """
        components = (
            self.scheme,
            self.netloc,
            self.path or "",
            "",  # params
            self.query,
            self.fragment,
        )
        return urlunparse(components)

    def to_dict(self) -> URIParameters:
        """Return every URI component keyed by its field name."""
        return {
            "scheme": self.scheme,
            "host": self.host,
            "port": self.port,
            "path": self.path,
            "query": self.query,
            "username": self.username,
            "password": self.password,
            "fragment": self.fragment,
        }

    def __eq__(self, other: object) -> bool:
        """Compare two URIs component by component.

        Returns ``NotImplemented`` for objects that are not ``URI`` instances so
        that Python can try the other operand's ``__eq__`` before falling back to
        an identity comparison; ``uri == non_uri`` still evaluates to ``False``
        unless the other operand says otherwise.
        """
        if not isinstance(other, URI):
            return NotImplemented
        return (
            self.scheme == other.scheme
            and self.host == other.host
            and self.port == other.port
            and self.path == other.path
            and self.query == other.query
            and self.username == other.username
            and self.password == other.password
            and self.fragment == other.fragment
        )

fragment = None class-attribute instance-attribute

Part of the URI specification, but may not be transmitted by a client.

host instance-attribute

The hostname, for example amazonaws.com.

netloc property

Construct netloc string in format {username}:{password}@{host}:{port}

username, password, and port are only included if set. password is ignored, unless username is also set.

password = None class-attribute instance-attribute

Password part of the userinfo URI component.

path = None class-attribute instance-attribute

Path component of the URI.

port = None class-attribute instance-attribute

An explicit port number.

query = None class-attribute instance-attribute

Query component of the URI as string.

scheme = 'https' class-attribute instance-attribute

For example http or https.

username = None class-attribute instance-attribute

Username part of the userinfo URI component.

build()

Construct URI string representation.

Validate host. Returns a string of the form {scheme}://{username}:{password}@{host}:{port}{path}?{query}#{fragment}

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/_http.py
def build(self) -> str:
    """Construct the string representation of this URI.

    The components are handed to ``urlunparse`` with an empty ``params`` slot;
    a ``None`` (or empty) path is passed as an empty string.

    :returns: A string of the form
        ``{scheme}://{username}:{password}@{host}:{port}{path}?{query}#{fragment}``
    """
    # urlunparse expects (scheme, netloc, path, params, query, fragment).
    return urlunparse(
        (self.scheme, self.netloc, self.path or "", "", self.query, self.fragment)
    )

exceptions

BaseAWSSDKException

Bases: Exception

Top-level exception to capture SDK-related errors.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/exceptions.py
class BaseAWSSDKException(Exception):
    """Top-level exception to capture SDK-related errors.

    Serves as the common base class for the package's own exception types, so
    ``except BaseAWSSDKException`` also catches subclasses such as
    ``MissingExpectedParameterException``.
    """

MissingExpectedParameterException

Bases: BaseAWSSDKException, ValueError

Some APIs require specific signing properties to be present.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/exceptions.py
class MissingExpectedParameterException(BaseAWSSDKException, ValueError):
    """Some APIs require specific signing properties to be present.

    Raised when such a property is absent; for example ``string_to_sign`` raises
    it when ``signing_properties`` has no ``date``. Because it also derives from
    ``ValueError``, handlers for either base class will catch it.
    """

interfaces

events

HEADERS_DICT = Mapping[str, HEADER_VALUE]

A dictionary of event headers.

HEADER_VALUE = bool | int | bytes | str | datetime.datetime | uuid.UUID

A union of valid value types for event headers.

EventHeaderEncoder

Bases: Protocol

A utility class that encodes event headers into bytes.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/events.py
class EventHeaderEncoder(Protocol):
    """A utility class that encodes event headers into bytes.

    NOTE(review): the method set suggests a stateful encoder (``encode_headers``
    accumulates, ``get_result`` reads, ``clear`` resets) -- confirm against a
    concrete implementation.
    """

    def clear(self) -> None:
        """Clear all previously encoded headers."""
        ...

    def get_result(self) -> bytes:
        """Get all the encoded header bytes.

        :returns: The encoded headers as a single ``bytes`` object.
        """
        ...

    def encode_headers(self, headers: HEADERS_DICT) -> None:
        """Encode a map of headers.

        :param headers: Mapping of header names to values; each value must be one
            of the ``HEADER_VALUE`` types.
        """
        ...
clear()

Clear all previously encoded headers.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/events.py
def clear(self) -> None:
    """Clear all previously encoded headers.

    NOTE(review): presumably this resets what ``get_result`` would return --
    confirm against a concrete encoder.
    """
    ...
encode_headers(headers)

Encode a map of headers.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/events.py
def encode_headers(self, headers: HEADERS_DICT) -> None:
    """Encode a map of headers.

    :param headers: Mapping of header names to values; each value must be one of
        the ``HEADER_VALUE`` types.
    """
    ...
get_result()

Get all the encoded header bytes.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/events.py
def get_result(self) -> bytes:
    """Get all the encoded header bytes.

    :returns: The encoded headers as a single ``bytes`` object.
    """
    ...

EventMessage

Bases: Protocol

A signable message that may be sent over an event stream.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/events.py
class EventMessage(Protocol):
    """A signable message that may be sent over an event stream."""

    headers: HEADERS_DICT
    """The headers present in the event message."""

    payload: bytes
    """The serialized bytes of the message payload."""

    def encode(self) -> bytes:
        """Encode headers and payload into bytes for transit.

        :returns: The serialized message as ``bytes``.
        """
        ...
headers instance-attribute

The headers present in the event message.

payload instance-attribute

The serialized bytes of the message payload.

encode()

Encode headers and payload into bytes for transit.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/events.py
def encode(self) -> bytes:
    """Encode headers and payload into bytes for transit.

    :returns: The serialized message as ``bytes``.
    """
    ...

http

Field

Bases: Protocol

A name-value pair representing a single field in a request or response.

The kind will dictate metadata placement within a message, for example as a header or trailer field in an HTTP request as defined in RFC 9110 Section 5.

All field names are case insensitive and case-variance must be treated as equivalent. Names may be normalized but should be preserved for accuracy during transmission.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
class Field(Protocol):
    """A name-value pair representing a single field in a request or response.

    The kind will dictate metadata placement within a message, for example as a header
    or trailer field in an HTTP request as defined in RFC 9110 Section 5.

    All field names are case insensitive and case-variance must be treated as
    equivalent. Names may be normalized but should be preserved for accuracy during
    transmission.
    """

    # The field's name.
    name: str
    # All values currently held by the field.
    values: list[str]
    # Where the field is placed in a message; header unless stated otherwise.
    kind: FieldPosition = FieldPosition.HEADER

    def add(self, value: str) -> None:
        """Append a value to a field.

        :param value: The value to append.
        """
        ...

    def set(self, values: list[str]) -> None:
        """Overwrite existing field values.

        :param values: The values that replace the current ones.
        """
        ...

    def remove(self, value: str) -> None:
        """Remove all matching entries from list.

        :param value: The value whose matching entries are removed.
        """
        ...

    def as_string(self, delimiter: str = ", ") -> str:
        """Serialize the ``Field``'s values into a single line string.

        :param delimiter: String placed between values; defaults to ``", "``.
        :returns: The values as one string.
        """
        ...

    def as_tuples(self) -> list[tuple[str, str]]:
        """Get list of ``name``, ``value`` tuples where each tuple represents one
        value.

        :returns: One ``(name, value)`` tuple per value.
        """
        ...
add(value)

Append a value to a field.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def add(self, value: str) -> None:
    """Append a value to a field.

    :param value: The value to append.
    """
    ...
as_string(delimiter=', ')

Serialize the Field's values into a single line string.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def as_string(self, delimiter: str = ", ") -> str:
    """Serialize the ``Field``'s values into a single line string.

    :param delimiter: String placed between values; defaults to ``", "``.
    :returns: The values as one string.
    """
    ...
as_tuples()

Get list of name, value tuples where each tuple represents one value.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def as_tuples(self) -> list[tuple[str, str]]:
    """Get list of ``name``, ``value`` tuples where each tuple represents one
    value.

    :returns: One ``(name, value)`` tuple per value.
    """
    ...
remove(value)

Remove all matching entries from list.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def remove(self, value: str) -> None:
    """Remove all matching entries from list.

    :param value: The value whose matching entries are removed.
    """
    ...
set(values)

Overwrite existing field values.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def set(self, values: list[str]) -> None:
    """Overwrite existing field values.

    :param values: The values that replace the current ones.
    """
    ...

FieldPosition

Bases: Enum

The type of a field.

Defines its placement in a request or response.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
class FieldPosition(Enum):
    """The type of a field: header or trailer.

    Defines its placement in a request or response.
    """

    HEADER = 0
    """Header field.

    In HTTP this is a header as defined in RFC 9110 Section 6.3. Implementations of
    other protocols may use this FieldPosition for similar types of metadata.
    """

    TRAILER = 1
    """Trailer field.

    In HTTP this is a trailer as defined in RFC 9110 Section 6.5. Implementations of
    other protocols may use this FieldPosition for similar types of metadata.
    """
HEADER = 0 class-attribute instance-attribute

Header field.

In HTTP this is a header as defined in RFC 9110 Section 6.3. Implementations of other protocols may use this FieldPosition for similar types of metadata.

TRAILER = 1 class-attribute instance-attribute

Trailer field.

In HTTP this is a trailer as defined in RFC 9110 Section 6.5. Implementations of other protocols may use this FieldPosition for similar types of metadata.

Fields

Bases: Protocol

Protocol agnostic mapping of key-value pair request metadata, such as HTTP fields.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
class Fields(Protocol):
    """Protocol agnostic mapping of key-value pair request metadata, such as HTTP
    fields."""

    # Entries are keyed off the name of a provided Field
    entries: OrderedDict[str, Field]
    # Text encoding associated with the collection; UTF-8 by default.
    encoding: str = "utf-8"

    def set_field(self, field: Field) -> None:
        """Alias for __setitem__ to utilize the field.name for the entry key.

        :param field: The field to store; its ``name`` is used as the entry key.
        """
        ...

    def __setitem__(self, name: str, field: Field) -> None:
        """Set entry for a Field name.

        :param name: The name to store the entry under.
        :param field: The field to store.
        """
        ...

    def __getitem__(self, name: str) -> Field:
        """Retrieve Field entry.

        :param name: The name of the entry to look up.
        :returns: The ``Field`` stored under ``name``.
        """
        ...

    def __delitem__(self, name: str) -> None:
        """Delete entry from collection.

        :param name: The name of the entry to delete.
        """
        ...

    def __iter__(self) -> Iterator[Field]:
        """Allow iteration over entries.

        :returns: An iterator over the ``Field`` entries.
        """
        ...

    def __len__(self) -> int:
        """Get total number of Field entries.

        :returns: The number of ``Field`` entries.
        """
        ...

    def get_by_type(self, kind: FieldPosition) -> list[Field]:
        """Helper function for retrieving specific types of fields.

        Used to grab all headers or all trailers.

        :param kind: The ``FieldPosition`` to select.
        :returns: The fields of the requested kind.
        """
        ...

    def extend(self, other: Fields) -> None:
        """Merges ``entries`` of ``other`` into the current ``entries``.

        For every ``Field`` in the ``entries`` of ``other``: If the normalized name
        already exists in the current ``entries``, the values from ``other`` are
        appended. Otherwise, the ``Field`` is added to the list of ``entries``.

        :param other: The collection whose entries are merged into this one.
        """
        ...
__delitem__(name)

Delete entry from collection.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def __delitem__(self, name: str) -> None:
    """Delete entry from collection.

    :param name: The name of the entry to delete.
    """
    ...
__getitem__(name)

Retrieve Field entry.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def __getitem__(self, name: str) -> Field:
    """Retrieve Field entry.

    :param name: The name of the entry to look up.
    :returns: The ``Field`` stored under ``name``.
    """
    ...
__iter__()

Allow iteration over entries.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def __iter__(self) -> Iterator[Field]:
    """Allow iteration over entries.

    :returns: An iterator over the ``Field`` entries.
    """
    ...
__len__()

Get total number of Field entries.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def __len__(self) -> int:
    """Get total number of Field entries.

    :returns: The number of ``Field`` entries.
    """
    ...
__setitem__(name, field)

Set entry for a Field name.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def __setitem__(self, name: str, field: Field) -> None:
    """Set entry for a Field name.

    :param name: The name to store the entry under.
    :param field: The field to store.
    """
    ...
extend(other)

Merges entries of other into the current entries.

For every Field in the entries of other: If the normalized name already exists in the current entries, the values from other are appended. Otherwise, the Field is added to the list of entries.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def extend(self, other: Fields) -> None:
    """Merges ``entries`` of ``other`` into the current ``entries``.

    For every ``Field`` in the ``entries`` of ``other``: If the normalized name
    already exists in the current ``entries``, the values from ``other`` are
    appended. Otherwise, the ``Field`` is added to the list of ``entries``.

    :param other: The collection whose entries are merged into this one.
    """
    ...
get_by_type(kind)

Helper function for retrieving specific types of fields.

Used to grab all headers or all trailers.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def get_by_type(self, kind: FieldPosition) -> list[Field]:
    """Helper function for retrieving specific types of fields.

    Used to grab all headers or all trailers.

    :param kind: The ``FieldPosition`` to select.
    :returns: The fields of the requested kind.
    """
    ...
set_field(field)

Alias for `__setitem__` to utilize the `field.name` for the entry key.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def set_field(self, field: Field) -> None:
    """Alias for __setitem__ to utilize the field.name for the entry key.

    :param field: The field to store; its ``name`` is used as the entry key.
    """
    ...

Request

Bases: Protocol

Protocol-agnostic representation of a request.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
class Request(Protocol):
    """Protocol-agnostic representation of a request."""

    # Target location the request is addressed to.
    destination: URI
    # Payload as a sync or async iterable of byte chunks, or None for no body.
    body: AsyncIterable[bytes] | Iterable[bytes] | None

URI

Bases: Protocol

Universal Resource Identifier, target location for a :py:class:Request.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
@runtime_checkable
class URI(Protocol):
    """Universal Resource Identifier, target location for a :py:class:`Request`."""

    scheme: str
    """For example ``http`` or ``mqtts``."""

    username: str | None
    """Username part of the userinfo URI component."""

    password: str | None
    """Password part of the userinfo URI component."""

    host: str
    """The hostname, for example ``amazonaws.com``."""

    port: int | None
    """An explicit port number."""

    path: str | None
    """Path component of the URI."""

    query: str | None
    """Query component of the URI as string."""

    fragment: str | None
    """Part of the URI specification, but may not be transmitted by a client."""

    def build(self) -> str:
        """Construct URI string representation.

        :returns: A string of the form
            ``{scheme}://{username}:{password}@{host}:{port}{path}?{query}#{fragment}``
        """
        ...

    @property
    def netloc(self) -> str:
        """Construct netloc string in format ``{username}:{password}@{host}:{port}``

        :returns: The userinfo, host, and port portion of the URI as one string.
        """
        ...
fragment instance-attribute

Part of the URI specification, but may not be transmitted by a client.

host instance-attribute

The hostname, for example amazonaws.com.

netloc property

Construct netloc string in format {username}:{password}@{host}:{port}

password instance-attribute

Password part of the userinfo URI component.

path instance-attribute

Path component of the URI.

port instance-attribute

An explicit port number.

query instance-attribute

Query component of the URI as string.

scheme instance-attribute

For example http or mqtts.

username instance-attribute

Username part of the userinfo URI component.

build()

Construct URI string representation.

Returns a string of the form {scheme}://{username}:{password}@{host}:{port}{path}?{query}#{fragment}

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/http.py
def build(self) -> str:
    """Construct URI string representation.

    :returns: A string of the form
        ``{scheme}://{username}:{password}@{host}:{port}{path}?{query}#{fragment}``
    """
    ...

identity

AWSCredentialsIdentity

Bases: Identity, Protocol

AWS Credentials Identity.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/identity.py
@runtime_checkable
class AWSCredentialsIdentity(Identity, Protocol):
    """AWS Credentials Identity.

    Extends ``Identity`` with an access key ID, a secret access key, and an
    optional session token.
    """

    access_key_id: str
    """A unique identifier for an AWS user or role."""

    secret_access_key: str
    """A secret key used in conjunction with the access key ID to authenticate
    programmatic access to AWS services."""

    session_token: str | None = None
    """A temporary token used to specify the current session for the supplied
    credentials."""
access_key_id instance-attribute

A unique identifier for an AWS user or role.

secret_access_key instance-attribute

A secret key used in conjunction with the access key ID to authenticate programmatic access to AWS services.

session_token = None class-attribute instance-attribute

A temporary token used to specify the current session for the supplied credentials.

Identity

Bases: Protocol

An entity available to the client representing who the user is.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/identity.py
@runtime_checkable
class Identity(Protocol):
    """An entity available to the client representing who the user is."""

    expiration: datetime | None = None
    """The expiration time of the identity.

    If time zone is provided, it is updated to UTC. The value must always be in UTC.
    """

    @property
    def is_expired(self) -> bool:
        """Whether the identity is expired."""
        if self.expiration is None:
            return False
        return datetime.now(tz=UTC) >= self.expiration
expiration = None class-attribute instance-attribute

The expiration time of the identity.

If time zone is provided, it is updated to UTC. The value must always be in UTC.

is_expired property

Whether the identity is expired.

io

AsyncByteStream

Bases: Protocol

A file-like object with an async read method.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/io.py
@runtime_checkable
class AsyncByteStream(Protocol):
    """A file-like object with an async read method."""

    # ``size`` is positional-only (note the ``/``) and defaults to -1.
    # NOTE(review): presumably a negative or None size means "read everything",
    # as AsyncBytesReader.read documents -- confirm for other implementers.
    async def read(self, size: int | None = -1, /) -> bytes: ...

AsyncSeekable

Bases: Protocol

An async file-like object with seek and tell implemented.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/io.py
@runtime_checkable
class AsyncSeekable(Protocol):
    """An async file-like object with seek and tell implemented."""

    # ``seek`` is a coroutine with positional-only parameters; ``whence``
    # defaults to 0.
    async def seek(self, offset: int, whence: int = 0, /) -> int: ...

    # Unlike ``seek``, ``tell`` is a plain synchronous method.
    def tell(self) -> int: ...

ByteStream

Bases: Protocol

A file-like object with a read method that returns bytes.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/io.py
@runtime_checkable
class ByteStream(Protocol):
    """A file-like object with a read method that returns bytes."""

    # ``size`` is positional-only (note the ``/``) and defaults to -1.
    # NOTE(review): presumably a negative or None size means "read everything"
    # -- confirm against implementers.
    def read(self, size: int | None = -1, /) -> bytes: ...

ConditionallySeekable

Bases: Protocol

A file-like object that is conditionally seekable.

This is separate from Seekable and AsyncSeekable as seekable objects may not define this method.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/io.py
@runtime_checkable
class ConditionallySeekable(Protocol):
    """A file-like object that is conditionally seekable.

    This is separate from Seekable and AsyncSeekable as seekable objects may not define
    this method.
    """

    # The signer's ``_seekable`` helper uses this return value directly as the
    # answer to "can this body be seeked?".
    def seekable(self) -> bool: ...

Seekable

Bases: Protocol

A file-like object with seek and tell implemented.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/interfaces/io.py
@runtime_checkable
class Seekable(Protocol):
    """A file-like object with seek and tell implemented."""

    # Parameters are positional-only (note the ``/``); ``whence`` defaults to 0.
    def seek(self, offset: int, whence: int = 0, /) -> int: ...

    # Returns an int; the signer reads it before consuming a body and passes it
    # back to ``seek`` afterwards.
    def tell(self) -> int: ...

signers

AsyncSigV4Signer

Request signer for applying the AWS Signature Version 4 algorithm.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
class AsyncSigV4Signer:
    """Request signer for applying the AWS Signature Version 4 algorithm."""

    async def sign(
        self,
        *,
        request: AWSRequest,
        identity: _AWSCredentialsIdentity,
        properties: SigV4SigningProperties,
    ) -> AWSRequest:
        """Generate and apply a SigV4 Signature to a copy of the supplied request.

        The request is deep-copied and the signing properties are copied before
        defaults are filled in, so neither the supplied request nor the supplied
        properties are modified.

        :param request: An AWSRequest to sign prior to sending to the service.
        :param identity: A set of credentials representing an AWS Identity or role
            capacity.
        :param properties: SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        :returns: The signed copy of the request with its ``Authorization`` field
            set.
        :raises ValueError: If the identity has an unexpected type or is expired.
        """
        # Copy and prepopulate any missing values in the
        # supplied request and signing properties.
        await self._validate_identity(identity=identity)
        new_signing_properties = await self._normalize_signing_properties(
            signing_properties=properties
        )
        new_request = await self._generate_new_request(request=request)
        await self._apply_required_fields(
            request=new_request,
            signing_properties=new_signing_properties,
            identity=identity,
        )

        # Construct core signing components. Every step works from the
        # normalized properties so they all agree on the same inputs.
        canonical_request = await self.canonical_request(
            signing_properties=new_signing_properties,
            request=new_request,
        )
        string_to_sign = await self.string_to_sign(
            canonical_request=canonical_request,
            signing_properties=new_signing_properties,
        )
        signature = await self._signature(
            string_to_sign=string_to_sign,
            secret_key=identity.secret_access_key,
            signing_properties=new_signing_properties,
        )

        # The signed header names are recomputed from the request because
        # building the canonical request may have added fields to it.
        signing_fields = await self._normalize_signing_fields(request=new_request)
        credential_scope = await self._scope(signing_properties=new_signing_properties)
        credential = f"{identity.access_key_id}/{credential_scope}"
        authorization = await self.generate_authorization_field(
            credential=credential,
            signed_headers=list(signing_fields.keys()),
            signature=signature,
        )
        new_request.fields.set_field(authorization)
        return new_request

    async def generate_authorization_field(
        self, *, credential: str, signed_headers: list[str], signature: str
    ) -> Field:
        """Build the `Authorization` field for a signed request.

        :param credential:
            Credential string placed after ``Credential=``. Defined as:
                <access_key>/<date>/<region>/<service>/<request_type>
        :param signed_headers:
            The field names used in signing; they are joined with ``;`` in the
            order given.
        :param signature:
            Final hash of the SigV4 signing algorithm generated from the
            canonical request and string to sign.
        :returns: A Field named ``Authorization`` holding a single value.
        """
        header_value = (
            f"AWS4-HMAC-SHA256 Credential={credential}, "
            f"SignedHeaders={';'.join(signed_headers)}, Signature={signature}"
        )
        return Field(name="Authorization", values=[header_value])

    async def _signature(
        self,
        *,
        string_to_sign: str,
        secret_key: str,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        """Sign the string to sign.

        In SigV4, a signing key is created that is scoped to a specific region and
        service. The date, region, service and resulting signing key are individually
        hashed, then the composite hash is used to sign the string to sign.
        """

        # Components of Signing Key Calculation
        #
        # DateKey              = HMAC-SHA256("AWS4"+"<SecretAccessKey>", "<YYYYMMDD>")
        # DateRegionKey        = HMAC-SHA256(<DateKey>, "<aws-region>")
        # DateRegionServiceKey = HMAC-SHA256(<DateRegionKey>, "<aws-service>")
        # SigningKey = HMAC-SHA256(<DateRegionServiceKey>, "aws4_request")
        assert "date" in signing_properties
        k_date = await self._hash(
            key=f"AWS4{secret_key}".encode(), value=signing_properties["date"][0:8]
        )
        k_region = await self._hash(key=k_date, value=signing_properties["region"])
        k_service = await self._hash(key=k_region, value=signing_properties["service"])
        k_signing = await self._hash(key=k_service, value="aws4_request")
        final_hash = await self._hash(key=k_signing, value=string_to_sign)

        return final_hash.hex()

    async def _hash(self, key: bytes, value: str) -> bytes:
        return hmac.new(key=key, msg=value.encode(), digestmod=sha256).digest()

    async def _validate_identity(self, *, identity: _AWSCredentialsIdentity) -> None:
        """Reject identities of the wrong type or that report being expired.

        :raises ValueError: If ``identity`` is not an ``_AWSCredentialsIdentity``
            or its ``is_expired`` attribute is truthy.
        """
        if not isinstance(identity, _AWSCredentialsIdentity):  # pyright: ignore
            raise ValueError(
                "Received unexpected value for identity parameter. Expected "
                f"AWSCredentialIdentity but received {type(identity)}."
            )

        if identity.is_expired:
            raise ValueError(
                f"Provided identity expired at {identity.expiration}. Please "
                "refresh the credentials or update the expiration parameter."
            )

    async def _normalize_signing_properties(
        self, *, signing_properties: SigV4SigningProperties
    ) -> SigV4SigningProperties:
        """Return a copy of ``signing_properties`` that always has a ``date``.

        The caller's mapping is left untouched. When no date was supplied, the
        current UTC time formatted with ``SIGV4_TIMESTAMP_FORMAT`` is used.
        """
        props_copy = SigV4SigningProperties(**signing_properties)
        if "date" in props_copy:
            return props_copy

        now_utc = datetime.datetime.now(datetime.UTC)
        props_copy["date"] = now_utc.strftime(SIGV4_TIMESTAMP_FORMAT)
        return props_copy

    async def _generate_new_request(self, *, request: AWSRequest) -> AWSRequest:
        return deepcopy(request)

    async def _apply_required_fields(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
        identity: _AWSCredentialsIdentity,
    ) -> None:
        """Add the date and security-token fields to ``request`` when missing.

        ``request`` is modified in place:

        * ``X-Amz-Date`` is set from ``signing_properties["date"]`` when the
          request has neither a ``Date`` nor an ``X-Amz-Date`` field.
        * ``X-Amz-Security-Token`` is set from ``identity.session_token`` when
          the request lacks that field and the token is not ``None``.
        """
        has_date_field = "Date" in request.fields or "X-Amz-Date" in request.fields
        if not has_date_field:
            assert "date" in signing_properties
            date_field = Field(name="X-Amz-Date", values=[signing_properties["date"]])
            request.fields.set_field(date_field)

        lacks_token_field = "X-Amz-Security-Token" not in request.fields
        if lacks_token_field and identity.session_token is not None:
            token_field = Field(
                name="X-Amz-Security-Token", values=[identity.session_token]
            )
            request.fields.set_field(token_field)

    async def canonical_request(
        self, *, signing_properties: SigV4SigningProperties, request: AWSRequest
    ) -> str:
        """Build the SigV4 canonical request string for ``request``.

        The canonical request is a standardized layout of the components used in
        the signing algorithm, which makes it a handy checkpoint when hunting
        for signature mismatches. Its six parts are joined by newlines:

            <HTTPMethod>
            <CanonicalURI>
            <CanonicalQueryString>
            <CanonicalHeaders>
            <SignedHeaders>
            <HashedPayload>

        :param signing_properties:
            SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        :param request:
            An AWSRequest to use for generating a SigV4 signature.
        """
        # The payload is formatted first: doing so may set fields on the
        # request, and those must exist before the signed fields are chosen.
        payload_part = await self._format_canonical_payload(
            request=request, signing_properties=signing_properties
        )
        path_part = await self._format_canonical_path(
            path=request.destination.path, signing_properties=signing_properties
        )
        query_part = await self._format_canonical_query(
            query=request.destination.query
        )
        signing_fields = await self._normalize_signing_fields(request=request)
        header_block = await self._format_canonical_fields(fields=signing_fields)

        http_method = request.method.upper()
        signed_header_names = ";".join(signing_fields)
        return (
            f"{http_method}\n"
            f"{path_part}\n"
            f"{query_part}\n"
            f"{header_block}\n"
            f"{signed_header_names}\n"
            f"{payload_part}"
        )

    async def string_to_sign(
        self,
        *,
        canonical_request: str,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        """The string to sign is the second step of our signing algorithm which
        concatenates the formal identifier of our signing algorithm, the signing
        DateTime, the scope of our credentials, and a hash of our previously generated
        canonical request. This is another checkpoint that can be used to ensure we're
        constructing our signature as intended.

        The SigV4 specification defines the string to sign as:
            Algorithm \n
            RequestDateTime \n
            CredentialScope  \n
            HashedCanonicalRequest

        :param canonical_request:
            String generated from the `canonical_request` method.
        :param signing_properties:
            SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        """
        date = signing_properties.get("date")
        if date is None:
            raise MissingExpectedParameterException(
                "Cannot generate string_to_sign without a valid date "
                f"in your signing_properties. Current value: {date}"
            )
        scope = await self._scope(signing_properties=signing_properties)
        return (
            "AWS4-HMAC-SHA256\n"
            f"{date}\n"
            f"{scope}\n"
            f"{sha256(canonical_request.encode()).hexdigest()}"
        )

    async def _scope(self, signing_properties: SigV4SigningProperties) -> str:
        assert "date" in signing_properties
        formatted_date = signing_properties["date"][0:8]
        region = signing_properties["region"]
        service = signing_properties["service"]
        # Scope format: <YYYYMMDD>/<AWS Region>/<AWS Service>/aws4_request
        return f"{formatted_date}/{region}/{service}/aws4_request"

    async def _format_canonical_path(
        self, *, path: str | None, signing_properties: SigV4SigningProperties
    ) -> str:
        """Return the canonical form of the request path.

        A missing path is treated as ``/``. When ``uri_encode_path`` is truthy
        (the default) the path has its dot segments removed and is then
        percent-encoded with ``/`` kept as-is. Otherwise dot segments are
        removed with ``remove_consecutive_slashes=False`` and no encoding is
        applied.
        """
        effective_path = "/" if path is None else path

        if not signing_properties.get("uri_encode_path", True):
            return _remove_dot_segments(
                effective_path, remove_consecutive_slashes=False
            )

        return quote(string=_remove_dot_segments(effective_path), safe="/")

    async def _format_canonical_query(self, *, query: str | None) -> str:
        if query is None:
            return ""

        query_params = parse_qsl(qs=query)
        query_parts = (
            (quote(string=key, safe=""), quote(string=value, safe=""))
            for key, value in query_params
        )
        # key-value pairs must be in sorted order for their encoded forms.
        return "&".join(f"{key}={value}" for key, value in sorted(query_parts))

    async def _normalize_signing_fields(self, *, request: AWSRequest) -> dict[str, str]:
        """Collect the request fields that take part in signing.

        Field names are lower-cased, fields rejected by ``_is_signable_header``
        are skipped, and a ``host`` entry is derived from the request
        destination when the request does not carry one.

        :returns: A dict of lower-cased field name to string value, ordered by
            field name.
        """
        signable: dict[str, str] = {}
        for field in request.fields:
            lowered_name = field.name.lower()
            if self._is_signable_header(lowered_name):
                signable[lowered_name] = field.as_string()

        if "host" not in signable:
            signable["host"] = await self._normalize_host_field(
                uri=request.destination  # type: ignore - TODO(pyright)
            )

        return {name: signable[name] for name in sorted(signable)}

    def _is_signable_header(self, field_name: str):
        """Return True unless ``field_name`` is in ``HEADERS_EXCLUDED_FROM_SIGNING``."""
        return field_name not in HEADERS_EXCLUDED_FROM_SIGNING

    async def _normalize_host_field(self, *, uri: URI) -> str:
        """Return the ``netloc`` to sign as the host value.

        When the URI has an explicit port equal to the ``DEFAULT_PORTS`` entry
        for its scheme, the netloc is taken from a copy of the URI whose port is
        cleared; otherwise the URI's own netloc is returned.
        """
        has_default_port = (
            uri.port is not None and DEFAULT_PORTS.get(uri.scheme) == uri.port
        )
        if not has_default_port:
            return uri.netloc

        components = uri.to_dict()
        components["port"] = None
        return URI(**components).netloc

    async def _format_canonical_fields(self, *, fields: dict[str, str]) -> str:
        return "".join(
            f"{key}:{' '.join(value.split())}\n" for key, value in fields.items()
        )

    async def _should_sha256_sign_payload(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
    ) -> bool:
        # All insecure connections should be signed
        if request.destination.scheme != "https":
            return True

        return signing_properties.get("payload_signing_enabled", True)

    async def _format_canonical_payload(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        """Return the payload hash component of the canonical request.

        Resolution order:

        1. An existing ``X-Amz-Content-SHA256`` field with exactly one value is
           returned as-is.
        2. For event-stream requests, ``EVENT_STREAM_HASH`` is set on the
           request and returned.
        3. Otherwise the hash comes from ``_compute_payload_hash`` and is set on
           the request only when ``content_checksum_enabled`` is truthy.
        """
        header_name = "X-Amz-Content-SHA256"

        if header_name in request.fields:
            existing_values = request.fields[header_name].values
            if len(existing_values) == 1:
                return existing_values[0]

        if self._is_event_stream(request=request):
            payload_hash = EVENT_STREAM_HASH
            publish_header = True
        else:
            payload_hash = await self._compute_payload_hash(
                request=request, signing_properties=signing_properties
            )
            publish_header = signing_properties.get("content_checksum_enabled", False)

        if publish_header:
            request.fields.set_field(Field(name=header_name, values=[payload_hash]))
        return payload_hash

    async def _compute_payload_hash(
        self, *, request: AWSRequest, signing_properties: SigV4SigningProperties
    ) -> str:
        """Compute the SHA-256 hex digest of the request body.

        :param request: The request whose body is hashed. When the body is not
            seekable, ``request.body`` is replaced with an ``AsyncBytesReader``
            over a buffered copy of the consumed data.
        :param signing_properties: Consulted (via
            ``_should_sha256_sign_payload``) to decide whether hashing applies.
        :returns: ``UNSIGNED_PAYLOAD`` when payload signing does not apply,
            ``EMPTY_SHA256_HASH`` when there is no body, otherwise the hex
            digest of the body.
        :raises TypeError: If the body is not an ``AsyncIterable``.
        """
        if not await self._should_sha256_sign_payload(
            request=request, signing_properties=signing_properties
        ):
            return UNSIGNED_PAYLOAD

        body = request.body

        if body is None:
            return EMPTY_SHA256_HASH

        if not isinstance(body, AsyncIterable):
            raise TypeError(
                "A sync body was attached to an asynchronous signer. Please use "
                "SigV4Signer for sync AWSRequests or ensure your body is "
                "of type AsyncIterable[bytes]."
            )
        # Hashing requires reading the whole body, so callers are warned.
        warnings.warn(
            "Payload signing is enabled. This may result in "
            "decreased performance for large request bodies.",
            AWSSDKWarning,
        )

        checksum = sha256()
        if self._seekable(body):
            # Seekable body: remember the current position, consume the stream
            # for hashing, then seek back to where it started.
            position = body.tell()
            async for chunk in body:  # type: ignore
                checksum.update(chunk)  # type: ignore
            await body.seek(position)
        else:
            # Non-seekable body: iterating consumes it, so every chunk is also
            # copied into an in-memory buffer that replaces the request body.
            buffer = io.BytesIO()
            async for chunk in body:
                buffer.write(chunk)
                checksum.update(chunk)
            buffer.seek(0)
            request.body = AsyncBytesReader(buffer)
        return checksum.hexdigest()

    def _is_event_stream(self, request: AWSRequest) -> bool:
        if "Content-Type" not in request.fields:
            return False
        content_type = request.fields["Content-Type"].as_string()
        return content_type == EVENT_STREAM_CONTENT_TYPE

    def _seekable(self, body: AsyncIterable[bytes]) -> TypeGuard[AsyncSeekable]:
        """Report whether ``body`` can be rewound with an awaitable ``seek``.

        A ``ConditionallySeekable`` body answers for itself via ``seekable()``.
        Any other body must be an ``AsyncSeekable`` whose ``seek`` is a
        coroutine function.
        """
        if not isinstance(body, ConditionallySeekable):
            return isinstance(body, AsyncSeekable) and iscoroutinefunction(body.seek)

        return body.seekable()

canonical_request(*, signing_properties, request) async

The canonical request is a standardized string laying out the components used in the SigV4 signing algorithm. This is useful to quickly compare inputs to find signature mismatches and unintended variances.

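The SigV4 specification defines the canonical request to be:

`<HTTPMethod>`

`<CanonicalURI>`

`<CanonicalQueryString>`

`<CanonicalHeaders>`

`<SignedHeaders>`

`<HashedPayload>`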

:param signing_properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date. :param request: An AWSRequest to use for generating a SigV4 signature.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
async def canonical_request(
    self, *, signing_properties: SigV4SigningProperties, request: AWSRequest
) -> str:
    """Build the SigV4 canonical request string for ``request``.

    The canonical request is a standardized layout of the components used in
    the signing algorithm, which makes it a handy checkpoint when hunting
    for signature mismatches. Its six parts are joined by newlines:

        <HTTPMethod>
        <CanonicalURI>
        <CanonicalQueryString>
        <CanonicalHeaders>
        <SignedHeaders>
        <HashedPayload>

    :param signing_properties:
        SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :param request:
        An AWSRequest to use for generating a SigV4 signature.
    """
    # The payload is formatted first: doing so may set fields on the
    # request, and those must exist before the signed fields are chosen.
    payload_part = await self._format_canonical_payload(
        request=request, signing_properties=signing_properties
    )
    path_part = await self._format_canonical_path(
        path=request.destination.path, signing_properties=signing_properties
    )
    query_part = await self._format_canonical_query(
        query=request.destination.query
    )
    signing_fields = await self._normalize_signing_fields(request=request)
    header_block = await self._format_canonical_fields(fields=signing_fields)

    http_method = request.method.upper()
    signed_header_names = ";".join(signing_fields)
    return (
        f"{http_method}\n"
        f"{path_part}\n"
        f"{query_part}\n"
        f"{header_block}\n"
        f"{signed_header_names}\n"
        f"{payload_part}"
    )

generate_authorization_field(*, credential, signed_headers, signature) async

Generate the Authorization field.

:param credential: Credential scope string for generating the Authorization header. Defined as: `<access_key>/<date>/<region>/<service>/<request_type>` :param signed_headers: A list of the field names used in signing. :param signature: Final hash of the SigV4 signing algorithm generated from the canonical request and string to sign.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
async def generate_authorization_field(
    self, *, credential: str, signed_headers: list[str], signature: str
) -> Field:
    """Build the `Authorization` field for a signed request.

    :param credential:
        Credential string placed after ``Credential=``. Defined as:
            <access_key>/<date>/<region>/<service>/<request_type>
    :param signed_headers:
        The field names used in signing; they are joined with ``;`` in the
        order given.
    :param signature:
        Final hash of the SigV4 signing algorithm generated from the
        canonical request and string to sign.
    :returns: A Field named ``Authorization`` holding a single value.
    """
    header_value = (
        f"AWS4-HMAC-SHA256 Credential={credential}, "
        f"SignedHeaders={';'.join(signed_headers)}, Signature={signature}"
    )
    return Field(name="Authorization", values=[header_value])

sign(*, request, identity, properties) async

Generate and apply a SigV4 Signature to a copy of the supplied request.

:param request: An AWSRequest to sign prior to sending to the service. :param identity: A set of credentials representing an AWS Identity or role capacity. :param properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
async def sign(
    self,
    *,
    request: AWSRequest,
    identity: _AWSCredentialsIdentity,
    properties: SigV4SigningProperties,
) -> AWSRequest:
    """Generate and apply a SigV4 Signature to a copy of the supplied request.

    The request is deep-copied and the signing properties are copied before
    defaults are filled in, so neither the supplied request nor the supplied
    properties are modified.

    :param request: An AWSRequest to sign prior to sending to the service.
    :param identity: A set of credentials representing an AWS Identity or role
        capacity.
    :param properties: SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :returns: The signed copy of the request with its ``Authorization`` field
        set.
    :raises ValueError: If the identity has an unexpected type or is expired.
    """
    # Copy and prepopulate any missing values in the
    # supplied request and signing properties.
    await self._validate_identity(identity=identity)
    new_signing_properties = await self._normalize_signing_properties(
        signing_properties=properties
    )
    new_request = await self._generate_new_request(request=request)
    await self._apply_required_fields(
        request=new_request,
        signing_properties=new_signing_properties,
        identity=identity,
    )

    # Construct core signing components. Every step works from the
    # normalized properties so they all agree on the same inputs.
    canonical_request = await self.canonical_request(
        signing_properties=new_signing_properties,
        request=new_request,
    )
    string_to_sign = await self.string_to_sign(
        canonical_request=canonical_request,
        signing_properties=new_signing_properties,
    )
    signature = await self._signature(
        string_to_sign=string_to_sign,
        secret_key=identity.secret_access_key,
        signing_properties=new_signing_properties,
    )

    # The signed header names are recomputed from the request because
    # building the canonical request may have added fields to it.
    signing_fields = await self._normalize_signing_fields(request=new_request)
    credential_scope = await self._scope(signing_properties=new_signing_properties)
    credential = f"{identity.access_key_id}/{credential_scope}"
    authorization = await self.generate_authorization_field(
        credential=credential,
        signed_headers=list(signing_fields.keys()),
        signature=signature,
    )
    new_request.fields.set_field(authorization)
    return new_request

string_to_sign(*, canonical_request, signing_properties) async

The string to sign is the second step of our signing algorithm which concatenates the formal identifier of our signing algorithm, the signing DateTime, the scope of our credentials, and a hash of our previously generated canonical request. This is another checkpoint that can be used to ensure we're constructing our signature as intended.

The SigV4 specification defines the string to sign as

Algorithm

RequestDateTime

CredentialScope

HashedCanonicalRequest

:param canonical_request: String generated from the canonical_request method. :param signing_properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
async def string_to_sign(
    self,
    *,
    canonical_request: str,
    signing_properties: SigV4SigningProperties,
) -> str:
    """Build the SigV4 string to sign from a canonical request.

    This is the second step of the signing algorithm and another checkpoint
    for verifying that a signature is being built as intended. The result
    is four newline-separated parts:

        Algorithm
        RequestDateTime
        CredentialScope
        HashedCanonicalRequest

    :param canonical_request:
        String generated from the `canonical_request` method.
    :param signing_properties:
        SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :raises MissingExpectedParameterException:
        If ``signing_properties`` has no ``date`` (or it is ``None``).
    """
    if (timestamp := signing_properties.get("date")) is None:
        raise MissingExpectedParameterException(
            "Cannot generate string_to_sign without a valid date "
            f"in your signing_properties. Current value: {timestamp}"
        )

    credential_scope = await self._scope(signing_properties=signing_properties)
    request_digest = sha256(canonical_request.encode()).hexdigest()
    return f"AWS4-HMAC-SHA256\n{timestamp}\n{credential_scope}\n{request_digest}"

SigV4Signer

Request signer for applying the AWS Signature Version 4 algorithm.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
class SigV4Signer:
    """Request signer for applying the AWS Signature Version 4 algorithm."""

    def sign(
        self,
        *,
        request: AWSRequest,
        identity: _AWSCredentialsIdentity,
        properties: SigV4SigningProperties,
    ) -> AWSRequest:
        """Generate and apply a SigV4 Signature to a copy of the supplied request.

        :param request: An AWSRequest to sign prior to sending to the service.
        :param identity: A set of credentials representing an AWS Identity or role
            capacity.
        :param properties: SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        """
        # Copy and prepopulate any missing values in the
        # supplied request and signing properties.
        self._validate_identity(identity=identity)
        new_signing_properties = self._normalize_signing_properties(
            signing_properties=properties
        )
        assert "date" in new_signing_properties

        new_request = self._generate_new_request(request=request)
        self._apply_required_fields(
            request=new_request,
            signing_properties=new_signing_properties,
            identity=identity,
        )

        # Construct core signing components
        canonical_request = self.canonical_request(
            signing_properties=new_signing_properties,
            request=new_request,
        )
        string_to_sign = self.string_to_sign(
            canonical_request=canonical_request,
            signing_properties=new_signing_properties,
        )
        signature = self._signature(
            string_to_sign=string_to_sign,
            secret_key=identity.secret_access_key,
            signing_properties=new_signing_properties,
        )

        signing_fields = self._normalize_signing_fields(request=new_request)
        credential_scope = self._scope(signing_properties=new_signing_properties)
        credential = f"{identity.access_key_id}/{credential_scope}"
        authorization = self.generate_authorization_field(
            credential=credential,
            signed_headers=list(signing_fields.keys()),
            signature=signature,
        )
        new_request.fields.set_field(authorization)

        return new_request

    def generate_authorization_field(
        self, *, credential: str, signed_headers: list[str], signature: str
    ) -> Field:
        """Generate the `Authorization` field.

        :param credential:
            Credential scope string for generating the Authorization header.
            Defined as:
                <access_key>/<date>/<region>/<service>/<request_type>
        :param signed_headers:
            A list of the field names used in signing.
        :param signature:
            Final hash of the SigV4 signing algorithm generated from the
            canonical request and string to sign.
        """
        signed_headers_str = ";".join(signed_headers)
        auth_str = (
            f"AWS4-HMAC-SHA256 Credential={credential}, "
            f"SignedHeaders={signed_headers_str}, Signature={signature}"
        )
        return Field(name="Authorization", values=[auth_str])

    def _signature(
        self,
        *,
        string_to_sign: str,
        secret_key: str,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        """Sign the string to sign.

        In SigV4, a signing key is created that is scoped to a specific region and
        service. The date, region, service and resulting signing key are individually
        hashed, then the composite hash is used to sign the string to sign.
        """

        # Components of Signing Key Calculation
        #
        # DateKey              = HMAC-SHA256("AWS4"+"<SecretAccessKey>", "<YYYYMMDD>")
        # DateRegionKey        = HMAC-SHA256(<DateKey>, "<aws-region>")
        # DateRegionServiceKey = HMAC-SHA256(<DateRegionKey>, "<aws-service>")
        # SigningKey = HMAC-SHA256(<DateRegionServiceKey>, "aws4_request")
        assert "date" in signing_properties
        k_date = self._hash(
            key=f"AWS4{secret_key}".encode(), value=signing_properties["date"][0:8]
        )
        k_region = self._hash(key=k_date, value=signing_properties["region"])
        k_service = self._hash(key=k_region, value=signing_properties["service"])
        k_signing = self._hash(key=k_service, value="aws4_request")

        return self._hash(key=k_signing, value=string_to_sign).hex()

    def _hash(self, key: bytes, value: str) -> bytes:
        return hmac.new(key=key, msg=value.encode(), digestmod=sha256).digest()

    def _validate_identity(self, *, identity: _AWSCredentialsIdentity) -> None:
        """Perform runtime and expiration checks before attempting signing."""
        if not isinstance(identity, _AWSCredentialsIdentity):  # pyright: ignore
            raise ValueError(
                "Received unexpected value for identity parameter. Expected "
                f"AWSCredentialIdentity but received {type(identity)}."
            )
        elif identity.is_expired:
            raise ValueError(
                f"Provided identity expired at {identity.expiration}. Please "
                "refresh the credentials or update the expiration parameter."
            )

    def _normalize_signing_properties(
        self, *, signing_properties: SigV4SigningProperties
    ) -> SigV4SigningProperties:
        # Create copy of signing properties to avoid mutating the original
        new_signing_properties = SigV4SigningProperties(**signing_properties)
        if "date" not in new_signing_properties:
            date_obj = datetime.datetime.now(datetime.UTC)
            new_signing_properties["date"] = date_obj.strftime(SIGV4_TIMESTAMP_FORMAT)
        return new_signing_properties

    def _generate_new_request(self, *, request: AWSRequest) -> AWSRequest:
        return deepcopy(request)

    def _apply_required_fields(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
        identity: _AWSCredentialsIdentity,
    ) -> None:
        # Apply required X-Amz-Date if neither X-Amz-Date nor Date are present.
        if "Date" not in request.fields and "X-Amz-Date" not in request.fields:
            assert "date" in signing_properties
            request.fields.set_field(
                Field(name="X-Amz-Date", values=[signing_properties["date"]])
            )
        # Apply required X-Amz-Security-Token if token present on identity
        if (
            "X-Amz-Security-Token" not in request.fields
            and identity.session_token is not None
        ):
            request.fields.set_field(
                Field(name="X-Amz-Security-Token", values=[identity.session_token])
            )

    def canonical_request(
        self, *, signing_properties: SigV4SigningProperties, request: AWSRequest
    ) -> str:
        """The canonical request is a standardized string laying out the components used
        in the SigV4 signing algorithm. This is useful to quickly compare inputs to find
        signature mismatches and unintended variances.

        The SigV4 specification defines the canonical request to be:
            <HTTPMethod>\n
            <CanonicalURI>\n
            <CanonicalQueryString>\n
            <CanonicalHeaders>\n
            <SignedHeaders>\n
            <HashedPayload>

        :param signing_properties:
            SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        :param request:
            An AWSRequest to use for generating a SigV4 signature.
        """
        # We generate the payload first to ensure any field modifications
        # are in place before choosing the canonical fields.
        canonical_payload = self._format_canonical_payload(
            request=request, signing_properties=signing_properties
        )
        canonical_path = self._format_canonical_path(
            path=request.destination.path, signing_properties=signing_properties
        )
        canonical_query = self._format_canonical_query(query=request.destination.query)
        normalized_fields = self._normalize_signing_fields(request=request)
        canonical_fields = self._format_canonical_fields(fields=normalized_fields)
        return (
            f"{request.method.upper()}\n"
            f"{canonical_path}\n"
            f"{canonical_query}\n"
            f"{canonical_fields}\n"
            f"{';'.join(normalized_fields)}\n"
            f"{canonical_payload}"
        )

    def string_to_sign(
        self,
        *,
        canonical_request: str,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        """The string to sign is the second step of our signing algorithm which
        concatenates the formal identifier of our signing algorithm, the signing
        DateTime, the scope of our credentials, and a hash of our previously generated
        canonical request. This is another checkpoint that can be used to ensure we're
        constructing our signature as intended.

        The SigV4 specification defines the string to sign as:
            Algorithm \n
            RequestDateTime \n
            CredentialScope  \n
            HashedCanonicalRequest

        :param canonical_request:
            String generated from the `canonical_request` method.
        :param signing_properties:
            SigV4SigningProperties to define signing primitives such as
            the target service, region, and date.
        """
        date = signing_properties.get("date")
        if date is None:
            raise MissingExpectedParameterException(
                "Cannot generate string_to_sign without a valid date "
                f"in your signing_properties. Current value: {date}"
            )
        return (
            "AWS4-HMAC-SHA256\n"
            f"{date}\n"
            f"{self._scope(signing_properties=signing_properties)}\n"
            f"{sha256(canonical_request.encode()).hexdigest()}"
        )

    def _scope(self, signing_properties: SigV4SigningProperties) -> str:
        assert "date" in signing_properties
        formatted_date = signing_properties["date"][0:8]
        region = signing_properties["region"]
        service = signing_properties["service"]
        # Scope format: <YYYYMMDD>/<AWS Region>/<AWS Service>/aws4_request
        return f"{formatted_date}/{region}/{service}/aws4_request"

    def _format_canonical_path(
        self, *, path: str | None, signing_properties: SigV4SigningProperties
    ) -> str:
        if path is None:
            path = "/"

        if signing_properties.get("uri_encode_path", True):
            normalized_path = _remove_dot_segments(path)
            return quote(string=normalized_path, safe="/")
        else:
            return _remove_dot_segments(path, remove_consecutive_slashes=False)

    def _format_canonical_query(self, *, query: str | None) -> str:
        if query is None:
            return ""

        query_params = parse_qsl(qs=query)
        query_parts = (
            (quote(string=key, safe=""), quote(string=value, safe=""))
            for key, value in query_params
        )
        # key-value pairs must be in sorted order for their encoded forms.
        return "&".join(f"{key}={value}" for key, value in sorted(query_parts))

    def _normalize_signing_fields(self, *, request: AWSRequest) -> dict[str, str]:
        normalized_fields = {
            field.name.lower(): field.as_string()
            for field in request.fields
            if self._is_signable_header(field.name.lower())
        }
        if "host" not in normalized_fields:
            normalized_fields["host"] = self._normalize_host_field(
                uri=request.destination  # type: ignore - TODO(pyright)
            )

        return dict(sorted(normalized_fields.items()))

    def _is_signable_header(self, field_name: str):
        if field_name in HEADERS_EXCLUDED_FROM_SIGNING:
            return False
        return True

    def _normalize_host_field(self, *, uri: URI) -> str:
        if uri.port is not None and DEFAULT_PORTS.get(uri.scheme) == uri.port:
            uri_dict = uri.to_dict()
            uri_dict.update({"port": None})
            uri = URI(**uri_dict)
        return uri.netloc

    def _format_canonical_fields(self, *, fields: dict[str, str]) -> str:
        return "".join(
            f"{key}:{' '.join(value.split())}\n" for key, value in fields.items()
        )

    def _should_sha256_sign_payload(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
    ) -> bool:
        # All insecure connections should be signed
        if request.destination.scheme != "https":
            return True

        return signing_properties.get("payload_signing_enabled", True)

    def _format_canonical_payload(
        self,
        *,
        request: AWSRequest,
        signing_properties: SigV4SigningProperties,
    ) -> str:
        payload_hash = self._compute_payload_hash(
            request=request, signing_properties=signing_properties
        )
        if signing_properties.get("content_checksum_enabled", False):
            request.fields.set_field(
                Field(name="X-Amz-Content-SHA256", values=[payload_hash])
            )
        return payload_hash

    def _compute_payload_hash(
        self, *, request: AWSRequest, signing_properties: SigV4SigningProperties
    ) -> str:
        if not self._should_sha256_sign_payload(
            request=request, signing_properties=signing_properties
        ):
            return UNSIGNED_PAYLOAD

        body = request.body

        if body is None:
            return EMPTY_SHA256_HASH

        if not isinstance(body, Iterable):
            raise TypeError(
                "An async body was attached to a synchronous signer. Please use "
                "AsyncSigV4Signer for async AWSRequests or ensure your body is "
                "of type Iterable[bytes]."
            )

        warnings.warn(
            "Payload signing is enabled. This may result in "
            "decreased performance for large request bodies.",
            AWSSDKWarning,
        )

        checksum = sha256()
        if self._seekable(body):
            position = body.tell()
            for chunk in body:  # type: ignore
                checksum.update(chunk)  # type: ignore
            body.seek(position)
        else:
            buffer = io.BytesIO()
            for chunk in body:
                buffer.write(chunk)
                checksum.update(chunk)
            buffer.seek(0)
            request.body = buffer
        return checksum.hexdigest()

    def _seekable(self, body: Iterable[bytes]) -> TypeGuard[Seekable]:
        if isinstance(body, ConditionallySeekable):
            return body.seekable()

        return isinstance(body, Seekable)

canonical_request(*, signing_properties, request)

The canonical request is a standardized string laying out the components used in the SigV4 signing algorithm. This is useful to quickly compare inputs to find signature mismatches and unintended variances.

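The SigV4 specification defines the canonical request to be:

<HTTPMethod>\n
<CanonicalURI>\n
<CanonicalQueryString>\n
<CanonicalHeaders>\n
<SignedHeaders>\n
<HashedPayload>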

Parameters:

- signing_properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.
- request: An AWSRequest to use for generating a SigV4 signature.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
def canonical_request(
    self, *, signing_properties: SigV4SigningProperties, request: AWSRequest
) -> str:
    """Build the SigV4 canonical request string for ``request``.

    The canonical request lays out every input to the signing algorithm in a
    standard form, which makes it a convenient place to compare inputs when
    hunting for signature mismatches and unintended variances.

    The SigV4 specification defines the canonical request to be:
        <HTTPMethod>\\n
        <CanonicalURI>\\n
        <CanonicalQueryString>\\n
        <CanonicalHeaders>\\n
        <SignedHeaders>\\n
        <HashedPayload>

    :param signing_properties:
        SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :param request:
        An AWSRequest to use for generating a SigV4 signature.
    """
    # The payload goes first: formatting it may modify the request's fields,
    # and those changes must be visible when the signed fields are collected.
    hashed_payload = self._format_canonical_payload(
        request=request, signing_properties=signing_properties
    )
    uri = self._format_canonical_path(
        path=request.destination.path, signing_properties=signing_properties
    )
    query_string = self._format_canonical_query(query=request.destination.query)
    signed_fields = self._normalize_signing_fields(request=request)
    headers_block = self._format_canonical_fields(fields=signed_fields)

    components = (
        request.method.upper(),
        uri,
        query_string,
        headers_block,
        ";".join(signed_fields),
        hashed_payload,
    )
    return "\n".join(f"{component}" for component in components)

generate_authorization_field(*, credential, signed_headers, signature)

Generate the Authorization field.

Parameters:

- credential: Credential scope string for generating the Authorization header. Defined as: <access_key>/<date>/<region>/<service>/<request_type>
- signed_headers: A list of the field names used in signing.
- signature: Final hash of the SigV4 signing algorithm generated from the canonical request and string to sign.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
def generate_authorization_field(
    self, *, credential: str, signed_headers: list[str], signature: str
) -> Field:
    """Build the `Authorization` field carrying the SigV4 signature.

    :param credential:
        Credential scope string for generating the Authorization header.
        Defined as:
            <access_key>/<date>/<region>/<service>/<request_type>
    :param signed_headers:
        A list of the field names used in signing.
    :param signature:
        Final hash of the SigV4 signing algorithm generated from the
        canonical request and string to sign.
    """
    header_names = ";".join(signed_headers)
    # The header value is the algorithm identifier followed by three
    # comma-separated key=value components.
    header_value = ", ".join(
        (
            f"AWS4-HMAC-SHA256 Credential={credential}",
            f"SignedHeaders={header_names}",
            f"Signature={signature}",
        )
    )
    return Field(name="Authorization", values=[header_value])

sign(*, request, identity, properties)

Generate and apply a SigV4 Signature to a copy of the supplied request.

Parameters:

- request: An AWSRequest to sign prior to sending to the service.
- identity: A set of credentials representing an AWS Identity or role capacity.
- properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
def sign(
    self,
    *,
    request: AWSRequest,
    identity: _AWSCredentialsIdentity,
    properties: SigV4SigningProperties,
) -> AWSRequest:
    """Sign a copy of the supplied request with SigV4 and return that copy.

    :param request: An AWSRequest to sign prior to sending to the service.
    :param identity: A set of credentials representing an AWS Identity or role
        capacity.
    :param properties: SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    """
    # Validate first, then work on copies with any missing values filled in.
    self._validate_identity(identity=identity)
    props = self._normalize_signing_properties(signing_properties=properties)
    assert "date" in props

    signed_request = self._generate_new_request(request=request)
    self._apply_required_fields(
        request=signed_request, signing_properties=props, identity=identity
    )

    # Core signing pipeline: canonical request -> string to sign -> signature.
    canonical = self.canonical_request(
        signing_properties=props, request=signed_request
    )
    to_sign = self.string_to_sign(
        canonical_request=canonical, signing_properties=props
    )
    signature = self._signature(
        string_to_sign=to_sign,
        secret_key=identity.secret_access_key,
        signing_properties=props,
    )

    # Assemble the Authorization field from the signed field names and the
    # credential scope, then attach it to the copy.
    signed_fields = self._normalize_signing_fields(request=signed_request)
    scope = self._scope(signing_properties=props)
    signed_request.fields.set_field(
        self.generate_authorization_field(
            credential=f"{identity.access_key_id}/{scope}",
            signed_headers=list(signed_fields.keys()),
            signature=signature,
        )
    )
    return signed_request

string_to_sign(*, canonical_request, signing_properties)

The string to sign is the second step of our signing algorithm which concatenates the formal identifier of our signing algorithm, the signing DateTime, the scope of our credentials, and a hash of our previously generated canonical request. This is another checkpoint that can be used to ensure we're constructing our signature as intended.

The SigV4 specification defines the string to sign as the following four values, joined by newline characters:

Algorithm

RequestDateTime

CredentialScope

HashedCanonicalRequest

Parameters:

- canonical_request: String generated from the canonical_request method.
- signing_properties: SigV4SigningProperties to define signing primitives such as the target service, region, and date.

Source code in packages/aws-sdk-signers/src/aws_sdk_signers/signers.py
def string_to_sign(
    self,
    *,
    canonical_request: str,
    signing_properties: SigV4SigningProperties,
) -> str:
    """Assemble the SigV4 string to sign for a canonical request.

    This is the second step of the signing algorithm. It joins the algorithm
    identifier, the signing timestamp, the credential scope, and the SHA-256
    hex digest of the canonical request. The result is a useful checkpoint
    when debugging signature mismatches.

    The SigV4 specification defines the string to sign as:
        Algorithm \\n
        RequestDateTime \\n
        CredentialScope  \\n
        HashedCanonicalRequest

    :param canonical_request:
        String generated from the `canonical_request` method.
    :param signing_properties:
        SigV4SigningProperties to define signing primitives such as
        the target service, region, and date.
    :raises MissingExpectedParameterException:
        If `signing_properties` has no `date` value.
    """
    timestamp = signing_properties.get("date")
    if timestamp is None:
        raise MissingExpectedParameterException(
            "Cannot generate string_to_sign without a valid date "
            f"in your signing_properties. Current value: {timestamp}"
        )

    # Resolve the scope before hashing, matching the original evaluation order.
    credential_scope = self._scope(signing_properties=signing_properties)
    request_digest = sha256(canonical_request.encode()).hexdigest()
    return (
        f"AWS4-HMAC-SHA256\n{timestamp}\n{credential_scope}\n{request_digest}"
    )