Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions config/settings.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@ default:
token: # token for openshift where the testenv tools are deployed (unnecessary for for default openshift)
private_base_url:
default: echo_api # tool name to be used by default for backend
shared_certs:
# config for obtaining certificates which are signed by same CA as tools.
# If this config is not set, testsuite will use same openshift and namespace as tools
namespace: tools # openshift namespace/project where the secrets with certificates are deployed
server_url: # openshift url where the secrets with certificates are deployed
token: # token for openshift where the secrets with certificates are deployed
# certs can be also added manually
client_certs:
valid:
- name: client1
crt: "cert1"
key: "key1"
- name: client2
crt: "cert2"
key: "key2"
invalid:
- name: client3
crt: "cert3"
key: "key3"
warn_and_skip:
# section to control how warn_and_skip should behave for particular tests
# works just for tests and fixture that use warn_and_skip
Expand Down
40 changes: 40 additions & 0 deletions testsuite/dynaconf_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,43 @@ def _rhsso_credentials(tools_config, rhsso_config):
return None, None


def _shared_tool_certs(tools_config, shared_certs_config):
"""Search for SSO clients certificates"""
if not shared_certs_config:
shared_certs_config = tools_config
server_url = shared_certs_config.get("server_url")
project = shared_certs_config.get("namespace")
token = shared_certs_config.get("token")
client = OpenShiftClient(project_name=project, server_url=server_url, token=token)
try:
iter(client.secrets)
except OpenShiftPythonException:
return {"valid": [], "invalid": []}

valid_certs = []
invalid_certs = []
for secret in client.secrets:
try:
name = secret["metadata"]["name"]
annotations = secret["metadata"]["annotations"]
shared_cert = annotations["shared_cert"]
valid = annotations["valid"]
except KeyError:
continue
if shared_cert != "True":
continue
cert = {
"name": name,
"crt": client.secrets[name]["tls.crt"].decode("utf-8"),
"key": client.secrets[name]["tls.key"].decode("utf-8"),
}
if valid == "True":
valid_certs.append(cert)
if valid == "False":
invalid_certs.append(cert)
return {"valid": valid_certs, "invalid": invalid_certs}


def _threescale_operator_ocp(ocp):
try:
ocp.threescale_operator # pylint: disable=pointless-statement
Expand Down Expand Up @@ -264,6 +301,8 @@ def load(obj, env=None, silent=None, key=None):
rhsso_setup = obj.get("rhsso", {})
rhsso_username, rhsso_password = _rhsso_credentials(ocp_tools_setup, rhsso_setup)

shared_certs_setup = obj.get("shared_certs", {})

ocp = OpenShiftClient(
project_name=project, server_url=ocp_setup.get("server_url"), token=ocp_setup.get("token")
)
Expand Down Expand Up @@ -353,6 +392,7 @@ def load(obj, env=None, silent=None, key=None):
"apicast": {"openshift": apicast_operator_ocp},
},
"rhsso": {"password": rhsso_password, "username": rhsso_username},
"shared_certs": {"client_certs": _shared_tool_certs(ocp_tools_setup, shared_certs_setup)},
}

# this overwrites what's already in settings to ensure NAMESPACE is propagated
Expand Down
20 changes: 15 additions & 5 deletions testsuite/rhsso/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,19 @@ def __getstate__(self):
more info here: https://docs.python.org/3/library/pickle.html#object.__getstate__
"""
return {
"client": self.client.client_id,
"realm": self.realm.name,
"client": {
"client_id": self.client.client_id,
"verify": self.client.verify,
},
"realm": {
"name": self.realm.name,
"verify": self.realm.verify,
},
"rhsso": {
"url": self.rhsso.server_url,
"username": self.rhsso.master.connection.username,
"password": self.rhsso.master.connection.password,
"verify": self.rhsso.verify,
},
"user": self.user,
"username": self.username,
Expand All @@ -135,11 +142,14 @@ def __setstate__(self, state):
more info here: https://docs.python.org/3/library/pickle.html#object.__setstate__
"""
self.rhsso = RHSSO(
server_url=state["rhsso"]["url"], username=state["rhsso"]["username"], password=state["rhsso"]["password"]
server_url=state["rhsso"]["url"],
username=state["rhsso"]["username"],
password=state["rhsso"]["password"],
verify=state["rhsso"]["verify"],
)
self.realm = Realm(self.rhsso.master, state["realm"])
self.realm = Realm(self.rhsso.master, state["realm"]["name"], verify=state["realm"]["verify"])
self.user = state["user"]
self.client = Client(self.realm, state["client"])
self.client = Client(self.realm, state["client"]["client_id"], verify=state["client"]["verify"])
self.username = state["username"]
self.password = state["password"]
self._oidc_client = self.client.oidc_client
Expand Down
38 changes: 26 additions & 12 deletions testsuite/rhsso/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,41 @@
from testsuite.config import settings


