
[Reading the Sentry source] A first look at the event request and write path

An error log that a client sends to the Sentry server is called an event inside Sentry. Taking the JavaScript client as an example, the request URL used when an exception is reported is:

http://localhost:8000/api/2/store/?sentry_version=7&sentry_client=raven-js%2F3.8.1&sentry_key=36db8da42fa84f83bac146be5096815c

The Sentry backend is built on Django and divides its URLs into two groups: the api.endpoints package handles requests starting with /api/0, and the web.frontend package handles everything else. The URL above is therefore handed to a view in web.frontend. In web/urls.py we find the following code, which shows that the URL is dispatched to StoreView:

# in web/urls.py

urlpatterns += patterns(
    '',
    # Store endpoints first since they are the most active
    url(r'^api/store/$', api.StoreView.as_view(),
        name='sentry-api-store'),
    url(r'^api/(?P<project_id>[\w_-]+)/store/$', api.StoreView.as_view(),
        name='sentry-api-store'),
    url(r'^api/(?P<project_id>\d+)/csp-report/$', api.CspReportView.as_view(),
        name='sentry-api-csp-report'),
    # ...
)


StoreView:

The StoreView class has both a get and a post method, and both call process(). process() is the entry point for handling a request; the handling roughly consists of the following steps:

1. Filter the request

if helper.should_filter(project, data, ip_address=remote_addr):

This check inspects the fields carried in the request payload data and decides whether the request should be filtered out.

2. Rate limiting

if rate_limit is None or rate_limit.is_limited:

Here a RateLimit object decides whether the request is currently rate limited; if it is, the request is dropped immediately.
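
As a rough illustration of what that check relies on (this is not Sentry's actual RateLimit class, just a hypothetical stand-in), the quota backend can be thought of as returning a small object with an is_limited flag:

from collections import namedtuple

# Hypothetical stand-in for the object returned by the quota/rate-limit backend.
RateLimitResult = namedtuple('RateLimitResult', ['is_limited', 'retry_after'])

def check_quota(counters, project_id, limit_per_window=100):
    # Count events per project and flag the project once it exceeds the quota.
    counters[project_id] = counters.get(project_id, 0) + 1
    if counters[project_id] > limit_per_window:
        return RateLimitResult(is_limited=True, retry_after=60)
    return RateLimitResult(is_limited=False, retry_after=0)

counters = {}
rate_limit = check_quota(counters, project_id=2)
if rate_limit is None or rate_limit.is_limited:
    pass  # the real view drops the request at this point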

3. Check whether the request has already been processed, and scrub sensitive fields from it:

if scrub_data:
    # We filter data immediately before it ever gets into the queue
    sensitive_fields_key = 'sentry:sensitive_fields'


At the same time, the cache is checked to see whether this request already exists:

cache_key = 'ev:%s:%s' % (project.id, event_id,)

if cache.get(cache_key) is not None:
    raise APIForbidden('An event with the same ID already exists (%s)' % (event_id,))


4. Insert the request content into the database:

# mutates data (strips a lot of context if not queued)
helper.insert_data_to_database(data)

This is the focus of this analysis and is examined below.

5. Write a flag into the cache so that requests with the same event_id are not processed again within 5 minutes, then return the response to the client:

cache.set(cache_key, '', 60 * 5)
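
Steps 3 and 5 together form a classic cache-based deduplication guard. A minimal sketch of the idea, using a plain dict in place of the real cache backend:

# Minimal sketch of the duplicate-event guard; a dict stands in for the real cache.
cache = {}

def store_once(project_id, event_id, enqueue):
    cache_key = 'ev:%s:%s' % (project_id, event_id)
    if cache.get(cache_key) is not None:
        raise ValueError('An event with the same ID already exists (%s)' % (event_id,))
    enqueue()              # hand the event off for asynchronous processing
    cache[cache_key] = ''  # the real code sets this flag with a 5-minute TTL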


Asynchronous processing:

Next we follow the execution of helper.insert_data_to_database. The function is very short: it hands the request off for asynchronous processing and writing by dropping a task onto Celery, then returns immediately.

def insert_data_to_database(self, data):
    # we might be passed LazyData
    if isinstance(data, LazyData):
        data = dict(data.items())
    cache_key = 'e:{1}:{0}'.format(data['project'], data['event_id'])
    default_cache.set(cache_key, data, timeout=3600)
    preprocess_event.delay(cache_key=cache_key, start_time=time())

First, if data is lazily loaded, the call to items() forces the deferred decode. LazyData is implemented in sentry/coreapi.py; its items() method triggers _decode(), which deserializes the request string into a dict. The data is then put into the cache (Sentry uses Redis or Memcached for this) under a key of the form 'e:{event_id}:{project}' (note the swapped format indices in the code above), and the sentry.tasks.store.preprocess_event task is pushed onto the Celery queue before returning.
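
The lazy-decode idea can be illustrated with a small stand-in class (this is not the real LazyData from sentry/coreapi.py, just a sketch of the pattern): decoding of the raw request body is deferred until the data is first accessed.

import json

class LazyPayload(object):
    """Illustrative stand-in for LazyData: defer JSON decoding until first access."""

    def __init__(self, raw_body):
        self._raw = raw_body
        self._decoded = None

    def _decode(self):
        if self._decoded is None:
            self._decoded = json.loads(self._raw)
        return self._decoded

    def items(self):
        # Accessing the items forces the decode, just as LazyData.items() does.
        return self._decode().items()

payload = LazyPayload('{"event_id": "abc123", "project": 2}')
data = dict(payload.items())  # the decode happens here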

Sentry uses three asynchronous tasks to process the request and write it out. Starting from the queue named events.preprocess_event, the processing and writing of data is completed in sequence by these three tasks:

1. The sentry.tasks.store.preprocess_event task

This task mainly does the following:

1) Fetch data from the cache using cache_key.

2) Iterate over the plugin list and let each plugin process data:

for plugin in plugins.all(version=2):
    processors = safe_execute(plugin.get_event_preprocessors, data=data, _with_transaction=False)
    for processor in (processors or ()):
        # On the first processor found, we just defer to the process_event
        # queue to handle the actual work.
        process_event.delay(cache_key=cache_key, start_time=start_time)
        return

The remaining work is then handed over to sentry.tasks.store.process_event, which is pushed back onto the queue.

2. sentry.tasks.store.process_event

This task is similar to preprocess_event: it processes data further, then wraps the database write into a sentry.tasks.store.save_event task and puts that on the queue, as sketched below.
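
The way each stage does part of the work and then re-enqueues the next stage can be sketched as follows (a schematic of the three-task pipeline, not the real task bodies; default_cache here is a plain dict standing in for Sentry's cache wrapper):

from celery import shared_task

default_cache = {}  # stand-in for Sentry's default_cache wrapper

@shared_task
def preprocess_event(cache_key, start_time):
    data = default_cache.get(cache_key)
    # ... run each plugin's event preprocessors over data ...
    process_event.delay(cache_key=cache_key, start_time=start_time)

@shared_task
def process_event(cache_key, start_time):
    data = default_cache.get(cache_key)
    # ... further processing / normalization of data ...
    save_event.delay(cache_key=cache_key, start_time=start_time)

@shared_task
def save_event(cache_key, start_time):
    data = default_cache.get(cache_key)
    # ... EventManager(data).save(project), then drop the cache entry ...
    default_cache.pop(cache_key, None)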

3. sentry.tasks.store.save_event

This task performs the actual write of data. As the source below shows, it calls EventManager.save() to process the request and persist it; that method is the focus of the next section.

try:
    manager = EventManager(data)
    manager.save(project)
finally:
    if cache_key:
        default_cache.delete(cache_key)
    if start_time:
        metrics.timing('events.time-to-process', time() - start_time,
                       instance=data['platform'])


4. sentry.tasks.post_process.post_process_group

This task is kicked off as the last step of EventManager.save(). post_process_group again walks the plugin list to process the request; its details have not been analyzed in depth here.

Analysis of EventManager.save():

This method parses the individual fields of data and writes them into a number of tables. It is several hundred lines long, its logic is fairly involved, and it is painful to read; I do not yet understand it fully. Roughly speaking, it does the following:

1. Extract fields such as event_id, level and date from data and wrap them in an Event object. Event is a model corresponding to the sentry_message table.

# First we pull out our top-level (non-data attr) kwargs
event_id = data.pop('event_id')
level = data.pop('level')

culprit = data.pop('culprit', None)
logger_name = data.pop('logger', None)
...
# build the Event object
event = Event(
    project_id=project.id,
    event_id=event_id,
    data=data,
    time_spent=time_spent,
    datetime=date,
    **kwargs
)


2. Generate the tags. Every event has a tags field, a dict whose contents are later written into the EventTag, TagKey and TagValue models.

tags = dict(data.get('tags') or [])
tags['level'] = LOG_LEVELS[level]
if logger_name:
    tags['logger'] = logger_name
if server_name:
    tags['server_name'] = server_name
if site:
    tags['site'] = site
if environment:
    tags['environment'] = environment
if transaction_name:
    tags['transaction'] = transaction_name

Here EventTag corresponds to the sentry_eventtag table and links a TagKey id to a TagValue id, while TagKey and TagValue store the actual tag content.
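
Schematically, each key/value pair in the tags dict ends up as one TagKey row, one TagValue row, and one EventTag row linking them to the event. A hedged sketch of that mapping (field names are indicative only; project, group and event are assumed to exist already, and the real write happens later, in the index_event_tags task shown in step 8):

from sentry.models import EventTag, TagKey, TagValue

# Indicative sketch of how a tags dict maps onto the three tag tables.
tags = {'level': 'error', 'logger': 'javascript', 'server_name': 'web-1'}

for key, value in tags.items():
    tag_key, _ = TagKey.objects.get_or_create(project=project, key=key)
    tag_value, _ = TagValue.objects.get_or_create(project=project, key=key, value=value)
    EventTag.objects.create(
        project_id=project.id,
        group_id=group.id,
        event_id=event.id,
        key_id=tag_key.id,
        value_id=tag_value.id,
    )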

3. Derive a fingerprint from data and compute hashes from the fingerprint and the event:

if fingerprint:
    hashes = [
        md5_from_hash(h)
        for h in get_hashes_from_fingerprint(event, fingerprint)
    ]
elif checksum:
    hashes = [checksum]
    data['checksum'] = checksum
else:
    hashes = [
        md5_from_hash(h)
        for h in get_hashes_for_event(event)
    ]
These hashes are used in _save_aggregate to aggregate multiple events into a single issue: each event is mapped, via its hashes, to the group_id of a Group. Group is the model behind an issue and corresponds to the sentry_groupedmessage table in Postgres.

def _save_aggregate(self, event, hashes, release, **kwargs):
    project = event.project

    # attempt to find a matching hash
    all_hashes = self._find_hashes(project, hashes)

    try:
        existing_group_id = six.next(h[0] for h in all_hashes if h[0])
    except StopIteration:
        existing_group_id = None
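
The hashing step itself is conceptually simple: the fingerprint components are reduced to an md5 digest, and events that produce the same digest end up in the same Group. A simplified illustration (not the actual md5_from_hash / get_hashes_from_fingerprint implementation):

import hashlib

def md5_from_parts(parts):
    # Fold a list of fingerprint components into a single md5 hex digest.
    result = hashlib.md5()
    for part in parts:
        result.update(part.encode('utf-8'))
    return result.hexdigest()

# Two events with the same fingerprint yield the same digest and therefore
# aggregate into the same Group (issue).
print(md5_from_parts(['my-view', 'TypeError']))
print(md5_from_parts(['my-view', 'TypeError']))  # identical digest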


Next comes writing to the database tables.

4. Aggregate the event into a group and write it:

if release:
    release = Release.get_or_create(
        project=project,
        version=release,
        date_added=date,
    )

    group_kwargs['first_release'] = release

group, is_new, is_regression, is_sample = self._save_aggregate(
    event=event,
    hashes=hashes,
    release=release,
    **group_kwargs
)

This step gets or creates a sentry_release record based on the release and project fields, then gets or creates an issue record inside _save_aggregate and returns it as group (i.e. the issue):

if existing_group_id is None:
    kwargs['score'] = ScoreClause.calculate(1, kwargs['last_seen'])
    with transaction.atomic():
        short_id = project.next_short_id()
        group, group_is_new = Group.objects.create(
            project=project,
            short_id=short_id,
            **kwargs
        ), True
else:
    group = Group.objects.get(id=existing_group_id)
    group_is_new = False


5. Write to sentry_eventmapping:

try:
    with transaction.atomic(using=router.db_for_write(EventMapping)):
        EventMapping.objects.create(
            project=project, group=group, event_id=event_id)
except IntegrityError:
    # a mapping for this event already exists; the original code logs
    # the duplicate and returns the event here
    ...

eventmapping stores the mapping from group to event; this is the first place where event-related information gets written.

6. Update the tsdb

First the environment and grouprelease records are looked up from project and group, and then the tsdb counters are updated. If the group was just created, the environment and grouprelease records are created as well. grouprelease stores basic information about the group, such as first_seen and last_seen.
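
A rough sketch of what the counter update looks like (this assumes the tsdb.incr_multi API used elsewhere in the codebase; the exact set of keys updated in save() is more involved, and project/group/event are assumed to exist):

from sentry import tsdb

# Bump the time-series counters for both the project and the group so that
# the event-frequency graphs reflect this event.
tsdb.incr_multi(
    [
        (tsdb.models.project, project.id),
        (tsdb.models.group, group.id),
    ],
    timestamp=event.datetime,
)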

7. Update sentry_userreport records whose event_id matches the current event

reportToUpdate = UserReport.objects.filter(
    project=project, event_id=event_id,
)
reportToUpdate.update(group=group)

8. Write the event into the sentry_message table, and the tags into the sentry_eventtag, sentry_filterkey and sentry_filtervalue tables:

# save the event unless its been sampled
if not is_sample:
    try:
        with transaction.atomic(using=router.db_for_write(Event)):
            event.save()
    except IntegrityError:
        self.logger.info('duplicate.found', exc_info=True, extra={
            'event_id': event_id,
            'project_id': project.id,
            'group_id': group.id,
            'model': Event.__name__,
        })
        return event

index_event_tags.delay(
    project_id=project.id,
    group_id=group.id,
    event_id=event.id,
    tags=tags,
)


sentry_filterkey and sentry_filtervalue behave like lookup tables of constants; sentry_eventtag stores each event's <key, value> tags, with key and value pointing into those two tables.

9. Send the first_event_received signal and start the post_process task

if not raw:
    if not project.first_event:
        project.update(first_event=date)
        first_event_received.send(project=project, group=group, sender=Project)
    post_process_group.delay(
        group=group,
        event=event,
        is_new=is_new,
        is_sample=is_sample,
        is_regression=is_regression,
    )
else:
    self.logger.info('post_process.skip.raw_event', extra={'event_id': event.id})
The first_event_received signal is received in sentry/receivers/onboarding.py:

@first_event_received.connect(weak=False)
def record_first_event(project, group, **kwargs):

Inside this function, a record of the OrganizationOnboardingTask model is updated or created; it corresponds to the sentry_organizationonboardingtask table, which records, per organization, the number of onboarding tasks, their status and their completion time.
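
first_event_received is an ordinary Django signal; the define / connect / send cycle looks roughly like this (a generic sketch, not the exact Sentry code, which declares the signal in sentry/signals.py and uses its own connect decorator as shown above):

from django.dispatch import Signal, receiver

# Definition side: where the signal is declared.
first_event_received = Signal()

# Receiver side: mirrors the decorator shown above.
@receiver(first_event_received, weak=False)
def record_first_event(sender, project, group, **kwargs):
    # update or create the OrganizationOnboardingTask record here
    pass

# Sender side: this is what EventManager.save() does for a project's first event.
# first_event_received.send(sender=Project, project=project, group=group)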

So far, the tables that directly contain event information are:

1. sentry_message

2. sentry_eventmapping

3. sentry_eventtag

Some other tables, such as sentry_groupedmessage and sentry_grouprelease, also carry event information indirectly via fields like first_seen and last_seen. When cleaning event records out of the database, one should consider how to update this information as well.
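
For reference, a cleanup pass over old events would have to touch at least the tables listed above. A hedged sketch of what that might look like (model names are from sentry.models; the cutoff and exact filters are illustrative, and the group-level first_seen/last_seen fields would still need to be reconciled separately):

from datetime import timedelta

from django.utils import timezone
from sentry.models import Event, EventMapping, EventTag

# Illustrative cleanup: remove events older than 90 days together with the
# rows that reference them.
cutoff = timezone.now() - timedelta(days=90)

old_events = Event.objects.filter(datetime__lt=cutoff)

# EventTag references the Event primary key, EventMapping the 32-char event_id.
EventTag.objects.filter(event_id__in=old_events.values_list('id', flat=True)).delete()
EventMapping.objects.filter(event_id__in=old_events.values_list('event_id', flat=True)).delete()
old_events.delete()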