
Analysis of the nova boot code flow (Part 2): host selection in nova-scheduler

2016-04-10 21:35
This article analyzes how the nova-scheduler service selects a host when a VM is created. End to end, host selection in nova-scheduler goes through the following stages:

1. nova.scheduler.rpcapi.SchedulerAPI sends an RPC request to nova.scheduler.manager.SchedulerManager.

2. From SchedulerManager to the scheduler driver (class SchedulerDriver).

3. From SchedulerDriver to the filters.

4. From the filters to weight calculation and sorting.

Concretely, we pick up the host-selection code that was skipped in the previous article and analyze it. The code is as follows.

#/nova/conductor/manager.py:ComputeTaskManager
def build_instances(self, context, instances, image, filter_properties,
        admin_password, injected_files, requested_networks,
        security_groups, block_device_mapping=None, legacy_bdm=True):
    # TODO(ndipanov): Remove block_device_mapping and legacy_bdm in version
    #                 2.0 of the RPC API.
    request_spec = scheduler_utils.build_request_spec(context, image,
                                                      instances)
    # TODO(danms): Remove this in version 2.0 of the RPC API
    if (requested_networks and
            not isinstance(requested_networks,
                           objects.NetworkRequestList)):
        requested_networks = objects.NetworkRequestList(
            objects=[objects.NetworkRequest.from_tuple(t)
                     for t in requested_networks])
    # TODO(melwitt): Remove this in version 2.0 of the RPC API
    flavor = filter_properties.get('instance_type')
    if flavor and not isinstance(flavor, objects.Flavor):
        # Code downstream may expect extra_specs to be populated since it
        # is receiving an object, so lookup the flavor to ensure this.
        flavor = objects.Flavor.get_by_id(context, flavor['id'])
        filter_properties = dict(filter_properties, instance_type=flavor)

    try:
        scheduler_utils.setup_instance_group(context, request_spec,
                                             filter_properties)
        # check retry policy. Rather ugly use of instances[0]...
        # but if we've exceeded max retries... then we really only
        # have a single instance.
        scheduler_utils.populate_retry(filter_properties,
                                       instances[0].uuid)
        hosts = self.scheduler_client.select_destinations(context,
                request_spec, filter_properties)
    except Exception as exc:
        updates = {'vm_state': vm_states.ERROR, 'task_state': None}
        for instance in instances:
            self._set_vm_state_and_notify(
                context, instance.uuid, 'build_instances', updates,
                exc, request_spec)
        return

    for (instance, host) in itertools.izip(instances, hosts):
        try:
            instance.refresh()
        except (exception.InstanceNotFound,
                exception.InstanceInfoCacheNotFound):
            LOG.debug('Instance deleted during build', instance=instance)
            continue
        local_filter_props = copy.deepcopy(filter_properties)
        scheduler_utils.populate_filter_properties(local_filter_props,
                                                   host)
        # The block_device_mapping passed from the api doesn't contain
        # instance specific information
        bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
                context, instance.uuid)

        self.compute_rpcapi.build_and_run_instance(context,
                instance=instance, host=host['host'], image=image,
                request_spec=request_spec,
                filter_properties=local_filter_props,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=bdms, node=host['nodename'],
                limits=host['limits'])


The call

hosts = self.scheduler_client.select_destinations(context,
        request_spec, filter_properties)

is the entry point of the nova-scheduler host-selection flow and is the focus of this article. Here self.scheduler_client is a /nova/scheduler/client/__init__.py:SchedulerClient object.

1. nova.scheduler.rpcapi.SchedulerAPI sends an RPC request to nova.scheduler.manager.SchedulerManager

#/nova/scheduler/client/__init__.py:LazyLoader
class LazyLoader(object):

    def __init__(self, klass, *args, **kwargs):
        self.klass = klass
        self.args = args
        self.kwargs = kwargs
        self.instance = None

    def __getattr__(self, name):
        return functools.partial(self.__run_method, name)

    def __run_method(self, __name, *args, **kwargs):
        if self.instance is None:
            self.instance = self.klass(*self.args, **self.kwargs)
        return getattr(self.instance, __name)(*args, **kwargs)

#/nova/scheduler/client/__init__.py:SchedulerClient
class SchedulerClient(object):
    """Client library for placing calls to the scheduler."""

    def __init__(self):
        self.queryclient = LazyLoader(importutils.import_class(
            'nova.scheduler.client.query.SchedulerQueryClient'))
        self.reportclient = LazyLoader(importutils.import_class(
            'nova.scheduler.client.report.SchedulerReportClient'))

    @utils.retry_select_destinations
    def select_destinations(self, context, request_spec, filter_properties):
        return self.queryclient.select_destinations(
            context, request_spec, filter_properties)


As the code above shows, self.queryclient and self.reportclient in SchedulerClient are essentially nova.scheduler.client.query.SchedulerQueryClient and nova.scheduler.client.report.SchedulerReportClient objects; they are merely wrapped by the LazyLoader class, which acts as a lazily-initialized singleton and a proxy. The host-selection flow goes through the select_destinations function of the nova.scheduler.client.query.SchedulerQueryClient object.
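
To make the LazyLoader behaviour concrete, here is a minimal sketch. It assumes the LazyLoader class quoted above is in scope; the Dummy class is made up purely for illustration.

# Illustration only: Dummy is a hypothetical class, not part of nova.
class Dummy(object):
    def greet(self, name):
        return 'hello %s' % name

loader = LazyLoader(Dummy)         # nothing is instantiated yet
print(loader.instance)             # None
print(loader.greet('scheduler'))   # first call creates the single Dummy instance
print(loader.instance)             # the same Dummy instance is reused afterwards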

#/nova/scheduler/client/query.py:SchedulerQueryClient
class SchedulerQueryClient(object):
    """Client class for querying to the scheduler."""

    def __init__(self):
        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()

    def select_destinations(self, context, request_spec, filter_properties):
        """Returns destinations(s) best suited for this request_spec and
        filter_properties.

        The result should be a list of dicts with 'host', 'nodename' and
        'limits' as keys.
        """
        return self.scheduler_rpcapi.select_destinations(
            context, request_spec, filter_properties)

#/nova/scheduler/rpcapi.py:SchedulerAPI
def select_destinations(self, ctxt, request_spec, filter_properties):
    cctxt = self.client.prepare(version='4.0')
    return cctxt.call(ctxt, 'select_destinations',
        request_spec=request_spec, filter_properties=filter_properties)


As the code above shows, the nova-conductor service ultimately issues an RPC call that executes the select_destinations function of the nova-scheduler service. That function lives in nova-scheduler's manager.py.

2. From SchedulerManager to the scheduler driver (class SchedulerDriver)

#/nova/scheduler/manager.py:SchedulerManager
class SchedulerManager(manager.Manager):
    """Chooses a host to run instances on."""

    target = messaging.Target(version='4.2')

    def __init__(self, scheduler_driver=None, *args, **kwargs):
        if not scheduler_driver:
            scheduler_driver = CONF.scheduler_driver
        self.driver = importutils.import_object(scheduler_driver)
        super(SchedulerManager, self).__init__(service_name='scheduler',
                                               *args, **kwargs)
        self.additional_endpoints.append(_SchedulerManagerV3Proxy(self))

    @messaging.expected_exceptions(exception.NoValidHost)
    def select_destinations(self, context, request_spec, filter_properties):
        """Returns destinations(s) best suited for this request_spec and
        filter_properties.

        The result should be a list of dicts with 'host', 'nodename' and
        'limits' as keys.
        """
        dests = self.driver.select_destinations(context, request_spec,
            filter_properties)
        return jsonutils.to_primitive(dests)

Here self.driver is set from the /etc/nova/nova.conf configuration file; the default is scheduler_driver=nova.scheduler.filter_scheduler.FilterScheduler. The actual host selection is therefore performed in the scheduler_driver object.
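
For reference, this corresponds to the following default setting in /etc/nova/nova.conf (illustrative excerpt):

[DEFAULT]
scheduler_driver = nova.scheduler.filter_scheduler.FilterScheduler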

#/nova/scheduler/filter_scheduler.py:FilterScheduler
class FilterScheduler(driver.Scheduler):
    """Scheduler that can be used for filtering and weighing."""
    def __init__(self, *args, **kwargs):
        super(FilterScheduler, self).__init__(*args, **kwargs)
        self.options = scheduler_options.SchedulerOptions()
        self.notifier = rpc.get_notifier('scheduler')

    def select_destinations(self, context, request_spec, filter_properties):
        """Selects a filtered set of hosts and nodes."""
        self.notifier.info(context, 'scheduler.select_destinations.start',
                           dict(request_spec=request_spec))

        num_instances = request_spec['num_instances']
        selected_hosts = self._schedule(context, request_spec,
                                        filter_properties)

        # Couldn't fulfill the request_spec
        if len(selected_hosts) < num_instances:
            # Log the details but don't put those into the reason since
            # we don't want to give away too much information about our
            # actual environment.
            LOG.debug('There are %(hosts)d hosts available but '
                      '%(num_instances)d instances requested to build.',
                      {'hosts': len(selected_hosts),
                       'num_instances': num_instances})

            reason = _('There are not enough hosts available.')
            raise exception.NoValidHost(reason=reason)

        dests = [dict(host=host.obj.host, nodename=host.obj.nodename,
                      limits=host.obj.limits) for host in selected_hosts]

        self.notifier.info(context, 'scheduler.select_destinations.end',
                           dict(request_spec=request_spec))
        return dests

The filtering and weighing are then performed in the self._schedule function.

#/nova/scheduler/filter_scheduler.py:FilterScheduler
def _schedule(self, context, request_spec, filter_properties):
    """Returns a list of hosts that meet the required specs,
    ordered by their fitness.
    """
    elevated = context.elevated()
    instance_properties = request_spec['instance_properties']
    instance_type = request_spec.get("instance_type", None)

    update_group_hosts = filter_properties.get('group_updated', False)

    config_options = self._get_configuration_options()

    filter_properties.update({'context': context,
                              'request_spec': request_spec,
                              'config_options': config_options,
                              'instance_type': instance_type})

    self.populate_filter_properties(request_spec,
                                    filter_properties)

    # Find our local list of acceptable hosts by repeatedly
    # filtering and weighing our options. Each time we choose a
    # host, we virtually consume resources on it so subsequent
    # selections can adjust accordingly.

    # Note: remember, we are using an iterator here. So only
    # traverse this list once. This can bite you if the hosts
    # are being scanned in a filter or weighing function.
    hosts = self._get_all_host_states(elevated)

    selected_hosts = []
    num_instances = request_spec.get('num_instances', 1)
    for num in xrange(num_instances):
        # Filter local hosts based on requirements ...
        hosts = self.host_manager.get_filtered_hosts(hosts,
                filter_properties, index=num)
        if not hosts:
            # Can't get any more locally.
            break

        LOG.debug("Filtered %(hosts)s", {'hosts': hosts})

        weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
                                                            filter_properties)

        LOG.debug("Weighed %(hosts)s", {'hosts': weighed_hosts})

        scheduler_host_subset_size = CONF.scheduler_host_subset_size
        if scheduler_host_subset_size > len(weighed_hosts):
            scheduler_host_subset_size = len(weighed_hosts)
        if scheduler_host_subset_size < 1:
            scheduler_host_subset_size = 1

        chosen_host = random.choice(
            weighed_hosts[0:scheduler_host_subset_size])
        LOG.debug("Selected host: %(host)s", {'host': chosen_host})
        selected_hosts.append(chosen_host)

        # Now consume the resources so the filter/weights
        # will change for the next instance.
        chosen_host.obj.consume_from_instance(instance_properties)
        if update_group_hosts is True:
            # NOTE(sbauza): Group details are serialized into a list now
            # that they are populated by the conductor, we need to
            # deserialize them
            if isinstance(filter_properties['group_hosts'], list):
                filter_properties['group_hosts'] = set(
                    filter_properties['group_hosts'])
            filter_properties['group_hosts'].add(chosen_host.obj.host)
    return selected_hosts

First, hosts = self._get_all_host_states(elevated) is executed to gather information about all hosts, as follows.

#/nova/scheduler/filter_scheduler.py:FilterScheduler
def _get_all_host_states(self, context):
    """Template method, so a subclass can implement caching."""
    return self.host_manager.get_all_host_states(context)

#/nova/scheduler/host_manager.py:HostManager
def get_all_host_states(self, context):
    """Returns a list of HostStates that represents all the hosts
    the HostManager knows about. Also, each of the consumable resources
    in HostState are pre-populated and adjusted based on data in the db.
    """

    service_refs = {service.host: service
                    for service in objects.ServiceList.get_by_binary(
                        context, 'nova-compute')}
    # Get resource usage across the available compute nodes:
    compute_nodes = objects.ComputeNodeList.get_all(context)
    seen_nodes = set()
    for compute in compute_nodes:
        service = service_refs.get(compute.host)

        if not service:
            LOG.warning(_LW(
                "No compute service record found for host %(host)s"),
                {'host': compute.host})
            continue
        host = compute.host
        node = compute.hypervisor_hostname
        state_key = (host, node)
        host_state = self.host_state_map.get(state_key)
        if host_state:
            host_state.update_from_compute_node(compute)
        else:
            host_state = self.host_state_cls(host, node, compute=compute)
            self.host_state_map[state_key] = host_state
        # We force to update the aggregates info each time a new request
        # comes in, because some changes on the aggregates could have been
        # happening after setting this field for the first time
        host_state.aggregates = [self.aggs_by_id[agg_id] for agg_id in
                                 self.host_aggregates_map[
                                     host_state.host]]
        host_state.update_service(dict(service.iteritems()))
        self._add_instance_info(context, compute, host_state)
        seen_nodes.add(state_key)

    # remove compute nodes from host_state_map if they are not active
    dead_nodes = set(self.host_state_map.keys()) - seen_nodes
    for state_key in dead_nodes:
        host, node = state_key
        LOG.info(_LI("Removing dead compute node %(host)s:%(node)s "
                     "from scheduler"), {'host': host, 'node': node})
        del self.host_state_map[state_key]

    return self.host_state_map.itervalues()

Two points are worth analyzing here: 1. how the information is fetched from the database (which has not been covered at the code level before), and 2. the role of host_state.

For the first point, I will use the objects.ServiceList.get_by_binary function as the example:

service_refs = {service.host: service
                for service in objects.ServiceList.get_by_binary(
                    context, 'nova-compute')}

The code is as follows.

#/nova/objects/service.py:ServiceList
@base.remotable_classmethod
def get_by_binary(cls, context, binary):
    db_services = db.service_get_all_by_binary(context, binary)
    return base.obj_make_list(context, cls(context), objects.Service,
                              db_services)

Note the @base.remotable_classmethod decorator on get_by_binary: it determines whether the decorated function (here get_by_binary) should be executed remotely via RPC. That is not our focus here; the focus is how the database is accessed. For more on this decorator, see the earlier article on the nova-compute periodic tasks mechanism.

#/nova/db/api.py
_BACKEND_MAPPING = {'sqlalchemy': 'nova.db.sqlalchemy.api'}

IMPL = concurrency.TpoolDbapiWrapper(CONF, backend_mapping=_BACKEND_MAPPING)

def service_get_all_by_binary(context, binary):
    """Get all services for a given binary."""
    return IMPL.service_get_all_by_binary(context, binary)

#/nova/db/sqlalchemy/api.py
def service_get_all_by_binary(context, binary):
    return model_query(context, models.Service, read_deleted="no").\
                filter_by(binary=binary).\
                all()

Here the service_get_all_by_binary function queries the services table of the nova database for all hosts whose binary is nova-compute. The services table is defined as follows.

#/nova/db/sqlalchemy/models.py:Service
class Service(BASE, NovaBase):
    """Represents a running service on a host."""

    __tablename__ = 'services'
    __table_args__ = (
        schema.UniqueConstraint("host", "topic", "deleted",
                                name="uniq_services0host0topic0deleted"),
        schema.UniqueConstraint("host", "binary", "deleted",
                                name="uniq_services0host0binary0deleted")
        )

    id = Column(Integer, primary_key=True)
    host = Column(String(255))  # , ForeignKey('hosts.id'))
    binary = Column(String(255))
    topic = Column(String(255))
    report_count = Column(Integer, nullable=False, default=0)
    disabled = Column(Boolean, default=False)
    disabled_reason = Column(String(255))
The contents of the services table in the nova database in my environment are shown below.

MariaDB [nova]> select * from services;
+---------------------+---------------------+------------+----+------+------------------+-------------+--------------+----------+---------+-----------------+
| created_at          | updated_at          | deleted_at | id | host | binary           | topic       | report_count | disabled | deleted | disabled_reason |
+---------------------+---------------------+------------+----+------+------------------+-------------+--------------+----------+---------+-----------------+
| 2015-08-28 14:06:37 | 2016-04-10 01:02:38 | NULL       |  1 | jun  | nova-consoleauth | consoleauth |       172420 |        0 |       0 | NULL            |
| 2015-08-28 14:06:41 | 2016-04-10 01:02:37 | NULL       |  2 | jun  | nova-scheduler   | scheduler   |       172189 |        0 |       0 | NULL            |
| 2015-08-28 14:07:05 | 2016-04-10 01:02:41 | NULL       |  3 | jun  | nova-conductor   | conductor   |       172317 |        0 |       0 | NULL            |
| 2015-08-28 14:07:18 | 2016-04-10 01:02:39 | NULL       |  7 | jun2 | nova-compute     | compute     |        31553 |        0 |       0 | NULL            |
| 2015-08-28 14:08:40 | 2016-04-10 01:02:36 | NULL       |  8 | jun  | nova-cert        | cert        |       172612 |        0 |       0 | NULL            |
+---------------------+---------------------+------------+----+------+------------------+-------------+--------------+----------+---------+-----------------+
5 rows in set (0.05 sec)

In the end service_get_all_by_binary returns only the hosts whose binary is nova-compute; since my environment runs a single nova-compute service, the database holds only one nova-compute record.
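
In SQL terms, the query built by model_query above (with its default read_deleted="no" behaviour) is roughly equivalent to:

MariaDB [nova]> select * from services where binary = 'nova-compute' and deleted = 0;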

That covers how the database is queried. The query path for compute_nodes = objects.ComputeNodeList.get_all(context) is similar; that statement reads the compute_nodes table of the nova database, as follows.

#/nova/db/sqlalchemy/api.py
@require_admin_context
def compute_node_get_all(context):
    return model_query(context, models.ComputeNode, read_deleted='no').all()

#/nova/db/sqlalchemy/models.py:ComputeNode
class ComputeNode(BASE, NovaBase):
    """Represents a running compute service on a host."""

    __tablename__ = 'compute_nodes'
    __table_args__ = (
        schema.UniqueConstraint(
            'host', 'hypervisor_hostname', 'deleted',
            name="uniq_compute_nodes0host0hypervisor_hostname0deleted"),
        )
    id = Column(Integer, primary_key=True)
    service_id = Column(Integer, nullable=True)

    # FIXME(sbauza): Host field is nullable because some old Juno compute nodes
    # can still report stats from an old ResourceTracker without setting this
    # field.
    # This field has to be set non-nullable in a later cycle (probably Lxxx)
    # once we are sure that all compute nodes in production report it.
    host = Column(String(255), nullable=True)
    vcpus = Column(Integer, nullable=False)
    memory_mb = Column(Integer, nullable=False)
    local_gb = Column(Integer, nullable=False)
    vcpus_used = Column(Integer, nullable=False)
    memory_mb_used = Column(Integer, nullable=False)
    local_gb_used = Column(Integer, nullable=False)
    hypervisor_type = Column(MediumText(), nullable=False)
    hypervisor_version = Column(Integer, nullable=False)
    hypervisor_hostname = Column(String(255))

    # Free Ram, amount of activity (resize, migration, boot, etc) and
    # the number of running VM's are a good starting point for what's
    # important when making scheduling decisions.
    free_ram_mb = Column(Integer)
    free_disk_gb = Column(Integer)
    current_workload = Column(Integer)
    running_vms = Column(Integer)

    # Note(masumotok): Expected Strings example:
    #
    # '{"arch":"x86_64",
    #   "model":"Nehalem",
    #   "topology":{"sockets":1, "threads":2, "cores":3},
    #   "features":["tdtscp", "xtpr"]}'
    #
    # Points are "json translatable" and it must have all dictionary keys
    # above, since it is copied from <cpu> tag of getCapabilities()
    # (See libvirt.virtConnection).
    cpu_info = Column(MediumText(), nullable=False)
    disk_available_least = Column(Integer)
    host_ip = Column(types.IPAddress())
    supported_instances = Column(Text)
    metrics = Column(Text)

    # Note(yongli): json string PCI Stats
    # '{"vendor_id":"8086", "product_id":"1234", "count":3 }'
    pci_stats = Column(Text)

    # extra_resources is a json string containing arbitrary
    # data about additional resources.
    extra_resources = Column(Text)

    # json-encode string containing compute node statistics
    stats = Column(Text, default='{}')

    # json-encoded dict that contains NUMA topology as generated by
    # objects.NUMATopology._to_json()
    numa_topology = Column(Text)

The compute_nodes table of the nova database contains too much data to paste here; it can be inspected with the command MariaDB [nova]> select * from compute_nodes;.

Now back to the get_all_host_states function of /nova/scheduler/host_manager.py:HostManager.

#/nova/scheduler/host_manager.py:HostManager
def get_all_host_states(self, context):
    """Returns a list of HostStates that represents all the hosts
    the HostManager knows about. Also, each of the consumable resources
    in HostState are pre-populated and adjusted based on data in the db.
    """

    service_refs = {service.host: service
                    for service in objects.ServiceList.get_by_binary(
                        context, 'nova-compute')}
    # Get resource usage across the available compute nodes:
    compute_nodes = objects.ComputeNodeList.get_all(context)
    seen_nodes = set()
    for compute in compute_nodes:
        service = service_refs.get(compute.host)

        if not service:
            LOG.warning(_LW(
                "No compute service record found for host %(host)s"),
                {'host': compute.host})
            continue
        host = compute.host
        node = compute.hypervisor_hostname
        state_key = (host, node)
        host_state = self.host_state_map.get(state_key)
        if host_state:
            host_state.update_from_compute_node(compute)
        else:
            host_state = self.host_state_cls(host, node, compute=compute)
            self.host_state_map[state_key] = host_state
        # We force to update the aggregates info each time a new request
        # comes in, because some changes on the aggregates could have been
        # happening after setting this field for the first time
        host_state.aggregates = [self.aggs_by_id[agg_id] for agg_id in
                                 self.host_aggregates_map[
                                     host_state.host]]
        host_state.update_service(dict(service.iteritems()))
        self._add_instance_info(context, compute, host_state)
        seen_nodes.add(state_key)

    # remove compute nodes from host_state_map if they are not active
    dead_nodes = set(self.host_state_map.keys()) - seen_nodes
    for state_key in dead_nodes:
        host, node = state_key
        LOG.info(_LI("Removing dead compute node %(host)s:%(node)s "
                     "from scheduler"), {'host': host, 'node': node})
        del self.host_state_map[state_key]

    return self.host_state_map.itervalues()

The rows with binary=nova-compute from the services table are matched against the rows from the compute_nodes table: a compute node that has a corresponding record in the services table is recorded in host_state and seen_nodes, while a compute node without a matching services record is skipped.

Let us now look at the role of host_state.

host_state = self.host_state_map.get(state_key)
if host_state:
    host_state.update_from_compute_node(compute)
else:
    host_state = self.host_state_cls(host, node, compute=compute)
    self.host_state_map[state_key] = host_state

As shown above, self.host_state_map holds per-host information. It is first checked for an entry for the host read from the database; if one exists, update_from_compute_node is called to refresh host_state with the data queried from the database, and if not, a new HostState object is created, whose constructor likewise calls update_from_compute_node to populate host_state with the database data. As follows:

#/nova/scheduler/host_manager.py:HostManager
# Can be overridden in a subclass
def host_state_cls(self, host, node, **kwargs):
    return HostState(host, node, **kwargs)

#/nova/scheduler/host_manager.py:HostState
class HostState(object):
    """Mutable and immutable information tracked for a host.
    This is an attempt to remove the ad-hoc data structures
    previously used and lock down access.
    """

    def __init__(self, host, node, compute=None):
        self.host = host
        self.nodename = node

        # Mutable available resources.
        # These will change as resources are virtually "consumed".
        self.total_usable_ram_mb = 0
        self.total_usable_disk_gb = 0
        self.disk_mb_used = 0
        self.free_ram_mb = 0
        self.free_disk_mb = 0
        self.vcpus_total = 0
        self.vcpus_used = 0
        self.pci_stats = None
        self.numa_topology = None

        # Additional host information from the compute node stats:
        self.num_instances = 0
        self.num_io_ops = 0

        # Other information
        self.host_ip = None
        self.hypervisor_type = None
        self.hypervisor_version = None
        self.hypervisor_hostname = None
        self.cpu_info = None
        self.supported_instances = None

        # Resource oversubscription values for the compute host:
        self.limits = {}

        # Generic metrics from compute nodes
        self.metrics = {}

        # List of aggregates the host belongs to
        self.aggregates = []

        # Instances on this host
        self.instances = {}

        self.updated = None
        if compute:
            self.update_from_compute_node(compute)

#/nova/scheduler/host_manager.py:HostState
def update_from_compute_node(self, compute):
    """Update information about a host from a ComputeNode object."""
    if (self.updated and compute.updated_at
            and self.updated > compute.updated_at):
        return
    all_ram_mb = compute.memory_mb

    # Assume virtual size is all consumed by instances if use qcow2 disk.
    free_gb = compute.free_disk_gb
    least_gb = compute.disk_available_least
    if least_gb is not None:
        if least_gb > free_gb:
            # can occur when an instance in database is not on host
            LOG.warning(_LW("Host %(hostname)s has more disk space than "
                            "database expected "
                            "(%(physical)sgb > %(database)sgb)"),
                        {'physical': least_gb, 'database': free_gb,
                         'hostname': compute.hypervisor_hostname})
        free_gb = min(least_gb, free_gb)
    free_disk_mb = free_gb * 1024

    self.disk_mb_used = compute.local_gb_used * 1024

    # NOTE(jogo) free_ram_mb can be negative
    self.free_ram_mb = compute.free_ram_mb
    self.total_usable_ram_mb = all_ram_mb
    self.total_usable_disk_gb = compute.local_gb
    self.free_disk_mb = free_disk_mb
    self.vcpus_total = compute.vcpus
    self.vcpus_used = compute.vcpus_used
    self.updated = compute.updated_at
    self.numa_topology = compute.numa_topology
    self.pci_stats = pci_stats.PciDeviceStats(
        compute.pci_device_pools)

    # All virt drivers report host_ip
    self.host_ip = compute.host_ip
    self.hypervisor_type = compute.hypervisor_type
    self.hypervisor_version = compute.hypervisor_version
    self.hypervisor_hostname = compute.hypervisor_hostname
    self.cpu_info = compute.cpu_info
    if compute.supported_hv_specs:
        self.supported_instances = [spec.to_list() for spec
                                    in compute.supported_hv_specs]
    else:
        self.supported_instances = []

    # Don't store stats directly in host_state to make sure these don't
    # overwrite any values, or get overwritten themselves. Store in self so
    # filters can schedule with them.
    self.stats = compute.stats or {}

    # Track number of instances on host
    self.num_instances = int(self.stats.get('num_instances', 0))

    self.num_io_ops = int(self.stats.get('io_workload', 0))

    # update metrics
    self._update_metrics_from_compute_node(compute)

Note that HostState maintains its own updated attribute, which update_from_compute_node compares against updated_at from the database. The reason is that nova-compute only writes its data periodically (via the nova-compute periodic task mechanism), while nova-scheduler needs up-to-date data when choosing the best host. nova-scheduler therefore keeps its own copy of the data, reflecting the resource changes on each host since the last database update; that bookkeeping is exactly what /nova/scheduler/host_manager.py:HostState does.

As shown above, if the database record's updated_at is older than the timestamp of the data nova-scheduler maintains, the database data is considered stale and is not used to refresh the host state. Note that nova-scheduler never writes its maintained resource data back to the database; it only syncs data from the database.
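
For example (with assumed timestamps): if the compute node row was last written at updated_at = 10:00:00, but the scheduler virtually consumed resources for a placement at 10:00:05 (so host_state.updated = 10:00:05), update_from_compute_node returns early and keeps the fresher in-memory values; once nova-compute's next periodic report pushes updated_at past 10:00:05, the host state is refreshed from the database again.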

Next we analyze nova-scheduler's filtering mechanism.

3. From SchedulerDriver to the filters

hosts = self.host_manager.get_filtered_hosts(hosts,
        filter_properties, index=num)

#/nova/scheduler/host_manager.py:HostManager
def get_filtered_hosts(self, hosts, filter_properties,
                       filter_class_names=None, index=0):
    """Filter hosts and return only ones passing all filters."""

    def _strip_ignore_hosts(host_map, hosts_to_ignore):
        ignored_hosts = []
        for host in hosts_to_ignore:
            for (hostname, nodename) in host_map.keys():
                if host == hostname:
                    del host_map[(hostname, nodename)]
                    ignored_hosts.append(host)
        ignored_hosts_str = ', '.join(ignored_hosts)
        msg = _('Host filter ignoring hosts: %s')
        LOG.info(msg % ignored_hosts_str)

    def _match_forced_hosts(host_map, hosts_to_force):
        forced_hosts = []
        for (hostname, nodename) in host_map.keys():
            if hostname not in hosts_to_force:
                del host_map[(hostname, nodename)]
            else:
                forced_hosts.append(hostname)
        if host_map:
            forced_hosts_str = ', '.join(forced_hosts)
            msg = _('Host filter forcing available hosts to %s')
        else:
            forced_hosts_str = ', '.join(hosts_to_force)
            msg = _("No hosts matched due to not matching "
                    "'force_hosts' value of '%s'")
        LOG.info(msg % forced_hosts_str)

    def _match_forced_nodes(host_map, nodes_to_force):
        forced_nodes = []
        for (hostname, nodename) in host_map.keys():
            if nodename not in nodes_to_force:
                del host_map[(hostname, nodename)]
            else:
                forced_nodes.append(nodename)
        if host_map:
            forced_nodes_str = ', '.join(forced_nodes)
            msg = _('Host filter forcing available nodes to %s')
        else:
            forced_nodes_str = ', '.join(nodes_to_force)
            msg = _("No nodes matched due to not matching "
                    "'force_nodes' value of '%s'")
        LOG.info(msg % forced_nodes_str)

    if filter_class_names is None:
        filters = self.default_filters
    else:
        filters = self._choose_host_filters(filter_class_names)
    ignore_hosts = filter_properties.get('ignore_hosts', [])
    force_hosts = filter_properties.get('force_hosts', [])
    force_nodes = filter_properties.get('force_nodes', [])

    if ignore_hosts or force_hosts or force_nodes:
        # NOTE(deva): we can't assume "host" is unique because
        #             one host may have many nodes.
        name_to_cls_map = {(x.host, x.nodename): x for x in hosts}
        if ignore_hosts:
            _strip_ignore_hosts(name_to_cls_map, ignore_hosts)
            if not name_to_cls_map:
                return []
        # NOTE(deva): allow force_hosts and force_nodes independently
        if force_hosts:
            _match_forced_hosts(name_to_cls_map, force_hosts)
        if force_nodes:
            _match_forced_nodes(name_to_cls_map, force_nodes)
        if force_hosts or force_nodes:
            # NOTE(deva): Skip filters when forcing host or node
            if name_to_cls_map:
                return name_to_cls_map.values()
        hosts = name_to_cls_map.itervalues()

    return self.filter_handler.get_filtered_objects(filters,
            hosts, filter_properties, index)

As the code above shows, get_filtered_objects filters all hosts layer by layer. Suppose host1, host2 and host3 need to be checked against the requirements for creating a VM and the current filter is filterOne: filterOne evaluates host1, host2 and host3, and if host2 is rejected, only host1 and host3 remain. The next filter, filterTwo, then only evaluates host1 and host3, and so on, until all filters have run; the hosts that remain are the ones that satisfy the VM's requirements. Here we use RamFilter as an example of how the pass/fail decision is made. The filtering happens when objs = filter.filter_all(list_objs, filter_properties) is executed.

#/nova/scheduler/filters/ram_filter.py:BaseRamFilter
class BaseRamFilter(filters.BaseHostFilter):

    def _get_ram_allocation_ratio(self, host_state, filter_properties):
        raise NotImplementedError

    def host_passes(self, host_state, filter_properties):
        """Only return hosts with sufficient available RAM."""
        instance_type = filter_properties.get('instance_type')
        requested_ram = instance_type['memory_mb']
        free_ram_mb = host_state.free_ram_mb
        total_usable_ram_mb = host_state.total_usable_ram_mb

        ram_allocation_ratio = self._get_ram_allocation_ratio(host_state,
                                                              filter_properties)

        memory_mb_limit = total_usable_ram_mb * ram_allocation_ratio
        used_ram_mb = total_usable_ram_mb - free_ram_mb
        usable_ram = memory_mb_limit - used_ram_mb
        if not usable_ram >= requested_ram:
            LOG.debug("%(host_state)s does not have %(requested_ram)s MB "
                      "usable ram, it only has %(usable_ram)s MB usable ram.",
                      {'host_state': host_state,
                       'requested_ram': requested_ram,
                       'usable_ram': usable_ram})
            return False

        # save oversubscription limit for compute node to test against:
        host_state.limits['memory_mb'] = memory_mb_limit
        return True

class RamFilter(BaseRamFilter):
    """Ram Filter with over subscription flag."""

    def _get_ram_allocation_ratio(self, host_state, filter_properties):
        return CONF.ram_allocation_ratio

#/nova/scheduler/filters/__init__.py:BaseHostFilter
class BaseHostFilter(filters.BaseFilter):
    """Base class for host filters."""
    def _filter_one(self, obj, filter_properties):
        """Return True if the object passes the filter, otherwise False."""
        return self.host_passes(obj, filter_properties)

    def host_passes(self, host_state, filter_properties):
        """Return True if the HostState passes the filter, otherwise False.
        Override this in a subclass.
        """
        raise NotImplementedError()

#/nova/filters.py:BaseFilter
class BaseFilter(object):
    """Base class for all filter classes."""
    def _filter_one(self, obj, filter_properties):
        """Return True if it passes the filter, False otherwise.
        Override this in a subclass.
        """
        return True

    def filter_all(self, filter_obj_list, filter_properties):
        """Yield objects that pass the filter.

        Can be overridden in a subclass, if you need to base filtering
        decisions on all objects.  Otherwise, one can just override
        _filter_one() to filter a single object.
        """
        for obj in filter_obj_list:
            if self._filter_one(obj, filter_properties):
                yield obj

As the code above shows, the filter_all call ultimately lands in the filter_all function of /nova/filters.py:BaseFilter, the base class of all filters. filter_all calls _filter_one, which, following normal object-oriented dispatch, resolves to the version in /nova/scheduler/filters/__init__.py:BaseHostFilter, and that in turn calls host_passes. All filters follow this design: if we want to write our own filter, we create it under /nova/scheduler/filters/, inherit from /nova/scheduler/filters/__init__.py:BaseHostFilter, and implement host_passes, as sketched below.
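
A minimal sketch of such a custom filter; the file name and the MinVcpuFilter class are made up for illustration, while the BaseHostFilter import and the host_passes contract come from the code above:

#/nova/scheduler/filters/min_vcpu_filter.py (hypothetical example)
from nova.scheduler import filters

class MinVcpuFilter(filters.BaseHostFilter):
    """Toy filter: pass only hosts that still have at least one free vCPU."""

    def host_passes(self, host_state, filter_properties):
        # vcpus_total and vcpus_used are HostState fields shown earlier.
        return (host_state.vcpus_total - host_state.vcpus_used) >= 1

To actually enable such a filter it would also have to be listed in the scheduler_default_filters option of /etc/nova/nova.conf.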

Back to RamFilter's host_passes: the function first reads the RAM information stored in host_state as well as the ram_allocation_ratio option from /etc/nova/nova.conf (1.5 by default). The total usable RAM read from host_state is multiplied by ram_allocation_ratio to obtain the overall memory limit, so ram_allocation_ratio can be used to oversubscribe or restrict the RAM considered available on candidate hosts.

Finally, the computed usable RAM is compared with the RAM requested by the VM; if the host can satisfy the VM's RAM requirement, host_passes returns True, otherwise it returns False.
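
For example, with assumed numbers: total_usable_ram_mb = 8192, free_ram_mb = 2048 and the default ram_allocation_ratio = 1.5, the filter computes memory_mb_limit = 8192 * 1.5 = 12288, used_ram_mb = 8192 - 2048 = 6144 and usable_ram = 12288 - 6144 = 6144, so a flavor requesting 4096 MB passes while one requesting 8192 MB is rejected.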

The hosts that survive all filters are the filtering result, and they are returned as a list to the _schedule function of /nova/scheduler/filter_scheduler.py:FilterScheduler.
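
To make the layer-by-layer filtering described earlier concrete, here is a minimal self-contained sketch of the idea (illustrative only; this is not the actual get_filtered_objects implementation):

# Each filter only ever sees the hosts that survived the previous filter.
hosts = ['host1', 'host2', 'host3']

def filter_one(host):
    return host != 'host2'      # pretend host2 fails a RAM check

def filter_two(host):
    return True                 # every remaining host passes this check

for f in (filter_one, filter_two):
    hosts = [h for h in hosts if f(h)]

print(hosts)  # ['host1', 'host3']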

4. From the filters to weight calculation and sorting

weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
                                                    filter_properties)

#/nova/scheduler/host_manager.py:HostManager
def get_weighed_hosts(self, hosts, weight_properties):
    """Weigh the hosts."""
    return self.weight_handler.get_weighed_objects(self.weighers,
        hosts, weight_properties)

Here self.weighers is set from the scheduler_weight_classes option in /etc/nova/nova.conf; by default

scheduler_weight_classes = nova.scheduler.weights.all_weighers

all_weighers is a function, and HostManager's __init__ handles it accordingly: it locates all weigher classes and instantiates them so they can compute weights. By default there are three weighers: IoOpsWeigher, MetricsWeigher and RAMWeigher.

#/nova/scheduler/weights/__init__.py:HostWeightHandler
class HostWeightHandler(weights.BaseWeightHandler):
    object_class = WeighedHost

    def __init__(self):
        super(HostWeightHandler, self).__init__(BaseHostWeigher)

#/nova/weights.py:BaseWeightHandler
class BaseWeightHandler(loadables.BaseLoader):
    object_class = WeighedObject

    def get_weighed_objects(self, weighers, obj_list, weighing_properties):
        """Return a sorted (descending), normalized list of WeighedObjects."""

        if not obj_list:
            return []

        weighed_objs = [self.object_class(obj, 0.0) for obj in obj_list]
        for weigher in weighers:
            weights = weigher.weigh_objects(weighed_objs, weighing_properties)

            # Normalize the weights
            weights = normalize(weights,
                                minval=weigher.minval,
                                maxval=weigher.maxval)

            for i, weight in enumerate(weights):
                obj = weighed_objs[i]
                obj.weight += weigher.weight_multiplier() * weight

        return sorted(weighed_objs, key=lambda x: x.weight, reverse=True)

First, each host that survived the filters is wrapped in a WeighedObject, defined as follows.

#/nova/weights.py:WeighedObject
class WeighedObject(object):
    """Object with weight information."""
    def __init__(self, obj, weight):
        self.obj = obj
        self.weight = weight

    def __repr__(self):
        return "<WeighedObject '%s': %s>" % (self.obj, self.weight)

WeighedObject mainly holds the host and its weight; the weight is initialized to 0.0.

Then weigh_objects is executed for each weigher.

#/nova/weights.py:BaseWeigher
def weigh_objects(self, weighed_obj_list, weight_properties):
    """Weigh multiple objects.

    Override in a subclass if you need access to all objects in order
    to calculate weights. Do not modify the weight of an object here,
    just return a list of weights.
    """
    # Calculate the weights
    weights = []
    for obj in weighed_obj_list:
        weight = self._weigh_object(obj.obj, weight_properties)

        # Record the min and max values if they are None. If they anything
        # but none we assume that the weigher has set them
        if self.minval is None:
            self.minval = weight
        if self.maxval is None:
            self.maxval = weight

        if weight < self.minval:
            self.minval = weight
        elif weight > self.maxval:
            self.maxval = weight

        weights.append(weight)

    return weights

The weigh_objects function computes a weight for every host. Suppose here that RAMWeigher is weighing host1, host2 and host3: it first calls RAMWeigher's _weigh_object to obtain the free-RAM weight, as follows.

#/nova/scheduler/weights/ram.py:RAMWeigher
class RAMWeigher(weights.BaseHostWeigher):
    minval = 0

    def weight_multiplier(self):
        """Override the weight multiplier."""
        return CONF.ram_weight_multiplier

    def _weigh_object(self, host_state, weight_properties):
        """Higher weights win.  We want spreading to be the default."""
        return host_state.free_ram_mb

The weigh_objects function of /nova/weights.py:BaseWeigher then collects each host's RAM weight and records the largest and smallest values seen across all hosts in self.maxval and self.minval. Finally, the per-host RAM weights are returned as a list to get_weighed_objects in /nova/weights.py:BaseWeightHandler.

Back in get_weighed_objects of /nova/weights.py:BaseWeightHandler, once the weigher has produced a weight for every host, the weights are normalized.

#/nova/weights.py
def normalize(weight_list, minval=None, maxval=None):
    """Normalize the values in a list between 0 and 1.0.

    The normalization is made regarding the lower and upper values present in
    weight_list. If the minval and/or maxval parameters are set, these values
    will be used instead of the minimum and maximum from the list.

    If all the values are equal, they are normalized to 0.
    """

    if not weight_list:
        return ()

    if maxval is None:
        maxval = max(weight_list)

    if minval is None:
        minval = min(weight_list)

    maxval = float(maxval)
    minval = float(minval)

    if minval == maxval:
        return [0] * len(weight_list)

    range_ = maxval - minval
    return ((i - minval) / range_ for i in weight_list)
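
As a quick standalone check of normalize() (assuming the function above is in scope; the values 3, 10 and 8 anticipate the worked example below):

print(list(normalize([3, 10, 8])))
# [0.0, 1.0, 0.7142857142857143], i.e. (0, 1, 5/7)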

Let us illustrate with concrete numbers, say host1(Ram)=3, host2(Ram)=10, host3(Ram)=8. normalize then returns the sequence (0, 1, 5/7). The normalized values are folded back into the WeighedObject that wraps each host and its weight:

for i, weight in enumerate(weights):
    obj = weighed_objs[i]
    obj.weight += weigher.weight_multiplier() * weight

Note the value returned by weigher.weight_multiplier(): it is the weight coefficient, and for the final ranking each weigher's normalized weight is multiplied by its own coefficient. For the RAMWeigher in this example the coefficient comes from the ram_weight_multiplier option in /etc/nova/nova.conf, which defaults to 1.0.

Now suppose there is also an IoOpsWeigher, and that it yields host1(IoOps)=4, host2(IoOps)=6, host3(IoOps)=8, so normalize returns (0, 1/2, 1); assume its coefficient io_ops_weight_multiplier is 1.0 for this example. Each weigher's contribution is accumulated on top of the weight produced by the previous weighers, so host1(weight)=0, host2(weight)=1+1/2=3/2, and host3(weight)=5/7+1=12/7. get_weighed_objects in /nova/weights.py:BaseWeightHandler finally returns the hosts sorted by weight in descending order, i.e. (host3, host2, host1); a small check of this arithmetic is sketched below.
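
A tiny standalone check of the example above (both multipliers assumed to be 1.0):

ram_norm = {'host1': 0.0, 'host2': 1.0, 'host3': 5.0 / 7}
io_norm = {'host1': 0.0, 'host2': 0.5, 'host3': 1.0}
total = {h: 1.0 * ram_norm[h] + 1.0 * io_norm[h] for h in ram_norm}
print(sorted(total, key=total.get, reverse=True))  # ['host3', 'host2', 'host1']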

This completes the code flow of host selection in nova-scheduler.