VCR.py :  » Language-Interface » ChinesePython » chinesepython2.1.3-0.4 » Demo » sgi » video » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Language Interface » ChinesePython 
ChinesePython » chinesepython2.1.3 0.4 » Demo » sgi » video » VCR.py
import fcntl
import IOCTL
from IOCTL import *
import sys
import struct
import select
import posix
import time

DEVICE='/dev/ttyd2'

class UnixFile:
  def open(self, name, mode):
    self.fd = posix.open(name, mode)
    return self

  def read(self, len):
    return posix.read(self.fd, len)

  def write(self, data):
    dummy = posix.write(self.fd, data)

  def flush(self):
    pass

  def fileno(self):
    return self.fd

  def close(self):
    dummy = posix.close(self.fd)

def packttyargs(*args):
  if type(args) <> type(()):
    raise 'Incorrect argtype for packttyargs'
  if type(args[0]) == type(1):
    iflag, oflag, cflag, lflag, line, chars = args
  elif type(args[0]) == type(()):
    if len(args) <> 1:
      raise 'Only 1 argument expected'
    iflag, oflag, cflag, lflag, line, chars = args[0]
  elif type(args[0]) == type([]):
    if len(args) <> 1:
      raise 'Only 1 argument expected'
    [iflag, oflag, cflag, lflag, line, chars] = args[0]
  str = struct.pack('hhhhb', iflag, oflag, cflag, lflag, line)
  for c in chars:
    str = str + c
  return str

def nullttyargs():
  chars = ['\0']*IOCTL.NCCS
  return packttyargs(0, 0, 0, 0, 0, chars)

def unpackttyargs(str):
  args = str[:-IOCTL.NCCS]
  rawchars = str[-IOCTL.NCCS:]
  chars = []
  for c in rawchars:
    chars.append(c)
  iflag, oflag, cflag, lflag, line = struct.unpack('hhhhb', args)
  return (iflag, oflag, cflag, lflag, line, chars)

def initline(name):
  fp = UnixFile().open(name, 2)
  ofp = fp
  fd = fp.fileno()
  rv = fcntl.ioctl(fd, IOCTL.TCGETA, nullttyargs())
  iflag, oflag, cflag, lflag, line, chars = unpackttyargs(rv)
  iflag = iflag & ~(INPCK|ISTRIP|INLCR|IUCLC|IXON|IXOFF)
  oflag = oflag & ~OPOST
  cflag = B9600|CS8|CREAD|CLOCAL
  lflag = lflag & ~(ISIG|ICANON|ECHO|TOSTOP)
  chars[VMIN] = chr(1)
  chars[VTIME] = chr(0)
  arg = packttyargs(iflag, oflag, cflag, lflag, line, chars)
  dummy = fcntl.ioctl(fd, IOCTL.TCSETA, arg)
  return fp, ofp

#
#
error = 'VCR.error'

# Commands/replies:
COMPLETION = '\x01'
ACK  ='\x0a'
NAK  ='\x0b'

NUMBER_N = 0x30
ENTER  = '\x40'

EXP_7= '\xde'
EXP_8= '\xdf'

CL   ='\x56'
CTRL_ENABLE = EXP_8 + '\xc6'
SEARCH_DATA = EXP_8 + '\x93'
ADDR_SENSE = '\x60'

PLAY ='\x3a'
STOP ='\x3f'
EJECT='\x2a'
FF   ='\xab'
REW  ='\xac'
STILL='\x4f'
STEP_FWD ='\x2b'    # Was: '\xad'
FM_SELECT=EXP_8 + '\xc8'
FM_STILL=EXP_8 + '\xcd'
PREVIEW=EXP_7 + '\x9d'
REVIEW=EXP_7 + '\x9e'
DM_OFF=EXP_8 + '\xc9'
DM_SET=EXP_8 + '\xc4'
FWD_SHUTTLE='\xb5'
REV_SHUTTLE='\xb6'
EM_SELECT=EXP_8 + '\xc0'
N_FRAME_REC=EXP_8 + '\x92'
SEARCH_PREROLL=EXP_8 + '\x90'
EDIT_PB_STANDBY=EXP_8 + '\x96'
EDIT_PLAY=EXP_8 + '\x98'
AUTO_EDIT=EXP_7 + '\x9c'

IN_ENTRY=EXP_7 + '\x90'
IN_ENTRY_RESET=EXP_7 + '\x91'
IN_ENTRY_SET=EXP_7 + '\x98'
IN_ENTRY_INC=EXP_7 + '\x94'
IN_ENTRY_DEC=EXP_7 + '\x95'
IN_ENTRY_SENSE=EXP_7 + '\x9a'

OUT_ENTRY=EXP_7 + '\x92'
OUT_ENTRY_RESET=EXP_7 + '\x93'
OUT_ENTRY_SET=EXP_7 + '\x99'
OUT_ENTRY_INC=EXP_7 + '\x96'
OUT_ENTRY_DEC=EXP_7 + '\x98'
OUT_ENTRY_SENSE=EXP_7 + '\x9b'

MUTE_AUDIO = '\x24'
MUTE_AUDIO_OFF = '\x25'
MUTE_VIDEO = '\x26'
MUTE_VIDEO_OFF = '\x27'
MUTE_AV = EXP_7 + '\xc6'
MUTE_AV_OFF = EXP_7 + '\xc7'

DEBUG=0

class VCR:
  def __init__(self):
    self.ifp, self.ofp = initline(DEVICE)
    self.busy_cmd = None
    self.async = 0
    self.cb = None
    self.cb_arg = None

  def _check(self):
    if self.busy_cmd:
      raise error, 'Another command active: '+self.busy_cmd

  def _endlongcmd(self, name):
    if not self.async:
      self.waitready()
      return 1
    self.busy_cmd = name
    return 'VCR BUSY'

  def fileno(self):
    return self.ifp.fileno()

  def setasync(self, async):
    self.async = async

  def setcallback(self, cb, arg):
    self.setasync(1)
    self.cb = cb
    self.cb_arg = arg

  def poll(self):
    if not self.async:
      raise error, 'Can only call poll() in async mode'
    if not self.busy_cmd:
      return
    if self.testready():
      if self.cb:
        apply(self.cb, (self.cb_arg,))

  def _cmd(self, cmd):
    if DEBUG:
      print '>>>',`cmd`
    self.ofp.write(cmd)
    self.ofp.flush()

  def _waitdata(self, len, tout):
    rep = ''
    while len > 0:
      if tout == None:
        ready, d1, d2 = select.select( \
            [self.ifp], [], [])
      else:
        ready, d1, d2 = select.select( \
            [self.ifp], [], [], tout)
      if ready == []:
##        if rep:
##          print 'FLUSHED:', `rep`
        return None
      data = self.ifp.read(1)
      if DEBUG:
        print '<<<',`data`
      if data == NAK:
        return NAK
      rep = rep + data
      len = len - 1
    return rep

  def _reply(self, len):
    data = self._waitdata(len, 10)
    if data == None:
      raise error, 'Lost contact with VCR'
    return data

  def _getnumber(self, len):
    data = self._reply(len)
    number = 0
    for c in data:
      digit = ord(c) - NUMBER_N
      if digit < 0 or digit > 9:
        raise error, 'Non-digit in number'+`c`
      number = number*10 + digit
    return number

  def _iflush(self):
    dummy = self._waitdata(10000, 0)
##    if dummy:
##      print 'IFLUSH:', dummy

  def simplecmd(self,cmd):
    self._iflush()
    for ch in cmd:
      self._cmd(ch)
      rep = self._reply(1)
      if rep == NAK:
        return 0
      elif rep <> ACK:
        raise error, 'Unexpected reply:' + `rep`
    return 1

  def replycmd(self, cmd):
    if not self.simplecmd(cmd[:-1]):
      return 0
    self._cmd(cmd[-1])

  def _number(self, number, digits):
    if number < 0:
      raise error, 'Unexpected negative number:'+ `number`
    maxnum = pow(10, digits)
    if number > maxnum:
      raise error, 'Number too big'
    while maxnum > 1:
      number = number % maxnum
      maxnum = maxnum / 10
      digit = number / maxnum
      ok = self.simplecmd(chr(NUMBER_N + digit))
      if not ok:
        raise error, 'Error while transmitting number'

  def initvcr(self, *optarg):
    timeout = None
    if optarg <> ():
      timeout = optarg[0]
      starttime = time.time()
    self.busy_cmd = None
    self._iflush()
    while 1:
