visa_study.py :  » Business-Application » Python-VISA » PyVISA-1.3 » src » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Business Application » Python VISA 
Python VISA » PyVISA 1.3 » src » visa_study.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#    visa_study.py - Alternative high-level VISA implementation
#
#    Copyright  2005, 2006, 2007, 2008
#                Torsten Bronger <bronger@physik.rwth-aachen.de>,
#                Gregor Thalhammer <gth@users.sourceforge.net>.
#  
#    This file is part of PyVISA.
#  
#    PyVISA is free software; you can redistribute it and/or modify it under
#    the terms of the MIT licence:
#
#    Permission is hereby granted, free of charge, to any person obtaining a
#    copy of this software and associated documentation files (the "Software"),
#    to deal in the Software without restriction, including without limitation
#    the rights to use, copy, modify, merge, publish, distribute, sublicense,
#    and/or sell copies of the Software, and to permit persons to whom the
#    Software is furnished to do so, subject to the following conditions:
#
#    The above copyright notice and this permission notice shall be included in
#    all copies or substantial portions of the Software.
#
#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
#    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#    DEALINGS IN THE SOFTWARE.
#

"""visa_study.py defines a Python interface to the VISA library.
"""

# Revision string of this module. The "$Revision$" / "$Source$" markers look
# like version-control keyword expansions (the path below points at a CVS
# root) -- NOTE(review): confirm before editing them by hand.
__version__ = "$Revision: 1.4 $"
# $Source: /cvsroot/pyvisa/pyvisa/src/visa_study.py,v $

from ctypes import *
from visa_messages import *
from vpp43_types import *
from vpp43_constants import *
from visa_attributes import attributes_s,attributes
import os
import types
import sys
import logging

# Module-level logger used by the wrappers below; it is registered under the
# fixed name 'visa' rather than under this module's own name.
log = logging.getLogger('visa')


#load Visa library
# Bind the platform's VISA shared library to the module-level name `visa`.
if os.name == 'nt':
    visa = windll.visa32
elif os.name == 'posix':
    visa = cdll.LoadLibrary('libvisa.so')
else:
    # A bare string is not a valid exception object; raise a real exception
    # so that importing on an unsupported platform fails with this message.
    raise ImportError("No implementation for your platform available.")
    

