您的位置:首页 > 其它

keystone 认证分析

2015-10-13 20:23 330 查看
<token>
关于V2token的分析
1、/v2.0/tokens
keystone.service下面的
def admin_app_factory
token.routers.Router()
2、keystone.token.routers
mapper.connect('/tokens', controller=token_controller,
action='authenticate',
conditions=dict(method=['POST']))
token_controller = controllers.Auth()

3、keystone.token.controllers.Auth
@controller.v2_deprecated
def authenticate(self, context, auth=None):
<auth function="认证的字典">
tenantName 可选,如果不指定tenantName那么返回的就是unscoped token,这个token
会被用于获取scoped token,否则就是限定到tenant的scoped token
{
"auth":{
"passwordCredentials":{
"username":"test_user",
"password":"mypass"
},
"tenantName":"customer-x"
}
}

</auth>

if auth is None:
raise exception.ValidationError(attribute='auth', target='request body')
# auth不能为空

if 'token' in auth:
# 也就是说不是第一次获取token,那么下面就检测是unscoped token 还是scoped token
auth_info = self._authenticate_token(context, auth)
# auth_info = (current_user_ref, tenant_ref, metadata_ref, expiry, bind, audit_id)
<self._authenticate_token value="(context, auth)" function="检测token是否已经存在,返回 auth_token_data">
def _authenticate_token(self, context, auth):
# 如果token不在auth字典里,报错
if 'token' not in auth:
raise exception.ValidationError(attribute='token', target='auth')
if 'id' not in auth['token']:
raise exception.ValidationError(attribute='id', target='token')

old_token = auth['token']['id']
# 获取token id
if len(old_token) > CONF.max_token_size:  # CONF.max_token_size = 8192
raise exception.ValidationSizeError(attribute='token', size=CONF.max_token_size)

