diff --git a/src/mvt/android/cmd_check_androidqf.py b/src/mvt/android/cmd_check_androidqf.py index 54dbc3d47..04c981b70 100644 --- a/src/mvt/android/cmd_check_androidqf.py +++ b/src/mvt/android/cmd_check_androidqf.py @@ -13,7 +13,7 @@ from mvt.android.artifacts.getprop import GetProp from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs -from mvt.android.cmd_check_backup import CmdAndroidCheckBackup +from mvt.android.cmd_check_backup import CmdAndroidCheckBackup, InvalidAndroidBackup from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport from mvt.common.command import Command from mvt.common.indicators import Indicators @@ -240,7 +240,14 @@ def run_backup_cmd(self) -> bool: hashes=self.hashes, sub_command=True, ) - cmd.from_ab(backup) + try: + cmd.from_ab(backup) + except InvalidAndroidBackup as exc: + self.log.warning( + "Skipping backup modules as backup.ab is malformed: %s", exc + ) + return False + cmd.run() self.timeline.extend(cmd.timeline) diff --git a/src/mvt/android/cmd_check_backup.py b/src/mvt/android/cmd_check_backup.py index 1b239f10a..3f91f0758 100644 --- a/src/mvt/android/cmd_check_backup.py +++ b/src/mvt/android/cmd_check_backup.py @@ -27,6 +27,10 @@ log = logging.getLogger(__name__) +class InvalidAndroidBackup(Exception): + pass + + class CmdAndroidCheckBackup(Command): def __init__( self, @@ -68,6 +72,10 @@ def from_ab(self, ab_file_bytes: bytes) -> None: self.__type = "ab" header = parse_ab_header(ab_file_bytes) if not header["backup"]: + if self.sub_command: + raise InvalidAndroidBackup( + "Invalid backup format, file should be in .ab format" + ) log.critical("Invalid backup format, file should be in .ab format") sys.exit(1) @@ -83,12 +91,25 @@ def from_ab(self, ab_file_bytes: bytes) -> None: log.critical("Invalid backup password") sys.exit(1) except AndroidBackupParsingError as exc: + if self.sub_command: + raise InvalidAndroidBackup( + f"Impossible to parse this backup file: {exc}" + ) from exc log.critical("Impossible to parse this backup file: %s", exc) log.critical("Please use Android Backup Extractor (ABE) instead") sys.exit(1) dbytes = io.BytesIO(tardata) - self.__tar = tarfile.open(fileobj=dbytes) + try: + self.__tar = tarfile.open(fileobj=dbytes) + except tarfile.TarError as exc: + if self.sub_command: + raise InvalidAndroidBackup( + f"Impossible to parse this backup file: {exc}" + ) from exc + log.critical("Impossible to parse this backup file: %s", exc) + log.critical("Please use Android Backup Extractor (ABE) instead") + sys.exit(1) for member in self.__tar: self.__files.append(member.name) diff --git a/src/mvt/android/modules/androidqf/sms.py b/src/mvt/android/modules/androidqf/sms.py index bcbd226d5..64aaab6a8 100644 --- a/src/mvt/android/modules/androidqf/sms.py +++ b/src/mvt/android/modules/androidqf/sms.py @@ -58,7 +58,7 @@ def check_indicators(self) -> None: def parse_backup(self, data): header = parse_ab_header(data) if not header["backup"]: - self.log.critical("Invalid backup format, backup.ab was not analysed") + self.log.warning("Invalid backup format, backup.ab was not analysed") return password = None @@ -76,7 +76,7 @@ def parse_backup(self, data): self.log.critical("Invalid backup password") return except AndroidBackupParsingError: - self.log.critical( + self.log.warning( "Impossible to parse this backup file, please use" " Android Backup Extractor instead" ) diff --git a/src/mvt/android/parsers/backup.py b/src/mvt/android/parsers/backup.py index c81ecd0b6..8c9bb5c8f 100644 --- a/src/mvt/android/parsers/backup.py +++ b/src/mvt/android/parsers/backup.py @@ -48,13 +48,16 @@ def parse_ab_header(data): 'encryption': "none", 'version': 4} """ if data.startswith(b"ANDROID BACKUP"): - [_, version, is_compressed, encryption, _] = data.split(b"\n", 4) - return { - "backup": True, - "compression": (is_compressed == b"1"), - "version": int(version), - "encryption": encryption.decode("utf-8"), - } + try: + [_, version, is_compressed, encryption, _] = data.split(b"\n", 4) + return { + "backup": True, + "compression": (is_compressed == b"1"), + "version": int(version), + "encryption": encryption.decode("utf-8"), + } + except (UnicodeDecodeError, ValueError): + pass return {"backup": False, "compression": None, "version": None, "encryption": None} diff --git a/tests/android/test_backup_parser.py b/tests/android/test_backup_parser.py index 5bd5b99ab..6e5be725e 100644 --- a/tests/android/test_backup_parser.py +++ b/tests/android/test_backup_parser.py @@ -5,12 +5,24 @@ import hashlib -from mvt.android.parsers.backup import parse_backup_file, parse_tar_for_sms +from mvt.android.parsers.backup import ( + parse_ab_header, + parse_backup_file, + parse_tar_for_sms, +) from ..utils import get_artifact class TestBackupParsing: + def test_parse_incomplete_header(self): + assert parse_ab_header(b"ANDROID BACKUP\n") == { + "backup": False, + "compression": None, + "version": None, + "encryption": None, + } + def test_parsing_noencryption(self): file = get_artifact("android_backup/backup.ab") with open(file, "rb") as f: diff --git a/tests/test_check_android_androidqf.py b/tests/test_check_android_androidqf.py index c6e422168..c4c341022 100644 --- a/tests/test_check_android_androidqf.py +++ b/tests/test_check_android_androidqf.py @@ -3,7 +3,9 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import os +import shutil from click.testing import CliRunner @@ -68,3 +70,18 @@ def test_check_encrypted_backup_env(self, mocker): assert result.exit_code == 0 del os.environ["MVT_ANDROID_BACKUP_PASSWORD"] settings.__init__() # Reset settings + + def test_check_malformed_backup_skips_backup_modules(self, tmp_path, caplog): + path = tmp_path / "androidqf" + shutil.copytree(os.path.join(get_artifact_folder(), "androidqf"), path) + (path / "backup.ab").write_bytes(b"") + + runner = CliRunner() + with caplog.at_level(logging.WARNING): + result = runner.invoke(check_androidqf, [str(path)]) + + assert result.exit_code == 0 + assert "Skipping backup modules as backup.ab is malformed" in caplog.text + assert not any( + record.levelname in {"CRITICAL", "FATAL"} for record in caplog.records + )