diff --git a/automation/README.md b/automation/README.md new file mode 100644 index 00000000..499d4a29 --- /dev/null +++ b/automation/README.md @@ -0,0 +1,56 @@ +## Automation folder + +This folder contains automation components for managing automation jobs and submitting computation jobs in Argo Monitoring. The main component is `argo_automator`, a daemon that listens for events on AMS and executes triggered jobs. + +### Getting started + +First, set up a Python virtual environment: +```bash +python -m venv ./argo-venv +source ./argo-venv/bin/activate +pip install -r requirements.txt +``` + +**Requirements:** +- Python 3.9+ +- Dependencies: requests, argo-ams-library, pyyaml, pymongo + +### Running the automator daemon + +Start the automator with: +```bash +./argo_automator +``` + +By default it looks for `.config.yml` in the current directory. To specify a different config file: +```bash +./argo_automator -c /path/to/config.yml +``` + +See `config.yml.example` for configuration details. 
+ +### Job submission scripts + +#### Ingest job +Submit an ingestion job for a tenant: +```bash +./run_ingest -t TENANTFOO +``` + +**Options:** +- `-c /path/to/config.yml` - Specify config file (default: `.config.yml`) +- `--no-verify` - Skip verification of remote endpoints like AMS +- `--dry-run` - Preview what would be submitted without executing +- `--log-level DEBUG` - Adjust logging verbosity + +#### Batch job +Submit a batch computation job (calculates AR, status, and trends): +```bash +./run_batch -t TENANTFOO -r Default +``` + +**Options:** +- `-d 2025-05-05` - Specify date (default: current day) +- `-c /path/to/config.yml` - Specify config file +- `--dry-run` - Preview submission +- `--log-level DEBUG` - Adjust logging verbosity \ No newline at end of file diff --git a/automation/argo_automator.py b/automation/argo_automator similarity index 100% rename from automation/argo_automator.py rename to automation/argo_automator diff --git a/automation/argo_config.py b/automation/argo_config.py index 78eecea9..7eaf5412 100644 --- a/automation/argo_config.py +++ b/automation/argo_config.py @@ -16,10 +16,12 @@ def __init__(self, path: str): exit(1) automation = config_data.get("automation", {}) tenants = config_data.get("tenants", {}) + run = config_data.get("run", {}) self.path = path self.automation = automation self.tenants = tenants + self.run = run self.ams_endpoint = automation.get("ams_endpoint") self.ams_event_token = automation.get("ams_event_token") self.ams_event_project = automation.get("ams_event_project") @@ -35,11 +37,19 @@ def __init__(self, path: str): self.web_api_endpoint = automation.get("web_api_endpoint") self.web_api_token = automation.get("web_api_token") self.default_ops_profile_file = automation.get("default_ops_profile_file") + self.hdfs_path = run.get("hdfs_path") + self.flink_path = run.get("flink_path") + self.batch_jar_path = run.get("batch_jar_path") + self.ingest_jar_path = run.get("ingest_jar_path") def save(self) -> None: 
"""Save current configuration back to yaml file""" with open(self.path, "w") as f: - data = {"automation": self.automation, "tenants": self.tenants} + data = { + "automation": self.automation, + "run": self.run, + "tenants": self.tenants, + } yaml.dump(data, f, default_flow_style=False, sort_keys=False) logger.info("engine config - saved to disk") @@ -49,6 +59,12 @@ def set_tenant_web_api_access(self, tenant_id, tenant_name, web_api_token): logger.info(f"engine config - tenant {tenant_name} web_api_token prop set") self.save() + def set_tenant_reports(self, tenant_id: str, tenant_name: str, reports: dict): + self.ensure_tenant(tenant_id, tenant_name) + self.tenants.get(tenant_name)["reports"] = reports + logger.info(f"engine config - tenant {tenant_name} reports prop set") + self.save() + def set_tenant_ams_access(self, tenant_id, tenant_name, ams_token): self.ensure_tenant(tenant_id, tenant_name) self.tenants.get(tenant_name)["ams_token"] = ams_token @@ -58,10 +74,10 @@ def set_tenant_ams_access(self, tenant_id, tenant_name, ams_token): def ensure_tenant(self, tenant_id, tenant_name): cur_tenant = self.tenants.get(tenant_name) if not cur_tenant: - self.tenants[tenant_name] = {"tenant_id": tenant_id} + self.tenants[tenant_name] = {"id": tenant_id} logger.info(f"engine config - tenant {tenant_name} definition created") return - cur_tenant_id = self.tenants.get("tenant_id") + cur_tenant_id = cur_tenant.get("id") if not cur_tenant_id or cur_tenant_id != tenant_id: cur_tenant["tenant_id"] = tenant_id logger.info(f"engine config - tenant {tenant_name} tenant_id prop set") diff --git a/automation/argo_web_api.py b/automation/argo_web_api.py index e164b9ab..258da69c 100644 --- a/automation/argo_web_api.py +++ b/automation/argo_web_api.py @@ -16,12 +16,7 @@ def __init__(self, config: ArgoConfig): self.config = config def create_user( - self, - tenant_id: str, - tenant_name: str, - username: str, - role: str, - component: str + self, tenant_id: str, tenant_name: str, 
username: str, role: str, component: str ): """Http call to web-api to create a user""" logger.debug( @@ -32,7 +27,7 @@ def create_user( "name": username, "email": self.config.argo_ops_email, "roles": [role], - "component": component + "component": component, } url = f"https://{self.config.web_api_endpoint}/api/v2/admin/tenants/{tenant_id}/users" @@ -95,6 +90,28 @@ def update_tenant_db_info( f"tenant: {tenant_name} ({tenant_id}) - web-api updating db conf updated" ) + def get_reports( + self, + tenant_id: str, + tenant_name: str, + tenant_access_token: str, + ): + """Retrieve report names and report ids for specific tenant""" + logger.debug( + f"tenant: {tenant_name} ({tenant_id}) - retrieving report information from web-api..." + ) + url = f"https://{self.config.web_api_endpoint}/api/v2/reports" + headers = { + "x-api-key": tenant_access_token, + "Accept": "application/json", + } + + response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT) + response.raise_for_status() + + results = response.json().get("data") + return {item["info"]["name"]: item["id"] for item in results} + def create_ops_profile( self, tenant_id: str, @@ -148,7 +165,9 @@ def get_component_user( users = response.json().get("data") if users: - return next((user for user in users if user.get("component") == component), None) + return next( + (user for user in users if user.get("component") == component), None + ) return None def get_user(self, tenant_id: str, tenant_name: str, user_id: str): diff --git a/automation/config.yml.example b/automation/config.yml.example index a7af11f2..ed3e4c4f 100644 --- a/automation/config.yml.example +++ b/automation/config.yml.example @@ -22,6 +22,11 @@ automation: argo_ops_email: argo_ops@localhost default_ops_profile_file: ./default.ops.json +run: + hdfs_path: hdfs://localhost:9000/user/argo/test/tenants/ + flink_path: /opt/flink/bin/flink + batch_jar_path: /opt/argo/jars/multijob.jar + # The tenants section is automatically configured by engine - 
don't edit manually # # tenants: diff --git a/automation/init_ams.py b/automation/init_ams.py index c6d255bc..cc6e52d2 100644 --- a/automation/init_ams.py +++ b/automation/init_ams.py @@ -1,7 +1,11 @@ import logging -from argo_ams_library import (AmsServiceException, AmsUser, AmsUserProject, - ArgoMessagingService) +from argo_ams_library import ( + AmsServiceException, + AmsUser, + AmsUserProject, + ArgoMessagingService, +) from argo_config import ArgoConfig diff --git a/automation/init_compute_engine.py b/automation/init_compute_engine.py index 25f89a5d..cb1f97e4 100644 --- a/automation/init_compute_engine.py +++ b/automation/init_compute_engine.py @@ -48,7 +48,9 @@ def init_compute_engine( else: # create the user - user = web_api.create_user(tenant_id, tenant_name, username, role, component) + user = web_api.create_user( + tenant_id, tenant_name, username, role, component + ) if user and username == engine_username: engine_user_key = user.get("api_key") diff --git a/automation/init_mongo.py b/automation/init_mongo.py index ac976aef..bc8f751c 100644 --- a/automation/init_mongo.py +++ b/automation/init_mongo.py @@ -46,9 +46,9 @@ def init_mongo( ("status_services", index_report_dateint), ("threshold_profiles", index_desc_dateint_id), ("weights", index_desc_dateint_id), - ("topology_endpoints",index_desc_dateint_id), - ("topology_groups",index_desc_dateint_id), - ("topology_service_types",index_desc_dateint_name) + ("topology_endpoints", index_desc_dateint_id), + ("topology_groups", index_desc_dateint_id), + ("topology_service_types", index_desc_dateint_name), ] for collection_name, index_type in indexes: diff --git a/automation/run_batch b/automation/run_batch new file mode 100755 index 00000000..b7e8e1c8 --- /dev/null +++ b/automation/run_batch @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 + +import argparse +import logging +import subprocess +import sys +from datetime import datetime, timedelta + +from argo_config import ArgoConfig +from argo_web_api import ArgoWebApi 
logger = logging.getLogger(__name__)


def run_batch(
    config: "ArgoConfig",
    tenant_name: str,
    report_id: str,
    cur_date_str: str,
    prev_date_str: str,
    tenant_web_api_token: str,
    dry_run: bool,
) -> int:
    """Compose and submit a batch computation job (AR, status, trends) to flink.

    Args:
        config: loaded engine configuration (uses run.* and automation.* values).
        tenant_name: tenant the computation runs for.
        report_id: web-api report id to compute.
        cur_date_str: computation date as YYYY-MM-DD.
        prev_date_str: previous day as YYYY-MM-DD (source of --pdata).
        tenant_web_api_token: tenant-scoped web-api token.
        dry_run: when True, print the command instead of executing it.

    Returns:
        0 on success or dry run, 1 when the job submission fails.
    """
    cmd = [
        config.flink_path,
        "run",
        "-c",
        "argo.batch.ArgoMultiJob",
        config.batch_jar_path,
        "--run.date",
        cur_date_str,
        "--mongo.uri",
        f"{config.mongodb_url}/{config.tenant_db_prefix}{tenant_name}",
        "--pdata",
        f"{config.hdfs_path}/{tenant_name}/mdata/{prev_date_str}",
        "--mdata",
        f"{config.hdfs_path}/{tenant_name}/mdata/{cur_date_str}",
        "--api.endpoint",
        config.web_api_endpoint,
        "--mongo.method",
        "insert",
        "--clear.mongo",
        "true",
        "--api.token",
        tenant_web_api_token,
        "--report.id",
        report_id,
    ]

    if dry_run:
        # green-highlighted preview of the exact command that would be submitted
        print(("\033[92m" + " ".join(str(x) for x in cmd) + "\033[0m"))
        return 0

    try:
        # list form (shell=False) so tenant/report values are never shell-interpreted
        subprocess.run(cmd, check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        logger.error(f"Batch Job Error: {e}")
        return 1
    # was: implicit None on the success path; make the exit status explicit
    return 0


def main() -> int:
    """CLI entry point: validate config, tenant and report, then submit the job."""
    parser = argparse.ArgumentParser(description="Argo Automator")
    parser.add_argument(
        "-c",
        "--config",
        default="config.yml",
        help="Path to configuration file (default: config.yml)",
    )
    parser.add_argument(
        "-l",
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set logging level (default: INFO)",
    )
    parser.add_argument(
        "-d",
        "--date",
        type=str,
        help="set computation date as YYYY-MM-DD",
    )
    parser.add_argument(
        "-t",
        "--tenant",
        required=True,
        type=str,
        help="select tenant name for computation",
    )
    parser.add_argument(
        "-r",
        "--report",
        required=True,
        type=str,
        help="select report name for computation",
    )
    parser.add_argument(
        "--dry-run",
        help="Runs in test mode without actually submitting the job",
        action="store_true",
        dest="dry_run",
    )

    args = parser.parse_args()

    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s - %(levelname)s - %(message)s",
    )

    config = ArgoConfig(args.config)

    # all three run.* paths appear in the flink command; fail fast if any is unset
    # (hdfs_path was previously used but never validated here)
    for attr in ("flink_path", "batch_jar_path", "hdfs_path"):
        if not getattr(config, attr, None):
            logging.error(
                f"You need to set-up the run.{attr} parameter in your configuration file {args.config}"
            )
            return 1

    web_api = ArgoWebApi(config)

    ref_date = datetime.strptime(args.date, "%Y-%m-%d") if args.date else datetime.now()
    prev_date_str = (ref_date - timedelta(days=1)).strftime("%Y-%m-%d")
    cur_date_str = ref_date.strftime("%Y-%m-%d")

    # check if tenant exists in config and carries the credentials we need
    tenant = config.tenants.get(args.tenant, {})
    if not tenant or not tenant.get("id") or not tenant.get("web_api_token"):
        logging.error(f"Tenant {args.tenant} not configured in compute engine")
        return 1

    # resolve the report name to a report id from the cached tenant config
    report_id = tenant.get("reports", {}).get(args.report)
    if not report_id:
        # cache miss: refresh the tenant report list from web-api and retry once
        reports = web_api.get_reports(tenant["id"], args.tenant, tenant["web_api_token"])
        config.set_tenant_reports(tenant["id"], args.tenant, reports)

        report_id = (
            config.tenants.get(args.tenant, {}).get("reports", {}).get(args.report)
        )
        if not report_id:
            logging.error(
                f"Tenant {args.tenant} does not include a report named: {args.report}"
            )
            return 1

    return run_batch(
        config,
        args.tenant,
        report_id,
        cur_date_str,
        prev_date_str,
        tenant.get("web_api_token"),
        args.dry_run,
    )


if __name__ == "__main__":
    sys.exit(main())
import argparse
import logging
import subprocess
import sys

logger = logging.getLogger(__name__)


def run_ingest(
    config: "ArgoConfig",
    tenant_name: str,
    tenant_ams_token: str,
    dry_run: bool,
    verify: str,
) -> int:
    """Compose and submit a detached flink streaming ingest job for a tenant.

    Args:
        config: loaded engine configuration (uses run.* and automation.* values).
        tenant_name: tenant (and AMS project) the ingestion runs for.
        tenant_ams_token: tenant-scoped AMS subscription token.
        dry_run: when True, print the command instead of executing it.
        verify: "true"/"false" string passed to --ams.verify.

    Returns:
        0 on success or dry run, 1 when the job submission fails.
    """
    cmd = [
        config.flink_path,
        "run",
        "--detached",
        "-c",
        "argo.streaming.AmsIngestMetric",
        config.ingest_jar_path,
        "--ams.endpoint",
        config.ams_endpoint,
        "--ams.port",
        "443",
        "--ams.token",
        tenant_ams_token,
        "--ams.project",
        tenant_name,
        "--ams.sub",
        "ingest_metric",
        "--hdfs.path",
        f"{config.hdfs_path}/{tenant_name}/mdata",
        "--check.path",
        f"{config.hdfs_path}/{tenant_name}/check",
        "--check.interval",
        "3000",
        "--ams.interval",
        "300",
        "--ams.batch",
        "100",
        "--ams.verify",
        verify,
        "--tenant",
        tenant_name,
    ]

    if dry_run:
        # green-highlighted preview of the exact command that would be submitted
        print(("\033[92m" + " ".join(str(x) for x in cmd) + "\033[0m"))
        return 0

    try:
        # list form (shell=False) so tenant values are never shell-interpreted
        subprocess.run(cmd, check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # was mislabeled "Batch Job Error" (copy-paste from run_batch)
        logger.error(f"Ingest Job Error: {e}")
        return 1
    # was: implicit None on the success path; make the exit status explicit
    return 0


def main() -> int:
    """CLI entry point: validate config and tenant, then submit the ingest job."""
    # local import: keeps the helper above importable without the project package
    from argo_config import ArgoConfig

    parser = argparse.ArgumentParser(description="Argo Automator")
    parser.add_argument(
        "-c",
        "--config",
        default="config.yml",
        help="Path to configuration file (default: config.yml)",
    )
    parser.add_argument(
        "-l",
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set logging level (default: INFO)",
    )
    parser.add_argument(
        "-t",
        "--tenant",
        required=True,
        type=str,
        help="select tenant name for computation",
    )
    parser.add_argument(
        "--dry-run",
        help="Runs in test mode without actually submitting the job",
        action="store_true",
        dest="dry_run",
    )
    parser.add_argument(
        "--no-verify",
        action="store_const",
        const="false",
        default="true",
        dest="verify",
        help="Disable verification",
    )

    args = parser.parse_args()

    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s - %(levelname)s - %(message)s",
    )

    config = ArgoConfig(args.config)

    # all three run.* paths appear in the flink command; fail fast if any is unset
    # (flink_path was previously used but never validated here)
    for attr in ("hdfs_path", "ingest_jar_path", "flink_path"):
        if not getattr(config, attr, None):
            logging.error(
                f"You need to set-up the run.{attr} parameter in your configuration file {args.config}"
            )
            return 1

    # check if tenant exists in config and carries the credentials we need
    tenant = config.tenants.get(args.tenant, {})
    if not tenant or not tenant.get("id") or not tenant.get("ams_token"):
        logging.error(f"Tenant {args.tenant} not configured in compute engine")
        return 1

    # fix: arguments must match run_ingest's signature — the previous call passed
    # args.dry_run as the AMS token and the token as the dry_run flag, so a real
    # run would submit "--ams.token False" and --dry-run was never honored
    return run_ingest(
        config, args.tenant, tenant.get("ams_token"), args.dry_run, args.verify
    )


if __name__ == "__main__":
    sys.exit(main())