try:
token_model_ref = token_model.KeystoneToken(
token_id=old_token,
token_data=self.token_provider_api.validate_token(old_token)
# 通过token_provider_api 获取ID为old_token的token data
# keystone.token.provider
<self.token_provider_api.validate_token values="old_token" function="通过token_provider_api 获取ID为old_token的token data">
def validate_token(self, token_id, belongs_to=None):
unique_id = utils.generate_unique_id(token_id)
# 如果token是asn1或者pki,返回hash后的token,否则直接返回
<utils.generate_unique_id values="token_id" function="为token生成唯一的ID" path="keystone.token.utils">
def generate_unique_id(token_id):

return cms.cms_hash_token(token_id, mode=cfg.CONF.token.hash_algorithm) # mode='md5'

</utils.generate_unique_id>

token = self._validate_token(unique_id)
# 需要检查persistence的持久化后端
# 默认使用的provider后端是uuid
# 持久化只负责数据的CRUD操作,provider负责逻辑判断
<self._validate_token values="unique_id" function="">
@MEMOIZE
def _validate_token(self, token_id):
if not self._needs_persistence:
# self._needs_persistence 是返回的uuid need_persistence ,返回是True
# 然后通过持久化驱动根据token_id 获取token_ref
#self._persistence 获取持久化驱动
# 默认的持久化驱动是sql
return self.driver.validate_v3_token(token_id)

token_ref = self._persistence.get_token(token_id)
# _persistence驱动是token.persistence.backends.sql
<self._persistence.get_token values="token_id" function="从数据库中获取指定id的token_ref">
# 查询的是TokenModel表
class TokenModel(sql.ModelBase, sql.DictBase):
__tablename__ = 'token'
attributes = ['id', 'expires', 'user_id', 'trust_id']
id = sql.Column(sql.String(64), primary_key=True)
expires = sql.Column(sql.DateTime(), default=None)
extra = sql.Column(sql.JsonBlob())
valid = sql.Column(sql.Boolean(), default=True, nullable=False)
user_id = sql.Column(sql.String(64))
trust_id = sql.Column(sql.String(64))
__table_args__ = (
sql.Index('ix_token_expires', 'expires'),
sql.Index('ix_token_expires_valid', 'expires', 'valid'),
sql.Index('ix_token_user_id', 'user_id'),
sql.Index('ix_token_trust_id', 'trust_id')
)

class Token(token.persistence.TokenDriverV8):
def get_token(self, token_id):
if token_id is None:
raise exception.TokenNotFound(token_id=token_id)
session = sql.get_session()
token_ref = session.query(TokenModel).get(token_id)
if not token_ref or not token_ref.valid:
raise exception.TokenNotFound(token_id=token_id)

return token_ref.to_dict()
# to_dict调用的是class DictBase下面的to_dict 将属性变成字典

</self._persistence.get_token>

version = self.driver.get_token_version(token_ref)
# uuid类继承的token.provider.common.BaseProvider
# self.driver也就返回的BaseProvider
<self.driver.get_token_version values="token_ref">
class BaseProvider(provider.Provider):
def get_token_version(self, token_data):
token_data['access']['id'] =====>v2
token_data['token']['methods'] ======>v3
</self.driver.get_token_version>

if version == self.V3:
return self.driver.validate_v3_token(token_ref)
# self.driver 最后是调用的common.BaseProvider
<self.driver.validate_v3_token values="token_ref" function="返回token_data">
class BaseProvider(provider.Provider):
def validate_v3_token(self, token_ref):
trust_id = token_ref.get('trust_id')
if trust_id:
# 这个组件以后再说
self.trust_api.get_trust(trust_id)
token_data = token_ref.get('token_data')

# 从token_ref中取出token_data

if not token_data or 'token' not in token_data:
# token_ref 是由V2 api创建的,则需要变换
project_id = None
project_ref = token_ref.get('tenant')
if project_ref:
project_id = project_ref['id']

issued_at = token_ref['token_data']['access']['token']['issued_at']
audit = token_ref['token_data']['access']['token'].get('audit_ids')

token_data = self.v3_token_data_helper.get_token_data(
token_ref['user']['id'],
['password', 'token'],
project_id=project_id,
bind=token_ref.get('bind'),
expires=token_ref['expires'],
issued_at=issued_at,
audit_info=audit

)

<get_token_data path="keystone.token.common.V3_Token_Data_Helper" function="">
# 返回{'token': token_data}字典
def get_token_data(...):
if extras is None:
extras = {}
if extras:
# 这边是要废弃的提示
token_data = {'methods': method_names, 'extras': extras}

# 如果token不为空,直接将属性作字典存储
if token:
for x in ('roles', 'user', 'catalog', 'project', 'domain'):
if x in token:
token_data[x] = token[x]

if CONF.trust.enabled and trust:
if user_id != trust['trustee_user_id']:
raise exception.Forbidden(_('User is not a trustee.'))

if bind:
token_data['bind'] = bind

self._populate_scope(token_data, domain_id, project_id)
# 将domain project放入token_data字典中
<self._populate_scope values="token_data, domain_id, project_id" function="">
def _populate_scope(self, token_data, domain_id, project_id):
if 'domain' in token_data or 'project' in token_data:
# scope已经存在
return
if domain_id:
token_data['domain'] = self._get_filtered_domain(domain_id)
# 通过resource_api获取domain_ref
# 返回{'id': domain_ref['id'], 'name': domain_ref['name']}
if project_id:
token_data['project'] = self._get_filtered_project(project_id)
# 通过resource_api获取project_ref
# filter_project = {'id': project_ref['id'], 'name': project_ref['name']}
# filter_project['domain'] = self._get_filtered_domain(project_ref['domain_id'])
# 返回 {'id': project_ref['id'], 'name': project_ref['name'], 'domain': {...上面的那样}}
</self._populate_scope>
self._populate_user(token_data, user_id, trust)
# user_ref 调用identity_api 返回对应user_id的user_ref
# 没有使用trust,所以就返回
# token_data['user'] = {'id': user_ref['id'], 'name': user_ref['name'], 'domain': self._get_filtered_domain(user_ref['domain_id'])}
self._populate_roles(token_data, user_id, domain_id, project_id, trust, access_token)
# 返回 token_data['roles'] = filtered_roles
self._populate_audit_info(token_data, access_token)

if include_catalog:
self._populate_service_catalog(token_data, user_id, domain_id, project_id, trust)
# 返回 token_data['catalog'] = catalog_api.get_v3_catalog(user_id, project_id)
self._populate_service_providers(token_data)
# federation启用才会处理
self._populate_token_dates(token_data, expires=expires, trust=trust, issued_at=issued_at)
# 处理token_data时间
# token_data['expires_at']
# token_data['issued_at']
self._populate_oauth_section(token_data, access_token)
# access_token不为空再处理
def _populate_oauth_section(self, token_data, access_token):
if access_token:
access_token_id = access_token['id']
consumer_id = access_token['consumer_id']
token_data['OS-OAUTH1'] = ({'access_token_id': access_token_id, 'consumer_id': consumer_id})

return {'token': token_data}
</get_token_data>
return token_data

</self.driver.validate_v3_token>
elif version == self.V2:
return self.driver.validate_v2_token(token_ref)
# 判断是否为federation token 以及是否为默认domain
# 如果是V3token 就将其转变成v2 token
def validate_v2_token(self, token_ref):
try:
self._assert_is_not_federation_token(token_ref)
self._assert_default_domain(token_ref)
token_data = token_ref.get('token_data')

if (self.get_token_version(token_data) != token.provider.V2):
token_data = self.v2_token_data_helper.v3_to_v2_token(token_data)
# v3 token 转变到v2 token

trust_id = token_data['access'].get('trust', {}).get('id')
if trust_id:
self.trust_api.get_trust(trust_id)
return token_data
except exception.ValidationError as e:
LOG.exception(_LE('Failed to validate token'))
raise exception.TokenNotFound(e)

raise exception.UnsupportedTokenVersionException()
</self._validate_token>

self._token_belongs_to(token, belongs_to)
self._is_valid_token(token)
return token
</self.token_provider_api.validate_token>

)
except exception.NotFound as e:
raise exception.Unauthorized(e)

