gpg.py :  » Installer » Zero-Install » zeroinstall-injector-0.47 » zeroinstall » injector » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Installer » Zero Install 
Zero Install » zeroinstall injector 0.47 » zeroinstall » injector » gpg.py
"""
Python interface to GnuPG.

This module is used to invoke GnuPG to check the digital signatures on interfaces.

@see: L{iface_cache.PendingFeed}
"""

# Copyright (C) 2009, Thomas Leonard
# See the README file for details, or visit http://0install.net.

from zeroinstall import _
import subprocess
import base64, re
import os
import tempfile
from logging import info,warn

from zeroinstall.support import find_in_path,basedir
from zeroinstall.injector.trust import trust_db
from zeroinstall.injector.model import SafeException

_gnupg_options = None
def _run_gpg(args, **kwargs):
  global _gnupg_options
  if _gnupg_options is None:
    gpg_path = find_in_path('gpg') or find_in_path('gpg2') or 'gpg'
    _gnupg_options = [gpg_path, '--no-secmem-warning']

    if hasattr(os, 'geteuid') and os.geteuid() == 0 and 'GNUPGHOME' not in os.environ:
      _gnupg_options += ['--homedir', os.path.join(basedir.home, '.gnupg')]
      info(_("Running as root, so setting GnuPG home to %s"), _gnupg_options[-1])

  return subprocess.Popen(_gnupg_options + args, **kwargs)

class Signature(object):
  """Abstract base class for signature check results.
  @ivar status: the raw data returned by GPG
  @ivar messages: any messages printed by GPG which may be relevant to this signature
  """
  status = None
  messages = None

  def __init__(self, status):
    self.status = status

  def is_trusted(self, domain = None):
    """Whether this signature is trusted by the user."""
    return False
  
  def need_key(self):
    """Returns the ID of the key that must be downloaded to check this signature."""
    return None

class ValidSig(Signature):
  """A valid signature check result."""
  FINGERPRINT = 0
  TIMESTAMP = 2

  def __str__(self):
    return "Valid signature from " + self.status[self.FINGERPRINT]
  
  def is_trusted(self, domain = None):
    """Asks the L{trust.trust_db}."""
    return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)
  
  def get_timestamp(self):
    """Get the time this signature was made."""
    return int(self.status[self.TIMESTAMP])

  fingerprint = property(lambda self: self.status[self.FINGERPRINT])

  def get_details(self):
    """Call 'gpg --list-keys' and return the results split into lines and columns.
    @rtype: [[str]]"""
    # Note: GnuPG 2 always uses --fixed-list-mode
    child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys', self.fingerprint], stdout = subprocess.PIPE)
    cout, unused = child.communicate()
    if child.returncode:
      info(_("GPG exited with code %d") % child.returncode)
    details = []
    for line in cout.split('\n'):
      details.append(line.split(':'))
    return details

class BadSig(Signature):
  """A bad signature (doesn't match the message)."""
  KEYID = 0

  def __str__(self):
    return _("BAD signature by %s (the message has been tampered with)") \
      % self.status[self.KEYID]

class ErrSig(Signature):
  """Error while checking a signature."""
  KEYID = 0
  ALG = 1
  RC = -1

  def __str__(self):
    msg = _("ERROR signature by %s: ") % self.status[self.KEYID]
    rc = int(self.status[self.RC])
    if rc == 4:
      msg += _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG]
    elif rc == 9:
      msg += _("Unknown key. Try 'gpg --recv-key %s'") % self.status[self.KEYID]
    else:
      msg += _("Unknown reason code %d") % rc
    return msg

  def need_key(self):
    rc = int(self.status[self.RC])
    if rc == 9:
      return self.status[self.KEYID]
    return None

class Key:
  """A GPG key.
  @since: 0.27
  @param fingerprint: the fingerprint of the key
  @type fingerprint: str
  @ivar name: a short name for the key, extracted from the full name
  @type name: str
  """
  def __init__(self, fingerprint):
    self.fingerprint = fingerprint
    self.name = '(unknown)'
  
  def get_short_name(self):
    return self.name.split(' (', 1)[0].split(' <', 1)[0]

def load_keys(fingerprints):
  """Load a set of keys at once.
  This is much more efficient than making individual calls to L{load_key}.
  @return: a list of loaded keys, indexed by fingerprint
  @rtype: {str: L{Key}}
  @since: 0.27"""
  import codecs

  keys = {}

  # Otherwise GnuPG returns everything...
  if not fingerprints: return keys

  for fp in fingerprints:
    keys[fp] = Key(fp)

  current_fpr = None
  current_uid = None

  child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys',
        '--with-fingerprint', '--with-fingerprint'] + fingerprints, stdout = subprocess.PIPE)
  try:
    for line in child.stdout:
      if line.startswith('pub:'):
        current_fpr = None
        current_uid = None
      if line.startswith('fpr:'):
        current_fpr = line.split(':')[9]
        if current_fpr in keys and current_uid:
          # This is probably a subordinate key, where the fingerprint
          # comes after the uid, not before. Note: we assume the subkey is
          # cross-certified, as recent always ones are.
          try:
            keys[current_fpr].name = codecs.decode(current_uid, 'utf-8')
          except:
            warn("Not UTF-8: %s", current_uid)
            keys[current_fpr].name = current_uid
      if line.startswith('uid:'):
        assert current_fpr is not None
        # Only take primary UID
        if current_uid: continue
        parts = line.split(':')
        current_uid = parts[9]
        if current_fpr in keys:
          keys[current_fpr].name = current_uid
  finally:
    if child.wait():
      warn(_("gpg --list-keys failed with exit code %d") % child.returncode)

  return keys

