[sentry源码阅读] event请求与写入过程初步剖析
2017-01-20 17:55
471 查看
客户端向sentry服务发送一个错误日志在sentry内部被称为event,以js客户端为例,异常发送时的请求url为:
sentry后台基于django框架,将所有的url分为两类,api.endpoints包处理以/api/0开头的请求,web.frontend包处理其他请求。以上url请求被转给web.frontend包下的view处理,在web.urls.py文件中,查看到如下代码,发现该url被转给StoreView
1、过滤请求
这段代码检查请求字符串data携带的字段,决定是否过滤该请求。
2、限流
这段代码RateLimit对象判定当前是否限流,如果限流,请求被直接丢弃。
3、查看该请求是否已经被处理过,以及过滤请求中敏感字段:
同时在缓存中判断该请求是否已经存在:
4、将请求内容插入数据库:
这个是分析的重点,随后分析。
5、在缓存中写入一个flag,以免5分钟内对event_id相同的请求进行重复处理。然后返回response给客户端
首先,如果data是延迟加载的,则通过items()方法打破延时。LazyData的实现在sentry/coreapi.py中,items()方法会触发_decode()方法的执行,将请求字符串反序列为一个dict对象。然后,将data设置一个e:{project}:{event_id]的key,丢入缓存(sentry的缓存使用redis或者memcache),并将sentry.tasks.store.preprocess_event任务丢入celery队列后返回。
sentry使用3个异步任务完成请求的处理与写入,在name=events.preprocess_event的队列中,顺序完成对data的处理和写入,这三个异步任务为:
1、sentry.tasks.store.preprocess_event任务
这个task主要完成的任务是:
1)根据cache_key,从缓存中拿到data.
2) 遍历plugin列表,挨个儿对data进行处理:
然后,将接下来的任务,交给sentry.tasks.store.process_event,重新扔进队列
2、sentry.tasks.store.process_event
执行内容和preprocess类似,进一步完成对data的处理,然后将写入数据库的操作封装为sentry.tasks.store.save_event任务,再次扔入队列
3、sentry.tasks.store.save_event
完成data的写入,如源码所示,调用了EventManager的save()方法处理请求并写入,稍后重点分析
4、sentry.tasks.post_process.post_process_group
在EventManager的save()方法的最后一步,开启这个异步任务,post_process_group依然遍历plugin处理请求,其具体功能还没有深入分析,
1、抽取data的 event_id、level、date等等字段,封装一个Event对象。Event是一个model,对应sentry_message数据表.
2、生成tag。 每一个event都有一个tag字段,它是一个dict,其内容在之后被写入到EventTag、TagKey、TagValue三个model中。
其中,EventTag对应sentry_eventtag表,将TagKey的id和TagValue的id对应,TagKey的id和TagValue保存tag的具体内容。
3、根据data生成fingerprint,并根据fingerprint和event_id生成hashes:
接下来该写入数据表了。
4、event聚合,并写入:
这一步根据release、project字段,获取或者创建sentry_release的一个记录,然后在_save_aggregate中获取或者创建一个issue记录,并返回group(即issue)
5、写入sentry_eventmapping:
eventmapping保存了group到event的映射,这里是event相关信息被第一次写入
6、更新tsdb
首先根据project和group,获取environment和grouprelease,然后更新tsdb。如果group是刚刚创建的,则新建environment和grouprelease。grouprelease保存group的一些基本信息,如first_seen,last_seen等.
7、更新event_id为当前event的sentry_userreport记录
sentry_filterkey、sentry_filtervalue像一个常量表,sentry_eventtag保存了每个event的<key,value>tag,key和value指向这个两个表
9、发送first_event_received 信号,并开启post_process任务
这个函数中,会更新护或者创建OrganizationOnboardingTask model的记录,对应sentry_organizationonboardingtask表,这个表记录了每一个organization的task数目,状态以及完成的时间。
目前看来,直接包含event信息的表有:
1、sentry_message
2、sentry_eventmapping
3、sentry_eventtag
4、sentry_eventmapping
另外的一些表,如sentry_groupedmessage和sentry_grouprelease,其中的first_seen和last_seen也间接包含了event的信息。清理数据库中的event记录时,应该考虑怎么更新这些信息。
http://localhost:8000/api/2/store/?sentry_version=7&sentry_client=raven-js%2F3.8.1&sentry_key=36db8da42fa84f83bac146be5096815c
sentry后台基于django框架,将所有的url分为两类,api.endpoints包处理以/api/0开头的请求,web.frontend包处理其他请求。以上url请求被转给web.frontend包下的view处理,在web.urls.py文件中,查看到如下代码,发现该url被转给StoreView
# in web.urls.py urlpatterns += patterns( '', # Store endpoints first since they are the most active url(r'^api/store/$', api.StoreView.as_view(), name='sentry-api-store'), url(r'^api/(?P<project_id>[\w_-]+)/store/$', api.StoreView.as_view(), name='sentry-api-store'), url(r'^api/(?P<project_id>\d+)/csp-report/$', api.CspReportView.as_view(), name='sentry-api-csp-report'),
StoreView:
StoreView类中,有get、post两个方法,这两个方法都会调用process()方法。process()方法是处理请求的入口方法,请求的处理大体步骤为:1、过滤请求
if helper.should_filter(project, data, ip_address=remote_addr):
这段代码检查请求字符串data携带的字段,决定是否过滤该请求。
2、限流
if rate_limit is None or rate_limit.is_limited:
这段代码RateLimit对象判定当前是否限流,如果限流,请求被直接丢弃。
3、查看该请求是否已经被处理过,以及过滤请求中敏感字段:
if scrub_data: # We filter data immediately before it ever gets into the queue sensitive_fields_key = 'sentry:sensitive_fields'
同时在缓存中判断该请求是否已经存在:
cache_key = 'ev:%s:%s' % (project.id, event_id,) if cache.get(cache_key) is not None: raise APIForbidden('An event with the same ID already exists (%s)' % (event_id,))
4、将请求内容插入数据库:
# mutates data (strips a lot of context if not queued) helper.insert_data_to_database(data)
这个是分析的重点,随后分析。
5、在缓存中写入一个flag,以免5分钟内对event_id相同的请求进行重复处理。然后返回response给客户端
cache.set(cache_key, '', 60 * 5)
异步处理:
接下来跟踪helper.insert_data_to_database的执行过程,这个函数非常短,异步处理请求并写入数据库,将任务扔入celery后立刻返回。def insert_data_to_database(self, data): import pdb # pdb.set_trace() # we might be passed LazyData if isinstance(data, LazyData): data = dict(data.items()) cache_key = 'e:{1}:{0}'.format(data['project'], data['event_id']) default_cache.set(cache_key, data, timeout=3600) preprocess_event.delay(cache_key=cache_key, start_time=time())
首先,如果data是延迟加载的,则通过items()方法打破延时。LazyData的实现在sentry/coreapi.py中,items()方法会触发_decode()方法的执行,将请求字符串反序列为一个dict对象。然后,将data设置一个e:{project}:{event_id]的key,丢入缓存(sentry的缓存使用redis或者memcache),并将sentry.tasks.store.preprocess_event任务丢入celery队列后返回。
sentry使用3个异步任务完成请求的处理与写入,在name=events.preprocess_event的队列中,顺序完成对data的处理和写入,这三个异步任务为:
1、sentry.tasks.store.preprocess_event任务
这个task主要完成的任务是:
1)根据cache_key,从缓存中拿到data.
2) 遍历plugin列表,挨个儿对data进行处理:
for plugin in plugins.all(version=2): processors = safe_execute(plugin.get_event_preprocessors, data=data, _with_transaction=False) for processor in (processors or ()): # On the first processor found, we just defer to the process_event # queue to handle the actual work. process_event.delay(cache_key=cache_key, start_time=start_time) return
然后,将接下来的任务,交给sentry.tasks.store.process_event,重新扔进队列
2、sentry.tasks.store.process_event
执行内容和preprocess类似,进一步完成对data的处理,然后将写入数据库的操作封装为sentry.tasks.store.save_event任务,再次扔入队列
3、sentry.tasks.store.save_event
完成data的写入,如源码所示,调用了EventManager的save()方法处理请求并写入,稍后重点分析
try: manager = EventManager(data) manager.save(project) finally: if cache_key: default_cache.delete(cache_key) if start_time: metrics.timing('events.time-to-process', time() - start_time, instance=data['platform'])
4、sentry.tasks.post_process.post_process_group
在EventManager的save()方法的最后一步,开启这个异步任务,post_process_group依然遍历plugin处理请求,其具体功能还没有深入分析,
EventManager.save()分析:
这个函数完成了对data各个字段的解析,存入多个数据表。这个函数有几百行,处理逻辑比较复杂,读起来也很痛苦,目前还没能理解的很好。大体总结一下,这个函数完成了如下的内容:1、抽取data的 event_id、level、date等等字段,封装一个Event对象。Event是一个model,对应sentry_message数据表.
# First we pull out our top-level (non-data attr) kwargs event_id = data.pop('event_id') level = data.pop('level') culprit = data.pop('culprit', None) logger_name = data.pop('logger', None) ... # 封装Event对象 event = Event( project_id=project.id, event_id=event_id, data=data, time_spent=time_spent, datetime=date, **kwargs )
2、生成tag。 每一个event都有一个tag字段,它是一个dict,其内容在之后被写入到EventTag、TagKey、TagValue三个model中。
tags = dict(data.get('tags') or []) tags['level'] = LOG_LEVELS[level] if logger_name: tags['logger'] = logger_name if server_name: tags['server_name'] = server_name if site: tags['site'] = site if environment: tags['environment'] = environment if transaction_name: tags['transaction'] = transaction_name
其中,EventTag对应sentry_eventtag表,将TagKey的id和TagValue的id对应,TagKey的id和TagValue保存tag的具体内容。
3、根据data生成fingerprint,并根据fingerprint和event_id生成hashes:
if fingerprint: hashes = [ md5_from_hash(h) for h in get_hashes_from_fingerprint(event, fingerprint) ] elif checksum: hashes = [checksum] data['checksum'] = checksum else: hashes = [ md5_from_hash(h) for h in get_hashes_for_event(event) ]hashed的作用是在_save_aggregate中将多个event聚合为一个issue,每一个event,根据其hashes,被映射到一个Group的group_id。Group是issue的model表示,在postgres中对应sentry_groupedmessage表
def _save_aggregate(self, event, hashes, release, **kwargs): project = event.project # attempt to find a matching hash all_hashes = self._find_hashes(project, hashes) try: existing_group_id = six.next(h[0] for h in all_hashes if h[0]) except StopIteration: existing_group_id = None
接下来该写入数据表了。
4、event聚合,并写入:
if release: release = Release.get_or_create( project=project, version=release, date_added=date, ) group_kwargs['first_release'] = release # print('release '+str(release)) group, is_new, is_regression, is_sample = self._save_aggregate( event=event, hashes=hashes, release=release, **group_kwargs )
这一步根据release、project字段,获取或者创建sentry_release的一个记录,然后在_save_aggregate中获取或者创建一个issue记录,并返回group(即issue)
if existing_group_id is None: kwargs['score'] = ScoreClause.calculate(1, kwargs['last_seen']) with transaction.atomic(): short_id = project.next_short_id() group, group_is_new = Group.objects.create( project=project, short_id=short_id, **kwargs ), True else: group = Group.objects.get(id=existing_group_id) group_is_new = False
5、写入sentry_eventmapping:
try: with transaction.atomic(using=router.db_for_write(EventMapping)): EventMapping.objects.create( project=project, group=group, event_id=event_id)
eventmapping保存了group到event的映射,这里是event相关信息被第一次写入
6、更新tsdb
首先根据project和group,获取environment和grouprelease,然后更新tsdb。如果group是刚刚创建的,则新建environment和grouprelease。grouprelease保存group的一些基本信息,如first_seen,last_seen等.
7、更新event_id为当前event的sentry_userreport记录
reportToUpdate = UserReport.objects.filter( project=project, event_id=event_id, ) # print('UserReport.filter:' + str(reportToUpdate)) reportToUpdate.update(group=group)8、将event写入sentry_message表,并将tag写入sentry_eventtag、sentry_filterkey、sentry_filtervalue表:
# save the event unless its been sampled if not is_sample: try: with transaction.atomic(using=router.db_for_write(Event)): event.save() except IntegrityError: self.logger.info('duplicate.found', exc_info=True, extra={ 'event_id': event_id, 'project_id': project.id, 'group_id': group.id, 'model': Event.__name__, }) return event index_event_tags.delay( project_id=project.id, group_id=group.id, event_id=event.id, tags=tags, )
sentry_filterkey、sentry_filtervalue像一个常量表,sentry_eventtag保存了每个event的<key,value>tag,key和value指向这个两个表
9、发送first_event_received 信号,并开启post_process任务
print('................' + str(group.id)) if not raw: if not project.first_event: project.update(first_event=date) first_event_received.send(project=project, group=group, sender=Project) print('................' + str(group.id)) post_process_group.delay( group=group, event=event, is_new=is_new, is_sample=is_sample, is_regression=is_regression, ) else: self.logger.info('post_process.skip.raw_event', extra={'event_id': event.id})first_event_received信号在sentry/receivers/onbroading.py中被接收:
@first_event_received.connect(weak=False) def record_first_event(project, group, **kwargs):
这个函数中,会更新护或者创建OrganizationOnboardingTask model的记录,对应sentry_organizationonboardingtask表,这个表记录了每一个organization的task数目,状态以及完成的时间。
目前看来,直接包含event信息的表有:
1、sentry_message
2、sentry_eventmapping
3、sentry_eventtag
4、sentry_eventmapping
另外的一些表,如sentry_groupedmessage和sentry_grouprelease,其中的first_seen和last_seen也间接包含了event的信息。清理数据库中的event记录时,应该考虑怎么更新这些信息。
相关文章推荐
- Tomcat源码阅读之请求过程
- hdfs源码剖析文件写入过程时序图
- Spring MVC处理请求过程(Spring MVC源码阅读系列之二)
- 客户端向HDFS写入过程源码跟踪及剖析
- Yii2.0源码阅读-一次请求的完整过程
- 深入理解 Tomcat(九)源码剖析之请求过程
- SpringMVC源码剖析(四)- DispatcherServlet请求转发的实现
- UDT源码剖析(六):UDT::socket()过程代码注释
- ASP.NET温故而知新学习系列之深度剖析ASP.NET架构—ASP.NET请求的处理过程(一)
- ucos在s3c2410上运行过程整体剖析之基础知识-c语言和堆栈 分类: μc /os ii 系统有关知识 2012-03-08 21:00 586人阅读 评论(0) 收藏
- SpringMVC源码剖析(四)- DispatcherServlet请求转发的实现
- ucos在s3c2410上运行过程整体剖析---两种任务切换的实现方法 分类: μc /os ii 系统有关知识 2012-03-13 21:41 1102人阅读 评论(0) 收藏
- UDT源码剖析(三):UDT::startup()过程代码注释
- 非典型2D游戏引擎 Orx 源码阅读笔记(5) core部分(config,event)
- ASP.NET温故而知新学习系列之深度剖析ASP.NET架构—ASP.NET请求的处理过程(一)
- Tomcat 请求过程源码解析(二)
- UDT源码剖析(五):UDT::cleanup()过程代码注释
- 深度剖析ASP.NET架构—ASP.NET请求的处理过程(一)
- struts2的请求处理过程源码分析
- ucos在s3c2410上运行过程整体剖析-从加电到执行main函数 分类: μc /os ii 系统有关知识 2012-03-13 21:27 2409人阅读 评论(1) 收藏