Skip to content
Open
170 changes: 149 additions & 21 deletions snare/cloner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import os
import re
import sys
from typing import Dict, List, Tuple, Union

import aiohttp
from bs4 import BeautifulSoup
import cssutils
from pyppeteer import launch
import pyppeteer
from pyppeteer.errors import NetworkError, PageError, TimeoutError
import yarl

Expand All @@ -21,7 +22,18 @@


class BaseCloner:
def __init__(self, root, max_depth, css_validate, default_path="/opt/snare"):
def __init__(self, root: str, max_depth: int, css_validate: bool, default_path: str = "/opt/snare") -> None:
"""Base class for all core functions of the cloner

:param root: Website root URL
:type root: str
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need type if the type annotated in function definition?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Sphinx mandates it and the type disappears from the parameter description in docs when we remove its :type from the docstring.

:param max_depth: Max depth of cloning
:type max_depth: int
:param css_validate: Whether CSS validation is enabled
:type css_validate: bool
:param default_path: Storage path for site files, defaults to "/opt/snare"
:type default_path: str, optional
"""
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG)
self.visited_urls = []
Expand All @@ -48,15 +60,32 @@ def __init__(self, root, max_depth, css_validate, default_path="/opt/snare"):
self.itr = 0

@staticmethod
def add_scheme(url):
def add_scheme(url: str) -> Tuple[yarl.URL, yarl.URL]:
"""Generate root and 404 URLs with proper schemes

:param url: Raw website root URL
:type url: str
:return: root URL, 404 page URL
:rtype: Tuple[yarl.URL, yarl.URL]
"""
new_url = yarl.URL(url)
if not new_url.scheme:
new_url = yarl.URL("http://" + url)
err_url = new_url.with_path("/status_404").with_query(None).with_fragment(None)
return new_url, err_url

@staticmethod
def get_headers(response):
def get_headers(
response: Union[aiohttp.ClientResponse, pyppeteer.network_manager.Response]
) -> Tuple[List[Dict[str, str]], Union[str, None]]:
"""Filter response headers, convert them to a list of dictionaries of each header
and return them along with the content type

:param response: Response object from aiohttp or Pyppeteer
:type response: Union[aiohttp.ClientResponse, pyppeteer.network_manager.Response]
:return: Response headers, Content-type
:rtype: Tuple[List[Dict[str, str]], str]
"""
ignored_headers_lowercase = [
"age",
"cache-control",
Expand All @@ -74,12 +103,23 @@ def get_headers(response):
headers = []
for key, value in response.headers.items():
if key.lower() == "content-type":
content_type = value
content_type = str(value)
elif key.lower() not in ignored_headers_lowercase:
headers.append({key: value})
return [headers, content_type]

async def process_link(self, url, level, check_host=False):
return headers, content_type

async def process_link(self, url: str, level: int, check_host: bool = False) -> Union[str, None]:
"""Process (relative and absolute) links to make them suitable for serving and add new URLs to the queue

:param url: Page URL
:type url: str
:param level: Page depth
:type level: int
:param check_host: Whether to check the host while processing, defaults to False
:type check_host: bool, optional
:return: Processed link
:rtype: Union[str, None]
"""
try:
url = yarl.URL(url)
except UnicodeError:
Expand Down Expand Up @@ -111,7 +151,16 @@ async def process_link(self, url, level, check_host=False):
self.logger.error("ValueError while processing the %s link", url)
return res

async def replace_links(self, data, level):
async def replace_links(self, data: Union[bytes, str], level: int) -> BeautifulSoup:
"""Replace website links to make them suitable for serving
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does it mean "suitable"? try to be more concrete, maybe even with example

Copy link
Copy Markdown
Contributor Author

@lordlabuckdas lordlabuckdas Aug 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have clarified it in 58eafef. Will add detailed descriptions to this and other functions in the next series of commits.


:param data: Page data
:type data: Union[bytes, str]
:param level: Page depth
:type level: int
:return: BeautifulSoup object
:rtype: BeautifulSoup
"""
soup = BeautifulSoup(data, "html.parser")

# find all relative links
Expand Down Expand Up @@ -139,7 +188,14 @@ async def replace_links(self, data, level):

return soup

def _make_filename(self, url):
def _make_filename(self, url: yarl.URL) -> Tuple[str, str]:
"""Generate file name and its hash for meta info and file storage

:param url: Site URL
:type url: yarl.URL
:return: File name, its MD5 hash
:rtype: Tuple[str, str]
"""
if url.is_absolute():
file_name = url.relative().human_repr()
else:
Expand All @@ -157,10 +213,27 @@ def _make_filename(self, url):
hash_name = m.hexdigest()
return file_name, hash_name

async def fetch_data(self, driver, current_url, level, try_count):
async def fetch_data(self, driver: None, current_url: None, level: None, try_count: None) -> None:
"""Abstract method to fetch data from the given URL

:param driver: Driver object
:type driver: None
:param current_url: URL of the page to clone
:type current_url: None
:param level: Depth of the URL
:type level: None
:param try_count: Try count of the URL
:type try_count: None
:raises NotImplementedError: Abstract method
"""
raise NotImplementedError

async def get_body(self, driver):
async def get_body(self, driver: Union[aiohttp.ClientSession, pyppeteer.browser.Browser]) -> None:
"""Get page body of URLs in queue

:param driver: Driver object to fetch data
:type driver: Union[aiohttp.ClientSession, pyppeteer.browser.Browser]
"""
while not self.new_urls.empty():
print(animation[self.itr], end="\r")
self.itr = (self.itr + 1) % len(animation)
Expand Down Expand Up @@ -212,7 +285,8 @@ async def get_body(self, driver):
except TypeError:
await self.new_urls.put({"url": current_url, "level": level, "try_count": try_count + 1})

async def get_root_host(self):
async def get_root_host(self) -> None:
"""Update the website's root host"""
try:
async with aiohttp.ClientSession() as session:
resp = await session.get(self.root)
Expand All @@ -225,7 +299,22 @@ async def get_root_host(self):


class SimpleCloner(BaseCloner):
async def fetch_data(self, session, current_url, level, try_count):
async def fetch_data(
self, session: aiohttp.ClientSession, current_url: yarl.URL, level: int, try_count: int
) -> Tuple[Union[yarl.URL, None], bytes, List[Dict[str, str]], str]:
"""Fetch data from the given URL using aiohttp

:param session: aiohttp ClientSession object
:type session: aiohttp.ClientSession
:param current_url: URL of the page to clone
:type current_url: yarl.URL
:param level: Depth of the URL
:type level: int
:param try_count: Try count of the URL
:type try_count: int
:return: Redirected URL, Page data, Response headers, Page content type
:rtype: Tuple[Union[yarl.URL, None], bytes, List[Dict[str, str]], str]
"""
data = None
headers = []
content_type = None
Expand All @@ -242,11 +331,26 @@ async def fetch_data(self, session, current_url, level, try_count):
await self.new_urls.put({"url": current_url, "level": level, "try_count": try_count + 1})
else:
await response.release()
return [redirect_url, data, headers, content_type]
return redirect_url, data, headers, content_type


class HeadlessCloner(BaseCloner):
async def fetch_data(self, browser, current_url, level, try_count):
async def fetch_data(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider adding the docstring for the class as well

self, browser: pyppeteer.browser.Browser, current_url: yarl.URL, level: int, try_count: int
) -> Tuple[Union[yarl.URL, None], Union[str, bytes], List[Dict[str, str]], Union[str, None]]:
"""Fetch data from the given URL using Pyppeteer

:param browser: Pyppeteer Browser object
:type browser: pyppeteer.Browser.browser
:param current_url: URL of the page to clone
:type current_url: yarl.URL
:param level: Depth of the URL
:type level: int
:param try_count: Try count of the URL
:type try_count: int
:return: Redirected URL, Page data, Response headers, Page content type
:rtype: Tuple[Union[yarl.URL, None], Union[str, bytes], List[Dict[str, str]], Union[str, None]]
"""
data = None
headers = []
content_type = None
Expand All @@ -270,11 +374,27 @@ async def fetch_data(self, browser, current_url, level, try_count):
except PageError as err: # when KeyboardInterrupt is raised midway cloning
self.logger.error(err)

return [redirect_url, data, headers, content_type]
return redirect_url, data, headers, content_type


class CloneRunner:
def __init__(self, root, max_depth, css_validate, default_path="/opt/snare", headless=False):
def __init__(
self, root: str, max_depth: int, css_validate: bool, default_path: str = "/opt/snare", headless: bool = False
) -> None:
"""Runner class for all cloners

:param root: Website root URL
:type root: str
:param max_depth: Max depth of cloning
:type max_depth: int
:param css_validate: Whether CSS validation is enabled
:type css_validate: bool
:param default_path: Storage path for site files, defaults to "/opt/snare"
:type default_path: str, optional
:param headless: Whether headless cloning is enabled, defaults to False
:type headless: bool, optional
:raises Exception: If runner instance is None indicating initialization error
"""
self.driver = None
self.runner = None
if headless:
Expand All @@ -284,14 +404,18 @@ def __init__(self, root, max_depth, css_validate, default_path="/opt/snare", hea
if not self.runner:
raise Exception("Error initializing cloner!")

async def run(self):
async def run(self) -> None:
"""Clone website

:raises Exception: If runner instance is None
"""
if not self.runner:
raise Exception("Error running cloner! - Cloner instance is None")
if type(self.runner) == SimpleCloner:
self.driver = aiohttp.ClientSession()
else:
# close and handle SIGINIT manually with `except KeyboardInterrupt`
self.driver = await launch(autoClose=False, handleSIGINT=False)
self.driver = await pyppeteer.launch(autoClose=False, handleSIGINT=False)
try:
await self.runner.new_urls.put({"url": self.runner.root, "level": 0, "try_count": 0})
await self.runner.new_urls.put({"url": self.runner.error_page, "level": 0, "try_count": 0})
Expand All @@ -300,7 +424,11 @@ async def run(self):
# in most cases, the exception is caught in `bin/clone`
print_color("\nKeyboardInterrupt received... Quitting", "ERROR")

async def close(self):
async def close(self) -> None:
"""Close all open connections and write meta info into file

:raises Exception: If runner instance is None
"""
if not self.runner:
raise Exception("Error closing cloner! - Cloner instance is None")
with open(os.path.join(self.runner.target_path, "meta.json"), "w") as mj:
Expand Down
26 changes: 23 additions & 3 deletions snare/html_handler.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
import asyncio
import json
import logging
from typing import Dict, List, Union

import aiohttp
from bs4 import BeautifulSoup
import cssutils


class HtmlHandler:
def __init__(self, no_dorks, tanner):
def __init__(self, no_dorks: bool, tanner: str):
"""Class to handle HTML contents of pages

:param no_dorks: Disable dorks
:type no_dorks: bool
:param tanner: Tanner host address
:type tanner: str
"""
self.no_dorks = no_dorks
self.dorks = []
self.logger = logging.getLogger(__name__)
self.tanner = tanner

async def get_dorks(self):
async def get_dorks(self) -> List[Dict[str, str]]:
"""Fetch all dorks from Tanner

:return: Dorks
:rtype: List[Dict[str, str]]
"""
dorks = None
try:
async with aiohttp.ClientSession() as session:
Expand All @@ -29,7 +42,14 @@ async def get_dorks(self):
self.logger.error("Dorks timeout error: %s", error)
return dorks["response"]["dorks"] if dorks else []

async def handle_content(self, content):
async def handle_content(self, content: Union[str, bytes, None]) -> bytes:
"""Parse and fix CSS and add dorks to page content

:param content: Page content
:type content: Union[str, bytes, None]
:return: Modified page content to be served
:rtype: bytes
"""
soup = BeautifulSoup(content, "html.parser")
if self.no_dorks is not True:
for p_elem in soup.find_all("p"):
Expand Down
Loading