#!/usr/bin/env python3
#
# targit.py
"""
Archive where the changes to the contents are recorded using `git <https://git-scm.com/>`_.
"""
#
# Copyright © 2020,2022 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
#
# stdlib
import atexit
import datetime
import getpass
import os
import re
import socket
import tarfile
import time
from typing import Iterator, NamedTuple, Optional
# 3rd party
from domdf_python_tools.doctools import prettify_docstrings
from domdf_python_tools.paths import PathPlus, TemporaryPathPlus
from domdf_python_tools.typing import PathLike
from dulwich.objects import format_timezone
from dulwich.repo import Repo
from filelock import FileLock, Timeout
from typing_extensions import Literal
# this package
from southwark import StagedDict, status
# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = [
"BadArchiveError",
"Modes",
"Status",
"TarGit",
"check_archive_paths",
"SaveState",
]

# Type alias restricting the ``mode`` argument of ``TarGit`` to the three
# literal strings 'r', 'w' and 'a'. The string below documents the alias.
Modes = Literal["r", "w", "a"]
"""
Valid modes for opening :class:`~.TarGit` archives in
* ``'r'`` -- Read only access. The archive must exist.
* ``'w'`` -- Read and write access. The archive must not exist.
* ``'a'`` -- Read and write access to an existing archive.
"""

# Alias of ``StagedDict`` (imported from ``southwark``); this is the type
# ``TarGit.status()`` is annotated to return.
Status = StagedDict
"""
Represents the dictionary returned by :meth:`TarGit.status() <.TarGit.status>`.
The values are lists of filenames, relative to the TarGit root.
"""
@prettify_docstrings
class SaveState(NamedTuple):
    """
    Represents a save event in a :class:`~.TarGit` archive's history.

    Instances are yielded by :attr:`TarGit.history <.TarGit.history>`,
    one per commit in the underlying repository.
    """

    # TODO: changed files

    #: The SHA id of the underlying commit.
    id: str  # noqa: A003  # pylint: disable=redefined-builtin

    #: The name of the user who made the changes.
    user: str

    #: The hostname of the device the changes were made on.
    device: str

    #: The time the changes were saved, in seconds from epoch.
    time: float

    #: The timezone the changes were made in, as a GMT offset in seconds.
    timezone: int
def check_archive_paths(archive: tarfile.TarFile) -> bool:
    """
    Checks the contents of an archive to ensure it does not contain
    any filenames with absolute paths or path traversal.

    For example, the following paths would raise an :exc:`~.BadArchiveError`:

    * ``/usr/bin/malware.sh`` -- this is an absolute path.
    * ``~/.local/bin/malware.sh`` -- this tries to put the file in the user's home directory.
    * ``../.local/bin/malware.sh`` -- this uses path traversal to try to get to a parent directory.

    Only the member names reported by :meth:`tarfile.TarFile.getnames` are inspected.

    .. seealso:: The warning for :meth:`tarfile.TarFile.extractall` in the Python documentation.

    :param archive: The open tar archive whose member names are to be checked.

    :returns: :py:obj:`True` if no member name was rejected.

    :raises BadArchiveError: If any member name is absolute, contains a ``..`` component,
        or starts with ``~``.
    """  # noqa: D400

    for member_name in archive.getnames():
        member_name_p = PathPlus(member_name)

        if member_name_p.is_absolute() or ".." in member_name_p.parts or member_name.startswith('~'):
            raise BadArchiveError

    return True
class BadArchiveError(IOError):
    """
    Exception to indicate an archive contains files utilising path traversal.

    The exception takes no arguments; its message is fixed.
    """

    def __init__(self) -> None:
        super().__init__("Refusing to extract an archive containing files utilising path traversal.")
class TarGit(os.PathLike):
    """
    A "TarGit" (pronounced "target", /tɑːɡɪt/) is a ``tar.gz`` archive where the changes to the contents are
    recorded using `git <https://git-scm.com/>`_.

    Opening an existing archive unpacks it into a temporary directory. Paths inside
    that directory are obtained with the ``/`` operator, and changes are written
    back to the archive file with :meth:`~.TarGit.save`.

    :param filename: The filename of the archive.
    :param mode: The mode to open the file in.

    :raises FileNotFoundError: If the file is opened in read or append mode, but it does not exist.
    :raises FileExistsError: If the file is opened in write mode, but it already exists.
    :raises ValueError: If an unknown value for ``mode`` is given.
    :raises OSError: If the file is opened in write or append mode, but its lock file cannot be acquired.
    """  # noqa: D400

    __mode: Modes
    __repo: Repo

    # Class-level defaults so that ``close()`` / ``__del__`` are safe on an
    # instance whose ``__init__`` raised before these were assigned.
    __closed: bool = True
    __lock: Optional[FileLock] = None
    __tmpdir: Optional[TemporaryPathPlus] = None

    def __init__(self, filename: PathLike, mode: Modes = 'r'):
        self.filename = PathPlus(filename)

        # Reject bad modes before any resource (temporary directory, lock) is acquired.
        if mode not in {'r', 'w', 'a'}:
            raise ValueError(f"Unknown IO mode {mode!r}")

        self.__closed = True
        self.__lock = None
        self.__tmpdir = TemporaryPathPlus()
        self.__tmpdir_p = self.__tmpdir.name

        # Fallback cleanup in case the caller never calls close().
        atexit.register(self.__exit_handler)

        try:
            if mode in {'w', 'a'}:
                # Writers take an exclusive lock on "<filename>.lock".
                lock_file = str(self.filename.with_suffix(self.filename.suffix + ".lock"))
                self.__lock = FileLock(lock_file, timeout=1)

                try:
                    self.__lock.acquire()
                except Timeout as exc:
                    raise OSError(f"Unable to acquire a lock for the file '{self.filename!s}'") from exc

            if mode == 'w':
                if self.exists():
                    raise FileExistsError(f"TarGit file '{self.filename!s}' already exists.")

                # Initialise git repo in tmpdir
                self.__repo = Repo.init(self.__tmpdir_p)
                self.__mode = mode
                self.__closed = False
                self.__do_commit(message="Empty initial commit.")

            else:
                # 'r' or 'a': unpack the existing archive into the temporary directory.
                if not self.exists():
                    raise FileNotFoundError(f"No such TarGit file '{self.filename!s}'")

                with tarfile.open(
                        self.filename,
                        mode="r:gz",
                        format=tarfile.PAX_FORMAT,
                        ) as tf:
                    # NOTE(review): member names are vetted here, but link targets are not;
                    # consider tarfile's extraction filters where the Python version provides them.
                    check_archive_paths(tf)
                    tf.extractall(path=self.__tmpdir_p)

                self.__repo = Repo(self.__tmpdir_p)
                self.__mode = mode
                self.__closed = False

        except BaseException:
            # The atexit registration keeps this instance alive, so without explicit
            # cleanup the temporary directory and lock would be held until interpreter exit.
            self.close()
            raise

    def save(self) -> bool:
        """
        Saves the contents of the archive.

        Does nothing if there are no changes to be saved.
        Otherwise the changes are committed and the whole temporary directory
        is written to the archive file.

        :returns: Whether there were any changes to save.

        :raises IOError: If the file is closed, or if it was opened in read-only mode.
        """

        if self.closed:
            raise OSError("IO operation on closed TarGit file.")
        elif self.__mode not in {'w', 'a'}:
            raise OSError("Cannot write to TarGit file opened in read-only mode.")

        current_status = self.status()
        added = current_status["add"]
        deleted = current_status["delete"]
        modified = current_status["modify"]

        if not (added or deleted or modified):
            return False

        # There are changes to commit
        message = "; ".join([
                f"{len(added)} added",
                f"{len(deleted)} deleted",
                f"{len(modified)} modified",
                ])
        self.__do_commit(message)

        with self.filename.open("wb", buffering=False) as fp:
            with tarfile.open(
                    self.filename,
                    mode="w:gz",
                    format=tarfile.PAX_FORMAT,
                    fileobj=fp,
                    ) as tf:
                # arcname='' stores the directory contents relative to the archive root.
                tf.add(str(self.__tmpdir_p), arcname='')

            fp.flush()

        return True

    def status(self) -> StagedDict:
        """
        Returns the status of the TarGit archive.

        The values in the dictionary are lists of filenames, relative to the TarGit root.

        As a side effect, every unstaged and untracked file is staged.
        For an archive opened in read-only mode nothing is staged
        and all three lists are empty.

        :raises IOError: If the file is closed.
        """

        if self.closed:
            raise OSError("IO operation on closed TarGit file.")
        elif self.__mode not in {'w', 'a'}:
            return {"add": [], "delete": [], "modify": []}

        current_status = status(self.__tmpdir_p)

        for file in (*current_status.unstaged, *current_status.untracked):
            self.__repo.stage(str(file))

        # Query again so the result reflects what was just staged.
        return status(self.__tmpdir_p).staged

    def __do_commit(self, message: str) -> None:
        """
        Commits the staged changes to the underlying repository.

        The author and committer are both ``<login> <<login>@<hostname>>``,
        and the commit is stamped with the current local time and UTC offset.

        :param message: The commit message.

        :raises IOError: If the file is closed, or if it was opened in read-only mode.
        """

        if self.closed:
            raise OSError("IO operation on closed TarGit file.")
        elif self.__mode not in {'w', 'a'}:
            raise OSError("Cannot write to TarGit file opened in read-only mode.")

        login = getpass.getuser()
        username = f"{login} <{login}@{socket.gethostname()}>"

        current_time = datetime.datetime.now(datetime.timezone.utc).astimezone()
        # total_seconds() returns a float; the offset is recorded as whole seconds.
        current_timezone = int(current_time.tzinfo.utcoffset(None).total_seconds())  # type: ignore

        self.__repo.do_commit(
                message=message.encode("UTF-8"),
                committer=username.encode("UTF-8"),
                author=username.encode("UTF-8"),
                commit_timestamp=current_time.timestamp(),
                commit_timezone=current_timezone,
                )

    def exists(self) -> bool:
        """
        Returns whether the :class:`~.TarGit` archive exists.
        """

        return self.filename.is_file()

    def close(self) -> None:
        """
        Closes the :class:`~.TarGit` archive.

        Removes the temporary directory, releases the lock (if one is held)
        and unregisters the exit handler. May be called more than once.
        """

        try:
            self.__exit_handler()
        finally:
            # Always drop the registration so atexit no longer keeps this instance alive.
            atexit.unregister(self.__exit_handler)

    def __exit_handler(self) -> None:
        """
        Removes the temporary directory, releases the lock and marks the archive as closed.
        """

        try:
            if self.__tmpdir is not None:
                self.__tmpdir.cleanup()
        finally:
            # Release the lock even if removing the temporary directory failed.
            if self.__lock is not None:
                self.__lock.release()

            self.__closed = True

    @property
    def closed(self) -> bool:
        """
        Returns whether the :class:`~.TarGit` archive is closed.
        """

        return self.__closed

    @property
    def mode(self) -> Modes:
        """
        Returns the mode the :class:`~.TarGit` archive was opened in.

        This defaults to ``'r'``. After the archive is closed this will show the
        last mode until the archive is opened again.
        """

        return self.__mode

    def __truediv__(self, filename):
        """
        Returns a :class:`~domdf_python_tools.paths.PathPlus` object
        representing the given filename relative to the archive root.

        :param filename:
        """  # noqa: D400

        return self.__tmpdir_p / filename

    def __del__(self) -> None:
        self.close()

    def __repr__(self) -> str:
        """
        Returns a string representation of the :class:`~.TarGit`.
        """

        return f"{self.__class__.__name__}({self.filename})"

    def __fspath__(self) -> str:
        """
        Returns the filename of the :class:`~.TarGit` archive.
        """

        return os.fspath(self.filename)

    def __str__(self) -> str:
        """
        Returns the filename of the :class:`~.TarGit` archive.
        """

        return self.filename.as_posix()

    @property
    def history(self) -> Iterator[SaveState]:
        """
        Returns an iterable over the historic save states of the :class:`~.TarGit`.

        This is a generator, so the check for a closed archive
        only happens once iteration starts.

        :raises IOError: If the file is closed when iteration starts.
        """

        if self.closed:
            raise OSError("IO operation on closed TarGit file.")

        # Splits "<login> <<login>@<hostname>>" into the user and device parts.
        author_re = re.compile(r".*?\s+<(.*?)@(.*?)>")

        for entry in self.__repo.get_walker():
            # TODO: changed files

            author_m = author_re.match(entry.commit.author.decode("UTF-8"))

            if author_m:
                user, device = author_m.groups()
            else:
                # Author string not in the expected format.
                user, device = '', ''

            yield SaveState(
                    id=entry.commit.id.decode("UTF-8"),
                    user=user,
                    device=device,
                    time=entry.commit.author_time,
                    timezone=entry.commit.author_timezone,
                    )