Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions dfvfs/analyzer/overlay_analyzer_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
"""The Overlay format analyzer helper implementation."""

from dfvfs.analyzer import analyzer
from dfvfs.analyzer import analyzer_helper
from dfvfs.lib import definitions


class OverlayAnalyzerHelper(analyzer_helper.AnalyzerHelper):
"""Overlay analyzer helper."""

FORMAT_CATEGORIES = frozenset([
definitions.FORMAT_CATEGORY_FILE_SYSTEM])

TYPE_INDICATOR = definitions.TYPE_INDICATOR_OVERLAY


analyzer.Analyzer.RegisterHelper(OverlayAnalyzerHelper())
105 changes: 105 additions & 0 deletions dfvfs/file_io/overlay_file_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
"""The Overlay file-like object implementation."""

import os

from dfvfs.file_io import file_io
from dfvfs.lib import errors
from dfvfs.resolver import resolver


class OverlayFile(file_io.FileIO):
"""File input/output (IO) object"""

def __init__(self, resolver_context, path_spec):
"""Initializes a file input/output (IO) object.

Args:
resolver_context (Context): resolver context.
path_spec (PathSpec): a path specification.
"""
super(OverlayFile, self).__init__(resolver_context, path_spec)
self._file_io = None

def _Close(self):
"""Closes the file input/output (IO) object.

Raises:
IOError: if the close failed.
OSError: if the close failed.
"""
self._file_io = None

def _Open(self, mode='rb'):
"""Opens the file input/output (IO) object defined by path specification.

Args:
mode (Optional[str]): file access mode.

Raises:
NotSupported: when path specification has a data stream
PathSpecError: when the parent attribute is None.
"""
data_stream = getattr(self._path_spec, 'data_stream', None)
if data_stream:
raise errors.NotSupported(
'Open data stream: {0:s} not supported.'.format(data_stream))

if not self._path_spec.parent:
raise errors.PathSpecError('Parent of path spec is None.')

self._file_io = resolver.Resolver.OpenFileObject(
self._path_spec.parent, self._resolver_context)

def read(self, size=None):
"""Reads a byte string from the file input/output (IO) object.

The function will read a byte string of the specified size or
all of the remaining data if no size was specified.

Args:
size (Optional[int]): number of bytes to read, where None is all
remaining data.

Returns:
bytes: data read.
"""
return self._file_io.read(size)

def seek(self, offset, whence=os.SEEK_SET):
"""Seeks to an offset within the file input/output (IO) object.

Args:
offset (int): offset to seek.
whence (Optional[int]): value that indicates whether offset is an
absolute or relative position within the file.

Raises:
IOError: if the seek failed.
OSError: if the seek failed.
"""
self._file_io.seek(offset, whence)

def get_offset(self):
"""Retrieves the current offset into the file input/output (IO) object.

Returns:
int: current offset into the file input/output (IO) object.

Raises:
IOError: if the file input/output (IO)-like object has not been opened.
OSError: if the file input/output (IO)-like object has not been opened.
"""
return self._file_io.get_offset()

def get_size(self):
"""Retrieves the size of the file input/output (IO) object.

Returns:
int: size of the file input/output (IO) object.

Raises:
IOError: if the file input/output (IO) object has not been opened.
OSError: if the file input/output (IO) object has not been opened.
"""
return self._file_io.get_size()
2 changes: 2 additions & 0 deletions dfvfs/lib/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
TYPE_INDICATOR_MOUNT = 'MOUNT'
TYPE_INDICATOR_NTFS = 'NTFS'
TYPE_INDICATOR_OS = 'OS'
TYPE_INDICATOR_OVERLAY = 'OVERLAY'
TYPE_INDICATOR_PHDI = 'PHDI'
TYPE_INDICATOR_QCOW = 'QCOW'
TYPE_INDICATOR_RAW = 'RAW'
Expand Down Expand Up @@ -83,6 +84,7 @@
TYPE_INDICATOR_FAT,
TYPE_INDICATOR_HFS,
TYPE_INDICATOR_NTFS,
TYPE_INDICATOR_OVERLAY,
TYPE_INDICATOR_TSK,
TYPE_INDICATOR_XFS])

Expand Down
1 change: 1 addition & 0 deletions dfvfs/path/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dfvfs.path import mount_path_spec
from dfvfs.path import ntfs_path_spec
from dfvfs.path import os_path_spec
from dfvfs.path import overlay_path_spec
from dfvfs.path import path_spec
from dfvfs.path import phdi_path_spec
from dfvfs.path import qcow_path_spec
Expand Down
45 changes: 45 additions & 0 deletions dfvfs/path/overlay_path_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
"""The Overlay path specification implementation."""

from dfvfs.lib import definitions
from dfvfs.path import factory
from dfvfs.path import path_spec


class OverlayPathSpec(path_spec.PathSpec):
"""Overlay path specification.

Attributes:
location (str): location.
"""

TYPE_INDICATOR = definitions.TYPE_INDICATOR_OVERLAY

def __init__(self, location=None, parent=None, **kwargs):
"""Initializes an Overlay path specification.

Args:
location (Optional[str]): location.
parent (Optional[PathSpec]): parent path specification.

Raises:
ValueError: when location and parent are not set.
"""
if not parent and not location:
raise ValueError('Missing location, or parent value.')

super(OverlayPathSpec, self).__init__(parent=parent, **kwargs)
self.location = location

@property
def comparable(self):
"""str: comparable representation of the path specification."""
string_parts = []

if self.location is not None:
string_parts.append('location: {0:s}'.format(self.location))

return self._GetComparable(sub_comparable_string=', '.join(string_parts))


factory.Factory.RegisterPathSpec(OverlayPathSpec)
41 changes: 41 additions & 0 deletions dfvfs/resolver_helpers/overlay_resolver_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""The Overlay path specification resolver helper implementation."""

from dfvfs.file_io import overlay_file_io
from dfvfs.lib import definitions
from dfvfs.resolver_helpers import manager
from dfvfs.resolver_helpers import resolver_helper
from dfvfs.vfs import overlay_file_system


class OverlayResolverHelper(resolver_helper.ResolverHelper):
"""Overlay resolver helper."""

TYPE_INDICATOR = definitions.TYPE_INDICATOR_OVERLAY

def NewFileObject(self, resolver_context, path_spec):
"""Creates a new file input/output (IO) object.

Args:
resolver_context (Context): resolver context.
path_spec (PathSpec): a path specification.

Returns:
FileIO: file input/output (IO) object.
"""
return overlay_file_io.OverlayFile(resolver_context, path_spec)

def NewFileSystem(self, resolver_context, path_spec):
"""Creates a new file system object.

Args:
resolver_context (Context): resolver context.
path_spec (PathSpec): a path specification.

Returns:
FileSystem: file system.
"""
return overlay_file_system.OverlayFileSystem(resolver_context, path_spec)


manager.ResolverHelperManager.RegisterHelper(OverlayResolverHelper())
119 changes: 119 additions & 0 deletions dfvfs/vfs/overlay_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
"""The Overlay directory implementation."""

from dfvfs.lib import definitions
from dfvfs.lib import errors
from dfvfs.path import factory
from dfvfs.resolver import resolver
from dfvfs.vfs import directory


class OverlayDirectory(directory.Directory):
"""Overlay directory."""

# pylint: disable=useless-super-delegation
def __init__(self, file_system, path_spec):
"""Initializes an Overlay directory.

Args:
file_system (OverlayFileSystem): file system.
path_spec (OverlayPathSpec): path specification.
"""
super(OverlayDirectory, self).__init__(file_system, path_spec)

def _ReadDirectory(self, location):
"""Enumerates the upper and lower layers of the Overlay filesystem.

Args:
location (str): the location in the Overlay.

Yields:
OverlayPathSpec: Overlay Path Specification.
"""
visited_paths = set([location])
whiteouts = set()

resolver_context = self._file_system._resolver_context # pylint: disable=protected-access

layer_specs = [self._file_system.upper_path_spec]
layer_specs.extend(self._file_system.lower_path_specs)

for index, layer in enumerate(layer_specs):
if location in whiteouts:
continue

layer_filesystem = self._file_system._filesystem_layers[index] # pylint: disable=protected-access
layer_path_spec = factory.Factory.NewPathSpec(
layer.type_indicator, location=layer.location + location,
parent=layer.parent)

if not layer_path_spec:
continue

if not layer_filesystem.FileEntryExistsByPathSpec(layer_path_spec):
continue
layer_file_entry = resolver.Resolver.OpenFileEntry(
layer_path_spec, resolver_context=resolver_context)

for attribute in layer_file_entry.attributes:
if (attribute.name == 'trusted.overlay.opaque' and
attribute.read() == b'y'):
whiteouts.add(location)

if not layer_file_entry.IsDirectory():
continue

layer_directory = layer_file_entry._GetDirectory() # pylint: disable=protected-access

for subentry_spec in layer_directory.entries:
overlay_path = subentry_spec.location[len(layer.location):]
if overlay_path in visited_paths:
continue

if overlay_path in whiteouts:
continue

subentry = resolver.Resolver.OpenFileEntry(subentry_spec)
stat = subentry.GetStatAttribute()

if (stat.type == definitions.FILE_ENTRY_TYPE_CHARACTER_DEVICE and
stat.device_number == (0, 0)):
whiteouts.add(overlay_path)
continue

if stat.type == definitions.FILE_ENTRY_TYPE_DIRECTORY:
for attribute in subentry.attributes:
if (attribute.name == 'trusted.overlay.opaque' and
attribute.read() == b'y'):
whiteouts.add(overlay_path)
break

if overlay_path in visited_paths:
continue
visited_paths.add(overlay_path)

overlay_path_spec = factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OVERLAY, location=overlay_path,
parent=subentry_spec)
yield overlay_path_spec

def _EntriesGenerator(self):
"""Retrieves directory entries.

Since a directory can contain a vast number of entries using
a generator is more memory efficient.

Yields:
OverlayPathSpec: Overlay path specification.
"""
try:
fsoverlay_file_entry = self._file_system.GetFileEntryByPathSpec(
self.path_spec)
except errors.PathSpecError:
return

if not fsoverlay_file_entry:
return

location = getattr(self.path_spec, 'location', None)
yield from self._ReadDirectory(location)
Loading