wsgi.validate_token_bind(context, token_model_ref)
# token bind 设置的为permissive,所以直接返回

self._restrict_scope(token_model_ref)
# 当配置项allow_rescope_scoped_token为False时,禁止用户用已限定范围(scoped)的token再去换取其他token
user_id = token_model_ref.user_id
# 用户id
tenant_id = self._get_project_id_from_auth(auth)
# 通过resource_api根据auth的project name获取project id
# resource_api通过tenant name 以及domain id获取tenant id
current_user_ref = self.identity_api.get_user(user_id)

metadata_ref = {}
tenant_ref, metadata_ref['roles'] = self._get_project_roles_and_ref(user_id, tenant_id)
# 根据tenant id通过resource_api获取tenant ref
# 根据user id tenant id通过assignment_api的get_roles_for_user_and_project获取role list

expiry = token_model_ref.expires

bind = token_model_ref.bind

audit_id = token_model_ref.audit_chain_id

return (current_user_ref, tenant_ref, metadata_ref, expiry, bind, audit_id)

</self._authenticate_token>
else:
# 其他认证方式
try:
auth_info = self._authenticate_external(context, auth)
# 通过REMOTE_USER认证外部user,并返回auth_token_data(user_ref, tenant_ref, metadata_ref)
<self._authenticate_extenral values="(context, auth)">
def _authenticate_external(self, context, auth):
environment = context.get('environment', {})
if not environment.get('REMOTE_USER'):
raise ExternalAuthNotApplicable()

username = environment['REMOTE_USER']
try:
user_ref = self.identity_api.get_user_by_name(username, CONF.identity.default_domain_id)
# CONF.identity.default_domain_id = 'default'
# 根据username domain id通过identity_api获取user_ref
user_id = user_ref['id']
except exception.UserNotFound as e:
raise exception.Unauthorized(e)

metadata_ref = {}
tenant_id = self._get_project_id_from_auth(auth)
# 根据auth['tenantName']通过resource_api 的get_project_by_name获取tenant id
tenant_ref, metadata_ref['roles'] = self._get_project_roles_and_ref(user_id, tenant_id)
# 返回tenant_ref role_list

expiry = provider.default_expire_time()

bind = None

if ('kerberos' in CONF.token.bind and environment.get('AUTH_TYPE', '').lower() == 'negotiate'):
bind = {'kerberos': username}

audit_id = None
return (user_ref, tenant_ref, metadata_ref, expiry, bind, audit_id)

</self._authenticate_extenral>
except ExternalAuthNotApplicable:
auth_info = self._authenticate_local(context, auth)
# 通过后端进行身份认证
<self._authenticate_local >
if 'passwordCredentials' not in auth:
# 不在就报错
if 'password' not in auth['passwordCredentials']:
# password不在passwordCredentials里面就报错
password = auth['passwordCredentials']['password']

if password and len(password) > CONF.identity.max_password_length:
# 长度超过max_password_length报错
if # passwordCredentials字典里面既没有userId又没有username 报错
user_id = auth['passwordCredentials'].get('userId')

if user_id超出max_param_size报错

username = auth['passwordCredentials'].get('username', '')

if username:
# username 长度超出max_param_size,报错
user_ref = self.identity_api.get_user_by_name(username, CONF.identity.default_domain_id)
# ?为什么不用get_user_by_id,这个是因为identity_api未提供此接口,而且在get_user_by_id中没有default_id这个参数
user_id = user_ref['id']
# 如果没有就报错
metadata_ref = {}
tenant_id = self._get_project_id_from_auth(auth)
# 根据auth['tenantName']通过resource_api获取tenant id
tenant_ref, metadata_ref['roles'] = self._get_project_roles_and_ref(user_id, tenant_id)
expiry = provider.default_expire_time()
bind = None
audit_id = None
return (user_ref, tenant_ref, metadata_ref, expiry, bind, audit_id)
</self._authenticate_local>

user_ref, tenant_ref, metadata_ref, expiry, bind, audit_id = auth_info

# 查看project domain是否启用
try:
self.identity_api.assert_user_enabled(user_id=user_ref['id'], user=user_ref)
# 确认user user的domain有效
# 根据user['domain_id']通过resource_api 的assert_domain_enabled获取 domain是否disabled
# 如果domain disabled则报错

if tenant_ref:
self.resource_api.assert_project_enabled(project_id=tenant_ref['id'], project=tenant_ref)

except AssertionError as e:
报错退出

user_ref = self.v3_to_v2_user(user_ref)
# 将v3中default_project_id变为tenantId,过滤掉domain,用username属性

if tenant_ref:
tenant_ref = self.v3_to_v2_project(tenant_ref)
# 移除domain,移除is_domain,移除parent id

auth_token_data = self._get_auth_token_data(user_ref, tenant_ref, metadata_ref, expiry, audit_id)
# 变成字典dict(user=user, tenant=tenant, metadata=metadata, expires=expiry, parent_audit_id=audit_id)

if tenant_ref:
catalog_ref = self.catalog_api.get_catalog(user_ref['id'], tenant_ref['id'])
# 获取catalog
else:
catalog_ref = {}

auth_token_data['id'] = 'placeholder'

if bind:
auth_token_data['bind'] = bind

roles_ref = []

for role_id in metadata_ref.get('roles', []):
role_ref = self.role_api.get_role(role_id)
roles_ref.append(dict(name=role_ref['name']))
# 根据role_id通过role_api 的get_role获取role ref

(token_id, token_data) = self.token_provider_api.issue_v2_token(auth_token_data, roles_ref=roles_ref, catalog_ref=catalog_ref)
# uuid继承common.BaseProvider
# 转换成v2token  返回 token id以及token_data
<self.token_provider_api.issue_v2_token>
def issue_v2_token(self, token_ref, roles_ref=None, catalog_ref=None):
token_id, token_data = self.driver.issue_v2_token(token_ref, roles_ref, catalog_ref)
# 调用后端issue_v2_token
def issue_v2_token(self, token_ref, roles_ref=None, catalog_ref=None):
metadata_ref = token_ref['metadata']
trust_ref = None
# trust这块略过
token_data = self.v2_token_data_helper.format_token(token_ref, roles_ref, catalog_ref, trust_ref)
token_id = self._get_token_id(token_data)
token_data['access']['token']['id'] = token_id
return token_id, token_data
if self._needs_persistence:
data = dict(key=token_id, id=token_id, ...)
self._create_token(token_id, data)
return token_id, token_data

</self.token_provider_api.issue_v2_token>
# trust这一块先略过
return token_data

</token>

查看原文:http://www.zoues.com/index.php/2015/10/13/keystone-%e8%ae%a4%e8%af%81%e5%88%86%e6%9e%90/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: