Скачать 1.13 Mb.
|
# Appendix V. Client part
# ___________________
# Abstract
# This software is a client library written in Python that uses the
# VirtPlatform protocol and implements a client application for working in a
# terminal.
# _____________________
# ! * Program name: VirtPlatform Client
# ! * Authors: Ilya Kharin, Roman Sokolkov, Ilya Gureev
# ! * Version: 1.0
# ! * Created: 13 September 2011
# ! * Last modified: 25 October 2011

# __init__.py

# protocol.py

from twisted.internet import reactor, defer
from twisted.python import log

from virt_platform.protocol import Protocol, Request


class ClientProtocolError(Exception):
    """Raised by ClientProtocol.doRequest() when response.code is not 200.

    Attributes:
        code -- response code received from the peer.
        data -- response payload.
        error_msg -- error message carried by the response.
    """

    def __init__(self, code, data, error_msg):
        # Pass the fields to Exception too, so that ``args`` and ``repr()``
        # (used by "%r" log messages) are not empty.
        Exception.__init__(self, code, data, error_msg)
        self.code = code
        self.data = data
        self.error_msg = error_msg

    def __str__(self):
        return "Error code: %s, data: %s, error_msg: %s" %\
                (self.code, self.data, self.error_msg)


class ClientProtocol(Protocol):
    """Client side of the protocol: one method per remote request.

    Every public method sends a single request through doRequest() and
    returns a Deferred.  Methods that end with ``defer.returnValue(result)``
    fire with the response data; the others fire with None.
    """

    def connectionLost(self, reason):
        self.msg("Connection lost. Reason: %r" % reason.value)
        self.connected = 0

    def msg(self, msg):
        # TODO: add info about protocol instance to log
        log.msg("ClientProtocol | %s " % msg)

    @defer.inlineCallbacks
    def doRequest(self, method, data=None):
        """Send request ``method`` with ``data`` and fire with response.data.

        Raises ClientProtocolError when the response code is not 200.
        """
        request = Request(method, data)
        self.msg("Send request: %r" % request)
        response = yield self.sendRequest(request)
        self.msg("Received response %r for request %r" % (response, request))
        # TODO: error handling (check response.code)
        if response.code != 200:
            raise ClientProtocolError(response.code, response.data,
                                      response.error_msg)
        defer.returnValue(response.data)

    # -- node / listing requests -------------------------------------------

    @defer.inlineCallbacks
    def getNodeInfo(self):
        result = yield self.doRequest('node_get_info')
        defer.returnValue(result)

    @defer.inlineCallbacks
    def listDomains(self):
        result = yield self.doRequest('list_domains')
        defer.returnValue(result)

    @defer.inlineCallbacks
    def listBridges(self):
        result = yield self.doRequest('list_bridges')
        defer.returnValue(result)

    # -- domain requests ---------------------------------------------------

    @defer.inlineCallbacks
    def getDomainInfo(self, name):
        result = yield self.doRequest('domain_get_info',
                                      {'domain_name': name})
        defer.returnValue(result)

    @defer.inlineCallbacks
    def defineDomainXML(self, xml):
        result = yield self.doRequest('domain_define_xml', {'xml': xml})
        defer.returnValue(result)

    @defer.inlineCallbacks
    def getDomainXML(self, name):
        result = yield self.doRequest('domain_get_xml',
                                      {'domain_name': name})
        defer.returnValue(result)

    @defer.inlineCallbacks
    def defineDomainTemplate(self, template_name, domain_name, memory,
                             number_vcpu, volume_name, network_bridge):
        yield self.doRequest('domain_define_template',
                             {'template_name': template_name,
                              'domain_name': domain_name,
                              'memory': memory,
                              'number_vcpu': number_vcpu,
                              'volume_name': volume_name,
                              'network_bridge': network_bridge})

    @defer.inlineCallbacks
    def startDomain(self, name):
        yield self.doRequest('domain_start', {'domain_name': name})

    @defer.inlineCallbacks
    def rebootDomain(self, name):
        yield self.doRequest('domain_reboot', {'domain_name': name})

    @defer.inlineCallbacks
    def shutdownDomain(self, name):
        yield self.doRequest('domain_shutdown', {'domain_name': name})

    @defer.inlineCallbacks
    def destroyDomain(self, name):
        yield self.doRequest('domain_destroy', {'domain_name': name})

    @defer.inlineCallbacks
    def undefineDomain(self, name):
        yield self.doRequest('domain_undefine', {'domain_name': name})

    @defer.inlineCallbacks
    def suspendDomain(self, name):
        yield self.doRequest('domain_suspend', {'domain_name': name})

    @defer.inlineCallbacks
    def resumeDomain(self, name):
        yield self.doRequest('domain_resume', {'domain_name': name})

    @defer.inlineCallbacks
    def migrateDomain(self, name, dest, live, persist_dest, undefine_source,
                      non_shared_disk):
        yield self.doRequest('domain_migrate',
                             {'domain_name': name,
                              'dest_addr': dest,
                              'live': live,
                              'persist_dest': persist_dest,
                              'undefine_source': undefine_source,
                              'non_shared_disk': non_shared_disk,})

    @defer.inlineCallbacks
    def listDomainStorageVolumes(self, name):
        result = yield self.doRequest('domain_list_volumes',
                                      {'domain_name': name})
        defer.returnValue(result)

    # -- storage requests --------------------------------------------------

    @defer.inlineCallbacks
    def exportStorageVolume(self, volume):
        result = yield self.doRequest('storage_volume_aoe_export',
                                      {'volume_name': volume})
        defer.returnValue(result)

    @defer.inlineCallbacks
    def importStorageVolume(self, shelf, slot, filename):
        yield self.doRequest('storage_volume_aoe_import', {
            'shelf': shelf,
            'slot': slot,
            'filename': filename,
        })

    @defer.inlineCallbacks
    def createStorageVolume(self, volume, size):
        yield self.doRequest('storage_volume_create',
                             {'volume_name': volume, 'size': size})

    @defer.inlineCallbacks
    def removeStorageVolume(self, volume):
        yield self.doRequest('storage_volume_remove',
                             {'volume_name': volume})

    @defer.inlineCallbacks
    def extendStorageVolume(self, volume, size, mode):
        yield self.doRequest('storage_volume_extend',
                             {'volume_name': volume, 'size': size,
                              'mode': mode})

    @defer.inlineCallbacks
    def resizeStorageVolumeFS(self, volume):
        yield self.doRequest('storage_volume_resizefs',
                             {'volume_name': volume})

    @defer.inlineCallbacks
    def activateStorageVolume(self, volume):
        yield self.doRequest('storage_volume_activate',
                             {'volume_name': volume})

    @defer.inlineCallbacks
    def deactivateStorageVolume(self, volume):
        yield self.doRequest('storage_volume_deactivate',
                             {'volume_name': volume})

    @defer.inlineCallbacks
    def listStorageVolumes(self):
        result = yield self.doRequest('storage_volume_list')
        defer.returnValue(result)

    @defer.inlineCallbacks
    def writeStorageImage(self, image, volume):
        yield self.doRequest('storage_image_write',
                             {'image_name': image, 'volume_name': volume})

    @defer.inlineCallbacks
    def getStorageInfo(self):
        result = yield self.doRequest('storage_get_info')
        defer.returnValue(result)


# Appendix G. Server part
# ___________________
# The application is a Python module plus server software in the form of an
# agent, and includes scripts for managing this service on the Fedora
# operating system.
# _____________________
# ! * Program name: VirtPlatform Agent
# ! * Authors: Ilya Kharin, Roman Sokolkov, Ilya Gureev
# ! * Version: 1.0
# !
# * Created: 13 September 2011
# ! * Last modified: 25 October 2011

# __init__.py

# aoe.py

from twisted.internet import utils, defer, protocol
from twisted.internet.error import ProcessTerminated, ProcessDone
from twisted.python import log
import StringIO

from virt_platform.agentd.utils import runProcess, CommandError
from virt_platform.utils import ScatterDeferred

# this module provided no production method only for test
AOE_DISCOVER_EXEC = '/usr/sbin/aoe-discover'
AOE_PING_EXEC = '/usr/sbin/aoeping'


class AoeError(Exception):
    pass


class AoeCommandError(CommandError, AoeError):
    pass


def getReactor(reactor):
    """Return ``reactor``, or the global Twisted reactor when it is None."""
    if reactor is None:
        from twisted.internet import reactor
    return reactor


@defer.inlineCallbacks
def importStorageVolume(shelf, slot, netif):
    """Run aoe-discover, then aoeping for (shelf, slot) on ``netif``.

    Raises AoeCommandError when either command exits with a non-zero code.
    """
    (stdout, stderr, exitCode) = yield runProcess(AOE_DISCOVER_EXEC, args=())
    if exitCode:
        raise AoeCommandError(exitCode, stdout, stderr)
    # argv entries must be strings; callers format shelf/slot with %d, so
    # convert here the same way VbladeManager._startProcess does.
    args = ('-s2', str(shelf), str(slot), netif)
    (stdout, stderr, exitCode) = yield runProcess(AOE_PING_EXEC, args=args)
    if exitCode:
        raise AoeCommandError(exitCode, stdout, stderr)
    defer.returnValue(None)


class VbladeProcessProtocol(protocol.ProcessProtocol):
    """Process protocol of one vblade process; reports to its manager."""

    def __init__(self, manager, slot, filename, reactor):
        self.manager = manager
        self.slot = slot
        self.filename = filename
        self.reactor = reactor
        self.stdout = StringIO.StringIO()
        self.stderr = StringIO.StringIO()

    def __repr__(self):
        # NOTE(review): the original format string was lost when the listing
        # was extracted (only the argument tuple survived); this wording is a
        # reconstruction -- confirm against the real source.
        return "<Vblade shelf=%s slot=%s netif=%s filename=%s>" % \
                (self.manager.shelf, self.slot, self.manager.netif,
                 self.filename)

    def msg(self, msg):
        log.msg("VbladeProcess | %r | %s" % (self, msg))

    def outReceived(self, data):
        self.msg("Received stdout data %r" % data)
        self.stdout.write(data)

    def errReceived(self, data):
        self.msg("Received stderr data %r" % data)
        self.stderr.write(data)

    def processStarted(self):
        self.msg('Started.')
        self.manager.processStarted(self)

    def connectionMade(self):
        # XXX: we are waiting for one second after connectionMade called and
        # believe the process is successfully launched or it ended
        self.reactor.callLater(1, self.processStarted)

    def processEnded(self, reason):
        self.msg("Ended with reason %r." % reason)
        self.manager.processEnded(self, reason, self.stdout.getvalue(),
                                  self.stderr.getvalue())


class VbladeManager(object):
    """Starts vblade processes and tracks them by slot.

    ``processes`` maps slot -> running protocol; ``_pending_processes`` maps
    slot -> ScatterDeferred of callers waiting for the process to start.
    """
    execFile = '/usr/sbin/vblade'
    protocol = VbladeProcessProtocol

    def __init__(self, shelf, netif, reactor=None):
        self.shelf = shelf
        self.netif = netif
        self.reactor = reactor
        self.processes = {}
        self._pending_processes = {}

    def msg(self, msg):
        log.msg("VbladeManager | %s" % msg)

    def processStarted(self, vblade):
        d = self._pending_processes.pop(vblade.slot, None)
        if d is not None and not d.called:
            self.msg("Process %r started." % vblade)
            self.processes[vblade.slot] = vblade
            d.callback({'shelf': self.shelf,
                        'slot': vblade.slot,
                        'filename': vblade.filename})
        else:
            # Fixed: the format argument was missing.
            self.msg("Process started %r but already stopped." % vblade)

    def processEnded(self, vblade, reason, stdout, stderr):
        self.msg("Process %r ended with reason %r" % (vblade, reason))
        # Forget the dead process, otherwise export() would keep reporting
        # the slot as exported.
        if self.processes.get(vblade.slot) is vblade:
            del self.processes[vblade.slot]
        if isinstance(reason.value, ProcessTerminated):
            self.msg("Process %r terminated with signal = %s, exitCode = %s" % \
                     (vblade, reason.value.signal, reason.value.exitCode))
            error = AoeCommandError(reason.value.exitCode, stdout, stderr)
        elif isinstance(reason.value, ProcessDone):
            error = NotImplementedError("Process successfully ended")
        else:
            # Any other reason: hand the failure over unchanged instead of
            # hitting an unbound ``error`` below.
            error = reason
        d = self._pending_processes.pop(vblade.slot, None)
        if d is not None and not d.called:
            d.errback(error)

    def _startProcess(self, slot, filename):
        reactor = getReactor(self.reactor)
        protocol = self.protocol(self, slot, filename, reactor)
        self.msg("Starting vblade process %r" % protocol)
        args = (self.execFile, str(self.shelf), str(slot), self.netif,
                filename)
        reactor.spawnProcess(protocol, self.execFile, args=args)
        return protocol

    def export(self, slot, filename):
        """Return a Deferred firing with {'shelf', 'slot', 'filename'}.

        Fires immediately when a process for ``slot`` is already registered;
        otherwise joins the pending start for that slot or spawns a process.
        """
        vblade = self.processes.get(slot)
        if vblade is not None:
            return defer.succeed({'shelf': self.shelf,
                                  'slot': vblade.slot,
                                  'filename': vblade.filename})
        d = defer.Deferred()
        scatter = self._pending_processes.get(slot)
        if scatter is not None:
            scatter.add(d)
        else:
            self._pending_processes[slot] = scatter = ScatterDeferred()
            scatter.add(d)
            try:
                self._startProcess(slot, filename)
            except Exception as error:
                # XXX: catch exceptions which only may be produced by
                # reactor.spawnProcess
                self.msg("Process not runned: %r" % error)
                self._pending_processes.pop(slot, None)
                raise  # bare raise keeps the original traceback
        return d


# file_loader.py

import tempfile
import os.path

from twisted.internet import defer
from twisted.web.client import getPage, downloadPage
from twisted.python import failure
from twisted.python import log

from virt_platform.utils import ScatterDeferred


class FileLoader(object):

    def download(self, uri):
        """Downloads file and returns deferred which will callback with
        file path on filesystem"""
        raise NotImplementedError

    def get(self, uri):
        """Downloads file and returns deferred which will callback with
        file content"""
        raise NotImplementedError


class HTTPFileLoader(FileLoader):
    """FileLoader that fetches http://host:port/<uri> into ``cachedir``."""
    agent = "VirtPlatform HTTP FileLoader"
    default_port = 80

    def __init__(self, host, port=None, cachedir=None):
        self.host = host
        self.port = port or self.default_port
        if not cachedir:
            cachedir = tempfile.mkdtemp()
        self.cachedir = cachedir
        self._pending_downloads = {}

    def msg(self, msg):
        log.msg("HTTPFIleLoader | %s" % msg)

    def createURL(self, uri):
        return "http://%s:%s/%s" % (self.host, self.port, uri)

    def createPath(self, uri):
        # NOTE(review): ``uri`` components are joined as-is, so '..' or an
        # absolute component would escape cachedir -- confirm callers only
        # pass trusted names.
        parts = uri.split('/')
        return os.path.join(self.cachedir, *parts)

    def download(self, uri):
        def remove(result, uri):
            # Runs for success and failure: the download is no longer
            # pending.  Returning the Failure keeps it on the errback chain.
            self._pending_downloads.pop(uri, None)
            if isinstance(result, failure.Failure):
                self.msg("download(uri=%r) failed: %r." % (uri, result.value))
            else:
                self.msg("Result of download(uri=%r): %s (size: %s, mtime: %r, stat: %r)." %\
                         (uri, result, os.path.getsize(result),
                          os.path.getmtime(result), os.stat(result)))
            return result

        self.msg("Called download(uri=%r)." % uri)
        if uri in self._pending_downloads:
            self.msg("Downloading %r already started. Merge request." % uri)
            scatter = self._pending_downloads[uri]
        else:
            dl = self._download(uri)
            scatter = ScatterDeferred()
            self._pending_downloads[uri] = scatter
            dl.addBoth(remove, uri)
            dl.chainDeferred(scatter)
        d = defer.Deferred()
        scatter.add(d)
        return d

    def flushCache(self, uri):
        path = self.createPath(uri)
        os.remove(path)
        return defer.succeed(None)

    @defer.inlineCallbacks
    def _download(self, uri):
        url = self.createURL(uri)
        path = self.createPath(uri)
        if os.path.exists(path):
            self.msg("%r already exists. Return cached file." % uri)
        else:
            self.msg("Performing downloading %r from %r." % (uri, url))
            self._prepareDownloading(path)
            # XXX: twisted.web doesn't support unicode
            url = url.encode('utf-8')
            yield downloadPage(url, path, agent=self.agent)
        defer.returnValue(path)

    def _prepareDownloading(self, file):
        # Make sure the directory that will hold ``file`` exists.
        dirname = os.path.dirname(file)
        if not os.path.exists(dirname):
            os.makedirs(dirname)

    @defer.inlineCallbacks
    def get(self, uri):
        self.msg("Called get(url=%r)." % uri)
        url = self.createURL(uri)
        self.msg("Get content of %r from %r." % (uri, url))
        # XXX: twisted.web doesn't support unicode
        url = url.encode('utf-8')
        result = yield getPage(url, agent=self.agent)
        self.msg("Result of get(uri=%r): %r." % (uri, result))
        defer.returnValue(result)


# lvm.py

import os.path

from twisted.internet import defer
from twisted.python import log

from virt_platform.agentd.utils import runProcess

LVM_EXEC = "/sbin/lvm"


class LVMError(Exception):
    pass


class LVMCommandError(LVMError):
    def __init__(self, code, stdout, stderr):
        Exception.__init__(self, code, stdout, stderr)
        self.code = code
        self.stdout = stdout
        self.stderr = stderr


class LVNotFound(LVMError):
    pass


def msg(msg):
    log.msg("LVM | %s" % msg)


@defer.inlineCallbacks
def lvdisplay(vg=None, lv=None):
    """Obtain attributes of a logical volumes.

    To obtain LV size do 'num_le * extent_size'. Extent size is LV's
    volume group attribute, which available from vgdisplay().

    Fires with a list of info dicts, or with the single dict whose name
    equals ``lv`` when ``lv`` is given.  Raises LVMCommandError on a non-zero
    exit code and LVNotFound when ``lv`` is given but not listed.
    """
    args = ['lvdisplay', '-c']
    if vg:
        args.append(vg)
    stdout, stderr, exitCode = yield runProcess(LVM_EXEC, args)
    # XXX: analyze error code and raise more specific exceptions.
    if exitCode:
        raise LVMCommandError(exitCode, stdout, stderr)
    result = []
    for row in stdout.split():
        cols = row.split(':')
        path = cols[0]
        name = os.path.basename(path)
        info = {
            'name': name,
            'path': path,
            'vgname': cols[1],
            'open_count': int(cols[5]),
            'associated_extents': int(cols[7]),
            'minor_number': int(cols[12]),
        }
        if lv == name:
            defer.returnValue(info)
        if lv is None:
            result.append(info)
    if lv is not None:
        raise LVNotFound("Logical volume %r not found (volume group: %r)." %
                         (lv, vg if vg else '*'))
    defer.returnValue(result)


@defer.inlineCallbacks
def vgdisplay(vg=None):
    """Fire with volume group info: one dict when ``vg`` is given, else a list."""
    args = ['vgdisplay', '-c']
    if vg:
        args.append(vg)
    stdout, stderr, exitCode = yield runProcess(LVM_EXEC, args)
    # XXX: analyze error code and raise more specific exception.
    if exitCode:
        raise LVMCommandError(exitCode, stdout, stderr)
    result = []
    for row in stdout.split():
        cols = row.split(':')
        info = {
            'name': cols[0],
            'number_lv': int(cols[5]),
            'size': int(cols[11]),
            'extent_size': int(cols[12]),
        }
        if vg is not None:
            defer.returnValue(info)
        result.append(info)
    defer.returnValue(result)


@defer.inlineCallbacks
def lvcreate(name, size, vg):
    """Creates logical volume.

    size -- lv size, in kilobytes.
    """
    size = '%iK' % size
    args = ['lvcreate', '--name', name, '--size', size, vg]
    stdout, stderr, exitCode = yield runProcess(LVM_EXEC, args)
    # XXX: analyze error code and raise more specific exception.
    if exitCode:
        raise LVMCommandError(exitCode, stdout, stderr)
    msg("Logical volume in VG %r created: name %r, size %r." %
        (vg, name, size))


@defer.inlineCallbacks
def lvchange(path, available=None):
    """Run ``lvm lvchange`` on ``path``, passing --available when given."""
    args = ['lvchange']
    if available:
        args.append('--available')
        args.append(available)
    args.append(path)
    stdout, stderr, exitCode = yield runProcess(LVM_EXEC, args)
    # XXX: analyze error code and raise more specific exception.
    if exitCode:
        raise LVMCommandError(exitCode, stdout, stderr)
    msg("Changed attributes of Logical volume with path %r: available: %r." %
        (path, available))


# network.py

from twisted.internet import utils, defer
from twisted.python import failure

BRCTL_EXEC = '/usr/sbin/brctl'
IP_EXEC = '/sbin/ip'


class NetworkError(Exception):
    pass


def runProcess(execute, args=(), resultParser=None):
    d = utils.getProcessOutputAndValue(execute, args=args)
    d.addBoth(gotResult, resultParser)
    return d


def gotResult(result, resultParser=None):
    """Turn a (stdout, stderr, exitCode) result into parsed data.

    Raises NetworkError on a non-zero exit code.  Because this is attached
    with addBoth it may also receive a Failure, which must not be unpacked
    as a tuple.
    """
    if isinstance(result, failure.Failure):
        value = result.value
        # getProcessOutputAndValue errbacks with (out, err, signalNum) when
        # the process is killed by a signal.
        if isinstance(value, tuple) and len(value) == 3:
            (stdout, stderr, signalNum) = value
            raise NetworkError("Process killed by signal %s" % signalNum,
                               stdout, stderr)
        return result
    (stdout, stderr, exitCode) = result
    if exitCode:
        raise NetworkError(exitCode, stdout, stderr)
    if resultParser is not None:
        return resultParser(stdout, stderr)
    return None


def completeResult(result, bridges):
    """Merge DeferredList ``result`` with the parallel ``bridges`` list."""
    data = []
    for (success, value), (bridge, interfaces) in zip(result, bridges):
        if not success:
            # ``value`` is a Failure; returning it continues on the errback
            # chain with the original exception.
            return value
        data.append({
            'bridge_name': bridge,
            'interfaces': interfaces,
            'status': value
        })
    return data


def listBridges():
    """Fire with a list of {'bridge_name', 'interfaces', 'status'} dicts."""

    def parseBROutput(stdout, stderr):
        bridges = []
        interfaces = []
        processes = []
        bridge = None
        lines = stdout.split('\n')[1:-1]  # skip unimportant lines
        for line in lines:
            line = line.split()
            if not line:
                # tolerate blank lines inside the output
                continue
            if len(line) == 3 or len(line) == 4:
                # first row of a bridge; a 4th column is its first interface
                bridge, interfaces = line[0], []
                bridges.append((bridge, interfaces))
                if len(line) == 4:
                    interfaces.append(line[3])
                args = ('-oneline', 'link', 'show', bridge)
                p = runProcess(IP_EXEC, args=args, resultParser=parseIPOutput)
                processes.append(p)
            elif len(line) == 1:
                # continuation row: one more interface of the current bridge
                interfaces.append(line[0])
            else:
                raise NetworkError("Parsing brctl output failed, may be output format change?",
                                   stdout, stderr)
        d = defer.DeferredList(processes).addCallback(completeResult, bridges)
        return d

    def parseIPOutput(stdout, stderr):
        if 'UP' in stdout:
            return 'up'
        return 'down'

    return runProcess(BRCTL_EXEC, args=('show',), resultParser=parseBROutput)


# service.py

import socket

from twisted.internet import defer
from twisted.internet.protocol import ServerFactory
from twisted.python import failure, log
import libvirt
import os

from virt_platform.protocol import Protocol, create_response, handler
from virt_platform.agentd.storage import StorageError
from virt_platform.agentd import txlibvirt, aoe, network, lvm, utils


class ConnectionPool(object):
    """Keeps released libvirt connections for reuse."""

    def __init__(self, url):
        self._free = []
        self.url = url

    def msg(self, msg):
        log.msg("ConnectionPool | %s" % msg)

    def get(self):
        conn = None
        if self._free:
            conn = self._free.pop()
            if not self._is_alive(conn):
                conn = None
        if not conn:
            conn = self._create_new()
        return conn

    def free(self, conn):
        self._free.append(conn)

    def _create_new(self):
        # XXX: blocking call, don't care now
        conn = libvirt.open(self.url)
        self.msg("Created new connection to libvirt: %r." % conn)
        return conn

    def _is_alive(self, conn):
        # XXX: very simple check
        try:
            conn.getInfo()
        except Exception as e:
            self.msg("WARNING: stale connection: %r (%r)." % (conn, e))
            return False
        return True


class AgentFactory(ServerFactory):
    """Server factory whose @handler methods answer protocol requests."""
    protocol = Protocol
    default_url = 'qemu:///system'
    default_url_format = 'qemu+ssh://%s/system'
    node_name = socket.getfqdn()

    # Failure types that gotResult() converts into an error response.
    _handled_errors = (libvirt.libvirtError, StorageError,
                       aoe.AoeCommandError, network.NetworkError,
                       lvm.LVMError)

    def __init__(self, virt_platform, storage, templates, vblade, url=None,
                 url_format=None, node_name=None):
        self.connection_pool = ConnectionPool(url or self.default_url)
        self.url_format = url_format or self.default_url_format
        self.virt_platform = virt_platform
        self.storage = storage
        self.templates = templates
        self.vblade = vblade
        if node_name is not None:
            self.node_name = node_name
        self.msg("Agent %r constructed." % self.node_name)

    def msg(self, msg):
        log.msg("AgentFactory | %s" % msg)

    def gotResult(self, result, request, code=None):
        """Build the response for ``request`` from a result or a Failure.

        Failures of other types than ``_handled_errors`` are re-raised by
        trap().
        """
        if isinstance(result, failure.Failure):
            # Fixed: trap() used to list only libvirtError and StorageError,
            # so the Aoe/Network branch below could never be reached.
            result.trap(*self._handled_errors)
            if isinstance(result.value, libvirt.libvirtError):
                data = txlibvirt.libvirtErrorToDict(result.value)
                if data['level'] not in (libvirt.VIR_ERR_NONE,
                                         libvirt.VIR_ERR_WARNING):
                    code = 550
                return create_response(request, data=data, code=code,
                                       error_msg=data['message'])
            error_msg = str(result.value)
            return create_response(request, code=500, error_msg=error_msg)
        else:
            return create_response(request, data=result, code=code)

    def freeConnection(self, conn):
        self.connection_pool.free(conn)

    def freeConnectionCb(self, result, conn):
        self.freeConnection(conn)
        return result

    def getConnection(self):
        return self.connection_pool.get()

    def _callLibvirt(self, request, func, *args):
        """Call ``func(conn, *args)`` with a pooled connection.

        The connection is released and the outcome converted by gotResult()
        whether the call succeeds, fails, or raises synchronously.
        """
        conn = self.getConnection()
        d = defer.maybeDeferred(func, conn, *args)
        d.addBoth(self.freeConnectionCb, conn)
        d.addBoth(self.gotResult, request)
        return d

    def _reactivateRemoteVolumes(self, volume_name):
        """Best-effort rollback: re-activate remotes, only logging errors."""
        d = self._activate_remote_volumes(volume_name)
        d.addErrback(lambda f: self.msg(
            "Re-activation remotes of %r volume failed: %r." %
            (volume_name, f.value)))

    @defer.inlineCallbacks
    def getSiteStorageAgents(self, fail_on_error=False):
        site = yield self.virt_platform.getNodeSite(self.node_name)
        nodes = yield self.virt_platform.listNodesBySite(site)
        if self.node_name in nodes:
            nodes.remove(self.node_name)
        dl = []
        for node in nodes:
            # XXX: something is going wrong now. Don't worry. We'll fix it
            # later.
            d = self.virt_platform._getLibvirtAgent(node)
            dl.append(d)
        consumeErrors = not fail_on_error
        result = yield defer.DeferredList(dl, consumeErrors=consumeErrors)
        agents = []
        for success, agent in result:
            if not success:
                self.msg("Could not get remote agent: %r." % agent.value)
                agent.printDetailedTraceback()
                continue
            agents.append(agent)
        defer.returnValue(agents)

    @handler
    def node_get_info(self, proto, request):
        return self._callLibvirt(request, txlibvirt.nodeGetInfo)

    @handler
    def list_domains(self, proto, request):
        return self._callLibvirt(request, txlibvirt.listDomains)

    @handler
    def list_bridges(self, proto, request):
        d = network.listBridges()
        d.addBoth(self.gotResult, request)
        return d

    @handler
    def domain_get_info(self, proto, request):
        return self._callLibvirt(request, txlibvirt.domainGetInfo,
                                 request.data['domain_name'])

    @handler
    @defer.inlineCallbacks
    def domain_list_volumes(self, proto, request):
        domain_name = request.data['domain_name']
        conn = self.getConnection()
        try:
            domain_xml = yield txlibvirt.domainGetXML(conn, domain_name)
        except txlibvirt.libvirtError as error:
            # XXX: copy-past programming, part of gotResult
            self.msg("Get XML for domain %s failed: %r" % (domain_name, error))
            # Fixed: used undefined names ``result`` and ``data``.
            error_info = txlibvirt.libvirtErrorToDict(error)
            if error_info['level'] in (libvirt.VIR_ERR_NONE,
                                       libvirt.VIR_ERR_WARNING):
                code = 500
            else:
                code = 550
            defer.returnValue(create_response(request, data=error_info,
                                              code=code,
                                              error_msg=error_info['message']))
        finally:
            self.freeConnection(conn)

        try:
            devices_from_xml = utils.devicesFromDomainXML(domain_xml)
        except utils.ExtractDevicesError as error:
            error_msg = "Extract devices from XML of domain %s failed: %r" % \
                        (domain_name, error)
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))

        try:
            # XXX: get information directly from lvm.lvdisplay not from
            # storage, replace it when storage.list be able to return more
            # additional information, e.g. full path to volume or name by
            # full path
            # XXX: HAAAACK
            volumes = yield lvm.lvdisplay(vg=self.storage.vg_name)
        except lvm.LVMCommandError as error:
            error_msg = "Get volumes for vg %s failed with error: %r" % \
                        (self.storage.vg_name, error)
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))

        devices = []
        for device in devices_from_xml:
            for volume in volumes:
                if device['path'] == volume['path']:
                    devices.append({'volume_name': volume['name']})
                    break
            else:
                error_msg = "WARNING: device %r declared in XML but not exist in storage %r for domain %s" % \
                            (device, volumes, domain_name)
                self.msg(error_msg)
        defer.returnValue(create_response(request, data=devices))

    @handler
    def domain_define_xml(self, proto, request):
        return self._callLibvirt(request, txlibvirt.defineXML,
                                 request.data['xml'])

    @handler
    def domain_define_template(self, proto, request):
        def gotLVPath(lvpath, data):
            data['disk'] = lvpath
            d = self.templates.get(data.pop('template_name'))
            d.addCallback(gotTemplate, data)
            return d

        def gotTemplate(template, data):
            config = template % data
            conn = self.getConnection()
            d = txlibvirt.defineXML(conn, config)
            d.addBoth(self.freeConnectionCb, conn)
            d.addCallback(self.gotResult, request)
            return d

        data = request.data.copy()
        # XXX: storage backend-specific function
        d = self.storage._getLVPath(data['volume_name'])
        d.addCallback(gotLVPath, data)
        d.addErrback(self.gotResult, request)
        return d

    @handler
    def domain_get_xml(self, proto, request):
        return self._callLibvirt(request, txlibvirt.domainGetXML,
                                 request.data['domain_name'])

    @handler
    def domain_start(self, proto, request):
        return self._callLibvirt(request, txlibvirt.domainStart,
                                 request.data['domain_name'])

    @handler
    def domain_reboot(self, proto, request):
        return self._callLibvirt(request, txlibvirt.domainReboot,
                                 request.data['domain_name'])

    @handler
    def domain_shutdown(self, proto, request):
        return self._callLibvirt(request, txlibvirt.domainShutdown,
                                 request.data['domain_name'])

    @handler
    def domain_destroy(self, proto, request):
        return self._callLibvirt(request, txlibvirt.domainDestroy,
                                 request.data['domain_name'])

    @handler
    def domain_undefine(self, proto, request):
        return self._callLibvirt(request, txlibvirt.domainUndefine,
                                 request.data['domain_name'])

    @handler
    def domain_suspend(self, proto, request):
        return self._callLibvirt(request, txlibvirt.domainSuspend,
                                 request.data['domain_name'])

    @handler
    def domain_resume(self, proto, request):
        return self._callLibvirt(request, txlibvirt.domainResume,
                                 request.data['domain_name'])

    @handler
    def domain_migrate(self, proto, request):
        # XXX: extracting address hardcoded
        #url = self.url_format % request.data['dest_addr']
        dest_addr = request.data['dest_addr']
        if ':' in dest_addr:
            dest_addr = dest_addr.split(':')[0]
        return self._callLibvirt(request, txlibvirt.domainMigrateToURL,
                                 request.data['domain_name'],
                                 self.url_format % dest_addr,
                                 request.data['live'],
                                 request.data['persist_dest'],
                                 request.data['undefine_source'],
                                 request.data['non_shared_disk'])

    @handler
    @defer.inlineCallbacks
    def storage_volume_aoe_export(self, proto, request):
        # XXX: this operation hardcoded for lvm storage
        volume_name = request.data['volume_name']
        try:
            info = yield lvm.lvdisplay(vg=self.storage.vg_name, lv=volume_name)
        except lvm.LVMError as error:
            error_msg = "Could not get info about storage volume %s: %r." % \
                        (volume_name, error)
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        try:
            data = yield self.vblade.export(info['minor_number'], info['path'])
        except aoe.AoeError as error:
            error_msg = "Export volume %s failed: %r." % (volume_name, error)
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        self.msg("Volume %r successfully exported." % volume_name)
        defer.returnValue(create_response(request, data=data))

    @handler
    @defer.inlineCallbacks
    def storage_volume_aoe_import(self, proto, request):
        filename = request.data['filename']
        shelf = request.data['shelf']
        slot = request.data['slot']
        try:
            yield aoe.importStorageVolume(shelf, slot, self.vblade.netif)
        except aoe.AoeError as error:
            # Fixed: the message formatted ``self`` instead of ``shelf``.
            error_msg = "Import volume with shelf=%s and slot=%s failed: %r." % \
                        (shelf, slot, error)
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        source = "/dev/etherd/e%d.%d" % (shelf, slot)
        if not os.path.exists(source):
            error_msg = "Volume shelf=%s and slot=%s imported but block device %s does not exist." % \
                        (shelf, slot, source)
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        if not os.path.exists(filename):
            basedir = os.path.dirname(filename)
            if not os.path.exists(basedir):
                os.makedirs(basedir)
            try:
                os.symlink(source, filename)
            except OSError as error:
                # Fixed: "$s" placeholder typo and undefined name ``link``.
                error_msg = "Create symbolic link %s -> %s failed: %r" % \
                            (source, filename, error)
                self.msg(error_msg)
                defer.returnValue(create_response(request, code=500,
                                                  error_msg=error_msg))
        defer.returnValue(create_response(request))

    @handler
    @defer.inlineCallbacks
    def storage_volume_create(self, proto, request):
        volume_name = request.data['volume_name']
        try:
            yield self.storage.create(volume_name, request.data['size'])
        except Exception as e:
            # XXX: We should construct better error messages (especially for
            # lvm exceptions).
            error_msg = "Could not create storage volume: %r" % e
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        self.msg("Volume %r successfully created." % volume_name)
        try:
            yield self._activate_remote_volumes(volume_name)
        except Exception as e:
            error_msg = "Activation remotes of %r volume failed: %r." % \
                        (volume_name, e)
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        defer.returnValue(create_response(request))

    @defer.inlineCallbacks
    def _activate_remote_volumes(self, volume_name):
        agents = yield self.getSiteStorageAgents()
        dl = []
        for agent in agents:
            d = agent.activateStorageVolume(volume_name)
            dl.append(d)
        yield defer.DeferredList(dl, fireOnOneErrback=True)
        self.msg("Remotes of %r volume successfully activated." % volume_name)

    @handler
    @defer.inlineCallbacks
    def storage_volume_remove(self, proto, request):
        volume_name = request.data['volume_name']
        try:
            yield self._deactivate_remote_volumes(volume_name)
        except Exception as e:
            error_msg = "Deactivation remotes %r volume failed: %r." % \
                        (volume_name, e)
            self.msg(error_msg)
            # Roll back; errors of the rollback are logged, not dropped.
            self._reactivateRemoteVolumes(volume_name)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        try:
            yield self.storage.remove(volume_name)
        except Exception as e:
            error_msg = "Removing of %r volume failed: %r." % (volume_name, e)
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        self.msg("Volume %r successfully removed." % volume_name)
        defer.returnValue(create_response(request))

    @defer.inlineCallbacks
    def _deactivate_remote_volumes(self, volume_name):
        agents = yield self.getSiteStorageAgents()
        dl = []
        for agent in agents:
            d = agent.deactivateStorageVolume(volume_name)
            dl.append(d)
        yield defer.DeferredList(dl, fireOnOneErrback=True)
        self.msg("Remotes of %r volume successfully deactivated." % volume_name)

    @handler
    def storage_volume_extend(self, proto, request):
        append_mode = request.data['mode'] == 'add'
        d = self.storage.extend(request.data['volume_name'],
                                request.data['size'],
                                append_mode=append_mode)
        d.addBoth(self.gotResult, request)
        return d

    @handler
    def storage_volume_resizefs(self, proto, request):
        d = self.storage.resizefs(request.data['volume_name'])
        d.addBoth(self.gotResult, request)
        return d

    @handler
    def storage_volume_list(self, proto, request):
        return self.storage.list().addBoth(self.gotResult, request)

    @handler
    def storage_get_info(self, proto, request):
        return self.storage.info().addBoth(self.gotResult, request)

    @handler
    @defer.inlineCallbacks
    def storage_image_write(self, proto, request):
        volume_name = request.data['volume_name']
        image_name = request.data['image_name']
        self.msg("Received request to write image %r to %r volume." %
                 (image_name, volume_name))
        try:
            yield self._deactivate_remote_volumes(volume_name)
        except Exception as e:
            # XXX: looks like pattern. May be it's time to think how to move
            # this code to one place?
            error_msg = "Deactivation remotes %r volume failed: %r." % \
                        (volume_name, e)
            self.msg(error_msg)
            self._reactivateRemoteVolumes(volume_name)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        try:
            yield self.storage.write(volume_name, image_name)
        except Exception as e:
            # A failed write used to escape the handler and leave the remote
            # volumes deactivated.
            error_msg = "Writing image %r to volume %r failed: %r." % \
                        (image_name, volume_name, e)
            self.msg(error_msg)
            self._reactivateRemoteVolumes(volume_name)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        try:
            yield self._activate_remote_volumes(volume_name)
        except Exception as e:
            error_msg = "Activation remotes of %r volume failed: %r." % \
                        (volume_name, e)
            self.msg(error_msg)
            defer.returnValue(create_response(request, code=500,
                                              error_msg=error_msg))
        self.msg("Image %r successfully written to volume %r." %
                 (image_name, volume_name))
        defer.returnValue(create_response(request))

    @handler
    def storage_volume_activate(self, proto, request):
        d = self.storage.activate(request.data['volume_name'])
        d.addBoth(self.gotResult, request)
        return d

    @handler
    def storage_volume_deactivate(self, proto, request):
        d = self.storage.deactivate(request.data['volume_name'])
        d.addBoth(self.gotResult, request)
        return d


