def job_check_readiness(self, tenant_id: str, tenant_name: str):
    """Run the readiness checks for a tenant and publish the job status.

    The job status is first set to IN_PROGRESS through ``self.mon_api``,
    then ``check_readiness`` is executed and the status is moved to
    COMPLETED or FAILED depending on its boolean result.

    Any exception raised along the way is logged and the job is marked as
    FAILED, so that it does not stay in IN_PROGRESS indefinitely. The
    exception itself is not re-raised (the job runs on the executor).

    Args:
        tenant_id: id of the tenant the job refers to.
        tenant_name: name of the tenant the job refers to.
    """
    try:
        logger.info(
            f"job started: initialising check readiness tenant {tenant_name} with id: {tenant_id}"
        )
        self.mon_api.update_status(
            tenant_id,
            tenant_name,
            JobName.CHECK_READINESS.value,
            JobStatus.IN_PROGRESS.value,
            "Running checks for tenant readiness",
        )

        # run the checks and collect information
        job_done = check_readiness(
            self.config,
            tenant_id,
            tenant_name,
        )

        if job_done:
            self.mon_api.update_status(
                tenant_id,
                tenant_name,
                JobName.CHECK_READINESS.value,
                JobStatus.COMPLETED.value,
                "Check readiness completed succesfully",
            )
            logger.info(f"job completed: check_readiness for tenant {tenant_name}")
        else:
            self.mon_api.update_status(
                tenant_id,
                tenant_name,
                JobName.CHECK_READINESS.value,
                JobStatus.FAILED.value,
                "Check readiness failed to complete!",
            )
            logger.error(f"job failed: check readiness for tenant {tenant_name}")
    except Exception as e:
        logger.exception(f"job failed for tenant {tenant_name}: {e}")
        # The checks (or a status update) blew up: make a best-effort attempt
        # to flag the job as FAILED instead of leaving it IN_PROGRESS.
        try:
            self.mon_api.update_status(
                tenant_id,
                tenant_name,
                JobName.CHECK_READINESS.value,
                JobStatus.FAILED.value,
                "Check readiness failed to complete!",
            )
        except Exception as status_error:
            # Reporting the failure failed as well; nothing more can be done
            # here besides logging it.
            logger.exception(
                f"job failed for tenant {tenant_name}: unable to report failed status: {status_error}"
            )
def get_report_ids(
    self, tenant_id: str, tenant_name: str, tenant_access_token: str
) -> dict:
    """Retrieve a report-name -> report-id mapping for a specific tenant.

    Args:
        tenant_id: id of the tenant.
        tenant_name: name of the tenant.
        tenant_access_token: token handed over to ``get_reports``.

    Returns:
        A dict keyed by ``item["info"]["name"]`` with ``item["id"]`` as value.
        An empty dict is returned when ``get_reports`` yields no items
        (empty list or ``None``).
    """
    reports = self.get_reports(tenant_id, tenant_name, tenant_access_token)
    # get_reports returns the "data" field of the response, which is None
    # when the field is absent; treat that the same as "no reports".
    return {item["info"]["name"]: item["id"] for item in reports or []}
import datetime
import logging

import requests

from argo_config import ArgoConfig
from argo_web_api import ArgoWebApi, TopoItem

REQUEST_TIMEOUT = 30

logger = logging.getLogger(__name__)


def check_hdfs(config: ArgoConfig, tenant_id: str, tenant_name: str) -> bool:
    """Check whether metric data for today exist in the tenant's hdfs folder.

    Sends a ``LISTSTATUS`` request to
    ``{config.hdfs_check_path}/{tenant_name}/mdata/{YYYY-MM-DD}`` and inspects
    the ``FileStatuses.FileStatus`` list of the JSON response.

    Args:
        config: configuration object providing ``hdfs_check_path``.
        tenant_id: id of the tenant (used for logging only).
        tenant_name: name of the tenant, also the name of its hdfs folder.

    Returns:
        True when the listing holds at least one entry. False when the
        listing is empty or missing from the response, or when the path does
        not exist (HTTP 404).

    Raises:
        requests.exceptions.RequestException: for any other HTTP error or
            transport problem (timeouts, connection errors, ...).
    """
    logger.debug(
        f"tenant: {tenant_name} ({tenant_id}) - checking hdfs for metric data of today..."
    )
    today = datetime.date.today().strftime("%Y-%m-%d")
    url = f"{config.hdfs_check_path}/{tenant_name}/mdata/{today}?op=LISTSTATUS"
    headers = {
        "Accept": "application/json",
    }

    try:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # A missing folder simply means "no data yet"; anything else is a
        # real failure the caller has to know about.
        if e.response.status_code == 404:
            logger.warning(
                f"tenant: {tenant_name} ({tenant_id}) - tenant path not found in hdfs"
            )
            return False
        raise

    # Tolerate a response without the expected keys instead of crashing on
    # None.get(...) / len(None).
    file_statuses = (response.json().get("FileStatuses") or {}).get("FileStatus")
    return bool(file_statuses)


def check_readiness(config: ArgoConfig, tenant_id: str, tenant_name: str) -> bool:
    """Check a tenant's readiness and push the resulting state to the web-api.

    Three aspects are checked: that topology endpoints, groups and
    service-types are defined in the web-api, that at least one report is
    defined, and that metric data for today are present in hdfs. The
    collected state is then sent with ``ArgoWebApi.update_ready_state``.

    Args:
        config: configuration object; ``config.tenants[tenant_name]`` is
            expected to carry the tenant's ``web_api_token``.
        tenant_id: id of the tenant.
        tenant_name: name of the tenant.

    Returns:
        True when the checks ran and the readiness state was sent to the
        web-api (regardless of whether the tenant turned out to be ready).
        False when the tenant has no web-api token in the configuration.

    Raises:
        requests.exceptions.RequestException: when one of the underlying
            http calls fails.
    """
    web_api = ArgoWebApi(config)

    # get access token from config file
    tenant_token = config.tenants.get(tenant_name, {}).get("web_api_token")
    if not tenant_token:
        # Without a token every tenant-scoped web-api call would be rejected.
        logger.error(
            f"tenant: {tenant_name} ({tenant_id}) - no web-api token found in configuration"
        )
        return False

    # check if topology exists
    topology_ready = True
    topology_msg = []
    for topo_item in (TopoItem.ENDPOINTS, TopoItem.GROUPS, TopoItem.SERVICE_TYPES):
        items = web_api.get_topology(tenant_id, tenant_name, tenant_token, topo_item)
        # get_topology returns None when the response has no "data" field
        if items:
            topology_msg.append(f"Topology {topo_item.value} are set.")
        else:
            topology_msg.append(f"Topology {topo_item.value} are missing!")
            topology_ready = False

    # check reports
    reports = web_api.get_reports(tenant_id, tenant_name, tenant_token)
    reports_ready = bool(reports)
    if reports_ready:
        reports_msg = "Tenant has at least one report"
    else:
        reports_msg = "Tenant has no reports!"

    # check metric data in hdfs
    hdfs_ready = check_hdfs(config, tenant_id, tenant_name)
    if hdfs_ready:
        hdfs_msg = "Tenant has metric data in HDFS for today"
    else:
        hdfs_msg = "Tenant doesn't have metric data in HDFS for today!"

    # update the state
    payload = {
        "data": {"ready": hdfs_ready, "message": hdfs_msg},
        "topology": {"ready": topology_ready, "message": " ".join(topology_msg)},
        "reports": {"ready": reports_ready, "message": reports_msg},
        "last_check": datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        ),
    }

    # update the payload to web-api; update_ready_state raises on http errors
    # and has no return value, so reaching the next line means success
    web_api.update_ready_state(tenant_id, tenant_name, payload)
    return True
b/automation/run_batch index b7e8e1c8..d21a9712 100755 --- a/automation/run_batch +++ b/automation/run_batch @@ -61,7 +61,7 @@ def run_batch( def main(): - parser = argparse.ArgumentParser(description="Argo Automator") + parser = argparse.ArgumentParser(description="Argo Run Batch") parser.add_argument( "-c", "--config", @@ -141,7 +141,7 @@ def main(): ) if not report_id: # try to update the tenant report configuration - reports = web_api.get_reports( + reports = web_api.get_report_ids( tenant["id"], args.tenant, tenant["web_api_token"] ) config.set_tenant_reports(tenant["id"], args.tenant, reports) diff --git a/automation/run_ingest b/automation/run_ingest index 812074e8..3f5485e9 100755 --- a/automation/run_ingest +++ b/automation/run_ingest @@ -64,7 +64,7 @@ def run_ingest( def main(): - parser = argparse.ArgumentParser(description="Argo Automator") + parser = argparse.ArgumentParser(description="Argo Run Ingest") parser.add_argument( "-c", "--config",