1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# vcs - a python module to handle various version control systems
# Copyright 2011, 2012 Abdó Roig-Maranges <abdo.roig@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import subprocess
from datetime import datetime
class VcsError(Exception):
pass
class Vcs(object):
""" This class represents a version controlled path, abstracting the usual
operations from the different supported backends.
The backends are declared in te variable self.repo_types, and are derived
classes from Vcs with the following restrictions:
* do NOT implement __init__. Vcs takes care of this.
* do not create change internal state. All internal state should be
handled in Vcs
Objects from backend classes should never be created directly. Instead
create objects of Vcs class. The initialization calls update, which takes
care of detecting the right Vcs backend to use and dynamically changes the
object type accordingly.
"""
# These are abstracted revs, representing the current index (staged files),
# the current head and nothing. Every backend should redefine them if the
# version control has a similar concept, or implement _sanitize_rev method to
# clean the rev before using them
INDEX = "INDEX"
HEAD = "HEAD"
NONE = "NONE"
vcsname = None
# Possible status responses
FILE_STATUS = ['conflict', 'untracked', 'deleted', 'changed', 'staged', 'ignored', 'sync', 'none', 'unknown']
REMOTE_STATUS = ['none', 'sync', 'behind', 'ahead', 'diverged', 'unknown']
def __init__(self, path, vcstype=None):
# This is a bit hackish, but I need to import here to avoid circular imports
from .git import Git
from .hg import Hg
from .bzr import Bzr
self.repo_types = {'git': Git, 'hg': Hg, 'bzr': Bzr}
self.path = os.path.expanduser(path)
self.status = {}
self.ignored = set()
self.root = None
self.update(vcstype=vcstype)
# Auxiliar
#---------------------------
def _vcs(self, path, cmd, args, silent=False, catchout=False, bytes=False):
"""Executes a vcs command"""
with open('/dev/null', 'w') as devnull:
if silent: out=devnull
else: out=None
try:
if catchout:
raw = subprocess.check_output([cmd] + args, stderr=out, cwd=path)
if bytes: return raw
else: return raw.decode('utf-8', errors="ignore").strip()
else:
subprocess.check_call([cmd] + args, stderr=out, stdout=out, cwd=path)
except subprocess.CalledProcessError:
raise VcsError("%s error on %s. Command: %s" % (cmd, path, ' '.join([cmd] + args)))
def _path_contains(self, parent, path):
"""Checks wether path is an object belonging to the subtree in parent"""
if parent == path: return True
parent = os.path.normpath(parent + '/')
path = os.path.normpath(path)
return os.path.commonprefix([parent, path]) == parent
# Object manipulation
#---------------------------
# This may be a little hacky, but very useful. An instance of Vcs class changes its own class
# when calling update(), to match the right repo type. I can have the same object adapt to
# the path repo type, if this ever changes!
def get_repo_type(self, path):
"""Returns the right repo type for path. None if no repo present in path"""
for rn, rt in self.repo_types.items():
if path and os.path.exists(os.path.join(path, '.%s' % rn)): return rt
return None
def get_root(self, path):
"""Finds the repository root path. Otherwise returns none"""
curpath = os.path.abspath(path)
while curpath != '/':
if self.get_repo_type(curpath): return curpath
else: curpath = os.path.dirname(curpath)
return None
def update(self, vcstype=None):
"""Updates the repo instance. Re-checks the repo and changes object class if repo type changes
If vcstype is given, uses that repo type, without autodetection"""
if os.path.exists(self.path):
self.root = self.get_root(self.path)
if vcstype:
if vcstype in self.repo_types:
ty = self.repo_types[vcstype]
else:
raise VcsError("Unrecognized repo type %s" % vcstype)
else:
ty = self.get_repo_type(self.root)
if ty:
self.__class__ = ty
return
self.__class__ = Vcs
# Repo creation
#---------------------------
def init(self, repotype):
"""Initializes a repo in current path"""
if not repotype in self.repo_types:
raise VcsError("Unrecognized repo type %s" % repotype)
if not os.path.exists(self.path): os.makedirs(self.path)
rt = self.repo_types[repotype]
try:
self.__class__ = rt
self.init()
except:
self.__class__ = Vcs
raise
def clone(self, repotype, src):
"""Clones a repo from src"""
if not repotype in self.repo_types:
raise VcsError("Unrecognized repo type %s" % repotype)
if not os.path.exists(self.path): os.makedirs(self.path)
rt = self.repo_types[repotype]
try:
self.__class__ = rt
self.clone(src)
except:
self.__class__ = Vcs
raise
# Action interface
#---------------------------
def commit(self, message):
"""Commits with a given message"""
raise NotImplementedError
def add(self, filelist):
"""Adds files to the index, preparing for commit"""
raise NotImplementedError
def reset(self, filelist):
"""Removes files from the index"""
raise NotImplementedError
def pull(self):
"""Pulls from remote"""
raise NotImplementedError
def push(self):
"""Pushes to remote"""
raise NotImplementedError
def checkout(self, rev):
"""Checks out a branch or revision"""
raise NotImplementedError
def extract_file(self, rev, name, dest):
"""Extracts a file from a given revision and stores it in dest dir"""
raise NotImplementedError
# Data
#---------------------------
def is_repo(self):
"""Checks wether there is an initialized repo in self.path"""
return self.path and os.path.exists(self.path) and self.root != None
def is_tracking(self):
"""Checks whether HEAD is tracking a remote repo"""
return self.get_remote(self.HEAD) != None
def get_file_status(self, path):
"""Returns the status for a given path regarding the repo"""
# if path is relative, join it with root. otherwise do nothing
path = os.path.join(self.root, path)
# path is not in the repo
if not self._path_contains(self.root, path):
return "none"
# check if prel or some parent of prel is ignored
prel = os.path.relpath(path, self.root)
while len(prel) > 0 and prel != '/' and prel != '.':
if prel in self.ignored: return "ignored"
prel, tail = os.path.split(prel)
# check if prel or some parent of prel is listed in status
prel = os.path.relpath(path, self.root)
while len(prel) > 0 and prel != '/' and prel != '.':
if prel in self.status: return self.status[prel]
prel, tail = os.path.split(prel)
# check if prel is a directory that contains some file in status
prel = os.path.relpath(path, self.root)
if os.path.isdir(path):
sts = set(st for p, st in self.status.items()
if self._path_contains(path, os.path.join(self.root, p)))
for st in self.FILE_STATUS:
if st in sts: return st
# it seems prel is in sync
return "sync"
def get_status(self, path=None):
"""Returns a dict with changed files under path and their status.
If path is None, returns all changed files"""
self.status = self.get_status_allfiles()
self.ignored = self.get_ignore_allfiles()
if path:
path = os.path.join(self.root, path)
if os.path.commonprefix([self.root, path]) == self.root:
return dict((p, st) for p, st in self.status.items() if self._path_contains(path, os.path.join(self.root, p)))
else:
return {}
else:
return self.status
def get_status_allfiles(self):
"""Returns a dict indexed by files not in sync their status as values.
Paths are given relative to the root. Strips trailing '/' from dirs."""
raise NotImplementedError
def get_ignore_allfiles(self):
"""Returns a set of all the ignored files in the repo. Strips trailing '/' from dirs."""
raise NotImplementedError
def get_remote_status(self):
"""Checks the status of the entire repo"""
raise NotImplementedError
def get_branch(self):
"""Returns the current named branch, if this makes sense for the backend. None otherwise"""
raise NotImplementedError
def get_log(self):
"""Get the entire log for the current HEAD"""
raise NotImplementedError
def get_raw_log(self, filelist=None):
"""Gets the raw log as a string"""
raise NotImplementedError
def get_raw_diff(self, refspec=None, filelist=None):
"""Gets the raw diff as a string"""
raise NotImplementedError
def get_remote(self):
"""Returns the url for the remote repo attached to head"""
raise NotImplementedError
def get_revision_id(self, rev=None):
"""Get a canonical key for the revision rev"""
raise NotImplementedError
def get_info(self, rev=None):
"""Gets info about the given revision rev"""
raise NotImplementedError
def get_files(self, rev=None):
"""Gets a list of files in revision rev"""
raise NotImplementedError
# I / O
#---------------------------
def print_log(self, fmt):
log = self.log()
if fmt == "compact":
for dt in log:
print(self.format_revision_compact(dt))
else:
raise Exception("Unknown format %s" % fmt)
def format_revision_compact(self, dt):
return "{0:<10}{1:<20}{2}".format(dt['revshort'],
dt['date'].strftime('%a %b %d, %Y'),
dt['summary'])
def format_revision_text(self, dt):
L = ["revision: %s:%s" % (dt['revshort'], dt['revhash']),
"date: %s" % dt['date'].strftime('%a %b %d, %Y'),
"time: %s" % dt['date'].strftime('%H:%M'),
"user: %s" % dt['author'],
"description: %s" % dt['summary']]
return '\n'.join(L)
|