def _fallback(value, default):
return default if value is None else value


class Realm:
"""Helper class for RHSSO realm manipulation"""

def __init__(self, master: KeycloakAdmin, name) -> None:
def __init__(self, master: KeycloakAdmin, name, verify=None) -> None:
self.verify = verify if verify else settings["ssl_verify"]
self.admin = KeycloakAdmin(
server_url=master.connection.server_url,
username=master.connection.username,
password=master.connection.password,
realm_name=name,
user_realm_name="master",
verify=settings["ssl_verify"],
verify=self.verify,
)
self.name = name

def delete(self):
"""Deletes realm"""
self.admin.delete_realm(self.name)

def create_client(self, name, **kwargs):
def create_client(self, name, cert=None, verify=None, **kwargs):
"""Creates new client"""
self.admin.create_client(payload={**kwargs, "clientId": name})
client_id = self.admin.get_client_id(name)
return Client(self, client_id)
return Client(self, client_id, cert, _fallback(verify, self.verify))

def update_client(self, name, cert=None, verify=None, **kwargs):
"""Updates existing client by merging provided kwargs into its current configuration"""
client_id = self.admin.get_client_id(name)
current = self.admin.get_client(client_id)
self.admin.update_client(client_id, payload={**current, **kwargs, "clientId": name})
return Client(self, client_id, cert, _fallback(verify, self.verify))

def create_user(self, username, password, **kwargs):
"""Creates new user"""
Expand All @@ -44,7 +56,7 @@ def create_user(self, username, password, **kwargs):
self.admin.update_user(user_id, {"emailVerified": True})
return user_id

def oidc_client(self, client_id, client_secret) -> KeycloakOpenID:
def oidc_client(self, client_id, client_secret, cert=None, verify=None) -> KeycloakOpenID:
"""Create OIDC client for this realm"""
server_url = self.admin.connection.server_url

Expand All @@ -57,16 +69,20 @@ def oidc_client(self, client_id, client_secret) -> KeycloakOpenID:
client_id=client_id,
realm_name=self.name,
client_secret_key=client_secret,
cert=cert,
verify=_fallback(verify, self.verify),
)


class Client:
"""Helper class for RHSSO client manipulation"""

def __init__(self, realm: Realm, client_id) -> None:
def __init__(self, realm: Realm, client_id, cert=None, verify=None) -> None:
self.admin = realm.admin
self.realm = realm
self.client_id = client_id
self.cert = cert
self.verify = verify

def assign_role(self, role_name):
"""Assign client role from realm management client"""
Expand All @@ -81,18 +97,16 @@ def oidc_client(self) -> KeycloakOpenID:
# Note This is different clientId (clientId) than self.client_id (Id), because RHSSO
client_id = self.admin.get_client(self.client_id)["clientId"]
secret = self.admin.get_client_secrets(self.client_id)["value"]
return self.realm.oidc_client(client_id, secret)
return self.realm.oidc_client(client_id, secret, self.cert, verify=self.verify)


# pylint: disable=too-few-public-methods
class RHSSO:
"""Helper class for RHSSO server"""

def __init__(self, server_url, username, password, verify=None) -> None:
def __init__(self, server_url, username, password, verify=True) -> None:
# python-keycloak API requires url to be pointed at auth/ endpoint
# pylint: disable=protected-access
if verify is None:
verify = settings["ssl_verify"]
self.verify = verify
try:
self.master = KeycloakAdmin(
Expand All @@ -115,10 +129,10 @@ def __init__(self, server_url, username, password, verify=None) -> None:
)
self.master.get_clients() # test whether the server url is valid

def create_realm(self, name: str, **kwargs) -> Realm:
def create_realm(self, name: str, verify=None, **kwargs) -> Realm:
"""Creates new realm"""
self.master.create_realm(payload={"realm": name, "enabled": True, "sslRequired": "None", **kwargs})
return Realm(self.master, name)
return Realm(self.master, name, verify=_fallback(verify, self.verify))


# pylint: disable=too-few-public-methods
Expand Down
Loading
Loading