# Copyright (C) 2003 Konstantin Korikov
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Read and write accounts in xml files."""
from chestnut_dialer.account_set import Account,AccountSet,NoAccountException
from chestnut_dialer import compare_versions
from chestnut_dialer import _
from chestnut_dialer import debug_msg
from chestnut_dialer.sig import SignalAdaptor
def xstr_decode(s):
    """Return the text form of a UTF-8 encoded string taken from the XML tree.

    s -- a UTF-8 encoded byte string, or text that is already decoded.

    Already-decoded text is returned unchanged instead of raising
    TypeError.  Anything else is decoded strictly as UTF-8, so invalid
    byte sequences raise UnicodeDecodeError and non-string objects raise
    TypeError.
    """
    # type(u"") is `unicode` on Python 2 and `str` on Python 3, which keeps
    # this function importable on both without naming either builtin.
    text_type = type(u"")
    if isinstance(s, text_type):
        return s
    return text_type(s, "UTF-8")
def xstr_encode(s):
    """Return the UTF-8 encoded byte string for storing *s* in the XML tree.

    s -- text to encode, or a byte string.

    A byte string is returned unchanged: re-encoding it would first decode
    it implicitly as ASCII on Python 2 and fail on any non-ASCII byte.
    NOTE(review): this assumes byte strings handed in are already UTF-8.
    """
    if isinstance(s, bytes):
        return s
    return s.encode("UTF-8")
def _escape_amp(data):
    """Return *data* with every "&" replaced by "&amp;".

    Used for content handed to newChild()/setContent().  libxml2 documents
    that content as XML text in which "&" introduces an entity reference,
    so a literal ampersand has to be escaped before it is stored.
    """
    return data.replace("&", "&amp;")


def _free_nodes(nodes):
    """Unlink every node in *nodes* from its tree and free it."""
    for n in nodes:
        n.unlinkNode()
        n.freeNode()


class XmlAccount(Account):
    """Account whose attribute values live in an XML element.

    How an attribute is stored depends on self.get_attr_type(name):

    'list'               -- a "<item>-list" child element holding one
                            "<item>" child per value, where <item> is the
                            attribute name with "_" turned into "-" and a
                            trailing "s" dropped
    'text'               -- the content of a child element named after
                            the attribute
    'integer', 'boolean' -- an XML attribute holding str(int(value))
    anything else        -- an XML attribute holding the UTF-8 string

    The node object is expected to offer the libxml2-style calls used
    below (xpathEval, prop, setProp, unsetProp, newChild, setContent,
    children, unlinkNode, freeNode).
    """

    # Declared at class level as well as set per instance in __init__.
    _xml_node = None

    def __init__(self, xml_node, changed_callback = None):
        """Wrap *xml_node*.

        xml_node         -- the element that stores this account
        changed_callback -- optional callable appended (through a
                            SignalAdaptor) to attr_changed_signal
        """
        Account.__init__(self)
        self._xml_node = xml_node
        # Identity test on purpose: the callback may be a container-like
        # signal object, so truthiness must not be used here.
        if changed_callback is not None:
            # NOTE(review): the lambda presumably maps the signal's
            # (name, value) arguments to an empty argument tuple for the
            # callback -- confirm against chestnut_dialer.sig.
            self.attr_changed_signal.append(
                SignalAdaptor(changed_callback, lambda n, v: ()))

    def _get_acc_attr(self, name):
        """Read attribute *name* from the XML node.

        Returns a tuple of decoded strings for 'list' attributes, a decoded
        string for 'text' and plain attributes, an int for 'integer' and
        'boolean' attributes (0 when the stored text is not a valid
        integer), or None when the value is not present in the node.
        """
        node = self._xml_node
        xname = name.replace('_', '-')
        atype = self.get_attr_type(name)
        if atype == 'list':
            # The item element uses the singular form of the attribute name.
            if xname.endswith("s"):
                xname = xname[:-1]
            xcont = xname + "-list"
            # An existing but empty container yields (), a missing one None.
            if node.xpathEval(xcont):
                return tuple([xstr_decode(n.content)
                              for n in node.xpathEval(xcont + "/" + xname)])
            return None
        if atype == 'text':
            # Only the first matching child element is considered.
            for n in node.xpathEval(xname):
                return xstr_decode(n.content)
            return None
        v = node.prop(xname)
        if v is None:
            return None
        if atype in ('integer', 'boolean'):
            try:
                return int(v)
            except ValueError:
                return 0
        return xstr_decode(v)

    def _set_acc_attr(self, name, value):
        """Store *value* for attribute *name* in the XML node.

        A value of None removes the stored representation (container
        element, text element or XML attribute).  For 'list' attributes
        *value* is an iterable of strings that replaces all current items.
        """
        node = self._xml_node
        xname = name.replace("_", "-")
        atype = self.get_attr_type(name)
        if atype == 'list':
            if xname.endswith("s"):
                xname = xname[:-1]
            xcont = xname + "-list"
            if value is None:
                _free_nodes(node.xpathEval(xcont))
            else:
                xres = node.xpathEval(xcont)
                if xres:
                    container = xres[0]
                else:
                    container = node.newChild(None, xcont, None)
                # Empty the container; `children` is re-read on every pass
                # because each pass removes the node it returned.
                while container.children:
                    child = container.children
                    child.unlinkNode()
                    child.freeNode()
                for item in value:
                    container.newChild(
                        None, xname, _escape_amp(xstr_encode(item)))
        elif atype == 'text':
            if value is None:
                _free_nodes(node.xpathEval(xname))
            else:
                content = _escape_amp(xstr_encode(value))
                xres = node.xpathEval(xname)
                if xres:
                    xres[0].setContent(content)
                else:
                    node.newChild(None, xname, content)
        else:
            if value is None:
                node.unsetProp(xname)
            elif atype in ('integer', 'boolean'):
                node.setProp(xname, str(int(value)))
            else:
                node.setProp(xname, xstr_encode(value))
class XmlAccountSet(AccountSet):
    """Account set stored as "account" child elements of one XML node.

    Every account element carries an integer "id" attribute and a "name"
    attribute.  The containing node carries a "version" attribute naming
    the format of the data.
    """

    # Declared at class level as well as set per instance in __init__.
    _xml_node = None
    # Format version this class reads and writes.
    _current_version = "0.3.0"

    def __init__(self, xml_node, changed_callbacks = ()):
        """Wrap *xml_node*, converting data of an older version in place.

        xml_node          -- the element that contains the account elements
        changed_callbacks -- callables added to changed_signal

        A node without a "version" attribute is treated as version
        "0.0.1".  Older data is upgraded step by step: for each
        intermediate version the module "conv_acc_<version>" (dots
        replaced by underscores) is imported and its
        convert_accounts(xml_node) is called, after which the version
        attribute is read again.  A missing converter module or a version
        newer than _current_version is only reported through debug_msg.
        """
        AccountSet.__init__(self)
        # Callbacks are registered first so that they see the change
        # signalled by a format conversion below.
        self.changed_signal += changed_callbacks
        self._xml_node = xml_node
        version = xml_node.prop("version") or "0.0.1"
        order = compare_versions(version, self._current_version)
        if order < 0:
            inter_version = version
            while compare_versions(inter_version, self._current_version) < 0:
                modname = "conv_acc_" + inter_version.replace(".", "_")
                try:
                    m = __import__(modname, globals(), locals())
                except ImportError:
                    debug_msg(_("cannot convert accounts file of version %s") %
                              inter_version, 1)
                    break
                m.convert_accounts(xml_node)
                # The converter is relied upon to advance the version
                # attribute; that is what ends this loop.
                inter_version = xml_node.prop("version")
            # NOTE(review): the original indentation was lost; this emits
            # the signal once after the conversion loop (including when the
            # loop stopped on a missing converter) -- confirm.
            self.changed_signal()
        elif order > 0:
            debug_msg(_("incompatible accounts file version (%s > %s)") %
                      (version, self._current_version), 1)

    def _next_free_id(self):
        """Return the smallest non-negative integer not used as an account id."""
        used = set([entry[1] for entry in self.ls_accounts()])
        free = 0
        while free in used:
            free += 1
        return free

    def ls_accounts(self):
        """Return a list of (name, id) pairs, one per account element.

        The name is decoded text and the id an int.
        """
        return [(xstr_decode(n.prop("name")), int(n.prop("id")))
                for n in self._xml_node.xpathEval("account")]

    def get_account(self, account_id):
        """Return an XmlAccount for the account with integer id *account_id*.

        Raises NoAccountException when no account element has that id.
        """
        xres = self._xml_node.xpathEval('account[@id=%d]' % account_id)
        if xres:
            return XmlAccount(xres[0], self.changed_signal)
        raise NoAccountException(account_id)

    def new_account(self, name = None):
        """Create an account under the lowest free id and return it.

        name -- account name; when None a default "New Account <id>" name
                is used.

        Emits changed_signal and returns the new XmlAccount.
        """
        new_id = self._next_free_id()
        acc = self._xml_node.newChild(None, 'account', None)
        acc.setProp('id', str(new_id))
        if name is not None:
            acc.setProp('name', xstr_encode(name))
        else:
            acc.setProp('name', xstr_encode(_("New Account %d") % new_id))
        self.changed_signal()
        return XmlAccount(acc, self.changed_signal)

    def remove_accounts(self, account_ids):
        """Remove every account whose id is listed in *account_ids*.

        Does nothing, and emits no signal, for an empty sequence.
        Otherwise changed_signal is emitted once after the removal.
        """
        if not account_ids:
            return
        # One XPath predicate covering all ids: account[@id=1 or @id=2 ...]
        expr = ("account[@id=" +
                " or @id=".join([str(aid) for aid in account_ids]) + "]")
        for n in self._xml_node.xpathEval(expr):
            n.unlinkNode()
            n.freeNode()
        self.changed_signal()

    def duplicate_account(self, account_id, name = None):
        """Copy account *account_id* under the lowest free id.

        name -- name for the copy; when None the source name is wrapped in
                the translated "%s copy" template.

        Emits changed_signal and returns the XmlAccount of the copy.
        Raises IndexError when no account has that id (unlike
        get_account, which raises NoAccountException).
        """
        new_id = self._next_free_id()
        # NOTE(review): copyNode(1) is presumably libxml2's recursive copy,
        # i.e. the element together with its attributes and children.
        acc = self._xml_node.xpathEval(
            "account[@id=%d]" % account_id)[0].copyNode(1)
        acc.setProp('id', str(new_id))
        if name is not None:
            acc.setProp('name', xstr_encode(name))
        else:
            acc.setProp('name', xstr_encode(
                _("%s copy") % xstr_decode(acc.prop('name'))))
        self._xml_node.addChild(acc)
        self.changed_signal()
        return XmlAccount(acc, self.changed_signal)
|