您的位置:首页 > 运维架构

openstack nova 源码分析5-4 -nova/virt/libvirt目录下的connection.py

2014-04-01 14:32 411 查看
由于该文件大于8万字符 所以我分4次挂载(4

 def get_cpu_info(self):  
#获取cpuinfd 的信息 返回utils.dumps(cpu_info)  
 
       """Get cpuinfo information.  
 
       Obtains cpu feature from virConnect.getCapabilities,  
       and returns as a json string.  
 
       :return: see above description  
 
       """ 
 
       xml = self._conn.getCapabilities()  
       xml = libxml2.parseDoc(xml)  
       nodes = xml.xpathEval('//host/cpu')  
       if len(nodes) != 1:  
           reason = _("'<cpu>' must be 1, but %d\n") % len(nodes)  
           reason += xml.serialize()  
           raise exception.InvalidCPUInfo(reason=reason)  
 
       cpu_info = dict()  
 
       arch_nodes = xml.xpathEval('//host/cpu/arch')  
       if arch_nodes:  
           cpu_info['arch'] = arch_nodes[0].getContent()  
 
       model_nodes = xml.xpathEval('//host/cpu/model')  
       if model_nodes:  
           cpu_info['model'] = model_nodes[0].getContent()  
 
       vendor_nodes = xml.xpathEval('//host/cpu/vendor')  
       if vendor_nodes:  
           cpu_info['vendor'] = vendor_nodes[0].getContent()  
 
       topology_nodes = xml.xpathEval('//host/cpu/topology')  
       topology = dict()  
       if topology_nodes:  
           topology_node = topology_nodes[0].get_properties()  
           while topology_node:  
               name = topology_node.get_name()  
               topology[name] = topology_node.getContent()  
               topology_node = topology_node.get_next()  
 
           keys = ['cores', 'sockets', 'threads']  
           tkeys = topology.keys()  
           if set(tkeys) != set(keys):  
               ks = ', '.join(keys)  
               reason = _("topology (%(topology)s) must have %(ks)s")  
               raise exception.InvalidCPUInfo(reason=reason % locals())  
 
       feature_nodes = xml.xpathEval('//host/cpu/feature')  
       features = list()  
       for nodes in feature_nodes:  
           features.append(nodes.get_properties().getContent())  
 
       cpu_info['topology'] = topology  
       cpu_info['features'] = features  
       return utils.dumps(cpu_info)  
 
   def block_stats(self, instance_name, disk):  
#接受实例名为参数 返回一个domain.blockStats(disk) 应该是域所在磁盘的一个状态??  
       """  
       Note that this function takes an instance name.  
       """ 
       domain = self._lookup_by_name(instance_name)  
       return domain.blockStats(disk)  
 
   def interface_stats(self, instance_name, interface):  
#接受实例名为参数 返回一个domain.interfanceStats(interface) 应该是域所在interface的一个状态  
       """  
       Note that this function takes an instance name.  
       """ 
       domain = self._lookup_by_name(instance_name)  
       return domain.interfaceStats(interface)  
 
   def get_console_pool_info(self, console_type):  
#返回控制池的信息 (给出的一个fake data) ip username password  
       #TODO(mdragon): console proxy should be implemented for libvirt,  
       #               in case someone wants to use it with kvm or  
       #               such. For now return fake data.  
       return  {'address': '127.0.0.1',  
                'username': 'fakeuser',  
                'password': 'fakepassword'}  
 
   def refresh_security_group_rules(self, security_group_id):  
       self.firewall_driver.refresh_security_group_rules(security_group_id)  
 
   def refresh_security_group_members(self, security_group_id):  
       self.firewall_driver.refresh_security_group_members(security_group_id)  
 
   def refresh_provider_fw_rules(self):  
       self.firewall_driver.refresh_provider_fw_rules()  
 
   def update_available_resource(self, ctxt, host):  
#在电脑节点表中更新电脑管理资源的信息 这是一个很重要的函数 当nova-computer 登陆的时候 执行nova-manage serverce ..  
       """Updates compute manager resource info on ComputeNode table.  
 
       This method is called when nova-coompute launches, and  
       whenever admin executes "nova-manage service update_resource".  
 
       :param ctxt: security context  
       :param host: hostname that compute manager is currently running  
 
       """ 
 
       try:  
           service_ref = db.service_get_all_compute_by_host(ctxt, host)[0]  
       except exception.NotFound:  
           raise exception.ComputeServiceUnavailable(host=host)  
 
       # Updating host information  
       dic = {'vcpus': self.get_vcpu_total(),  
              'memory_mb': self.get_memory_mb_total(),  
              'local_gb': self.get_local_gb_total(),  
              'vcpus_used': self.get_vcpu_used(),  
              'memory_mb_used': self.get_memory_mb_used(),  
              'local_gb_used': self.get_local_gb_used(),  
              'hypervisor_type': self.get_hypervisor_type(),  
              'hypervisor_version': self.get_hypervisor_version(),  
              'cpu_info': self.get_cpu_info()}  
 
       compute_node_ref = service_ref['compute_node']  
       if not compute_node_ref:  
           LOG.info(_('Compute_service record created for %s ') % host)  
           dic['service_id'] = service_ref['id']  
           db.compute_node_create(ctxt, dic)  
       else:  
           LOG.info(_('Compute_service record updated for %s ') % host)  
           db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)  
 
   def compare_cpu(self, cpu_info):  
#检查给出xml是否和主机cpu兼容 xml”必须是一个libvirt.openReadonly的一部分().getCapabilities()  
#返回值follows by virCPUCompareResult。  
   #如果0 >返回值,做动态迁移。  
#返回: 如果给定cpu信息与该服务器并不兼容, 抛出异常。  
 
       """Checks the host cpu is compatible to a cpu given by xml.  
 
       "xml" must be a part of libvirt.openReadonly().getCapabilities().  
       return values follows by virCPUCompareResult.  
       if 0 > return value, do live migration.  
       'http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult'  
 
       :param cpu_info: json string that shows cpu feature(see get_cpu_info())  
       :returns:  
           None. if given cpu info is not compatible to this server,  
           raise exception.  
 
       """ 
 
       LOG.info(_('Instance launched has CPU info:\n%s') % cpu_info)  
       dic = utils.loads(cpu_info)  
       xml = str(Template(self.cpuinfo_xml, searchList=dic))  
       LOG.info(_('to xml...\n:%s ' % xml))  
 
       u = "http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult" 
       m = _("CPU doesn't have compatibility.\n\n%(ret)s\n\nRefer to %(u)s")  
       # unknown character exists in xml, then libvirt complains  
       try:  
           ret = self._conn.compareCPU(xml, 0)  
       except libvirt.libvirtError, e:  
           ret = e.message  
           LOG.error(m % locals())  
           raise 
 
       if ret <= 0:  
           raise exception.InvalidCPUInfo(reason=m % locals())  
 
       return 
 
   def ensure_filtering_rules_for_instance(self, instance_ref, network_info,  
                                           time=None):  
    #不知道大家注意没 上面这些 好多都是重写driver.py中的方法   
    #设置过滤规则,并等待它完成  
       """Setting up filtering rules and waiting for its completion.  
 
       To migrate an instance, filtering rules to hypervisors  
       and firewalls are inevitable on destination host.  
       ( Waiting only for filterling rules to hypervisor,  
       since filtering rules to firewall rules can be set faster).  
 
       Concretely, the below method must be called.  
       - setup_basic_filtering (for nova-basic, etc.)  
       - prepare_instance_filter(for nova-instance-instance-xxx, etc.)  
 
       to_xml may have to be called since it defines PROJNET, PROJMASK.  
       but libvirt migrates those value through migrateToURI(),  
       so , no need to be called.  
 
       Don't use thread for this method since migration should  
       not be started when setting-up filtering rules operations  
       are not completed.  
 
       :params instance_ref: nova.db.sqlalchemy.models.Instance object  
 
       """ 
 
       if not time:  
           time = greenthread  
 
       # If any instances never launch at destination host,  
       # basic-filtering must be set here.  
       self.firewall_driver.setup_basic_filtering(instance_ref, network_info)  
       # setting up nova-instance-instance-xx mainly.  
       self.firewall_driver.prepare_instance_filter(instance_ref,  
               network_info)  
 
       # wait for completion  
       timeout_count = range(FLAGS.live_migration_retry_count)  
       while timeout_count:  
           if self.firewall_driver.instance_filter_exists(instance_ref,  
                                                          network_info):  
               break 
           timeout_count.pop()  
           if len(timeout_count) == 0:  
               msg = _('Timeout migrating for %s. nwfilter not found.')  
               raise exception.Error(msg % instance_ref.name)  
           time.sleep(1)  
 
   def live_migration(self, ctxt, instance_ref, dest,  
                      post_method, recover_method, block_migration=False):  
    #分发处理高负荷,当有高负荷操作时候,大量生成 live_mirgration  
       """Spawning live_migration operation for distributing high-load.  
 
       :params ctxt: security context  
       :params instance_ref:  
           nova.db.sqlalchemy.models.Instance object  
           instance object that is migrated.  
       :params dest: destination host  
       :params block_migration: destination host  
       :params post_method:  
           post operation method.  
           expected nova.compute.manager.post_live_migration.  
       :params recover_method:  
           recovery method when any exception occurs.  
           expected nova.compute.manager.recover_live_migration.  
       :params block_migration: if true, do block migration.  
 
       """ 
 
       greenthread.spawn(self._live_migration, ctxt, instance_ref, dest,  
                         post_method, recover_method, block_migration)  
 
   def _live_migration(self, ctxt, instance_ref, dest, post_method,  
                       recover_method, block_migration=False):  
       #动态迁移???  
    """Do live migration.  
 
       :params ctxt: security context  
       :params instance_ref:  
           nova.db.sqlalchemy.models.Instance object  
           instance object that is migrated.  
       :params dest: destination host  
       :params post_method:  
           post operation method.  
           expected nova.compute.manager.post_live_migration.  
       :params recover_method:  
           recovery method when any exception occurs.  
           expected nova.compute.manager.recover_live_migration.  
 
       """ 
 
       # Do live migration.  
       try:  
           if block_migration:  
               flaglist = FLAGS.block_migration_flag.split(',')  
           else:  
               flaglist = FLAGS.live_migration_flag.split(',')  
           flagvals = [getattr(libvirt, x.strip()) for x in flaglist]  
           logical_sum = reduce(lambda x, y: x | y, flagvals)  
 
           dom = self._conn.lookupByName(instance_ref.name)  
           dom.migrateToURI(FLAGS.live_migration_uri % dest,  
                            logical_sum,  
                            None,  
                            FLAGS.live_migration_bandwidth)  
 
       except Exception:  
           recover_method(ctxt, instance_ref, dest, block_migration)  
           raise 
 
       # Waiting for completion of live_migration.  
       timer = utils.LoopingCall(f=None)  
 
       def wait_for_live_migration():  
           """waiting for live migration completion""" 
           try:  
               self.get_info(instance_ref.name)['state']  
           except exception.NotFound:  
               timer.stop()  
               post_method(ctxt, instance_ref, dest, block_migration)  
 
       timer.f = wait_for_live_migration  
       timer.start(interval=0.5, now=True)  
 
   def pre_block_migration(self, ctxt, instance_ref, disk_info_json):  
