diff options
Diffstat (limited to 'apply.py')
-rwxr-xr-x | apply.py | 248 |
1 files changed, 0 insertions, 248 deletions
diff --git a/apply.py b/apply.py deleted file mode 100755 index 252f103..0000000 --- a/apply.py +++ /dev/null @@ -1,248 +0,0 @@ -#!/usr/bin/env python3 - -import yaml -import json -import os -from pathlib import Path -import argparse -from enum import Enum -import shutil - - -class Applier: - def __init__(self): - self.links_todo = {} - self.dirs_todo = [] - self.delete_todo = [] - - def create_link(self, target: Path, linkpath: Path): - if linkpath in self.links_todo: - prev = self.links_todo[linkpath] - print(f"Link {linkpath} to {target} replaces previus {prev}") - self.links_todo[linkpath] = target - - def create_dir(self, path: Path): - if path in self.dirs_todo: - return - - self.dirs_todo.append(path) - - def delete(self, path: Path): - if path in self.delete_todo: - return - - self.delete_todo.append(path) - - def apply_dir(self, path: Path, dry_run): - print(f"mkdir {path}") - if not dry_run: - path.mkdir() - - def apply_delete(self, path: Path, dry_run): - if path.is_dir(): - print(f"rmtree {path}!!!") - if not dry_run: - shutil.rmtree(str(path)) - else: - print(f"remove {path}") - if not dry_run: - path.unlink() - - def apply_link(self, linkpath: Path, target: Path, dry_run): - print(f"link {linkpath} -> {target}") - if not dry_run: - linkpath.symlink_to(target) - - def apply(self, dry_run=True): - for d in self.delete_todo: - self.apply_delete(d, dry_run) - - for d in self.dirs_todo: - self.apply_dir(d, dry_run) - - for link, target in self.links_todo.items(): - self.apply_link(link, target, dry_run) - - -def add_or_create(dictio, key, value): - if key in dictio: - if value not in dictio[key]: - dictio[key].append(value) - else: - dictio[key] = [value] - - -class FileState(Enum): - Unused = 1 - Owned = 2 - Used = 3 - links_to_path: Path = Path() - - def can_write(self) -> bool: - return self in [FileState.Unused, FileState.Owned] - - def links_to(self) -> str: - if self is not FileState.Owned: - raise Exception(f"Cannot call location on {self}") - - return self.links_to_path - - @staticmethod - def create_owned(links_to: Path) -> "FileState": - s = FileState.Owned - s.links_to_path = links_to - return s - - -class Resolver: - def __init__(self, applydir, dotdir, override): - self.applydir = Path(applydir) - self.applier = Applier() - self.dotdir = Path(dotdir) - self.override = override - - # Load state - self.statefile = Path("state.json") - if self.statefile.exists(): - with self.statefile.open("r") as f: - self.state = json.load(f) - else: - self.state = {"dirs": {}, "links": {}} - - self.stateclean = True - - def dump_state(self): - with self.statefile.open("w") as f: - json.dump(self.state, f) - - def check_location(self, path: Path) -> FileState: - if not path.exists(): - return FileState.Unused - - if path.is_symlink(): - dest = Path(os.path.realpath(str(path))) - if self.dotdir in dest.parents: - return FileState.create_owned(dest) - - return FileState.Used - - def state_save_link(self, dest, packagename): - self.state["links"][str(dest)] = packagename - - def check_parent(self, path: Path, packagename): - """ - Check if parents exists, and if we created them mark them - with package name - """ - - parent = path.parent - exists = parent.exists() - if (not exists) or parent in self.state["dirs"]: - self.check_parent(parent, packagename) - - # Add to state - add_or_create(self.state["dirs"], str(parent), packagename) - - if not exists: - self.applier.create_dir(parent) - - def do_link(self, package, ppath: Path): - dest = Path(self.applydir, ppath) - dest_state = self.check_location(dest) - - if not self.override and not dest_state.can_write(): - # Check if it's a pointer to the correct location - print(os.readlink(dest)) - raise Exception(f"Destination {ppath} already exists") - - # Save the link in the statefile - self.state_save_link(dest, package) - - self.check_parent(dest, package) - - target_abs = Path.cwd().joinpath(Path(package, ppath)) - if dest_state == FileState.Owned \ - and dest_state.links_to() == target_abs: - return - - if dest_state != FileState.Unused and self.override: - self.applier.delete(dest) - self.applier.create_link(target_abs, dest) - - def do_folder_link(self, package, ppath: Path) -> bool: - self.do_link(package, ppath) - return True - - def apply(self, dry_run=True): - self.applier.apply(dry_run) - - -class DirReader: - def __init__(self, config, resolv): - self.resolv = resolv - self.config = config - - def read_package(self, name): - package_root = name - - # Walk down and link dir og file - for root, dirs, files in os.walk(package_root): - rootrel = os.path.relpath(root, start=package_root) - - # Check if we can just link this folder - if rootrel not in self.config["do_not_link"]: - ok = self.resolv.do_folder_link(name, Path(rootrel)) - - if ok: - # Do not go further down - dirs.clear() - continue - - # Link files - for f in files: - self.resolv.do_link(name, Path(rootrel, f)) - - -def parse_config(path): - config = {} - with open(path, "r") as f: - config = yaml.safe_load(f) - - return config - - -parser = argparse.ArgumentParser() -parser.add_argument("--dot-dir", "-d", default=".", - help="Directory to load dots from") -parser.add_argument("--apply-dir", "-a", default=None, - help="Directory to load dots from") -parser.add_argument("--dry-run", "-n", default=False, - help="Do not make filesystem changes", action="store_true") -parser.add_argument("--override-existing", "-o", default=False, - help="Override existing files,dirs,links", - action="store_true") -parser.add_argument("packages", nargs="*", help="Packages to apply") - -if __name__ == "__main__": - args = parser.parse_args() - - # Just cd to dotdir, so everything can make the assumption that cwd - # is dotdir - os.chdir(args.dot_dir) - - if args.apply_dir is None: - args.apply_dir = ".." - else: - args.apply_dir = str(Path(args.apply_dir).relative_to(args.dot_dir)) - - config = parse_config("config.yaml") - - resolv = Resolver(args.apply_dir, Path.cwd(), args.override_existing) - - reader = DirReader(config, resolv) - for pack in args.packages: - reader.read_package(pack) - - resolv.apply(dry_run=args.dry_run) - if args.dry_run: - resolv.dump_state() |