diff options
Diffstat (limited to 'ranger/vcs/vcs.py')
-rw-r--r-- | ranger/vcs/vcs.py | 345 |
1 files changed, 345 insertions, 0 deletions
diff --git a/ranger/vcs/vcs.py b/ranger/vcs/vcs.py new file mode 100644 index 00000000..6aa21784 --- /dev/null +++ b/ranger/vcs/vcs.py @@ -0,0 +1,345 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# vcs - a python module to handle various version control systems +# Copyright 2011, 2012 Abdó Roig-Maranges <abdo.roig@gmail.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import subprocess +from datetime import datetime + + +class VcsError(Exception): + def __init__(self, msg): + super().__init__(msg) + + +class Vcs(object): + """ This class represents a version controlled path, abstracting the usual + operations from the different supported backends. + + The backends are declared in te variable self.repo_types, and are derived + classes from Vcs with the following restrictions: + + * do NOT implement __init__. Vcs takes care of this. + + * do not create change internal state. All internal state should be + handled in Vcs + + Objects from backend classes should never be created directly. Instead + create objects of Vcs class. The initialization calls update, which takes + care of detecting the right Vcs backend to use and dynamically changes the + object type accordingly. + """ + + # These are abstracted revs, representing the current index (staged files), + # the current head and nothing. Every backend should redefine them if the + # version control has a similar concept, or implement _sanitize_rev method to + # clean the rev before using them + INDEX = "INDEX" + HEAD = "HEAD" + NONE = "NONE" + vcsname = None + + # Possible status responses + FILE_STATUS = ['conflict', 'untracked', 'deleted', 'changed', 'staged', 'ignored', 'sync', 'none', 'unknown'] + REMOTE_STATUS = ['none', 'sync', 'behind', 'ahead', 'diverged', 'unknown'] + + + def __init__(self, path, vcstype=None): + # This is a bit hackish, but I need to import here to avoid circular imports + from .git import Git + from .hg import Hg + from .bzr import Bzr + self.repo_types = {'git': Git, 'hg': Hg, 'bzr': Bzr} + + self.path = os.path.expanduser(path) + self.status = {} + self.ignored = set() + self.root = None + + self.update(vcstype=vcstype) + + + # Auxiliar + #--------------------------- + + def _vcs(self, path, cmd, args, silent=False, catchout=False, bytes=False): + """Executes a vcs command""" + with open('/dev/null', 'w') as devnull: + if silent: out=devnull + else: out=None + try: + if catchout: + raw = subprocess.check_output([cmd] + args, stderr=out, cwd=path) + if bytes: return raw + else: return raw.decode('utf-8', errors="ignore").strip() + else: + subprocess.check_call([cmd] + args, stderr=out, stdout=out, cwd=path) + except subprocess.CalledProcessError: + raise VcsError("%s error on %s. Command: %s" % (cmd, path, ' '.join([cmd] + args))) + + + def _path_contains(self, parent, path): + """Checks wether path is an object belonging to the subtree in parent""" + if parent == path: return true + return os.path.commonprefix([parent, path]) == parent + + + # Object manipulation + #--------------------------- + # This may be a little hacky, but very useful. An instance of Vcs class changes its own class + # when calling update(), to match the right repo type. I can have the same object adapt to + # the path repo type, if this ever changes! + + def get_repo_type(self, path): + """Returns the right repo type for path. None if no repo present in path""" + for rn, rt in self.repo_types.items(): + if path and os.path.exists(os.path.join(path, '.%s' % rn)): return rt + return None + + + def get_root(self, path): + """Finds the repository root path. Otherwise returns none""" + curpath = os.path.abspath(path) + while curpath != '/': + if self.get_repo_type(curpath): return curpath + else: curpath = os.path.dirname(curpath) + return None + + + def update(self, vcstype=None): + """Updates the repo instance. Re-checks the repo and changes object class if repo type changes + If vcstype is given, uses that repo type, without autodetection""" + if os.path.exists(self.path): + self.root = self.get_root(self.path) + if vcstype: + if vcstype in self.repo_types: + ty = self.repo_types[vcstype] + else: + raise VcsError("Unrecognized repo type %s" % vcstype) + else: + ty = self.get_repo_type(self.root) + if ty: + self.__class__ = ty + return + + self.__class__ = Vcs + + + # Repo creation + #--------------------------- + + def init(self, repotype): + """Initializes a repo in current path""" + if not repotype in self.repo_types: + raise VcsError("Unrecognized repo type %s" % repotype) + + if not os.path.exists(self.path): os.makedirs(self.path) + rt = self.repo_types[repotype] + try: + self.__class__ = rt + self.init() + except: + self.__class__ = Vcs + raise + + + def clone(self, repotype, src): + """Clones a repo from src""" + if not repotype in self.repo_types: + raise VcsError("Unrecognized repo type %s" % repotype) + + if not os.path.exists(self.path): os.makedirs(self.path) + rt = self.repo_types[repotype] + try: + self.__class__ = rt + self.clone(src) + except: + self.__class__ = Vcs + raise + + + # Action interface + #--------------------------- + + def commit(self, message): + """Commits with a given message""" + raise NotImplementedError + + + def add(self, filelist): + """Adds files to the index, preparing for commit""" + raise NotImplementedError + + + def reset(self, filelist): + """Removes files from the index""" + raise NotImplementedError + + + def pull(self): + """Pulls from remote""" + raise NotImplementedError + + + def push(self): + """Pushes to remote""" + raise NotImplementedError + + + def checkout(self, rev): + """Checks out a branch or revision""" + raise NotImplementedError + + + def extract_file(self, rev, name, dest): + """Extracts a file from a given revision and stores it in dest dir""" + raise NotImplementedError + + + # Data + #--------------------------- + + def is_repo(self): + """Checks wether there is an initialized repo in self.path""" + return self.path and os.path.exists(self.path) and self.root != None + + + def is_tracking(self): + """Checks whether HEAD is tracking a remote repo""" + return self.get_remote(self.HEAD) != None + + + def get_file_status(self, path): + """Returns the status for a given path regarding the repo""" + + # if path is relative, join it with root. otherwise do nothing + path = os.path.join(self.root, path) + + if os.path.commonprefix([self.root, path]) == self.root: + prel = os.path.relpath(path, self.root) + if prel in self.ignored: return "ignored" + if os.path.isdir(path): + sts = set([st for p, st in self.status.items() + if self._path_contains(path, os.path.join(self.root, p))]) | set(['sync']) + for st in self.FILE_STATUS: + if st in sts: return st + else: + if prel in self.status: return self.status[prel] + else: return "sync" + else: + return "none" + + + def get_status(self, path=None): + """Returns a dict with changed files under path and their status. + If path is None, returns all changed files""" + + self.status = self.get_status_allfiles() + self.ignored = self.get_ignore_allfiles() + if path: + path = os.path.join(self.root, path) + if os.path.commonprefix([self.root, path]) == self.root: + return {p: st for p, st in self.status.items() if self._path_contains(path, os.path.join(self.root, p))} + else: + return {} + else: + return self.status + + + def get_status_allfiles(self): + """Returns a dict indexed by files not in sync their status as values. + Paths are given relative to the root.""" + raise NotImplementedError + + + def get_ignore_allfiles(self): + """Returns a set of all the ignored files in the repo""" + raise NotImplementedError + + + def get_remote_status(self): + """Checks the status of the entire repo""" + raise NotImplementedError + + + def get_branch(self): + """Returns the current named branch, if this makes sense for the backend. None otherwise""" + raise NotImplementedError + + + def get_log(self): + """Get the entire log for the current HEAD""" + raise NotImplementedError + + + def get_raw_log(self, filelist=None): + """Gets the raw log as a string""" + raise NotImplementedError + + + def get_raw_diff(self, refspec=None, filelist=None): + """Gets the raw diff as a string""" + raise NotImplementedError + + + def get_remote(self): + """Returns the url for the remote repo attached to head""" + raise NotImplementedError + + + def get_revision_id(self, rev=None): + """Get a canonical key for the revision rev""" + raise NotImplementedError + + + def get_info(self, rev=None): + """Gets info about the given revision rev""" + raise NotImplementedError + + + def get_files(self, rev=None): + """Gets a list of files in revision rev""" + raise NotImplementedError + + + + # I / O + #--------------------------- + + def print_log(self, fmt): + log = self.log() + if fmt == "compact": + for dt in log: + print(self.format_revision_compact(dt)) + else: + raise Exception("Unknown format %s" % fmt) + + + def format_revision_compact(self, dt): + return "{0:<10}{1:<20}{2}".format(dt['revshort'], + dt['date'].strftime('%a %b %d, %Y'), + dt['summary']) + + + def format_revision_text(self, dt): + L = ["revision: %s:%s" % (dt['revshort'], dt['revhash']), + "date: %s" % dt['date'].strftime('%a %b %d, %Y'), + "time: %s" % dt['date'].strftime('%H:%M'), + "user: %s" % dt['author'], + "description: %s" % dt['summary']] + return '\n'.join(L) |