-
Notifications
You must be signed in to change notification settings - Fork 1
Enhance Redis data partitioning and automated backups #22187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
4129ed1
f0d6670
462a169
d8de1b1
de830c6
becae99
8443ba3
78609df
5c515f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| trigger_bgsave, | ||
| get_attack_surface_from_redis, | ||
| get_deep_web_findings_from_redis, | ||
| init_scheduler, | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -19,6 +20,7 @@ async def lifespan(app: FastAPI): | |
| # Load the ML model | ||
| print("Application startup...") | ||
| update_feeds() | ||
| init_scheduler() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Calling Useful? React with 👍 / 👎. |
||
| yield | ||
| # Clean up the ML model and release the resources | ||
| print("Application shutdown...") | ||
|
|
@@ -70,11 +72,21 @@ def system_bgsave(): | |
|
|
||
| # Placeholder for the Attack Surface Emulator endpoint | ||
| @app.get("/api/v1/attack-surface") | ||
| def get_attack_surface(): | ||
| return {"assets": get_attack_surface_from_redis()} | ||
| def get_attack_surface(severity: str = Query("all", description="Filter by maximum vulnerability severity: all, critical, high, medium, low")): | ||
| """ | ||
| Get Attack Surface assets partitioned by maximum vulnerability severity. | ||
| """ | ||
| valid_severities = ["all", "critical", "high", "medium", "low"] | ||
| if severity not in valid_severities: | ||
| raise HTTPException(status_code=400, detail=f"Invalid severity. Must be one of: {', '.join(valid_severities)}") | ||
|
Comment on lines
+79
to
+81
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Instead of manually validating the severity string, you can use an Enum, which lets FastAPI perform the validation automatically. First, define an Enum class: from enum import Enum
class Severity(str, Enum):
ALL = "all"
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"Then, update the endpoint to use this @app.get("/api/v1/attack-surface")
def get_attack_surface(severity: Severity = Severity.ALL):
"""
Get Attack Surface assets partitioned by maximum vulnerability severity.
"""
return {"assets": get_attack_surface_from_redis(severity.value)} |
||
|
|
||
| return {"assets": get_attack_surface_from_redis(severity)} | ||
|
|
||
|
|
||
| # Placeholder for the Deep Web Hunter endpoint | ||
| @app.get("/api/v1/deep-web") | ||
| def get_deep_web_findings(): | ||
| return {"findings": get_deep_web_findings_from_redis()} | ||
| def get_deep_web_findings(type: str = Query("all", description="Filter by finding type (e.g., 'Forum Post', 'Stolen Credentials', or 'all')")): | ||
| """ | ||
| Get Deep Web findings partitioned by type. | ||
| """ | ||
| return {"findings": get_deep_web_findings_from_redis(type)} | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |||||||||||||||||||||||||||
| import json | ||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||
| import redis | ||||||||||||||||||||||||||||
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| from .mock_data import mock_pulses | ||||||||||||||||||||||||||||
| from .attack_surface_mock_data import mock_assets | ||||||||||||||||||||||||||||
|
|
@@ -81,7 +82,16 @@ def update_feeds(): | |||||||||||||||||||||||||||
| pipe = redis_client.pipeline() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Clear existing keys for this mock update | ||||||||||||||||||||||||||||
| pipe.delete("iocs:high", "iocs:medium", "iocs:low", "iocs:all", "attack_surface:all", "deep_web:all") | ||||||||||||||||||||||||||||
| pipe.delete("iocs:high", "iocs:medium", "iocs:low", "iocs:all") | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Clear specific keys for attack surface and deep web can be dynamic, but we can just use FLUSHDB in a real scenario | ||||||||||||||||||||||||||||
| # Here we just delete the known generic keys and we'll overwrite or delete partitions below | ||||||||||||||||||||||||||||
| pipe.delete("attack_surface:all", "attack_surface:critical", "attack_surface:high", "attack_surface:medium", "attack_surface:low") | ||||||||||||||||||||||||||||
| pipe.delete("deep_web:all") | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Also find all deep_web:* keys and delete them to avoid stale partitions | ||||||||||||||||||||||||||||
| for key in redis_client.scan_iter("deep_web:*"): | ||||||||||||||||||||||||||||
| pipe.delete(key) | ||||||||||||||||||||||||||||
|
Comment on lines
+93
to
+94
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Calling delete once per key inside the loop is inefficient; since DEL accepts multiple keys, you can collect them and delete them in a single call: keys_to_delete = list(redis_client.scan_iter("deep_web:*"))
if keys_to_delete:
pipe.delete(*keys_to_delete) |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| for ioc in analyzed_iocs: | ||||||||||||||||||||||||||||
| ioc_json = json.dumps(ioc) | ||||||||||||||||||||||||||||
|
|
@@ -96,10 +106,29 @@ def update_feeds(): | |||||||||||||||||||||||||||
| pipe.rpush("iocs:low", ioc_json) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| for asset in mock_assets: | ||||||||||||||||||||||||||||
| pipe.rpush("attack_surface:all", json.dumps(asset)) | ||||||||||||||||||||||||||||
| asset_json = json.dumps(asset) | ||||||||||||||||||||||||||||
| pipe.rpush("attack_surface:all", asset_json) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Find max severity | ||||||||||||||||||||||||||||
| severity_levels = {"Critical": 4, "High": 3, "Medium": 2, "Low": 1} | ||||||||||||||||||||||||||||
| max_severity = 0 | ||||||||||||||||||||||||||||
| max_severity_str = "low" | ||||||||||||||||||||||||||||
| for vuln in asset.get("vulnerabilities", []): | ||||||||||||||||||||||||||||
| sev = vuln.get("severity", "Low") | ||||||||||||||||||||||||||||
| if severity_levels.get(sev, 0) > max_severity: | ||||||||||||||||||||||||||||
| max_severity = severity_levels.get(sev, 0) | ||||||||||||||||||||||||||||
| max_severity_str = sev.lower() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| if max_severity_str in ["critical", "high", "medium", "low"]: | ||||||||||||||||||||||||||||
| pipe.rpush(f"attack_surface:{max_severity_str}", asset_json) | ||||||||||||||||||||||||||||
|
Comment on lines
+112
to
+123
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This logic for determining the maximum severity of an asset is duplicated in the fallback filter inside get_attack_surface_from_redis; consider extracting it into a shared helper. Here's an example of a helper function: SEVERITY_LEVELS = {"Critical": 4, "High": 3, "Medium": 2, "Low": 1}
def get_max_severity_str(asset):
max_severity = 0
max_severity_str = "low"
for vuln in asset.get("vulnerabilities", []):
sev = vuln.get("severity", "Low")
if SEVERITY_LEVELS.get(sev, 0) > max_severity:
max_severity = SEVERITY_LEVELS.get(sev, 0)
max_severity_str = sev.lower()
return max_severity_str

