@@ -0,0 +1,3 @@
1. Add support for ISPSS login
2. Error handling
3. Install over existing connector
@@ -0,0 +1 @@
from .main import main
@@ -0,0 +1,213 @@
import logging
import os
import time
from datetime import datetime, timedelta
from typing import Optional

import requests

from .pyepm import (
get_admin_audit_events,
get_aggregated_events,
get_aggregated_policy_audits,
get_detailed_raw_events,
get_policy_audit_raw_event_details,
get_sets_list,
)
from .storage import AzureBlobStorage, LocalStorage


def _get_env(*names: str, default=None):
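    """Return the first non-empty value among the given environment variable names, or the default."""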
for name in names:
value = os.environ.get(name)
if value is not None and value != '':
return value
return default


client_id = _get_env('OAUTH_USERNAME', 'OAuthUsername')
client_secret = _get_env('OAUTH_PASSWORD', 'OAuthPassword')
identity_endpoint = _get_env('IDENTITY_ENDPOINT', 'IdentityEndpoint')
epm_host = _get_env('EPM_HOST', 'EPMHost')
webapp_id = _get_env('WEBAPP_ID', 'WebAppID')

fetch_interval_minutes = int(_get_env('FETCH_INTERVAL', 'FetchInterval', default='60'))

storage = LocalStorage() if _get_env('STORAGE', 'Storage') == 'LocalStorage' else AzureBlobStorage()

TOKEN_FILE_NAME = 'token.json'
EPM_TENANT_URL_FILE_NAME = 'epm_tenant_url.json'
TIME_FRAME_FILE_NAME = 'time_frame.json'


def _is_token_expired(token: dict) -> bool:
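    """Return True if the cached token's lifetime has elapsed since it was saved."""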
timestamp = int(token.get('timestamp', 0))
    expiration = int(token.get('expiration', 0))
return timestamp + expiration <= int(time.time())


def _get_oauth_token() -> Optional[str]:
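    """Return a valid OAuth2 access token, reusing the cached token while it
    is unexpired and otherwise requesting a new one via the client-credentials
    flow against the Identity endpoint.
    """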
if not (client_id and client_secret and identity_endpoint and webapp_id):
logging.error('Missing OAuth2 configuration environment variables')
return None

url = f'{identity_endpoint}/oauth2/token/{webapp_id}'
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
body = {
'grant_type': 'client_credentials',
'client_id': client_id,
'client_secret': client_secret
}

token_data = storage.load(file_name=TOKEN_FILE_NAME)
if token_data and not _is_token_expired(token_data):
return token_data.get('token')
if token_data:
logging.warning('Stored token expired')

try:
logging.info('Creating new token')
        response = requests.post(url=url, headers=headers, data=body, timeout=30)
res_content = response.json()
if 200 <= response.status_code <= 299:
expiration = res_content['expires_in']
token = res_content['access_token']
storage.save(
data={'token': token, 'expiration': expiration, 'timestamp': int(time.time())},
file_name=TOKEN_FILE_NAME,
)
return token
        if response.status_code == 400:
            logging.error(f"{res_content.get('error')} {res_content.get('error_description')}")
        else:
            logging.error(f'Error during access token negotiation: {response.status_code} {response.text}')
    except Exception as err:
        logging.error(f'Token request failed: {err}')
return None


def _get_time_window() -> tuple[str, str]:
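    """Return the (start, end) fetch window as 'YYYY-MM-DDTHH:MM:SSZ' UTC strings.

    The window ends 10 minutes behind the current time (presumably to allow
    for event arrival lag) and starts at the previous run's saved end time,
    falling back to the configured fetch interval. The new end time is
    persisted for the next run.
    """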
current_time = datetime.utcnow().replace(second=0, microsecond=0) - timedelta(minutes=10)
current_time_str = current_time.strftime('%Y-%m-%dT%H:%M:%SZ')

saved = storage.load(file_name=TIME_FRAME_FILE_NAME) or {}
last_end_str = saved.get('last_end_time')

if last_end_str:
try:
start_time_dt = datetime.strptime(last_end_str, '%Y-%m-%dT%H:%M:%SZ')
except Exception:
logging.warning('Invalid last_end_time in storage. Falling back to configured fetch interval.')
start_time_dt = current_time - timedelta(minutes=fetch_interval_minutes)
else:
start_time_dt = current_time - timedelta(minutes=fetch_interval_minutes)

start_time_str = start_time_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
storage.save(data={'last_end_time': current_time_str}, file_name=TIME_FRAME_FILE_NAME)
return start_time_str, current_time_str


def _fetch_set_events(fetch_func, dispatcher_url: str, token: str, filter_date: str, set_id: dict, next_cursor: str = 'start') -> list:
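    """Fetch all events of one type for a single set, following nextCursor
    pagination recursively; each event is tagged with its set name. A
    list-shaped response body is treated as containing no events.
    """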
response_json = fetch_func(
epm_server=dispatcher_url,
epm_token=token,
set_id=set_id['Id'],
data=filter_date,
next_cursor=next_cursor,
).json()

if isinstance(response_json, list):
return []

events = response_json.get('events') or []
cursor = response_json.get('nextCursor')
if cursor:
events += _fetch_set_events(fetch_func, dispatcher_url=dispatcher_url, token=token, filter_date=filter_date, set_id=set_id, next_cursor=cursor)
return [e | {"SetName": set_id.get("Name")} for e in events if isinstance(e, dict)]


def _get_dispatcher_url(auth_token: str):
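    """Resolve the EPM dispatcher (tenant) URL, caching the result in storage
    so subsequent runs skip the lookup.
    """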
url = f'{epm_host}/EPM/API/accounts/tenanturl'
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {auth_token}'
}
tenant_url = storage.load(file_name=EPM_TENANT_URL_FILE_NAME)
if tenant_url:
return tenant_url.get('tenantUrl')

try:
logging.info('Getting tenant URL')
        response = requests.get(url=url, headers=headers, timeout=30)
res_content = response.json()
if 200 <= response.status_code <= 299:
tenant_url = res_content['tenantUrl']
storage.save(
data={'tenantUrl': tenant_url},
file_name=EPM_TENANT_URL_FILE_NAME,
)
return tenant_url
        if response.status_code == 400:
            logging.error(f"{res_content.get('error')} {res_content.get('error_description')}")
        else:
            logging.error(f'Error fetching tenant URL: {response.status_code} {response.text}')
    except Exception as err:
        logging.error(f'Tenant URL request failed: {err}')
return None


def collect_events() -> list:
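    """Collect aggregated events, detailed raw events, policy audits and admin
    audit events for every set in the tenant over the current fetch window.
    """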
    token = _get_oauth_token()
    if not token:
        logging.error('Failed to obtain OAuth token')
        return []
    dispatcher_url = _get_dispatcher_url(token)
    if not dispatcher_url:
        logging.error('Failed to resolve dispatcher URL')
        return []

start_time, end_time = _get_time_window()
    logging.info(f'Processing events for period (UTC): {start_time} - {end_time}')

filter_date = '{"filter": "arrivalTime GE ' + str(start_time) + ' AND arrivalTime LE ' + end_time + '"}'

try:
sets_list = get_sets_list(epm_server=dispatcher_url, epm_token=token)
sets = sets_list.json().get('Sets') or []
    except Exception as err:
        logging.error(f'Failed to fetch sets list from EPM: {err}')
return []

all_events: list = []
for set_id in sets:
aggregated_events = _fetch_set_events(get_aggregated_events, dispatcher_url=dispatcher_url, token=token, filter_date=filter_date, set_id=set_id)
logging.info(f"Fetched {len(aggregated_events)} aggregated events from {set_id.get('Name')}")
all_events.extend([e | {"eventType": 'aggregated_events'} for e in aggregated_events if isinstance(e, dict)])

detailed_raw_events = _fetch_set_events(get_detailed_raw_events, dispatcher_url=dispatcher_url, token=token, filter_date=filter_date, set_id=set_id)
logging.info(f"Fetched {len(detailed_raw_events)} detailed raw events from {set_id.get('Name')}")
all_events.extend([e | {"eventType": 'raw_event'} for e in detailed_raw_events if isinstance(e, dict)])

aggregated_policy_audits = _fetch_set_events(get_aggregated_policy_audits, dispatcher_url=dispatcher_url, token=token, filter_date=filter_date, set_id=set_id)
logging.info(f"Fetched {len(aggregated_policy_audits)} aggregated policy audits from {set_id.get('Name')}")
all_events.extend([e | {"eventType": 'aggregated_policy_audits'} for e in aggregated_policy_audits if isinstance(e, dict)])

audit_raw_event_details = _fetch_set_events(get_policy_audit_raw_event_details, dispatcher_url=dispatcher_url, token=token, filter_date=filter_date, set_id=set_id)
logging.info(f"Fetched {len(audit_raw_event_details)} policy audit raw events from {set_id.get('Name')}")
all_events.extend([e | {"eventType": 'policy_audit_raw_event_details'} for e in audit_raw_event_details if isinstance(e, dict)])

try:
admin_events = get_admin_audit_events(
epm_server=dispatcher_url,
epm_token=token,
set_id=set_id['Id'],
start_time=start_time,
end_time=end_time,
limit=100,
)
logging.info(f"Fetched {len(admin_events)} admin audit events from {set_id.get('Name')}")
all_events.extend(admin_events)
except Exception as err:
logging.warning(f'Failed fetching Admin Audit Data: {err}')

return [e for e in all_events if isinstance(e, dict)]
@@ -0,0 +1,135 @@
import json
import logging
import os
from datetime import datetime, timezone

from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient


_CONNECTOR_EVENT_TYPES = {
'aggregated_events',
'raw_event',
'aggregated_policy_audits',
'policy_audit_raw_event_details',
'admin_audit',
}


def _to_rfc3339_utc(value):
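    """Normalize a datetime or ISO-8601 string to an RFC 3339 UTC string
    ending in 'Z', or return None if the value cannot be parsed.
    """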
if value is None:
return None
if isinstance(value, datetime):
dt = value
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat().replace('+00:00', 'Z')
if not isinstance(value, str):
return None

s = value.strip()
if not s:
return None

# Examples in docs:
# - 2021-07-07T06:44:52Z
# - 2022-02-28T11:28:16.069Z
try:
if s.endswith('Z'):
# datetime.fromisoformat doesn't accept trailing 'Z' directly
dt = datetime.fromisoformat(s[:-1])
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = datetime.fromisoformat(s)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat().replace('+00:00', 'Z')
except Exception:
return None


def _pick_timegenerated(event: dict):
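    """Pick the best available event timestamp for TimeGenerated, falling back
    to the current UTC time when none of the known fields parse.
    """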
for k in ('arrivalTime', 'lastEventDate', 'firstEventDate', 'date', 'Date', 'time', 'Time'):
ts = _to_rfc3339_utc(event.get(k))
if ts:
return ts
return _to_rfc3339_utc(datetime.now(timezone.utc))


def _get_first(event: dict, keys):
for k in keys:
v = event.get(k)
if v is not None and v != '':
return v
return None


def _transform_schema(epm_events):
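    """Map raw EPM events onto the DCR stream's column schema.

    An eventType assigned by this connector (see _CONNECTOR_EVENT_TYPES)
    becomes the normalized EventType, while an upstream eventType is kept as
    CyberArkEventType. The full original event is preserved in
    AdditionalFields.
    """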
dcr_events = []
for event in epm_events:
if not isinstance(event, dict):
continue

raw_event_type = event.get('eventType') or event.get('event_type')
if isinstance(raw_event_type, str) and raw_event_type in _CONNECTOR_EVENT_TYPES:
connector_event_type = raw_event_type
cyberark_event_type = event.get('CyberArkEventType') or event.get('cyberArkEventType')
else:
connector_event_type = event.get('EventType') or event.get('event_type') or 'unknown'
cyberark_event_type = raw_event_type

normalized = {
'TimeGenerated': _pick_timegenerated(event),
'EventType': connector_event_type,
'SetId': event.get('SetId') or event.get('setId'),
'SetName': event.get('SetName') or event.get('set_name') or event.get('setName'),
'EpmAgentId': event.get('agentId') or event.get('lastAgentId'),
'ComputerName': event.get('computerName') or event.get('lastEventComputerName') or event.get('sourceWSName'),
'UserName': _get_first(event, ('userName', 'lastEventUserName', 'firstEventUserName', 'owner')),
'PolicyName': event.get('policyName') or event.get('lastEventDisplayName'),
'PolicyAction': event.get('policyAction') or event.get('threatDetectionAction') or event.get('threatProtectionAction'),
'CyberArkEventType': cyberark_event_type,
'FileName': _get_first(event, ('fileName', 'lastEventFileName', 'lastEventOriginalFileName', 'originalFileName')),
'FilePath': event.get('filePath') or event.get('fileLocation'),
'Hash': event.get('hash'),
'Publisher': event.get('publisher'),
'SourceType': event.get('sourceType') or event.get('lastEventSourceType'),
'SourceName': event.get('sourceName') or event.get('lastEventSourceName'),
'FirstEventDate': _to_rfc3339_utc(event.get('firstEventDate')),
'LastEventDate': _to_rfc3339_utc(event.get('lastEventDate')),
'ArrivalTime': _to_rfc3339_utc(event.get('arrivalTime')),
'TotalEvents': event.get('totalEvents'),
'AffectedComputers': event.get('affectedComputers'),
'AffectedUsers': event.get('affectedUsers'),
'AggregatedBy': event.get('aggregatedBy'),
'FileQualifier': event.get('fileQualifier'),
'Skipped': event.get('skipped'),
'SkippedCount': event.get('skippedCount'),
'AdditionalFields': event,
}

# Drop None keys (keeps payload smaller and avoids type conflicts in DCR)
normalized = {k: v for k, v in normalized.items() if v is not None}

# Ensure non-dynamic fields don't accidentally become dict/list
for k, v in list(normalized.items()):
if k == 'AdditionalFields':
continue
if isinstance(v, (dict, list)):
normalized[k] = json.dumps(v)

dcr_events.append(normalized)

return dcr_events


def send_dcr_data(data: list):
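    """Transform a batch of EPM events and upload them to the Log Analytics
    DCR stream via the Logs Ingestion API.
    """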
    endpoint = os.environ.get('DATA_COLLECTION_ENDPOINT')
    rule_id = os.environ.get('LOGS_DCR_RULE_ID')
    stream_name = os.environ.get('LOGS_DCR_STREAM_NAME')
    if not (endpoint and rule_id and stream_name):
        logging.error('Missing DATA_COLLECTION_ENDPOINT, LOGS_DCR_RULE_ID or LOGS_DCR_STREAM_NAME configuration')
        return
    try:
        credential = DefaultAzureCredential()  # CodeQL [SM05139] This data connector (Function app based) is deprecated.
        client = LogsIngestionClient(endpoint=endpoint, credential=credential, logging_enable=True)
        dcr_events = _transform_schema(data)
        client.upload(rule_id=rule_id, stream_name=stream_name, logs=dcr_events)
except Exception as e:
logging.error(f"Upload failed: {e}")
@@ -0,0 +1,32 @@
import logging
import os

import azure.functions as func

from .epm import collect_events
from .exporter import send_dcr_data


def _iter_chunks(data, chunk_size: int):
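    """Yield successive chunks of at most chunk_size items from data."""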
chunk = []
for item in data:
chunk.append(item)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk:
yield chunk


def main(mytimer: func.TimerRequest) -> None:
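    """Timer-triggered entry point: collect EPM events and export them to the
    DCR stream in chunks.
    """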
if getattr(mytimer, 'past_due', False):
logging.info('The timer is past due!')

logging.getLogger().setLevel(logging.INFO)
logging.info('Starting program')

events = collect_events()
logging.info(f'Found {len(events)} events to export')
if not events:
return
chunk_size = int(os.environ.get('CHUNK_SIZE', '2000'))
for chunk in _iter_chunks(events, chunk_size=chunk_size):
send_dcr_data(data=chunk)