def load_key(fingerprint):
  """Query gpg for information about this key.
  @return: a new key
  @rtype: L{Key}
  @since: 0.27"""
  return load_keys([fingerprint])[fingerprint]

def import_key(stream):
  """Run C{gpg --import} with this stream as stdin."""
  errors = tempfile.TemporaryFile()

  child = _run_gpg(['--quiet', '--import', '--batch'],
        stdin = stream, stderr = errors)

  status = child.wait()

  errors.seek(0)
  error_messages = errors.read().strip()
  errors.close()

  if error_messages:
    import codecs
    decoder = codecs.lookup('utf-8')
    error_messages = decoder.decode(error_messages, errors = 'replace')[0]

  if status != 0:
    if error_messages:
      raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages)
    else:
      raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status)
  elif error_messages:
    warn(_("Warnings from 'gpg --import':\n%s") % error_messages)

def _check_plain_stream(stream):
  data = tempfile.TemporaryFile()  # Python2.2 does not support 'prefix'
  errors = tempfile.TemporaryFile()

  status_r, status_w = os.pipe()

  # Note: Should ideally close status_r in the child, but we want to support Windows too
  child = _run_gpg(['--decrypt',
        # Not all versions support this:
        #'--max-output', str(1024 * 1024),
        '--batch',
        '--status-fd', str(status_w)],
    stdin = stream,
    stdout = data,
    stderr = errors)

  os.close(status_w)

  try:
    sigs = _get_sigs_from_gpg_status_stream(os.fdopen(status_r), child, errors)
  finally:
    data.seek(0)
  return (data, sigs)

def _check_xml_stream(stream):
  xml_comment_start = '<!-- Base64 Signature'

  data_to_check = stream.read()

  last_comment = data_to_check.rfind('\n' + xml_comment_start)
  if last_comment < 0:
    raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
  last_comment += 1  # Include new-line in data
  
  data = tempfile.TemporaryFile()
  data.write(data_to_check[:last_comment])
  data.flush()
  os.lseek(data.fileno(), 0, 0)

  errors = tempfile.TemporaryFile()

  sig_lines = data_to_check[last_comment:].split('\n')
  if sig_lines[0].strip() != xml_comment_start:
    raise SafeException(_('Bad signature block: extra data on comment line'))
  while sig_lines and not sig_lines[-1].strip():
    del sig_lines[-1]
  if sig_lines[-1].strip() != '-->':
    raise SafeException(_('Bad signature block: last line is not end-of-comment'))
  sig_data = '\n'.join(sig_lines[1:-1])

  if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
    raise SafeException(_("Invalid characters found in base 64 encoded signature"))
  try:
    sig_data = base64.decodestring(sig_data) # (b64decode is Python 2.4)
  except Exception, ex:
    raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex))

  sig_fd, sig_name = tempfile.mkstemp(prefix = 'injector-sig-')
  try:
    sig_file = os.fdopen(sig_fd, 'w')
    sig_file.write(sig_data)
    sig_file.close()

    # Note: Should ideally close status_r in the child, but we want to support Windows too
    child = _run_gpg([# Not all versions support this:
          #'--max-output', str(1024 * 1024),
          '--batch',
          # Windows GPG can only cope with "1" here
          '--status-fd', '1',
          '--verify', sig_name, '-'],
         stdin = data,
         stdout = subprocess.PIPE,
         stderr = errors)

    try:
      sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
    finally:
      os.lseek(stream.fileno(), 0, 0)
      stream.seek(0)
  finally:
    os.unlink(sig_name)
  return (stream, sigs)

def check_stream(stream):
  """Pass stream through gpg --decrypt to get the data, the error text,
  and a list of signatures (good or bad). If stream starts with "<?xml "
  then get the signature from a comment at the end instead (and the returned
  data is the original stream). stream must be seekable.
  @note: Stream returned may or may not be the one passed in. Be careful!
  @return: (data_stream, [Signatures])"""

  stream.seek(0)

  start = stream.read(6)
  stream.seek(0)
  if start == "<?xml ":
    return _check_xml_stream(stream)
  elif start == '-----B':
    import warnings
    warnings.warn(_("Plain GPG-signed feeds are deprecated!"), DeprecationWarning, stacklevel = 2)
    os.lseek(stream.fileno(), 0, 0)
    return _check_plain_stream(stream)
  else:
    raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream.read(120)))

def _get_sigs_from_gpg_status_stream(status_r, child, errors):
  """Read messages from status_r and collect signatures from it.
  When done, reap 'child'.
  If there are no signatures, throw SafeException (using errors
  for the error message if non-empty)."""
  sigs = []

  # Should we error out on bad signatures, even if there's a good
  # signature too?

  for line in status_r:
    assert line.endswith('\n')
    assert line.startswith('[GNUPG:] ')
    line = line[9:-1]
    split_line = line.split(' ')
    code = split_line[0]
    args = split_line[1:]
    if code == 'VALIDSIG':
      sigs.append(ValidSig(args))
    elif code == 'BADSIG':
      sigs.append(BadSig(args))
    elif code == 'ERRSIG':
      sigs.append(ErrSig(args))

  status = child.wait()

  errors.seek(0)

  error_messages = errors.read().strip()
  errors.close()

  if not sigs:
    if error_messages:
      raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages)
    else:
      raise SafeException(_("No signatures found. No error messages from GPG."))
  elif error_messages:
    # Attach the warnings to all the signatures, in case they're useful.
    for s in sigs:
      s.messages = error_messages
  
  return sigs
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.