diff options
-rw-r--r-- | ranger/ext/vcs/hg.py | 198 |
1 files changed, 111 insertions, 87 deletions
diff --git a/ranger/ext/vcs/hg.py b/ranger/ext/vcs/hg.py index 02d8e684..92ca583a 100644 --- a/ranger/ext/vcs/hg.py +++ b/ranger/ext/vcs/hg.py @@ -4,132 +4,156 @@ """Mercurial module""" import os -import re from datetime import datetime +import json from .vcs import Vcs, VcsError + class Hg(Vcs): """VCS implementation for Mercurial""" HEAD = 'tip' - # Generic - #--------------------------- - - def _hg(self, path, args, silent=True, catchout=False, retbytes=False): - return self._vcs(path, 'hg', args, silent=silent, catchout=catchout, retbytes=retbytes) + _status_translations = ( + ('A', 'staged'), + ('M', 'changed'), + ('R!', 'deleted'), - def _has_head(self): - """Checks whether repo has head""" - rnum = self._hg(self.path, ['-q', 'identify', '--num', '-r', self.HEAD], catchout=True) - return rnum != '-1' + ('?', 'untracked'), + ('I', 'ignored'), + ) - def _sanitize_rev(self, rev): - if rev == None: return None - rev = rev.strip() - if len(rev) == 0: return None - if rev[-1] == '+': rev = rev[:-1] - - try: - if int(rev) == 0: return None - except: - pass + # Generic - return rev + def _hg(self, args, path=None, catchout=True, retbytes=False): + """Run a hg command""" + return self._vcs(['hg'] + args, path or self.path, catchout=catchout, retbytes=retbytes) def _log(self, refspec=None, maxres=None, filelist=None): - fmt = "changeset: {rev}:{node}\ntag: {tags}\nuser: {author}\ndate: {date}\nsummary: {desc}\n" - args = ['log', '--template', fmt] - - if refspec: args = args + ['--limit', '1', '-r', refspec] - elif maxres: args = args + ['--limit', str(maxres)] + args = [ + 'log', '--template', + '\\{' + '\\x00short\\x00:\\x00{rev}\\x00,' + '\\x00revid\\x00:\\x00{node}\\x00,' + '\\x00author\\x00:\\x00{author}\\x00,' + '\\x00date\\x00:\\x00{date}\\x00,' + '\\x00summary\\x00:\\x00{desc}\\x00' + '}\\n' + ] + if refspec: + args += ['--limit', '1', '--rev', refspec] + elif maxres: + args += ['--limit', str(maxres)] + if filelist: + args += ['--'] + filelist - if filelist: args = args + filelist - - raw = self._hg(self.path, args, catchout=True) - L = re.findall('^changeset:\s*([0-9]*):([0-9a-zA-Z]*)\s*$\s*^tag:\s*(.*)\s*$\s*^user:\s*(.*)\s*$\s*^date:\s*(.*)$\s*^summary:\s*(.*)\s*$', raw, re.MULTILINE) + try: + output = self._hg(args)\ + .replace('\\', '\\\\').replace('"', '\\"').replace('\x00', '"').splitlines() + except VcsError: + return None log = [] - for t in L: - dt = {} - dt['short'] = t[0].strip() - dt['revid'] = self._sanitize_rev(t[1].strip()) - dt['author'] = t[3].strip() - m = re.match('\d+(\.\d+)?', t[4].strip()) - dt['date'] = datetime.fromtimestamp(float(m.group(0))) - dt['summary'] = t[5].strip() - log.append(dt) + for line in output: + line = json.loads(line) + line['date'] = datetime.fromtimestamp(float(line['date'].split('-')[0])) + log.append(line) return log - def _hg_file_status(self, st): - if len(st) != 1: raise VcsError("Wrong hg file status string: %s" % st) - if st in "ARM": return 'staged' - elif st in "!": return 'deleted' - elif st in "I": return 'ignored' - elif st in "?": return 'untracked' - elif st in "X": return 'conflict' - elif st in "C": return 'sync' - else: return 'unknown' + def _remote_url(self): + """Remote url""" + try: + return self._hg(['showconfig', 'paths.default']).rstrip('\n') or None + except VcsError: + return None + + def _status_translate(self, code): + """Translate status code""" + for code_x, status in self._status_translations: # pylint: disable=invalid-name + if code in code_x: + return status + return 'unknown' - # Action Interface - #--------------------------- + # Action interface def action_add(self, filelist=None): - if filelist != None: self._hg(self.path, ['addremove'] + filelist) - else: self._hg(self.path, ['addremove']) + args = ['add'] + if filelist: + args += ['--'] + filelist + self._hg(args, catchout=False) def action_reset(self, filelist=None): - if filelist == None: filelist = self.data_status_subpaths().keys() - self._hg(self.path, ['forget'] + filelist) + args = ['forget', '--'] + if filelist: + args += filelist + else: + args += self.rootvcs.status_subpaths.keys() + self._hg(args, catchout=False) + + # Data interface + + def data_status_root(self): + statuses = set() - # Data Interface - #--------------------------- + # Paths with status + for line in self._hg(['status', '--all', '--print0']).rstrip('\x00').split('\x00'): + code = line[0] + if code == 'C': + continue + statuses.add(self._status_translate(code)) + + for status in self.DIRSTATUSES: + if status in statuses: + return status + return 'sync' def data_status_subpaths(self): - raw = self._hg(self.path, ['status'], catchout=True, retbytes=True) - L = re.findall('^(.)\s*(.*?)\s*$', raw.decode('utf-8'), re.MULTILINE) - ret = {} - for st, p in L: - # Detect conflict by the existence of .orig files - if st == '?' and re.match('^.*\.orig\s*$', p): st = 'X' - sta = self._hg_file_status(st) - ret[os.path.normpath(p.strip())] = sta - return ret + statuses = {} + + # Paths with status + for line in self._hg(['status', '--all', '--print0']).rstrip('\x00').split('\x00'): + code, path = line[0], line[2:] + if code == 'C': + continue + statuses[os.path.normpath(path)] = self._status_translate(code) + + return statuses def data_status_remote(self): - if self.get_remote() == None: - return "none" + if self._remote_url() is None: + return 'none' ahead = behind = True try: - self._hg(self.path, ['outgoing'], silent=True) - except: + self._hg(['outgoing'], catchout=False) + except VcsError: ahead = False try: - self._hg(self.path, ['incoming'], silent=True) - except: + self._hg(['incoming'], catchout=False) + except VcsError: behind = False - if ahead and behind: return "diverged" - elif ahead and not behind: return "ahead" - elif not ahead and behind: return "behind" - elif not ahead and not behind: return "sync" + if ahead: + return 'diverged' if behind else 'ahead' + else: + return 'behind' if behind else 'sync' def data_branch(self): - branch = self._hg(self.path, ['branch'], catchout=True) - return branch or None + return self._hg(['branch'], catchout=True).rstrip('\n') or None def data_info(self, rev=None): - if rev == None: rev = self.HEAD - rev = self._sanitize_rev(rev) - if rev == self.HEAD and not self._has_head(): return None - - L = self._log(refspec=rev) - if len(L) == 0: - raise VcsError("Revision %s does not exist" % rev) - elif len(L) > 1: - raise VcsError("More than one instance of revision %s ?!?" % rev) + if rev is None: + rev = self.HEAD + + log = self._log(refspec=rev) + if not log: + if rev == self.HEAD: + return None + else: + raise VcsError('Revision {0:s} does not exist'.format(rev)) + elif len(log) == 1: + return log[0] else: - return L[0] + raise VcsError('More than one instance of revision {0:s}'.format(rev)) |