diff options
Diffstat (limited to 'ranger/ext/vcs/bzr.py')
-rw-r--r-- | ranger/ext/vcs/bzr.py | 335 |
1 files changed, 103 insertions, 232 deletions
diff --git a/ranger/ext/vcs/bzr.py b/ranger/ext/vcs/bzr.py index 2a52cf02..5decc8b1 100644 --- a/ranger/ext/vcs/bzr.py +++ b/ranger/ext/vcs/bzr.py @@ -1,270 +1,141 @@ -# -*- coding: utf-8 -*- # This file is part of ranger, the console file manager. # License: GNU GPL version 3, see the file "AUTHORS" for details. -# Author: Abdó Roig-Maranges <abdo.roig@gmail.com>, 2012 -# -# vcs - a python module to handle various version control systems +"""GNU Bazaar module""" + +from datetime import datetime import os import re -import shutil -from datetime import datetime from .vcs import Vcs, VcsError class Bzr(Vcs): - vcsname = 'bzr' - HEAD="last:1" - - # Auxiliar stuff - #--------------------------- - - def _bzr(self, path, args, silent=True, catchout=False, bytes=False): - return self._vcs(path, 'bzr', args, silent=silent, catchout=catchout, bytes=bytes) - - - def _has_head(self): - """Checks whether repo has head""" - rnum = self._bzr(self.path, ['revno'], catchout=True) - return rnum != '0' - + """VCS implementation for GNU Bazaar""" + HEAD = 'last:1' - def _sanitize_rev(self, rev): - if rev == None: return None - rev = rev.strip() - if len(rev) == 0: return None + _status_translations = ( + ('+ -R', 'K NM', 'staged'), + (' -', 'D', 'deleted'), + ('?', ' ', 'untracked'), + ) - return rev + # Generic + def _remote_url(self): + """Remote url""" + try: + return self._run(['config', 'parent_location']).rstrip('\n') or None + except VcsError: + return None def _log(self, refspec=None, filelist=None): - """Gets a list of dicts containing revision info, for the revisions matching refspec""" - args = ['log', '-n0', '--show-ids'] - if refspec: args = args + ["-r", refspec] + """Returns an array of dicts containing revision info for refspec""" + args = ['log', '--log-format', 'long', '--levels', '0', '--show-ids'] + if refspec: + args += ['--revision', refspec] + if filelist: + args += ['--'] + filelist - if filelist: args = args + filelist - - raw = self._bzr(self.path, args, catchout=True, silent=True) - L = re.findall('-+$(.*?)^-', raw + '\n---', re.MULTILINE | re.DOTALL) + try: + output = self._run(args) + except VcsError: + return None + entries = re.findall(r'-+\n(.+?)\n(?:-|\Z)', output, re.MULTILINE | re.DOTALL) log = [] - for t in L: - t = t.strip() - if len(t) == 0: continue - - dt = {} - m = re.search('^revno:\s*([0-9]+)\s*$', t, re.MULTILINE) - if m: dt['short'] = m.group(1).strip() - m = re.search('^revision-id:\s*(.+)\s*$', t, re.MULTILINE) - if m: dt['revid'] = m.group(1).strip() - m = re.search('^committer:\s*(.+)\s*$', t, re.MULTILINE) - if m: dt['author'] = m.group(1).strip() - m = re.search('^timestamp:\s*(.+)\s*$', t, re.MULTILINE) - if m: dt['date'] = datetime.strptime(m.group(1).strip(), '%a %Y-%m-%d %H:%M:%S %z') - m = re.search('^message:\s*^(.+)$', t, re.MULTILINE) - if m: dt['summary'] = m.group(1).strip() - log.append(dt) + for entry in entries: + new = {} + try: + new['short'] = re.search(r'^revno: ([0-9]+)', entry, re.MULTILINE).group(1) + new['revid'] = re.search(r'^revision-id: (.+)$', entry, re.MULTILINE).group(1) + new['author'] = re.search(r'^committer: (.+)$', entry, re.MULTILINE).group(1) + new['date'] = datetime.strptime( + re.search(r'^timestamp: (.+)$', entry, re.MULTILINE).group(1), + '%a %Y-%m-%d %H:%M:%S %z' + ) + new['summary'] = re.search(r'^message:\n (.+)$', entry, re.MULTILINE).group(1) + except AttributeError: + return None + log.append(new) return log - - def _bzr_file_status(self, st): - st = st.strip() - if st in "AM": return 'staged' - elif st in "D": return 'deleted' - elif st in "?": return 'untracked' - else: return 'unknown' - - - - # Repo creation - #--------------------------- - - def init(self): - """Initializes a repo in current path""" - self._bzr(self.path, ['init']) - self.update() - - - def clone(self, src): - """Clones a repo from src""" - path = os.path.dirname(self.path) - name = os.path.basename(self.path) - try: - os.rmdir(self.path) - except OSError: - raise VcsError("Can't clone to %s. It is not an empty directory" % self.path) - - self._bzr(path, ['branch', src, name]) - self.update() - - + def _status_translate(self, code): + """Translate status code""" + for code_x, code_y, status in self._status_translations: + if code[0] in code_x and code[1] in code_y: + return status + return 'unknown' # Action Interface - #--------------------------- - - def commit(self, message): - """Commits with a given message""" - self._bzr(self.path, ['commit', '-m', message]) - - - def add(self, filelist=None): - """Adds files to the index, preparing for commit""" - if filelist != None: self._bzr(self.path, ['add'] + filelist) - else: self._bzr(self.path, ['add']) - - - def reset(self, filelist=None): - """Removes files from the index""" - if filelist != None: self._bzr(self.path, ['remove', '--keep', '--new'] + filelist) - else: self._bzr(self.path, ['remove', '--keep', '--new']) - - - def pull(self): - """Pulls a git repo""" - self._bzr(self.path, ['pull']) - - - def push(self): - """Pushes a git repo""" - self._bzr(self.path, ['push']) - - - def checkout(self, rev): - """Checks out a branch or revision""" - self._bzr(self.path, ['update', '-r', rev]) - - - def extract_file(self, rev, name, dest): - """Extracts a file from a given revision and stores it in dest dir""" - if rev == self.INDEX: - shutil.copyfile(os.path.join(self.path, name), dest) - else: - out = self._bzr(self.path, ['cat', '--r', rev, name], catchout=True, bytes=True) - with open(dest, 'wb') as fd: fd.write(out) + def action_add(self, filelist=None): + args = ['add'] + if filelist: + args += ['--'] + filelist + self._run(args, catchout=False) + def action_reset(self, filelist=None): + args = ['remove', '--keep', '--new'] + if filelist: + args += ['--'] + filelist + self._run(args, catchout=False) # Data Interface - #--------------------------- - - def get_status_allfiles(self): - """Returns a dict indexed by files not in sync their status as values. - Paths are given relative to the root. Strips trailing '/' from dirs.""" - raw = self._bzr(self.path, ['status', '--short', '--no-classify'], catchout=True, bytes=True) - L = re.findall('^(..)\s*(.*?)\s*$', raw.decode('utf-8'), re.MULTILINE) - ret = {} - for st, p in L: - sta = self._bzr_file_status(st) - ret[os.path.normpath(p.strip())] = sta - return ret - - - def get_ignore_allfiles(self): - """Returns a set of all the ignored files in the repo. Strips trailing '/' from dirs.""" - raw = self._bzr(self.path, ['ls', '--ignored'], catchout=True) - return set(os.path.normpath(p) for p in raw.split('\n')) - - - # TODO: slow due to net access - def get_remote_status(self): - """Checks the status of the repo regarding sync state with remote branch""" - if self.get_remote() == None: - return "none" - - ahead = behind = True - try: - self._bzr(self.path, ['missing', '--mine-only'], silent=True) - except: - ahead = False - - try: - self._bzr(self.path, ['missing', '--theirs-only'], silent=True) - except: - behind = False - if ahead and behind: return "diverged" - elif ahead and not behind: return "ahead" - elif not ahead and behind: return "behind" - elif not ahead and not behind: return "sync" + def data_status_root(self): + statuses = set() + # Paths with status + output = self._run(['status', '--short', '--no-classify']).rstrip('\n') + if not output: + return 'sync' + for line in output.split('\n'): + statuses.add(self._status_translate(line[:2])) - def get_branch(self): - """Returns the current named branch, if this makes sense for the backend. None otherwise""" - branch = self._bzr(self.path, ['nick'], catchout=True) - return branch or None + for status in self.DIRSTATUSES: + if status in statuses: + return status + return 'sync' + def data_status_subpaths(self): + statuses = {} - def get_log(self, filelist=None, maxres=None): - """Get the entire log for the current HEAD""" - if not self._has_head(): return [] - return self._log(refspec=None, filelist=filelist) + # Ignored + output = self._run(['ls', '--null', '--ignored']).rstrip('\x00') + if output: + for path in output.split('\x00'): + statuses[path] = 'ignored' + # Paths with status + output = self._run(['status', '--short', '--no-classify']).rstrip('\n') + for line in output.split('\n'): + statuses[os.path.normpath(line[4:])] = self._status_translate(line[:2]) - def get_raw_log(self, filelist=None): - """Gets the raw log as a string""" - if not self._has_head(): return [] - args = ['log'] - if filelist: args = args + filelist - return self._bzr(self.path, args, catchout=True) + return statuses + def data_status_remote(self): + if not self._remote_url(): + return 'none' + return 'unknown' - def get_raw_diff(self, refspec=None, filelist=None): - """Gets the raw diff as a string""" - args = ['diff', '--git'] - if refspec: args = args + [refspec] - if filelist: args = args + filelist - return self._bzr(self.path, args, catchout=True) - - - def get_remote(self): - """Returns the url for the remote repo attached to head""" - try: - remote = self._bzr(self.path, ['config', 'parent_location'], catchout=True) - except VcsError: - remote = "" - - return remote.strip() or None - - - def get_revision_id(self, rev=None): - """Get a canonical key for the revision rev""" - if rev == None: rev = self.HEAD - elif rev == self.INDEX: return None - rev = self._sanitize_rev(rev) + def data_branch(self): try: - L = self._log(refspec=rev) + return self._run(['nick']).rstrip('\n') or None except VcsError: - L = [] - if len(L) == 0: return None - else: return L[0]['revid'] - - - def get_info(self, rev=None): - """Gets info about the given revision rev""" - if rev == None: rev = self.HEAD - rev = self._sanitize_rev(rev) - if rev == self.HEAD and not self._has_head(): return [] - - L = self._log(refspec=rev) - if len(L) == 0: - raise VcsError("Revision %s does not exist" % rev) - elif len(L) > 1: - raise VcsError("More than one instance of revision %s ?!?" % rev) + return None + + def data_info(self, rev=None): + if rev is None: + rev = self.HEAD + + log = self._log(refspec=rev) + if not log: + if rev == self.HEAD: + return None + else: + raise VcsError('Revision {0:s} does not exist'.format(rev)) + elif len(log) == 1: + return log[0] else: - return L[0] - - - def get_files(self, rev=None): - """Gets a list of files in revision rev""" - if rev == None: rev = self.HEAD - rev = self._sanitize_rev(rev) - - if rev: - if rev == self.INDEX: raw = self._bzr(self.path, ["ls"], catchout=True) - else: raw = self._bzr(self.path, ['ls', '--R', '-V', '-r', rev], catchout=True) - return raw.split('\n') - else: - return [] - -# vim: expandtab:shiftwidth=4:tabstop=4:softtabstop=4:textwidth=80 + raise VcsError('More than one instance of revision {0:s}'.format(rev)) |