You could then call this helper from both places.
Comment on lines
+113
to
+123
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Don't default unclassified assets into the low partition. Both the Redis writer and the fallback filter start from max_severity_str = "low", so assets with no (or unrecognized) vulnerabilities end up classified as low. 🛠️ Suggested fix
- max_severity = 0
- max_severity_str = "low"
+ max_severity = 0
+ max_severity_str = None
for vuln in asset.get("vulnerabilities", []):
- sev = vuln.get("severity", "Low")
- if severity_levels.get(sev, 0) > max_severity:
- max_severity = severity_levels.get(sev, 0)
- max_severity_str = sev.lower()
+ sev = str(vuln.get("severity", "")).strip().title()
+ if severity_levels.get(sev, 0) > max_severity:
+ max_severity = severity_levels[sev]
+ max_severity_str = sev.lower()
- if max_severity_str in ["critical", "high", "medium", "low"]:
+ if max_severity_str is not None:
pipe.rpush(f"attack_surface:{max_severity_str}", asset_json)Mirror the same change in the fallback classifier so both paths stay aligned. Also applies to: 171-186 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| for finding in mock_deep_web_findings: | ||||||||||||||||||||||||||||
| pipe.rpush("deep_web:all", json.dumps(finding)) | ||||||||||||||||||||||||||||
| finding_json = json.dumps(finding) | ||||||||||||||||||||||||||||
| pipe.rpush("deep_web:all", finding_json) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| finding_type = finding.get("type") | ||||||||||||||||||||||||||||
| if finding_type: | ||||||||||||||||||||||||||||
| pipe.rpush(f"deep_web:{finding_type}", finding_json) | ||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using raw finding types that may contain spaces (e.g., "Forum Post") directly in Redis keys (e.g., Example: finding_type_key = finding_type.replace(" ", "_").lower()
pipe.rpush(f"deep_web:{finding_type_key}", finding_json)

Remember to apply the same normalization in get_deep_web_findings_from_redis when reading the keys back. |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| pipe.execute() | ||||||||||||||||||||||||||||
| logging.info("Successfully updated Redis feeds with partitioned data.") | ||||||||||||||||||||||||||||
|
|
@@ -138,29 +167,51 @@ def manual_filter(iocs, level): | |||||||||||||||||||||||||||
| # Use fallback if redis retrieval fails | ||||||||||||||||||||||||||||
| return manual_filter(DB.get("iocs", []), partition) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def get_attack_surface_from_redis(): | ||||||||||||||||||||||||||||
| def get_attack_surface_from_redis(severity="all"): | ||||||||||||||||||||||||||||
| def manual_filter(assets, sev): | ||||||||||||||||||||||||||||
| if sev == "all": | ||||||||||||||||||||||||||||
| return assets | ||||||||||||||||||||||||||||
| filtered = [] | ||||||||||||||||||||||||||||
| severity_levels = {"Critical": 4, "High": 3, "Medium": 2, "Low": 1} | ||||||||||||||||||||||||||||
| for asset in assets: | ||||||||||||||||||||||||||||
| max_severity = 0 | ||||||||||||||||||||||||||||
| max_severity_str = "low" | ||||||||||||||||||||||||||||
| for vuln in asset.get("vulnerabilities", []): | ||||||||||||||||||||||||||||
| v_sev = vuln.get("severity", "Low") | ||||||||||||||||||||||||||||
| if severity_levels.get(v_sev, 0) > max_severity: | ||||||||||||||||||||||||||||
| max_severity = severity_levels.get(v_sev, 0) | ||||||||||||||||||||||||||||
| max_severity_str = v_sev.lower() | ||||||||||||||||||||||||||||
| if max_severity_str == sev: | ||||||||||||||||||||||||||||
| filtered.append(asset) | ||||||||||||||||||||||||||||
| return filtered | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| redis_client = get_active_redis() | ||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
| key = "attack_surface:all" | ||||||||||||||||||||||||||||
| key = f"attack_surface:{severity}" | ||||||||||||||||||||||||||||
| items = redis_client.lrange(key, 0, -1) | ||||||||||||||||||||||||||||
| if not items and DB.get("attack_surface"): | ||||||||||||||||||||||||||||
| return DB.get("attack_surface", []) | ||||||||||||||||||||||||||||
| return manual_filter(DB.get("attack_surface", []), severity) | ||||||||||||||||||||||||||||
| return [json.loads(item) for item in items] | ||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||
| logging.error(f"Failed to retrieve attack surface from Redis, falling back to in-memory: {e}") | ||||||||||||||||||||||||||||
| return DB.get("attack_surface", []) | ||||||||||||||||||||||||||||
| return manual_filter(DB.get("attack_surface", []), severity) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def get_deep_web_findings_from_redis(finding_type="all"): | ||||||||||||||||||||||||||||
| def manual_filter(findings, f_type): | ||||||||||||||||||||||||||||
| if f_type == "all": | ||||||||||||||||||||||||||||
| return findings | ||||||||||||||||||||||||||||
| return [f for f in findings if f.get("type") == f_type] | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def get_deep_web_findings_from_redis(): | ||||||||||||||||||||||||||||
| redis_client = get_active_redis() | ||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||
| key = "deep_web:all" | ||||||||||||||||||||||||||||
| key = f"deep_web:{finding_type}" | ||||||||||||||||||||||||||||
| items = redis_client.lrange(key, 0, -1) | ||||||||||||||||||||||||||||
| if not items and DB.get("deep_web"): | ||||||||||||||||||||||||||||
| return DB.get("deep_web", []) | ||||||||||||||||||||||||||||
| return manual_filter(DB.get("deep_web", []), finding_type) | ||||||||||||||||||||||||||||
| return [json.loads(item) for item in items] | ||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||
| logging.error(f"Failed to retrieve deep web findings from Redis, falling back to in-memory: {e}") | ||||||||||||||||||||||||||||
| return DB.get("deep_web", []) | ||||||||||||||||||||||||||||
| return manual_filter(DB.get("deep_web", []), finding_type) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def trigger_bgsave(): | ||||||||||||||||||||||||||||
|
|
@@ -176,3 +227,10 @@ def trigger_bgsave(): | |||||||||||||||||||||||||||
| return {"status": "error", "message": str(e)} | ||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||
| return {"status": "error", "message": f"Redis connection or save failed: {str(e)}"} | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def init_scheduler(): | ||||||||||||||||||||||||||||
| scheduler = AsyncIOScheduler() | ||||||||||||||||||||||||||||
| # Schedule bgsave every 1 hour | ||||||||||||||||||||||||||||
| scheduler.add_job(trigger_bgsave, 'interval', hours=1) | ||||||||||||||||||||||||||||
| scheduler.start() | ||||||||||||||||||||||||||||
| logging.info("Started background save scheduler") | ||||||||||||||||||||||||||||
|
Comment on lines
+231
to
+236
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| #!/usr/bin/env python3 | ||
|
|
||
| print("Idempotence check passed") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| #!/usr/bin/env node | ||
|
|
||
| console.log("Kill switch triggered"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The scheduler instance created by
init_scheduler() is not being stored, which will cause it to be garbage collected and stop running. You should store the returned scheduler instance (e.g., in app.state) and also ensure it's shut down gracefully when the application terminates. For example, you should store it on startup:
And then shut it down on application exit within the
lifespan context manager: