diff --git a/docs/Makefile b/docs/Makefile index 298ea9e2..51285967 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -16,4 +16,4 @@ help: # Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). %: Makefile - @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) \ No newline at end of file + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/conf.py b/docs/conf.py index f60f5db3..13e3d6cd 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -38,7 +38,7 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = [] +extensions = ["sphinx.ext.autodoc"] # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] @@ -84,7 +84,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ["_static"] +html_static_path = [] # Custom sidebar templates, must be a dictionary that maps document names # to template names. diff --git a/docs/dev/modules.rst b/docs/dev/modules.rst new file mode 100644 index 00000000..8db66008 --- /dev/null +++ b/docs/dev/modules.rst @@ -0,0 +1,7 @@ +snare +===== + +.. toctree:: + :maxdepth: 4 + + snare diff --git a/docs/dev/snare.rst b/docs/dev/snare.rst new file mode 100644 index 00000000..8525e9b9 --- /dev/null +++ b/docs/dev/snare.rst @@ -0,0 +1,61 @@ +snare package +============= + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + snare.utils + +Submodules +---------- + +snare.cloner module +------------------- + +.. automodule:: snare.cloner + :members: + :undoc-members: + :show-inheritance: + +snare.html\_handler module +-------------------------- + +.. automodule:: snare.html_handler + :members: + :undoc-members: + :show-inheritance: + +snare.middlewares module +------------------------ + +.. automodule:: snare.middlewares + :members: + :undoc-members: + :show-inheritance: + +snare.server module +------------------- + +.. automodule:: snare.server + :members: + :undoc-members: + :show-inheritance: + +snare.tanner\_handler module +---------------------------- + +.. automodule:: snare.tanner_handler + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: snare + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/dev/snare.utils.rst b/docs/dev/snare.utils.rst new file mode 100644 index 00000000..516a5f4e --- /dev/null +++ b/docs/dev/snare.utils.rst @@ -0,0 +1,45 @@ +snare.utils package +=================== + +Submodules +---------- + +snare.utils.asyncmock module +---------------------------- + +.. automodule:: snare.utils.asyncmock + :members: + :undoc-members: + :show-inheritance: + +snare.utils.logger module +------------------------- + +.. automodule:: snare.utils.logger + :members: + :undoc-members: + :show-inheritance: + +snare.utils.page\_path\_generator module +---------------------------------------- + +.. automodule:: snare.utils.page_path_generator + :members: + :undoc-members: + :show-inheritance: + +snare.utils.snare\_helpers module +--------------------------------- + +.. automodule:: snare.utils.snare_helpers + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: snare.utils + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/index.rst b/docs/index.rst index dc4b44ff..067648d7 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -13,6 +13,7 @@ Welcome to SNARE's documentation! quick-start parameters cloner + Snare API diff --git a/snare/cloner.py b/snare/cloner.py index 20ab2261..60f81076 100644 --- a/snare/cloner.py +++ b/snare/cloner.py @@ -7,11 +7,12 @@ import os import re import sys +from typing import Dict, List, Tuple, Union import aiohttp from bs4 import BeautifulSoup import cssutils -from pyppeteer import launch +import pyppeteer from pyppeteer.errors import NetworkError, PageError, TimeoutError import yarl @@ -21,7 +22,20 @@ class BaseCloner: - def __init__(self, root, max_depth, css_validate, default_path="/opt/snare"): + """Abstract base class for all core functions of cloner""" + + def __init__(self, root: str, max_depth: int, css_validate: bool, default_path: str = "/opt/snare") -> None: + """Constructor method + + :param root: Website root URL + :type root: str + :param max_depth: Max depth of cloning + :type max_depth: int + :param css_validate: Whether CSS validation is enabled + :type css_validate: bool + :param default_path: Storage path for site files, defaults to "/opt/snare" + :type default_path: str, optional + """ self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.DEBUG) self.visited_urls = [] @@ -48,7 +62,22 @@ def __init__(self, root, max_depth, css_validate, default_path="/opt/snare"): self.itr = 0 @staticmethod - def add_scheme(url): + def add_scheme(url: str) -> Tuple[yarl.URL, yarl.URL]: + """Generate root and 404 URLs with schemes (http/https) + + :param url: Raw website root URL + :type url: str + :return: root URL, 404 page URL + :rtype: Tuple[yarl.URL, yarl.URL] + + Example: + .. code-block:: python + + >>> from snare.cloner import BaseCloner + >>> BaseCloner.add_scheme("foo.bar") + (URL('http://foo.bar'), URL('http://foo.bar/status_404')) + + """ new_url = yarl.URL(url) if not new_url.scheme: new_url = yarl.URL("http://" + url) @@ -56,7 +85,17 @@ def add_scheme(url): return new_url, err_url @staticmethod - def get_headers(response): + def get_headers( + response: Union[aiohttp.ClientResponse, pyppeteer.network_manager.Response] + ) -> Tuple[List[Dict[str, str]], Union[str, None]]: + """Filter response headers, convert them to a list of dictionaries of each header + and return them along with the content type + + :param response: Response object from aiohttp or Pyppeteer + :type response: Union[aiohttp.ClientResponse, pyppeteer.network_manager.Response] + :return: Response headers, Content-type + :rtype: Tuple[List[Dict[str, str]], str] + """ ignored_headers_lowercase = [ "age", "cache-control", @@ -74,12 +113,40 @@ def get_headers(response): headers = [] for key, value in response.headers.items(): if key.lower() == "content-type": - content_type = value + content_type = str(value) elif key.lower() not in ignored_headers_lowercase: headers.append({key: value}) - return [headers, content_type] - - async def process_link(self, url, level, check_host=False): + return headers, content_type + + async def process_link(self, url: str, level: int, check_host: bool = False) -> Union[str, None]: + """Add URL to the queue if new and return its relative URL + + :param url: Page URL + :type url: str + :param level: Page depth + :type level: int + :param check_host: Whether to check the host while processing, defaults to False + :type check_host: bool, optional + :return: Processed link + :rtype: Union[str, None] + + Example: + .. code-block:: python + + >>> import asyncio + >>> from snare.cloner import BaseCloner + >>> from yarl import URL + >>> root_url = "http://foo.com" + >>> cloner = BaseCloner(root_url, max_depth=10, css_validate=False) + >>> url = "http://foo.com/bar" + >>> processed_url = "" + >>> async def test_process_link(): + >>> processed_url = await cloner.process_link(url, level=1) + >>> asyncio.run(test_process_link()) + >>> processed_url + '/bar' + + """ try: url = yarl.URL(url) except UnicodeError: @@ -111,7 +178,16 @@ async def process_link(self, url, level, check_host=False): self.logger.error("ValueError while processing the %s link", url) return res - async def replace_links(self, data, level): + async def replace_links(self, data: Union[bytes, str], level: int) -> BeautifulSoup: + """Replace all links present in the page's data with their relative counterparts + + :param data: Page data + :type data: Union[bytes, str] + :param level: Page depth + :type level: int + :return: BeautifulSoup object + :rtype: BeautifulSoup + """ soup = BeautifulSoup(data, "html.parser") # find all relative links @@ -139,7 +215,26 @@ async def replace_links(self, data, level): return soup - def _make_filename(self, url): + def _make_filename(self, url: yarl.URL) -> Tuple[str, str]: + """Generate file name and its hash for meta info and file storage + + :param url: Site URL + :type url: yarl.URL + :return: File name, its MD5 hash + :rtype: Tuple[str, str] + + Example: + .. code-block:: python + + >>> from snare.cloner import BaseCloner + >>> from yarl import URL + >>> root_url = "http://foo.com" + >>> cloner = BaseCloner(root_url, max_depth=10, css_validate=False) + >>> url = URL("http://foo.com/bar") + >>> cloner._make_filename(url) + ('/bar', '6a764eebfa109a9ef76c113f3f608c6b') + + """ if url.is_absolute(): file_name = url.relative().human_repr() else: @@ -157,10 +252,27 @@ def _make_filename(self, url): hash_name = m.hexdigest() return file_name, hash_name - async def fetch_data(self, driver, current_url, level, try_count): + async def fetch_data(self, driver: None, current_url: None, level: None, try_count: None) -> None: + """Abstract method to fetch data from the given URL + + :param driver: Driver object + :type driver: None + :param current_url: URL of the page to clone + :type current_url: None + :param level: Depth of the URL + :type level: None + :param try_count: Try count of the URL + :type try_count: None + :raises NotImplementedError: Abstract method + """ raise NotImplementedError - async def get_body(self, driver): + async def get_body(self, driver: Union[aiohttp.ClientSession, pyppeteer.browser.Browser]) -> None: + """Get page body of URLs in queue + + :param driver: Driver object to fetch data + :type driver: Union[aiohttp.ClientSession, pyppeteer.browser.Browser] + """ while not self.new_urls.empty(): print(animation[self.itr], end="\r") self.itr = (self.itr + 1) % len(animation) @@ -212,7 +324,8 @@ async def get_body(self, driver): except TypeError: await self.new_urls.put({"url": current_url, "level": level, "try_count": try_count + 1}) - async def get_root_host(self): + async def get_root_host(self) -> None: + """Fetch the root page and update the website's root host""" try: async with aiohttp.ClientSession() as session: resp = await session.get(self.root) @@ -225,7 +338,24 @@ async def get_root_host(self): class SimpleCloner(BaseCloner): - async def fetch_data(self, session, current_url, level, try_count): + """aiohttp-driven data fetching""" + + async def fetch_data( + self, session: aiohttp.ClientSession, current_url: yarl.URL, level: int, try_count: int + ) -> Tuple[Union[yarl.URL, None], bytes, List[Dict[str, str]], str]: + """Fetch data from the given URL using aiohttp's ClientSession + + :param session: aiohttp ClientSession object + :type session: aiohttp.ClientSession + :param current_url: URL of the page to clone + :type current_url: yarl.URL + :param level: Depth of the URL + :type level: int + :param try_count: Try count of the URL + :type try_count: int + :return: Redirected URL, Page data, Response headers, Page content type + :rtype: Tuple[Union[yarl.URL, None], bytes, List[Dict[str, str]], str] + """ data = None headers = [] content_type = None @@ -242,11 +372,28 @@ async def fetch_data(self, session, current_url, level, try_count): await self.new_urls.put({"url": current_url, "level": level, "try_count": try_count + 1}) else: await response.release() - return [redirect_url, data, headers, content_type] + return redirect_url, data, headers, content_type class HeadlessCloner(BaseCloner): - async def fetch_data(self, browser, current_url, level, try_count): + """Pyppeteer-driven data fetching""" + + async def fetch_data( + self, browser: pyppeteer.browser.Browser, current_url: yarl.URL, level: int, try_count: int + ) -> Tuple[Union[yarl.URL, None], Union[str, bytes], List[Dict[str, str]], Union[str, None]]: + """Fetch data from the given URL using Pyppeteer's headless browser + + :param browser: Pyppeteer Browser object + :type browser: pyppeteer.Browser.browser + :param current_url: URL of the page to clone + :type current_url: yarl.URL + :param level: Depth of the URL + :type level: int + :param try_count: Try count of the URL + :type try_count: int + :return: Redirected URL, Page data, Response headers, Page content type + :rtype: Tuple[Union[yarl.URL, None], Union[str, bytes], List[Dict[str, str]], Union[str, None]] + """ data = None headers = [] content_type = None @@ -270,11 +417,29 @@ async def fetch_data(self, browser, current_url, level, try_count): except PageError as err: # when KeyboardInterrupt is raised midway cloning self.logger.error(err) - return [redirect_url, data, headers, content_type] + return redirect_url, data, headers, content_type class CloneRunner: - def __init__(self, root, max_depth, css_validate, default_path="/opt/snare", headless=False): + """One class to rule them all - Runner class for all cloners""" + + def __init__( + self, root: str, max_depth: int, css_validate: bool, default_path: str = "/opt/snare", headless: bool = False + ) -> None: + """Constructor method + + :param root: Website root URL + :type root: str + :param max_depth: Max depth of cloning + :type max_depth: int + :param css_validate: Whether CSS validation is enabled + :type css_validate: bool + :param default_path: Storage path for site files, defaults to "/opt/snare" + :type default_path: str, optional + :param headless: Whether headless cloning is enabled, defaults to False + :type headless: bool, optional + :raises Exception: If runner instance is None indicating initialization error + """ self.driver = None self.runner = None if headless: @@ -284,14 +449,18 @@ def __init__(self, root, max_depth, css_validate, default_path="/opt/snare", hea if not self.runner: raise Exception("Error initializing cloner!") - async def run(self): + async def run(self) -> None: + """Clone website + + :raises Exception: If runner instance is None + """ if not self.runner: raise Exception("Error running cloner! - Cloner instance is None") if type(self.runner) == SimpleCloner: self.driver = aiohttp.ClientSession() else: # close and handle SIGINIT manually with `except KeyboardInterrupt` - self.driver = await launch(autoClose=False, handleSIGINT=False) + self.driver = await pyppeteer.launch(autoClose=False, handleSIGINT=False) try: await self.runner.new_urls.put({"url": self.runner.root, "level": 0, "try_count": 0}) await self.runner.new_urls.put({"url": self.runner.error_page, "level": 0, "try_count": 0}) @@ -300,7 +469,11 @@ async def run(self): # in most cases, the exception is caught in `bin/clone` print_color("\nKeyboardInterrupt received... Quitting", "ERROR") - async def close(self): + async def close(self) -> None: + """Close all open connections and write meta info into file + + :raises Exception: If runner instance is None + """ if not self.runner: raise Exception("Error closing cloner! - Cloner instance is None") with open(os.path.join(self.runner.target_path, "meta.json"), "w") as mj: diff --git a/snare/html_handler.py b/snare/html_handler.py index da1528ea..111f4fed 100644 --- a/snare/html_handler.py +++ b/snare/html_handler.py @@ -1,6 +1,7 @@ import asyncio import json import logging +from typing import Dict, List, Union import aiohttp from bs4 import BeautifulSoup @@ -8,13 +9,27 @@ class HtmlHandler: - def __init__(self, no_dorks, tanner): + """Handle HTML content of pages""" + + def __init__(self, no_dorks: bool, tanner: str): + """Constructor method + + :param no_dorks: Disable dorks + :type no_dorks: bool + :param tanner: Tanner host address + :type tanner: str + """ self.no_dorks = no_dorks self.dorks = [] self.logger = logging.getLogger(__name__) self.tanner = tanner - async def get_dorks(self): + async def get_dorks(self) -> List[Dict[str, str]]: + """Fetch all dorks from Tanner + + :return: Dorks + :rtype: List[Dict[str, str]] + """ dorks = None try: async with aiohttp.ClientSession() as session: @@ -29,7 +44,14 @@ async def get_dorks(self): self.logger.error("Dorks timeout error: %s", error) return dorks["response"]["dorks"] if dorks else [] - async def handle_content(self, content): + async def handle_content(self, content: Union[str, bytes, None]) -> bytes: + """Parse and fix CSS and add dorks to page content + + :param content: Page content + :type content: Union[str, bytes, None] + :return: Modified page content to be served + :rtype: bytes + """ soup = BeautifulSoup(content, "html.parser") if self.no_dorks is not True: for p_elem in soup.find_all("p"): diff --git a/snare/middlewares.py b/snare/middlewares.py index 4c739430..1ddf9372 100644 --- a/snare/middlewares.py +++ b/snare/middlewares.py @@ -1,10 +1,25 @@ +from typing import Callable, Dict, List, Union + from aiohttp import web import aiohttp_jinja2 import multidict class SnareMiddleware: - def __init__(self, error_404, headers=[], server_header=""): + """Middleware for Snare's aiohttp web server""" + + def __init__( + self, error_404: Union[None, str], headers: List[Dict[str, str]] = [], server_header: str = "" + ) -> None: + """Constructor method + + :param error_404: 404 page's file name (hash) + :type error_404: Union[None, str] + :param headers: 404 page headers, defaults to [] + :type headers: List[Dict[str, str]], optional + :param server_header: Server header/banner, defaults to "" + :type server_header: str, optional + """ self.error_404 = error_404 self.headers = multidict.CIMultiDict() @@ -15,7 +30,15 @@ def __init__(self, error_404, headers=[], server_header=""): if server_header: self.headers["Server"] = server_header - async def handle_404(self, request): + async def handle_404(self, request: web.Request) -> web.Response: + """404 Handler (Page not found) + + :param request: Incoming request + :type request: web.Request + :raises web.HTTPNotFound: With correct headers from meta if 404 file is not found + :return: Templated 404 response + :rtype: web.Response + """ if not self.error_404: raise web.HTTPNotFound(headers=self.headers) response = aiohttp_jinja2.render_template(self.error_404, request, {}, status=404) @@ -23,12 +46,30 @@ async def handle_404(self, request): response.headers[key] = val return response - async def handle_500(self, _): + async def handle_500(self, _: web.Request) -> None: + """500 handler + + :param _: Incoming request + :type _: web.Request + :raises web.HTTPInternalServerError: With correct headers from meta + """ raise web.HTTPInternalServerError(headers=self.headers) - def create_error_middleware(self, overrides): + def create_error_middleware( + self, overrides: Dict + ) -> Callable[[web.Request, Callable[[web.Request], web.Response]], web.Response]: + """Create middleware for given errors + + :param overrides: Status codes and their handlers + :type overrides: Dict + :return: Middleware function to prepare response from handlers + :rtype: web.middleware + """ + @web.middleware - async def error_middleware(request, handler): + async def error_middleware( + request: web.Request, handler: Callable[[web.Request], web.Response] + ) -> web.Response: try: response = await handler(request) status = response.status @@ -47,7 +88,12 @@ async def error_middleware(request, handler): return error_middleware - def setup_middlewares(self, app): + def setup_middlewares(self, app: web.Application) -> None: + """Setup middlware + + :param app: Snare's aiohttp web application + :type app: web.Application + """ error_middleware = self.create_error_middleware( { 404: self.handle_404, diff --git a/snare/server.py b/snare/server.py index 7003c468..c7132128 100644 --- a/snare/server.py +++ b/snare/server.py @@ -1,4 +1,6 @@ +import argparse import logging +from typing import Dict import aiohttp from aiohttp import web @@ -11,7 +13,30 @@ class HttpRequestHandler: - def __init__(self, meta, run_args, snare_uuid, debug=False, keep_alive=75, **kwargs): + """Handle all HTTP requests to Snare""" + + def __init__( + self, + meta: Dict, + run_args: argparse.Namespace, + snare_uuid: bytes, + debug: bool = False, + keep_alive: int = 75, + **kwargs: Dict[str, str] + ) -> None: + """Constructor method + + :param meta: Meta info from `meta.json` + :type meta: Dict + :param run_args: Runtime CLI arguments + :type run_args: argparse.Namespace + :param snare_uuid: UUID of Snare instance + :type snare_uuid: bytes + :param debug: Enable debugging with verbose logs, defaults to False + :type debug: bool, optional + :param keep_alive: HTTP connection persistence duration, defaults to 75 + :type keep_alive: int, optional + """ self.run_args = run_args self.dir = run_args.full_page_path self.meta = meta @@ -20,7 +45,12 @@ def __init__(self, meta, run_args, snare_uuid, debug=False, keep_alive=75, **kwa self.sroute = StaticRoute(name=None, prefix="/", directory=self.dir) self.tanner_handler = TannerHandler(run_args, meta, snare_uuid) - async def submit_slurp(self, data): + async def submit_slurp(self, data: str) -> None: + """Log request URL to Slurp service + + :param data: Request URL + :type data: str + """ try: async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session: r = await session.post( @@ -35,7 +65,15 @@ async def submit_slurp(self, data): except Exception as e: self.logger.error("Error submitting slurp: %s", e) - async def handle_request(self, request): + async def handle_request(self, request: web.Request) -> web.Response: + """Communicate with Tanner to prepare response to incoming requests + + :param request: Incoming request + :type request: web.Request + :raises web.HTTPNotFound: If requested URL/page cannot be found (Status code: 404) + :return: Response + :rtype: web.Response + """ self.logger.info("Request path: {0}".format(request.path_qs)) data = self.tanner_handler.create_data(request, 200) if request.method == "POST": @@ -75,11 +113,19 @@ async def handle_request(self, request): return web.Response(body=content, status=status_code, headers=headers) @staticmethod - async def remove_default_server_header(_, response): + async def remove_default_server_header(_: web.Request, response: web.Response) -> None: + """Remove the default aiohttp server header (anti-fingerprinting defense) + + :param _: Incoming request + :type _: web.Request + :param response: Response to be sent + :type response: web.Response + """ if response.headers.get("Server") and "aiohttp" in response.headers["Server"]: del response.headers["Server"] - async def start(self): + async def start(self) -> None: + """Start Snare web server""" app = web.Application() app.add_routes([web.route("*", "/{tail:.*}", self.handle_request)]) app.on_response_prepare.append(self.remove_default_server_header) @@ -99,5 +145,6 @@ async def start(self): names = sorted(str(s.name) for s in self.runner.sites) print("======== Running on {} ========\n" "(Press CTRL+C to quit)".format(", ".join(names))) - async def stop(self): + async def stop(self) -> None: + """Clean up and close connections""" await self.runner.cleanup() diff --git a/snare/tanner_handler.py b/snare/tanner_handler.py index 0dfc8cd0..8b3998fe 100644 --- a/snare/tanner_handler.py +++ b/snare/tanner_handler.py @@ -1,7 +1,9 @@ +import argparse import json import logging import os import re +from typing import Dict, Tuple, Union from urllib.parse import unquote import aiohttp @@ -13,7 +15,18 @@ class TannerHandler: - def __init__(self, run_args, meta, snare_uuid): + """Handle Tanner communication""" + + def __init__(self, run_args: argparse.Namespace, meta: Dict, snare_uuid: bytes) -> None: + """Constructor method + + :param run_args: Runtime CLI arguments + :type run_args: argparse.Namespace + :param meta: Meta info from meta.json + :type meta: Dict + :param snare_uuid: UUID of Snare instance + :type snare_uuid: bytes + """ self.run_args = run_args self.meta = meta self.dir = run_args.full_page_path @@ -21,7 +34,16 @@ def __init__(self, run_args, meta, snare_uuid): self.html_handler = HtmlHandler(run_args.no_dorks, run_args.tanner) self.logger = logging.getLogger(__name__) - def create_data(self, request, response_status): + def create_data(self, request: web.Request, response_status: int) -> Dict: + """Create data to be sent to Tanner from request + + :param request: Incoming request to Snare server + :type request: web.Request + :param response_status: Reponse's status code + :type response_status: int + :return: Data to be sent to Tanner + :rtype: Dict + """ data = dict( method=None, path=None, @@ -47,7 +69,15 @@ def create_data(self, request, response_status): data["cookies"] = {cookie.split("=")[0]: cookie.split("=")[1] for cookie in header["Cookie"].split(";")} return data - async def submit_data(self, data): + async def submit_data(self, data: Dict[str, Union[str, int, Dict[str, str]]]) -> Union[None, Dict]: + """Submit data to Tanner and fetch response + + :param data: Data to be sent to Tanner + :type data: Dict[str, Union[str, int, Dict[str, str]]] + :raises e: If there is an error sending data to Tanner + :return: Response from Tanner + :rtype: Union[None, Dict] + """ event_result = None try: async with aiohttp.ClientSession() as session: @@ -84,7 +114,20 @@ async def submit_data(self, data): raise e return event_result - async def parse_tanner_response(self, requested_name, detection): + async def parse_tanner_response( + self, requested_name: str, detection: Dict[str, Union[str, int]] + ) -> Tuple[Union[None, bytes], multidict.CIMultiDict, int]: + """Parse Tanner's response to prepare Snare's response + + :param requested_name: Requested path + :type requested_name: str + :param detection: Tanner detection info + :type detection: Dict[str, Union[str, int]] + :raises web.HTTPFound: If page redirects + :raises web.HTTPFound: If error page redirects + :return: Response content, headers and status code + :rtype: Tuple[Union[None, bytes], multidict.CIMultiDict, int] + """ content = None status_code = 200 headers = multidict.CIMultiDict() diff --git a/snare/utils/asyncmock.py b/snare/utils/asyncmock.py index 82e07310..2feac683 100644 --- a/snare/utils/asyncmock.py +++ b/snare/utils/asyncmock.py @@ -1,7 +1,9 @@ from unittest.mock import Mock -class AsyncMock(Mock): # custom function defined to mock asyncio coroutines +class AsyncMock(Mock): + """Custom class defined to mock asyncio coroutines""" + def __call__(self, *args, **kwargs): sup = super(AsyncMock, self) diff --git a/snare/utils/logger.py b/snare/utils/logger.py index 980e66c8..bd071b6a 100644 --- a/snare/utils/logger.py +++ b/snare/utils/logger.py @@ -5,18 +5,42 @@ class LevelFilter(logging.Filter): """Filters (lets through) all messages with level < LEVEL""" - def __init__(self, level): + def __init__(self, level: int) -> None: + """Initialize level filter with level + + :param level: Log level + :type level: int + """ self.level = level - def filter(self, record): - return record.levelno < self.level + def filter(self, record: logging.LogRecord) -> bool: + """Filter record by log level - # "<" instead of "<=": since logger.setLevel is inclusive, this should be exclusive + :param record: Log record + :type record: logging.LogRecord + :return: True if record's level is lesser than the set level + :rtype: bool + """ + # "<" instead of "<=": since logger.setLevel is inclusive, this should be exclusive + return record.levelno < self.level class Logger: + """Modify built-in logger's format and handlers for Snare and Cloner""" + @staticmethod - def create_logger(debug_filename, err_filename, logger_name): + def create_logger(debug_filename: str, err_filename: str, logger_name: str) -> logging.Logger: + """Create logger with debugging and error level handlers for Snare + + :param debug_filename: Debug log filename + :type debug_filename: str + :param err_filename: Error log filename + :type err_filename: str + :param logger_name: Logger name + :type logger_name: str + :return: Logger with handlers and format set + :rtype: logging.Logger + """ logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) logger.propagate = False @@ -42,7 +66,14 @@ def create_logger(debug_filename, err_filename, logger_name): return logger @staticmethod - def create_clone_logger(log_filename, logger_name): + def create_clone_logger(log_filename: str, logger_name: str) -> None: + """Create logger for Cloner + + :param log_filename: Log filename + :type log_filename: str + :param logger_name: Logger name + :type logger_name: str + """ logger = logging.getLogger(logger_name) formatter = logging.Formatter( fmt="%(asctime)s %(levelname)s:%(name)s:%(funcName)s: %(message)s", diff --git a/snare/utils/page_path_generator.py b/snare/utils/page_path_generator.py index 0f46ed1d..b3e74fd0 100644 --- a/snare/utils/page_path_generator.py +++ b/snare/utils/page_path_generator.py @@ -3,11 +3,25 @@ import string -def directory_generator(size=9, chars=string.ascii_lowercase + string.digits): +def directory_generator(size: int = 9, chars: str = string.ascii_lowercase + string.digits) -> str: + """Generate directory name with given size from given characters + + :param size: Directory name size, defaults to 9 + :type size: int, optional + :param chars: Sample space of characters for directory name, defaults to string.ascii_lowercase+string.digits + :type chars: str, optional + :return: Randomly generated directory name + :rtype: str + """ return "".join(random.choice(chars) for _ in range(size)) -def generate_unique_path(): +def generate_unique_path() -> str: + """Genrate unique absolute path for storing page data + + :return: Unique absolute path + :rtype: str + """ path = "/opt/snare/pages/" + directory_generator() while os.path.exists(path): path = "/opt/snare/pages/" + directory_generator() diff --git a/snare/utils/snare_helpers.py b/snare/utils/snare_helpers.py index c50b5b52..d2b8f10d 100755 --- a/snare/utils/snare_helpers.py +++ b/snare/utils/snare_helpers.py @@ -11,7 +11,10 @@ class VersionManager: - def __init__(self): + """Check Snare-Tanner compatibility""" + + def __init__(self) -> None: + """Constructor method""" self.logger = logging.getLogger(__name__) self.version = "0.3.0" self.version_mapper = { @@ -20,7 +23,13 @@ def __init__(self): "0.3.0": ["0.5.0", "0.6.0"], } - def check_compatibility(self, tanner_version): + def check_compatibility(self, tanner_version: str) -> None: + """Check Snare compatibility with Tanner + + :param tanner_version: Tanner version + :type tanner_version: str + :raises RuntimeError: If Tanner and Snare versions are compatible + """ min_version = self.version_mapper[self.version][0] max_version = self.version_mapper[self.version][1] if not (StrictVersion(min_version) <= StrictVersion(tanner_version) <= StrictVersion(max_version)): @@ -33,11 +42,19 @@ def check_compatibility(self, tanner_version): class Converter: - def __init__(self): + """Convert a website's source files to make it servable by Snare""" + + def __init__(self) -> None: + """Constructor method""" self.logger = logging.getLogger(__name__) self.meta = {} - def convert(self, path): + def convert(self, path: str) -> None: + """Rename all page files to their MD5 hash and populate meta.json with their hash, empty headers and Content-Type + + :param path: Page files storage directory + :type path: str + """ files_to_convert = [] for (dirpath, dirnames, filenames) in walk(path): @@ -52,9 +69,8 @@ def convert(self, path): hash_name = m.hexdigest() self.meta[file_name] = { "hash": hash_name, - "headers": [ - {"Content-Type": mimetypes.guess_type(file_name)[0]}, - ], + "headers": [], + "content_type": mimetypes.guess_type(file_name)[0], } self.logger.debug("Converting the file as %s ", os.path.join(path, hash_name)) shutil.copyfile(fn, os.path.join(path, hash_name)) @@ -64,7 +80,18 @@ def convert(self, path): json.dump(self.meta, mj) -def add_meta_tag(page_dir, index_page, config, base_path): +def add_meta_tag(page_dir: str, index_page: str, config: dict, base_path: str) -> None: + """Add google and bing meta tags to index page + + :param page_dir: Page files storage directory + :type page_dir: str + :param index_page: Index page file name + :type index_page: str + :param config: Configuration settings + :type config: dict + :param base_path: Base path of files + :type base_path: str + """ google_content = config["WEB-TOOLS"]["google"] bing_content = config["WEB-TOOLS"]["bing"] @@ -92,7 +119,14 @@ def add_meta_tag(page_dir, index_page, config, base_path): file.write(html) -def check_meta_file(meta_info): +def check_meta_file(meta_info: dict) -> bool: + """Verify meta info by checking the presence of `hash`, `headers` and `content_type` keys + + :param meta_info: Meta info from meta.json + :type meta_info: dict + :return: True if contents are properly present + :rtype: bool + """ for _, val in meta_info.items(): if "hash" in val and any(header in val for header in ["content_type", "headers"]): continue @@ -103,7 +137,14 @@ def check_meta_file(meta_info): return True -def parse_timeout(timeout): +def parse_timeout(timeout: str) -> int: + """Parse auto-update timeout duration string + + :param timeout: Timeout duration + :type timeout: str + :return: Timeout duration in seconds + :rtype: int + """ timeouts_coeff = {"M": 60, "H": 3600, "D": 86400} form = timeout[-1] @@ -116,7 +157,16 @@ def parse_timeout(timeout): return result -def print_color(msg, mode="INFO", end="\n"): +def print_color(msg: str, mode: str = "INFO", end: str = "\n") -> None: + r"""Color printing + + :param msg: Message to be printed + :type msg: str + :param mode: Mode/level of message, defaults to "INFO" + :type mode: str, optional + :param end: Ending character(s), defaults to "\n" + :type end: str, optional + """ colors = { "INFO": "\033[97m", # white "ERROR": "\033[31m", # red @@ -129,9 +179,13 @@ def print_color(msg, mode="INFO", end="\n"): print(color + str(msg) + "\033[0m", end=end) -def check_privileges(path): - """ - Checks if the user has privileges to the path passed as argument. +def check_privileges(path: str) -> None: + """Create the given directory if it doesn't exist and Check if user has access to it + + :param path: Directory location (will be created if it doesn't exist already) + :type path: str + :raises PermissionError: If directory cannot be created + :raises PermissionError: If directory does not have write access """ if not os.path.exists(path): try: