dnslib.py :  » Language-Interface » ChinesePython » chinesepython2.1.3-0.4 » Demo » dns » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Language Interface » ChinesePython 
ChinesePython » chinesepython2.1.3 0.4 » Demo » dns » dnslib.py
# Domain Name Server (DNS) interface
#
# See RFC 1035:
# ------------------------------------------------------------------------
# Network Working Group                                     P. Mockapetris
# Request for Comments: 1035                                           ISI
#                                                            November 1987
# Obsoletes: RFCs 882, 883, 973
# 
#             DOMAIN NAMES - IMPLEMENTATION AND SPECIFICATION
# ------------------------------------------------------------------------


import string

import dnstype
import dnsclass
import dnsopcode


# Low-level 16 and 32 bit integer packing and unpacking

def pack16bit(n):
  """Return the low 16 bits of n as a 2-character big-endian string."""
  high = (n >> 8) & 0xFF
  low = n & 0xFF
  return chr(high) + chr(low)

def pack32bit(n):
  """Return the low 32 bits of n as a 4-character big-endian string."""
  octets = [chr((n >> shift) & 0xFF) for shift in (24, 16, 8, 0)]
  return ''.join(octets)

def unpack16bit(s):
  """Decode s[0] and s[1] as a big-endian unsigned 16-bit integer."""
  high, low = ord(s[0]), ord(s[1])
  return (high << 8) | low

def unpack32bit(s):
  """Decode s[0]..s[3] as a big-endian unsigned 32-bit integer."""
  value = 0
  for position in range(4):
    # Shift in one octet at a time, most significant first.
    value = (value << 8) | ord(s[position])
  return value

def addr2bin(addr):
  """Convert a dotted-quad IPv4 address string to its 32-bit integer value.

  An addr that is already a plain integer is returned unchanged.
  Raises ValueError ('bad IP address') when the string does not consist
  of exactly four dot-separated fields, or when a field is outside
  0..255.  A field that is not a decimal number also raises ValueError
  (from int()).
  """
  if isinstance(addr, type(0)):
    return addr
  octets = addr.split('.')
  if len(octets) != 4:
    raise ValueError('bad IP address')
  n = 0
  for octet in octets:
    value = int(octet)
    # Without this check an out-of-range field would spill into the
    # neighbouring octets of the result.
    if value < 0 or value > 255:
      raise ValueError('bad IP address')
    n = (n << 8) | value
  return n

def bin2addr(n):
  """Format the low 32 bits of n as a dotted-quad IPv4 address string."""
  octets = [(n >> shift) & 0xFF for shift in (24, 16, 8, 0)]
  return '%d.%d.%d.%d' % tuple(octets)


# Packing class

class PackError(Exception):
  """Raised by Packer when a value cannot be encoded."""
  pass

