diff options
Diffstat (limited to 'ranger/ext/vcs/git.py')
-rw-r--r-- | ranger/ext/vcs/git.py | 312 |
1 files changed, 154 insertions, 158 deletions
diff --git a/ranger/ext/vcs/git.py b/ranger/ext/vcs/git.py index bba1bfee..86a42502 100644 --- a/ranger/ext/vcs/git.py +++ b/ranger/ext/vcs/git.py @@ -9,187 +9,176 @@ import os import re import shutil from datetime import datetime +import json -from .vcs import Vcs, VcsError - +from ranger.ext.vcs import Vcs, VcsError class Git(Vcs): - vcsname = 'git' + """VCS implementation for Git""" + vcsname = 'git' + _status_combinations = ( + ('MADRC', ' ', 'staged'), + (' MADRC', 'M', 'changed'), + (' MARC', 'D', 'deleted'), + + ('D', 'DU', 'conflict'), + ('A', 'AU', 'conflict'), + ('U', 'ADU', 'conflict'), + + ('?', '?', 'untracked'), + ('!', '!', 'ignored'), + ) # Auxiliar stuff #--------------------------- - def _git(self, path, args, silent=True, catchout=False, bytes=False): - return self._vcs(path, 'git', args, silent=silent, catchout=catchout, bytes=bytes) - + def _git(self, args, path=None, silent=True, catchout=False, bytes=False): + """Call git""" + return self._vcs(path if path else self.path, 'git', args, silent=silent, + catchout=catchout, bytes=bytes) def _has_head(self): """Checks whether repo has head""" try: - self._git(self.path, ['rev-parse', 'HEAD'], silent=True) + self._git(['rev-parse', 'HEAD'], silent=True) except VcsError: return False return True - def _head_ref(self): """Gets HEAD's ref""" - ref = self._git(self.path, ['symbolic-ref', self.HEAD], catchout=True, silent=True) - return ref.strip() or None - + return self._git(['symbolic-ref', self.HEAD], catchout=True, silent=True) or None def _remote_ref(self, ref): """Gets remote ref associated to given ref""" - if ref == None: return None - remote = self._git(self.path, ['for-each-ref', '--format=%(upstream)', ref], catchout=True, silent=True) - return remote.strip() or None - + if ref is None: + return None + return self._git(['for-each-ref', '--format=%(upstream)', ref], + catchout=True, silent=True) \ + or None def _sanitize_rev(self, rev): - if rev == None: return None + """Sanitize revision string""" + if rev is None: + return None return rev.strip() - def _log(self, refspec=None, maxres=None, filelist=None): """Gets a list of dicts containing revision info, for the revisions matching refspec""" - fmt = '--pretty=%h %H%nAuthor: %an <%ae>%nDate: %ct%nSubject: %s%n' - - args = ['--no-pager', 'log', fmt] - if refspec: args = args + ['-1', refspec] - elif maxres: args = args + ['-%d' % maxres] - - if filelist: args = args + ['--'] + filelist - - raw = self._git(self.path, args, catchout=True) - L = re.findall('^\s*(\w*)\s*(\w*)\s*^Author:\s*(.*)\s*^Date:\s*(.*)\s*^Subject:\s*(.*)\s*', raw, re.MULTILINE) + args = [ + '--no-pager', 'log', + '--pretty={"short": "%h", "revid": "%H", "author": "%an", "date": %ct, "summary": "%s"}' + ] + if refspec: + args += ['-1', refspec] + elif maxres: + args += ['-{0:d}'.format(maxres)] + if filelist: + args += ['--'] + filelist log = [] - for t in L: - dt = {} - dt['short'] = t[0].strip() - dt['revid'] = t[1].strip() - dt['author'] = t[2].strip() - m = re.match('\d+(\.\d+)?', t[3].strip()) - dt['date'] = datetime.fromtimestamp(float(m.group(0))) - dt['summary'] = t[4].strip() - log.append(dt) + for line in self._git(args, catchout=True).splitlines(): + line = json.loads(line) + line['date'] = datetime.fromtimestamp(line['date']) + log.append(line) return log - - def _git_file_status(self, status): - """ Translate git status code """ - X, Y = (status[0], status[1]) - - if X in "MADRC" and Y in " " : return 'staged' - elif X in " MADRC" and Y in "M" : return 'changed' - elif X in " MARC" and Y in "D" : return 'deleted' - - elif X in "D" and Y in "DU" : return 'conflict' - elif X in "A" and Y in "AU" : return 'conflict' - elif X in "U" and Y in "ADU" : return 'conflict' - - elif X in "?" and Y in "?" : return 'untracked' - elif X in "!" and Y in "!" : return 'ignored' - - else : return 'unknown' - - + def _git_file_status(self, code): + """Translate git status code""" + for X, Y, status in self._status_combinations: + if code[0] in X and code[1] in Y: + return status + return 'unknown' # Repo creation #--------------------------- def init(self): """Initializes a repo in current path""" - self._git(self.path, ['init']) + self._git(['init']) self.update() - def clone(self, src): """Clones a repo from src""" - name = os.path.basename(self.path) - path = os.path.dirname(self.path) try: os.rmdir(self.path) except OSError: - raise VcsError("Can't clone to %s. It is not an empty directory" % self.path) + raise VcsError("Can't clone to {0:s}: Not an empty directory".format(self.path)) - self._git(path, ['clone', src, name]) + self._git(['clone', src, os.path.basename(self.path)], path=os.path.dirname(self.path)) self.update() - - # Action interface #--------------------------- def commit(self, message): """Commits with a given message""" - self._git(self.path, ['commit', '-m', message]) - + self._git(['commit', '--message', message]) def add(self, filelist=None): """Adds files to the index, preparing for commit""" - if filelist != None: self._git(self.path, ['add', '-A'] + filelist) - else: self._git(self.path, ['add', '-A']) - + if filelist: + self._git(['add', '--all'] + filelist) + else: + self._git(['add', '--all']) def reset(self, filelist=None): """Removes files from the index""" - if filelist != None: self._git(self.path, ['reset'] + filelist) - else: self._git(self.path, ['reset']) - + if filelist: + self._git(['reset'] + filelist) + else: + self._git(['reset']) - def pull(self, br=None): + def pull(self, *args): """Pulls from remote""" - if br: self._git(self.path, ['pull', br]) - else: self._git(self.path, ['pull']) - + self._git(['pull'] + list(args)) - def push(self, br=None): + def push(self, *args): """Pushes to remote""" - if br: self._git(self.path, ['push', br]) - else: self._git(self.path, ['push']) - + self._git(['push'] + list(args)) def checkout(self, rev): """Checks out a branch or revision""" - self._git(self.path, ['checkout', self._sanitize_rev(rev)]) - + self._git(['checkout', self._sanitize_rev(rev)]) def extract_file(self, rev, name, dest): """Extracts a file from a given revision and stores it in dest dir""" if rev == self.INDEX: shutil.copyfile(os.path.join(self.path, name), dest) else: - out = self._git(self.path, ['--no-pager', 'show', '%s:%s' % (self._sanitize_rev(rev), name)], - catchout=True, bytes=True) - with open(dest, 'wb') as fd: fd.write(out) - - + with open(dest, 'wb') as fd: + fd.write( + self._git([ + '--no-pager', 'show', '{0:s}:{1:s}'.format(self._sanitize_rev(rev), name) + ], catchout=True, bytes=True) + ) # Data Interface #--------------------------- def get_status_allfiles(self): - """ Returs a dict (path: status) for paths not in sync. Strips trailing '/' from dirs """ - output = self._git(self.path, ['status', '--porcelain', '-z'], catchout=True, bytes=True)\ - .decode('utf-8').split('\x00')[:-1] - output.reverse() - statuses = [] - while output: - line = output.pop() - statuses.append((line[:2], line[3:])) + """Returs a dict (path: status) for paths not in sync. Strips trailing '/' from dirs""" + statuses = {} + skip = False + for line in self._git(['status', '--porcelain', '-z'], catchout=True, bytes=True)\ + .decode('utf-8').split('\x00')[:-1]: + if skip: + skip = False + continue + statuses[os.path.normpath(line[3:])] = self._git_file_status(line[:2]) if line.startswith('R'): - output.pop() - - return {os.path.normpath(tup[1]): self._git_file_status(tup[0]) for tup in statuses} - + skip = True + return statuses def get_ignore_allfiles(self): - """ Returns a set of all the ignored files in the repo. Strips trailing '/' from dirs. """ - output = self._git(self.path, ['ls-files', '--others', '--directory', '--ignored', '--exclude-standard', '-z'], - catchout=True, bytes=True).decode('utf-8').split('\x00')[:-1] - return set(os.path.normpath(p) for p in output) - + """Returns a set of all the ignored files in the repo. Strips trailing '/' from dirs.""" + return set( + os.path.normpath(p) + for p in self._git( + ['ls-files', '--others', '--directory', '--ignored', '--exclude-standard', '-z'], + catchout=True, bytes=True + ).decode('utf-8').split('\x00')[:-1] + ) def get_remote_status(self): """Checks the status of the repo regarding sync state with remote branch""" @@ -198,18 +187,17 @@ class Git(Vcs): remote = self._remote_ref(head) except VcsError: head = remote = None - - if head and remote: - raw = self._git(self.path, ['rev-list', '--left-right', '%s...%s' % (remote, head)], catchout=True) - ahead = re.search("^>", raw, flags=re.MULTILINE) - behind = re.search("^<", raw, flags=re.MULTILINE) - - if ahead and behind: return "diverged" - elif ahead and not behind: return "ahead" - elif not ahead and behind: return "behind" - elif not ahead and not behind: return "sync" - else: return "none" - + if not head or not remote: + return 'none' + + output = self._git(['rev-list', '--left-right', '{0:s}...{1:s}'.format(remote, head)], + catchout=True) + ahead = re.search("^>", output, flags=re.MULTILINE) + behind = re.search("^<", output, flags=re.MULTILINE) + if ahead: + return 'diverged' if behind else 'ahead' + else: + return 'behind' if behind else 'sync' def get_branch(self): """Returns the current named branch, if this makes sense for the backend. None otherwise""" @@ -217,37 +205,38 @@ class Git(Vcs): head = self._head_ref() except VcsError: head = None + if head is None: + return 'detached' - if head: - m = re.match('refs/heads/([^/]*)', head) - if m: return m.group(1).strip() + match = re.match('refs/heads/([^/]+)', head) + if match: + return match.group(1) else: - return "detached" - - return None - + return None def get_log(self, filelist=None, maxres=None): """Get the entire log for the current HEAD""" - if not self._has_head(): return [] + if not self._has_head(): + return [] return self._log(refspec=None, maxres=maxres, filelist=filelist) - def get_raw_log(self, filelist=None): """Gets the raw log as a string""" - if not self._has_head(): return [] + if not self._has_head(): + return [] args = ['log'] - if filelist: args = args + ['--'] + filelist - return self._git(self.path, args, catchout=True) - + if filelist: + args += ['--'] + filelist + return self._git(args, catchout=True) def get_raw_diff(self, refspec=None, filelist=None): """Gets the raw diff as a string""" args = ['diff'] - if refspec: args = args + [refspec] - if filelist: args = args + ['--'] + filelist - return self._git(self.path, args, catchout=True) - + if refspec: + args += [refspec] + if filelist: + args += ['--'] + filelist + return self._git(args, catchout=True) def get_remote(self): """Returns the url for the remote repo attached to head""" @@ -257,49 +246,56 @@ class Git(Vcs): remote = self._remote_ref(ref) except VcsError: ref = remote = None - - if remote: - m = re.match('refs/remotes/([^/]*)/', remote) - if m: - url = self._git(self.path, ['config', '--get', 'remote.%s.url' % m.group(1)], catchout=True) - return url.strip() or None + if not remote: + return None + + match = re.match('refs/remotes/([^/]+)/', remote) + if match: + return self._git(['config', '--get', 'remote.{0:s}.url'.format(match.group(1))], + catchout=True).strip() \ + or None return None def get_revision_id(self, rev=None): """Get a canonical key for the revision rev""" - if rev == None: rev = self.HEAD - elif rev == self.INDEX: return None + if rev is None: + rev = self.HEAD + elif rev == self.INDEX: + return None rev = self._sanitize_rev(rev) - return self._sanitize_rev(self._git(self.path, ['rev-parse', rev], catchout=True)) - + return self._sanitize_rev(self._git(['rev-parse', rev], catchout=True)) def get_info(self, rev=None): """Gets info about the given revision rev""" - if rev == None: rev = self.HEAD + if rev is None: + rev = self.HEAD rev = self._sanitize_rev(rev) - if rev == self.HEAD and not self._has_head(): return None - - L = self._log(refspec=rev) - if len(L) == 0: - raise VcsError("Revision %s does not exist" % rev) - elif len(L) > 1: - raise VcsError("More than one instance of revision %s ?!?" % rev) + if rev == self.HEAD and not self._has_head(): + return None + + log = self._log(refspec=rev) + if len(log) == 0: + raise VcsError("Revision {0:s} does not exist".format(rev)) + elif len(log) > 1: + raise VcsError("More than one instance of revision {0:s} ?!?".format(rev)) else: - return L[0] - + return log[0] def get_files(self, rev=None): """Gets a list of files in revision rev""" - if rev == None: rev = self.HEAD + if rev is None: + rev = self.HEAD rev = self._sanitize_rev(rev) + if rev is None: + return [] - if rev: - if rev == self.INDEX: raw = self._git(self.path, ["ls-files"], catchout=True) - else: raw = self._git(self.path, ['ls-tree', '--name-only', '-r', rev], catchout=True) - return raw.split('\n') + if rev == self.INDEX: + return self._git(['ls-files', '-z'], + catchout=True, bytes=True).decode('utf-8').split('\x00') else: - return [] + return self._git(['ls-tree', '--name-only', '-r', '-z', rev], + catchout=True, bytes=True).decode('utf-8').split('\x00') # vim: expandtab:shiftwidth=4:tabstop=4:softtabstop=4:textwidth=80 |