# storage.py

from twisted.internet import threads, defer, utils
from twisted.python import log

from virt_platform.agentd.utils import dd, resize2fs, e2fsck, ntfsresize, kpartx_add, kpartx_del
from virt_platform.agentd import lvm

import parted
import os.path
import functools
import json


class StorageError(Exception):
    pass


class LVMStorageError(StorageError):
    pass


class StorageManager(object):

    def create(self, name, size):
        raise NotImplementedError

    def remove(self, name):
        raise NotImplementedError

    def extend(self, name, size, append_mode=False):
        raise NotImplementedError

    def write(self, name, image_name):
        raise NotImplementedError

    def list(self):
        raise NotImplementedError

    def info(self):
        raise NotImplementedError


# NOTE(review): this class continues past the end of the visible listing
# (_getLVPath, list, info, _burnImage, ... are used but not shown); the
# visible part is only re-formatted here, its logic is unchanged.
class LVMStorageManager(StorageManager):
    lvmExec = '/sbin/lvm'
    imageInfoURI = "images/%s.json"
    imageURI = "images/%s.img"

    def __init__(self, vg_name, file_loader):
        self.vg_name = vg_name
        self.file_loader = file_loader
        self.map_name_fmt = "%s-%%s" % vg_name

    def msg(self, msg):
        log.msg("LVMStorageManager | %s" % msg)

    def _runProcess(self, execute, args=(), resultParser=None):
        d = utils.getProcessOutputAndValue(execute, args=args)
        # NOTE(review): addBoth also routes a Failure into _gotResult, which
        # unpacks its argument as a 3-tuple -- confirm this is intended.
        d.addBoth(self._gotResult, resultParser)
        return d

    def _gotResult(self, result, resultParser=None):
        (stdout, stderr, exitCode) = result
        if exitCode:
            raise LVMStorageError(exitCode, stdout, stderr)
        if resultParser is not None:
            return resultParser(stdout, stderr)
        return None

    @defer.inlineCallbacks
    def create(self, name, size):
        yield lvm.lvcreate(name, size, self.vg_name)

    @defer.inlineCallbacks
    def activate(self, name):
        path = yield self._getLVPath(name)
        yield lvm.lvchange(path, available='y')

    @defer.inlineCallbacks
    def deactivate(self, name):
        path = yield self._getLVPath(name)
        yield lvm.lvchange(path, available='n')

    def remove(self, name):
        args = ('lvremove', '--force', "%s/%s" % (self.vg_name, name))
        return self._runProcess(self.lvmExec, args=args)

    def extend(self, name, size, append_mode=False):
        if append_mode:
            size = "+%iK" % size
        else:
            size = "%iK" % size
        args = ('lvextend', '--size', size, "%s/%s" % (self.vg_name, name))
        return self._runProcess(self.lvmExec, args=args)

    @defer.inlineCallbacks
    def resizefs(self, volume_name):
        yield self._extendPartition(volume_name)
        fs_type = yield self._guessFSType(volume_name)
        # XXX: image, wtf?
        image_info = {'fs_type': fs_type}
        yield self._resizeFS(volume_name, image_info)

    @defer.inlineCallbacks
    def write(self, name, image_name):
        volume_info = yield self._getVolumeInfo(name)
        image_info = yield self._getImageInfo(image_name)
        image_size_kb = image_info['size'] / 1024.
        if volume_info['size'] < image_size_kb:
            raise LVMStorageError("Image %r requires %s KB of volume size. %r volume size is %s." %\
                                  (image_name, image_size_kb, name,
                                   volume_info['size']))
        image_path = yield self._getImage(image_name,
                                          check_size=image_info['size'])
        yield self._writeImage(name, image_path, volume_info, image_info)

    @defer.inlineCallbacks
    def _getVolumeInfo(self, name):
        vlist = yield self.list()
        for v in vlist:
            if v['volume_name'] == name:
                defer.returnValue(v)
        raise LVMStorageError("Volume name %r not found." % name)

    @defer.inlineCallbacks
    def _getImageInfo(self, image_name):
        info_uri = self.imageInfoURI % image_name
        metadata = yield self.file_loader.get(info_uri)
        image_info = json.loads(metadata)
        defer.returnValue(image_info)

    @defer.inlineCallbacks
    def _getImage(self, image_name, check_size=None, check_sum=None,
                  sum_type=None):
        image_uri = self.imageURI % image_name
        # Two attempts: a size mismatch flushes the cached file and retries.
        for _ in xrange(2):
            path = yield self.file_loader.download(image_uri)
            if check_size:
                real_size = os.path.getsize(path)
                if real_size != check_size:
                    self.msg("Check size for %r (%r) failed: real size: %s, expected: %s. Flushing cache." %
                             (image_name, path, real_size, check_size))
                    self.file_loader.flushCache(image_uri)
                else:
                    self.msg("Check size OK for %r (%r)." % (image_name, path))
                    defer.returnValue(path)
            else:
                defer.returnValue(path)
        raise LVMStorageError("Failed to download image %r." % image_name)

    @defer.inlineCallbacks
    def _writeImage(self, volume_name, image_path, volume_info, image_info):
        yield self._burnImage(volume_name, image_path)
        yield self._extendPartition(volume_name)
        yield self._resizeFS(volume_name, image_info)

    @defer.inlineCallbacks
    def _guessFSType(self, volume_name):
        lvpath = yield self._getLVPath(volume_name)
        device = parted.Device(lvpath)
        disk = parted.Disk(device)
        # NOTE(review): the listing is cut off here; the rest of this method
        # is not visible and has deliberately not been reconstructed.