##      print 'SENDCL'
      self._cmd(CL)
      rep = self._waitdata(1, 2)
##      print `rep`
      if rep in ( None, CL, NAK ):
        if timeout:
          if time.time() - starttime > timeout:
            raise error, \
                'No reply from VCR'
        continue
      break
    if rep <> ACK:
      raise error, 'Unexpected reply:' + `rep`
    dummy = self.simplecmd(CTRL_ENABLE)

  def waitready(self):
    rep = self._waitdata(1, None)
    if rep == None:
      raise error, 'Unexpected None reply from waitdata'
    if rep not in  (COMPLETION, ACK):
      self._iflush()
      raise error, 'Unexpected waitready reply:' + `rep`
    self.busy_cmd = None

  def testready(self):
    rep = self._waitdata(1, 0)
    if rep == None:
      return 0
    if rep not in  (COMPLETION, ACK):
      self._iflush()
      raise error, 'Unexpected waitready reply:' + `rep`
    self.busy_cmd = None
    return 1

  def play(self): return self.simplecmd(PLAY)
  def stop(self): return self.simplecmd(STOP)
  def ff(self):   return self.simplecmd(FF)
  def rew(self):  return self.simplecmd(REW)
  def eject(self):return self.simplecmd(EJECT)
  def still(self):return self.simplecmd(STILL)
  def step(self): return self.simplecmd(STEP_FWD)

  def goto(self, (h, m, s, f)):
    if not self.simplecmd(SEARCH_DATA):
      return 0
    self._number(h, 2)
    self._number(m, 2)
    self._number(s, 2)
    self._number(f, 2)
    if not self.simplecmd(ENTER):
      return 0
    return self._endlongcmd('goto')

  # XXXX TC_SENSE doesn't seem to work
  def faulty_where(self):
    self._check()
    self._cmd(TC_SENSE)
    h = self._getnumber(2)
    m = self._getnumber(2)
    s = self._getnumber(2)
    f = self._getnumber(2)
    return (h, m, s, f)

  def where(self):
    return self.addr2tc(self.sense())

  def sense(self):
    self._check()
    self._cmd(ADDR_SENSE)
    num = self._getnumber(5)
    return num

  def addr2tc(self, num):
    f = num % 25
    num = num / 25
    s = num % 60
    num = num / 60
    m = num % 60
    h = num / 60
    return (h, m, s, f)

  def tc2addr(self, (h, m, s, f)):
    return ((h*60 + m)*60 + s)*25 + f

  def fmmode(self, mode):
    self._check()
    if mode == 'off':
      arg = 0
    elif mode == 'buffer':
      arg = 1
    elif mode == 'dnr':
      arg = 2
    else:
      raise error, 'fmmode arg should be off, buffer or dnr'
    if not self.simplecmd(FM_SELECT):
      return 0
    self._number(arg, 1)
    if not self.simplecmd(ENTER):
      return 0
    return 1

  def mute(self, mode, value):
    self._check()
    if mode == 'audio':
      cmds = (MUTE_AUDIO_OFF, MUTE_AUDIO)
    elif mode == 'video':
      cmds = (MUTE_VIDEO_OFF, MUTE_VIDEO)
    elif mode == 'av':
      cmds = (MUTE_AV_OFF, MUTE_AV)
    else:
      raise error, 'mute type should be audio, video or av'
    cmd = cmds[value]
    return self.simplecmd(cmd)

  def editmode(self, mode):
    self._check()
    if mode == 'off':
      a0 = a1 = a2 = 0
    elif mode == 'format':
      a0 = 4
      a1 = 7
      a2 = 4
    elif mode == 'asmbl':
      a0 = 1
      a1 = 7
      a2 = 4
    elif mode == 'insert-video':
      a0 = 2
      a1 = 4
      a2 = 0
    else:
      raise 'editmode should be off,format,asmbl or insert-video'
    if not self.simplecmd(EM_SELECT):
      return 0
    self._number(a0, 1)
    self._number(a1, 1)
    self._number(a2, 1)
    if not self.simplecmd(ENTER):
      return 0
    return 1

  def autoedit(self):
    self._check()
    return self._endlongcmd(AUTO_EDIT)

  def nframerec(self, num):
    if not self.simplecmd(N_FRAME_REC):
      return 0
    self._number(num, 4)
    if not self.simplecmd(ENTER):
      return 0
    return self._endlongcmd('nframerec')

  def fmstill(self):
    if not self.simplecmd(FM_STILL):
      return 0
    return self._endlongcmd('fmstill')

  def preview(self):
    if not self.simplecmd(PREVIEW):
      return 0
    return self._endlongcmd('preview')

  def review(self):
    if not self.simplecmd(REVIEW):
      return 0
    return self._endlongcmd('review')

  def search_preroll(self):
    if not self.simplecmd(SEARCH_PREROLL):
      return 0
    return self._endlongcmd('search_preroll')

  def edit_pb_standby(self):
    if not self.simplecmd(EDIT_PB_STANDBY):
      return 0
    return self._endlongcmd('edit_pb_standby')

  def edit_play(self):
    if not self.simplecmd(EDIT_PLAY):
      return 0
    return self._endlongcmd('edit_play')

  def dmcontrol(self, mode):
    self._check()
    if mode == 'off':
      return self.simplecmd(DM_OFF)
    if mode == 'multi freeze':
      num = 1000
    elif mode == 'zoom freeze':
      num = 2000
    elif mode == 'digital slow':
      num = 3000
    elif mode == 'freeze':
      num = 4011
    else:
      raise error, 'unknown dmcontrol argument: ' + `mode`
    if not self.simplecmd(DM_SET):
      return 0
    self._number(num, 4)
    if not self.simplecmd(ENTER):
      return 0
    return 1

  def fwdshuttle(self, num):
    if not self.simplecmd(FWD_SHUTTLE):
      return 0
    self._number(num, 1)
    return 1

  def revshuttle(self, num):
    if not self.simplecmd(REV_SHUTTLE):
      return 0
    self._number(num, 1)
    return 1

  def getentry(self, which):
    self._check()
    if which == 'in':
      cmd = IN_ENTRY_SENSE
    elif which == 'out':
      cmd = OUT_ENTRY_SENSE
    self.replycmd(cmd)
    h = self._getnumber(2)
    m = self._getnumber(2)
    s = self._getnumber(2)
    f = self._getnumber(2)
    return (h, m, s, f)

  def inentry(self, arg):
    return self.ioentry(arg, (IN_ENTRY, IN_ENTRY_RESET, \
        IN_ENTRY_SET, IN_ENTRY_INC, IN_ENTRY_DEC))

  def outentry(self, arg):
    return self.ioentry(arg, (OUT_ENTRY, OUT_ENTRY_RESET, \
        OUT_ENTRY_SET, OUT_ENTRY_INC, OUT_ENTRY_DEC))

  def ioentry(self, arg, (Load, Clear, Set, Inc, Dec)):
    self._check()
    if type(arg) == type(()):
      h, m, s, f = arg
      if not self.simplecmd(Set):
        return 0
      self._number(h,2)
      self._number(m,2)
      self._number(s,2)
      self._number(f,2)
      if not self.simplecmd(ENTER):
        return 0
      return 1
    elif arg == 'reset':
      cmd = Clear
    elif arg == 'load':
      cmd = Load
    elif arg == '+':
      cmd = Inc
    elif arg == '-':
      cmd = Dec
    else:
      raise error, 'Arg should be +,-,reset,load or (h,m,s,f)'
    return self.simplecmd(cmd)

  def cancel(self):
    d = self.simplecmd(CL)
    self.busy_cmd = None
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.