sat.py :  » Installer » Zero-Install » zeroinstall-injector-0.47 » zeroinstall » injector » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Installer » Zero Install 
Zero Install » zeroinstall injector 0.47 » zeroinstall » injector » sat.py
# Copyright (C) 2010, Thomas Leonard
# See the README file for details, or visit http://0install.net.

# The design of this solver is very heavily based on the one described in
# the MiniSat paper "An Extensible SAT-solver [extended version 1.2]"
# http://minisat.se/Papers.html
#
# The main differences are:
#
# - We care about which solution we find (not just "satisfiable" or "not").
# - We take care to be deterministic (always select the same versions given
#   the same input). We do not do random restarts, etc.
# - We add an AtMostOneClause (the paper suggests this in the Excercises, and
#   it's very useful for our purposes).

import tempfile, subprocess, os, sys
from logging import warn

def debug(msg, *args):
  return
  #print "SAT:", msg % args

# variables are numbered from 0
# literals have the same number as the corresponding variable,
# except they for negatives they are (-1-v):
#
# Variable     Literal     not(Literal)
# 0         0     -1
# 1         1     -2
def neg(lit):
  return -1 - lit

def watch_index(lit):
  if lit >= 0:
    return lit * 2
  return neg(lit) * 2 + 1

def makeAtMostOneClause(solver):
  class AtMostOneClause:
    def __init__(self, lits):
      """Preferred literals come first."""
      self.lits = lits

      # The single literal from our set that is True.
      # We store this explicitly because the decider needs to know quickly.
      self.current = None

    def propagate(self, lit):
      # Re-add ourselves to the watch list.
      # (we we won't get any more notifications unless we backtrack,
      # in which case we'd need to get back on the list anyway)
      solver.watch_lit(lit, self)

      # value[lit] has just become True
      assert solver.lit_value(lit) == True
      assert lit >= 0

      #debug("%s: noticed %s has become True" % (self, solver.name_lit(lit)))

      # If we already propagated successfully when the first
      # one was set then we set all the others to False and
      # anyone trying to set one True will get rejected. And
      # if we didn't propagate yet, current will still be
      # None, even if we now have a conflict (which we'll
      # detect below).
      assert self.current is None

      self.current = lit

      # If we later backtrace, call our undo function to unset current
      solver.get_varinfo_for_lit(lit).undo.append(self)

      count = 0
      for l in self.lits:
        value = solver.lit_value(l)
        #debug("Value of %s is %s" % (solver.name_lit(l), value))
        if value is True and l is not lit:
          # Due to queuing, we might get called with current = None
          # and two versions already selected.
          debug("CONFLICT: already selected %s" % solver.name_lit(l))
          return False
        if value is None:
          # Since one of our lits is already true, all unknown ones
          # can be set to False.
          if not solver.enqueue(neg(l), self):
            debug("CONFLICT: enqueue failed for %s", solver.name_lit(neg(l)))
            return False  # Conflict; abort

      return True

    def undo(self, lit):
      debug("(backtracking: no longer selected %s)" % solver.name_lit(lit))
      assert lit == self.current
      self.current = None

    # Why is lit True?
    # Or, why are we causing a conflict (if lit is None)?
    def cacl_reason(self, lit):
      if lit is None:
        # Find two True literals
        trues = []
        for l in self.lits:
          if solver.lit_value(l) is True:
            trues.append(l)
            if len(trues) == 2: return trues
      else:
        for l in self.lits:
          if l is not lit and solver.lit_value(l) is True:
            return [l]
        # Find one True literal
      assert 0  # don't know why!

    def best_undecided(self):
      debug("best_undecided: %s" % (solver.name_lits(self.lits)))
      for lit in self.lits:
        #debug("%s = %s" % (solver.name_lit(lit), solver.lit_value(lit)))
        if solver.lit_value(lit) is None:
          return lit
      return None

    def __repr__(self):
      return "<lone: %s>" % (', '.join(solver.name_lits(self.lits)))

  return AtMostOneClause

def makeUnionClause(solver):
  class UnionClause:
    def __init__(self, lits):
      self.lits = lits
    
    # Try to infer new facts.
    # We can do this only when all of our literals are False except one,
    # which is undecided. That is,
    #   False... or X or False... = True  =>  X = True
    #
    # To get notified when this happens, we tell the solver to
    # watch two of our undecided literals. Watching two undecided
    # literals is sufficient. When one changes we check the state
    # again. If we still have two or more undecided then we switch
    # to watching them, otherwise we propagate.
    #
    # Returns False on conflict.
    def propagate(self, lit):
      # value[get(lit)] has just become False

      #debug("%s: noticed %s has become False" % (self, solver.name_lit(neg(lit))))

      # For simplicity, only handle the case where self.lits[1]
      # is the one that just got set to False, so that:
      # - value[lits[0]] = None | True
      # - value[lits[1]] = False
      # If it's the other way around, just swap them before we start.
      if self.lits[0] == neg(lit):
        self.lits[0], self.lits[1] = self.lits[1], self.lits[0]

      if solver.lit_value(self.lits[0]) == True:
        # We're already satisfied. Do nothing.
        solver.watch_lit(lit, self)
        return True

      assert solver.lit_value(self.lits[1]) == False

      # Find a new literal to watch now that lits[1] is resolved,
      # swap it with lits[1], and start watching it.
      for i in range(2, len(self.lits)):
        value = solver.lit_value(self.lits[i])
        if value != False:
          # Could be None or True. If it's True then we've already done our job,
          # so this means we don't get notified unless we backtrack, which is fine.
          self.lits[1], self.lits[i] = self.lits[i], self.lits[1]
          solver.watch_lit(neg(self.lits[1]), self)
          return True

      # Only lits[0], is now undefined.
      solver.watch_lit(lit, self)
      return solver.enqueue(self.lits[0], self)

    def undo(self, lit): pass

    # Why is lit True?
    # Or, why are we causing a conflict (if lit is None)?
    def cacl_reason(self, lit):
      assert lit is None or lit is self.lits[0]

      # The cause is everything except lit.
      return [neg(l) for l in self.lits if l is not lit]

    def __repr__(self):
      return "<some: %s>" % (', '.join(solver.name_lits(self.lits)))
  return UnionClause

# Using an array of VarInfo objects is less efficient than using multiple arrays, but
# easier for me to understand.
class VarInfo(object):
  __slots__ = ['value', 'reason', 'level', 'undo', 'obj']
  def __init__(self, obj):
    self.value = None    # True/False/None
    self.reason = None    # The constraint that implied our value, if True or False
    self.level = -1      # The decision level at which we got a value (when not None)
    self.undo = []      # Constraints to update if we become unbound (by backtracking)
    self.obj = obj      # The object this corresponds to (for our caller and for debugging)
  
  def __repr__(self):
    return '%s=%s' % (self.name, self.value)

  @property
  def name(self):
    return str(self.obj)

