Logo Search packages:      
Sourcecode: viewvc version File versions  Download package

__init__.py

# -*-python-*-
#
# Copyright (C) 1999-2006 The ViewCVS Group. All Rights Reserved.
#
# By using this file, you agree to the terms and conditions set forth in
# the LICENSE.html file which can be found at the top level of the ViewVC
# distribution or at http://viewvc.org/license-1.html.
#
# For more information, visit http://viewvc.org/
#
# -----------------------------------------------------------------------

"Version Control lib driver for remotely accessible Subversion repositories."

import vclib
import sys
import os
import string
import re
import tempfile
import popen2
import time
from vclib.svn import Revision, ChangedPath, _datestr_to_date, _compare_paths, _cleanup_path
from svn import core, delta, client, wc, ra


### Require Subversion 1.3.0 or better. (for svn_ra_get_locations support)
if (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_PATCH) < (1, 3, 0):
  raise Exception, "Version requirement not met (needs 1.3.0 or better)"

  
def _rev2optrev(rev):
  assert type(rev) is int
  rt = core.svn_opt_revision_t()
  rt.kind = core.svn_opt_revision_number
  rt.value.number = rev
  return rt


def date_from_rev(svnrepos, rev):
  datestr = ra.svn_ra_rev_prop(svnrepos.ra_session, rev,
                               'svn:date', svnrepos.pool)
  return _datestr_to_date(datestr, svnrepos.pool)


def get_location(svnrepos, path, rev, old_rev):
  try:
    results = ra.get_locations(svnrepos.ra_session, path, rev,
                               [old_rev], svnrepos.pool)
  except core.SubversionException, e:
    if e.apr_err == core.SVN_ERR_FS_NOT_FOUND:
      raise vclib.ItemNotFound(path)
    raise

  try:
    old_path = results[old_rev]
  except KeyError:
    raise vclib.ItemNotFound(path)

  return _cleanup_path(old_path)


def last_rev(svnrepos, path, peg_revision, limit_revision=None):
  """Given PATH, known to exist in PEG_REVISION, find the youngest
  revision older than, or equal to, LIMIT_REVISION in which path
  exists.  Return that revision, and the path at which PATH exists in
  that revision."""
  
  # Here's the plan, man.  In the trivial case (where PEG_REVISION is
  # the same as LIMIT_REVISION), this is a no-brainer.  If
  # LIMIT_REVISION is older than PEG_REVISION, we can use Subversion's
  # history tracing code to find the right location.  If, however,
  # LIMIT_REVISION is younger than PEG_REVISION, we suffer from
  # Subversion's lack of forward history searching.  Our workaround,
  # ugly as it may be, involves a binary search through the revisions
  # between PEG_REVISION and LIMIT_REVISION to find our last live
  # revision.
  peg_revision = svnrepos._getrev(peg_revision)
  limit_revision = svnrepos._getrev(limit_revision)
  if peg_revision == limit_revision:
    return peg_revision, path
  elif peg_revision > limit_revision:
    path = get_location(svnrepos, path, peg_revision, limit_revision)
    return limit_revision, path
  else:
    ### Warning: this is *not* an example of good pool usage.
    direction = 1
    while peg_revision != limit_revision:
      mid = (peg_revision + 1 + limit_revision) / 2
      try:
        path = get_location(svnrepos, path, peg_revision, mid)
      except vclib.ItemNotFound:
        limit_revision = mid - 1
      else:
        peg_revision = mid
    return peg_revision, path


def created_rev(svnrepos, full_name, rev):
  kind = ra.svn_ra_check_path(svnrepos.ra_session, full_name, rev,
                              svnrepos.pool)
  if kind == core.svn_node_dir:
    props = ra.svn_ra_get_dir(svnrepos.ra_session, full_name,
                              rev, svnrepos.pool)
    return int(props[core.SVN_PROP_ENTRY_COMMITTED_REV])
  return core.SVN_INVALID_REVNUM


class LastHistoryCollector:
  def __init__(self):
    self.has_history = 0

  def add_history(self, paths, revision, author, date, message, pool):
    if not self.has_history:
      self.has_history = 1
      self.revision = revision
      self.author = author
      self.date = date
      self.message = message
      self.changes = []

      if not paths:
        return
      changed_paths = paths.keys()
      changed_paths.sort(lambda a, b: _compare_paths(a, b))
      action_map = { 'D' : 'deleted',
                     'A' : 'added',
                     'R' : 'replaced',
                     'M' : 'modified',
                     }
      for changed_path in changed_paths:
        change = paths[changed_path]
        action = action_map.get(change.action, 'modified')
        ### Wrong, diddily wrong wrong wrong.  Can you say,
        ### "Manufacturing data left and right because it hurts to
        ### figure out the right stuff?"
        if change.copyfrom_path and change.copyfrom_rev:
          self.changes.append(ChangedPath(changed_path[1:], None, 0, 0,
                                          change.copyfrom_path,
                                          change.copyfrom_rev, action, 1))
        else:
          self.changes.append(ChangedPath(changed_path[1:], None, 0, 0,
                                          changed_path[1:], 0, action, 0))

  def get_history(self):
    if not self.has_history:
      return None, None, None, None, None
    return self.revision, self.author, self.date, self.message, self.changes


def _get_rev_details(svnrepos, rev, pool):
  lhc = LastHistoryCollector()
  client.svn_client_log([svnrepos.rootpath],
                        _rev2optrev(rev), _rev2optrev(rev),
                        1, 0, lhc.add_history, svnrepos.ctx, pool)
  return lhc.get_history()

  
def get_revision_info(svnrepos, rev):
  rev, author, date, log, changes = \
       _get_rev_details(svnrepos, rev, svnrepos.pool)
  return _datestr_to_date(date, svnrepos.pool), author, log, changes


class LogCollector:
  def __init__(self, path, show_all_logs):
    # This class uses leading slashes for paths internally
    if not path:
      self.path = '/'
    else:
      self.path = path[0] == '/' and path or '/' + path
    self.logs = []
    self.show_all_logs = show_all_logs
    
  def add_log(self, paths, revision, author, date, message, pool):
    # Changed paths have leading slashes
    changed_paths = paths.keys()
    changed_paths.sort(lambda a, b: _compare_paths(a, b))
    this_path = None
    if self.path in changed_paths:
      this_path = self.path
      change = paths[self.path]
      if change.copyfrom_path:
        this_path = change.copyfrom_path
    for changed_path in changed_paths:
      if changed_path != self.path:
        # If a parent of our path was copied, our "next previous"
        # (huh?) path will exist elsewhere (under the copy source).
        if (string.rfind(self.path, changed_path) == 0) and \
               self.path[len(changed_path)] == '/':
          change = paths[changed_path]
          if change.copyfrom_path:
            this_path = change.copyfrom_path + self.path[len(changed_path):]
    if self.show_all_logs or this_path:
      date = _datestr_to_date(date, pool)
      entry = Revision(revision, date, author, message, None,
                       self.path[1:], None, None)
      self.logs.append(entry)
    if this_path:
      self.path = this_path
    

def get_logs(svnrepos, full_name, rev, files):
  dirents = svnrepos._get_dirents(full_name, rev)
  subpool = core.svn_pool_create(svnrepos.pool)
  rev_info_cache = { }
  for file in files:
    core.svn_pool_clear(subpool)
    entry = dirents[file.name]
    if rev_info_cache.has_key(entry.created_rev):
      rev, author, date, log = rev_info_cache[entry.created_rev]
    else:
      ### i think this needs some get_last_history action to be accurate
      rev, author, date, log, changes = \
           _get_rev_details(svnrepos, entry.created_rev, subpool)
      rev_info_cache[entry.created_rev] = rev, author, date, log
    file.rev = rev
    file.author = author
    file.date = _datestr_to_date(date, subpool)
    file.log = log
    file.size = entry.size
  core.svn_pool_destroy(subpool)    