class Packer:
  """Build a DNS message incrementally in a string buffer.

  Attributes:
    buf   -- the data packed so far
    index -- maps upper-cased domain names already present in buf to the
             offset of their first occurrence; addname() uses it to emit
             compression pointers (RFC 1035 section 4.1.4)
  """
  def __init__(self):
    self.buf = ''
    self.index = {}
  def getbuf(self):
    """Return the data packed so far."""
    return self.buf
  def addbyte(self, c):
    """Append the single character c; raise TypeError for any other length."""
    if len(c) != 1: raise TypeError('one character expected')
    self.buf = self.buf + c
  def addbytes(self, bytes):
    """Append the string bytes unchanged."""
    self.buf = self.buf + bytes
  def add16bit(self, n):
    """Append n as a 16-bit big-endian value."""
    self.buf = self.buf + pack16bit(n)
  def add32bit(self, n):
    """Append n as a 32-bit big-endian value."""
    self.buf = self.buf + pack32bit(n)
  def addaddr(self, addr):
    """Append an IPv4 address (anything addr2bin() accepts) as 4 bytes."""
    n = addr2bin(addr)
    self.buf = self.buf + pack32bit(n)
  def addstring(self, s):
    """Append s preceded by a one-byte length."""
    self.addbyte(chr(len(s)))
    self.addbytes(s)
  def addname(self, name):
    """Append a domain name, compressing against earlier names.

    Raises PackError when a label is longer than 63 characters; in that
    case neither self.buf nor self.index is modified.
    """
    # Domain name packing (section 4.1.4)
    # Add a domain name to the buffer, possibly using pointers.
    # The case of the first occurrence of a name is preserved.
    # Redundant dots are ignored.
    labels = []
    for label in name.split('.'):
      if label:
        if len(label) > 63:
          raise PackError('label too long')
        labels.append(label)
    # Look for the longest suffix that was packed before.  After the
    # loop, i is the number of leading labels that must be written out
    # and pointer is the offset of the reusable suffix (None if none).
    keys = []
    pointer = None
    for i in range(len(labels)):
      key = '.'.join(labels[i:]).upper()
      keys.append(key)
      pointer = self.index.get(key)
      if pointer is not None:
        break
    else:
      i = len(labels)
    # Do it into temporaries first so exceptions don't
    # mess up self.index and self.buf
    buf = ''
    offset = len(self.buf)
    index = []
    for j in range(i):
      label = labels[j]
      n = len(label)
      if offset + len(buf) < 0x3FFF:
        index.append( (keys[j], offset + len(buf)) )
      else:
        # Offsets this large do not fit in a 14-bit pointer, so the
        # name is packed but not remembered for compression.
        # sys.stdout.write is used instead of the print statement so
        # that this class parses on any Python version.
        import sys
        sys.stdout.write('dnslib.Packer.addname: warning: pointer too big\n')
      buf = buf + (chr(n) + label)
    # Compare against None: offset 0 is a valid pointer target, and a
    # plain truth test would drop the suffix and truncate the name.
    if pointer is not None:
      buf = buf + pack16bit(pointer | 0xC000)
    else:
      buf = buf + '\0'
    self.buf = self.buf + buf
    for key, value in index:
      self.index[key] = value
  def dump(self):
    """Write the compression index and a dump of the buffer to stdout.

    The buffer is shown two characters per line (offset, then each
    character either literally or as its decimal code).  Pairs equal
    to '**' are skipped; a run of them shows up as one blank line.
    """
    import sys
    write = sys.stdout.write
    rule = '-'*40 + '\n'
    keys = list(self.index.keys())
    keys.sort()
    write(rule)
    for key in keys:
      write('%20s %3d\n' % (key, self.index[key]))
    write(rule)
    space = 1
    for i in range(0, len(self.buf)+1, 2):
      pair = self.buf[i:i+2]
      if pair == '**':
        if not space: write('\n')
        space = 1
        continue
      space = 0
      fields = ['%4d' % i]
      for c in pair:
        if ' ' < c < '\177':
          fields.append(' %c' % c)
        else:
          fields.append('%2d' % ord(c))
      write(' '.join(fields) + '\n')
    write(rule)


# Unpacking class

class UnpackError(Exception):
  """Raised when a buffer cannot be decoded as requested."""
  pass

class Unpacker:
  """Read values sequentially from a packed DNS message.

  Attributes:
    buf    -- the complete message
    offset -- index in buf of the next character to be read
  """
  def __init__(self, buf):
    self.buf = buf
    self.offset = 0
  def getbyte(self):
    """Return the next character (IndexError at end of buffer)."""
    c = self.buf[self.offset]
    self.offset = self.offset + 1
    return c
  def getbytes(self, n):
    """Return the next n characters; raise UnpackError if fewer remain."""
    s = self.buf[self.offset : self.offset + n]
    if len(s) != n: raise UnpackError('not enough data left')
    self.offset = self.offset + n
    return s
  def get16bit(self):
    """Read a 16-bit big-endian value."""
    return unpack16bit(self.getbytes(2))
  def get32bit(self):
    """Read a 32-bit big-endian value."""
    return unpack32bit(self.getbytes(4))
  def getaddr(self):
    """Read a 4-byte IPv4 address and return it in dotted-quad form."""
    return bin2addr(self.get32bit())
  def getstring(self):
    """Read a string preceded by a one-byte length."""
    return self.getbytes(ord(self.getbyte()))
  def getname(self):
    """Read a possibly compressed domain name; return it dot-separated.

    The root name is returned as ''.  Afterwards self.offset is just
    past the name as stored at the starting position, i.e. past the
    first compression pointer if one was followed.  Raises UnpackError
    when the compression pointers form a loop.
    """
    # Domain name unpacking (section 4.1.4)
    labels = []
    resume = None   # offset just past the first pointer followed
    visited = {}    # pointer targets already followed
    try:
      while 1:
        i = ord(self.getbyte())
        if i & 0xC0 == 0xC0:
          j = ord(self.getbyte())
          if resume is None:
            resume = self.offset
          pointer = ((i<<8) | j) & 0x3FFF
          # Decoding from a given offset is deterministic, so reaching
          # the same target twice means it would never terminate.
          if visited.get(pointer):
            raise UnpackError('compression pointer loop')
          visited[pointer] = 1
          self.offset = pointer
        elif i == 0:
          break
        else:
          labels.append(self.getbytes(i))
    finally:
      # Even on error, leave the offset after the pointer rather than
      # somewhere inside the data it referred to.
      if resume is not None:
        self.offset = resume
    return '.'.join(labels)


# Test program for packing/unpacking (section 4.1.4)

def testpacker():
  N = 25
  R = range(N)
  import timing
  # See section 4.1.4 of RFC 1035
  timing.start()
  for i in R:
    p = Packer()
    p.addbytes('*' * 20)
    p.addname('f.ISI.ARPA')
    p.addbytes('*' * 8)
    p.addname('Foo.F.isi.arpa')
    p.addbytes('*' * 18)
    p.addname('arpa')
    p.addbytes('*' * 26)
    p.addname('')
  timing.finish()
  print round(timing.milli() * 0.001 / N, 3), 'seconds per packing'
  p.dump()
  u = Unpacker(p.buf)
  u.getbytes(20)
  u.getname()
  u.getbytes(8)
  u.getname()
  u.getbytes(18)
  u.getname()
  u.getbytes(26)
  u.getname()
  timing.start()
  for i in R:
    u = Unpacker(p.buf)
    res = (u.getbytes(20),
           u.getname(),
           u.getbytes(8),
           u.getname(),
           u.getbytes(18),
           u.getname(),
           u.getbytes(26),
           u.getname())
  timing.finish()
  print round(timing.milli() * 0.001 / N, 3), 'seconds per unpacking'
  for item in res: print item


# Pack/unpack RR toplevel format (section 3.2.1)

class RRpacker(Packer):
  """Packer that knows the resource record format (section 3.2.1).

  self.rdstart is the buffer offset where the RDATA of the record
  currently being built starts, or None when no record is open.
  """
  def __init__(self):
    Packer.__init__(self)
    self.rdstart = None
  def addRRheader(self, name, type, klass, ttl, *rest):
    """Pack the fixed part of a record and mark the start of its RDATA.

    At most one extra positional argument is accepted: the RDLENGTH to
    write.  Without it 0 is written, to be corrected by patchrdlength().
    Raises TypeError, before anything is packed, when more than one
    extra argument is given.
    """
    # This check used to read 'res[1:]', an undefined name, so passing
    # any explicit rdlength raised NameError.
    if rest[1:]: raise TypeError('too many args')
    self.addname(name)
    self.add16bit(type)
    self.add16bit(klass)
    self.add32bit(ttl)
    if rest:
      rdlength = rest[0]
    else:
      rdlength = 0
    self.add16bit(rdlength)
    self.rdstart = len(self.buf)
  def patchrdlength(self):
    """Rewrite the RDLENGTH field to match the RDATA packed since
    addRRheader().  The buffer is left unchanged if repacking fails."""
    rdlength = unpack16bit(self.buf[self.rdstart-2:self.rdstart])
    if rdlength == len(self.buf) - self.rdstart:
      return
    rdata = self.buf[self.rdstart:]
    save_buf = self.buf
    ok = 0
    try:
      # Cut the buffer just before the length field, then re-append
      # the corrected length and the RDATA.
      self.buf = self.buf[:self.rdstart-2]
      self.add16bit(len(rdata))
      self.buf = self.buf + rdata
      ok = 1
    finally:
      if not ok: self.buf = save_buf
  def endRR(self):
    """Finish the current record, fixing up its RDLENGTH."""
    if self.rdstart is not None:
      self.patchrdlength()
    self.rdstart = None
  def getbuf(self):
    """Return the packed data, fixing the RDLENGTH of an open record."""
    # Was spelled 'patchrdlenth', which raised AttributeError.
    if self.rdstart is not None: self.patchrdlength()
    return Packer.getbuf(self)
  # Standard RRs (section 3.3)
  def addCNAME(self, name, klass, ttl, cname):
    self.addRRheader(name, dnstype.CNAME, klass, ttl)
    self.addname(cname)
    self.endRR()
  def addHINFO(self, name, klass, ttl, cpu, os):
    self.addRRheader(name, dnstype.HINFO, klass, ttl)
    self.addstring(cpu)
    self.addstring(os)
    self.endRR()
  def addMX(self, name, klass, ttl, preference, exchange):
    self.addRRheader(name, dnstype.MX, klass, ttl)
    self.add16bit(preference)
    self.addname(exchange)
    self.endRR()
  def addNS(self, name, klass, ttl, nsdname):
    self.addRRheader(name, dnstype.NS, klass, ttl)
    self.addname(nsdname)
    self.endRR()
  def addPTR(self, name, klass, ttl, ptrdname):
    self.addRRheader(name, dnstype.PTR, klass, ttl)
    self.addname(ptrdname)
    self.endRR()
  def addSOA(self, name, klass, ttl,
      mname, rname, serial, refresh, retry, expire, minimum):
    self.addRRheader(name, dnstype.SOA, klass, ttl)
    self.addname(mname)
    self.addname(rname)
    self.add32bit(serial)
    self.add32bit(refresh)
    self.add32bit(retry)
    self.add32bit(expire)
    self.add32bit(minimum)
    self.endRR()
  def addTXT(self, name, klass, ttl, list):
    # 'list' is a sequence of strings, each packed with a length byte
    self.addRRheader(name, dnstype.TXT, klass, ttl)
    for txtdata in list:
      self.addstring(txtdata)
    self.endRR()
  # Internet specific RRs (section 3.4) -- class = IN
  def addA(self, name, ttl, address):
    self.addRRheader(name, dnstype.A, dnsclass.IN, ttl)
    self.addaddr(address)
    self.endRR()
  def addWKS(self, name, ttl, address, protocol, bitmap):
    self.addRRheader(name, dnstype.WKS, dnsclass.IN, ttl)
    self.addaddr(address)
    self.addbyte(chr(protocol))
    self.addbytes(bitmap)
    self.endRR()


class RRunpacker(Unpacker):
  def __init__(self, buf):
    Unpacker.__init__(self, buf)
    self.rdend = None
  def getRRheader(self):
    name = self.getname()
    type = self.get16bit()
    klass = self.get16bit()
    ttl = self.get32bit()
    rdlength = self.get16bit()
    self.rdend = self.offset + rdlength
    return (name, type, klass, ttl, rdlength)
  def endRR(self):
    if self.offset != self.rdend:
      raise UnpackError, 'end of RR not reached'
  def getCNAMEdata(self):
    return self.getname()
  def getHINFOdata(self):
    return self.getstring(), self.getstring()
  def getMXdata(self):
    return self.get16bit(), self.getname()
  def getNSdata(self):
    return self.getname()
  def getPTRdata(self):
    return self.getname()
  def getSOAdata(self):
    return self.getname(), \
           self.getname(), \
           self.get32bit(), \
           self.get32bit(), \
           self.get32bit(), \
           self.get32bit(), \
           self.get32bit()
  def getTXTdata(self):
    list = []
    while self.offset != self.rdend:
      list.append(self.getstring())
    return list
  def getAdata(self):
    return self.getaddr()
  def getWKSdata(self):
    address = self.getaddr()
    protocol = ord(self.getbyte())
    bitmap = self.getbytes(self.rdend - self.offset)
    return address, protocol, bitmap


# Pack/unpack Message Header (section 4.1)

class Hpacker(Packer):
  """Packer for the 12-byte message header (section 4.1.1)."""
  def addHeader(self, id, qr, opcode, aa, tc, rd, ra, z, rcode,
      qdcount, ancount, nscount, arcount):
    """Append a message header.

    id and the four counts are written as 16-bit values.  The remaining
    arguments are combined into one 16-bit flags word; each is masked
    to its field width (opcode and rcode 4 bits, z 3 bits, the others
    1 bit), mirroring the layout Hunpacker.getHeader() decodes.
    """
    self.add16bit(id)
    # opcode was multiplied by 0xF instead of masked with it, which
    # corrupted the flags word for every opcode other than 0.
    self.add16bit((qr&1)<<15 | (opcode&0xF)<<11 | (aa&1)<<10
        | (tc&1)<<9 | (rd&1)<<8 | (ra&1)<<7
        | (z&7)<<4 | (rcode&0xF))
    self.add16bit(qdcount)
    self.add16bit(ancount)
    self.add16bit(nscount)
    self.add16bit(arcount)

class Hunpacker(Unpacker):
  """Unpacker for the message header (section 4.1.1)."""
  def getHeader(self):
    """Read the header and return a 13-tuple:
    (id, qr, opcode, aa, tc, rd, ra, z, rcode,
     qdcount, ancount, nscount, arcount)."""
    ident = self.get16bit()
    flags = self.get16bit()
    # (shift, mask) of each flag field, most significant first:
    # qr, opcode, aa, tc, rd, ra, z, rcode
    layout = ((15, 1), (11, 0xF), (10, 1), (9, 1),
              (8, 1), (7, 1), (4, 7), (0, 0xF))
    fields = []
    for shift, mask in layout:
      fields.append((flags >> shift) & mask)
    counts = []
    for unused in range(4):
      counts.append(self.get16bit())
    return tuple([ident] + fields + counts)


# Pack/unpack Question (section 4.1.2)

class Qpacker(Packer):
  """Packer for question entries (section 4.1.2)."""
  def addQuestion(self, qname, qtype, qclass):
    """Append a question: the name, then type and class as 16-bit values."""
    self.addname(qname)
    for value in (qtype, qclass):
      self.add16bit(value)

class Qunpacker(Unpacker):
  """Unpacker for question entries (section 4.1.2)."""
  def getQuestion(self):
    """Read one question and return (qname, qtype, qclass)."""
    qname = self.getname()
    qtype = self.get16bit()
    qclass = self.get16bit()
    return qname, qtype, qclass


# Pack/unpack Message(section 4)
# NB the order of the base classes is important for __init__()!

class Mpacker(RRpacker, Qpacker, Hpacker):
  """Packer for complete messages: combines the resource record,
  question and header packers without adding anything of its own."""

class Munpacker(RRunpacker, Qunpacker, Hunpacker):
  """Unpacker for complete messages: combines the resource record,
  question and header unpackers without adding anything of its own."""


# Routines to print an unpacker to stdout, for debugging.
# These affect the unpacker's current position!

def dumpM(u):
  print 'HEADER:',
  (id, qr, opcode, aa, tc, rd, ra, z, rcode,
      qdcount, ancount, nscount, arcount) = u.getHeader()
  print 'id=%d,' % id,
  print 'qr=%d, opcode=%d, aa=%d, tc=%d, rd=%d, ra=%d, z=%d, rcode=%d,' \
      % (qr, opcode, aa, tc, rd, ra, z, rcode)
  if tc: print '*** response truncated! ***'
  if rcode: print '*** nonzero error code! (%d) ***' % rcode
  print '  qdcount=%d, ancount=%d, nscount=%d, arcount=%d' \
      % (qdcount, ancount, nscount, arcount)
  for i in range(qdcount):
    print 'QUESTION %d:' % i,
    dumpQ(u)
  for i in range(ancount):
    print 'ANSWER %d:' % i,
    dumpRR(u)
  for i in range(nscount):
    print 'AUTHORITY RECORD %d:' % i,
    dumpRR(u)
  for i in range(arcount):
    print 'ADDITIONAL RECORD %d:' % i,
    dumpRR(u)

def dumpQ(u):
  qname, qtype, qclass = u.getQuestion()
  print 'qname=%s, qtype=%d(%s), qclass=%d(%s)' \
      % (qname,
         qtype, dnstype.typestr(qtype),
         qclass, dnsclass.classstr(qclass))

def dumpRR(u):
  name, type, klass, ttl, rdlength = u.getRRheader()
  typename = dnstype.typestr(type)
  print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
      % (name,
         type, typename,
         klass, dnsclass.classstr(klass),
         ttl)
  mname = 'get%sdata' % typename
  if hasattr(u, mname):
    print '  formatted rdata:', getattr(u, mname)()
  else:
    print '  binary rdata:', u.getbytes(rdlength)


# Test program

def test():
  """Command-line test driver: send one DNS query and dump the reply.

  Options and arguments are taken from sys.argv (see the usage text
  printed below).  A bad command line prints the usage and exits with
  status 2.  With -T only testpacker() is run.
  """
  import sys
  import getopt
  import socket
  # Defaults; most can be overridden from the command line
  protocol = 'udp'
  server = 'cnri.reston.va.us' # XXX adapt this to your local name server
  port = 53
  opcode = dnsopcode.QUERY
  rd = 0
  qtype = dnstype.MX
  qname = 'cwi.nl'
  try:
    opts, args = getopt.getopt(sys.argv[1:], 'Trs:tu')
    if len(args) > 2: raise getopt.error, 'too many arguments'
  except getopt.error, msg:
    print msg
    print 'Usage: python dnslib.py',
    print '[-T] [-r] [-s server] [-t] [-u]',
    print '[qtype [qname]]'
    print '-T:        run testpacker() and exit'
    print '-r:        recursion desired (default not)'
    print '-s server: use server (default %s)' % server
    print '-t:        use TCP protocol'
    print '-u:        use UDP protocol (default)'
    print 'qtype:     query type (default %s)' % \
        dnstype.typestr(qtype)
    print 'qname:     query name (default %s)' % qname
    print 'Recognized qtype values:'
    qtypes = dnstype.typemap.keys()
    qtypes.sort()
    # List the known types, starting a new output line when the
    # counter reaches 8
    n = 0
    for qtype in qtypes:
      n = n+1
      if n >= 8: n = 1; print
      print '%s = %d' % (dnstype.typemap[qtype], qtype),
    print
    sys.exit(2)
  for o, a in opts:
    if o == '-T': testpacker(); return
    if o == '-t': protocol = 'tcp'
    if o == '-u': protocol = 'udp'
    if o == '-s': server = a
    if o == '-r': rd = 1
  if args[0:]:
    # NOTE(review): eval() runs the argument as an arbitrary expression
    # in dnstype's namespace.  Tolerable for a trusted local user;
    # never feed it untrusted input.
    try:
      qtype = eval(string.upper(args[0]), dnstype.__dict__)
    except (NameError, SyntaxError):
      print 'bad query type:', `args[0]`
      sys.exit(2)
  if args[1:]:
    qname = args[1]
  if qtype == dnstype.AXFR:
    print 'Query type AXFR, protocol forced to TCP'
    protocol = 'tcp'
  print 'QTYPE %d(%s)' % (qtype, dnstype.typestr(qtype))
  # Build the request: a header announcing one question, then the
  # question itself
  m = Mpacker()
  m.addHeader(0,
      0, opcode, 0, 0, rd, 0, 0, 0,
      1, 0, 0, 0)
  m.addQuestion(qname, qtype, dnsclass.IN)
  request = m.getbuf()
  # NOTE(review): neither socket below is ever closed explicitly.
  if protocol == 'udp':
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect((server, port))
    s.send(request)
    reply = s.recv(1024)
  else:
    # Over TCP each message is preceded by a 16-bit length
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((server, port))
    s.send(pack16bit(len(request)) + request)
    s.shutdown(1)
    f = s.makefile('r')
    header = f.read(2)
    if len(header) < 2:
      print '*** EOF ***'
      return
    count = unpack16bit(header)
    reply = f.read(count)
    if len(reply) != count:
      print '*** Incomplete reply ***'
      return
  u = Munpacker(reply)
  dumpM(u)
  # For AXFR, keep reading length-prefixed messages from f (which only
  # exists in the TCP case) until EOF, a zero length or a short read
  if protocol == 'tcp' and qtype == dnstype.AXFR:
    while 1:
      header = f.read(2)
      if len(header) < 2:
        print '========== EOF =========='
        break
      count = unpack16bit(header)
      if not count:
        print '========== ZERO COUNT =========='
        break
      print '========== NEXT =========='
      reply = f.read(count)
      if len(reply) != count:
        print '*** Incomplete reply ***'
        break
      u = Munpacker(reply)
      dumpM(u)


# Run test program when called as a script

# Guarded so that importing this module does not run the test program.
if __name__ == '__main__':
  test()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.