##
# wmi - a lightweight Python wrapper around Microsoft's WMI interface
#
# Windows Management Instrumentation (WMI) is Microsoft's answer to
# the DMTF's Common Information Model. It allows you to query just
# about any conceivable piece of information from any computer which
# is running the necessary agent and over which have you the
# necessary authority.
#
# The implementation is by means of COM/DCOM and most of the examples
# assume you're running one of Microsoft's scripting technologies.
# Fortunately, Mark Hammond's pywin32 has pretty much all you need
# for a workable Python adaptation. I haven't tried any of the fancier
# stuff like Async calls and so on, so I don't know if they'd work.
#
# Since the COM implementation doesn't give much away to Python
# programmers, I've wrapped it in some lightweight classes with
# some getattr / setattr magic to ease the way. In particular:
#
# <ul>
# <li>
# The _wmi_namespace object itself will determine its classes
# and allow you to return all instances of any of them by
# using its name as an attribute. As an additional shortcut,
# you needn't specify the Win32_; if the first lookup fails
# it will try again with a Win32_ on the front:
#
# <pre class="code">
# disks = wmi.WMI ().Win32_LogicalDisk ()
# </pre>
#
# In addition, you can specify what would become the WHERE clause
# as keyword parameters:
#
# <pre class="code">
# fixed_disks = wmi.WMI ().Win32_LogicalDisk (DriveType = 3)
# </pre>
# </li>
#
# <li>
# The objects returned by a WMI lookup are wrapped in a Python
# class which determines their methods and classes and allows
# you to access them as though they were Python classes. The
# methods only allow named parameters.
#
# <pre class="code">
# for p in wmi.WMI ().Win32_Process ():
# if p.Name.lower () == 'notepad.exe':
# p.Terminate (Result=1)
# </pre>
# </li>
#
# <li>
# Doing a print on one of the WMI objects will result in its
# GetObjectText_ method being called, which usually produces
# a meaningful printout of current values.
# The repr of the object will include its full WMI path,
# which lets you get directly to it if you need to.
# </li>
#
# <li>
# You can get the associators and references of an object as
# a list of python objects by calling the associators () and
# references () methods on a WMI Python object.
# NB Don't do this on a Win32_ComputerSystem object; it will
# take all day and kill your machine!
#
# <pre class="code">
# for p in wmi.WMI ().Win32_Process ():
# if p.Name.lower () == 'notepad.exe':
# for r in p.references ():
# print r.Name
# </pre>
# </li>
#
# <li>
# WMI classes (as opposed to instances) are first-class
# objects, so you can get hold of a class, and call
# its methods or set up a watch against it.
#
# <pre class="code">
# process = wmi.WMI ().Win32_Process
# process.Create (CommandLine="notepad.exe")
# </pre>
#
# </li>
#
# <li>
# To make it easier to use in embedded systems and py2exe-style
# executable wrappers, the module will not force early Dispatch.
# To do this, it uses a handy hack by Thomas Heller for easy access
# to constants.
# </li>
#
# <li>
# Typical usage will be:
#
# <pre class="code">
# import wmi
#
# vodev1 = wmi.WMI ("vodev1")
# for disk in vodev1.Win32_LogicalDisk ():
# if disk.DriveType == 3:
# space = 100 * long (disk.FreeSpace) / long (disk.Size)
# print "%s has %d%% free" % (disk.Name, space)
# </pre>
# </li>
#
# </ul>
#
# Many thanks, obviously to Mark Hammond for creating the win32all
# extensions, but also to Alex Martelli and Roger Upole, whose
# c.l.py postings pointed me in the right direction.
# Thanks especially in release 1.2 to Paul Tiemann for his code
# contributions and robust testing.
#
# (c) Tim Golden - mail at timgolden.me.uk 5th June 2003
# Licensed under the (GPL-compatible) MIT License:
# http://www.opensource.org/licenses/mit-license.php
#
# For change history see CHANGELOG.TXT
##
try:
True, False
except NameError:
True = 1
False = 0
try:
object
except NameError:
class object: pass
__VERSION__ = "1.3.2"
_DEBUG = False
import sys
import re
import datetime
from win32com.client import GetObject,Dispatch
import pywintypes
class ProvideConstants (object):
"""
A class which, when called on a win32com.client.Dispatch object,
provides lazy access to constants defined in the typelib.
They can be accessed as attributes of the _constants property.
From Thomas Heller on c.l.py
"""
def __init__(self, comobj):
"@param comobj A COM object whose typelib constants are to be exposed"
comobj.__dict__["_constants"] = self
# Get the typelibrary's typecomp interface
self.__typecomp = \
comobj._oleobj_.GetTypeInfo().GetContainingTypeLib()[0].GetTypeComp()
def __getattr__(self, name):
if name.startswith("__") and name.endswith("__"):
raise AttributeError, name
result = self.__typecomp.Bind(name)
# Bind returns a 2-tuple, first item is TYPEKIND,
# the second item has the value
if not result[0]:
raise AttributeError, name
return result[1].value
obj = GetObject ("winmgmts:")
ProvideConstants (obj)
wbemErrInvalidQuery = obj._constants.wbemErrInvalidQuery
wbemErrTimedout = obj._constants.wbemErrTimedout
wbemFlagReturnImmediately = obj._constants.wbemFlagReturnImmediately
wbemFlagForwardOnly = obj._constants.wbemFlagForwardOnly
def handle_com_error (error_info):
"""Convenience wrapper for displaying all manner of COM errors.
Raises a x_wmi exception with more useful information attached
@param error_info The structure attached to a pywintypes.com_error
"""
hresult_code, hresult_name, additional_info, parameter_in_error = error_info
exception_string = ["%s - %s" % (hex (hresult_code), hresult_name)]
if additional_info:
wcode, source_of_error, error_description, whlp_file, whlp_context, scode = additional_info
exception_string.append (" Error in: %s" % source_of_error)
exception_string.append (" %s - %s" % (hex (scode), (error_description or "").strip ()))
raise x_wmi, "\n".join (exception_string)
BASE = datetime.datetime (1601, 1, 1)
def from_1601 (ns100):
return BASE + datetime.timedelta (microseconds=int (ns100) / 10)
def from_time (year=None, month=None, day=None, hours=None, minutes=None, seconds=None, microseconds=None, timezone=None):
"""
Convenience wrapper to take a series of date/time elements and return a WMI time
of the form yyyymmddHHMMSS.mmmmmm+UUU. All elements may be int, string or
omitted altogether. If omitted, they will be replaced in the output string
by a series of stars of the appropriate length.
@param year The year element of the date/time
@param month The month element of the date/time
@param day The day element of the date/time
@param hours The hours element of the date/time
@param minutes The minutes element of the date/time
@param seconds The seconds element of the date/time
@param microseconds The microseconds element of the date/time
@param timezone The timeezone element of the date/time
@return A WMI datetime string of the form: yyyymmddHHMMSS.mmmmmm+UUU
"""
def str_or_stars (i, length):
if i is None:
return "*" * length
else:
return str (i).rjust (length, "0")
wmi_time = ""
wmi_time += str_or_stars (year, 4)
wmi_time += str_or_stars (month, 2)
wmi_time += str_or_stars (day, 2)
wmi_time += str_or_stars (hours, 2)
wmi_time += str_or_stars (minutes, 2)
wmi_time += str_or_stars (seconds, 2)
wmi_time += "."
wmi_time += str_or_stars (microseconds, 6)
wmi_time += str_or_stars (timezone, 4)
return wmi_time
def to_time (wmi_time):
"""
Convenience wrapper to take a WMI datetime string of the form
yyyymmddHHMMSS.mmmmmm+UUU and return a 9-tuple containing the
individual elements, or None where string contains placeholder
stars.
@param wmi_time The WMI datetime string in yyyymmddHHMMSS.mmmmmm+UUU format
@return A 9-tuple of (year, month, day, hours, minutes, seconds, microseconds, timezone)
"""
def int_or_none (s, start, end):
try:
return int (s[start:end])
except ValueError:
return None
year = int_or_none (wmi_time, 0, 4)
month = int_or_none (wmi_time, 4, 6)
day = int_or_none (wmi_time, 6, 8)
hours = int_or_none (wmi_time, 8, 10)
minutes = int_or_none (wmi_time, 10, 12)
seconds = int_or_none (wmi_time, 12, 14)
microseconds = int_or_none (wmi_time, 15, 21)
timezone = wmi_time[21:]
return year, month, day, hours, minutes, seconds, microseconds, timezone
#
# Exceptions
#
class x_wmi (Exception):
pass
class x_wmi_invalid_query (x_wmi):
pass
class x_wmi_timed_out (x_wmi):
pass
class x_wmi_no_namespace (x_wmi):
pass
WMI_EXCEPTIONS = {
wbemErrInvalidQuery : x_wmi_invalid_query,
wbemErrTimedout : x_wmi_timed_out
}
def _set (obj, attribute, value):
"""
Helper function to add an attribute directly into the instance
dictionary, bypassing possible __getattr__ calls
@param obj Any python object
@param attribute String containing attribute name
@param value Any python object
"""
obj.__dict__[attribute] = value
class _wmi_method:
"""
A currying sort of wrapper around a WMI method name. It
abstract's the method's parameters and can be called like
a normal Python object passing in the parameter values.
Output parameters are returned from the call as a tuple.
In addition, the docstring is set up as the method's
signature, including an indication as to whether any
given parameter is expecting an array, and what
special privileges are required to call the method.
"""
def __init__ (self, ole_object, method_name):
"""
@param ole_object The WMI class/instance whose method is to be called
@param method_name The name of the method to be called
"""
try:
self.ole_object = Dispatch (ole_object)
self.method = ole_object.Methods_ (method_name)
self.qualifiers = {}
for q in self.method.Qualifiers_:
self.qualifiers[q.Name] = q.Value
self.provenance = "\n".join (self.qualifiers.get ("MappingStrings", []))
self.in_parameters = self.method.InParameters
self.out_parameters = self.method.OutParameters
if self.in_parameters is None:
self.in_parameter_names = []
else:
self.in_parameter_names = [(i.Name, i.IsArray) for i in self.in_parameters.Properties_]
if self.out_parameters is None:
self.out_parameter_names = []
else:
self.out_parameter_names = [(i.Name, i.IsArray) for i in self.out_parameters.Properties_]
doc = "%s (%s) => (%s)" % (
method_name,
", ".join ([name + ("", "[]")[is_array] for (name, is_array) in self.in_parameter_names]),
", ".join ([name + ("", "[]")[is_array] for (name, is_array) in self.out_parameter_names])
)
privileges = self.qualifiers.get ("Privileges", [])
if privileges:
doc += " | Needs: " + ", ".join (privileges)
self.__doc__ = doc
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def __call__ (self, *args, **kwargs):
"""
Execute the call to a WMI method, returning
a tuple (even if is of only one value) containing
the out and return parameters.
"""
try:
if self.in_parameters:
parameter_names = {}
for name, is_array in self.in_parameter_names:
parameter_names[name] = is_array
parameters = self.in_parameters
#
# Check positional parameters first
#
for n_arg in range (len (args)):
arg = args[n_arg]
parameter = parameters.Properties_[n_arg]
if parameter.IsArray:
try: list (arg)
except TypeError: raise TypeError, "parameter %d must be iterable" % n_arg
parameter.Value = arg
#
# If any keyword param supersedes a positional one,
# it'll simply overwrite it.
#
for k, v in kwargs.items ():
is_array = parameter_names.get (k)
if is_array is None:
raise AttributeError, "%s is not a valid parameter for %s" % (k, self.__doc__)
else:
if is_array:
try: list (v)
except TypeError: raise TypeError, "%s must be iterable" % k
parameters.Properties_ (k).Value = v
result = self.ole_object.ExecMethod_ (self.method.Name, self.in_parameters)
else:
result = self.ole_object.ExecMethod_ (self.method.Name)
results = []
for name, is_array in self.out_parameter_names:
value = result.Properties_ (name).Value
if is_array:
#
# Thanks to Jonas Bjering for bug report and patch
#
results.append (list (value or []))
else:
results.append (value)
return tuple (results)
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def __repr__ (self):
return "<function %s>" % self.__doc__
#
# class _wmi_object
#
class _wmi_object:
"A lightweight wrapper round an OLE WMI object"
def __init__ (self, ole_object, instance_of=None, fields=[], property_map={}):
try:
_set (self, "ole_object", ole_object)
_set (self, "_instance_of", instance_of)
_set (self, "properties", {})
_set (self, "methods", {})
_set (self, "property_map", property_map)
if fields:
for field in fields:
self.properties[field] = None
else:
for p in ole_object.Properties_:
self.properties[p.Name] = None
for m in ole_object.Methods_:
self.methods[m.Name] = None
_set (self, "_properties", self.properties.keys ())
_set (self, "_methods", self.methods.keys ())
_set (self, "qualifiers", {})
for q in self.ole_object.Qualifiers_:
self.qualifiers[q.Name] = q.Value
_set (self, "is_association", self.qualifiers.has_key ("Association"))
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def __str__ (self):
"""
For a call to print [object] return the OLE description
of the properties / values of the object
"""
try:
return self.ole_object.GetObjectText_ ()
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def __repr__ (self):
"""
Indicate both the fact that this is a wrapped WMI object
and the WMI object's own identifying class.
"""
try:
return "<%s: %s>" % (self.__class__.__name__, str (self.Path_.Path))
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def _cached_properties (self, attribute):
if self.properties[attribute] is None:
self.properties[attribute] = self.ole_object.Properties_ (attribute)
return self.properties[attribute]
def _cached_methods (self, attribute):
if self.methods[attribute] is None:
self.methods[attribute] = _wmi_method (self.ole_object, attribute)
return self.methods[attribute]
def __getattr__ (self, attribute):
"""
Attempt to pass attribute calls to the proxied COM object.
If the attribute is recognised as a property, return its value;
if it is recognised as a method, return a method wrapper which
can then be called with parameters; otherwise pass the lookup
on to the underlying object.
"""
try:
if self.properties.has_key (attribute):
factory = self.property_map.get (attribute, lambda x: x)
value = factory (self._cached_properties (attribute).Value)
#
# If this is an association, its properties are
# actually the paths to the two aspects of the
# association, so translate them automatically
# into WMI objects.
#
if self.is_association:
return WMI (moniker=value)
else:
return value
elif self.methods.has_key (attribute):
return self._cached_methods (attribute)
else:
return getattr (self.ole_object, attribute)
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def __setattr__ (self, attribute, value):
"""
If the attribute to be set is valid for the proxied
COM object, set that objects's parameter value; if not,
raise an exception.
"""
try:
if self.properties.has_key (attribute):
self._cached_properties (attribute).Value = value
if self.ole_object.Path_.Path:
self.ole_object.Put_ ()
else:
raise AttributeError, attribute
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def __eq__ (self, other):
"""
Use WMI's CompareTo_ to compare this object with
another. Don't try to do anything if the other
object is not a wmi object. It might be possible
to compare this object's unique key with a string
or something, but this doesn't seem to be universal
enough to merit a special case.
"""
if isinstance (other, self.__class__):
return self.ole_object.CompareTo_ (other.ole_object)
else:
raise x_wmi, "Can't compare a WMI object with something else"
def _getAttributeNames (self):
"""Return list of methods/properties for IPython completion"""
attribs = [str (x) for x in self.methods.keys ()]
attribs.extend ([str (x) for x in self.properties.keys ()])
return attribs
def put (self):
self.ole_object.Put_ ()
def set (self, **kwargs):
"""
Set several properties of the underlying object
at one go. This is particularly useful in combination
with the new () method below. However, an instance
which has been spawned in this way won't have enough
information to write pack, so only try if the
instance has a path.
"""
if kwargs:
try:
for attribute, value in kwargs.items ():
if self.properties.has_key (attribute):
self._cached_properties (attribute).Value = value
else:
raise AttributeError, attribute
#
# Only try to write the attributes
# back if the object exists.
#
if self.ole_object.Path_.Path:
self.ole_object.Put_ ()
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def path (self):
"""
Return the WMI URI to this object. Can be used to
determine the path relative to the parent namespace. eg,
<pre class="code">
pp0 = wmi.WMI ().Win32_ParallelPort ()[0]
print pp0.path ().RelPath
</pre>
"""
try:
return self.ole_object.Path_
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def derivation (self):
"""Return a tuple representing the object derivation for
this object, with the most specific object first. eg,
pp0 = wmi.WMI ().Win32_ParallelPort ()[0]
print ' <- '.join (pp0.derivation ())
"""
try:
return self.ole_object.Derivation_
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def associators (self, wmi_association_class="", wmi_result_class=""):
"""Return a list of objects related to this one, optionally limited
either by association class (ie the name of the class which relates
them) or by result class (ie the name of the class which would be
retrieved)
<pre class="code">
c = wmi.WMI ()
pp = c.Win32_ParallelPort ()[0]
for i in pp.associators (wmi_association_class="Win32_PortResource"):
print i
for i in pp.associators (wmi_result_class="Win32_PnPEntity"):
print i
</pre>
"""
try:
return [
_wmi_object (i) for i in \
self.ole_object.Associators_ (
strAssocClass=wmi_association_class,
strResultClass=wmi_result_class
)
]
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def references (self, wmi_class=""):
"""Return a list of associations involving this object, optionally
limited by the result class (the name of the association class).
NB Associations are treated specially; although WMI only returns
the string corresponding to the instance of each associated object,
this module will automatically convert that to the object itself.
<pre class="code">
c = wmi.WMI ()
sp = c.Win32_SerialPort ()[0]
for i in sp.references ():
print i
for i in sp.references (wmi_class="Win32_SerialPortSetting"):
print i
</pre>
"""
try:
return [_wmi_object (i) for i in self.ole_object.References_ (strResultClass=wmi_class)]
except pywintypes.com_error, error_info:
handle_com_error (error_info)
#
# class _wmi_event
#
class _wmi_event (_wmi_object):
"""Slight extension of the _wmi_object class to allow
objects which are the result of events firing to return
extra information such as the type of event.
"""
event_type_re = re.compile ("__Instance(Creation|Modification|Deletion)Event")
def __init__ (self, event, event_info):
_wmi_object.__init__ (self, event)
_set (self, "event_type", None)
_set (self, "timestamp", None)
_set (self, "previous", None)
if event_info:
event_type = self.event_type_re.match (event_info.Path_.Class).group (1).lower ()
_set (self, "event_type", event_type)
if hasattr (event_info, "TIME_CREATED"):
_set (self, "timestamp", from_1601 (event_info.TIME_CREATED))
if hasattr (event_info, "PreviousInstance"):
_set (self, "previous", event_info.PreviousInstance)
#
# class _wmi_class
#
class _wmi_class (_wmi_object):
"""Currying class to assist in issuing queries against
a WMI namespace. The idea is that when someone issues
an otherwise unknown method against the WMI object, if
it matches a known WMI class a query object will be
returned which may then be called with one or more params
which will form the WHERE clause. eg,
<pre class="code">
c = wmi.WMI ()
c_drive = c.Win32_LogicalDisk (Name='C:')
</pre>
"""
def __init__ (self, namespace, wmi_class):
_wmi_object.__init__ (self, wmi_class)
_set (self, "_class_name", wmi_class.Path_.Class)
if namespace:
_set (self, "_namespace", namespace)
else:
class_moniker = wmi_class.Path_.DisplayName
winmgmts, namespace_moniker, class_name = class_moniker.split (":")
namespace = _wmi_namespace (GetObject (winmgmts + ":" + namespace_moniker), False)
_set (self, "_namespace", namespace)
def query (self, fields=[], **where_clause):
"""Make it slightly easier to query against the class,
by calling the namespace's query with the class preset.
Won't work if the class has been instantiated directly.
"""
if self._namespace is None:
raise x_wmi_no_namespace, "You cannot query directly from a WMI class"
try:
field_list = ", ".join (fields) or "*"
wql = "SELECT " + field_list + " FROM " + self._class_name
if where_clause:
wql += " WHERE " + " AND ". join (["%s = '%s'" % (k, v) for k, v in where_clause.items ()])
return self._namespace.query (wql, self, fields)
except pywintypes.com_error, error_info:
handle_com_error (error_info)
__call__ = query
def watch_for (
self,
notification_type="operation",
delay_secs=1,
**where_clause
):
if self._namespace is None:
raise x_wmi_no_namespace, "You cannot watch directly from a WMI class"
return self._namespace.watch_for (
notification_type=notification_type,
wmi_class=self,
delay_secs=delay_secs,
**where_clause
)
def instances (self):
"""Return a list of instances of the WMI class
"""
try:
return [_wmi_object (instance, self) for instance in self.Instances_ ()]
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def new (self, **kwargs):
"""This is the equivalent to the raw-WMI SpawnInstance_
method. Note that there are relatively few uses for
this, certainly fewer than you might imagine. Most
classes which need to create a new *real* instance
of themselves, eg Win32_Process, offer a .Create
method. SpawnInstance_ is generally reserved for
instances which are passed as parameters to such
.Create methods, a common example being the
Win32_SecurityDescriptor, passed to Win32_Share.Create
and other instances which need security.
The example here is Win32_ProcessStartup, which
controls the shown/hidden state etc. of a new
Win32_Process instance.
<pre class="code">
import win32con
import wmi
c = wmi.WMI ()
startup = c.Win32_ProcessStartup.new (ShowWindow=win32con.SW_SHOWMINIMIZED)
pid, retval = c.Win32_Process.Create (
CommandLine="notepad.exe",
ProcessStartupInformation=startup
)
</pre>
NB previous versions of this module, used this function
to create new process. This is *not* a good example
of its use; it is better handled with something like
the example above.
"""
try:
obj = _wmi_object (self.SpawnInstance_ (), self)
obj.set (**kwargs)
return obj
except pywintypes.com_error, error_info:
handle_com_error (error_info)
#
# class _wmi_result
#
class _wmi_result:
"""Simple, data only result for targeted WMI queries which request
data only result classes via fetch_as_classes.
"""
def __init__(self, obj, attributes):
if attributes:
for attr in attributes:
self.__dict__[attr] = obj.Properties_ (attr).Value
else:
for p in obj.Properties_:
attr = p.Name
self.__dict__[attr] = obj.Properties_(attr).Value
#
# class WMI
#
class _wmi_namespace:
"""A WMI root of a computer system. The classes attribute holds a list
of the classes on offer. This means you can explore a bit with
things like this:
<pre class="code">
c = wmi.WMI ()
for i in c.classes:
if "user" in i.lower ():
print i
</pre>
"""
def __init__ (self, namespace, find_classes):
_set (self, "_namespace", namespace)
#
# wmi attribute preserved for backwards compatibility
#
_set (self, "wmi", namespace)
# Initialise the "classes" attribute, to avoid infinite recursion in the
# __getattr__ method (which uses it).
self.classes = {}
#
# Pick up the list of classes under this namespace
# so that they can be queried, and used as though
# properties of the namespace by means of the __getattr__
# hook below.
# If the namespace does not support SubclassesOf, carry on
# regardless
#
if find_classes:
try:
self.classes.update (self.subclasses_of ())
except AttributeError:
pass
def __repr__ (self):
return "<_wmi_namespace: %s>" % self.wmi
def __str__ (self):
return repr (self)
def get (self, moniker):
try:
return _wmi_object (self.wmi.Get (moniker))
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def handle (self):
"""The raw OLE object representing the WMI namespace"""
return self._namespace
def subclasses_of (self, root="", regex=r".*"):
classes = {}
for c in self._namespace.SubclassesOf (root):
klass = c.Path_.Class
if re.match (regex, klass):
classes[klass] = None
return classes
def instances (self, class_name):
"""Return a list of instances of the WMI class. This is
(probably) equivalent to querying with no qualifiers.
<pre class="code">
system.instances ("Win32_LogicalDisk")
# should be the same as
system.Win32_LogicalDisk ()
</pre>
"""
try:
return [_wmi_object (obj) for obj in self._namespace.InstancesOf (class_name)]
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def new (self, wmi_class, **kwargs):
"""This is now implemented by a call to _wmi_namespace.new (qv)"""
return getattr (self, wmi_class).new (**kwargs)
new_instance_of = new
def _raw_query (self, wql):
"""Execute a WQL query and return its raw results. Use the flags
recommended by Microsoft to achieve a read-only, semi-synchronous
query where the time is taken while looping through. Should really
be a generator, but ...
NB Backslashes need to be doubled up.
"""
flags = wbemFlagReturnImmediately | wbemFlagForwardOnly
wql = wql.replace ("\\", "\\\\")
if _DEBUG: print "_raw_query(wql):", wql
try:
return self._namespace.ExecQuery (strQuery=wql, iFlags=flags)
except pywintypes.com_error, (hresult, hresult_text, additional, param_in_error):
raise WMI_EXCEPTIONS.get (hresult, x_wmi (hresult))
def query (self, wql, instance_of=None, fields=[]):
"""Perform an arbitrary query against a WMI object, and return
a list of _wmi_object representations of the results.
"""
return [ _wmi_object (obj, instance_of, fields) for obj in self._raw_query(wql) ]
def fetch_as_classes (self, wmi_classname, fields=(), **where_clause):
"""Build and execute a wql query to fetch the specified list of fields from
the specified wmi_classname + where_clause, then return the results as
a list of simple class instances with attributes matching fields_list.
If fields is left empty, select * and pre-load all class attributes for
each class returned.
"""
wql = "SELECT %s FROM %s" % (fields and ", ".join (fields) or "*", wmi_classname)
if where_clause:
wql += " WHERE " + " AND ".join (["%s = '%s'" % (k, v) for k, v in where_clause.items()])
return [_wmi_result (obj, fields) for obj in self._raw_query(wql)]
def fetch_as_lists (self, wmi_classname, fields, **where_clause):
"""Build and execute a wql query to fetch the specified list of fields from
the specified wmi_classname + where_clause, then return the results as
a list of lists whose values correspond fields_list.
"""
wql = "SELECT %s FROM %s" % (", ".join (fields), wmi_classname)
if where_clause:
wql += " WHERE " + " AND ".join (["%s = '%s'" % (k, v) for k, v in where_clause.items()])
results = []
for obj in self._raw_query(wql):
results.append ([obj.Properties_ (field).Value for field in fields])
return results
def watch_for (
self,
raw_wql=None,
notification_type="operation",
wmi_class=None,
delay_secs=1,
**where_clause
):
"""Set up an event tracker on a WMI event. This function
returns an wmi_watcher which can be called to get the
next event. eg,
<pre class="code">
c = wmi.WMI ()
raw_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Process'"
watcher = c.watch_for (raw_wql=raw_wql)
while 1:
process_created = watcher ()
print process_created.Name
# or
watcher = c.watch_for (
notification_type="Creation",
wmi_class="Win32_Process",
delay_secs=2,
Name='calc.exe'
)
calc_created = watcher ()
</pre>
Now supports timeout on the call to watcher, eg:
<pre class="code">
import pythoncom
import wmi
c = wmi.WMI (privileges=["Security"])
watcher1 = c.watch_for (
notification_type="Creation",
wmi_class="Win32_NTLogEvent",
Type="error"
)
watcher2 = c.watch_for (
notification_type="Creation",
wmi_class="Win32_NTLogEvent",
Type="warning"
)
while 1:
try:
error_log = watcher1 (500)
except wmi.x_wmi_timed_out:
pythoncom.PumpWaitingMessages ()
else:
print error_log
try:
warning_log = watcher2 (500)
except wmi.x_wmi_timed_out:
pythoncom.PumpWaitingMessages ()
else:
print warning_log
</pre>
"""
if isinstance (wmi_class, _wmi_class):
class_name = wmi_class._class_name
else:
class_name = wmi_class
wmi_class = getattr (self, class_name)
is_extrinsic = "__ExtrinsicEvent" in wmi_class.derivation ()
if raw_wql:
wql = raw_wql
else:
if is_extrinsic:
if where_clause:
where = " WHERE " + " AND ".join (["%s = '%s'" % (k, v) for k, v in where_clause.items ()])
else:
where = ""
wql = "SELECT * FROM " + class_name + where
else:
if where_clause:
where = " AND " + " AND ".join (["TargetInstance.%s = '%s'" % (k, v) for k, v in where_clause.items ()])
else:
where = ""
wql = \
"SELECT * FROM __Instance%sEvent WITHIN %d WHERE TargetInstance ISA '%s' %s" % \
(notification_type, delay_secs, class_name, where)
if _DEBUG: print wql
try:
return _wmi_watcher (self._namespace.ExecNotificationQuery (wql), is_extrinsic=is_extrinsic)
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def __getattr__ (self, attribute):
"""Offer WMI classes as simple attributes. Pass through any untrapped
unattribute to the underlying OLE object. This means that new or
unmapped functionality is still available to the module user.
"""
#
# Don't try to match against known classes as was previously
# done since the list may not have been requested
# (find_classes=False).
#
try:
return self._cached_classes (attribute)
except pywintypes.com_error, error_info:
try:
return self._cached_classes ("Win32_" + attribute)
except pywintypes.com_error, error_info:
return getattr (self._namespace, attribute)
def _cached_classes (self, class_name):
"""Standard caching helper which keeps track of classes
already retrieved by name and returns the existing object
if found. If this is the first retrieval, store it and
pass it back
"""
if self.classes.get (class_name) is None:
self.classes[class_name] = _wmi_class (self, self._namespace.Get (class_name))
return self.classes[class_name]
def _getAttributeNames (self):
"""Return list of classes for IPython completion engine"""
classes = [str (x) for x in self.classes.keys () if not x.startswith ('__')]
return classes
#
# class _wmi_watcher
#
class _wmi_watcher:
"""Helper class for WMI.watch_for below (qv)"""
_event_property_map = {
"TargetInstance" : _wmi_object,
"PreviousInstance" : _wmi_object
}
def __init__ (self, wmi_event, is_extrinsic):
self.wmi_event = wmi_event
self.is_extrinsic = is_extrinsic
def __call__ (self, timeout_ms=-1):
"""When called, return the instance which caused the event. Supports
timeout in milliseconds (defaulting to infinite). If the watcher
times out, x_wmi_timed_out is raised. This makes it easy to support
watching for multiple objects.
"""
try:
event = self.wmi_event.NextEvent (timeout_ms)
if self.is_extrinsic:
return _wmi_event (event, None)
else:
return _wmi_event (
event.Properties_ ("TargetInstance").Value,
_wmi_object (event, property_map=self._event_property_map)
)
except pywintypes.com_error, error_info:
hresult_code, hresult_name, additional_info, parameter_in_error = error_info
if additional_info:
wcode, source_of_error, error_description, whlp_file, whlp_context, scode = additional_info
if scode == wbemErrTimedout:
raise x_wmi_timed_out
handle_com_error (error_info)
PROTOCOL = "winmgmts:"
IMPERSONATION_LEVEL = "impersonate"
AUTHENTICATION_LEVEL = "default"
NAMESPACE = "root/cimv2"
def connect (
computer=".",
impersonation_level="",
authentication_level="",
authority="",
privileges="",
moniker="",
wmi=None,
namespace="",
suffix="",
user="",
password="",
find_classes=True,
debug=False
):
"""The WMI constructor can either take a ready-made moniker or as many
parts of one as are necessary. Eg,
<pre class="code">
c = wmi.WMI (moniker="winmgmts:{impersonationLevel=Delegate}//remote")
# or
c = wmi.WMI (computer="remote", privileges=["!RemoteShutdown", "Security"])
</pre>
I daren't link to a Microsoft URL; they change so often. Try Googling for
WMI construct moniker and see what it comes back with.
For complete control, a named argument "wmi" can be supplied, which
should be a SWbemServices object, which you create yourself. Eg,
<pre class="code">
loc = win32com.client.Dispatch("WbemScripting.SWbemLocator")
svc = loc.ConnectServer(...)
c = wmi.WMI(wmi=svc)
</pre>
This is the only way of connecting to a remote computer with a different
username, as the moniker syntax does not allow specification of a user
name.
If the "wmi" parameter is supplied, all other parameters are ignored.
"""
global _DEBUG
_DEBUG = debug
#
# If namespace is a blank string, leave
# it unaltered as it might to trying to
# access the root namespace
#
#if namespace is None:
# namespace = NAMESPACE
try:
if wmi:
obj = wmi
elif moniker:
if not moniker.startswith (PROTOCOL):
moniker = PROTOCOL + moniker
if _DEBUG: print moniker
obj = GetObject (moniker)
else:
if user:
if impersonation_level or authentication_level or privileges or suffix:
raise x_wmi, "You can't specify an impersonation, authentication or privilege as well as a username"
else:
obj = connect_server (
server=computer,
namespace=namespace,
user=user,
password=password,
authority=authority
)
else:
moniker = construct_moniker (
computer=computer,
impersonation_level=impersonation_level or IMPERSONATION_LEVEL,
authentication_level=authentication_level or AUTHENTICATION_LEVEL,
authority=authority,
privileges=privileges,
namespace=namespace,
suffix=suffix
)
if _DEBUG: print moniker
obj = GetObject (moniker)
wmi_type = get_wmi_type (obj)
if wmi_type == "namespace":
return _wmi_namespace (obj, find_classes)
elif wmi_type == "class":
return _wmi_class (None, obj)
elif wmi_type == "instance":
return _wmi_object (obj)
else:
raise x_wmi, "Unknown moniker type"
except pywintypes.com_error, error_info:
handle_com_error (error_info)
WMI = connect
def construct_moniker (
computer=None,
impersonation_level="Impersonate",
authentication_level="Default",
authority=None,
privileges=None,
namespace=None,
suffix=None
):
security = []
if impersonation_level: security.append ("impersonationLevel=%s" % impersonation_level)
if authentication_level: security.append ("authenticationLevel=%s" % authentication_level)
#
# Use of the authority descriptor is invalid on the local machine
#
if authority and computer: security.append ("authority=%s" % authority)
if privileges: security.append ("(%s)" % ", ".join (privileges))
moniker = [PROTOCOL]
if security: moniker.append ("{%s}/" % ",".join (security))
if computer: moniker.append ("/%s/" % computer)
if namespace:
parts = re.split (r"[/\\]", namespace)
if parts[0] != 'root':
parts.insert (0, "root")
moniker.append ("/".join (parts))
if suffix: moniker.append (":%s" % suffix)
return "".join (moniker)
def get_wmi_type (obj):
try:
path = obj.Path_
except AttributeError:
return "namespace"
else:
if path.IsClass:
return "class"
else:
return "instance"
def connect_server (
server,
namespace = "",
user = "",
password = "",
locale = "",
authority = "",
security_flags = 0,
named_value_set = None
):
"""Return a remote server running WMI
server - name of the server
namespace - namespace to connect to: defaults to whatever's defined as default
user - username to connect as, either local or domain (dom\name or user@domain for XP)
password: leave blank to use current context
locale: desired locale in form MS_XXXX (eg MS_409 for Am En)
authority: either "Kerberos:" or an NT domain. Not needed if included in user
security_flags: if 0, connect will wait forever; if 0x80, connect will timeout at 2 mins
named_value_set: typically empty, otherwise a context-specific SWbemNamedValueSet
<pre class="code">
c = wmi.WMI (wmi=wmi.connect_server (server="remote_machine", user="myname", password="mypassword"))
</pre>
"""
if _DEBUG:
print server
print namespace
print user
print password
print locale
print authority
print security_flags
print named_value_set
return Dispatch ("WbemScripting.SWbemLocator").\
ConnectServer (
server,
namespace,
user,
password,
locale,
authority,
security_flags,
named_value_set
)
def Registry (
computer=None,
impersonation_level="Impersonate",
authentication_level="Default",
authority=None,
privileges=None,
moniker=None
):
if not moniker:
moniker = construct_moniker (
computer=computer,
impersonation_level=impersonation_level,
authentication_level=authentication_level,
authority=authority,
privileges=privileges,
namespace="default",
suffix="StdRegProv"
)
try:
return _wmi_object (GetObject (moniker))
except pywintypes.com_error, error_info:
handle_com_error (error_info)
#
# From a post to python-win32 by Sean
#
def machines_in_domain (domain_name):
adsi = Dispatch ("ADsNameSpaces")
nt = adsi.GetObject ("","WinNT:")
result = nt.OpenDSObject ("WinNT://%s" % domain_name, "", "", 0)
result.Filter = ["computer"]
domain = []
for machine in result:
domain.append (machine.Name)
return domain
#
# Typical use test
#
if __name__ == '__main__':
system = WMI ()
for my_computer in system.Win32_ComputerSystem ():
print "Disks on", my_computer.Name
for disk in system.Win32_LogicalDisk ():
print disk.Caption, disk.Description, disk.ProviderName or ""
|