def get_youngest_revision(svnrepos):
  return svnrepos.youngest

def temp_checkout(svnrepos, path, rev, pool):
  """Check out file revision to temporary file"""
  temp = tempfile.mktemp()
  stream = core.svn_stream_from_aprfile(temp, pool)
  url = svnrepos.rootpath + (path and '/' + path)
  client.svn_client_cat(core.Stream(stream), url, _rev2optrev(rev),
                        svnrepos.ctx, pool)
  core.svn_stream_close(stream)
  return temp

class SelfCleanFP:
  def __init__(self, path):
    self._fp = open(path, 'r')
    self._path = path
    self._eof = 0
    
  def read(self, len):
    if len:
      chunk = self._fp.read(len)
    else:
      chunk = self._fp.read()
    if chunk == '':
      self._eof = 1
    return chunk
  
  def readline(self):
    chunk = self._fp.readline()
    if chunk == '':
      self._eof = 1
    return chunk

  def close(self):
    self._fp.close()
    os.remove(self._path)

  def __del__(self):
    self.close()
    
  def eof(self):
    return self._eof


class SubversionRepository(vclib.Repository):
  def __init__(self, name, rootpath):
    # Init the client app
    core.apr_initialize()
    pool = core.svn_pool_create(None)
    core.svn_config_ensure(None, pool)

    # Start populating our members
    self.pool = pool
    self.name = name
    self.rootpath = rootpath

    # Setup the client context baton, complete with non-prompting authstuffs.
    ctx = client.svn_client_ctx_t()
    providers = []
    providers.append(client.svn_client_get_simple_provider(pool))
    providers.append(client.svn_client_get_username_provider(pool))
    providers.append(client.svn_client_get_ssl_server_trust_file_provider(pool))
    providers.append(client.svn_client_get_ssl_client_cert_file_provider(pool))
    providers.append(client.svn_client_get_ssl_client_cert_pw_file_provider(pool))
    ctx.auth_baton = core.svn_auth_open(providers, pool)
    ctx.config = core.svn_config_get_config(None, pool)
    self.ctx = ctx

    ra_callbacks = ra.svn_ra_callbacks_t()
    ra_callbacks.auth_baton = ctx.auth_baton
    self.ra_session = ra.svn_ra_open(self.rootpath, ra_callbacks, None,
                                     ctx.config, pool)
    self.youngest = ra.svn_ra_get_latest_revnum(self.ra_session, pool)
    self._dirent_cache = { }

  def __del__(self):
    core.svn_pool_destroy(self.pool)
    core.apr_terminate()
    
  def itemtype(self, path_parts, rev):
    path = self._getpath(path_parts[:-1])
    rev = self._getrev(rev)
    if not len(path_parts):
      return vclib.DIR
    dirents = self._get_dirents(path, rev)
    try:
      entry = dirents[path_parts[-1]]
      if entry.kind == core.svn_node_dir:
        return vclib.DIR
      if entry.kind == core.svn_node_file:
        return vclib.FILE
    except KeyError:
      raise vclib.ItemNotFound(path_parts)

  def openfile(self, path_parts, rev):
    rev = self._getrev(rev)
    url = self.rootpath
    if len(path_parts):
      url = self.rootpath + '/' + self._getpath(path_parts)
    tmp_file = tempfile.mktemp()
    stream = core.svn_stream_from_aprfile(tmp_file, self.pool)
    ### rev here should be the last history revision of the URL
    client.svn_client_cat(core.Stream(stream), url,
                          _rev2optrev(rev), self.ctx, self.pool)
    core.svn_stream_close(stream)
    return SelfCleanFP(tmp_file), rev

  def listdir(self, path_parts, rev, options):
    path = self._getpath(path_parts)
    rev = self._getrev(rev)
    entries = [ ]
    dirents = self._get_dirents(path, rev)
    for name in dirents.keys():
      entry = dirents[name]
      if entry.kind == core.svn_node_dir:
        kind = vclib.DIR
      elif entry.kind == core.svn_node_file:
        kind = vclib.FILE
      entries.append(vclib.DirEntry(name, kind))
    return entries

  def dirlogs(self, path_parts, rev, entries, options):
    get_logs(self, self._getpath(path_parts), self._getrev(rev), entries)

  def itemlog(self, path_parts, rev, options):
    full_name = self._getpath(path_parts)
    rev = self._getrev(rev)

    # It's okay if we're told to not show all logs on a file -- all
    # the revisions should match correctly anyway.
    lc = LogCollector(full_name, options.get('svn_show_all_dir_logs', 0))
    dir_url = self.rootpath
    if full_name:
      dir_url = dir_url + '/' + full_name

    cross_copies = options.get('svn_cross_copies', 0)
    client.svn_client_log([dir_url], _rev2optrev(rev), _rev2optrev(1),
                          1, not cross_copies, lc.add_log,
                          self.ctx, self.pool)
    revs = lc.logs
    revs.sort()
    prev = None
    for rev in revs:
      rev.prev = prev
      prev = rev

    return revs

  def annotate(self, path_parts, rev):
    path = self._getpath(path_parts)
    rev = self._getrev(rev)
    url = self.rootpath + (path and '/' + path)

    blame_data = []

    def _blame_cb(line_no, revision, author, date,
                  line, pool, blame_data=blame_data):
      prev_rev = None
      if revision > 1:
        prev_rev = revision - 1
      blame_data.append(_item(text=line, line_number=line_no+1,
                              rev=revision, prev_rev=prev_rev,
                              author=author, date=None))
      
    client.svn_client_blame(url, _rev2optrev(1), _rev2optrev(rev),
                            _blame_cb, self.ctx, self.pool)

    return blame_data, rev
    
  def rawdiff(self, path_parts1, rev1, path_parts2, rev2, type, options={}):
    p1 = self._getpath(path_parts1)
    p2 = self._getpath(path_parts2)
    r1 = self._getrev(rev1)
    r2 = self._getrev(rev2)
    args = vclib._diff_args(type, options)

    try:
      temp1 = temp_checkout(self, p1, r1, self.pool)
      temp2 = temp_checkout(self, p2, r2, self.pool)
      info1 = p1, date_from_rev(self, r1), r1
      info2 = p2, date_from_rev(self, r2), r2
      return vclib._diff_fp(temp1, temp2, info1, info2, args)
    except vclib.svn.core.SubversionException, e:
      if e.apr_err == vclib.svn.core.SVN_ERR_FS_NOT_FOUND:
        raise vclib.InvalidRevision
      raise

  def _getpath(self, path_parts):
    return string.join(path_parts, '/')

  def _getrev(self, rev):
    if rev is None or rev == 'HEAD':
      return self.youngest
    try:
      rev = int(rev)
    except ValueError:
      raise vclib.InvalidRevision(rev)
    if (rev < 0) or (rev > self.youngest):
      raise vclib.InvalidRevision(rev)
    return rev

  def _get_dirents(self, path, rev):
    if path:
      key = str(rev) + '/' + path
      dir_url = self.rootpath + '/' + path
    else:
      key = str(rev)
      dir_url = self.rootpath
    dirents = self._dirent_cache.get(key)
    if dirents:
      return dirents
    dirents = client.svn_client_ls(dir_url, _rev2optrev(rev), 0,
                                   self.ctx, self.pool)
    self._dirent_cache[key] = dirents
    return dirents
    
    
class _item:
  def __init__(self, **kw):
    vars(self).update(kw)

Generated by  Doxygen 1.6.0   Back to index