class VisaError(IOError):
    """Base class for VISA errors.

    The status code passed to the constructor is stored in ``value``.
    ``str()`` renders it as ``"<short name> (<hex code>): <description>"``,
    taking both texts from ``completion_and_error_messages``.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        """Return a readable message; always a string, never None."""
        code = self.value
        if code is None:
            return "VISA error (no status code available)"
        try:
            shortdesc, longdesc = completion_and_error_messages[code]
        except LookupError:
            # A code missing from the table must not turn formatting the
            # error into a second, unrelated exception.
            shortdesc = "Unknown VISA status"
            longdesc = "no description available"
        # Negative codes are shown as their unsigned 32-bit representation.
        hexvalue = code
        if hexvalue < 0:
            hexvalue = hexvalue + 0x100000000
        return "%s (%X): %s" % (shortdesc, hexvalue, longdesc)
            
#Checks return values for errors
def CheckStatus(status):
    """Pass a VISA status code through, raising on failure.

    A negative ``status`` raises ``VisaError(status)``; any other value is
    returned unchanged. This function is installed as the ``restype`` of the
    VISA entry points used in this module.
    """
    if not status < 0:
        return status
    raise VisaError(status)
    
#implement low level VISA functions

#VISA Resource Management
# Route the value returned by each of these entry points through CheckStatus,
# so that a negative status surfaces as a VisaError.
for _entry_point in ('viOpenDefaultRM', 'viOpen', 'viFindRsrc', 'viFindNext',
                     'viParseRsrc', 'viParseRsrcEx', 'viClose',
                     'viGetAttribute', 'viSetAttribute'):
    getattr(visa, _entry_point).restype = CheckStatus
del _entry_point  # do not leave the loop variable in the module namespace

def OpenDefaultRM():
    """Open a session to the Default Resource Manager resource.

    Returns a ``(status, session)`` tuple: the value produced by the
    ``viOpenDefaultRM`` call and the session handle it wrote, as a plain
    Python number.
    """
    rm_session = ViSession()
    status = visa.viOpenDefaultRM(byref(rm_session))
    return status, rm_session.value

def Open(sesn, rsrcName, accessMode, openTimeout):
    """Open a session to the specified device.

    Arguments are converted to their VISA types before the call:
    ``sesn`` to ViSession, ``rsrcName`` to ViRsrc, ``accessMode`` to
    ViAccessMode and ``openTimeout`` to ViUInt32.

    Returns a ``(status, vi)`` tuple, where ``vi`` is the new session handle
    as a plain Python number.
    """
    vi = ViSession()
    # The timeout is now passed as the converted ViUInt32; previously a
    # misspelled local meant the raw Python value was handed to viOpen.
    result = visa.viOpen(ViSession(sesn),
                         ViRsrc(rsrcName),
                         ViAccessMode(accessMode),
                         ViUInt32(openTimeout),
                         byref(vi))
    return (result, vi.value)

def FindRsrc(session, expr):
    """Query a VISA system to locate the resources associated with a
    specified interface.

    This operation matches the value specified in the expr parameter
    with the resources available for a particular interface.

    In comparison to the original VISA function, this returns the
    complete list of found resources.

    session -- resource manager session handle (converted to ViSession)
    expr    -- search expression (converted to ViString)

    Returns a list with one descriptor string per match. A VisaError raised
    by the status check installed on viFindRsrc/viFindNext propagates to the
    caller; the find list is closed in that case as well."""

    findList = ViFindList()
    retcnt = ViUInt32()
    instrDesc = create_string_buffer(256)  # receives one descriptor per call

    resource_list = []
    result = visa.viFindRsrc(ViSession(session), ViString(expr),
                             byref(findList), byref(retcnt), instrDesc)
    if result < 0:
        # Only reachable if the status check is not installed as restype.
        return resource_list

    try:
        # viFindRsrc already delivered the first descriptor; guard against a
        # zero count so no empty pseudo-entry is reported.
        if retcnt.value > 0:
            resource_list.append(instrDesc.value)
            for _ in range(1, retcnt.value):
                visa.viFindNext(findList, instrDesc)
                resource_list.append(instrDesc.value)
    finally:
        # Release the find list even when fetching a later match fails.
        visa.viClose(findList)
    return resource_list

def ParseRsrc(sesn, rsrcName):
    """Parse a resource string to get the interface information.

    Returns ``(status, interface_type, interface_number)``; the last two are
    the values written by ``viParseRsrc``, as plain Python numbers.
    """
    interface_type = ViUInt16()
    interface_number = ViUInt16()
    session_arg = ViSession(sesn)
    name_arg = ViRsrc(rsrcName)
    status = visa.viParseRsrc(session_arg, name_arg,
                              byref(interface_type), byref(interface_number))
    return status, interface_type.value, interface_number.value

def ParseRsrcEx(sesn, rsrcName):
    """Parse a resource string to get extended interface information.

    Returns a 6-tuple: the call status, interface type, interface number,
    resource class, unaliased expanded resource name and alias. The three
    strings are read from 256-byte output buffers filled by
    ``viParseRsrcEx``.
    """
    interface_type = ViUInt16()
    interface_number = ViUInt16()
    class_buf = create_string_buffer(256)
    expanded_name_buf = create_string_buffer(256)
    alias_buf = create_string_buffer(256)

    status = visa.viParseRsrcEx(
        sesn, rsrcName,
        byref(interface_type), byref(interface_number),
        class_buf, expanded_name_buf, alias_buf)

    numbers = (status, interface_type.value, interface_number.value)
    strings = (class_buf.value, expanded_name_buf.value, alias_buf.value)
    return numbers + strings


    
#VISA Resource Template

def Close(object):
    """Close the specified session, event, or find list.

    The handle is converted to ViObject and handed to ``viClose``; the value
    returned by that call is passed back to the caller.
    """
    handle = ViObject(object)
    return visa.viClose(handle)

def GetAttribute(vi, attribute):
    """Retrieve the state of an attribute.

    ``attribute`` may be the attribute's name (looked up in ``attributes_s``)
    or its numeric ID (looked up in ``attributes``).

    Returns ``(name, value, text)``: the attribute name, the raw value read
    from VISA, and a printable rendering of that value.
    """
    # Resolve the descriptor, then derive whichever of name/ID was not given.
    if isinstance(attribute, types.StringTypes):
        info = attributes_s[attribute]
        name, number = attribute, info.attribute_value
    else:
        info = attributes[attribute]
        number, name = attribute, info.attribute_name

    # Pick an output object matching the attribute's declared data type.
    ctype = info.datatype
    if ctype is ViString:
        out = c_buffer(256)
        target = out  # character buffers are passed as they are
    elif ctype is ViBuf:
        out = c_void_p()
        target = byref(out)
    else:
        # The output object is seeded with the attribute ID, as before.
        out = ctype(number)
        target = byref(out)
    visa.viGetAttribute(vi, number, target)
    val = out.value

    # Printable form: symbolic if the descriptor knows the values, otherwise
    # decimal plus hex for integers and plain str() for everything else.
    if info.values:
        text = info.values.tostring(val)
    elif isinstance(val, (types.IntType, types.LongType)):
        text = "%s (%s)" % (val, hex(val))
    else:
        text = str(val)

    return (name, val, text)

def SetAttribute(vi, attribute, value):
    """Set a VISA attribute.

    vi        -- session handle passed through to ``viSetAttribute``
    attribute -- attribute name (looked up in ``attributes_s``) or numeric
                 ID (looked up in ``attributes``)
    value     -- new value; a string is translated with the descriptor's
                 ``values.fromstring`` when the descriptor has ``values``

    Returns the value produced by the ``viSetAttribute`` call.
    """
    #convert attribute to numeric ('attr_value')
    if isinstance(attribute, types.StringTypes):
        attr_info = attributes_s[attribute]
        attr_value = attr_info.attribute_value
    else:
        attr_info = attributes[attribute]
        attr_value = attribute

    #convert value to numeric ('val'), when appropriate
    if isinstance(value, types.StringTypes):
        if attr_info.values:
            val = attr_info.values.fromstring(value)
        else: #fallback, FIXME
            val = str(value)
            # Reported through the module logger instead of stdout, so
            # library users decide whether and where the diagnostic appears.
            log.warning('conversion from string argument not possible')
    else:
        val = value

    cval = attr_info.datatype(val)
    return visa.viSetAttribute(vi, attr_value, cval)

#Basic I/O

visa.viWrite.restype = CheckStatus
def Write(vi, buf):
    """Write the string ``buf`` to the session ``vi``.

    Returns the transfer count reported by ``viWrite`` through its
    output argument.
    """
    data = c_buffer(buf, len(buf))
    retCount = ViUInt32()
    # Log the payload itself; formatting the ctypes array would only show
    # its object representation.
    log.debug("Write: %r", buf)
    visa.viWrite(ViSession(vi), data, ViUInt32(len(buf)), byref(retCount))
    return retCount.value

visa.viRead.restype = CheckStatus
def Read(vi, count):
    """Read up to ``count`` bytes from the session ``vi``.

    Returns ``(status, data)``: the value produced by the ``viRead`` call and
    the leading part of the receive buffer, cut to the transfer count that
    ``viRead`` reported.
    """
    session = ViSession(vi)
    receive_buffer = create_string_buffer(count)
    transferred = ViUInt32()
    status = visa.viRead(session, receive_buffer, ViUInt32(count),
                         byref(transferred))
    log.debug("Read: buffer length %d at %s",
              sizeof(receive_buffer), hex(addressof(receive_buffer)))
    return status, receive_buffer.raw[:transferred.value]

visa.viWriteAsync.restype = CheckStatus
# Buffers handed to viWriteAsync, keyed by job ID. Holding them here keeps
# the memory valid after WriteAsync returns, while the transfer may still be
# in progress. Entries are never removed automatically; code that knows a
# job has finished may pop its ID.
_write_async_buffers = {}
def WriteAsync(vi, buf):
    """Start an asynchronous write of the string ``buf`` to session ``vi``.

    Returns the job ID reported by ``viWriteAsync``. The ctypes buffer that
    holds the data is retained in ``_write_async_buffers`` under that ID so
    it is not freed while the write is pending.
    """
    data = c_buffer(buf, len(buf))
    jobId = ViJobId()
    visa.viWriteAsync(ViSession(vi), data, ViUInt32(len(buf)), byref(jobId))
    _write_async_buffers[jobId.value] = data
    return jobId.value

visa.viReadAsync.restype = CheckStatus
# Receive buffers handed to viReadAsync, keyed by job ID. Holding them here
# keeps the memory valid after ReadAsync returns, while the transfer may
# still be in progress. Entries are never removed automatically; code that
# knows a job has finished may pop its ID.
_read_async_buffers = {}
def ReadAsync(vi, count):
    """Start an asynchronous read of up to ``count`` bytes from session ``vi``.

    Returns the job ID reported by ``viReadAsync``. The receive buffer is
    retained in ``_read_async_buffers`` under that ID, so it outlives this
    call and can be fetched from there once the job is done.
    """
    buf = create_string_buffer(count)
    jobId = ViJobId()
    log.debug("ReadAsync: buffer length %d at %s", sizeof(buf), hex(addressof(buf)))
    visa.viReadAsync(ViSession(vi),
                     buf,
                     ViUInt32(count),
                     byref(jobId))
    # Without this reference the buffer would be freed as soon as the
    # function returns.
    _read_async_buffers[jobId.value] = buf
    return jobId.value
    

#

visa.viGpibControlREN.restype = CheckStatus
def GpibControlREN(vi, mode):
    """Call ``viGpibControlREN`` for session ``vi`` with the given ``mode``.

    ``vi`` is converted to ViSession and ``mode`` to ViUInt16. Nothing is
    returned.
    """
    visa.viGpibControlREN(ViSession(vi), ViUInt16(mode))


#Event management

visa.viEnableEvent.restype = CheckStatus
def EnableEvent(vi, eventType, mechanism):
    """Call ``viEnableEvent`` for the given session, event type and mechanism.

    The fourth argument of the VISA call is always passed as 0. Nothing is
    returned.
    """
    fourth_argument = 0  # presumably the VISA 'context' parameter -- TODO confirm
    visa.viEnableEvent(vi, eventType, mechanism, fourth_argument)

# Declare the argument types first, then route the status through CheckStatus.
visa.viInstallHandler.argtypes = [ViSession, ViEventType, ViHndlr, ViAddr]
visa.viInstallHandler.restype = CheckStatus
#FIXME: dangerous ViAddr
def InstallHandler(vi, eventType, handler, userHandle):
    """Install ``handler`` for ``eventType`` on session ``vi``.

    ``userHandle`` is converted to ViAddr before it is passed on. Nothing is
    returned.

    NOTE(review): no reference to ``handler`` is kept here, so the caller
    presumably has to keep the callback object alive while it is installed.
    """
    visa.viInstallHandler(vi, eventType, handler, ViAddr(userHandle))


#higher level classes and methods

class ResourceManager:
    """Session to the VISA Default Resource Manager.

    The session is opened on construction and closed when the object is
    destroyed. ``session`` holds the handle, or None once it has been closed
    or if it was never opened.
    """

    def __init__(self):
        # Defined before opening, so __del__ finds the attribute even when
        # OpenDefaultRM raises.
        self.session = None
        result, self.session = OpenDefaultRM()

    def __del__(self):
        # Close only a session that was actually opened, and only once.
        session = getattr(self, 'session', None)
        if session is None:
            return
        self.session = None
        Close(session)

    def find_resource(self, expression):
        """Return the list of resource names matching ``expression``."""
        resource_list = FindRsrc(self.session, expression)
        return resource_list

    def parse_resource(self, resource_name):
        """Give information about a resource.

        Returns ``(interface_type, interface_number, resource_class,
        unaliased_expanded_name, alias)``; the status value delivered by
        ParseRsrcEx is dropped.
        """
        result, interface_type, interface_number, \
                rsrcClass, unaliasedExpandedRsrcName, \
                aliasIfExists = ParseRsrcEx(self.session, resource_name)
        return interface_type, interface_number, \
               rsrcClass, unaliasedExpandedRsrcName, \
               aliasIfExists

    def open(self, resourceName, exclusiveLock = None, loadConfig = None, openTimeout = 1000):
        """Open ``resourceName`` and return it wrapped in a Resource.

        A true ``exclusiveLock`` adds VI_EXCLUSIVE_LOCK to the access mode,
        a true ``loadConfig`` adds VI_LOAD_CONFIG. ``openTimeout`` is passed
        through to Open unchanged.
        """
        accessMode = 0
        if exclusiveLock:
            accessMode = accessMode | VI_EXCLUSIVE_LOCK
        if loadConfig:
            accessMode = accessMode | VI_LOAD_CONFIG
        result, vi = Open(self.session, resourceName, accessMode, openTimeout)
        return Resource(vi)


class Resource:
    """Thin object wrapper around an already opened VISA session handle."""

    def __init__(self, vi):
        self.session = vi

    # There is deliberately no __del__: the original author had an automatic
    # close on destruction and disabled it (the reason was not recorded).

    def write(self, buf):
        """Write ``buf`` to the device; returns what Write returns."""
        return Write(self.session, buf)

    def read(self, maxcount = None):
        """Read from the device.

        With a true ``maxcount`` a single Read of that size is done and its
        data returned. Otherwise 1024-byte reads are repeated and their data
        concatenated until a read reports VI_SUCCESS or VI_SUCCESS_TERM_CHAR.
        """
        if maxcount:
            return Read(self.session, maxcount)[1]

        chunks = []
        while True:
            status, chunk = Read(self.session, 1024)
            chunks.append(chunk)
            if status in (VI_SUCCESS, VI_SUCCESS_TERM_CHAR):
                return ''.join(chunks)

    def getattr(self, attribute):
        """Return the GetAttribute result for ``attribute`` on this session."""
        return GetAttribute(self.session, attribute)

    def setattr(self, attribute, value):
        """Set ``attribute`` to ``value`` on this session; returns nothing."""
        SetAttribute(self.session, attribute, value)

    def setlocal(self):
        """Call GpibControlREN with mode 6 for this session."""
        # Mode values as listed by the original author:
        #   0 VI_GPIB_REN_DEASSERT
        #   1 VI_GPIB_REN_ASSERT
        #   2 VI_GPIB_REN_DEASSERT_GTL
        #   3 VI_GPIB_REN_ASSERT_ADDRESS
        #   4 VI_GPIB_REN_ASSERT_LLO
        #   5 VI_GPIB_REN_ASSERT_ADDRESS_LLO
        #   6 VI_GPIB_REN_ADDRESS_GTL
        # Author's test with a Marconi 2019:
        #   0 local, 1 remote, 2 local, 4 local lockout, 5 local, 6 local
        GpibControlREN(self.session, 6)

    def close(self):
        """Close the session."""
        Close(self.session)

    def install_handler(self, eventType, handler, user_handle):
        """Install ``handler`` for ``eventType``; forwards to InstallHandler."""
        return InstallHandler(self.session, eventType, handler, user_handle)

    def enable_event(self, eventType, mechanism):
        """Enable ``eventType`` with ``mechanism``; forwards to EnableEvent."""
        return EnableEvent(self.session, eventType, mechanism)
    
#_RM = ResourceManager() #raises an exception in __del__ at interpreter exit
#open = _RM.Open
#find_resource = _RM.FindResource

#for faster testing
def testvisa():

    logging.basicConfig(level = logging.DEBUG)
    #log.setLevel(logging.DEBUG) #FIXME
    
    #open ResourceManager
    RM = ResourceManager()

    #find resources
    resourcelist = RM.find_resource('ASRL?::INSTR')
    print resourcelist

    #get some information without opening
    result = RM.parse_resource('ASRL1::INSTR')
    print result

    #open
    device = RM.open('ASRL1::INSTR')

    #high level write
    device.write('*IDN?')

    #high level read
    #print device.read() #don't wan't to wait for timeout...

    #try getting all attributes
    for key in attributes_s.keys():
        try:
            print device.getattr(key)
        except IOError, value:
            print "Error retrieving Attribute ", key

    #set some attributes, try different combinations of numeric/string arguments
    device.setattr('VI_ATTR_ASRL_DATA_BITS', 7)
    print device.getattr(VI_ATTR_ASRL_DATA_BITS)

    #attribute value, string argument
    device.setattr(VI_ATTR_ASRL_STOP_BITS, 'VI_ASRL_STOP_TWO')
    print device.getattr('VI_ATTR_ASRL_STOP_BITS')
    print

    #bitfields
    device.setattr('VI_ATTR_ASRL_FLOW_CNTRL', VI_ASRL_FLOW_XON_XOFF | VI_ASRL_FLOW_RTS_CTS)
    print device.getattr('VI_ATTR_ASRL_FLOW_CNTRL')

    #bitfield with string argument
    device.setattr(VI_ATTR_ASRL_FLOW_CNTRL, 'VI_ASRL_FLOW_NONE')
    print device.getattr('VI_ATTR_ASRL_FLOW_CNTRL')
    
    #callback functions
    
    def event_handler(vi, event_type, context, userhandle):
    #def event_handler(vi, event_type, context): #FIX: should accept four arguments
        sys.stdout.flush()
        print 'in event handler'
        print 'vi: ', vi
        print 'event_type: ', event_type
        print 'context: ', context

        print GetAttribute(context, VI_ATTR_EVENT_TYPE)
        print GetAttribute(context, VI_ATTR_STATUS)
        print GetAttribute(context, VI_ATTR_JOB_ID)
        print GetAttribute(context, VI_ATTR_BUFFER)
        print GetAttribute(context, VI_ATTR_RET_COUNT)
        print GetAttribute(context, VI_ATTR_OPER_NAME)
        print 'userhandle: ', userhandle
        print 'leaving event handler'

        return VI_SUCCESS

    device.install_handler(VI_EVENT_IO_COMPLETION, ViHndlr(event_handler), 13)

    device.enable_event(VI_EVENT_IO_COMPLETION, VI_HNDLR)

    print 'session', device.session
    jobId = WriteAsync(device.session, 'going to nowhere city')
    print "jobId: ", jobId

    jobId = ReadAsync(device.session, 10)
    print "jobId: ", jobId
        
    device.close()

# Run the manual smoke test only when the file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    testvisa()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.