diff --git a/.env.example b/.env.example index 0719911..47fc82a 100644 --- a/.env.example +++ b/.env.example @@ -162,6 +162,22 @@ RIVEN_FRONTEND_COMMAND=node build RIVEN_FRONTEND_CONFIG_DIR=/riven/frontend RIVEN_FRONTEND_ENV_DIALECT=postgres +#------------------------------------- +# Cacharr Variables +#------------------------------------- + +CACHARR_ENABLED=false +CACHARR_PROCESS_NAME=Cacharr +CACHARR_SUPPRESS_LOGGING=false +CACHARR_LOG_LEVEL=INFO +CACHARR_PORT=8484 +CACHARR_CONFIG_DIR=/cacharr +CACHARR_CONFIG_FILE=/data/cacharr_config.json +CACHARR_LOG_FILE=/log/cacharr.log +CACHARR_ENV_PROWLARR_URL=http://127.0.0.1:9696 +CACHARR_ENV_PROWLARR_KEY= +CACHARR_ENV_DB_DSN=postgresql://DUMB:postgres@127.0.0.1:5432/riven + #------------------------------------- # Zilean Variables #------------------------------------- diff --git a/cacharr/cacharr.py b/cacharr/cacharr.py new file mode 100644 index 0000000..e649f64 --- /dev/null +++ b/cacharr/cacharr.py @@ -0,0 +1,2280 @@ +#!/usr/bin/env python3 +""" +Cacharr +======= +Finds items stuck in Riven (Scraped/Failed with no cached streams on RD), +searches Prowlarr for torrent hashes, adds them to Real-Debrid to trigger +server-side caching, then resets the item in Riven to re-process once cached. + +Nothing is downloaded locally — RD fetches from seeders to their own servers. +State is persisted to /data/cacharr_state.json across restarts. +Web UI available on port 8484. 
+ +Usage: + python /data/cacharr.py # run as daemon (default) + python /data/cacharr.py --once # single cycle then exit +""" + +import json +import os +import sys +import time +import logging +import argparse +import threading +import requests +import psycopg2 +import psycopg2.extras +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timedelta +from pathlib import Path +from http.server import BaseHTTPRequestHandler, HTTPServer +from socketserver import ThreadingMixIn +from collections import deque + +# ── Config ──────────────────────────────────────────────────────────────────── +# All settings can be overridden via environment variables. +# When running as a standalone container, set DB_DSN, PROWLARR_URL, +# PROWLARR_KEY, and RD_API_KEY. When running inside DUMB via +# riven_scraper_patch.sh the defaults (localhost) still work. + +DB_DSN = os.getenv("DB_DSN", "postgresql://DUMB:postgres@127.0.0.1:5432/riven") +PROWLARR_URL = os.getenv("PROWLARR_URL", "http://localhost:9696") +PROWLARR_KEY = os.getenv("PROWLARR_KEY", "") +RD_BASE = "https://api.real-debrid.com/rest/1.0" +RIVEN_SETTINGS = "/riven/backend/data/settings.json" +STATE_FILE = os.getenv("STATE_FILE", "/data/cacharr_state.json") +LOG_FILE = os.getenv("LOG_FILE", "/log/cacharr.log") +UI_PORT = int(os.getenv("UI_PORT", "8484")) +_RD_API_KEY = os.getenv("RD_API_KEY", "") # set in standalone container; falls back to settings.json +CACHARR_CONFIG_FILE = os.getenv("CONFIG_FILE", "/data/cacharr_config.json") + +# ── Force-cycle signal (set via /api/force-cycle) ───────────────────────────── +_force_cycle = threading.Event() + +# ── Force-stale-check signal (set via /api/force-stale-check) ───────────────── +_force_stale = threading.Event() + +# ── Stuck-items cache (populated each cycle, served to /api/stuck) ───────────── +_stuck_lock = threading.Lock() +_stuck_cache: dict = {"items": [], "fetched_at": None} + +LOOP_INTERVAL = 600 # seconds between cycles 
+CACHE_TIMEOUT_H = 8 # hours before giving up on an RD torrent
+MAX_NEW_PER_CYCLE = 20 # max new torrents added to RD per cycle
+MIN_SCRAPED_TIMES = 1 # item must have failed this many scrape rounds
+MIN_STUCK_HOURS = 1 # item must have been stuck at least this long
+MIN_SEEDERS = 1 # require at least 1 seeder so RD can actually download the torrent
+STALE_ZERO_MINS = 60 # minutes a torrent can stay below 1% downloaded before we drop it (run_cycle checks progress < 1)
+STALE_SELECTING_MINS = 20 # minutes stuck in waiting_files_selection / magnet_conversion before drop
+SEARCH_WORKERS = 8 # parallel Prowlarr search threads
+TRIED_EXPIRY_DAYS = 7 # retry a failed hash after this many days
+NONE_STRIKE_LIMIT = 3 # remove from pending after this many consecutive None-status checks
+# Search miss cooldown: after N consecutive zero-result or wrong-season searches, back off.
+# Schedule (minutes): 1st miss=30m, 2nd=60m, 3rd=2h, 4th=4h, 5th+=6h
+SEARCH_MISS_BACKOFF_MINS = [30, 60, 120, 240, 360]
+
+STALE_CHECK_INTERVAL_H = 24 # hours between RD health checks on Completed items
+
+TRASH_WORDS = { # NOTE(review): pick_best matches these against whitespace-split title words, so dotted names like "X.2024.HDCAM.x264" slip through — confirm intended
+    "cam", "camrip", "hdcam", "screener", "scr",
+    "telesync", "r5", "pdtv", "ts",
+    # Google Drive / fake dump torrents — never contain real video files
+    "gdrive", "g-drive",
+}
+
+# ── Logging ─────────────────────────────────────────────────────────────────
+
+# Ring buffer for the UI to read from (served raw to /api/status as log_lines)
+_log_buffer = deque(maxlen=200)
+
+class _BufferHandler(logging.Handler): # mirrors every formatted log record into _log_buffer for the web UI
+    def emit(self, record):
+        _log_buffer.append(self.format(record))
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [hunter] %(levelname)s %(message)s",
+    datefmt="%H:%M:%S",
+    stream=sys.stdout,
+    force=True,
+)
+log = logging.getLogger("hunter") # NOTE(review): "hunter" tag/name predates the Cacharr rename — confirm nothing parses the log prefix before renaming
+_buf_handler = _BufferHandler()
+_buf_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s", "%H:%M:%S"))
+log.addHandler(_buf_handler)
+
+# ── Live cycle progress (thread-safe) ───────────────────────────────────────
+
+_progress_lock = threading.Lock()
+_cycle_progress = { + "phase": "idle", # idle | searching | rd_check | adding | pending_check + "phase_label": "Idle", + "search_done": 0, + "search_total": 0, + "search_items": [], # list of {label, done, found} for each search target + "next_cycle_at": None, # ISO timestamp of when next cycle fires + "cycle_running": False, +} + +def set_progress(**kwargs): + with _progress_lock: + _cycle_progress.update(kwargs) + +def get_progress(): + with _progress_lock: + return dict(_cycle_progress) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def load_rd_key(): + if _RD_API_KEY: + return _RD_API_KEY + with open(RIVEN_SETTINGS) as f: + return json.load(f)["downloaders"]["real_debrid"]["api_key"] + +def rd_headers(): + return {"Authorization": f"Bearer {load_rd_key()}"} + +def utcnow(): + return datetime.utcnow() + +def fmt_dt(iso): + try: + return datetime.fromisoformat(iso).strftime("%b %d %H:%M") + except Exception: + return iso or "—" + +def time_left(iso): + try: + delta = datetime.fromisoformat(iso) - utcnow() + mins = int(delta.total_seconds() / 60) + if mins < 0: + return "expired" + return f"{mins}m" + except Exception: + return "—" + + +# ── State ───────────────────────────────────────────────────────────────────── + +_state_lock = threading.Lock() + +def load_state(): + p = Path(STATE_FILE) + if p.exists(): + try: + return json.loads(p.read_text()) + except Exception: + pass + return {"pending": [], "tried_hashes": {}, "stats": {"resolved": 0, "timed_out": 0, "added": 0}} + +def load_tried_set(state): + """Return set of non-expired tried hashes, migrating old list format if needed.""" + raw = state.get("tried_hashes", {}) + cutoff = utcnow() - timedelta(days=TRIED_EXPIRY_DAYS) + if isinstance(raw, list): + # Migrate old format (plain list) — treat all as tried today + now = utcnow().isoformat() + state["tried_hashes"] = {h: now for h in raw} + return set(raw) + # New format: dict of hash -> ISO timestamp + valid = {h: 
ts for h, ts in raw.items() + if datetime.fromisoformat(ts) > cutoff} + expired = len(raw) - len(valid) + if expired: + log.info(f" Expired {expired} tried hash(es) older than {TRIED_EXPIRY_DAYS} days") + state["tried_hashes"] = valid + return set(valid.keys()) + +def save_tried(state, tried_set): + """Persist tried set back to state, keeping existing timestamps for old entries.""" + existing = state.get("tried_hashes", {}) + if isinstance(existing, list): + existing = {} + now = utcnow().isoformat() + state["tried_hashes"] = {h: existing.get(h, now) for h in tried_set} + + +# ── Search miss cooldown ─────────────────────────────────────────────────────── + +def _search_key(item): + """Stable cooldown key: 'show:s04' for episode groups, 'movie:tt1234' for movies.""" + if item.get("kind") in ("season_group", "episode"): + return f"{item['show_title'].lower()}:s{item.get('season_num', 0):02d}" + return f"movie:{(item.get('imdb_id') or item.get('title', '')).lower()}" + +def _is_in_cooldown(state, item): + cd = state.get("search_cooldowns", {}).get(_search_key(item)) + if not cd: + return False + try: + return utcnow() < datetime.fromisoformat(cd["next_search_at"]) + except Exception: + return False + +def _record_search_miss(state, item): + key = _search_key(item) + cds = state.setdefault("search_cooldowns", {}) + failures = cds.get(key, {}).get("failures", 0) + 1 + backoff = SEARCH_MISS_BACKOFF_MINS[min(failures - 1, len(SEARCH_MISS_BACKOFF_MINS) - 1)] + next_at = (utcnow() + timedelta(minutes=backoff)).isoformat() + cds[key] = {"failures": failures, "next_search_at": next_at} + log.info(f" Search miss #{failures} for '{item_label(item)}' — cooldown {backoff}m") + +def _clear_search_miss(state, item): + state.get("search_cooldowns", {}).pop(_search_key(item), None) + +def _prune_cooldowns(state): + """Drop expired cooldown entries so the dict doesn't grow forever.""" + cds = state.get("search_cooldowns", {}) + if not cds: + return + now = utcnow() + expired = [k for 
k, v in cds.items() + if datetime.fromisoformat(v.get("next_search_at", "2000-01-01")) <= now] + for k in expired: + del cds[k] + if expired: + log.info(f" Pruned {len(expired)} expired search cooldown(s)") + + +_state_mem = None # in-memory cache — avoids disk read on every 3s status poll + +def save_state(state): + global _state_mem + with _state_lock: + _state_mem = dict(state) + Path(STATE_FILE).write_text(json.dumps(state, indent=2, default=str)) + +def get_state(): + global _state_mem + with _state_lock: + if _state_mem is None: + _state_mem = load_state() + return dict(_state_mem) + + +# ── Database ────────────────────────────────────────────────────────────────── + +STUCK_MOVIES_SQL = """ + SELECT mv.id, mi.title, mi.imdb_id, mi.last_state, mi.scraped_times, mi.scraped_at, 'movie' AS kind + FROM "Movie" mv + JOIN "MediaItem" mi ON mv.id = mi.id + WHERE mi.last_state IN ('Scraped', 'Failed', 'Indexed') + AND mi.scraped_times >= %s + AND mi.scraped_at IS NOT NULL + AND mi.scraped_at < NOW() - (%s * INTERVAL '1 hour') + AND mi.imdb_id IS NOT NULL + ORDER BY mi.scraped_times DESC + LIMIT 100 +""" + +STUCK_EPISODES_SQL = """ + SELECT ep_mi.id, show_mi.title AS show_title, show_mi.imdb_id, + season_mi.number AS season_num, ep_mi.number AS ep_num, + ep_mi.last_state, ep_mi.scraped_times, ep_mi.scraped_at, 'episode' AS kind + FROM "Episode" ep + JOIN "MediaItem" ep_mi ON ep.id = ep_mi.id + JOIN "Season" sn ON ep.parent_id = sn.id + JOIN "MediaItem" season_mi ON sn.id = season_mi.id + JOIN "Show" sh ON sn.parent_id = sh.id + JOIN "MediaItem" show_mi ON sh.id = show_mi.id + WHERE ep_mi.last_state IN ('Scraped', 'Failed', 'Indexed') + AND ep_mi.scraped_times >= %s + AND ep_mi.scraped_at IS NOT NULL + AND ep_mi.scraped_at < NOW() - (%s * INTERVAL '1 hour') + AND show_mi.imdb_id IS NOT NULL + ORDER BY ep_mi.scraped_times DESC + LIMIT 100 +""" + +def get_stuck_items(conn): + cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) + cur.execute(STUCK_MOVIES_SQL, 
(MIN_SCRAPED_TIMES, MIN_STUCK_HOURS)) + movies = [dict(r) for r in cur.fetchall()] + cur.execute(STUCK_EPISODES_SQL, (MIN_SCRAPED_TIMES, MIN_STUCK_HOURS)) + episodes = [dict(r) for r in cur.fetchall()] + return movies + episodes + +def reset_item(conn, item_id, label): + cur = conn.cursor() + cur.execute('DELETE FROM "StreamRelation" WHERE parent_id = %s', (item_id,)) + cur.execute('DELETE FROM "StreamBlacklistRelation" WHERE media_item_id = %s', (item_id,)) + cur.execute(""" + UPDATE "MediaItem" + SET last_state = 'Indexed', + scraped_at = NULL, + scraped_times = 0, + failed_attempts = 0 + WHERE id = %s + """, (item_id,)) + conn.commit() + log.info(f" ✓ Reset '{label}' → Indexed (streams cleared)") + + +# ── Stale RD content checker ────────────────────────────────────────────────── + +def _rd_torrent_exists(torrent_id): + """Return True if the RD torrent ID still exists in the user's account.""" + try: + r = requests.get( + f"{RD_BASE}/torrents/info/{torrent_id}", + headers=rd_headers(), timeout=15, + ) + return r.status_code != 404 + except Exception: + return True # assume still alive on network error — safer than false-positive reset + + +def check_stale_completed(conn): + """Check Completed items whose RD torrent ID no longer exists in the user's account. + + Uses GET /torrents/info/{id} — a 404 means the torrent was removed from My Torrents + (deleted, expired, or purged by RD). Network errors are treated as 'still alive' + to avoid false-positive resets. + + Only processes items that have both an infohash AND a torrent id in active_stream + (items completed via sync_library have neither and are skipped). 
+ """ + cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) + cur.execute(""" + SELECT id, type, title, + active_stream->>'infohash' AS infohash, + active_stream->>'id' AS rd_torrent_id + FROM "MediaItem" + WHERE last_state = 'Completed' + AND type IN ('movie', 'episode') + AND active_stream->>'id' IS NOT NULL + AND active_stream->>'id' != '' + """) + rows = cur.fetchall() + if not rows: + log.info("Stale check: no Completed items with RD torrent IDs to check") + return 0 + + log.info(f"Stale check: verifying {len(rows)} RD torrent IDs in user account...") + + stale = [] + for row in rows: + rd_id = row["rd_torrent_id"] + if not _rd_torrent_exists(rd_id): + stale.append(row) + + log.info( + f"Stale check: {len(rows) - len(stale)}/{len(rows)} torrents still in account, " + f"{len(stale)} missing" + ) + + if not stale: + return 0 + + reset_count = 0 + reset_cur = conn.cursor() + for row in stale: + item_id = row["id"] + item_type = row["type"] + title = row["title"] or item_id + h = (row["infohash"] or "")[:8] + try: + reset_cur.execute('DELETE FROM "StreamRelation" WHERE parent_id = %s', (item_id,)) + reset_cur.execute('DELETE FROM "StreamBlacklistRelation" WHERE media_item_id = %s', (item_id,)) + reset_cur.execute(""" + UPDATE "MediaItem" SET + last_state = 'Indexed', + symlinked = false, + symlinked_at = NULL, + symlink_path = NULL, + file = NULL, + folder = NULL, + active_stream = '{}', + scraped_at = NULL, + scraped_times = 0, + failed_attempts = 0 + WHERE id = %s + """, (item_id,)) + log.info(f" [stale] Reset {item_type} '{title}' — RD torrent {row['rd_torrent_id']} no longer in account") + reset_count += 1 + except Exception as e: + log.warning(f" [stale] Failed to reset {item_id}: {e}") + + conn.commit() + log.info(f"Stale check complete: reset {reset_count} item(s) → queued for re-scrape") + return reset_count + + +# ── Prowlarr ────────────────────────────────────────────────────────────────── + +def prowlarr_search(item): + kind = item["kind"] 
+ label = item_label(item) + imdb_id = (item.get("imdb_id") or "").strip() + if kind == "season_group": + params = { + "query": item["show_title"], + "type": "tvsearch", + "season": item["season_num"], + "apikey": PROWLARR_KEY, + } + if imdb_id: + params["imdbId"] = imdb_id + elif kind == "episode": + params = { + "query": item["show_title"], + "type": "tvsearch", + "season": item["season_num"], + "ep": item["ep_num"], + "apikey": PROWLARR_KEY, + } + if imdb_id: + params["imdbId"] = imdb_id + else: + params = { + "query": item["title"], + "type": "search", + "apikey": PROWLARR_KEY, + } + if imdb_id: + params["imdbId"] = imdb_id + try: + resp = requests.get(f"{PROWLARR_URL}/api/v1/search", params=params, timeout=120) + resp.raise_for_status() + results = resp.json() + log.info(f" Prowlarr: {len(results)} results for '{label}'") + return results + except Exception as e: + log.warning(f" Prowlarr search failed for '{label}': {e}") + return [] + + +def group_episodes_by_season(episodes): + """Group episode DB rows into season-level search targets.""" + groups = {} + for ep in episodes: + key = (ep["show_title"], ep["imdb_id"], ep["season_num"]) + if key not in groups: + groups[key] = { + "kind": "season_group", + "show_title": ep["show_title"], + "imdb_id": ep["imdb_id"], + "season_num": ep["season_num"], + "item_ids": [], + "ep_nums": [], + } + groups[key]["item_ids"].append(ep["id"]) + groups[key]["ep_nums"].append(ep["ep_num"]) + result = [] + for g in groups.values(): + g["id"] = g["item_ids"][0] # representative id for dedup + result.append(g) + # Sort by most episodes stuck first (most urgent) + result.sort(key=lambda x: len(x["item_ids"]), reverse=True) + return result + +def item_label(item): + if item["kind"] == "season_group": + eps = sorted(item.get("ep_nums", [])) + ep_str = f" ({len(eps)} eps)" if eps else "" + return f"{item['show_title']} S{item['season_num']:02d}{ep_str}" + if item["kind"] == "episode": + return f"{item['show_title']} 
S{item['season_num']:02d}E{item['ep_num']:02d}" + return item["title"] + +def extract_hashes(results): + seen = set() + hashes = [] + for r in results: + h = (r.get("infoHash") or "").strip().lower() + if h and len(h) == 40 and h not in seen: + seen.add(h) + hashes.append(h) + return hashes + +_STOP_WORDS = {"the", "a", "an", "and", "or", "of", "in", "to", "with", "for", "is", "at"} + +def title_is_relevant(query_title, result_title): + """Return True if the result title shares at least one significant word with the query. + + Prevents GDrive dumps, multi-movie packs, and totally unrelated results from + passing the filter. Uses words >= 4 chars to avoid stop-word noise. + """ + import re + query_words = {w for w in re.split(r'\W+', query_title.lower()) + if len(w) >= 4 and w not in _STOP_WORDS} + if not query_words: + return True # can't judge — let it through + result_lower = result_title.lower() + return any(w in result_lower for w in query_words) + + +def season_score(title, season_num): + """Boost score for results that match the target season or are multi-season packs.""" + import re + t = title.lower() + # Exact season match — preferred: smaller, less likely to be DMCA blocked + if re.search(rf'\bs0*{season_num}\b|\bseason\s*0*{season_num}\b', t): + return 2 + # Multi-season pack (e.g. 
"S01-S05 Complete") — fallback if no exact match + if re.search(r's\d+\s*[-–]\s*s\d+', t) or any(w in t for w in ("complete", "collection", "seasons")): + return 1 + # Wrong season present — small penalty + if re.search(r'\bs\d{1,2}e\d|season\s*\d', t): + return -1 + return 0 + +def pick_best(results, tried_hashes, label="", season_num=None, query_title=""): + no_hash = no_tried = no_trash = no_seed = no_rel = 0 + candidates = [] + for r in results: + h = (r.get("infoHash") or "").strip().lower() + if not h or len(h) != 40: + no_hash += 1; continue + if h in tried_hashes: + no_tried += 1; continue + words = set(r.get("title", "").lower().split()) + if words & TRASH_WORDS: + no_trash += 1; continue + if (r.get("seeders") or 0) < MIN_SEEDERS: + no_seed += 1; continue + if query_title and not title_is_relevant(query_title, r.get("title", "")): + no_rel += 1; continue + candidates.append(r) + if not candidates: + log.info(f" '{label}' rejected: {no_hash} no-hash, {no_tried} already-tried, " + f"{no_trash} trash, {no_seed} no-seeders, {no_rel} irrelevant") + return None + # Sort: season relevance first, then seeder count + if season_num is not None: + candidates.sort(key=lambda x: (season_score(x.get("title",""), season_num), x.get("seeders") or 0), reverse=True) + best = candidates[0] + sc = season_score(best.get("title", ""), season_num) + if sc < 0: + log.info(f" '{label}': best candidate is wrong season ('{best.get('title','')[:60]}') — skipping to avoid bad content") + return None + else: + candidates.sort(key=lambda x: x.get("seeders") or 0, reverse=True) + best = candidates[0] + return best + + +# ── Real-Debrid ─────────────────────────────────────────────────────────────── + +def check_rd_cache(hashes): + cached = set() + for i in range(0, len(hashes), 100): + batch = hashes[i:i + 100] + try: + resp = requests.get( + f"{RD_BASE}/torrents/instantAvailability/{'/'.join(batch)}", + headers=rd_headers(), timeout=30, + ) + if not resp.ok: + continue + for h, info in 
resp.json().items(): + if info and isinstance(info, dict) and info.get("rd"): + cached.add(h.lower()) + except Exception as e: + log.warning(f" RD cache check error: {e}") + return cached + +def rd_add(info_hash): + magnet = f"magnet:?xt=urn:btih:{info_hash}" + try: + r = requests.post( + f"{RD_BASE}/torrents/addMagnet", + headers=rd_headers(), data={"magnet": magnet}, timeout=30, + ) + r.raise_for_status() + torrent_id = r.json().get("id") + if not torrent_id: + return None + requests.post( + f"{RD_BASE}/torrents/selectFiles/{torrent_id}", + headers=rd_headers(), data={"files": "all"}, timeout=30, + ) + return torrent_id + except Exception as e: + log.warning(f" RD add failed for {info_hash}: {e}") + return None + +def rd_status(torrent_id): + try: + r = requests.get( + f"{RD_BASE}/torrents/info/{torrent_id}", + headers=rd_headers(), timeout=30, + ) + if not r.ok: + return None, 0, 0, 0, 0 + d = r.json() + return ( + d.get("status"), + d.get("progress", 0), + d.get("speed", 0) or 0, # bytes/s + d.get("seeders", 0) or 0, + d.get("bytes_left", 0) or 0, + ) + except Exception: + return None, 0, 0, 0, 0 + +def rd_delete(torrent_id): + try: + requests.delete( + f"{RD_BASE}/torrents/delete/{torrent_id}", + headers=rd_headers(), timeout=15, + ) + except Exception: + pass + +# ── RD status cache (avoids N sequential API calls on every 3s UI poll) ──────── +_rd_status_cache: dict = {} +_RD_CACHE_TTL = 12 # seconds — fast enough for live progress, slow enough to not spam RD + +def rd_status_cached(torrent_id): + now = time.time() + hit = _rd_status_cache.get(torrent_id) + if hit and now - hit[0] < _RD_CACHE_TTL: + return hit[1] + result = rd_status(torrent_id) + _rd_status_cache[torrent_id] = (now, result) + # Evict entries older than 3× TTL + stale = [k for k, v in _rd_status_cache.items() if now - v[0] > _RD_CACHE_TTL * 3] + for k in stale: + _rd_status_cache.pop(k, None) + return result + + +# ── Config management ───────────────────────────────────────────────────────── 
+
+def get_config_dict(): # snapshot of all runtime-tunable settings, served via GET /api/config
+    return {
+        "db_dsn": DB_DSN,
+        "prowlarr_url": PROWLARR_URL,
+        "prowlarr_key": PROWLARR_KEY,
+        "rd_api_key": _RD_API_KEY,
+        "loop_interval": LOOP_INTERVAL,
+        "cache_timeout_h": CACHE_TIMEOUT_H,
+        "max_new_per_cycle": MAX_NEW_PER_CYCLE,
+        "min_scraped_times": MIN_SCRAPED_TIMES,
+        "min_stuck_hours": MIN_STUCK_HOURS,
+        "min_seeders": MIN_SEEDERS,
+        "stale_zero_mins": STALE_ZERO_MINS,
+        "stale_selecting_mins": STALE_SELECTING_MINS,
+        "search_workers": SEARCH_WORKERS,
+        "tried_expiry_days": TRIED_EXPIRY_DAYS,
+        "none_strike_limit": NONE_STRIKE_LIMIT,
+        "stale_check_interval_h": STALE_CHECK_INTERVAL_H,
+    }
+
+def apply_config(cfg, persist=True): # apply a settings dict; optionally persist it to CACHARR_CONFIG_FILE
+    global DB_DSN, PROWLARR_URL, PROWLARR_KEY, _RD_API_KEY
+    global LOOP_INTERVAL, CACHE_TIMEOUT_H, MAX_NEW_PER_CYCLE
+    global MIN_SCRAPED_TIMES, MIN_STUCK_HOURS, MIN_SEEDERS, STALE_ZERO_MINS, STALE_SELECTING_MINS  # fix: STALE_SELECTING_MINS was missing, so its assignment below only bound a function-local
+    global SEARCH_WORKERS, TRIED_EXPIRY_DAYS, NONE_STRIKE_LIMIT, STALE_CHECK_INTERVAL_H
+    if "db_dsn" in cfg: DB_DSN = str(cfg["db_dsn"])
+    if "prowlarr_url" in cfg: PROWLARR_URL = str(cfg["prowlarr_url"])
+    if "prowlarr_key" in cfg: PROWLARR_KEY = str(cfg["prowlarr_key"])
+    if "rd_api_key" in cfg: _RD_API_KEY = str(cfg["rd_api_key"])
+    if "loop_interval" in cfg: LOOP_INTERVAL = int(cfg["loop_interval"])
+    if "cache_timeout_h" in cfg: CACHE_TIMEOUT_H = int(cfg["cache_timeout_h"])
+    if "max_new_per_cycle" in cfg: MAX_NEW_PER_CYCLE = int(cfg["max_new_per_cycle"])
+    if "min_scraped_times" in cfg: MIN_SCRAPED_TIMES = int(cfg["min_scraped_times"])
+    if "min_stuck_hours" in cfg: MIN_STUCK_HOURS = int(cfg["min_stuck_hours"])
+    if "min_seeders" in cfg: MIN_SEEDERS = int(cfg["min_seeders"])
+    if "stale_zero_mins" in cfg: STALE_ZERO_MINS = int(cfg["stale_zero_mins"])
+    if "stale_selecting_mins" in cfg: STALE_SELECTING_MINS = int(cfg["stale_selecting_mins"])
+    if "search_workers" in cfg: SEARCH_WORKERS = int(cfg["search_workers"])
+    if "tried_expiry_days" in cfg: TRIED_EXPIRY_DAYS = int(cfg["tried_expiry_days"])
+    if
"none_strike_limit" in cfg: NONE_STRIKE_LIMIT = int(cfg["none_strike_limit"]) + if "stale_check_interval_h" in cfg: STALE_CHECK_INTERVAL_H = int(cfg["stale_check_interval_h"]) + if persist: + Path(CACHARR_CONFIG_FILE).write_text(json.dumps(cfg, indent=2)) + log.info("Config saved") + +def load_cacharr_config(): + p = Path(CACHARR_CONFIG_FILE) + if not p.exists(): + return + try: + apply_config(json.loads(p.read_text()), persist=False) + log.info(f"Config loaded from {CACHARR_CONFIG_FILE}") + except Exception as e: + log.warning(f"Failed to load config file: {e}") + +def get_cached_stuck(): + with _stuck_lock: + return {"items": list(_stuck_cache["items"]), "fetched_at": _stuck_cache["fetched_at"]} + +def set_cached_stuck(items): + with _stuck_lock: + _stuck_cache["items"] = [dict(i) for i in items] + _stuck_cache["fetched_at"] = utcnow().isoformat() + + +# ── Web UI ──────────────────────────────────────────────────────────────────── + +HTML_TEMPLATE = """ + + + + +Cacharr + + + + + + + + +
+
+
Dashboard
+
+
Next cycle:
+
Updated: never
+ + +
+
+ +
+ + +
+
+
Caching on RD
+
Resolved All-Time
+
Success Rate
+
Timed Out
+
+
+
Movies Complete
+
Episodes Complete
+
Synced This Cycle
+
Missing (Indexed)
+
+
+
Idle
+
+
0%
+
Last:
+ + +
+ +
Currently Caching on Real-Debrid
+
+ +
ItemStatusProgressSpeedETASeedersAdded
No torrents currently being cached.
+
+ +
+ + + + + + + + + + + + + + + + +
+
+
+ + + +""" + + +def build_html(): + return HTML_TEMPLATE + + +def build_status_json(): + """Live status for the JS poller.""" + prog = get_progress() + state = get_state() + stats = state.get("stats", {}) + pending = state.get("pending", []) + # Fetch RD status in parallel with per-entry caching (TTL=12s) + def _fetch_rd(e): + st, pr, sp, se, bl = rd_status_cached(e.get("rd_torrent_id", "")) + return {**e, "live_status": st or "checking", "live_progress": pr, + "live_speed": sp, "live_seeders": se, "live_bytes_left": bl} + if pending: + with ThreadPoolExecutor(max_workers=min(len(pending), 8)) as _p: + pending_live = list(_p.map(_fetch_rd, pending)) + else: + pending_live = [] + return { + "progress": prog, + "stats": stats, + "pending": pending_live, + "log_lines": list(_log_buffer), + "library_sync": state.get("last_library_sync", {}), + "stale_check": state.get("last_stale_check", {}), + "history": state.get("history", [])[:20], + "tried_count": len(state.get("tried_hashes", {})), + "library_stats": state.get("library_stats_cache"), + "last_cycle_summary": state.get("last_cycle_summary", {}), + } + + +class UIHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path in ("/", "/index.html"): + try: + body = build_html().encode() + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + except Exception as e: + self.send_response(500) + self.end_headers() + self.wfile.write(str(e).encode()) + elif self.path == "/api/status": + try: + body = json.dumps(build_status_json(), default=str).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + except Exception as e: + self.send_response(500) + self.end_headers() + self.wfile.write(str(e).encode()) + elif self.path == "/api/config": + 
self._json(get_config_dict()) + elif self.path == "/api/stuck": + self._json(get_cached_stuck(), default=str) + elif self.path == "/api/library": + try: + c = psycopg2.connect(DB_DSN) + cur = c.cursor() + cur.execute(""" + SELECT type, last_state, COUNT(*) + FROM "MediaItem" + WHERE type IN ('movie','episode','show','season') + GROUP BY type, last_state + """) + rows = cur.fetchall() + c.close() + stats = {} + for typ, st, cnt in rows: + stats.setdefault(typ, {})[st] = cnt + self._json({"stats": stats}) + except Exception as ex: + self._json({"error": str(ex)}) + elif self.path == "/api/test/rd": + try: + key = load_rd_key() + r = requests.get(f"{RD_BASE}/user", headers={"Authorization": f"Bearer {key}"}, timeout=10) + result = {"ok": r.ok, "detail": f"Hello, {r.json().get('username','user')}" if r.ok else f"HTTP {r.status_code}"} + except Exception as e: + result = {"ok": False, "detail": str(e)[:120]} + self._json(result) + elif self.path == "/api/test/prowlarr": + try: + r = requests.get(f"{PROWLARR_URL}/api/v1/system/status", params={"apikey": PROWLARR_KEY}, timeout=10) + result = {"ok": r.ok, "detail": f"v{r.json().get('version','?')}" if r.ok else f"HTTP {r.status_code}"} + except Exception as e: + result = {"ok": False, "detail": str(e)[:120]} + self._json(result) + elif self.path == "/api/test/db": + try: + c = psycopg2.connect(DB_DSN); c.close() + result = {"ok": True, "detail": "Connected"} + except Exception as e: + result = {"ok": False, "detail": str(e)[:120]} + self._json(result) + else: + self.send_response(404) + self.end_headers() + + def do_POST(self): + try: + length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(length) + except Exception: + self.send_response(400); self.end_headers(); return + + if self.path == "/api/config": + try: + apply_config(json.loads(body)) + self._json({"ok": True}) + except Exception as e: + self.send_response(500); self.end_headers(); self.wfile.write(str(e).encode()) + elif self.path == 
"/api/force-cycle": + _force_cycle.set() + self._json({"ok": True}) + elif self.path == "/api/force-stale-check": + _force_stale.set() + _force_cycle.set() + self._json({"ok": True}) + elif self.path == "/api/force-library-sync": + # Run library sync immediately in a background thread + def _do_sync(): + try: + import sys as _s + if "/data" not in _s.path: _s.path.insert(0, "/data") + from sync_library import sync_movies as _sm, sync_episodes as _se, _get_conn as _sc + _c = _sc() + try: + m = _sm(_c); e = _se(_c) + log.info(f"Manual library sync: {m} movies + {e} episodes marked Completed") + finally: + _c.close() + except Exception as ex: + log.warning(f"Manual library sync failed: {ex}") + threading.Thread(target=_do_sync, daemon=True).start() + self._json({"ok": True}) + elif self.path == "/api/retry-item": + try: + data = json.loads(body) + item_id = data.get("item_id") + if not item_id: + self._json({"ok": False, "error": "missing item_id"}); return + conn = psycopg2.connect(DB_DSN) + try: + reset_item(conn, item_id, item_id) + # Also remove from tried hashes if the item's hash is known + state = load_state() + conn2_cur = conn.cursor() + conn2_cur.execute("SELECT active_stream->>'infohash' FROM \"MediaItem\" WHERE id=%s", (item_id,)) + row = conn2_cur.fetchone() + if row and row[0]: + state.get("tried_hashes", {}).pop(row[0].lower(), None) + save_state(state) + finally: + conn.close() + _force_cycle.set() + self._json({"ok": True}) + except Exception as e: + self._json({"ok": False, "error": str(e)}) + elif self.path == "/api/cancel-pending": + try: + data = json.loads(body) + rd_id = data.get("rd_torrent_id") + if not rd_id: + self._json({"ok": False, "error": "missing rd_torrent_id"}); return + rd_delete(rd_id) + state = load_state() + state["pending"] = [e for e in state.get("pending", []) if e.get("rd_torrent_id") != rd_id] + save_state(state) + self._json({"ok": True}) + except Exception as e: + self._json({"ok": False, "error": str(e)}) + elif self.path 
== "/api/clear-tried": + state = load_state() + count = len(state.get("tried_hashes", {})) + state["tried_hashes"] = {} + save_state(state) + log.info(f"Cleared {count} tried hash(es) — all items eligible for retry") + self._json({"ok": True, "cleared": count}) + else: + self.send_response(404); self.end_headers() + + def _json(self, data, **kwargs): + body = json.dumps(data, **kwargs).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, *_): + pass # suppress access logs + + +class _ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + daemon_threads = True + allow_reuse_address = True + +def start_ui(): + server = _ThreadingHTTPServer(("0.0.0.0", UI_PORT), UIHandler) + log.info(f"Web UI listening on http://0.0.0.0:{UI_PORT}") + server.serve_forever() + + +# ── Main cycle ──────────────────────────────────────────────────────────────── + +def run_cycle(conn, state): + set_progress(cycle_running=True, phase="pending_check", phase_label="Checking pending torrents", + search_done=0, search_total=0, search_items=[]) + + # Sync Radarr/Sonarr (Decypharr) files → mark Riven items Completed + try: + import sys as _sys + if "/data" not in _sys.path: + _sys.path.insert(0, "/data") + from sync_library import sync_movies as _sm, sync_episodes as _se, _get_conn as _sc + _sconn = _sc() + try: + _m = _sm(_sconn) + _e = _se(_sconn) + finally: + _sconn.close() + if _m + _e: + log.info(f"Library sync: {_m} movies + {_e} episodes marked Completed") + state["last_library_sync"] = { + "movies": _m, "episodes": _e, + "run_at": utcnow().isoformat() + } + except Exception as _e2: + log.warning(f"Library sync failed: {_e2}") + + # Cache library stats snapshot for Dashboard cards (fast COUNT GROUP BY) + try: + _lcur = conn.cursor() + _lcur.execute(""" + SELECT type, last_state, COUNT(*) FROM "MediaItem" + WHERE type IN 
('movie','episode','show','season') + GROUP BY type, last_state + """) + _lstats = {} + for _ltyp, _lst, _lcnt in _lcur.fetchall(): + _lstats.setdefault(_ltyp, {})[_lst] = _lcnt + state["library_stats_cache"] = _lstats + except Exception as _lse: + log.debug(f"Library stats cache: {_lse}") + + # Stale RD content check — runs every STALE_CHECK_INTERVAL_H hours or on demand + # Uses RD /torrents/info/{id} to verify the user's personal torrent still exists. + # Only checks items whose active_stream contains a valid RD torrent id ("id" key). + _last_stale = state.get("last_stale_check", {}) + _last_stale_at = _last_stale.get("run_at") + _stale_due = True + if _last_stale_at and not _force_stale.is_set(): + try: + _elapsed_h = (utcnow() - datetime.fromisoformat(_last_stale_at)).total_seconds() / 3600 + _stale_due = _elapsed_h >= STALE_CHECK_INTERVAL_H + except Exception: + pass + if _stale_due: + _force_stale.clear() + set_progress(phase="stale_check", phase_label="Checking RD content health") + try: + _stale_reset = check_stale_completed(conn) + state["last_stale_check"] = { + "reset": _stale_reset, + "run_at": utcnow().isoformat(), + } + except Exception as _se: + log.warning(f"Stale check failed: {_se}") + try: conn.rollback() + except Exception: pass + + now_str = utcnow().isoformat() + + pending = state.get("pending", []) + tried = load_tried_set(state) + stats = state.setdefault("stats", {"resolved": 0, "timed_out": 0, "added": 0}) + _cycle_resolved_start = stats.get("resolved", 0) + _cycle_timed_out_start = stats.get("timed_out", 0) + _prune_cooldowns(state) + + # ── 1. 
Check pending ────────────────────────────────────────────────────── + log.info(f"Checking {len(pending)} pending torrent(s)...") + still_pending = [] + + for entry in pending: + # Support both old single-id and new multi-id entries + item_ids = entry.get("item_ids") or [entry["item_id"]] + label = entry["label"] + rd_id = entry["rd_torrent_id"] + timeout_at = datetime.fromisoformat(entry["timeout_at"]) + status, progress, _spd, _seed, _bl = rd_status(rd_id) + + if status in ("downloaded", "seeding"): + log.info(f" ✓ '{label}' cached — resetting {len(item_ids)} item(s) in Riven") + for iid in item_ids: + reset_item(conn, iid, label) + rd_delete(rd_id) + stats["resolved"] = stats.get("resolved", 0) + len(item_ids) + _hist = state.setdefault("history", []) + _mins = int((utcnow() - datetime.fromisoformat(entry["added_at"])).total_seconds() / 60) + _hist.insert(0, {"label": label, "torrent_title": entry.get("torrent_title", ""), "resolved_at": now_str, "was_pending": True, "time_to_cache_mins": _mins}) + state["history"] = _hist[:100] + + elif status == "downloading" and (progress or 0) < 1: + added_at = datetime.fromisoformat(entry["added_at"]) + stale_mins = (utcnow() - added_at).total_seconds() / 60 + if stale_mins >= STALE_ZERO_MINS: + log.info(f" ✗ '{label}' stale — <1% for {int(stale_mins)}m — dropping, will retry next cycle") + rd_delete(rd_id) + stats["timed_out"] = stats.get("timed_out", 0) + len(item_ids) + else: + log.info(f" ↻ '{label}': downloading <1% ({int(stale_mins)}m / {STALE_ZERO_MINS}m before drop)") + still_pending.append(entry) + + elif utcnow() > timeout_at: + log.info(f" ✗ '{label}' timed out (status={status}) — giving up") + rd_delete(rd_id) + stats["timed_out"] = stats.get("timed_out", 0) + len(item_ids) + + elif status in ("waiting_files_selection", "magnet_conversion"): + added_at = datetime.fromisoformat(entry["added_at"]) + stale_mins = (utcnow() - added_at).total_seconds() / 60 + if stale_mins >= STALE_SELECTING_MINS: + log.info(f" ✗ 
'{label}' stuck in {status} for {int(stale_mins)}m — dropping, will retry") + rd_delete(rd_id) + stats["timed_out"] = stats.get("timed_out", 0) + len(item_ids) + else: + log.info(f" ↻ '{label}': {status} ({int(stale_mins)}m / {STALE_SELECTING_MINS}m before drop)") + still_pending.append(entry) + + elif status in ("error", "magnet_error", "virus", "dead"): + log.info(f" ✗ '{label}' RD error: {status} — giving up") + rd_delete(rd_id) + stats["timed_out"] = stats.get("timed_out", 0) + len(item_ids) + + elif status is None: + strikes = entry.get("none_strikes", 0) + 1 + if strikes >= NONE_STRIKE_LIMIT: + log.info(f" ✗ '{label}' vanished from RD (None x{NONE_STRIKE_LIMIT}) — will retry with different torrent") + stats["timed_out"] = stats.get("timed_out", 0) + len(item_ids) + # Don't rd_delete — it's already gone. Don't remove from tried — try a different hash next cycle. + else: + entry["none_strikes"] = strikes + log.info(f" ↻ '{label}': RD status None ({strikes}/{NONE_STRIKE_LIMIT})") + still_pending.append(entry) + + else: + entry.pop("none_strikes", None) # reset strike counter on any real status + log.info(f" ↻ '{label}': {status} {progress}%") + still_pending.append(entry) + + state["pending"] = still_pending + pending_ids = set() + for e in still_pending: + for iid in (e.get("item_ids") or [e["item_id"]]): + pending_ids.add(iid) + + # ── 2. 
Find stuck items ─────────────────────────────────────────────────── + set_progress(phase="db_query", phase_label="Querying database for stuck items") + try: + stuck = get_stuck_items(conn) + set_cached_stuck(stuck) + except Exception as e: + log.error(f"DB query failed: {e}") + conn.rollback() + set_progress(cycle_running=False, phase="idle", phase_label="Idle (DB error)") + return + + # Separate movies and episodes; group episodes by season + movies_stuck = [i for i in stuck if i["kind"] == "movie" and i["id"] not in pending_ids] + episodes_stuck = [i for i in stuck if i["kind"] == "episode"] + season_groups = group_episodes_by_season(episodes_stuck) + season_groups = [g for g in season_groups if not (set(g["item_ids"]) & pending_ids)] + + all_targets = movies_stuck + season_groups + log.info( + f"Found {len(stuck)} stuck item(s) in DB → " + f"{len(movies_stuck)} movies, {len(season_groups)} season group(s) eligible" + ) + new_adds = 0 + + if not all_targets: + state["last_cycle_summary"] = {"stuck_found": 0, "new_adds": 0, + "resolved": stats.get("resolved", 0) - _cycle_resolved_start, + "timed_out": stats.get("timed_out", 0) - _cycle_timed_out_start} + set_progress(cycle_running=False, phase="idle", phase_label="Idle") + return + + # ── 2a. 
Parallel Prowlarr searches ─────────────────────────────────────── + search_targets = all_targets[:MAX_NEW_PER_CYCLE * 4] + # Skip items still in search cooldown (zero-result or wrong-season backoff) + _cooling = [t for t in search_targets if _is_in_cooldown(state, t)] + search_targets = [t for t in search_targets if not _is_in_cooldown(state, t)] + if _cooling: + log.info(f" Skipping {len(_cooling)} item(s) in search cooldown: " + + ", ".join(item_label(t) for t in _cooling[:5])) + log.info(f"Searching Prowlarr for {len(search_targets)} item(s) in parallel (workers={SEARCH_WORKERS})...") + + # Initialise per-item progress rows + item_rows = [{"label": item_label(i), "done": False, "found": None} for i in search_targets] + label_to_idx = {item_label(i): idx for idx, i in enumerate(search_targets)} + set_progress(phase="searching", phase_label="Searching Prowlarr", + search_done=0, search_total=len(search_targets), search_items=list(item_rows)) + + search_results = {} # item_id -> (item, results) + done_count = 0 + with ThreadPoolExecutor(max_workers=SEARCH_WORKERS) as pool: + future_map = {pool.submit(prowlarr_search, item): item for item in search_targets} + for fut in as_completed(future_map): + item = future_map[fut] + lbl = item_label(item) + try: + results = fut.result() + except Exception as e: + log.warning(f" Search error for '{lbl}': {e}") + results = [] + search_results[item["id"]] = (item, results) + done_count += 1 + idx = label_to_idx.get(lbl) + if idx is not None: + item_rows[idx]["done"] = True + item_rows[idx]["found"] = len(results) + set_progress(search_done=done_count, search_items=list(item_rows)) + + # ── 2b. 
Batch RD availability check across ALL hashes ──────────────────── + set_progress(phase="rd_check", phase_label="Checking RD cache") + all_item_hashes = {} # item_id -> list of hashes + all_hashes_flat = [] + for item_id, (item, results) in search_results.items(): + hashes = extract_hashes(results) + all_item_hashes[item_id] = hashes + all_hashes_flat.extend(hashes) + + log.info(f"Checking {len(all_hashes_flat)} unique hashes against RD cache in one batch...") + all_cached = check_rd_cache(list(set(all_hashes_flat))) if all_hashes_flat else set() + if all_cached: + log.info(f" {len(all_cached)} hash(es) already cached on RD") + + # ── 2c. Process each item using search+cache results ───────────────────── + set_progress(phase="adding", phase_label="Adding to Real-Debrid") + for item in search_targets: + if new_adds >= MAX_NEW_PER_CYCLE: + log.info(f"Reached max {MAX_NEW_PER_CYCLE} adds this cycle") + break + + item_id = item["id"] + label = item_label(item) + _, results = search_results.get(item_id, (item, [])) + hashes = all_item_hashes.get(item_id, []) + + if not results: + # Prowlarr has zero coverage — back off so we don't search every 10 min + _record_search_miss(state, item) + continue + if not hashes: + log.info(f" '{label}': no valid hashes in results") + _record_search_miss(state, item) + continue + + # item_ids: list for season groups, single-element list for movies + item_ids = item.get("item_ids") or [item_id] + + # Already cached — just reset all items in this group + cached_for_item = set(hashes) & all_cached + if cached_for_item: + log.info(f" '{label}': {len(cached_for_item)} cached hash(es) — resetting {len(item_ids)} item(s)") + _clear_search_miss(state, item) + for iid in item_ids: + reset_item(conn, iid, label) + stats["resolved"] = stats.get("resolved", 0) + len(item_ids) + _hist = state.setdefault("history", []) + _hist.insert(0, {"label": label, "torrent_title": "(already cached on RD)", "resolved_at": now_str, "was_pending": False, 
"time_to_cache_mins": 0}) + state["history"] = _hist[:100] + continue + + query_title = item.get("show_title") or item.get("title") or "" + sn = item.get("season_num") + best = pick_best(results, tried, label, season_num=sn, query_title=query_title) + if not best: + # Record miss only when there are genuinely no viable candidates (not just all-tried). + # "All tried" will naturally unblock once TRIED_EXPIRY_DAYS passes. + if sn is not None: + viable_untried = [r for r in results + if (r.get("infoHash") or "").lower() not in tried + and season_score(r.get("title", ""), sn) >= 0] + if not viable_untried: + _record_search_miss(state, item) + else: + viable_untried = [r for r in results + if (r.get("infoHash") or "").lower() not in tried] + if not viable_untried: + _record_search_miss(state, item) + log.info(f" '{label}': no suitable uncached torrent (all tried or no seeders)") + continue + + h = best["infoHash"].lower() + seeders = best.get("seeders") or 0 + indexer = best.get("indexer") or "?" 
+ title = best.get("title", "")[:60] + log.info(f" '{label}' → adding: '{title}' ({seeders} seeders, {indexer})") + + rd_id = rd_add(h) + if not rd_id: + tried.add(h) + continue + + tried.add(h) + _clear_search_miss(state, item) # found something — reset backoff + timeout_at = (utcnow() + timedelta(hours=CACHE_TIMEOUT_H)).isoformat() + state["pending"].append({ + "item_ids": item_ids, + "item_id": item_ids[0], # backward compat + "label": label, + "torrent_title": best.get("title", "")[:80], + "rd_torrent_id": rd_id, + "info_hash": h, + "added_at": now_str, + "timeout_at": timeout_at, + }) + stats["added"] = stats.get("added", 0) + len(item_ids) + new_adds += 1 + + save_tried(state, tried) + stats["last_cycle"] = utcnow().strftime("%b %d %H:%M:%S UTC") + state["last_cycle_summary"] = { + "stuck_found": len(all_targets), + "new_adds": new_adds, + "resolved": stats.get("resolved", 0) - _cycle_resolved_start, + "timed_out": stats.get("timed_out", 0) - _cycle_timed_out_start, + } + log.info(f"Cycle done — pending: {len(state['pending'])}, new: {new_adds}, resolved all-time: {stats.get('resolved',0)}") + set_progress(cycle_running=False, phase="idle", phase_label="Idle") + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--once", action="store_true", help="Run one cycle then exit") + args = parser.parse_args() + + load_cacharr_config() + log.info("Cacharr started") + log.info(f" Loop interval : {LOOP_INTERVAL}s | Max adds/cycle: {MAX_NEW_PER_CYCLE}") + log.info(f" Cache timeout : {CACHE_TIMEOUT_H}h | Min stuck: {MIN_STUCK_HOURS}h / {MIN_SCRAPED_TIMES}x") + + # Start web UI in background thread + ui_thread = threading.Thread(target=start_ui, daemon=True) + ui_thread.start() + + if not args.once: + log.info("Waiting 90s for services to be ready...") + time.sleep(90) + + while True: + state = load_state() + try: + conn = psycopg2.connect(DB_DSN) + try: + 
run_cycle(conn, state) + finally: + conn.close() + except Exception as e: + log.error(f"Cycle error: {e}", exc_info=True) + + save_state(state) + + if args.once: + log.info("--once complete, exiting.") + break + + next_at = (utcnow() + timedelta(seconds=LOOP_INTERVAL)).isoformat() + set_progress(next_cycle_at=next_at) + log.info(f"Sleeping {LOOP_INTERVAL}s (or until forced)...") + _force_cycle.wait(timeout=LOOP_INTERVAL) + _force_cycle.clear() + + +if __name__ == "__main__": + main() diff --git a/cacharr/sync_library.py b/cacharr/sync_library.py new file mode 100644 index 0000000..f699e55 --- /dev/null +++ b/cacharr/sync_library.py @@ -0,0 +1,258 @@ +""" +sync_library.py — Bridge between Radarr/Sonarr (Decypharr) and Riven. + +Scans the library for video files placed by Radarr/Sonarr and marks the +corresponding Riven MediaItem records as Completed so Riven doesn't +re-scrape content that already exists on disk. + +Run standalone: python3 /data/sync_library.py +Or import: from sync_library import sync_all; sync_all() +""" + +import os +import re +import logging + +import psycopg2 + +log = logging.getLogger("sync_library") + +MOVIES_DIR = "/mnt/library/movies" +SHOWS_DIR = "/mnt/library/shows" +VIDEO_EXTS = {".mkv", ".mp4", ".avi", ".m4v", ".ts"} +DB_DSN = os.getenv("DB_DSN", "postgresql://DUMB:postgres@127.0.0.1:5432/riven") + +COMPLETE_SQL = """ + UPDATE "MediaItem" + SET last_state = 'Completed', + symlinked = true, + symlinked_at = NOW(), + symlink_path = %s, + file = %s, + folder = %s + WHERE id = %s +""" + + +def _source_info(path): + """Return (real_file, real_folder) for a path, following symlinks.""" + if os.path.islink(path): + target = os.readlink(path) + return os.path.basename(target), os.path.basename(os.path.dirname(target)) + return os.path.basename(path), os.path.basename(os.path.dirname(path)) + + +_JUNK_RE = re.compile(r"(?i)\b(sample|trailer|extras?|featurette|interview|deleted.scene)\b") + + +def _find_video(folder_path): + """Return the best 
video file path in a folder, or None.
+
+    Prefers files whose names don't look like samples/trailers/extras.
+    Falls back to the first video alphabetically if all files match the
+    junk pattern. Accepts symlinks regardless of whether the target
+    resolves — the debrid FUSE mount only exists inside the DUMB
+    container, so os.path.exists() returns False here even for valid links.
+    """
+    try:
+        entries = os.listdir(folder_path)
+    except OSError:
+        return None
+    primary = []
+    fallback = []
+    for fname in sorted(entries):
+        if os.path.splitext(fname)[1].lower() not in VIDEO_EXTS:
+            continue
+        full_path = os.path.join(folder_path, fname)
+        if _JUNK_RE.search(fname):
+            fallback.append(full_path)
+        else:
+            primary.append(full_path)
+    if primary:
+        return primary[0]
+    if fallback:
+        return fallback[0]
+    return None
+
+
+# ---------------------------------------------------------------------------
+def sync_movies(conn):
+    """Mark Riven movie items Completed when a library file already exists."""
+    cur = conn.cursor()
+    synced = skipped = no_match = no_file = 0
+
+    for folder in (sorted(os.listdir(MOVIES_DIR)) if os.path.isdir(MOVIES_DIR) else []):  # tolerate missing/unmounted library dir
+        if "{imdb-" not in folder:
+            continue
+        m = re.search(r"\{imdb-(tt\d+)\}", folder)
+        if not m:
+            continue
+        imdb_id = m.group(1)
+        folder_path = os.path.join(MOVIES_DIR, folder)
+
+        video_path = _find_video(folder_path)
+        if not video_path:
+            no_file += 1
+            continue
+
+        cur.execute(
+            'SELECT id, last_state, symlinked FROM "MediaItem" '
+            "WHERE type='movie' AND imdb_id=%s LIMIT 1",
+            (imdb_id,),
+        )
+        row = cur.fetchone()
+        if not row:
+            no_match += 1
+            continue
+
+        item_id, last_state, sym = row
+        if last_state == "Completed" and sym:
+            skipped += 1
+            continue
+
+        real_file, real_folder = _source_info(video_path)
+        cur.execute(COMPLETE_SQL, (video_path, real_file, real_folder, item_id))
+        log.info(" [movie] Completed: %s", folder)
+        synced += 1
+
+    conn.commit()
+    log.info(
+        "Movies — synced:%d already_done:%d no_file:%d no_riven_match:%d",
+        synced, skipped, no_file, no_match,
+    )
+    return synced
+
+
+# ---------------------------------------------------------------------------
+def _extract_ep_nums(filename):
+    """Return all episode numbers encoded in a filename.
+
+    Handles:
+    - Single: S01E05 → [5]
+    - Multi: S01E01E02E03 → [1, 2, 3]
+    - Range: S01E01-03 / S01E01-E03 → [1, 2, 3]
+    """
+    range_match = re.search(r"[Ee](\d{1,2})-[Ee]?(\d{1,2})\b", filename)  # optional "E" before the end so "E01-E03" expands instead of dropping middle episodes
+    if range_match:
+        start, end = int(range_match.group(1)), int(range_match.group(2))
+        if end >= start:
+            return list(range(start, end + 1))
+    tokens = re.findall(r"[Ee](\d{1,2})", filename)
+    return [int(t) for t in tokens] if tokens else []
+
+
+def sync_episodes(conn):
+    """Mark Riven episode items Completed when library files already exist."""
+    cur = conn.cursor()
+    synced = skipped = no_match = no_file = 0  # NOTE(review): no_file is never incremented in this function — always logs 0
+
+    for show_folder in (sorted(os.listdir(SHOWS_DIR)) if os.path.isdir(SHOWS_DIR) else []):  # tolerate missing/unmounted library dir
+        if "{imdb-" not in show_folder:
+            continue
+        m = re.search(r"\{imdb-(tt\d+)\}", show_folder)
+        if not m:
+            continue
+        show_imdb = m.group(1)
+        show_path = os.path.join(SHOWS_DIR, show_folder)
+
+        cur.execute(
+            'SELECT id FROM "MediaItem" WHERE type=\'show\' AND imdb_id=%s LIMIT 1',
+            (show_imdb,),
+        )
+        row = cur.fetchone()
+        if not row:
+            continue
+        show_id = row[0]
+
+        try:
+            season_dirs = [
+                d for d in os.listdir(show_path)
+                if os.path.isdir(os.path.join(show_path, d))
+                and d.lower().startswith("season")
+            ]
+        except OSError:
+            continue
+
+        for season_dir in season_dirs:
+            sm = re.search(r"(\d+)", season_dir)
+            if not sm:
+                continue
+            season_num = int(sm.group(1))
+            season_path = os.path.join(show_path, season_dir)
+
+            cur.execute(
+                'SELECT m.id FROM "MediaItem" m '
+                'JOIN "Season" s ON s.id = m.id '
+                "WHERE m.type='season' AND m.number=%s AND s.parent_id=%s LIMIT 1",
+                (season_num, show_id),
+            )
+            row = cur.fetchone()
+            if not row:
+                continue
+            season_id = row[0]
+
+            try:
+                ep_files = [
+                    f for f in os.listdir(season_path)
+                    if os.path.splitext(f)[1].lower() in VIDEO_EXTS
+                ]
+            except OSError:
+                continue
+
+            
for ep_file in ep_files: + ep_nums = _extract_ep_nums(ep_file) + if not ep_nums: + continue + ep_path = os.path.join(season_path, ep_file) + + # Don't check os.path.exists — FUSE target only resolves in DUMB + + for ep_num in ep_nums: + cur.execute( + 'SELECT m.id, m.last_state, m.symlinked FROM "MediaItem" m ' + 'JOIN "Episode" e ON e.id = m.id ' + "WHERE m.type='episode' AND m.number=%s " + "AND e.parent_id=%s LIMIT 1", + (ep_num, season_id), + ) + row = cur.fetchone() + if not row: + no_match += 1 + continue + + ep_id, last_state, sym = row + if last_state == "Completed" and sym: + skipped += 1 + continue + + real_file, real_folder = _source_info(ep_path) + cur.execute(COMPLETE_SQL, (ep_path, real_file, real_folder, ep_id)) + synced += 1 + + conn.commit() + log.info( + "Episodes — synced:%d already_done:%d no_file:%d no_riven_match:%d", + synced, skipped, no_file, no_match, + ) + return synced + + +# --------------------------------------------------------------------------- +def sync_all(): + conn = _get_conn() + try: + m = sync_movies(conn) + e = sync_episodes(conn) + return m + e + finally: + conn.close() + + +def _get_conn(): + return psycopg2.connect(DB_DSN) + + +# --------------------------------------------------------------------------- +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, format="%(message)s") + sync_all() diff --git a/main.py b/main.py index c8ec53f..2c8c133 100644 --- a/main.py +++ b/main.py @@ -970,6 +970,7 @@ def _get_metrics_cfg(): "cli_battery", "riven_backend", "riven_frontend", + "cacharr", "plex", "jellyfin", "emby", diff --git a/utils/auto_update.py b/utils/auto_update.py index 37800de..15b0787 100644 --- a/utils/auto_update.py +++ b/utils/auto_update.py @@ -2199,6 +2199,33 @@ def start_process(self, process_name, config, key, instance_name): time.sleep(10) load_settings() + if key == "cacharr": + if self.process_handler.shutting_down: + return process, "Shutdown requested" + from utils.cacharr_settings import 
patch_cacharr_config + + time.sleep(5) + patched, error = patch_cacharr_config() + if patched: + # Reload env from CONFIG_MANAGER so the restart picks up the + # newly injected PROWLARR_KEY (and URL) rather than the stale + # snapshot captured before patch_cacharr_config() ran. + refreshed_cfg = CONFIG_MANAGER.get("cacharr") or {} + refreshed_env = dict(env) + refreshed_env.update(refreshed_cfg.get("env") or {}) + self.logger.info("Restarting Cacharr to apply Prowlarr API key") + self.process_handler.stop_process(process_name) + process, error = self.process_handler.start_process( + process_name, + config_dir, + command, + instance_name, + suppress_logging=suppress_logging, + env=refreshed_env, + ) + elif error: + self.logger.warning("Cacharr config patch failed: %s", error) + if key == "decypharr": if self.process_handler.shutting_down: return process, "Shutdown requested" diff --git a/utils/cacharr_settings.py b/utils/cacharr_settings.py new file mode 100644 index 0000000..ca8ca1d --- /dev/null +++ b/utils/cacharr_settings.py @@ -0,0 +1,95 @@ +""" +cacharr_settings.py — Auto-configure Cacharr from DUMB's service registry. + +Called by auto_update.py after Cacharr starts. If PROWLARR_KEY is not +already set in the Cacharr env config, this function reads the API key +and base URL from Prowlarr's config.xml / instance port, then returns True +so the caller can restart Cacharr with the correct credentials. +""" + +from __future__ import annotations + +import xml.etree.ElementTree as ET +import os + +from utils.global_logger import logger +from utils.config_loader import CONFIG_MANAGER + + +def _discover_prowlarr() -> tuple[str, str]: + """Return (api_key, base_url) from the first enabled Prowlarr instance. + + Reads the API key from the instance's config.xml and constructs the + base URL from the configured port (default 9696). Returns ("", "") + when no enabled instance with a readable key is found. 
+    """
+    prowlarr_cfg = CONFIG_MANAGER.get("prowlarr") or {}
+    instances = prowlarr_cfg.get("instances") or {}
+    if not isinstance(instances, dict):
+        logger.warning("Cacharr: invalid prowlarr.instances type: %s", type(instances).__name__)
+        return "", ""
+    for _inst_key, inst in instances.items():  # first enabled instance with a readable key wins
+        if not isinstance(inst, dict) or not inst.get("enabled"):
+            continue
+        config_file = (inst.get("config_file") or "").strip()
+        api_key = ""
+        if config_file and os.path.isfile(config_file):
+            try:
+                tree = ET.parse(config_file)
+                node = tree.getroot().find(".//ApiKey")  # <ApiKey> anywhere in the instance's config.xml
+                if node is not None and (node.text or "").strip():
+                    api_key = node.text.strip()
+            except Exception as exc:  # unreadable/malformed XML — log it and keep scanning other instances
+                logger.warning(
+                    "Cacharr: failed reading Prowlarr ApiKey from %s: %s",
+                    config_file, exc,
+                )
+        if not api_key:
+            continue
+        port = inst.get("port") or 9696  # missing/0 port falls back to the Prowlarr default
+        try:
+            port = int(port)
+        except (TypeError, ValueError):
+            port = 9696
+        base_url = f"http://127.0.0.1:{port}"  # NOTE(review): assumes Prowlarr is reachable on loopback — confirm for remote instances
+        return api_key, base_url
+    return "", ""
+
+
+def patch_cacharr_config() -> tuple[bool, str | None]:
+    """Inject Prowlarr API key and URL into Cacharr's env config when not already set.
+
+    Returns:
+        (patched, error) — patched is True if the config was changed and
+        Cacharr should be restarted; error is a non-empty string on failure.
+ """ + cacharr_cfg = CONFIG_MANAGER.get("cacharr") or {} + if not isinstance(cacharr_cfg, dict) or not cacharr_cfg.get("enabled"): + return False, None + + env = cacharr_cfg.get("env") or {} + if not isinstance(env, dict): + logger.warning("Cacharr: invalid cacharr.env type: %s; skipping patch.", type(env).__name__) + return False, None + existing_key = (env.get("PROWLARR_KEY") or "").strip() + if existing_key: + logger.debug("Cacharr: PROWLARR_KEY already set; skipping auto-inject.") + return False, None + + api_key, base_url = _discover_prowlarr() + if not api_key: + logger.warning("Cacharr: Prowlarr API key not found; PROWLARR_KEY will remain empty.") + return False, None + + updates: dict = {"PROWLARR_KEY": api_key} + if base_url and not (env.get("PROWLARR_URL") or "").strip(): + updates["PROWLARR_URL"] = base_url + + try: + CONFIG_MANAGER.update("cacharr", {"env": {**env, **updates}}) + logger.info( + "Cacharr: injected Prowlarr credentials (url=%s).", base_url or "unchanged" + ) + return True, None + except Exception as exc: + return False, f"Cacharr config update failed: {exc}" diff --git a/utils/dependency_map.py b/utils/dependency_map.py index 205d225..186fa1e 100644 --- a/utils/dependency_map.py +++ b/utils/dependency_map.py @@ -56,6 +56,7 @@ def build_conditional_dependency_map( deps: dict[str, set[str]] = { "riven_backend": {"postgres"}, "riven_frontend": {"riven_backend"}, + "cacharr": {"riven_backend"}, "zilean": {"postgres"}, "pgadmin": {"postgres"}, } diff --git a/utils/dumb_config.json b/utils/dumb_config.json index 737f108..9ba65ac 100644 --- a/utils/dumb_config.json +++ b/utils/dumb_config.json @@ -798,6 +798,28 @@ "log_file": "/log/riven_frontend.log", "env": {} }, + "cacharr": { + "enabled": false, + "process_name": "Cacharr", + "suppress_logging": false, + "log_level": "INFO", + "port": 8484, + "platforms": [], + "command": [ + "/venv/bin/python", + "/cacharr/cacharr.py" + ], + "config_dir": "/cacharr", + "config_file": 
"/data/cacharr_config.json", + "log_file": "/log/cacharr.log", + "env": { + "PYTHONUNBUFFERED": "1", + "PROWLARR_URL": "http://127.0.0.1:9696", + "PROWLARR_KEY": "", + "DB_DSN": "postgresql://DUMB:postgres@127.0.0.1:5432/riven", + "UI_PORT": "{port}" + } + }, "radarr": { "instances": { "Default": { diff --git a/utils/dumb_config_schema.json b/utils/dumb_config_schema.json index 3fe9e7a..ad6a66c 100644 --- a/utils/dumb_config_schema.json +++ b/utils/dumb_config_schema.json @@ -2916,6 +2916,63 @@ "env" ] }, + "cacharr": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean" + }, + "process_name": { + "type": "string" + }, + "suppress_logging": { + "type": "boolean" + }, + "log_level": { + "type": "string" + }, + "port": { + "type": "integer" + }, + "platforms": { + "type": "array", + "items": { + "type": "string" + } + }, + "command": { + "type": "array", + "items": { + "type": "string" + } + }, + "config_dir": { + "type": "string" + }, + "config_file": { + "type": "string" + }, + "log_file": { + "type": "string" + }, + "env": { + "type": "object" + } + }, + "required": [ + "enabled", + "process_name", + "suppress_logging", + "log_level", + "port", + "platforms", + "command", + "config_dir", + "config_file", + "log_file", + "env" + ] + }, "sonarr": { "type": "object", "properties": {