Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions automation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
## Automation folder

This folder contains automation components for managing automation jobs and submitting computation jobs in Argo Monitoring. The main component is `argo_automator`, a daemon that listens for events on AMS and executes triggered jobs.

### Getting started

First, set up a Python virtual environment:
```bash
python -m venv ./argo-venv
source ./argo-venv/bin/activate
pip install -r requirements.txt
```

**Requirements:**
- Python 3.9+
- Dependencies: requests, argo-ams-library, pyyaml, pymongo

### Running the automator daemon

Start the automator with:
```bash
./argo_automator
```

By default it looks for `.config.yml` in the current directory. To specify a different config file:
```bash
./argo_automator -c /path/to/config.yml
```

See `config.yml.example` for configuration details.

### Job submission scripts

#### Ingest job
Submit an ingestion job for a tenant:
```bash
./run_ingest -t TENANTFOO
```

**Options:**
- `-c /path/to/config.yml` - Specify config file (default: `.config.yml`)
- `--no-verify` - Skip verification of remote endpoints like AMS
- `--dry-run` - Preview what would be submitted without executing
- `--log-level DEBUG` - Adjust logging verbosity

#### Batch job
Submit a batch computation job (calculates AR, status, and trends):
```bash
./run_batch -t TENANTFOO -r Default
```

**Options:**
- `-d 2025-05-05` - Specify date (default: current day)
- `-c /path/to/config.yml` - Specify config file
- `--dry-run` - Preview submission
- `--log-level DEBUG` - Adjust logging verbosity
File renamed without changes.
22 changes: 19 additions & 3 deletions automation/argo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ def __init__(self, path: str):
exit(1)
automation = config_data.get("automation", {})
tenants = config_data.get("tenants", {})
run = config_data.get("run", {})

self.path = path
self.automation = automation
self.tenants = tenants
self.run = run
self.ams_endpoint = automation.get("ams_endpoint")
self.ams_event_token = automation.get("ams_event_token")
self.ams_event_project = automation.get("ams_event_project")
Expand All @@ -35,11 +37,19 @@ def __init__(self, path: str):
self.web_api_endpoint = automation.get("web_api_endpoint")
self.web_api_token = automation.get("web_api_token")
self.default_ops_profile_file = automation.get("default_ops_profile_file")
self.hdfs_path = run.get("hdfs_path")
self.flink_path = run.get("flink_path")
self.batch_jar_path = run.get("batch_jar_path")
self.ingest_jar_path = run.get("ingest_jar_path")

def save(self) -> None:
"""Save current configuration back to yaml file"""
with open(self.path, "w") as f:
data = {"automation": self.automation, "tenants": self.tenants}
data = {
"automation": self.automation,
"run": self.run,
"tenants": self.tenants,
}
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
logger.info("engine config - saved to disk")

Expand All @@ -49,6 +59,12 @@ def set_tenant_web_api_access(self, tenant_id, tenant_name, web_api_token):
logger.info(f"engine config - tenant {tenant_name} web_api_token prop set")
self.save()

def set_tenant_reports(self, tenant_id: str, tenant_name: str, reports: dict):
self.ensure_tenant(tenant_id, tenant_name)
self.tenants.get(tenant_name)["reports"] = reports
logger.info(f"engine config - tenant {tenant_name} reports prop set")
self.save()

def set_tenant_ams_access(self, tenant_id, tenant_name, ams_token):
self.ensure_tenant(tenant_id, tenant_name)
self.tenants.get(tenant_name)["ams_token"] = ams_token
Expand All @@ -58,10 +74,10 @@ def set_tenant_ams_access(self, tenant_id, tenant_name, ams_token):
def ensure_tenant(self, tenant_id, tenant_name):
cur_tenant = self.tenants.get(tenant_name)
if not cur_tenant:
self.tenants[tenant_name] = {"tenant_id": tenant_id}
self.tenants[tenant_name] = {"id": tenant_id}
logger.info(f"engine config - tenant {tenant_name} definition created")
return
cur_tenant_id = self.tenants.get("tenant_id")
cur_tenant_id = cur_tenant.get("id")
if not cur_tenant_id or cur_tenant_id != tenant_id:
cur_tenant["tenant_id"] = tenant_id
logger.info(f"engine config - tenant {tenant_name} tenant_id prop set")
35 changes: 27 additions & 8 deletions automation/argo_web_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@ def __init__(self, config: ArgoConfig):
self.config = config

def create_user(
self,
tenant_id: str,
tenant_name: str,
username: str,
role: str,
component: str
self, tenant_id: str, tenant_name: str, username: str, role: str, component: str
):
"""Http call to web-api to create a user"""
logger.debug(
Expand All @@ -32,7 +27,7 @@ def create_user(
"name": username,
"email": self.config.argo_ops_email,
"roles": [role],
"component": component
"component": component,
}

url = f"https://{self.config.web_api_endpoint}/api/v2/admin/tenants/{tenant_id}/users"
Expand Down Expand Up @@ -95,6 +90,28 @@ def update_tenant_db_info(
f"tenant: {tenant_name} ({tenant_id}) - web-api updating db conf updated"
)

def get_reports(
self,
tenant_id: str,
tenant_name: str,
tenant_access_token: str,
):
"""Retrieve report names and report ids for specific tenant"""
logger.debug(
f"tenant: {tenant_name} ({tenant_id}) - retrieving report information from web-api..."
)
url = f"https://{self.config.web_api_endpoint}/api/v2/reports"
headers = {
"x-api-key": tenant_access_token,
"Accept": "application/json",
}

response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
response.raise_for_status()

results = response.json().get("data")
return {item["info"]["name"]: item["id"] for item in results}

def create_ops_profile(
self,
tenant_id: str,
Expand Down Expand Up @@ -148,7 +165,9 @@ def get_component_user(

users = response.json().get("data")
if users:
return next((user for user in users if user.get("component") == component), None)
return next(
(user for user in users if user.get("component") == component), None
)
return None

def get_user(self, tenant_id: str, tenant_name: str, user_id: str):
Expand Down
5 changes: 5 additions & 0 deletions automation/config.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ automation:
argo_ops_email: argo_ops@localhost
default_ops_profile_file: ./default.ops.json

run:
hdfs_path: hdfs://localhost:9000/user/argo/test/tenants/
flink_path: /opt/flink/bin/flink
batch_jar_path: /opt/argo/jars/multijob.jar

# The tenants section is automatically configured by engine - don't edit manually
#
# tenants:
Expand Down
8 changes: 6 additions & 2 deletions automation/init_ams.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import logging

from argo_ams_library import (AmsServiceException, AmsUser, AmsUserProject,
ArgoMessagingService)
from argo_ams_library import (
AmsServiceException,
AmsUser,
AmsUserProject,
ArgoMessagingService,
)

from argo_config import ArgoConfig

Expand Down
4 changes: 3 additions & 1 deletion automation/init_compute_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ def init_compute_engine(

else:
# create the user
user = web_api.create_user(tenant_id, tenant_name, username, role, component)
user = web_api.create_user(
tenant_id, tenant_name, username, role, component
)
if user and username == engine_username:
engine_user_key = user.get("api_key")

Expand Down
6 changes: 3 additions & 3 deletions automation/init_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def init_mongo(
("status_services", index_report_dateint),
("threshold_profiles", index_desc_dateint_id),
("weights", index_desc_dateint_id),
("topology_endpoints",index_desc_dateint_id),
("topology_groups",index_desc_dateint_id),
("topology_service_types",index_desc_dateint_name)
("topology_endpoints", index_desc_dateint_id),
("topology_groups", index_desc_dateint_id),
("topology_service_types", index_desc_dateint_name),
]

for collection_name, index_type in indexes:
Expand Down
165 changes: 165 additions & 0 deletions automation/run_batch
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#!/usr/bin/env python3

import argparse
import logging
import subprocess
import sys
from datetime import datetime, timedelta

from argo_config import ArgoConfig
from argo_web_api import ArgoWebApi

logger = logging.getLogger(__name__)


def run_batch(
config: ArgoConfig,
tenant_name: str,
report_id: str,
cur_date_str: str,
prev_date_str: str,
tenant_web_api_token: str,
dry_run: bool,
):
"""Function that composes the appropriate cli command to submit a batch job execution in flink"""

cmd = [
config.flink_path,
"run",
"-c",
"argo.batch.ArgoMultiJob",
config.batch_jar_path,
"--run.date",
cur_date_str,
"--mongo.uri",
f"{config.mongodb_url}/{config.tenant_db_prefix}{tenant_name}",
"--pdata",
f"{config.hdfs_path}/{tenant_name}/mdata/{prev_date_str}",
"--mdata",
f"{config.hdfs_path}/{tenant_name}/mdata/{cur_date_str}",
"--api.endpoint",
config.web_api_endpoint,
"--mongo.method",
"insert",
"--clear.mongo",
"true",
"--api.token",
tenant_web_api_token,
"--report.id",
report_id,
]

if dry_run:
print(("\033[92m" + " ".join(str(x) for x in cmd) + "\033[0m"))
return 0
else:
try:
subprocess.run(cmd, check=True)
except Exception as e:
logger.error(f"Batch Job Error: {e}")
return 1


def main():
parser = argparse.ArgumentParser(description="Argo Automator")
parser.add_argument(
"-c",
"--config",
default="config.yml",
help="Path to configuration file (default: config.yml)",
)
parser.add_argument(
"-l",
"--log-level",
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Set logging level (default: INFO)",
)
parser.add_argument(
"-d",
"--date",
type=str,
help="set computation date as YYYY-MM-DD",
)
parser.add_argument(
"-t",
"--tenant",
required=True,
type=str,
help="select tenant name for computation",
)
parser.add_argument(
"-r",
"--report",
required=True,
type=str,
help="select report name for computation",
)
parser.add_argument(
"--dry-run",
help="Runs in test mode without actually submitting the job",
action="store_true",
dest="dry_run",
)

args = parser.parse_args()

logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s - %(levelname)s - %(message)s",
)

config = ArgoConfig(args.config)

if not config.flink_path:
logging.error(
f"You need to set-up the run.flink_path parameter in your configuration file {args.config}"
)
return 1

if not config.batch_jar_path:
logging.error(
f"You need to set-up the run.batch_jar_path parameter in your configuration file {args.config}"
)
return 1

web_api = ArgoWebApi(config)

ref_date = datetime.strptime(args.date, "%Y-%m-%d") if args.date else datetime.now()
prev_date_str = (ref_date - timedelta(days=1)).strftime("%Y-%m-%d")
cur_date_str = ref_date.strftime("%Y-%m-%d")

# check if tenant exists in config
tenant = config.tenants.get(args.tenant, {})
if not tenant or not tenant.get("id", {}) or not tenant.get("web_api_token", {}):
logging.error(f"Tenant {args.tenant} not configured in compute engine")
return 1

# check if report exists
report_id = (
config.tenants.get(args.tenant, {}).get("reports", {}).get(args.report, {})
)
if not report_id:
# try to update the tenant report configuration
reports = web_api.get_reports(
tenant["id"], args.tenant, tenant["web_api_token"]
)
config.set_tenant_reports(tenant["id"], args.tenant, reports)

# check again if report is included in the updated web-api result
report_id = (
config.tenants.get(args.tenant, {}).get("reports", {}).get(args.report, {})
)
if not report_id:
logging.error(
f"Tenant {args.tenant} does not include a report named: {args.report}"
)
return 1

return run_batch(
config, args.tenant, report_id, cur_date_str, prev_date_str, tenant.get("web_api_token"), args.dry_run
)


if __name__ == "__main__":
sys.exit(main())
Loading
Loading