Skip to content

Commit 9a5fe2d

Browse files
committed
feat: New Container: Lowkey Vault
- Add Lowkey Vault container implementation - Cover the new container with tests - Add basic example for secret use Resolves #948
1 parent d7953b8 commit 9a5fe2d

File tree

8 files changed

+801
-7
lines changed

8 files changed

+801
-7
lines changed

modules/lowkey-vault/README.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.. autoclass:: testcontainers.lowkeyvault.LowkeyVaultContainer
2+
.. title:: testcontainers.lowkeyvault.LowkeyVaultContainer
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import urllib3
2+
from azure.core.pipeline.transport._requests_basic import RequestsTransport
3+
from azure.keyvault.secrets import SecretClient
4+
5+
from testcontainers.lowkeyvault import LowkeyVaultContainer
6+
7+
8+
def basic_example():
9+
with LowkeyVaultContainer() as lowkey_vault_container:
10+
# get connection details
11+
connection_url = lowkey_vault_container.get_connection_url()
12+
print(f"Lowkey Vault is running: {connection_url}")
13+
token = lowkey_vault_container.get_token()
14+
print("Obtained token")
15+
# prepare a transport ignoring self-signed certificate issues
16+
transport = RequestsTransport(connection_verify=False)
17+
# make sure to turn off challenge resource verification
18+
secret_client: SecretClient = SecretClient(
19+
vault_url=connection_url, credential=token, verify_challenge_resource=False, transport=transport
20+
)
21+
22+
# set a secret
23+
secret_client.set_secret(name="test-secret", value="a secret message")
24+
print("The secret has been set.")
25+
26+
# get the value of the secret
27+
actual: str = secret_client.get_secret(name="test-secret").value
28+
print(f"The secret has been retrieved with value: '{actual}'")
29+
30+
# close the secret client
31+
secret_client.close()
32+
33+
34+
if __name__ == "__main__":
35+
# ignore cert errors
36+
urllib3.disable_warnings()
37+
# run the code
38+
basic_example()
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import os
2+
from enum import Enum
3+
from typing import Any, NamedTuple, Optional
4+
5+
import requests
6+
7+
from testcontainers.core.container import DockerContainer
8+
from testcontainers.core.wait_strategies import LogMessageWaitStrategy
9+
10+
# This comment can be removed (Used for testing)
11+
12+
13+
class StaticToken(NamedTuple):
14+
"""Represents an OAuth access token."""
15+
16+
token: str
17+
"""The token string."""
18+
expires_on: int
19+
"""The token's expiration time in Unix time."""
20+
21+
22+
class StaticTokenCredential:
23+
def __init__(self, json_token: str):
24+
self.access_token = StaticToken(token=json_token.get("access_token"), expires_on=json_token.get("expires_on"))
25+
26+
def get_token(
27+
self,
28+
*scopes: str,
29+
claims: Optional[str] = None,
30+
tenant_id: Optional[str] = None,
31+
enable_cae: bool = False,
32+
**kwargs: Any,
33+
) -> StaticToken:
34+
return self.access_token
35+
36+
37+
class NetworkType(Enum):
38+
NETWORK = "network"
39+
LOCAL = "local"
40+
41+
42+
class LowkeyVaultContainer(DockerContainer):
43+
"""
44+
Container for a Lowkey Vault instance for emulating Azure Key Vault.
45+
Supports Key, Secret and Certificate APIs.
46+
47+
Example:
48+
49+
.. doctest::
50+
51+
>>> from azure.core.pipeline.transport._requests_basic import RequestsTransport
52+
>>> from azure.keyvault.secrets import SecretClient
53+
>>> from testcontainers.lowkeyvault import LowkeyVaultContainer
54+
55+
>>> with LowkeyVaultContainer() as lowkey_vault:
56+
... connection_url = lowkey_vault.get_connection_url()
57+
... token = lowkey_vault.get_token()
58+
... # don't fail due to the self-signed certificate
59+
... transport = RequestsTransport(connection_verify=False)
60+
... # make sure to turn off challenge resource verification
61+
... secret_client = SecretClient(
62+
... vault_url=connection_url,
63+
... credential=token,
64+
... verify_challenge_resource=False,
65+
... transport=transport
66+
... )
67+
"""
68+
69+
def __init__(
70+
self, image: str = "nagyesta/lowkey-vault:7.0.9-ubi10-minimal", container_alias: Optional[str] = None, **kwargs
71+
) -> None:
72+
super().__init__(image, **kwargs)
73+
self.api_port = 8443
74+
self.metadata_port = 8080
75+
self.with_exposed_ports(self.api_port, self.metadata_port)
76+
self.with_env("LOWKEY_VAULT_RELAXED_PORTS", "true")
77+
container_host_ip: str = self.get_container_host_ip()
78+
if container_alias is not None:
79+
self.with_network_aliases(container_alias)
80+
self.container_alias = container_alias
81+
self.with_env("LOWKEY_VAULT_ALIASES", f"localhost={container_alias}:<port>")
82+
elif container_host_ip != "localhost":
83+
self.with_env("LOWKEY_VAULT_ALIASES", f"localhost={container_host_ip}:<port>")
84+
self.waiting_for(LogMessageWaitStrategy("Started LowkeyVaultApp."))
85+
86+
def _configure(self) -> None:
87+
return
88+
89+
def get_connection_url(self, network_type: NetworkType = NetworkType.LOCAL) -> str:
90+
if network_type == NetworkType.LOCAL:
91+
return f"https://{self.get_container_host_ip()}:{self.get_exposed_port(self.api_port)}"
92+
else:
93+
return f"https://{self.container_alias}:{self.api_port}"
94+
95+
def get_imds_endpoint(self, network_type: NetworkType = NetworkType.LOCAL) -> str:
96+
if network_type == NetworkType.LOCAL:
97+
return f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self.metadata_port)}"
98+
else:
99+
return f"http://{self.container_alias}:{self.metadata_port}"
100+
101+
def auto_set_local_managed_identity_env_variables(self):
102+
imds_endpoint: str = self.get_imds_endpoint(network_type=NetworkType.LOCAL)
103+
token_url: str = self.get_token_url(network_type=NetworkType.LOCAL)
104+
os.environ["AZURE_POD_IDENTITY_AUTHORITY_HOST"] = imds_endpoint
105+
os.environ["IMDS_ENDPOINT"] = imds_endpoint
106+
os.environ["IDENTITY_ENDPOINT"] = token_url
107+
108+
def get_token_url(self, network_type: NetworkType = NetworkType.LOCAL) -> str:
109+
base_url = self.get_imds_endpoint(network_type=network_type)
110+
return f"{base_url}/metadata/identity/oauth2/token"
111+
112+
def get_token(self, network_type: NetworkType = NetworkType.LOCAL) -> StaticTokenCredential:
113+
resource = self.get_connection_url(network_type=network_type)
114+
token_url = self.get_token_url(network_type=network_type)
115+
json_response = requests.get(f"{token_url}?resource={resource}").json()
116+
return StaticTokenCredential(json_token=json_response)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use an official Python runtime as a parent image
2+
FROM python:3.10-slim
3+
4+
# Set the working directory in the container
5+
WORKDIR /app
6+
7+
# Install dependencies and create a dummy key file for the authentication to work
8+
RUN \
9+
pip install azure-keyvault-certificates==4.10.0 && \
10+
pip install azure-keyvault-secrets==4.10.0 && \
11+
pip install azure-keyvault-keys==4.11.0 && \
12+
pip install cryptography==46.0.3 && \
13+
pip install azure-identity==1.25.1 && \
14+
mkdir -p /var/opt/azcmagent/tokens/ && \
15+
touch /var/opt/azcmagent/tokens/assumed-identity.key
16+
17+
COPY ./netowrk_container.py netowrk_container.py
18+
EXPOSE 80
19+
# Define the command to run the application
20+
CMD ["python", "netowrk_container.py"]
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
import base64
2+
import os
3+
4+
import urllib3
5+
from azure.core.pipeline.transport._requests_basic import RequestsTransport
6+
from azure.identity import DefaultAzureCredential
7+
from azure.keyvault.certificates import CertificateClient, CertificatePolicy
8+
from azure.keyvault.keys import KeyClient, KeyOperation
9+
from azure.keyvault.keys.crypto import CryptographyClient, EncryptionAlgorithm, EncryptResult, DecryptResult
10+
from azure.keyvault.secrets import SecretClient
11+
from cryptography import x509
12+
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
13+
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
14+
15+
API_VERSION = "7.6"
16+
17+
18+
def hello_secrets_from_an_external_container():
19+
"""
20+
Entry point function for a custom Docker container to test connectivity
21+
and "secrets" functionality with Lowkey Vault.
22+
23+
This function is designed to run inside a separate container within the
24+
same Docker network as a Lowkey Vault instance.
25+
"""
26+
vault_url = os.environ["CONNECTION_URL"]
27+
secret_message = os.environ["SECRET_VALUE"]
28+
secret_name = os.environ["SECRET_NAME"]
29+
# test secrets API to see that the container works
30+
secret_client = None
31+
try:
32+
# ignore SSL errors because we are using a self-signed certificate
33+
transport_secrets = RequestsTransport(connection_verify=False)
34+
# create the client we need
35+
secret_client = SecretClient(
36+
vault_url=vault_url,
37+
credential=DefaultAzureCredential(),
38+
verify_challenge_resource=False,
39+
transport=transport_secrets,
40+
api_version=API_VERSION,
41+
)
42+
# set the result as a secret
43+
secret_client.set_secret(name=secret_name, value=secret_message)
44+
# get back the value
45+
actual = secret_client.get_secret(name=secret_name).value
46+
47+
# verify the result
48+
assert actual == secret_message
49+
print("Lowkey Vault Container created.")
50+
except Exception as e:
51+
print(f"Something went wrong : {e}")
52+
finally:
53+
# close client
54+
if secret_client is not None:
55+
secret_client.close()
56+
57+
58+
def hello_keys_from_an_external_container():
59+
"""
60+
Entry point function for a custom Docker container to test connectivity
61+
and "keys" functionality with Lowkey Vault.
62+
63+
This function is designed to run inside a separate container within the
64+
same Docker network as a Lowkey Vault instance.
65+
"""
66+
vault_url = os.environ["CONNECTION_URL"]
67+
secret_message = os.environ["SECRET_VALUE"]
68+
key_name = os.environ["KEY_NAME"]
69+
# test key API to see that the container works
70+
key_client = None
71+
crypto_client = None
72+
try:
73+
# ignore SSL errors because we are using a self-signed certificate
74+
transport_keys = RequestsTransport(connection_verify=False)
75+
transport_crypto = RequestsTransport(connection_verify=False)
76+
# create the clients we need
77+
key_client = KeyClient(
78+
vault_url=vault_url,
79+
credential=DefaultAzureCredential(),
80+
verify_challenge_resource=False,
81+
transport=transport_keys,
82+
api_version=API_VERSION,
83+
)
84+
# create a new key
85+
key_client.create_rsa_key(
86+
name=key_name,
87+
size=2048,
88+
key_operations=[KeyOperation.encrypt, KeyOperation.decrypt, KeyOperation.wrap_key, KeyOperation.unwrap_key],
89+
)
90+
91+
crypto_client = CryptographyClient(
92+
key=key_client.get_key(name=key_name).id,
93+
credential=DefaultAzureCredential(),
94+
verify_challenge_resource=False,
95+
transport=transport_crypto,
96+
api_version=API_VERSION,
97+
)
98+
99+
# encode the text
100+
text_as_bytes: bytes = bytes(secret_message.encode("utf-8"))
101+
encrypted: EncryptResult = crypto_client.encrypt(
102+
algorithm=EncryptionAlgorithm.rsa_oaep_256, plaintext=text_as_bytes
103+
)
104+
cipher_text: bytes = encrypted.ciphertext
105+
106+
# decode the cipher text
107+
decrypted: DecryptResult = crypto_client.decrypt(
108+
algorithm=EncryptionAlgorithm.rsa_oaep_256, ciphertext=cipher_text
109+
)
110+
decrypted_text: str = decrypted.plaintext.decode("utf-8")
111+
112+
# verify the result
113+
assert decrypted_text == secret_message
114+
print("Lowkey Vault Container created.")
115+
except Exception as e:
116+
print(f"Something went wrong : {e}")
117+
finally:
118+
# close clients
119+
if key_client is not None:
120+
key_client.close()
121+
if crypto_client is not None:
122+
crypto_client.close()
123+
124+
125+
def hello_certificates_from_an_external_container():
126+
"""
127+
Entry point function for a custom Docker container to test connectivity
128+
and "certificates" functionality with Lowkey Vault.
129+
130+
This function is designed to run inside a separate container within the
131+
same Docker network as a Lowkey Vault instance.
132+
"""
133+
vault_url = os.environ["CONNECTION_URL"]
134+
cert_name = os.environ["CERT_NAME"]
135+
# test certificates API to see that the container works
136+
certificate_client = None
137+
secret_client = None
138+
try:
139+
# ignore SSL errors because we are using a self-signed certificate
140+
transport_certs = RequestsTransport(connection_verify=False)
141+
transport_secrets = RequestsTransport(connection_verify=False)
142+
# create the clients we need
143+
certificate_client = CertificateClient(
144+
vault_url=vault_url,
145+
credential=DefaultAzureCredential(),
146+
verify_challenge_resource=False,
147+
transport=transport_certs,
148+
api_version=API_VERSION,
149+
)
150+
secret_client = SecretClient(
151+
vault_url=vault_url,
152+
credential=DefaultAzureCredential(),
153+
verify_challenge_resource=False,
154+
transport=transport_secrets,
155+
api_version=API_VERSION,
156+
)
157+
158+
subject_name: str = "CN=example.com"
159+
policy: CertificatePolicy = CertificatePolicy(
160+
issuer_name="Self",
161+
subject=subject_name,
162+
key_curve_name="P-256",
163+
key_type="EC",
164+
validity_in_months=12,
165+
content_type="application/x-pkcs12",
166+
)
167+
certificate_client.begin_create_certificate(certificate_name=cert_name, policy=policy).wait()
168+
169+
cert_value = secret_client.get_secret(name=cert_name).value
170+
171+
# decode base64 secret
172+
decoded = base64.b64decode(cert_value)
173+
# open decoded secret as PKCS12 file
174+
pkcs12 = load_key_and_certificates(decoded, b"")
175+
176+
# get the components
177+
ec_key: EllipticCurvePrivateKey = pkcs12[0]
178+
x509_cert: x509.Certificate = pkcs12[1]
179+
180+
# verify the result
181+
assert subject_name == x509_cert.subject.rdns[0].rfc4514_string()
182+
assert "secp256r1" == ec_key.curve.name
183+
184+
print("Lowkey Vault Container created.")
185+
except Exception as e:
186+
print(f"Something went wrong : {e}")
187+
finally:
188+
# close clients
189+
if certificate_client is not None:
190+
certificate_client.close()
191+
if secret_client is not None:
192+
secret_client.close()
193+
194+
195+
if __name__ == "__main__":
196+
mode = os.getenv("TEST")
197+
# ignore cert errors
198+
urllib3.disable_warnings()
199+
if mode == "secrets":
200+
hello_secrets_from_an_external_container()
201+
elif mode == "keys":
202+
hello_keys_from_an_external_container()
203+
elif mode == "certificates":
204+
hello_certificates_from_an_external_container()
205+
else:
206+
print("The TEST env variable must be 'secrets', 'keys', or 'certificates'.")

0 commit comments

Comments
 (0)