Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion automation/argo_automator
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import requests
from argo_ams_library import ArgoMessagingService

from argo_config import ArgoConfig
from check_readiness import check_readiness
from init_ams import init_ams
from init_compute_engine import init_compute_engine
from init_mongo import init_mongo
Expand All @@ -31,6 +32,7 @@ class JobName(Enum):
INIT_MONGO = "INIT_MONGO"
INIT_AMS = "INIT_AMS"
INIT_COMPUTE_ENGINE = "INIT_COMPUTE_ENGINE"
CHECK_READINESS = "CHECK_READINESS"


class JobStatus(Enum):
Expand Down Expand Up @@ -178,7 +180,9 @@ class ArgoAutomator:
f"ams job {job_name} event for tenant: {tenant_name} ({tenant_id}) discarded"
)
return

if job_name == JobName.CHECK_READINESS.value:
self.executor.submit(self.job_check_readiness, tenant_id, tenant_name)
return
if job_name == JobName.INIT_MONGO.value:
self.executor.submit(self.job_init_mongo, tenant_id, tenant_name)
return
Expand All @@ -194,6 +198,54 @@ class ArgoAutomator:
except Exception as e:
logger.exception(f"Failed to process event: {e}")

def job_check_readiness(self, tenant_id: str, tenant_name: str):
"""Job placeholder to check readiness mongo"""
try:
logger.info(
f"job started: initialising check readiness tenant {tenant_name} with id: {tenant_id}"
)
# To do stuff here
self.mon_api.update_status(
tenant_id,
tenant_name,
JobName.CHECK_READINESS.value,
JobStatus.IN_PROGRESS.value,
"Running checks for tenant readiness",
)

# run the checks and collect information
job_done = check_readiness(
self.config,
tenant_id,
tenant_name,
)


if job_done:
self.mon_api.update_status(
tenant_id,
tenant_name,
JobName.CHECK_READINESS.value,
JobStatus.COMPLETED.value,
"Check readiness completed succesfully",
)
logger.info(
f"job completed: check_readiness for tenant {tenant_name}"
)
else:
self.mon_api.update_status(
tenant_id,
tenant_name,
JobName.CHECK_READINESS.value,
JobStatus.FAILED.value,
"Check readiness failed to complete!",
)
logger.error(f"job failed: check readiness for tenant {tenant_name}")
except Exception as e:
logger.exception(f"job failed for tenant {tenant_name}: {e}")



def job_init_mongo(self, tenant_id: str, tenant_name: str):
"""Job placeholder to init mongo"""
try:
Expand Down
1 change: 1 addition & 0 deletions automation/argo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, path: str):
self.web_api_token = automation.get("web_api_token")
self.default_ops_profile_file = automation.get("default_ops_profile_file")
self.hdfs_path = run.get("hdfs_path")
self.hdfs_check_path = run.get("hdfs_check_path")
self.flink_path = run.get("flink_path")
self.batch_jar_path = run.get("batch_jar_path")
self.ingest_jar_path = run.get("ingest_jar_path")
Expand Down
63 changes: 61 additions & 2 deletions automation/argo_web_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from enum import Enum
from typing import Dict, Optional

import requests
Expand All @@ -10,6 +11,12 @@
REQUEST_TIMEOUT = 30


class TopoItem(Enum):
ENDPOINTS = "endpoints"
GROUPS = "groups"
SERVICE_TYPES = "service-types"


class ArgoWebApi:

def __init__(self, config: ArgoConfig):
Expand Down Expand Up @@ -90,6 +97,28 @@ def update_tenant_db_info(
f"tenant: {tenant_name} ({tenant_id}) - web-api updating db conf updated"
)

def get_topology(
self,
tenant_id: str,
tenant_name: str,
tenant_access_token: str,
topology_item: TopoItem,
):
"""Retrieve topology items for specific tenant"""
logger.debug(
f"tenant: {tenant_name} ({tenant_id}) - retrieving report information from web-api..."
)
url = f"https://{self.config.web_api_endpoint}/api/v2/topology/{topology_item.value}"
headers = {
"x-api-key": tenant_access_token,
"Accept": "application/json",
}

response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
response.raise_for_status()

return response.json().get("data")

def get_reports(
self,
tenant_id: str,
Expand All @@ -109,8 +138,14 @@ def get_reports(
response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
response.raise_for_status()

results = response.json().get("data")
return {item["info"]["name"]: item["id"] for item in results}
return response.json().get("data")

def get_report_ids(
self, tenant_id: str, tenant_name: str, tenant_access_token: str
):
"""Retrieve report names and ids for specific tenant"""
reports = self.get_reports(tenant_id, tenant_name, tenant_access_token)
return {item["info"]["name"]: item["id"] for item in reports}

def create_ops_profile(
self,
Expand Down Expand Up @@ -147,6 +182,30 @@ def create_ops_profile(
f"tenant: {tenant_name} ({tenant_id}) - web-api ops profile created"
)

def update_ready_state(
self,
tenant_id: str,
tenant_name: str,
payload: object,
):
"""Http call to web-api to update the readiness state for a specific tenant"""
logger.debug(
f"tenant: {tenant_name} ({tenant_id}) - web-api update readiness state..."
)

url = f"https://{self.config.web_api_endpoint}/api/v2/admin/tenants/{tenant_id}/ready"
headers = {
"x-api-key": self.config.web_api_token,
"Accept": "application/json",
}

response = requests.put(
url, json=payload, headers=headers, timeout=REQUEST_TIMEOUT
)
response.raise_for_status()

logger.info(f"tenant: {tenant_name} ({tenant_id}) - web-api readiness updated")

def get_component_user(
self, tenant_id: str, tenant_name: str, component: str
) -> Optional[Dict]:
Expand Down
117 changes: 117 additions & 0 deletions automation/check_readiness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import datetime
import logging

import requests

from argo_config import ArgoConfig
from argo_web_api import ArgoWebApi, TopoItem

REQUEST_TIMEOUT = 30

logger = logging.getLogger(__name__)


def check_hdfs(config: ArgoConfig, tenant_id: str, tenant_name: str) -> bool:
"""Checks if data for today exist in hdfs tenant folders"""

logger.debug(
f"tenant: {tenant_name} ({tenant_id}) - retrieving report information from web-api..."
)
today = datetime.date.today().strftime("%Y-%m-%d")
url = f"{config.hdfs_check_path}/{tenant_name}/mdata/{today}?op=LISTSTATUS"
headers = {
"Accept": "application/json",
}

try:
response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
response.raise_for_status()

result = response.json().get("FileStatuses").get("FileStatus")
if len(result) > 0:
return True
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
logger.warning(
f"tenant: {tenant_name} ({tenant_id}) - tenant path not found in hdfs"
)
return False
else:
raise

return False


def check_readiness(config: ArgoConfig, tenant_id: str, tenant_name: str) -> object:
"""Checks tenants readiness by doing web-api requests to see if topology and
reports are defined and also by checking if data are present both in ams and hdfs"""

web_api = ArgoWebApi(config)

# get access token from config file
tenant_token = config.tenants.get(tenant_name, {}).get("web_api_token")

# check if topology exists
topology_ready = True
topology_msg = []
topo_endpoints = web_api.get_topology(
tenant_id, tenant_name, tenant_token, TopoItem.ENDPOINTS
)
topo_groups = web_api.get_topology(
tenant_id, tenant_name, tenant_token, TopoItem.GROUPS
)
topo_service_types = web_api.get_topology(
tenant_id, tenant_name, tenant_token, TopoItem.SERVICE_TYPES
)

if len(topo_endpoints) > 0:
topology_msg.append("Topology endpoints are set.")
else:
topology_msg.append("Topology endpoints are missing!")
topology_ready = False

if len(topo_groups) > 0:
topology_msg.append("Topology groups are set.")
else:
topology_msg.append("Topology groups are missing!")
topology_ready = False

if len(topo_service_types) > 0:
topology_msg.append("Topology service-types are set.")
else:
topology_msg.append("Topology service-types are missing!")
topology_ready = False

# check reports
reports_ready = True
reports_msg = "Tenant has at least one report"

reports = web_api.get_reports(tenant_id, tenant_name, tenant_token)

if len(reports) < 0:
reports_msg = "Tenant has no reports!"

# check metric data in hdfs
hdfs_ready = True
hdfs_msg = "Tenant has metric data in HDFS for today"
hdfs_check = check_hdfs(config, tenant_id, tenant_name)

if not hdfs_check:
hdfs_ready = False
hdfs_msg = "Tenant doesn't have metric data in HDFS for today!"

# update the state
payload = {
"data": {"ready": hdfs_ready, "message": hdfs_msg},
"topology": {"ready": topology_ready, "message": " ".join(topology_msg)},
"reports": {"ready": reports_ready, "message": reports_msg},
"last_check": datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y:%m:%dT%H:%M:%SZ"
),
}

# update the payload to web-api
result = web_api.update_ready_state(tenant_id, tenant_name, payload)
if result:
return True
return False
4 changes: 3 additions & 1 deletion automation/config.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ automation:

run:
hdfs_path: hdfs://localhost:9000/user/argo/test/tenants/
hdfs_check_path: http://localhost:50070/user/argo/test/tenants
flink_path: /opt/flink/bin/flink
batch_jar_path: /opt/argo/jars/multijob.jar
batch_jar_path: /opt/argo/jars/multijob.jar
ingest_jar_path: /opt/argo/jars/ingest.jar

# The tenants section is automatically configured by engine - don't edit manually
#
Expand Down
8 changes: 2 additions & 6 deletions automation/init_ams.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import logging

from argo_ams_library import (
AmsServiceException,
AmsUser,
AmsUserProject,
ArgoMessagingService,
)
from argo_ams_library import (AmsServiceException, AmsUser, AmsUserProject,
ArgoMessagingService)

from argo_config import ArgoConfig

Expand Down
4 changes: 2 additions & 2 deletions automation/run_batch
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def run_batch(


def main():
parser = argparse.ArgumentParser(description="Argo Automator")
parser = argparse.ArgumentParser(description="Argo Run Batch")
parser.add_argument(
"-c",
"--config",
Expand Down Expand Up @@ -141,7 +141,7 @@ def main():
)
if not report_id:
# try to update the tenant report configuration
reports = web_api.get_reports(
reports = web_api.get_report_ids(
tenant["id"], args.tenant, tenant["web_api_token"]
)
config.set_tenant_reports(tenant["id"], args.tenant, reports)
Expand Down
2 changes: 1 addition & 1 deletion automation/run_ingest
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def run_ingest(


def main():
parser = argparse.ArgumentParser(description="Argo Automator")
parser = argparse.ArgumentParser(description="Argo Run Ingest")
parser.add_argument(
"-c",
"--config",
Expand Down
Loading