#准备block的 迁移  
       """Preparation block migration.  
 
       :params ctxt: security context  
       :params instance_ref:  
           nova.db.sqlalchemy.models.Instance object  
           instance object that is migrated.  
       :params disk_info_json:  
           json strings specified in get_instance_disk_info  
 
       """ 
       disk_info = utils.loads(disk_info_json)  
 
       # make instance directory  
       instance_dir = os.path.join(FLAGS.instances_path, instance_ref['name'])  
       if os.path.exists(instance_dir):  
           raise exception.DestinationDiskExists(path=instance_dir)  
       os.mkdir(instance_dir)  
 
       for info in disk_info:  
           base = os.path.basename(info['path'])  
           # Get image type and create empty disk image.  
           instance_disk = os.path.join(instance_dir, base)  
           utils.execute('qemu-img', 'create', '-f', info['type'],  
                         instance_disk, info['local_gb'])  
 
       # if image has kernel and ramdisk, just download  
       # following normal way.  
       if instance_ref['kernel_id']:  
           user = manager.AuthManager().get_user(instance_ref['user_id'])  
           project = manager.AuthManager().get_project(  
               instance_ref['project_id'])  
           self._fetch_image(nova_context.get_admin_context(),  
                             os.path.join(instance_dir, 'kernel'),  
                             instance_ref['kernel_id'],  
                             user,  
                             project)  
           if instance_ref['ramdisk_id']:  
               self._fetch_image(nova_context.get_admin_context(),  
                                 os.path.join(instance_dir, 'ramdisk'),  
                                 instance_ref['ramdisk_id'],  
                                 user,  
                                 project)  
 
   def post_live_migration_at_destination(self, ctxt,  
                                          instance_ref,  
                                          network_info,  
                                          block_migration):  
    #Post操作的动态迁移到目的地主机  
       """Post operation of live migration at destination host.  
 
       :params ctxt: security context  
       :params instance_ref:  
           nova.db.sqlalchemy.models.Instance object  
           instance object that is migrated.  
       :params network_info: instance network infomation  
       :params : block_migration: if true, post operation of block_migraiton.  
       """ 
       # Define migrated instance, otherwise, suspend/destroy does not work.  
       dom_list = self._conn.listDefinedDomains()  
       if instance_ref.name not in dom_list:  
           instance_dir = os.path.join(FLAGS.instances_path,  
                                       instance_ref.name)  
           xml_path = os.path.join(instance_dir, 'libvirt.xml')  
           # In case of block migration, destination does not have  
           # libvirt.xml  
           if not os.path.isfile(xml_path):  
               xml = self.to_xml(instance_ref, network_info=network_info)  
               f = open(os.path.join(instance_dir, 'libvirt.xml'), 'w+')  
               f.write(xml)  
               f.close()  
           # libvirt.xml should be made by to_xml(), but libvirt  
           # does not accept to_xml() result, since uuid is not  
           # included in to_xml() result.  
           dom = self._lookup_by_name(instance_ref.name)  
           self._conn.defineXML(dom.XMLDesc(0))  
 
   def get_instance_disk_info(self, ctxt, instance_ref):  
#返回 跟前面的pre_block_migration有些区别  
   #        json strings with below format.  
   #       "[{'path':'disk', 'type':'raw', 'local_gb':'10G'},...]"  
       """Preparation block migration.  
 
       :params ctxt: security context  
       :params instance_ref:  
           nova.db.sqlalchemy.models.Instance object  
           instance object that is migrated.  
       :return:  
           json strings with below format.  
          "[{'path':'disk', 'type':'raw', 'local_gb':'10G'},...]"  
 
       """ 
       disk_info = []  
 
       virt_dom = self._lookup_by_name(instance_ref.name)  
       xml = virt_dom.XMLDesc(0)  
       doc = libxml2.parseDoc(xml)  
       disk_nodes = doc.xpathEval('//devices/disk')  
       path_nodes = doc.xpathEval('//devices/disk/source')  
       driver_nodes = doc.xpathEval('//devices/disk/driver')  
 
       for cnt, path_node in enumerate(path_nodes):  
           disk_type = disk_nodes[cnt].get_properties().getContent()  
           path = path_node.get_properties().getContent()  
 
           if disk_type != 'file':  
               LOG.debug(_('skipping %(path)s since it looks like volume') %  
                         locals())  
               continue 
 
           # In case of libvirt.xml, disk type can be obtained  
           # by the below statement.  
           # -> disk_type = driver_nodes[cnt].get_properties().getContent()  
           # but this xml is generated by kvm, format is slightly different.  
           disk_type = \  
               driver_nodes[cnt].get_properties().get_next().getContent()  
           if disk_type == 'raw':  
               size = int(os.path.getsize(path))  
           else:  
               out, err = utils.execute('qemu-img', 'info', path)  
               size = [i.split('(')[1].split()[0] for i in out.split('\n')  
                   if i.strip().find('virtual size') >= 0]  
               size = int(size[0])  
 
           # block migration needs same/larger size of empty image on the  
           # destination host. since qemu-img creates bit smaller size image  
           # depending on original image size, fixed value is necessary.  
           for unit, divisor in [('G', 1024 ** 3), ('M', 1024 ** 2),  
                                 ('K', 1024), ('', 1)]:  
               if size / divisor == 0:  
                   continue 
               if size % divisor != 0:  
                   size = size / divisor + 1 
               else:  
                   size = size / divisor  
               size = str(size) + unit  
               break 
 
           disk_info.append({'type': disk_type, 'path': path,  
                             'local_gb': size})  
 
       return utils.dumps(disk_info)  
 
   def unfilter_instance(self, instance_ref, network_info):  
#在防火墙驱动统一方法的see comments???  
       """See comments of same method in firewall_driver.""" 
       self.firewall_driver.unfilter_instance(instance_ref,  
                                              network_info=network_info)  
 
   def update_host_status(self):  
       """See xenapi_conn.py implementation.""" 
       pass 
 
   def get_host_stats(self, refresh=False):  
       """See xenapi_conn.py implementation.""" 
       pass 
 
   def host_power_action(self, host, action):  
#重启或者关闭 或者...  
       """Reboots, shuts down or powers up the host.""" 
       pass 
 
   def set_host_enabled(self, host, enabled):  
       """Sets the specified host's ability to accept new instances.""" 
       pass 

)

 

     该文件位于nova/virt/libvirt目录下的connection.py!我只是浅浅的分析了一下类中函数的方法 细节并没有多看,肯定有很多地方是错的 或者不好!希望大家能够帮忙指出错误!

      接下来 看源代码如下:中文部分是我加的注释 !或许大家会问 为什么要看这个connection.py呢 因为我发现该文件外部virt目录下有个connection.py 其中引用了 这个文件 所以觉得这个应该很重要 而且发现 好多方法都是重写的底层的driver的方法

 
本文出自 “LuoZhengWu” 博客,请务必保留此出处http://brucemars.blog.51cto.com/5288106/968461
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