Скачать 1.13 Mb.
|
part = disk.partitions[0] fs_type = part.fileSystem.type defer.returnValue(fs_type) @defer.inlineCallbacks def _burnImage(self, volume_name, image_path): lvpath = yield self._getLVPath(volume_name) yield dd(src=image_path, dst=lvpath, bs='16M') @defer.inlineCallbacks def _extendPartition(self, volume_name): # XXX: May be to move partition related function(s) to special module? # XXX: split this funtion to several more specific functions self.msg("Maximaze first partition on volume %r." % volume_name) lvpath = yield self._getLVPath(volume_name) self.msg("Volume %r backed device: %r." % (volume_name, lvpath)) # XXX: BECAUSE OF BUG IN PY/LIBPARTED # removeFromCache() works only for clean device. device = parted.Device(lvpath) device.removeFromCache() device = parted.Device(lvpath) self.msg("Volume %r device info:\n%s." % (volume_name, device)) self.msg("Volume %r device physical size: %r KB" % (volume_name, device.getSize('KB'))) constraint = device.getConstraint() self.msg("Device constraint on volume %r:\n%s" % (volume_name, constraint)) disk = parted.Disk(device) self.msg("Volume %r disk info:\n%s." % (volume_name, disk)) parts_msg = "\n".join(str(p) for p in disk.partitions) self.msg("Volume %r partitions:\n%s." % (volume_name, parts_msg)) part = disk.partitions[0] orig_size = part.getSize('KB') self.msg("Current size of first partition on volume %r %s KB." % (volume_name, orig_size)) geometry = part.geometry self.msg("Current first partition geometry on volume %r:\n%s." % (volume_name, geometry)) new_geometry = parted.Geometry(device, start=geometry.start, end=constraint.maxSize - 1) new_constraint = parted.Constraint(exactGeom=new_geometry) self.msg("New geometry for first partition on volume %r:\n%s" % (volume_name, new_geometry)) result = disk.setPartitionGeometry(part, new_constraint, new_geometry.start, new_geometry.end) if not result: raise StorageError("Extending of partition failed on %r." % volume_name) new_size = part.getSize('KB') if new_size < orig_size: # XXX: bug in parted! self.msg("Nothing changes because new size of partition %s KB less than original %s KB." % (new_size, orig_size)) return self.msg("First partition on volume %r maximized to %s KB (was %s KB)." % (volume_name, new_size, orig_size)) # XXX: blocking call. Call commitToDevice() in thread. disk.commitToDevice() device.removeFromCache() @defer.inlineCallbacks def _resizeFS(self, volume_name, image_info): fs_type = image_info['fs_type'] if fs_type not in ['ext4', 'ext3', 'ext2', 'ntfs']: raise LVMStorageError("Unsupported filesystem to resize: %r." % fs_type) partpaths = yield self._makePartNodes(volume_name) if len(partpaths) > 1: self.msg("Volume %r has several partitions: %r." % (volume_name, partpaths)) partpath = partpaths[0] try: if fs_type == 'ntfs': yield self._resizeNTFS(partpath) else: yield self._resizeExtFS(partpath) finally: yield self._delPartNodes(volume_name) @defer.inlineCallbacks def _makePartNodes(self, volume_name): lvpath = yield self._getLVPath(volume_name) mappings = yield kpartx_add(lvpath, delimiter='__p') partpaths = ["/dev/mapper/%s" % map for map in mappings] self.msg("Partition nodes of volume %r (%r) successfully created: %r." % (volume_name, lvpath, partpaths)) defer.returnValue(partpaths) @defer.inlineCallbacks def _delPartNodes(self, volume_name): lvpath = yield self._getLVPath(volume_name) yield kpartx_del(lvpath, delimiter='__p') self.msg("Partitions nodes of volume %r successfully deleted." % volume_name) @defer.inlineCallbacks def _resizeExtFS(self, path): yield e2fsck(path, force=True) yield resize2fs(path) @defer.inlineCallbacks def _resizeNTFS(self, path): yield ntfsresize(path) @defer.inlineCallbacks def _getLVPath(self, volume_name): lv_info = yield lvm.lvdisplay(self.vg_name, volume_name) defer.returnValue(lv_info['path']) def list(self): def parser(stdout, stderr): data = [] for line in stdout.split('\n'): line = line.rstrip() if line: line = line.split(':') name = os.path.basename(line[0]) size = int(line[6]) * 512 / 1024 #in kilobytes data.append({ 'volume_name': name, 'size': size }) return data return self._runProcess(self.lvmExec, args=('lvdisplay', self.vg_name, '--colon'), resultParser=parser) def info(self): def sizeInKiloBytes(extent_size, size): return size * extent_size def parser(stdout, stderr): lines = stdout.split('\n') line = lines[0].strip() line = line.split(':') size_fmt = functools.partial(sizeInKiloBytes, int(line[12])) data = { 'total': size_fmt(int(line[13])), 'free': size_fmt(int(line[15])), 'used': size_fmt(int(line[14])), 'num_volumes': int(line[5]) } return data return self._runProcess(self.lvmExec, args=('vgdisplay', self.vg_name, '--colon'), resultParser=parser) template_store.py from twisted.internet.protocol import defer from twisted.python import log class TemplateStoreError(Exception): pass class TemplateStore(object): template_ext = '.xml' uri = 'templates' def __init__(self, file_loader, uri=None, template_ext=None): self.file_loader = file_loader if uri: self.uri = uri if template_ext: self.template_ext = template_ext def msg(self, msg): log.msg("HTTPTemplateStore | %s" % msg) def _getURI(self, name): uri = "%s/%s%s" % (self.uri, name, self.template_ext) self.msg("URI for template %r: %r." % (name, uri)) return uri @defer.inlineCallbacks def get(self, name): self.msg("Requested template %r." % name) uri = self._getURI(name) result = yield self.file_loader.get(uri) self.msg("Got template %r: %r." % (name, result)) defer.returnValue(result) txlibvirt.py '''Twisted libvirt API.''' from twisted.internet import threads import libvirt DOMAIN_STATE = { libvirt.VIR_DOMAIN_NOSTATE: 'nostate', libvirt.VIR_DOMAIN_RUNNING: 'running', libvirt.VIR_DOMAIN_BLOCKED: 'blocked', libvirt.VIR_DOMAIN_PAUSED: 'paused', libvirt.VIR_DOMAIN_SHUTDOWN: 'shutdown', libvirt.VIR_DOMAIN_SHUTOFF: 'shutoff', libvirt.VIR_DOMAIN_CRASHED: 'crashed' } threaded = threads.deferToThread.__get__ def libvirtErrorToDict(error): return { 'code': error.get_error_code(), 'domain': error.get_error_domain(), 'message': error.get_error_message(), 'level': error.get_error_level(), 'str1': error.get_str1(), 'str2': error.get_str2(), 'str3': error.get_str3(), 'int1': error.get_int1(), 'int2': error.get_int2() } @threaded def nodeGetInfo(conn): info = conn.getInfo() return { 'model': info[0], 'memory': info[1] * 1024, # in kilobytes 'cpus': info[2], 'mhz': info[3], 'nodes': info[4], 'sockets': info[5], 'cores': info[6], 'threads': info[7], } @threaded def listDomains(conn): domains = [] for id in conn.listDomainsID(): domains.append({ 'status': 'active', 'name': conn.lookupByID(id).name() }) for name in conn.listDefinedDomains(): domains.append({ 'status': 'inactive', 'name': name }) return domains @threaded def domainGetInfo(conn, name): domain = conn.lookupByName(name) info = domain.info() result = { 'state': DOMAIN_STATE[info[0]], 'max_mem': info[1], 'memory': info[2], 'number_vcpu': info[3], 'cpu_time': info[4] } return result @threaded def defineXML(conn, config): conn.defineXML(config) return None @threaded def domainGetXML(conn, name): return conn.lookupByName(name).XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE | \ libvirt.VIR_DOMAIN_XML_INACTIVE) @threaded def domainStart(conn, name): conn.lookupByName(name).create() return None @threaded def domainReboot(conn, name): #flags unused conn.lookupByName(name).reboot(0) return None @threaded def domainShutdown(conn, name): conn.lookupByName(name).shutdown() return None @threaded def domainDestroy(conn, name): conn.lookupByName(name).destroy() return None @threaded def domainUndefine(conn, name): conn.lookupByName(name).undefine() return None @threaded def domainSuspend(conn, name): conn.lookupByName(name).suspend() return None @threaded def domainResume(conn, name): conn.lookupByName(name).resume() return None @threaded def domainMigrateToURL(conn, name, url, live, persist_dest, undefine_source, non_shared_disk): domain = conn.lookupByName(name) flags = libvirt.VIR_MIGRATE_PEER2PEER if live: flags |= libvirt.VIR_MIGRATE_LIVE if persist_dest: flags |= libvirt.VIR_MIGRATE_PERSIST_DEST if undefine_source: flags |= libvirt.VIR_MIGRATE_UNDEFINE_SOURCE if non_shared_disk: flags |= libvirt.VIR_MIGRATE_NON_SHARED_DISK #XXX: dname and bandwidth are hardcoded domain.migrateToURI(url, flags, None, 0) return None utils.py from twisted.internet import defer, utils from twisted.python import log import lxml.etree @defer.inlineCallbacks def runProcess(cmd, args): log.msg("runProcess | Executing '%s %s'." % (cmd, ' '.join(args))) result = yield utils.getProcessOutputAndValue(cmd, args) log.msg("runProcess | Result of executing '%s %s': %r." % (cmd, ' '.join(args), result)) defer.returnValue(result) class CommandError(Exception): def __init__(self, code, stdout, stderr): Exception.__init__(self, code, stdout, stderr) self.code = code self.stdout = stdout self.stderr = stderr class DDError(CommandError): pass @defer.inlineCallbacks def dd(src, dst, bs=None): DD_EXEC = '/bin/dd' args = [] args.append('if=%s' % src) args.append('of=%s' % dst) if bs: args.append('bs=%s' % bs) stdout, stderr, exitCode = yield runProcess(DD_EXEC, args) if exitCode: raise DDError(exitCode, stdout, stderr) class Resize2fsError(CommandError): pass @defer.inlineCallbacks def resize2fs(path): RESIZE2FS_EXEC = '/sbin/resize2fs' args = [path] stdout, stderr, exitCode = yield runProcess(RESIZE2FS_EXEC, args) if exitCode: raise Resize2fsError(exitCode, stdout, stderr) class E2fsckError(CommandError): pass @defer.inlineCallbacks def e2fsck(path, force=None, preen=True): E2FSCK = '/sbin/e2fsck' args = [] if preen: args.append('-p') if force: args.append('-f') args.append(path) stdout, stderr, exitCode = yield runProcess(E2FSCK, args) # fsck exit code is sum of # 0 - ok # 1 - errors corrected # 2 - reboot needed # n - fatal errors # See man 8 fsck. if exitCode > 3: raise E2fsckError(exitCode, stdout, stderr) class ExtractDevicesError(Exception): def __init__(self, xml, error): Exception.__init__(xml, error) self.xml = xml self.error = error def devicesFromDomainXML(xml): # TODO: raise errors if target or source not exist try: tree = lxml.etree.fromstring(xml) except lxml.etree.XMLSyntaxError as error: raise ExtractDevicesError(xml, error) devices = [] for device in tree.xpath("/domain/devices/disk[@device='disk']"): device_type = device.attrib['type'] target = device.find('./target').attrib source = device.find('./source').attrib if device_type == 'file': devices.append({ 'path': source['file'], 'dev': target['dev'] }) elif device_type == 'block': devices.append({ 'path': source['dev'], 'dev': target['dev'] }) return devices class NTFSResizeError(CommandError): pass @defer.inlineCallbacks def ntfsresize(path): NTFS_RESIZE = '/usr/sbin/ntfsresize' args = ['--no-progress-bar', '--force', path] stdout, stderr, exitCode = yield runProcess(NTFS_RESIZE, args) if exitCode: raise NTFSResizeError(exitCode, stdout, stderr) class KpartxError(Exception): pass class KpartxCommandError(KpartxError, CommandError): pass @defer.inlineCallbacks def kpartx_add(disk, delimiter='p'): KPARTX = '/sbin/kpartx' args = ['-v', '-p', delimiter, '-a', disk] stdout, stderr, exitCode = yield runProcess(KPARTX, args) if exitCode: raise KpartxCommandError(exitCode, stdout, stderr) if not stdout: raise KpartxError("No parts found on disk %r." % disk) mappings = [] stdout = stdout.strip() for row in stdout.split('\n'): mapping = row.split()[2] mappings.append(mapping) log.msg("kpartx_add | Disk %r partition mappings %r added to system." % (disk, mappings)) defer.returnValue(mappings) @defer.inlineCallbacks def kpartx_del(disk, delimiter='p'): KPARTX = '/sbin/kpartx' args = ['-v', '-p', delimiter, '-d', disk] stdout, stderr, exitCode = yield runProcess(KPARTX, args) if exitCode: raise KpartxCommandError(exitCode, stdout, stderr) Приложение Д Модуль тестирования ___________________ Аннотация Данное приложение представлено в виде модуля на языке Python, который реализует unit-тестирование для модулей приложения VirtPlatform. В данные модули входят: протокол, агент и клиентская часть. ___________________ ! * Наименование программы: VirtPlatform Tests ! * Авторы: Харин Илья, Гуреев Илья ! * Версия: 1.0 ! * Дата создания: 13 сентября 2011 ! * Дата последнего изменения: 25 октября 2011 __init__.py agent.py """ from twisted.trial import unittest from twisted.internet import defer import os import libvirt from virt_platform.protocol import Protocol, Request from virt_platform.agentd.service import AgentFactory from virt_platform.agentd.storage import LVMStorageManager from virt_platform.agentd.templates import FilesystemTemplateStorage config_subdir = 'testnode' basedir = os.path.dirname(os.path.abspath(__file__)) templatedir = os.path.join(basedir, config_subdir) testnode_xml = os.path.join(templatedir, 'testnode.xml') testdom_xml = os.path.join(templatedir, 'testdom.xml') template = file(testdom_xml).read() class AgentFactoryTest(unittest.TestCase): def setUp(self): url = "test:///%s" % testnode_xml self.connection = conn = libvirt.open(url) self.d1 = conn.defineXML(template % {'domain_name': 'domain1'}) self.d2 = conn.defineXML(template % {'domain_name': 'domain2'}) self.d2.create() self.factory = AgentFactory(LVMStorageManager('test'), {}, url=url) def tearDown(self): self.connection.close() @defer.inlineCallbacks def test_node_get_info(self): aq = self.assertEqual request = Request('node_get_info') response = yield self.factory.node_get_info(None, request) aq(response.code, 200) aq(response.data['model'], 'i986') aq(response.data['memory'], 8000) aq(response.data['cpus'], 50) aq(response.data['mhz'], 6000) aq(response.data['nodes'], 4) aq(response.data['sockets'], 4) aq(response.data['cores'], 4) aq(response.data['threads'], 2) @defer.inlineCallbacks def test_list_domains(self): at = self.assertTrue aq = self.assertEqual request = Request('list_domains') response = yield self.factory.list_domains(None, request) aq(response.code, 200) domains = {} for domain in response.data: domains[domain['name']] = domain['status'] at('domain1' in domains) at('domain2' in domains) aq(domains['domain1'], 'inactive') aq(domains['domain2'], 'active') @defer.inlineCallbacks def test_domain_get_info(self): aq = self.assertEqual request = Request('domain_get_info', {'domain_name': 'domain1'}) response = yield self.factory.domain_get_info(None, request) aq(response.code, 200) aq(response.data['state'], 'shutoff') request = Request('domain_get_info', {'domain_name': 'domain2'}) response = yield self.factory.domain_get_info(None, request) aq(response.code, 200) aq(response.data['state'], 'running') aq(response.data['max_mem'], 524288) aq(response.data['memory'], 524288) aq(response.data['number_vcpu'], 4) # cpu_time changing all the time .. #aq(response.data['cpu_time'], 1303166098417378000L) class AgentFactoryDomainDefinitionTest(unittest.TestCase): def setUp(self): self.connection = conn = libvirt.open("test:///%s" % testnode_xml) self.templates = FilesystemTemplateStorage(templatedir) self.factory = AgentFactory(self.connection, LVMStorageManager('test'), self.templates) def tearDown(self): self.connection.close() @defer.inlineCallbacks def test_domain_define_xml(self): aq = self.assertEqual config = template % {'domain_name': 'testdomain'} request = Request('domain_define_xml', {'xml': config}) response = yield self.factory.domain_define_xml(None, request) aq(response.code, 200) aq('testdomain', self.connection.listDefinedDomains()[0]) @defer.inlineCallbacks def domain_define_template(self): aq = self.assertEqual data = { 'template_name': 'testdom.xml', 'domain_name': 'test', 'memory': 524288, 'number_vcpu': 2, 'storage_volume': '/root/test' } request = Request('domain_define_template', data) response = yield self.factory.domain_define_xml(None, request) aq(response.code, 200) #XXX: change template and other tests and do this test correctly """ agentd_aoe.py from twisted.trial import unittest from twisted.internet import defer from twisted.python import log import sys from virt_platform.agentd.aoe import VbladeManager class VbladeExportOkTestCase(unittest.TestCase): def setUp(self): log.startLogging(sys.stdout) self.vblade = VbladeManager(1, 'eth0', self) def spawnProcess(self, protocol, execFile, args=()): log.msg("Process starting protocol:%r, execFile:%s, args:%r" % \ (protocol, execFile, args)) protocol.connectionMade() def callLater(self, seconds, func, *args, **kwargs): func(*args, **kwargs) @defer.inlineCallbacks def testExportOk(self): aq = self.assertEqual result = yield self.vblade.export(1, '/store/file') log.msg("Process started OK with result %r" % result) aq(result['shelf'], 1) aq(result['slot'], 1) agentd_file_loader.py import sys import os import os.path import string from twisted.trial import unittest from twisted.internet import reactor, defer from twisted.python import log from twisted.web import server, resource from virt_platform.agentd.file_loader import HTTPFileLoader big_data = string.ascii_letters * 1024 * 1024 class Resource(resource.Resource): isLeaf = True trigger_state = 0 def render_GET(self, request): if request.uri == '/abacaba': return "abacaba" elif request.uri == '/big_file': return big_data elif request.uri == '/trigger': result = str(self.trigger_state) self.trigger_state ^= 1 return result elif request.uri == "/a/b/a/c/a/b/a": return "abacaba" else: raise NotImplementedError("URI: %r not found." % request.uri) class MockFileLoaderHTTPServer(object): def __init__(self, port): self.port = port self.site = server.Site(Resource()) def listen(self): # XXX: we must return deferred return reactor.listenTCP(self.port, self.site) class HTTPFileLoaderTestCase(unittest.TestCase): def setUp(self): log.startLogging(sys.stdout) port = MockFileLoaderHTTPServer(22345).listen() self.addCleanup(port.stopListening) self.file_loader = HTTPFileLoader("localhost", 22345) @defer.inlineCallbacks def testGet(self): aq = self.assertEqual data = yield self.file_loader.get("abacaba") aq(data, "abacaba") @defer.inlineCallbacks def testDownload(self): aq = self.assertEqual path = yield self.file_loader.download("abacaba") with open(path) as f: data = f.read() aq(data, "abacaba") os.remove(path) @defer.inlineCallbacks def testCacheUsage(self): aq = self.assertEqual path = yield self.file_loader.download("abacaba") mtime1 = os.path.getmtime(path) path = yield self.file_loader.download("abacaba") mtime2 = os.path.getmtime(path) aq(mtime1, mtime2) os.remove(path) @defer.inlineCallbacks def testConcurentDownload(self): aq = self.assertEqual d = self.file_loader.download("big_file") d2 = self.file_loader.download("big_file") path = yield d path2 = yield d2 aq(path, path2) with file(path) as f: aq(big_data, f.read()) os.remove(path) @defer.inlineCallbacks def testCacheInvalidation(self): nq = self.assertNotEqual path = yield self.file_loader.download("trigger") with open(path) as f: data1 = f.read() yield self.file_loader.flushCache("trigger") yield self.file_loader.download("trigger") with open(path) as f: data2 = f.read() nq(data1, data2) @defer.inlineCallbacks def testDownloadWithPath(self): aq = self.assertEqual path = yield self.file_loader.download("a/b/a/c/a/b/a") with open(path) as f: data = f.read() aq(data, "abacaba") agentd_lvm.py import sys import os import os.path import string from twisted.trial import unittest from twisted.internet import reactor, defer from twisted.python import log from virt_platform.agentd import lvm class LVDisplayTestCase(unittest.TestCase): lvdisplay_all = """ /dev/s6vg0/root:s6vg0:3:1:-1:1:41943040:640:-1:0:-1:253:0 /dev/s6vg0/drbd:s6vg0:3:1:-1:2:314572800:4800:-1:0:-1:253:3 /dev/virtvg/vm_test:virtvg:3:1:-1:0:204800:25:-1:0:-1:253:4 /dev/virtvg/vm_ashtest:virtvg:3:1:-1:0:204800:25:-1:0:-1:253:5\n""" lvdisplay_virtvg = """ /dev/virtvg/vm_test:virtvg:3:1:-1:0:204800:25:-1:0:-1:253:4 /dev/virtvg/vm_ashtest:virtvg:3:1:-1:0:204800:25:-1:0:-1:253:5\n""" def setUp(self): log.startLogging(sys.stdout) lvm.runProcess = self.mockRunProcess def mockRunProcess(self, cmd, args): log.msg("Executing %s(%r)." % (cmd, args)) assert('lvm' in cmd) assert('-c' in args or '--columns' in args) if args[0] == 'lvdisplay': if len(args) == 2: result = self.lvdisplay_all, "", 0 elif len(args) == 3 and 'virtvg' in args: result = self.lvdisplay_virtvg, "", 0 else: raise NotImplementedError else: raise NotImplementedError log.msg("Result of %s(%r): %r." % (cmd, args, result)) return defer.succeed(result) @defer.inlineCallbacks def testSimpleDisplay(self): aq = self.assertEqual lvs = yield lvm.lvdisplay() aq(len(lvs), 4) for lv in lvs: if lv['name'] == 'root': aq(lv['path'], '/dev/s6vg0/root') aq(lv['vgname'], 's6vg0') aq(lv['open_count'], 1) aq(lv['associated_extents'], 640) elif lv['name'] == 'drbd': aq(lv['path'], '/dev/s6vg0/drbd') aq(lv['vgname'], 's6vg0') aq(lv['open_count'], 2) aq(lv['associated_extents'], 4800) elif lv['name'] == 'vm_test': aq(lv['path'], '/dev/virtvg/vm_test') aq(lv['vgname'], 'virtvg') aq(lv['open_count'], 0) aq(lv['associated_extents'], 25) elif lv['name'] == 'vm_ashtest': aq(lv['path'], '/dev/virtvg/vm_ashtest') aq(lv['vgname'], 'virtvg') aq(lv['open_count'], 0) aq(lv['associated_extents'], 25) @defer.inlineCallbacks def testDiplayOneVG(self): aq = self.assertEqual lvs = yield lvm.lvdisplay('virtvg') aq(len(lvs), 2) for lv in lvs: if lv['name'] == 'vm_test': aq(lv['path'], '/dev/virtvg/vm_test') aq(lv['vgname'], 'virtvg') aq(lv['open_count'], 0) aq(lv['associated_extents'], 25) elif lv['name'] == 'vm_ashtest': aq(lv['path'], '/dev/virtvg/vm_ashtest') aq(lv['vgname'], 'virtvg') aq(lv['open_count'], 0) aq(lv['associated_extents'], 25) @defer.inlineCallbacks def testDisplayOneLV(self): aq = self.assertEqual lv = yield lvm.lvdisplay(lv='vm_test') if lv['name'] == 'vm_test': aq(lv['path'], '/dev/virtvg/vm_test') aq(lv['vgname'], 'virtvg') aq(lv['open_count'], 0) aq(lv['associated_extents'], 25) @defer.inlineCallbacks def testDisplayUnknownLV(self): # XXX: trial does not allow use assertRaises as context manager. try: yield lvm.lvdisplay(lv='cadabra') except lvm.LVNotFound, e: log.msg("Caught exception: %r." % e) else: self.assertTrue(False, "Expected LVNotFound exception.") class VGDisplayTestCase(unittest.TestCase): vgdisplay_all = """ s6vg0:r/w:772:-1:0:4:4:-1:0:1:1:292814848:32768:8936:8128:808:3gt2iZ-EKiQ-rmdA-iYve-T8fL-QXno-6LylWk virtvg:r/w:772:-1:0:2:0:-1:0:1:1:157278208:4096:38398:50:38348:b7VQZe-mlrz-kY3P-eAQO-SrWv-hUfq-9hKiWP\n""" vgdisplay_virtvg = """ virtvg:r/w:772:-1:0:2:0:-1:0:1:1:157278208:4096:38398:50:38348:b7VQZe-mlrz-kY3P-eAQO-SrWv-hUfq-9hKiWP\n""" def setUp(self): log.startLogging(sys.stdout) lvm.runProcess = self.mockRunProcess def mockRunProcess(self, cmd, args): at = self.assertTrue log.msg("Executing %s(%r)." % (cmd, args)) at('lvm' in cmd) at('vgdisplay' in args) at('-c' in args or '--columns' in args) if len(args) == 2: result = self.vgdisplay_all, "", 0 elif len(args) == 3 and 'virtvg' in args: result = self.vgdisplay_virtvg, "", 0 else: raise NotImplementedError log.msg("Result of %s(%r): %r." % (cmd, args, result)) return defer.succeed(result) @defer.inlineCallbacks def testSimpleDisplay(self): aq = self.assertEqual vgs = yield lvm.vgdisplay() aq(len(vgs), 2) for vg in vgs: if vg['name'] == 's6vg0': aq(vg['number_lv'], 4) aq(vg['size'], 292814848) aq(vg['extent_size'], 32768) elif vg['name'] == 'virtvg': aq(vg['number_lv'], 2) aq(vg['size'], 157278208) aq(vg['extent_size'], 4096) else: self.assertTrue(False, "This should not be happen. vgs: %r." % vgs) @defer.inlineCallbacks def testDisplayOneVG(self): aq = self.assertEqual vg = yield lvm.vgdisplay('virtvg') aq(vg['name'], 'virtvg') aq(vg['number_lv'], 2) aq(vg['size'], 157278208) aq(vg['extent_size'], 4096) class LVCreateTestCase(unittest.TestCase): def setUp(self): log.startLogging(sys.stdout) lvm.runProcess = self.runProcess self.LVM = {} self.LVM['virtvg'] = {} def runProcess(self, cmd, args): at = self.assertTrue aq = self.assertEqual log.msg("Executing %s(%r)." % (cmd, args)) at('lvm' in cmd) at('lvcreate' in args) at('--name' in args or '-n' in args) at('--size' in args or '-L' in args) skip = [] for pos, arg in enumerate(args): if arg == '--name' or arg == '-n': name = args[pos+1] skip.append(pos + 1) elif arg == '--size' or arg == '-L': size = args[pos+1] value, unit = size[:-1], size[-1:] at(unit, 'K') size_kb = int(value) skip.append(pos + 1) elif pos not in skip: vg = arg at(vg in self.LVM) self.LVM[vg][name] = {'size': size_kb} return defer.succeed(["", "", 0]) @defer.inlineCallbacks def testSimpleCreateLV(self): aq = self.assertEqual at = self.assertTrue yield lvm.lvcreate('newlv', 4096, 'virtvg') at('newlv' in self.LVM['virtvg']) aq(self.LVM['virtvg']['newlv']['size'], 4096) class LVChangeTestCase(unittest.TestCase): def setUp(self): log.startLogging(sys.stdout) lvm.runProcess = self.runProcess self.LV_STATE = {'/dev/virtvg/lv': 0} def runProcess(self, cmd, args): at = self.assertTrue at('lvm' in cmd) at('lvchange' in args) skip = [] for pos, arg in enumerate(args): if arg == '--available' or arg == '-a': state_str = args[pos+1] state = int(state_str.endswith('y')) skip.append(pos + 1) elif pos not in skip: path = arg at(path in self.LV_STATE) self.LV_STATE[path] = state return defer.succeed(["", "", 0]) @defer.inlineCallbacks def testSimpleActiveLV(self): aq = self.assertEqual yield lvm.lvchange('/dev/virtvg/lv', available='y') aq(self.LV_STATE['/dev/virtvg/lv'], 1) agentd_template_store.py import sys import tempfile import shutil import os, os.path from twisted.trial import unittest from twisted.internet import reactor, defer from twisted.python import log from twisted.web import server, resource from virt_platform.agentd.file_loader import HTTPFileLoader from virt_platform.agentd.template_store import TemplateStore class Resource(resource.Resource): isLeaf = True trigger_state = 0 def render_GET(self, request): if request.uri == '/templates/template1.xml': return "template1_content" elif request.uri == '/templates/template2.json': return "template2_content" elif request.uri == '/non_standard/path/template3.xml': return "template3_content" else: raise NotImplementedError("URI: %r not found." % request.uri) class MockFileShareServer(object): def __init__(self, port): self.port = port self.site = server.Site(Resource()) def listen(self): # XXX: we must return deferred return reactor.listenTCP(self.port, self.site) class TemplateStoreTestCase(object): @defer.inlineCallbacks def testGet(self): aq = self.assertEqual template = yield self.template_store.get("template1") aq(template, "template1_content") @defer.inlineCallbacks def testNonStandardPath(self): aq = self.assertEqual template = yield self.template_store_non_standard_path.get("template3") aq(template, "template3_content") @defer.inlineCallbacks def testCustomExt(self): aq = self.assertEqual template = yield self.template_store_custom_ext.get("template2") aq(template, "template2_content") class TemplateStoreTestCase(TemplateStoreTestCase, unittest.TestCase): def setUp(self): log.startLogging(sys.stdout) port = MockFileShareServer(22345).listen() self.addCleanup(port.stopListening) self.file_loader = HTTPFileLoader("localhost", 22345) self.template_store = TemplateStore(self.file_loader) self.template_store_non_standard_path = TemplateStore(self.file_loader, 'non_standard/path') self.template_store_custom_ext = TemplateStore(self.file_loader, template_ext='.json') """ class FilesystemTemplateStoreTestCase(TemplateStoreTestCase, unittest.TestCase): def setUp(self): log.startLogging(sys.stdout) tmp = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, tmp) path = os.path.join(tmp, 'templates') os.mkdir(path) template1 = os.path.join(path, 'template1.xml') with file(template1, "w") as f: f.write("template1_content") template2 = os.path.join(path, 'template2.json') with file(template2, "w") as f: f.write("template2_content") template3 = os.path.join(path, 'template3.xml') with file(template3, "w") as f: f.write("template3_content") self.template_store = FilesystemTemplateStore(path) self.template_store_non_standard_path = self.template_store self.template_store_custom_ext = FilesystemTemplateStore(path, template_ext='.json') """ agentd_utils.py from twisted.trial import unittest from virt_platform.agentd.utils import devicesFromDomainXML xml_with_devices = \ """ """ xml_without_devices = \ """ """ class ExtractDevicesTestCase(unittest.TestCase): def testExtractDevices(self): aq = self.assertEqual result = devicesFromDomainXML(xml_with_devices) aq(len(result), 2) aq(result[0]['path'], '/dev/virtvg/vm_test') aq(result[0]['dev'], 'vda') aq(result[1]['path'], '/dev/virtvg/vm_test_add') aq(result[1]['dev'], 'vdb') def testDevicesNotExist(self): aq = self.assertEqual result = devicesFromDomainXML(xml_without_devices) aq(result, []) client_protocol.py import sys from twisted.trial import unittest from twisted.internet import reactor, defer from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint, TCP4ClientEndpoint from twisted.python import log from virt_platform.protocol import Protocol from virt_platform.client.protocol import ClientProtocol from virt_platform.test.mock_agent import MockAgentFactory class ClientProtocolCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): log.startLogging(sys.stdout) server_factory = MockAgentFactory() server_factory.protocol = Protocol server_endpoint = TCP4ServerEndpoint(reactor, 12346) server_port = yield server_endpoint.listen(server_factory) client_factory = Factory() client_factory.protocol = ClientProtocol client_endpoint = TCP4ClientEndpoint(reactor, "localhost", 12346) client = yield client_endpoint.connect(client_factory) self.server_port = server_port self.client = client def tearDown(self): self.client.transport.loseConnection() self.server_port.stopListening() @defer.inlineCallbacks def testGetDomainInfoOK(self): aq = self.assertEqual info = yield self.client.getDomainInfo('vm_ok') aq(info['state'], 'running') mock_agent.py from twisted.internet.protocol import Factory from virt_platform.protocol import handler, create_response class MockAgentFactory(Factory): @handler def domain_get_info(self, proto, request): domain_name = request.data['domain_name'] result = self.getDomainInfo(domain_name) response = create_response(request, data=result) return response @handler def node_get_info(self, proto, request): result = self.getNodeInfo() return create_response(request, data=result) @handler def list_domains(self, proto, request): result = self.listDomains() return create_response(request, data=result) @handler def list_bridges(self, proto, request): result = [{'bridge_name': 'br0', 'interfaces': ['eth0'], 'status': 'down'}, {'bridge_name': 'br1', 'interfaces': ['eth1', 'vnet0'], 'status': 'up'}] return create_response(request, data=result) @handler def domain_define_xml(self, proto, request): return create_response(request) @handler def domain_define_template(self, proto, request): return create_response(request) @handler def domain_get_xml(self, proto, request): return create_response(request, data=" @handler def domain_start(self, proto, request): return create_response(request) @handler def domain_reboot(self, proto, request): return create_response(request) @handler def domain_shutdown(self, proto, request): return create_response(request) @handler def domain_destroy(self, proto, request): return create_response(request) @handler def domain_undefine(self, proto, request): return create_response(request) @handler def domain_migrate(self, proto, request): return create_response(request) @handler def domain_list_volumes(self, proto, request): return create_response(request, data=[{'volume_name': 'vm1'}, {'volume_name': 'backup'}]) @handler def storage_volume_aoe_export(self, proto, request): return create_response(request) @handler def storage_volume_aoe_import(self, proto, request): return create_response(request) @handler def storage_volume_create(self, proto, request): return create_response(request) @handler def storage_volume_list(self, proto, request): result = [{'volume_name': 'vm1', 'size': 10240}, {'volume_name': 'vm2', 'size': 123456}] return create_response(request, data=result) @handler def storage_image_write(self, proto, request): return create_response(request) @handler def storage_get_info(self, proto, request): result = {'total': 300000000, 'free': 102400000, 'used': 2400000, 'num_volumes': 10} return create_response(request, data=result) def listDomains(self): return [ {'name': 'vm1', 'status': 'active'}, {'name': 'vm2', 'status': 'inactive'} ] def getNodeInfo(self): return {'model': "x86_64", 'memory': 6124608, 'cpus': 4, 'mhz': 2400, 'nodes': 1, 'sockets': 1, 'cores': 4, 'threads': 1, } def getDomainInfo(self, name): return {'state': 'running', 'max_mem': 1024, 'memory': 1024, 'number_vcpu': 1, 'cpu_time': 0, } platform.py import sys from twisted.trial import unittest from twisted.internet import reactor, defer from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint, TCP4ClientEndpoint from twisted.python import log from virt_platform import VirtPlatform from virt_platform.config import PlatformConfig from virt_platform.protocol import Protocol from virt_platform.client.protocol import ClientProtocol from virt_platform.test.mock_agent import MockAgentFactory class MockPlatformConfigError(Exception): pass class MockPlatformConfig(PlatformConfig): def getNodeAddress(self, node_name): if node_name == 'rageguy': return "localhost:12347" elif node_name == 'trollface': return "localhost:12348" else: raise MockPlatformConfigError("Node %r not found." % node_name) class VirtPlatformProtocolCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): log.startLogging(sys.stdout) yield self.startAgents() self.vp = VirtPlatform(MockPlatformConfig()) @defer.inlineCallbacks def startAgents(self): self.rageguy = yield self.startAgent(12347) self.trollface = yield self.startAgent(12348) def stopAgents(self): self.rageguy.stopListening() self.trollface.stopListening() @defer.inlineCallbacks def startAgent(self, port): factory = MockAgentFactory() factory.protocol = Protocol endpoint = TCP4ServerEndpoint(reactor, port) port = yield endpoint.listen(factory) defer.returnValue(port) def tearDown(self): self.stopAgents() self.vp.agent_pool.disconnectAll() @defer.inlineCallbacks def testGetDomainInfo(self): domain_info = yield self.vp.getDomainInfo('rageguy', 'test') protocol.py import sys from twisted.trial import unittest from twisted.internet import reactor, defer from twisted.internet.protocol import ClientCreator, ServerFactory from twisted.python import log from virt_platform.protocol import Protocol, Request, Response, create_response, handler class EchoFactory(ServerFactory): def not_handler(self, proto, request): raise Exception("This must not happen.") def echo(self, proto, request): return create_response(request, data=request.data, error_msg="Success") echo.is_handler = True @handler def dec_echo(self, proto, request): return create_response(request, data=request.data) @handler def def_echo(self, proto, request): return defer.succeed(create_response(request, data=request.data)) @handler def broken_handler(self, proto, request): raise ValueError("This is trollface.") class ProtocolTest(unittest.TestCase): @defer.inlineCallbacks def setUp(self): log.startLogging(sys.stdout) factory = EchoFactory() factory.protocol = Protocol listener = reactor.listenTCP(12345, factory) self.addCleanup(listener.stopListening) clientCreator = ClientCreator(reactor, Protocol) self.client = yield clientCreator.connectTCP('localhost', 12345) self.addCleanup(self.client.transport.loseConnection) @defer.inlineCallbacks def test_echo(self): aq = self.assertEqual request = Request('echo', "echo-message") log.msg("test_echo: send request: %r" % request) response = yield self.client.sendRequest(request) log.msg("test_echo: received response: %r" % response) aq(response.code, 200) aq(response.data, "echo-message") @defer.inlineCallbacks def test_dec_echo(self): aq = self.assertEqual request = Request('dec_echo', 'data') response = yield self.client.sendRequest(request) aq(response.code, 200) aq(response.data, 'data') @defer.inlineCallbacks def test_def_echo(self): aq = self.assertEqual request = Request('def_echo', 'data') response = yield self.client.sendRequest(request) aq(response.code, 200) aq(response.data, 'data') def test_create_response(self): aq = self.assertEqual request = Request('test') request.sequence_number = 1 response = create_response(request, data="data", error_msg="hello world!") aq(request.sequence_number, response.sequence_number) aq(response.data, "data") aq(response.error_msg, "hello world!") @defer.inlineCallbacks def test_unknown_request(self): aq = self.assertEqual request = Request('unknown') response = yield self.client.sendRequest(request) aq(response.code, 501) request = Request('not_handler') response = yield self.client.sendRequest(request) aq(response.code, 501) def test_unknown_response(self): response = Response() response.sequence_number = 1 self.client.sendResponse(response) @defer.inlineCallbacks def test_broken_handler(self): aq = self.assertEqual request = Request('broken_handler') response = yield self.client.sendRequest(request) aq(response.code, 500) scripts/agent_service.py import sys from virt_platform.test.mock_agent import MockAgentFactory from twisted.internet.endpoints import TCP4ServerEndpoint from virt_platform.protocol import Protocol from twisted.internet import reactor from twisted.python import log log.startLogging(sys.stdout) def create_service(port): server_factory = MockAgentFactory() server_factory.protocol = Protocol server_endpoint = TCP4ServerEndpoint(reactor, port) server_endpoint.listen(server_factory) if __name__ == '__main__': create_service(12346) create_service(12345) create_service(12347) reactor.run() testnode/testdome.xml 4dea22b31d52d8f32516782e98ab3fa0 --> testnode/testnode.xml This file gives an example config for the mock 'test' backend driver to libvirt. This is intended to allow reliable unit testing of applications using libvirt. --> type="dir"> /default-pool |
«Название темы» Обязательные структурные элементы отчета о нир выделены полужирным шрифтом. В отчет о нир объемом не более 10 страниц содержание... | Реферат Тема нир Тема нир: Исследование возможности принудительного радиоактивного распада ядер вольфрама | ||
Реферат Тема нир Тема нир: Исследование возможности принудительного радиоактивного распада ядер вольфрама | Реферат Тема нир Тема нир: Анализ экономического механизма и разработка критериев эффективности системы консалтинга для издательско-полиграфического... | ||
Реферат Тема нир ... | Реферат Тема нир ... | ||
Реферат Тема нир Тема нир: Анализ экономического механизма и разработка критериев эффективности системы консалтинга для издательско-полиграфического... | Реферат Тема нир Тема нир: Математическое моделирование и исследование квазипериодических структур | ||
Реферат Тема нир Тема нир: Математическое моделирование и исследование квазипериодических структур | Реферат Тема нир Тема нир: Разработка способов снижения износа и повышения химической стойкости резинотехнических изделий для полиграфии и других... | ||
Реферат Тема нир Тема нир: Методология определения цветового и структурного соответствия при сравнении изображений, воспроизведенных на носителях... | Рекомендации по предотвращению типовых недостатков в оформлении итоговых... Нир, выполненных по Программе фи, необходимо соблюдать требования гост 32-2001. По результатам внутренней экспертизы итоговых отчетов... | ||
Рекомендации по предотвращению типовых недостатков в оформлении итоговых... Нир, выполненных по Программе фи, необходимо соблюдать требования гост 32-2001. По результатам внутренней экспертизы итоговых отчетов... | План нир управления инновационного развития Совершенствование моделей краткосрочного прогнозирования социально-экономического развития Российской Федерации | ||
Отчет о нир (заключительный) Рекомендации по оформлению отчетной документации по государственным контрактам на выполнение нир в рамках федеральной целевой программы... | Реферат с элементами нир, представляемый на Конкурс может содержать следующие разделы К положению об организации и проведении конкурса на лучший реферат с элементами нир для студентов младших курсов в рамках ниу |