class Solver(object):
  def __init__(self):
    # Propagation
    self.watches = []    # watches[2i,2i+1] = constraints to check when literal[i] becomes True/False
    self.propQ = []      # propagation queue

    # Assignments
    self.assigns = []    # [VarInfo]
    self.trail = []      # order of assignments
    self.trail_lim = []    # decision levels

    self.toplevel_conflict = False

    self.makeAtMostOneClause = makeAtMostOneClause(self)
    self.makeUnionClause = makeUnionClause(self)
  
  def get_decision_level(self):
    return len(self.trail_lim)

  def add_variable(self, obj):
    debug("add_variable('%s')", obj)
    index = len(self.assigns)

    self.watches += [[], []]  # Add watch lists for X and not(X)
    self.assigns.append(VarInfo(obj))
    return index

  # lit is now True
  # reason is the clause that is asserting this
  # Returns False if this immediately causes a conflict.
  def enqueue(self, lit, reason):
    debug("%s => %s" % (reason, self.name_lit(lit)))
    old_value = self.lit_value(lit)
    if old_value is not None:
      if old_value is False:
        # Conflict
        return False
      else:
        # Already set (shouldn't happen)
        return True

    if lit < 0:
      var_info = self.assigns[neg(lit)]
      var_info.value = False
    else:
      var_info = self.assigns[lit]
      var_info.value = True
    var_info.level = self.get_decision_level()
    var_info.reason = reason

    self.trail.append(lit)
    self.propQ.append(lit)

    return True

  # Pop most recent assignment from self.trail
  def undo_one(self):
    lit = self.trail[-1]
    debug("(pop %s)", self.name_lit(lit))
    var_info = self.get_varinfo_for_lit(lit)
    var_info.value = None
    var_info.reason = None
    var_info.level = -1
    self.trail.pop()

    while var_info.undo:
      var_info.undo.pop().undo(lit)
  
  def cancel(self):
    n_this_level = len(self.trail) - self.trail_lim[-1]
    debug("backtracking from level %d (%d assignments)" %
        (self.get_decision_level(), n_this_level))
    while n_this_level != 0:
      self.undo_one()
      n_this_level -= 1
    self.trail_lim.pop()

  def cancel_until(self, level):
    while self.get_decision_level() > level:
      self.cancel()
  
  # Process the propQ.
  # Returns None when done, or the clause that caused a conflict.
  def propagate(self):
    #debug("propagate: queue length = %d", len(self.propQ))
    while self.propQ:
      lit = self.propQ[0]
      del self.propQ[0]
      var_info = self.get_varinfo_for_lit(lit)
      wi = watch_index(lit)
      watches = self.watches[wi]
      self.watches[wi] = []

      debug("%s -> True : watches: %s" % (self.name_lit(lit), watches))

      # Notifiy all watchers
      for i in range(len(watches)):
        clause = watches[i]
        if not clause.propagate(lit):
          # Conflict

          # Re-add remaining watches
          self.watches[wi] += watches[i+1:]
          
          # No point processing the rest of the queue as
          # we'll have to backtrack now.
          self.propQ = []

          return clause
    return None
  
  def impossible(self):
    self.toplevel_conflict = True

  def get_varinfo_for_lit(self, lit):
    if lit >= 0:
      return self.assigns[lit]
    else:
      return self.assigns[neg(lit)]
  
  def lit_value(self, lit):
    if lit >= 0:
      value = self.assigns[lit].value
      return value
    else:
      v = -1 - lit
      value = self.assigns[v].value
      if value is None:
        return None
      else:
        return not value
  
  # Call cb when lit becomes True
  def watch_lit(self, lit, cb):
    #debug("%s is watching for %s to become True" % (cb, self.name_lit(lit)))
    self.watches[watch_index(lit)].append(cb)

  # Returns the new clause if one was added, True if none was added
  # because this clause is trivially True, or False if the clause is
  # False.
  def _add_clause(self, lits, learnt):
    if not lits:
      assert not learnt
      self.toplevel_conflict = True
      return False
    elif len(lits) == 1:
      # A clause with only a single literal is represented
      # as an assignment rather than as a clause.
      if learnt:
        reason = "learnt"
      else:
        reason = "top-level"
      return self.enqueue(lits[0], reason)

    clause = self.makeUnionClause(lits)
    clause.learnt = learnt

    if learnt:
      # lits[0] is None because we just backtracked.
      # Start watching the next literal that we will
      # backtrack over.
      best_level = -1
      best_i = 1
      for i in range(1, len(lits)):
        level = self.get_varinfo_for_lit(lits[i]).level
        if level > best_level:
          best_level = level
          best_i = i
      lits[1], lits[best_i] = lits[best_i], lits[1]

    # Watch the first two literals in the clause (both must be
    # undefined at this point).
    for lit in lits[:2]:
      self.watch_lit(neg(lit), clause)

    return clause

  def name_lits(self, lst):
    return [self.name_lit(l) for l in lst]

  # For nicer debug messages
  def name_lit(self, lit):
    if lit >= 0:
      return self.assigns[lit].name
    return "not(%s)" % self.assigns[neg(lit)].name
  
  def add_clause(self, lits):
    # Public interface. Only used before the solve starts.
    assert lits

    debug("add_clause([%s])" % ', '.join(self.name_lits(lits)))

    if any(self.lit_value(l) == True for l in lits):
      # Trivially true already.
      return True
    lit_set = set(lits)
    for l in lits:
      if neg(l) in lit_set:
        # X or not(X) is always True.
        return True
    # Remove duplicates and values known to be False
    lits = [l for l in lit_set if self.lit_value(l) != False]

    retval = self._add_clause(lits, learnt = False)
    if not retval:
      self.toplevel_conflict = True
    return retval

  def at_most_one(self, lits):
    assert lits

    debug("at_most_one(%s)" % ', '.join(self.name_lits(lits)))

    # If we have zero or one literals then we're trivially true
    # and not really needed for the solve. However, Zero Install
    # monitors these objects to find out what was selected, so
    # keep even trivial ones around for that.
    #
    #if len(lits) < 2:
    #  return True  # Trivially true

    # Ensure no duplicates
    assert len(set(lits)) == len(lits), lits

    # Ignore any literals already known to be False.
    # If any are True then they're enqueued and we'll process them
    # soon.
    lits = [l for l in lits if self.lit_value(l) != False]

    clause = self.makeAtMostOneClause(lits)

    for lit in lits:
      self.watch_lit(lit, clause)

    return clause

  def analyse(self, cause):
    # After trying some assignments, we've discovered a conflict.
    # e.g.
    # - we selected A then B then C
    # - from A, B, C we got X, Y
    # - we have a rule: not(A) or not(X) or not(Y)
    #
    # The simplest thing to do would be:
    # 1. add the rule "not(A) or not(B) or not(C)"
    # 2. unassign C
    #
    # Then we we'd deduce not(C) and we could try something else.
    # However, that would be inefficient. We want to learn a more
    # general rule that will help us with the rest of the problem.
    #
    # We take the clause that caused the conflict ("cause") and
    # ask it for its cause. In this case:
    #
    #  A and X and Y => conflict
    #
    # Since X and Y followed logically from A, B, C there's no
    # point learning this rule; we need to know to avoid A, B, C
    # *before* choosing C. We ask the two variables deduced at the
    # current level (X and Y) what caused them, and work backwards.
    # e.g.
    #
    #  X: A and C => X
    #  Y: C => Y
    #
    # Combining these, we get the cause of the conflict in terms of
    # things we knew before the current decision level:
    #
    #  A and X and Y => conflict
    #  A and (A and C) and (C) => conflict
    #  A and C => conflict
    #
    # We can then learn (record) the more general rule:
    #
    #  not(A) or not(C)
    #
    # Then, in future, whenever A is selected we can remove C and
    # everything that depends on it from consideration.


    learnt = [None]    # The general rule we're learning
    btlevel = 0    # The deepest decision in learnt
    p = None    # The literal we want to expand now
    seen = set()    # The variables involved in the conflict

    counter = 0

    while True:
      # cause is the reason why p is True (i.e. it enqueued it).
      # The first time, p is None, which requests the reason
      # why it is conflicting.
      if p is None:
        debug("Why did %s make us fail?" % cause)
        p_reason = cause.cacl_reason(p)
        debug("Because: %s => conflict" % (' and '.join(self.name_lits(p_reason))))
      else:
        debug("Why did %s lead to %s?" % (cause, self.name_lit(p)))
        p_reason = cause.cacl_reason(p)
        debug("Because: %s => %s" % (' and '.join(self.name_lits(p_reason)), self.name_lit(p)))

      # p_reason is in the form (A and B and ...)
      # p_reason => p

      # Check each of the variables in p_reason that we haven't
      # already considered:
      # - if the variable was assigned at the current level,
      #   mark it for expansion
      # - otherwise, add it to learnt

      for lit in p_reason:
        var_info = self.get_varinfo_for_lit(lit)
        if var_info not in seen:
          seen.add(var_info)
          if var_info.level == self.get_decision_level():
            # We deduced this var since the last decision.
            # It must be in self.trail, so we'll get to it
            # soon. Remember not to stop until we've processed it.
            counter += 1
          elif var_info.level > 0:
            # We won't expand lit, just remember it.
            # (we could expand it if it's not a decision, but
            # apparently not doing so is useful)
            learnt.append(neg(lit))
            btlevel = max(btlevel, var_info.level)
        # else we already considered the cause of this assignment

      # At this point, counter is the number of assigned
      # variables in self.trail at the current decision level that
      # we've seen. That is, the number left to process. Pop
      # the next one off self.trail (as well as any unrelated
      # variables before it; everything up to the previous
      # decision has to go anyway).

      # On the first time round the loop, we must find the
      # conflict depends on at least one assignment at the
      # current level. Otherwise, simply setting the decision
      # variable caused a clause to conflict, in which case
      # the clause should have asserted not(decision-variable)
      # before we ever made the decision.
      # On later times round the loop, counter was already >
      # 0 before we started iterating over p_reason.
      assert counter > 0

      while True:
        p = self.trail[-1]
        var_info = self.get_varinfo_for_lit(p)
        cause = var_info.reason
        self.undo_one()
        if var_info in seen:
          break
        debug("(irrelevant)")
      counter -= 1

      if counter <= 0:
        assert counter == 0
        # If counter = 0 then we still have one more
        # literal (p) at the current level that we
        # could expand. However, apparently it's best
        # to leave this unprocessed (says the minisat
        # paper).
        break

    # p is the literal we decided to stop processing on. It's either
    # a derived variable at the current level, or the decision that
    # led to this level. Since we're not going to expand it, add it
    # directly to the learnt clause.
    learnt[0] = neg(p)

    debug("Learnt: %s" % (' or '.join(self.name_lits(learnt))))

    return learnt, btlevel

  def run_solver(self, decide):
    # Check whether we detected a trivial problem
    # during setup.
    if self.toplevel_conflict:
      debug("FAIL: toplevel_conflict before starting solve!")
      return False

    while True:
      # Use logical deduction to simplify the clauses
      # and assign literals where there is only one possibility.
      conflicting_clause = self.propagate()
      if not conflicting_clause:
        debug("new state: %s", self.assigns)
        if all(info.value != None for info in self.assigns):
          # Everything is assigned without conflicts
          debug("SUCCESS!")
          return True
        else:
          # Pick a variable and try assigning it one way.
          # If it leads to a conflict, we'll backtrack and
          # try it the other way.
          lit = decide()
          assert lit is not None, "decide function returned None!"
          assert self.lit_value(lit) is None
          self.trail_lim.append(len(self.trail))
          r = self.enqueue(lit, reason = "considering")
          assert r is True
      else:
        if self.get_decision_level() == 0:
          debug("FAIL: conflict found at top level")
          return False
        else:
          # Figure out the root cause of this failure.
          learnt, backtrack_level = self.analyse(conflicting_clause)

          self.cancel_until(backtrack_level)

          c = self._add_clause(learnt, learnt = True)

          if c is not True:
            # Everything except the first literal in learnt is known to
            # be False, so the first must be True.
            e = self.enqueue(learnt[0], c)
            assert e is True
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.