diff --git a/packages/google-auth/google/auth/aio/transport/mtls.py b/packages/google-auth/google/auth/aio/transport/mtls.py index b85d30b53485..aee96ccefacb 100644 --- a/packages/google-auth/google/auth/aio/transport/mtls.py +++ b/packages/google-auth/google/auth/aio/transport/mtls.py @@ -25,34 +25,12 @@ from typing import Optional from google.auth import exceptions -import google.auth.transport._mtls_helper import google.auth.transport.mtls +from google.auth.transport._mtls_helper import secure_cert_key_paths _LOGGER = logging.getLogger(__name__) -@contextlib.contextmanager -def _create_temp_file(content: bytes): - """Creates a temporary file with the given content. - - Args: - content (bytes): The content to write to the file. - - Yields: - str: The path to the temporary file. - """ - # Create a temporary file that is readable only by the owner. - fd, file_path = tempfile.mkstemp() - try: - with os.fdopen(fd, "wb") as f: - f.write(content) - yield file_path - finally: - # Securely delete the file after use. - if os.path.exists(file_path): - os.remove(file_path) - - def make_client_cert_ssl_context( cert_bytes: bytes, key_bytes: bytes, passphrase: Optional[bytes] = None ) -> ssl.SSLContext: @@ -71,13 +49,15 @@ def make_client_cert_ssl_context( Raises: google.auth.exceptions.TransportError: If there is an error loading the certificate. """ - with _create_temp_file(cert_bytes) as cert_path, _create_temp_file( - key_bytes - ) as key_path: + with secure_cert_key_paths(cert_bytes, key_bytes, passphrase=passphrase) as ( + cert_path, + key_path, + passphrase_val, + ): try: context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) context.load_cert_chain( - certfile=cert_path, keyfile=key_path, password=passphrase + certfile=cert_path, keyfile=key_path, password=passphrase_val ) return context except (ssl.SSLError, OSError, IOError, ValueError, RuntimeError) as exc: diff --git a/packages/google-auth/google/auth/identity_pool.py b/packages/google-auth/google/auth/identity_pool.py index 30819ef0485a..ca13a2b9f927 100644 --- a/packages/google-auth/google/auth/identity_pool.py +++ b/packages/google-auth/google/auth/identity_pool.py @@ -152,13 +152,9 @@ def __init__(self, trust_chain_path, leaf_cert_callback): @_helpers.copy_docstring(SubjectTokenSupplier) def get_subject_token(self, context, request): - # Import OpennSSL inline because it is an extra import only required by customers - # using mTLS. - from OpenSSL import crypto + from cryptography import x509 - leaf_cert = crypto.load_certificate( - crypto.FILETYPE_PEM, self._leaf_cert_callback() - ) + leaf_cert = x509.load_pem_x509_certificate(self._leaf_cert_callback()) trust_chain = self._read_trust_chain() cert_chain = [] @@ -184,9 +180,7 @@ def get_subject_token(self, context, request): return json.dumps(cert_chain) def _read_trust_chain(self): - # Import OpennSSL inline because it is an extra import only required by customers - # using mTLS. - from OpenSSL import crypto + from cryptography import x509 certificate_trust_chain = [] # If no trust chain path was provided, return an empty list. @@ -204,9 +198,7 @@ def _read_trust_chain(self): cert_data = b"-----BEGIN CERTIFICATE-----" + cert_block try: # Load each certificate and add it to the trust chain. - cert = crypto.load_certificate( - crypto.FILETYPE_PEM, cert_data - ) + cert = x509.load_pem_x509_certificate(cert_data) certificate_trust_chain.append(cert) except Exception as e: raise exceptions.RefreshError( @@ -221,13 +213,11 @@ def _read_trust_chain(self): ) def _encode_cert(cert): - # Import OpennSSL inline because it is an extra import only required by customers - # using mTLS. - from OpenSSL import crypto + from cryptography.hazmat.primitives import serialization - return base64.b64encode( - crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) - ).decode("utf-8") + return base64.b64encode(cert.public_bytes(serialization.Encoding.DER)).decode( + "utf-8" + ) def _parse_token_data(token_content, format_type="text", subject_token_field_name=None): diff --git a/packages/google-auth/google/auth/transport/_custom_tls_signer.py b/packages/google-auth/google/auth/transport/_custom_tls_signer.py index 9279158d45c6..1ac0d081e2da 100644 --- a/packages/google-auth/google/auth/transport/_custom_tls_signer.py +++ b/packages/google-auth/google/auth/transport/_custom_tls_signer.py @@ -23,8 +23,6 @@ import os import sys -import cffi # type: ignore - from google.auth import exceptions _LOGGER = logging.getLogger(__name__) @@ -45,11 +43,6 @@ ) -# Cast SSL_CTX* to void* -def _cast_ssl_ctx_to_void_p_pyopenssl(ssl_ctx): - return ctypes.cast(int(cffi.FFI().cast("intptr_t", ssl_ctx)), ctypes.c_void_p) - - # Cast SSL_CTX* to void* def _cast_ssl_ctx_to_void_p_stdlib(context): return ctypes.c_void_p.from_address( @@ -274,7 +267,7 @@ def attach_to_ssl_context(self, ctx): if not self._offload_lib.ConfigureSslContext( self._sign_callback, ctypes.c_char_p(self._cert), - _cast_ssl_ctx_to_void_p_pyopenssl(ctx._ctx._context), + _cast_ssl_ctx_to_void_p_stdlib(ctx), ): raise exceptions.MutualTLSChannelError( "failed to configure ECP Offload SSL context" diff --git a/packages/google-auth/google/auth/transport/_mtls_helper.py b/packages/google-auth/google/auth/transport/_mtls_helper.py index d6450291c7f2..53c040458ff9 100644 --- a/packages/google-auth/google/auth/transport/_mtls_helper.py +++ b/packages/google-auth/google/auth/transport/_mtls_helper.py @@ -14,11 +14,13 @@ """Helper functions for getting mTLS cert and key.""" +import contextlib import json import logging from os import environ, getenv, path import re import subprocess +from typing import Generator, Optional, Tuple, Union from google.auth import _agent_identity_utils from google.auth import environment_vars @@ -65,6 +67,229 @@ ) +@contextlib.contextmanager +def secure_cert_key_paths( + cert: Union[str, bytes], + key: Union[str, bytes], + passphrase: Optional[bytes] = None, +) -> Generator[Tuple[str, str, Optional[bytes]], None, None]: + """Provides secure file paths for certificate and key. + + Standard TLS libraries (like Python's standard library `ssl`) require file paths to + load credentials. To minimize exposure of raw private key bytes on physical storage, + this context manager implements a three-tier fallback strategy: yielding pass-through + paths (Tier 1), using RAM-backed virtual files on Linux (Tier 2), or falling back + to encrypted temporary files on disk (Tier 3). + + Args: + cert (Union[str, bytes]): Certificate path or raw PEM content bytes. + key (Union[str, bytes]): Private key path or raw PEM content bytes. + passphrase (Optional[bytes]): Optional passphrase for the private key. + + Yields: + Tuple[str, str, Optional[bytes]]: The certificate path, key path, and + the passphrase needed to load the key (either the user's original, + or the newly generated one if Tier 3 had to encrypt the key). + """ + import os + import sys + + # Tier 1: Pass-through (No-op). If the caller already provided file paths, + # we yield them directly to avoid any unnecessary file creation. + if isinstance(cert, str) and isinstance(key, str): + yield cert, key, passphrase + return + + cert_bytes = cert if isinstance(cert, bytes) else None + key_bytes = key if isinstance(key, bytes) else None + + # Tier 2: Linux RAM-backed virtual files. If supported by the OS, we write + # the bytes to anonymous in-memory files using memfd_create. This yields + # /proc/self/fd/... paths, keeping the private key entirely in memory. + if sys.platform == "linux" and hasattr(os, "memfd_create"): + cm = _memfd_cert_key_paths(cert_bytes, key_bytes) + try: + cert_path, key_path = cm.__enter__() + except OSError: + pass # Fallback to Tier 3 on failure. + else: + try: + # Handle cases where path exists but might be restricted. + if (cert_path is None or os.path.exists(cert_path)) and ( + key_path is None or os.path.exists(key_path) + ): + yield cert_path or cert, key_path or key, passphrase + return + finally: + import sys + + exc_info = sys.exc_info() + cm.__exit__( + *(exc_info if exc_info[0] is not None else (None, None, None)) + ) + # If verification failed, fall through to Tier 3. + + # Tier 3: Fallback Encrypted Temp Files. If in-memory files are not supported + # (macOS/Windows), we write to disk. To protect the key, we encrypt plaintext + # keys on-the-fly and securely wipe the files with null bytes during cleanup. + with _tempfile_cert_key_paths(cert_bytes, key_bytes, passphrase) as ( + cert_path, + key_path, + new_passphrase, + ): + yield cert_path or cert, key_path or key, new_passphrase + + +def _encrypt_key_if_plaintext( + key_bytes: bytes, passphrase: Optional[bytes] +) -> Tuple[bytes, Optional[bytes]]: + """Encrypts a plaintext PEM key if necessary, returning the bytes and passphrase. + + If the key is already encrypted, returns it as-is. + """ + from cryptography.hazmat.primitives import serialization + import secrets + + try: + pkey = serialization.load_pem_private_key(key_bytes, password=None) + # It's plaintext, encrypt it. + target_passphrase = ( + passphrase + if passphrase is not None + else secrets.token_hex(32).encode("utf-8") + ) + encrypted_content = pkey.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.BestAvailableEncryption( + target_passphrase + ), + ) + return encrypted_content, target_passphrase + except (ValueError, TypeError): + # Likely already encrypted or invalid, return as-is. + return key_bytes, passphrase + + +def _secure_wipe_and_remove(file_path: str): + """Overwrites a file with null bytes before deleting it. + + This is an extra security measure to make file recovery harder. However, on modern + solid-state drives (SSDs), the hardware optimizes where data is written, meaning + the original private key bytes might still physically remain on the storage chips + until the drive cleans them up. + """ + import os + + if not os.path.exists(file_path): + return + try: + size = os.path.getsize(file_path) + with open(file_path, "r+b") as f: + f.write(b"\0" * size) + f.flush() + os.fsync(f.fileno()) + except OSError: + pass # Ignore permission/lock errors during cleanup. + finally: + try: + os.remove(file_path) + except OSError: + pass + + +@contextlib.contextmanager +def _memfd_cert_key_paths(cert_bytes: Optional[bytes], key_bytes: Optional[bytes]): + """Creates secure, in-memory virtual files on Linux using memfd_create. + + Yields: + Tuple[Optional[str], Optional[str]]: In-memory file paths pointing to + the active descriptors (e.g., '/proc/self/fd/3'). + """ + import os + + cleanup_fds = [] + cert_path, key_path = None, None + + try: + if cert_bytes is not None: + # MFD_CLOEXEC prevents FD leaks to spawned subprocesses. + fd_cert = os.memfd_create("mtls_cert", os.MFD_CLOEXEC) + os.write(fd_cert, cert_bytes) + cert_path = f"/proc/self/fd/{fd_cert}" + cleanup_fds.append(fd_cert) + + if key_bytes is not None: + fd_key = os.memfd_create("mtls_key", os.MFD_CLOEXEC) + os.write(fd_key, key_bytes) + key_path = f"/proc/self/fd/{fd_key}" + cleanup_fds.append(fd_key) + + yield cert_path, key_path + finally: + # Closing the descriptors automatically frees the RAM allocation. + for fd in cleanup_fds: + try: + os.close(fd) + except OSError: + pass + + +@contextlib.contextmanager +def _tempfile_cert_key_paths( + cert_bytes: Optional[bytes], key_bytes: Optional[bytes], passphrase: Optional[bytes] +): + """Creates secure temporary file paths on disk, encrypting private keys. + + Yields: + Tuple[Optional[str], Optional[str], Optional[bytes]]: The temporary file + paths and the passphrase needed to load the key. + """ + import os + import tempfile + + # Prioritize RAM-backed /dev/shm to avoid writing secrets to physical storage. + tmp_dir = "/dev/shm" if os.path.isdir("/dev/shm") else None + cert_path, key_path = None, None + cleanup_files = [] + new_passphrase = passphrase + + try: + if cert_bytes is not None: + fd, cert_path = tempfile.mkstemp(dir=tmp_dir) + cleanup_files.append(cert_path) + with os.fdopen(fd, "wb") as f: + f.write(cert_bytes) + f.flush() + os.fsync(f.fileno()) + + if key_bytes is not None: + # Encrypt plaintext keys on-the-fly before dropping to disk. + encrypted_key_bytes, new_passphrase = _encrypt_key_if_plaintext( + key_bytes, passphrase + ) + + fd, key_path = tempfile.mkstemp(dir=tmp_dir) + cleanup_files.append(key_path) + with os.fdopen(fd, "wb") as f: + f.write(encrypted_key_bytes) + f.flush() + os.fsync(f.fileno()) + + yield cert_path, key_path, new_passphrase + finally: + for file_path in cleanup_files: + try: + # Wiping the private key with null bytes before removal. + if file_path == key_path: + _secure_wipe_and_remove(file_path) + else: + if os.path.exists(file_path): + os.remove(file_path) + except OSError: + pass + + def _check_config_path(config_path): """Checks for config file path. If it exists, returns the absolute path with user expansion; otherwise returns None. @@ -436,16 +661,19 @@ def client_cert_callback(): bytes: The decrypted private key in PEM format. Raises: - ImportError: If pyOpenSSL is not installed. - OpenSSL.crypto.Error: If there is any problem decrypting the private key. + ValueError: If there is any problem decrypting the private key. """ - from OpenSSL import crypto + from cryptography.hazmat.primitives import serialization # First convert encrypted_key_bytes to PKey object - pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key, passphrase=passphrase) + pkey = serialization.load_pem_private_key(key, password=passphrase) # Then dump the decrypted key bytes - return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey) + return pkey.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) def check_use_client_cert(): diff --git a/packages/google-auth/google/auth/transport/requests.py b/packages/google-auth/google/auth/transport/requests.py index 9735762c4414..7b246592df33 100644 --- a/packages/google-auth/google/auth/transport/requests.py +++ b/packages/google-auth/google/auth/transport/requests.py @@ -204,30 +204,37 @@ class _MutualTlsAdapter(requests.adapters.HTTPAdapter): key (bytes): client private key in PEM format Raises: - ImportError: if certifi or pyOpenSSL is not installed - OpenSSL.crypto.Error: if client cert or key is invalid + ImportError: if certifi is not installed """ def __init__(self, cert, key): import certifi - from OpenSSL import crypto - import urllib3.contrib.pyopenssl # type: ignore - - urllib3.contrib.pyopenssl.inject_into_urllib3() - - pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key) - x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert) + import ssl ctx_poolmanager = create_urllib3_context() ctx_poolmanager.load_verify_locations(cafile=certifi.where()) - ctx_poolmanager._ctx.use_certificate(x509) - ctx_poolmanager._ctx.use_privatekey(pkey) - self._ctx_poolmanager = ctx_poolmanager ctx_proxymanager = create_urllib3_context() ctx_proxymanager.load_verify_locations(cafile=certifi.where()) - ctx_proxymanager._ctx.use_certificate(x509) - ctx_proxymanager._ctx.use_privatekey(pkey) + + with _mtls_helper.secure_cert_key_paths(cert, key) as ( + cert_path, + key_path, + passphrase, + ): + try: + ctx_poolmanager.load_cert_chain( + certfile=cert_path, keyfile=key_path, password=passphrase + ) + ctx_proxymanager.load_cert_chain( + certfile=cert_path, keyfile=key_path, password=passphrase + ) + except (ssl.SSLError, OSError, IOError, ValueError, RuntimeError) as exc: + raise exceptions.MutualTLSChannelError( + "Failed to configure client certificate and key for mTLS." + ) from exc + + self._ctx_poolmanager = ctx_poolmanager self._ctx_proxymanager = ctx_proxymanager super(_MutualTlsAdapter, self).__init__() @@ -258,7 +265,7 @@ class _MutualTlsOffloadAdapter(requests.adapters.HTTPAdapter): } Raises: - ImportError: if certifi or pyOpenSSL is not installed + ImportError: if certifi is not installed google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel creation failed for any reason. """ @@ -270,10 +277,6 @@ def __init__(self, enterprise_cert_file_path): self.signer = _custom_tls_signer.CustomTlsSigner(enterprise_cert_file_path) self.signer.load_libraries() - import urllib3.contrib.pyopenssl - - urllib3.contrib.pyopenssl.inject_into_urllib3() - poolmanager = create_urllib3_context() poolmanager.load_verify_locations(cafile=certifi.where()) self.signer.attach_to_ssl_context(poolmanager) @@ -449,11 +452,6 @@ def configure_mtls_channel(self, client_cert_callback=None): if not use_client_cert: self._is_mtls = False return - try: - import OpenSSL - except ImportError as caught_exc: - new_exc = exceptions.MutualTLSChannelError(caught_exc) - raise new_exc from caught_exc try: ( @@ -471,10 +469,14 @@ def configure_mtls_channel(self, client_cert_callback=None): except ( exceptions.ClientCertError, ImportError, - OpenSSL.crypto.Error, + ValueError, ) as caught_exc: + self._is_mtls = False new_exc = exceptions.MutualTLSChannelError(caught_exc) raise new_exc from caught_exc + except Exception: + self._is_mtls = False + raise def request( self, diff --git a/packages/google-auth/google/auth/transport/urllib3.py b/packages/google-auth/google/auth/transport/urllib3.py index de07007a946c..78d313625b22 100644 --- a/packages/google-auth/google/auth/transport/urllib3.py +++ b/packages/google-auth/google/auth/transport/urllib3.py @@ -174,22 +174,27 @@ def _make_mutual_tls_http(cert, key): urllib3.PoolManager: Mutual TLS HTTP connection. Raises: - ImportError: If certifi or pyOpenSSL is not installed. - OpenSSL.crypto.Error: If the cert or key is invalid. + ValueError: If the cert or key is invalid. """ import certifi - from OpenSSL import crypto - import urllib3.contrib.pyopenssl # type: ignore + import ssl - urllib3.contrib.pyopenssl.inject_into_urllib3() ctx = urllib3.util.ssl_.create_urllib3_context() ctx.load_verify_locations(cafile=certifi.where()) - pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key) - x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - - ctx._ctx.use_certificate(x509) - ctx._ctx.use_privatekey(pkey) + with _mtls_helper.secure_cert_key_paths(cert, key) as ( + cert_path, + key_path, + passphrase, + ): + try: + ctx.load_cert_chain( + certfile=cert_path, keyfile=key_path, password=passphrase + ) + except (ssl.SSLError, OSError, IOError, ValueError, RuntimeError) as exc: + raise exceptions.MutualTLSChannelError( + "Failed to configure client certificate and key for mTLS." + ) from exc http = urllib3.PoolManager(ssl_context=ctx) return http @@ -341,11 +346,6 @@ def configure_mtls_channel(self, client_cert_callback=None): return False else: self._is_mtls = True - try: - import OpenSSL - except ImportError as caught_exc: - new_exc = exceptions.MutualTLSChannelError(caught_exc) - raise new_exc from caught_exc try: found_cert_key, cert, key = transport._mtls_helper.get_client_cert_and_key( @@ -357,13 +357,18 @@ def configure_mtls_channel(self, client_cert_callback=None): self._cached_cert = cert else: self.http = _make_default_http() + self._is_mtls = False except ( exceptions.ClientCertError, ImportError, - OpenSSL.crypto.Error, + ValueError, ) as caught_exc: + self._is_mtls = False new_exc = exceptions.MutualTLSChannelError(caught_exc) raise new_exc from caught_exc + except Exception: + self._is_mtls = False + raise if self._has_user_provided_http: self._has_user_provided_http = False diff --git a/packages/google-auth/noxfile.py b/packages/google-auth/noxfile.py index 9e38ad2b9e69..30df3e180f42 100644 --- a/packages/google-auth/noxfile.py +++ b/packages/google-auth/noxfile.py @@ -134,7 +134,6 @@ def mypy(session): "mypy", "types-certifi", "types-freezegun", - "types-pyOpenSSL", "types-requests", "types-setuptools", "types-mock", diff --git a/packages/google-auth/setup.py b/packages/google-auth/setup.py index 7255dbd41313..83424b899c3e 100644 --- a/packages/google-auth/setup.py +++ b/packages/google-auth/setup.py @@ -35,10 +35,7 @@ reauth_extra_require = ["pyu2f>=0.1.5"] -# TODO(https://github.com/googleapis/google-auth-library-python/issues/1738): Add bounds for pyopenssl dependency. -enterprise_cert_extra_require = ["pyopenssl"] - -pyopenssl_extra_require = ["pyopenssl>=20.0.0"] +enterprise_cert_extra_require = cryptography_base_require # TODO(https://github.com/googleapis/google-auth-library-python/issues/1739): Add bounds for urllib3 and packaging dependencies. urllib3_extra_require = ["urllib3", "packaging"] @@ -55,7 +52,6 @@ "pytest", "pytest-cov", "pytest-localserver", - *pyopenssl_extra_require, *reauth_extra_require, "responses", *urllib3_extra_require, @@ -63,10 +59,6 @@ *aiohttp_extra_require, "aioresponses", "pytest-asyncio", - # TODO(https://github.com/googleapis/google-auth-library-python/issues/1665): Remove the pinned version of pyopenssl - # once `TestDecryptPrivateKey::test_success` is updated to remove the deprecated `OpenSSL.crypto.sign` and - # `OpenSSL.crypto.verify` methods. See: https://www.pyopenssl.org/en/latest/changelog.html#id3. - "pyopenssl < 24.3.0", # TODO(https://github.com/googleapis/google-auth-library-python/issues/1722): `test_aiohttp_requests` depend on # aiohttp < 3.10.0 which is a bug. Investigate and remove the pinned aiohttp version. "aiohttp < 3.10.0", @@ -77,7 +69,6 @@ "cryptography": cryptography_base_require, "aiohttp": aiohttp_extra_require, "enterprise_cert": enterprise_cert_extra_require, - "pyopenssl": pyopenssl_extra_require, "pyjwt": pyjwt_extra_require, "reauth": reauth_extra_require, "requests": requests_extra_require, diff --git a/packages/google-auth/system_tests/noxfile.py b/packages/google-auth/system_tests/noxfile.py index 2cc4d122cf02..825ef0aab509 100644 --- a/packages/google-auth/system_tests/noxfile.py +++ b/packages/google-auth/system_tests/noxfile.py @@ -322,7 +322,7 @@ def urllib3(session): @nox.session(python=PYTHON_VERSIONS_SYNC) def mtls_http(session): session.install(LIBRARY_DIR) - session.install(*TEST_DEPENDENCIES_SYNC, "pyopenssl") + session.install(*TEST_DEPENDENCIES_SYNC) session.env[EXPLICIT_CREDENTIALS_ENV] = SERVICE_ACCOUNT_FILE default( session, diff --git a/packages/google-auth/tests/test_identity_pool.py b/packages/google-auth/tests/test_identity_pool.py index c68fac64708d..cfc4f3589bc4 100644 --- a/packages/google-auth/tests/test_identity_pool.py +++ b/packages/google-auth/tests/test_identity_pool.py @@ -20,7 +20,8 @@ from unittest import mock import urllib -from OpenSSL import crypto +from cryptography import x509 +from cryptography.hazmat.primitives import serialization import pytest # type: ignore from google.auth import _helpers, external_account @@ -69,17 +70,15 @@ JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME) with open(CERT_FILE, "rb") as f: + cert = x509.load_pem_x509_certificate(f.read()) CERT_FILE_CONTENT = base64.b64encode( - crypto.dump_certificate( - crypto.FILETYPE_ASN1, crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) - ) + cert.public_bytes(serialization.Encoding.DER) ).decode("utf-8") with open(OTHER_CERT_FILE, "rb") as f: + cert = x509.load_pem_x509_certificate(f.read()) OTHER_CERT_FILE_CONTENT = base64.b64encode( - crypto.dump_certificate( - crypto.FILETYPE_ASN1, crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) - ) + cert.public_bytes(serialization.Encoding.DER) ).decode("utf-8") TOKEN_URL = "https://sts.googleapis.com/v1/token" diff --git a/packages/google-auth/tests/transport/test__custom_tls_signer.py b/packages/google-auth/tests/transport/test__custom_tls_signer.py index 3ecb29a60516..c0e40466e17e 100644 --- a/packages/google-auth/tests/transport/test__custom_tls_signer.py +++ b/packages/google-auth/tests/transport/test__custom_tls_signer.py @@ -22,12 +22,6 @@ from google.auth import exceptions from google.auth.transport import _custom_tls_signer -urllib3_pyopenssl = pytest.importorskip( - "urllib3.contrib.pyopenssl", - reason="urllib3.contrib.pyopenssl not available in this environment", -) - -urllib3_pyopenssl.inject_into_urllib3() FAKE_ENTERPRISE_CERT_FILE_PATH = "/path/to/enterprise/cert/file" ENTERPRISE_CERT_FILE = os.path.join( diff --git a/packages/google-auth/tests/transport/test__mtls_helper.py b/packages/google-auth/tests/transport/test__mtls_helper.py index 078df67470d2..492692c09a4d 100644 --- a/packages/google-auth/tests/transport/test__mtls_helper.py +++ b/packages/google-auth/tests/transport/test__mtls_helper.py @@ -16,7 +16,9 @@ import re from unittest import mock -from OpenSSL import crypto +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import hashes import pytest # type: ignore from google.auth import environment_vars, exceptions @@ -26,16 +28,17 @@ KEY_MOCK_VAL = b"key" CONTEXT_AWARE_METADATA = {"cert_provider_command": ["some command"]} ENCRYPTED_EC_PRIVATE_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY----- -MIHkME8GCSqGSIb3DQEFDTBCMCkGCSqGSIb3DQEFDDAcBAgl2/yVgs1h3QICCAAw -DAYIKoZIhvcNAgkFADAVBgkrBgEEAZdVAQIECJk2GRrvxOaJBIGQXIBnMU4wmciT -uA6yD8q0FxuIzjG7E2S6tc5VRgSbhRB00eBO3jWmO2pBybeQW+zVioDcn50zp2ts -wYErWC+LCm1Zg3r+EGnT1E1GgNoODbVQ3AEHlKh1CGCYhEovxtn3G+Fjh7xOBrNB -saVVeDb4tHD4tMkiVVUBrUcTZPndP73CtgyGHYEphasYPzEz3+AU +MIH0MF8GCSqGSIb3DQEFDTBSMDEGCSqGSIb3DQEFDDAkBBClWcQyUELNC9Hjr+Sp +WK85AgIIADAMBggqhkiG9w0CCQUAMB0GCWCGSAFlAwQBKgQQ6uJeoqE7P9HtxAgS +n6rBFgSBkMRDYXLucNp7ew7LbQmkZCmjnRhgyw6b0dD3eK8f3jisj8UiR8aj9a2S +1FZiNHKLmI7hkZHH+d2DPWYhe/tf5SS4iLzpZogBehMv4UDNnNaj0dvQZgpnpciK +1H+0u/i+crc1WAGlemLAi7dktCCBTzeX19cRMGHie68rx1C82LHLZmefr7AEIVxp +uUoJ+sLhBw== -----END ENCRYPTED PRIVATE KEY-----""" EC_PUBLIC_KEY = b"""-----BEGIN PUBLIC KEY----- -MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEvCNi1NoDY1oMqPHIgXI8RBbTYGi/ -brEjbre1nSiQW11xRTJbVeETdsuP0EAu2tG3PcRhhwDfeJ8zXREgTBurNw== +MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEwdsHzL05VUmqYJat2yGdbSHQAg49 +Wc+fhwLH3b+SCC/2/TqPNDy9yMdMxMtEfZfKal2EaeE2erJrtu7WNfjD0Q== -----END PUBLIC KEY-----""" PASSPHRASE = b"""-----BEGIN PASSPHRASE----- @@ -757,17 +760,15 @@ def test_success(self): decrypted_key = _mtls_helper.decrypt_private_key( ENCRYPTED_EC_PRIVATE_KEY, PASSPHRASE_VALUE ) - private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, decrypted_key) - public_key = crypto.load_publickey(crypto.FILETYPE_PEM, EC_PUBLIC_KEY) - x509 = crypto.X509() - x509.set_pubkey(public_key) + private_key = serialization.load_pem_private_key(decrypted_key, password=None) + public_key = serialization.load_pem_public_key(EC_PUBLIC_KEY) # Test the decrypted key works by signing and verification. - signature = crypto.sign(private_key, b"data", "sha256") - crypto.verify(x509, signature, b"data", "sha256") + signature = private_key.sign(b"data", ec.ECDSA(hashes.SHA256())) + public_key.verify(signature, b"data", ec.ECDSA(hashes.SHA256())) def test_crypto_error(self): - with pytest.raises(crypto.Error): + with pytest.raises(ValueError): _mtls_helper.decrypt_private_key( ENCRYPTED_EC_PRIVATE_KEY, b"wrong_password" ) diff --git a/packages/google-auth/tests/transport/test_aio_mtls_helper.py b/packages/google-auth/tests/transport/test_aio_mtls_helper.py index bc9cde7d793b..2af155d9ee83 100644 --- a/packages/google-auth/tests/transport/test_aio_mtls_helper.py +++ b/packages/google-auth/tests/transport/test_aio_mtls_helper.py @@ -26,24 +26,6 @@ class TestMTLS: - @pytest.mark.asyncio - async def test__create_temp_file(self): - """Tests that _create_temp_file creates a file with correct content and deletes it.""" - content = b"test cert data" - - # Test file creation and content - with mtls._create_temp_file(content) as file_path: - assert os.path.exists(file_path) - # Verify file is not readable by others (mkstemp default) - if os.name == "posix": - assert (os.stat(file_path).st_mode & 0o777) == 0o600 - - with open(file_path, "rb") as f: - assert f.read() == content - - # Test file deletion after context exit - assert not os.path.exists(file_path) - @pytest.mark.asyncio async def test_make_client_cert_ssl_context_success(self): """Tests successful creation of an SSLContext with client certificates.""" diff --git a/packages/google-auth/tests/transport/test_requests.py b/packages/google-auth/tests/transport/test_requests.py index c9fab036e17b..a8531e37dcfa 100644 --- a/packages/google-auth/tests/transport/test_requests.py +++ b/packages/google-auth/tests/transport/test_requests.py @@ -20,7 +20,6 @@ from unittest import mock import freezegun -import OpenSSL import pytest # type: ignore import requests import requests.adapters @@ -192,18 +191,11 @@ def test_success(self, mock_proxy_manager_for, mock_init_poolmanager): mock_proxy_manager_for.assert_called_with(ssl_context=adapter._ctx_proxymanager) def test_invalid_cert_or_key(self): - with pytest.raises(OpenSSL.crypto.Error): + with pytest.raises(exceptions.MutualTLSChannelError): google.auth.transport.requests._MutualTlsAdapter( b"invalid cert", b"invalid key" ) - @mock.patch.dict("sys.modules", {"OpenSSL.crypto": None}) - def test_import_error(self): - with pytest.raises(ImportError): - google.auth.transport.requests._MutualTlsAdapter( - pytest.public_cert_bytes, pytest.private_key_bytes - ) - def make_response(status=http_client.OK, data=None): response = requests.Response() @@ -491,9 +483,29 @@ def test_configure_mtls_channel_exceptions(self, mock_get_client_cert_and_key): ): auth_session.configure_mtls_channel() - mock_get_client_cert_and_key.return_value = (False, None, None) - with mock.patch.dict("sys.modules"): - sys.modules["OpenSSL"] = None + @mock.patch( + "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True + ) + @mock.patch("google.auth.transport.requests.create_urllib3_context", autospec=True) + def test_configure_mtls_channel_cert_loading_exceptions( + self, mock_create_urllib3_context, mock_get_client_cert_and_key + ): + import ssl + + mock_get_client_cert_and_key.return_value = ( + True, + pytest.public_cert_bytes, + pytest.private_key_bytes, + ) + + for exception_type in [ValueError("error"), ssl.SSLError("error")]: + mock_ctx = mock.Mock() + mock_ctx.load_cert_chain.side_effect = exception_type + mock_create_urllib3_context.return_value = mock_ctx + + auth_session = google.auth.transport.requests.AuthorizedSession( + credentials=mock.Mock() + ) with pytest.raises(exceptions.MutualTLSChannelError): with mock.patch.dict( os.environ, @@ -501,6 +513,8 @@ def test_configure_mtls_channel_exceptions(self, mock_get_client_cert_and_key): ): auth_session.configure_mtls_channel() + assert not auth_session.is_mtls + @mock.patch( "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True ) diff --git a/packages/google-auth/tests/transport/test_urllib3.py b/packages/google-auth/tests/transport/test_urllib3.py index b29e4e950433..e5b4b561369a 100644 --- a/packages/google-auth/tests/transport/test_urllib3.py +++ b/packages/google-auth/tests/transport/test_urllib3.py @@ -17,7 +17,6 @@ import sys from unittest import mock -import OpenSSL import pytest # type: ignore import urllib3 # type: ignore @@ -103,18 +102,11 @@ def test_success(self): assert isinstance(http, urllib3.PoolManager) def test_crypto_error(self): - with pytest.raises(OpenSSL.crypto.Error): + with pytest.raises(exceptions.MutualTLSChannelError): google.auth.transport.urllib3._make_mutual_tls_http( b"invalid cert", b"invalid key" ) - @mock.patch.dict("sys.modules", {"OpenSSL.crypto": None}) - def test_import_error(self): - with pytest.raises(ImportError): - google.auth.transport.urllib3._make_mutual_tls_http( - pytest.public_cert_bytes, pytest.private_key_bytes - ) - class TestAuthorizedHttp(object): TEST_URL = "http://example.com" @@ -280,9 +272,33 @@ def test_configure_mtls_channel_exceptions(self, mock_get_client_cert_and_key): ): authed_http.configure_mtls_channel() - mock_get_client_cert_and_key.return_value = (False, None, None) - with mock.patch.dict("sys.modules"): - sys.modules["OpenSSL"] = None + @mock.patch( + "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True + ) + @mock.patch( + "google.auth.transport.urllib3.urllib3.util.ssl_.create_urllib3_context", + autospec=True, + ) + def test_configure_mtls_channel_cert_loading_exceptions( + self, mock_create_urllib3_context, mock_get_client_cert_and_key + ): + import ssl + + authed_http = google.auth.transport.urllib3.AuthorizedHttp( + credentials=mock.Mock() + ) + + mock_get_client_cert_and_key.return_value = ( + True, + pytest.public_cert_bytes, + pytest.private_key_bytes, + ) + + for exception_type in [ValueError("error"), ssl.SSLError("error")]: + mock_ctx = mock.Mock() + mock_ctx.load_cert_chain.side_effect = exception_type + mock_create_urllib3_context.return_value = mock_ctx + with pytest.raises(exceptions.MutualTLSChannelError): with mock.patch.dict( os.environ, @@ -290,6 +306,8 @@ def test_configure_mtls_channel_exceptions(self, mock_get_client_cert_and_key): ): authed_http.configure_mtls_channel() + assert not authed_http._is_mtls + @mock.patch( "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True )