«Название темы» Обязательные структурные элементы отчета о НИР выделены полужирным шрифтом. В отчет о НИР объемом не более 10 страниц содержание... | Реферат Тема НИР Тема НИР: Исследование возможности принудительного радиоактивного распада ядер вольфрама | ||
Реферат Тема НИР Тема НИР: Исследование возможности принудительного радиоактивного распада ядер вольфрама | Реферат Тема НИР Тема НИР: Анализ экономического механизма и разработка критериев эффективности системы консалтинга для издательско-полиграфического... | ||
Реферат Тема НИР ... | Реферат Тема НИР ... | ||
Реферат Тема НИР Тема НИР: Анализ экономического механизма и разработка критериев эффективности системы консалтинга для издательско-полиграфического... | Реферат Тема НИР Тема НИР: Математическое моделирование и исследование квазипериодических структур | ||
Реферат Тема НИР Тема НИР: Математическое моделирование и исследование квазипериодических структур | Реферат Тема НИР Тема НИР: Разработка способов снижения износа и повышения химической стойкости резинотехнических изделий для полиграфии и других... | ||
Реферат Тема НИР Тема НИР: Методология определения цветового и структурного соответствия при сравнении изображений, воспроизведенных на носителях... | Рекомендации по предотвращению типовых недостатков в оформлении итоговых... НИР, выполненных по Программе фи, необходимо соблюдать требования ГОСТ 32-2001. По результатам внутренней экспертизы итоговых отчетов... | ||
Рекомендации по предотвращению типовых недостатков в оформлении итоговых... НИР, выполненных по Программе фи, необходимо соблюдать требования ГОСТ 32-2001. По результатам внутренней экспертизы итоговых отчетов... | План НИР управления инновационного развития Совершенствование моделей краткосрочного прогнозирования социально-экономического развития Российской Федерации | ||
Отчет о НИР (заключительный) Рекомендации по оформлению отчетной документации по государственным контрактам на выполнение НИР в рамках федеральной целевой программы... | Реферат с элементами НИР, представляемый на Конкурс может содержать следующие разделы К положению об организации и проведении конкурса на лучший реферат с элементами НИР для студентов младших курсов в рамках НИУ |