summary refs log blame commit diff stats
path: root/ranger/vcs/vcs.py
blob: 6aa21784a193a2c89940a26f8f1a50d45cbd09e9 (plain) (tree)
























































































































































































































































































































































                                                                                                                        
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# vcs - a python module to handle various version control systems
# Copyright 2011, 2012 Abdó Roig-Maranges <abdo.roig@gmail.com>
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import subprocess
from datetime import datetime


class VcsError(Exception):
    """Error raised by the vcs module when a repository operation fails."""

    def __init__(self, msg):
        # Explicit two-argument form of super(); same resolution as super()
        super(VcsError, self).__init__(msg)


class Vcs(object):
    """ This class represents a version controlled path, abstracting the usual
        operations from the different supported backends.

        The backends are declared in the variable self.repo_types, and are derived
        classes from Vcs with the following restrictions:

         * do NOT implement __init__. Vcs takes care of this.

         * do not create or change internal state. All internal state should be
           handled in Vcs

        Objects from backend classes should never be created directly. Instead
        create objects of Vcs class. The initialization calls update, which takes
        care of detecting the right Vcs backend to use and dynamically changes the
        object type accordingly.
        """

    # These are abstracted revs, representing the current index (staged files),
    # the current head and nothing. Every backend should redefine them if the
    # version control has a similar concept, or implement _sanitize_rev method to
    # clean the rev before using them
    INDEX    = "INDEX"
    HEAD     = "HEAD"
    NONE     = "NONE"
    # None on the generic class; presumably each backend sets its own short
    # name here -- confirm in the backend modules
    vcsname  = None

    # Possible status responses
    # The order of FILE_STATUS is significant: for a directory, get_file_status()
    # walks this list front to back and returns the first status found among the
    # files below that directory, so earlier entries take precedence.
    FILE_STATUS   = ['conflict', 'untracked', 'deleted', 'changed', 'staged', 'ignored', 'sync', 'none', 'unknown']
    REMOTE_STATUS = ['none', 'sync', 'behind', 'ahead', 'diverged', 'unknown']


    def __init__(self, path, vcstype=None):
        """Prepares the object for path and lets update() select the backend class

        path has a leading '~' expanded. vcstype is handed over to update(),
        which uses it to force a repo type instead of autodetecting one.
        """
        # Imported here and not at module level to avoid circular imports
        from .git import Git
        from .hg  import Hg
        from .bzr import Bzr
        self.repo_types = dict(git=Git, hg=Hg, bzr=Bzr)
        self.path = os.path.expanduser(path)

        # Nothing is known about the repo until update() / get_status() run
        self.root = None
        self.ignored = set()
        self.status = {}

        self.update(vcstype=vcstype)


    # Auxiliary
    #---------------------------

    def _vcs(self, path, cmd, args, silent=False, catchout=False, bytes=False):
        """Executes a vcs command"""
        with open('/dev/null', 'w') as devnull:
            if silent: out=devnull
            else:      out=None
            try:
                if catchout:
                    raw = subprocess.check_output([cmd] + args, stderr=out, cwd=path)
                    if bytes: return raw
                    else:     return raw.decode('utf-8', errors="ignore").strip()
                else:
                    subprocess.check_call([cmd] + args, stderr=out, stdout=out, cwd=path)
            except subprocess.CalledProcessError:
                raise VcsError("%s error on %s. Command: %s" % (cmd, path, ' '.join([cmd] + args)))


    def _path_contains(self, parent, path):
        """Checks wether path is an object belonging to the subtree in parent"""
        if parent == path: return true
        return os.path.commonprefix([parent, path]) == parent


    # Object manipulation
    #---------------------------
    # This may be a little hacky, but very useful. An instance of Vcs class changes its own class
    # when calling update(), to match the right repo type. I can have the same object adapt to
    # the path repo type, if this ever changes!

    def get_repo_type(self, path):
        """Returns the right repo type for path. None if no repo present in path"""
        for rn, rt in self.repo_types.items():
            if path and os.path.exists(os.path.join(path, '.%s' % rn)): return rt
        return None


    def get_root(self, path):
        """Finds the repository root path. Otherwise returns none"""
        curpath = os.path.abspath(path)
        while curpath != '/':
            if self.get_repo_type(curpath): return curpath
            else:                           curpath = os.path.dirname(curpath)
        return None


    def update(self, vcstype=None):
        """Updates the repo instance. Re-checks the repo and changes object class if repo type changes
           If vcstype is given, uses that repo type, without autodetection"""
        if os.path.exists(self.path):
            self.root = self.get_root(self.path)
            if vcstype:
                if vcstype in self.repo_types:
                    ty = self.repo_types[vcstype]
                else:
                    raise VcsError("Unrecognized repo type %s" % vcstype)
            else:
                ty = self.get_repo_type(self.root)
            if ty:
                self.__class__ = ty
                return

        self.__class__ = Vcs


    # Repo creation
    #---------------------------

    def init(self, repotype):
        """Initializes a repo in current path"""
        if not repotype in self.repo_types:
            raise VcsError("Unrecognized repo type %s" % repotype)

        if not os.path.exists(self.path): os.makedirs(self.path)
        rt = self.repo_types[repotype]
        try:
            self.__class__ = rt
            self.init()
        except:
            self.__class__ = Vcs
            raise


    def clone(self, repotype, src):
        """Clones a repo from src"""
        if not repotype in self.repo_types:
            raise VcsError("Unrecognized repo type %s" % repotype)

        if not os.path.exists(self.path): os.makedirs(self.path)
        rt = self.repo_types[repotype]
        try:
            self.__class__ = rt
            self.clone(src)
        except:
            self.__class__ = Vcs
            raise


    # Action interface
    #---------------------------

    def commit(self, message):
        """Commits with a given message"""
        raise NotImplementedError


    def add(self, filelist):
        """Adds files to the index, preparing for commit"""
        raise NotImplementedError


    def reset(self, filelist):
        """Removes files from the index"""
        raise NotImplementedError


    def pull(self):
        """Pulls from remote"""
        raise NotImplementedError


    def push(self):
        """Pushes to remote"""
        raise NotImplementedError


    def checkout(self, rev):
        """Checks out a branch or revision"""
        raise NotImplementedError


    def extract_file(self, rev, name, dest):
        """Extracts a file from a given revision and stores it in dest dir"""
        raise NotImplementedError


    # Data
    #---------------------------

    def is_repo(self):
        """Checks wether there is an initialized repo in self.path"""
        return self.path and os.path.exists(self.path) and self.root != None


    def is_tracking(self):
        """Checks whether HEAD is tracking a remote repo"""
        return self.get_remote(self.HEAD) != None


    def get_file_status(self, path):
        """Returns the status for a given path regarding the repo"""

        # if path is relative, join it with root. otherwise do nothing
        path = os.path.join(self.root, path)

        if os.path.commonprefix([self.root, path]) == self.root:
            prel = os.path.relpath(path, self.root)
            if prel in self.ignored:   return "ignored"
            if os.path.isdir(path):
                sts = set([st for p, st in self.status.items()
                           if self._path_contains(path, os.path.join(self.root, p))]) | set(['sync'])
                for st in self.FILE_STATUS:
                    if st in sts: return st
            else:
                if prel in self.status:  return self.status[prel]
                else:                    return "sync"
        else:
            return "none"


    def get_status(self, path=None):
        """Returns a dict with changed files under path and their status.
           If path is None, returns all changed files"""

        self.status = self.get_status_allfiles()
        self.ignored = self.get_ignore_allfiles()
        if path:
            path = os.path.join(self.root, path)
            if os.path.commonprefix([self.root, path]) == self.root:
                return {p: st for p, st in self.status.items() if self._path_contains(path, os.path.join(self.root, p))}
            else:
                return {}
        else:
            return self.status


    def get_status_allfiles(self):
        """Returns a dict indexed by files not in sync their status as values.
           Paths are given relative to the root."""
        raise NotImplementedError


    def get_ignore_allfiles(self):
        """Returns a set of all the ignored files in the repo"""
        raise NotImplementedError


    def get_remote_status(self):
        """Checks the status of the entire repo"""
        raise NotImplementedError


    def get_branch(self):
        """Returns the current named branch, if this makes sense for the backend. None otherwise"""
        raise NotImplementedError


    def get_log(self):
        """Get the entire log for the current HEAD"""
        raise NotImplementedError


    def get_raw_log(self, filelist=None):
        """Gets the raw log as a string"""
        raise NotImplementedError


    def get_raw_diff(self, refspec=None, filelist=None):
        """Gets the raw diff as a string"""
        raise NotImplementedError


    def get_remote(self):
        """Returns the url for the remote repo attached to head"""
        raise NotImplementedError


    def get_revision_id(self, rev=None):
        """Get a canonical key for the revision rev"""
        raise NotImplementedError


    def get_info(self, rev=None):
        """Gets info about the given revision rev"""
        raise NotImplementedError


    def get_files(self, rev=None):
        """Gets a list of files in revision rev"""
        raise NotImplementedError



    # I / O
    #---------------------------

    def print_log(self, fmt):
        log = self.log()
        if fmt == "compact":
            for dt in log:
                print(self.format_revision_compact(dt))
        else:
            raise Exception("Unknown format %s" % fmt)


    def format_revision_compact(self, dt):
        return "{0:<10}{1:<20}{2}".format(dt['revshort'],
                                          dt['date'].strftime('%a %b %d, %Y'),
                                          dt['summary'])


    def format_revision_text(self, dt):
        L = ["revision:         %s:%s" % (dt['revshort'], dt['revhash']),
             "date:             %s" % dt['date'].strftime('%a %b %d, %Y'),
             "time:             %s" % dt['date'].strftime('%H:%M'),
             "user:             %s" % dt['author'],
             "description:      %s" % dt['summary']]
        return '\n'.join(L)