diff options
Diffstat (limited to 'apply.py')
-rwxr-xr-x | apply.py | 248 |
1 files changed, 248 insertions, 0 deletions
diff --git a/apply.py b/apply.py new file mode 100755 index 0000000..252f103 --- /dev/null +++ b/apply.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 + +import yaml +import json +import os +from pathlib import Path +import argparse +from enum import Enum +import shutil + + +class Applier: + def __init__(self): + self.links_todo = {} + self.dirs_todo = [] + self.delete_todo = [] + + def create_link(self, target: Path, linkpath: Path): + if linkpath in self.links_todo: + prev = self.links_todo[linkpath] + print(f"Link {linkpath} to {target} replaces previus {prev}") + self.links_todo[linkpath] = target + + def create_dir(self, path: Path): + if path in self.dirs_todo: + return + + self.dirs_todo.append(path) + + def delete(self, path: Path): + if path in self.delete_todo: + return + + self.delete_todo.append(path) + + def apply_dir(self, path: Path, dry_run): + print(f"mkdir {path}") + if not dry_run: + path.mkdir() + + def apply_delete(self, path: Path, dry_run): + if path.is_dir(): + print(f"rmtree {path}!!!") + if not dry_run: + shutil.rmtree(str(path)) + else: + print(f"remove {path}") + if not dry_run: + path.unlink() + + def apply_link(self, linkpath: Path, target: Path, dry_run): + print(f"link {linkpath} -> {target}") + if not dry_run: + linkpath.symlink_to(target) + + def apply(self, dry_run=True): + for d in self.delete_todo: + self.apply_delete(d, dry_run) + + for d in self.dirs_todo: + self.apply_dir(d, dry_run) + + for link, target in self.links_todo.items(): + self.apply_link(link, target, dry_run) + + +def add_or_create(dictio, key, value): + if key in dictio: + if value not in dictio[key]: + dictio[key].append(value) + else: + dictio[key] = [value] + + +class FileState(Enum): + Unused = 1 + Owned = 2 + Used = 3 + links_to_path: Path = Path() + + def can_write(self) -> bool: + return self in [FileState.Unused, FileState.Owned] + + def links_to(self) -> str: + if self is not FileState.Owned: + raise Exception(f"Cannot call location on {self}") + + return self.links_to_path + + @staticmethod + def create_owned(links_to: Path) -> "FileState": + s = FileState.Owned + s.links_to_path = links_to + return s + + +class Resolver: + def __init__(self, applydir, dotdir, override): + self.applydir = Path(applydir) + self.applier = Applier() + self.dotdir = Path(dotdir) + self.override = override + + # Load state + self.statefile = Path("state.json") + if self.statefile.exists(): + with self.statefile.open("r") as f: + self.state = json.load(f) + else: + self.state = {"dirs": {}, "links": {}} + + self.stateclean = True + + def dump_state(self): + with self.statefile.open("w") as f: + json.dump(self.state, f) + + def check_location(self, path: Path) -> FileState: + if not path.exists(): + return FileState.Unused + + if path.is_symlink(): + dest = Path(os.path.realpath(str(path))) + if self.dotdir in dest.parents: + return FileState.create_owned(dest) + + return FileState.Used + + def state_save_link(self, dest, packagename): + self.state["links"][str(dest)] = packagename + + def check_parent(self, path: Path, packagename): + """ + Check if parents exists, and if we created them mark them + with package name + """ + + parent = path.parent + exists = parent.exists() + if (not exists) or parent in self.state["dirs"]: + self.check_parent(parent, packagename) + + # Add to state + add_or_create(self.state["dirs"], str(parent), packagename) + + if not exists: + self.applier.create_dir(parent) + + def do_link(self, package, ppath: Path): + dest = Path(self.applydir, ppath) + dest_state = self.check_location(dest) + + if not self.override and not dest_state.can_write(): + # Check if it's a pointer to the correct location + print(os.readlink(dest)) + raise Exception(f"Destination {ppath} already exists") + + # Save the link in the statefile + self.state_save_link(dest, package) + + self.check_parent(dest, package) + + target_abs = Path.cwd().joinpath(Path(package, ppath)) + if dest_state == FileState.Owned \ + and dest_state.links_to() == target_abs: + return + + if dest_state != FileState.Unused and self.override: + self.applier.delete(dest) + self.applier.create_link(target_abs, dest) + + def do_folder_link(self, package, ppath: Path) -> bool: + self.do_link(package, ppath) + return True + + def apply(self, dry_run=True): + self.applier.apply(dry_run) + + +class DirReader: + def __init__(self, config, resolv): + self.resolv = resolv + self.config = config + + def read_package(self, name): + package_root = name + + # Walk down and link dir og file + for root, dirs, files in os.walk(package_root): + rootrel = os.path.relpath(root, start=package_root) + + # Check if we can just link this folder + if rootrel not in self.config["do_not_link"]: + ok = self.resolv.do_folder_link(name, Path(rootrel)) + + if ok: + # Do not go further down + dirs.clear() + continue + + # Link files + for f in files: + self.resolv.do_link(name, Path(rootrel, f)) + + +def parse_config(path): + config = {} + with open(path, "r") as f: + config = yaml.safe_load(f) + + return config + + +parser = argparse.ArgumentParser() +parser.add_argument("--dot-dir", "-d", default=".", + help="Directory to load dots from") +parser.add_argument("--apply-dir", "-a", default=None, + help="Directory to load dots from") +parser.add_argument("--dry-run", "-n", default=False, + help="Do not make filesystem changes", action="store_true") +parser.add_argument("--override-existing", "-o", default=False, + help="Override existing files,dirs,links", + action="store_true") +parser.add_argument("packages", nargs="*", help="Packages to apply") + +if __name__ == "__main__": + args = parser.parse_args() + + # Just cd to dotdir, so everything can make the assumption that cwd + # is dotdir + os.chdir(args.dot_dir) + + if args.apply_dir is None: + args.apply_dir = ".." + else: + args.apply_dir = str(Path(args.apply_dir).relative_to(args.dot_dir)) + + config = parse_config("config.yaml") + + resolv = Resolver(args.apply_dir, Path.cwd(), args.override_existing) + + reader = DirReader(config, resolv) + for pack in args.packages: + reader.read_package(pack) + + resolv.apply(dry_run=args.dry_run) + if args.dry_run: + resolv.dump_state() |