"""
Interface to various cdrtools (currently cdrecord and mkisofs)
"""
from tools import cmdoutput,striplist,which,TRUE,FALSE,is_linux_kernel_2_6
from string import split,join,digits,find,strip,letters,atoi,lower,replace
from types import StringType
from log4py import Logger,LOGLEVEL_NORMAL,LOGLEVEL_DEBUG
from re import match,compile
import os
# Change this for experimental ATAPI support
USE_ATAPI = TRUE # TRUE or FALSE
TRACK_MODE_DATA = "data"
TRACK_MODE_MODE2 = "mode2"
TRACK_MODE_AUDIO = "audio"
TRACK_MODE_XA1 = "xa1"
TRACK_MODE_XA2 = "xa2"
WRITE_MODE_DAO = "dao"
BLANK_MODE_DISC = "disc"
BLANK_MODE_FAST = "fast"
BLANK_MODE_SESSION = "session"
TRACK_STDIN = "-"
class cdrtools:
def __init__(self, loglevel = LOGLEVEL_NORMAL, scan_atapi = USE_ATAPI):
self.__cdrtools_logger = Logger().get_instance(self)
self.__cdrtools_logger.set_loglevel(loglevel)
self.__cdrtools_devices = []
self.__cdrtools_cdrecord_command = which("cdrecord")
self.__cdrtools_mkisofs_command = which("mkisofs")
self.__cdrtools_scanbus(scan_atapi)
def devices(self):
""" Returns the devices found by cdrecord. """
return self.__cdrtools_devices
def joliet_charsets(self):
""" Returns an array containing all available joliet character sets. """
command_line = "%s -jcharset help 2>&1" % self.__cdrtools_mkisofs_command
self.__cdrtools_logger.debug("Reading available charsets: %s" % command_line)
jolietCharsetsTemp = cmdoutput(command_line, strip = TRUE)
jolietCharsets = []
appendCharset = FALSE
for i in range(len(jolietCharsetsTemp)):
if ((appendCharset == TRUE) and (jolietCharsetsTemp[i] != "")):
jolietCharsets.append(jolietCharsetsTemp[i])
if (jolietCharsetsTemp[i][:14] == "Known charsets"):
appendCharset = TRUE
if (jolietCharsetsTemp[i] == ""):
appendCharset = FALSE
jolietCharsets.sort()
return jolietCharsets
def __cdrtools_scanbus(self, scan_atapi):
""" Gets the list of available devices by executing "cdrecord -scanbus". """
device = ""
if (scan_atapi == TRUE):
device = "dev=ATAPI"
command_line = "%s %s -scanbus 2>&1" % (self.__cdrtools_cdrecord_command, device)
self.__cdrtools_logger.debug("Searching for devices: %s" % command_line)
output = cmdoutput(command_line)
self.__cdrtools_devices = []
for i in range(len(output)):
line = output[i]
if (line[0] == "\t") and (line[1] in digits):
if (find(line, "CD-ROM") != -1):
channel = line[1]
id = line[3]
lun = line[5]
vendor = "%s %s %s" % (strip(line[13:21]), strip(line[24:40]), strip(line[43:47]))
self.__cdrtools_devices.append([channel, id, lun, vendor])
class cdrecord:
valid_track_modes = [ TRACK_MODE_DATA, TRACK_MODE_MODE2, TRACK_MODE_AUDIO, TRACK_MODE_XA1, TRACK_MODE_XA2 ]
valid_write_modes = [ WRITE_MODE_DAO ]
valid_blank_modes = [ BLANK_MODE_DISC, BLANK_MODE_FAST, BLANK_MODE_SESSION]
verbose_mode = FALSE
burnfree = FALSE
overburn = FALSE
fixate = TRUE
dummy_mode = FALSE
multisession = FALSE
eject = TRUE
fifosize = 4 * 1024 * 1024 # default: 4MB
speed = 1
pad_tracks = FALSE
swap_audio_tracks = FALSE
track_mode = ""
write_mode = ""
blank_mode = None
tsize = ""
def __init__(self, bus, target, lun, device, loglevel = LOGLEVEL_NORMAL, use_atapi = USE_ATAPI):
self.__cdrecord_logger = Logger().get_instance(self)
self.__cdrecord_logger.set_loglevel(loglevel)
self.__cdrecord_command = which("cdrecord")
if (type(bus) == StringType): bus = atoi(bus)
if (type(target) == StringType): target = atoi(target)
if (type(lun) == StringType): lun = atoi(lun)
self.__cdrecord_device = device
self.__cdrecord_tracks = []
self.__cdrecord_version_major = ""
self.__cdrecord_version_minor = ""
self.__cdrecord_version_micro = ""
self.__cdrecord_version_extra = ""
self.__cdrecord_device_string = "dev="
if (use_atapi == TRUE):
self.__cdrecord_device_string = "%sATAPI:" % self.__cdrecord_device_string
self.__cdrecord_device_string = "%s%d,%d,%d" % (self.__cdrecord_device_string, bus, target, lun)
self.__cdrecord_read_version()
def version(self):
""" Returns the version string of cdrecord. """
if (self.__cdrecord_version_micro != None):
return "%s.%s%s%s" % (self.__cdrecord_version_major, self.__cdrecord_version_minor, self.__cdrecord_version_micro, self.__cdrecord_version_extra)
else:
return "%s.%s" % (self.__cdrecord_version_major, self.__cdrecord_version_minor)
def device(self):
return self.__cdrecord_device
def driver_opts(self):
""" Gets the available driver options for a given device.
Note: this function doesn't use the cdrecord driveropts yet.
"""
# output = cmdoutput("cdrecord -checkdrive dev=%s,%s,%s driveropts=help 2>&1" % (bus, target, lun), strip = TRUE)
driverOpts = []
if (atoi(self.__cdrecord_version_major) == 1):
if (atoi(self.__cdrecord_version_minor) >= 11) and ((self.__cdrecord_version_micro == None) or (self.__cdrecord_version_micro >= "a02")):
driverOpts.append("burnfree")
else:
driverOpts.append("burnfree")
if (not "burnfree" in driverOpts):
driverOpts.append("burnproof")
return driverOpts
def overburn_supported(self):
""" Version check wether overburn is supported or not. """
if (atoi(self.__cdrecord_version_major) == 1):
if (atoi(self.__cdrecord_version_minor) >= 11) and ((self.__cdrecord_version_micro == None) or (self.__cdrecord_version_micro >= "a01")):
return TRUE
else:
return FALSE
else:
return TRUE
def previous_session(self):
command_line = "cdrecord -msinfo %s 2>&1" % (self.__cdrecord_device_string)
self.__cdrecord_logger.debug("Reading previous session: %s" % command_line)
output = cmdoutput(command_line, TRUE)[-1]
if (find(lower(output), "cannot") != -1) or (not output[0] in digits):
return None
else:
return output
def add_track(self, value):
if (value == TRACK_STDIN):
self.__cdrecord_tracks.append(value)
else:
self.__cdrecord_tracks.append("\"%s\"" % value)
def command_line(self):
cmdline = self.__cdrecord_command
if (self.verbose_mode): cmdline = "%s -v" % cmdline
if (self.burnfree):
options = self.driver_opts()
if ("burnfree" in options):
cmdline = "%s driveropts=burnfree" % cmdline
elif ("burnproof" in options):
cmdline = "%s driveropts=burnproof" % cmdline
if (self.dummy_mode): cmdline ="%s -dummy" % cmdline
if (self.eject): cmdline = "%s -eject" % cmdline
if (self.blank_mode == None):
if (self.overburn):
if (self.overburn_supported() == TRUE):
self.write_mode = WRITE_MODE_DAO
cmdline = "%s -overburn" % cmdline
if (self.tsize != ""):
cmdline = "%s tsize=%s" % (cmdline, self.tsize)
if (self.fixate == FALSE): cmdline = "%s -nofix" % cmdline
if (self.multisession): cmdline = "%s -multi" % cmdline
if (self.pad_tracks):
cmdline = "%s -pad" % cmdline
else:
cmdline = "%s -nopad" % cmdline
if (cdrecord.valid_track_modes.count(lower(self.track_mode)) > 0):
cmdline = "%s -%s" % (cmdline, lower(self.track_mode))
if (cdrecord.valid_write_modes.count(lower(self.write_mode)) > 0):
cmdline = "%s -%s" % (cmdline, lower(self.write_mode))
if (self.track_mode == TRACK_MODE_AUDIO):
if (self.swap_audio_tracks): cmdline = "%s -swab" % cmdline
if (type(self.fifosize) == StringType): self.fifosize = atoi(self.fifosize)
cmdline = "%s -fs=%d" % (cmdline, self.fifosize)
cmdline = "%s %s" % (cmdline, self.__cdrecord_device_string)
if (type(self.speed) == StringType): self.speed = atoi(self.speed)
cmdline = "%s speed=%d" % (cmdline, self.speed)
if (self.blank_mode == None):
for i in range(len(self.__cdrecord_tracks)):
cmdline = "%s %s" % (cmdline, self.__cdrecord_tracks[i])
else:
if (cdrecord.valid_blank_modes.count(lower(self.blank_mode)) > 0):
cmdline = "%s -blank %s" % (cmdline, lower(self.blank_mode))
return cmdline
def get_write_speed(self):
""" Returns the maximum speed for writing CDs/DVDs. """
cmdline = self.__cdrecord_command
cmdline = "%s %s driveropts=help -checkdrive -prcap" % (cmdline, self.__cdrecord_device_string)
regexp = compile(".*Maximum\ write\ speed:[\d\s]+kB\/s\ \(CD\s+(\d+)x\,\ DVD\s+(\d+)x.*")
output = cmdoutput(cmdline)
max_write_speed_cd = 1
max_write_speed_dvd = 1
for i in range(len(output)):
matchobject = regexp.match(output[i])
if (matchobject != None):
max_write_speed_cd = int(matchobject.group(1))
max_write_speed_dvd = int(matchobject.group(2))
return (max_write_speed_cd, max_write_speed_dvd)
# Private methods of the cdrecord class
def __cdrecord_read_version(self):
""" Reads the version string by executing "cdrecord -version". """
output = cmdoutput("%s -version" % self.__cdrecord_command, strip = TRUE)
versionLine = output[0]
splitted = split(versionLine, " ")
version = split(splitted[1], ".")
self.__cdrecord_version_major = version[0]
position = -1
for i in range(len(version[1])):
if (version[1][i] in letters):
position = i
break
if (position != -1):
self.__cdrecord_version_minor = version[1][:position]
self.__cdrecord_version_micro = version[1][position:]
dash_position = find(self.__cdrecord_version_micro, "-")
if (dash_position != -1):
self.__cdrecord_version_extra = self.__cdrecord_version_micro[dash_position:]
self.__cdrecord_version_micro = self.__cdrecord_version_micro[:dash_position]
else:
self.__cdrecord_version_minor = version[1]
self.__cdrecord_version_micro = None
class mkisofs:
verbose_mode = FALSE
disable_deep_relocation = TRUE
full_iso9660_filenames = FALSE
allow_leading_dots = FALSE
follow_links = TRUE
joliet_charset = None
rational_rock = TRUE
rock_ridge = TRUE
omot_trailing_periods = TRUE
volume_id = ""
output_file = ""
boot_image = ""
boot_catalog = ""
gui_behaviour = FALSE
print_size = FALSE
def __init__(self, loglevel = LOGLEVEL_NORMAL):
self.__mkisofs_logger = Logger().get_instance(self)
self.__mkisofs_logger.set_loglevel(loglevel)
self.__mkisofs_command = which("mkisofs")
self.__mkisofs_gui_behaviour_supported = FALSE
self.__mkisofs_check_gui_parameter()
self.__mkisofs_multisession_magic_parameters = None
self.__mkisofs_multisession_device = None
self.__mkisofs_files = []
def multi_session(self, magic_parameters, device):
if (magic_parameters != None):
self.__mkisofs_multisession_magic_parameters = magic_parameters
self.__mkisofs_multisession_device = device
def add_file(self, filename):
if (self.__mkisofs_files.count(filename) == 0):
# mkisofs always requires "/" as separator, even on win32 platforms!
if (os.sep != "/"):
filename = replace(filename, os.sep, "/")
self.__mkisofs_files.append(filename)
def command_line(self):
""" Returns the complete command line including all set parameters. """
cmdline = self.__mkisofs_command
if (self.verbose_mode): cmdline = "%s -v" % cmdline
if (self.__mkisofs_gui_behaviour_supported) and (self.gui_behaviour): cmdline = "%s -gui" % cmdline
if (self.disable_deep_relocation): cmdline = "%s -D" % cmdline
if (self.full_iso9660_filenames): cmdline = "%s -l" % cmdline
if (self.allow_leading_dots): cmdline = "%s -L" % cmdline
if (self.follow_links): cmdline = "%s -f" % cmdline
if (self.joliet_charset != None): cmdline = "%s -J -jcharset %s" % (cmdline, self.joliet_charset)
if (self.__mkisofs_multisession_magic_parameters != None): cmdline = "%s -C %s -M %s" % (cmdline, self.__mkisofs_multisession_magic_parameters, self.__mkisofs_multisession_device)
if (self.rational_rock): cmdline = "%s -r" % cmdline
if (self.rock_ridge): cmdline = "%s -R" % cmdline
if (self.omot_trailing_periods): cmdline = "%s -d" % cmdline
if (self.volume_id != ""): cmdline = "%s -V \"%s\"" % (cmdline, self.volume_id)
if (self.output_file != ""): cmdline = "%s -o \"%s\"" % (cmdline, self.output_file)
if ((self.boot_catalog != "") and (self.boot_image != "")):
cmdline = "%s -b \"%s\" -c \"%s\"" % (cmdline, self.boot_image, self.boot_catalog)
if (self.print_size):
cmdline = "%s -print-size -quiet" % cmdline
if (len(self.__mkisofs_files) > 0):
cmdline = "%s -graft-points" % cmdline
for i in range(len(self.__mkisofs_files)):
filename = self.__mkisofs_files[i]
if (os.path.isdir(filename)):
if (filename[-1] == "/"):
filename = filename[:-1]
shortfilename = split(filename, "/")[-1]
cmdline = "%s \"/%s/\"=\"%s\"" % (cmdline, shortfilename, filename)
else:
cmdline = "%s \"%s\"" % (cmdline, filename)
return cmdline
def __mkisofs_check_gui_parameter(self):
output = cmdoutput("%s 2>&1 | grep gui" % self.__mkisofs_command)
if (len(output) == 0):
self.__mkisofs_gui_behaviour_supported = FALSE
else:
self.__mkisofs_gui_behaviour_supported = TRUE
class readcd:
filename = None
def __init__(self, bus, target, lun, loglevel = LOGLEVEL_NORMAL):
self.__readcd_logger = Logger().get_instance(self)
self.__readcd_logger.set_loglevel(loglevel)
self.__readcd_command = which("readcd")
if (type(bus) == StringType): bus = atoi(bus)
if (type(target) == StringType): target = atoi(target)
if (type(lun) == StringType): lun = atoi(lun)
self.__readcd_bus = bus
self.__readcd_target = target
self.__readcd_lun = lun
def command_line(self):
cmdline = self.__readcd_command
cmdline = "%s dev=%d,%d,%d" % (cmdline, self.__readcd_bus, self.__readcd_target, self.__readcd_lun)
cmdline = "%s -f \"%s\"" % (cmdline, self.filename)
return cmdline
class cdda2wav:
def __init__(self, bus, target, lun, loglevel = LOGLEVEL_NORMAL, use_atapi = USE_ATAPI):
self.__cdda2wav_logger = Logger().get_instance(self)
self.__cdda2wav_logger.set_loglevel(loglevel)
self.__cdda2wav_command = which("cdda2wav")
if (type(bus) == StringType): bus = atoi(bus)
if (type(target) == StringType): target = atoi(target)
if (type(lun) == StringType): lun = atoi(lun)
self.__cdda2wav_device_string = "dev="
if (use_atapi == TRUE):
self.__cdda2wav_device_string = "%sATAPI:" % self.__cdda2wav_device_string
self.__cdda2wav_device_string = "%s%d,%d,%d" % (self.__cdda2wav_device_string, bus, target, lun)
self.__cdda2wav_read_version()
def __cdda2wav_read_version(self):
command = "%s --version 2>&1" % self.__cdda2wav_command
self.__cdda2wav_logger.debug("Executing %s" % command)
output = cmdoutput(command)
regexp = compile(".*version\ (\d)\.(\d).*")
matchobject = regexp.match(output[0])
if (matchobject != None):
self.__cdda2wav_version_major = int(matchobject.group(1))
self.__cdda2wav_version_minor = int(matchobject.group(2))
else:
self.__cdda2wav_version_major = 1
self.__cdda2wav_version_minor = 0
def get_version(self):
return "%d.%d" % (self.__cdda2wav_version_major, self.__cdda2wav_version_minor)
def get_track_info(self):
command = "%s -N -J -g -Q" % (self.__cdda2wav_command)
if (self.__cdda2wav_version_major < 2):
command = "%s -v 2" % command
else:
command = "%s -v toc" % command
command = "%s %s 2>&1" % (command, self.__cdda2wav_device_string)
self.__cdda2wav_logger.debug("Executing %s" % command)
output = cmdoutput(command)
tracklengths = {}
trackinfo = {}
trackinfo["error_message"] = ""
for i in range(len(output)):
line = strip(output[i])
if (line != ""):
if (match("Tracks.*", line)):
splitted = split(line[7:], " ")
trackinfo["number_of_tracks"] = strip(splitted[0])
trackinfo["total_time"] = strip(splitted[1])
elif (match("T\d.*", line)):
tracknum = atoi(line[1:3])
length = strip(line[13:21])
tracklengths[tracknum] = length
elif (match(".*Permission.*denied.*", line)):
trackinfo["error_message"] = "Permission denied while trying to read the CD content"
elif (match(".*Read.*TOC.*size.*failed.*", line)):
trackinfo["error_message"] = "Failed to read CD content"
trackinfo["track_lengths"] = tracklengths
return trackinfo
def test():
mycdda2wav = cdda2wav(0, 0, 0, LOGLEVEL_DEBUG)
print mycdda2wav.get_version()
print mycdda2wav.get_track_info()
if (__name__ == "__main__"):
test()
|