new relic agent for elasticsearch
2016-06-15 00:00
253 查看
流程:
MainProcess MainThread newrelic_plugin_agent.agent process L155 : Stats processed in 10.15 seconds, next wake in 49 seconds
MainProcess MainThread newrelic_plugin_agent.agent start_plugin_polling L292 : Enabling plugin: elasticsearch
MainProcess MainThread newrelic_plugin_agent.agent start_plugin_polling L296 : passed in plugin name: elasticsearch
MainProcess MainThread newrelic_plugin_agent.agent start_plugin_polling L298 : Plugin is available
MainProcess MainThread newrelic_plugin_agent.plugins.base finish L158 : ElasticSearch poll successful, completed in 0.12 seconds
MainProcess MainThread newrelic_plugin_agent.agent send_components L250 : Sending 94 metrics to NewRelic
几个关键py 及调用流程:
1. newrelic_plugin_agent/plugin/__init__.py
'elasticsearch':'newrelic_plugin_agent.plugins.elasticsearch.ElasticSearch',
2. newrelic_plugin_agent/agent.py
====
def process(self):
    """Run one poll cycle, then compute when to wake up next.

    Called after every sleep interval: kicks off plugin polling,
    blocks until all worker threads finish, ships the collected data
    to NewRelic, and derives the next wake interval from how long the
    cycle took. Override ``run`` instead if an IOLoop-based daemon is
    wanted.
    """
    cycle_started = time.time()
    self.start_plugin_polling()
    # Block, checking once a second, until every polling thread is done
    while self.threads_running:
        time.sleep(1)
    self.threads = list()
    self.send_data_to_newrelic()
    elapsed = time.time() - cycle_started
    self.next_wake_interval = self._wake_interval - elapsed
    if self.next_wake_interval < 1:
        # The cycle overran the configured interval; sleep a full
        # interval rather than waking again immediately
        LOGGER.warning('Poll interval took greater than %i seconds',
                       elapsed)
        self.next_wake_interval = int(self._wake_interval)
    LOGGER.info('Stats processed in %.2f seconds, next wake in %i seconds',
                elapsed, self.next_wake_interval)
====
# NOTE(review): this excerpt is truncated by the blog paste -- the ``try``
# at the end has no matching ``except``/``finally`` here, and the original
# indentation was stripped. Do not treat this fragment as runnable.
def start_plugin_polling(self):
"""Iterate through each plugin and start the polling process."""
# Walk every configured application key that is not agent bookkeeping
for plugin in [key for key in self.config.application.keys()
if key not in self.IGNORE_KEYS]:
LOGGER.info('Enabling plugin: %s', plugin)
plugin_class = None
# If plugin is part of the core agent plugin list
# (debug log added by the blog author to trace plugin resolution)
LOGGER.info('passed in plugin name: %s',plugin)
if plugin in plugins.available:
LOGGER.info("Plugin is available")
plugin_class = self._get_plugin(plugins.available[plugin])
# If plugin is in config and a qualified class name
elif '.' in plugin:
plugin_class = self._get_plugin(plugin)
# If plugin class could not be imported
if not plugin_class:
LOGGER.error('Enabled plugin %s not available', plugin)
continue
try:
self.poll_plugin(plugin, plugin_class,
self.config.application.get(plugin))
===
def poll_plugin(self, plugin_name, plugin, config):
    """Kick off a background thread per configured plugin instance.

    :param str plugin_name: The configured name of the plugin
    :param newrelic_plugin_agent.plugins.base.Plugin plugin: The plugin class
    :param dict config: The config (or list of configs) for the plugin
    """
    # A single-instance config may be a bare dict; normalize to a list
    if not isinstance(config, (list, tuple)):
        config = [config]
    for instance in config:
        thread = threading.Thread(target=self.thread_process,
                                  kwargs={'config': instance,
                                          'name': plugin_name,
                                          'plugin': plugin,
                                          'poll_interval':
                                              int(self._wake_interval)})
        # BUG FIX: the original called thread.run(), which executes the
        # target synchronously in the calling thread -- which is exactly
        # why the pasted logs show every plugin running on
        # "MainProcess MainThread". Thread.start() spawns the worker.
        thread.start()
        self.threads.append(thread)
====
def thread_process(self, name, plugin, config, poll_interval):
    """Instantiate the plugin for one configured instance and poll it.

    Runs as the body of a worker thread created by ``poll_plugin``.

    :param str name: The name of the plugin
    :param newrelic_plugin_agent.plugin.Plugin plugin: The plugin class
    :param dict config: The plugin configuration
    :param int poll_interval: How often the plugin is invoked
    """
    instance_name = "%s:%s" % (name, config.get('name', 'unnamed'))
    last_interval = self.derive_last_interval.get(instance_name)
    plugin_instance = plugin(config, poll_interval, last_interval)
    # poll() is inherited from the plugin base class (plugins/base.py)
    plugin_instance.poll()
3. newrelic_plugin_agent/plugin/base.py
def poll(self):
    """Poll HTTP JSON endpoint for stats data and record datapoints."""
    self.initialize()
    payload = self.fetch_data()
    # Nothing fetched: skip datapoint processing but still finish up
    if not payload:
        self.finish()
        return
    self.add_datapoints(payload)
    self.finish()
4. newrelic_plugin_agent/plugin/elasticsearch.py
def add_datapoints(self, stats):
    """Add all of the datapoints for the Elasticsearch poll.

    Only the stats of the node whose ``host`` matches the local
    hostname are processed, so each agent reports just its own node.

    :param dict stats: The stats to process for the values
    """
    totals = dict()
    # Guard: a partial/failed stats payload may lack 'nodes' (or carry
    # None), which previously raised TypeError when iterated.
    nodes = stats.get('nodes') or {}
    for node in nodes:
        # .get('host') instead of ['host']: a node entry without the
        # key previously raised KeyError and aborted the whole poll.
        if nodes[node].get('host') == socket.gethostname():
            totals = nodes[node]
            break
    # If no node matched, totals stays {} and the helpers below
    # simply record nothing for this node.
    self.add_index_datapoints(totals)
    self.add_network_datapoints(totals)
    self.add_jvm_datapoints(totals)
    self.add_threadpool_datapoints(totals)
    self.add_transport_datapoints(totals)
    self.add_circuitbreaker_datapoints(totals)
    self.add_cluster_stats()
MainProcess MainThread newrelic_plugin_agent.agent process L155 : Stats processed in 10.15 seconds, next wake in 49 seconds
MainProcess MainThread newrelic_plugin_agent.agent start_plugin_polling L292 : Enabling plugin: elasticsearch
MainProcess MainThread newrelic_plugin_agent.agent start_plugin_polling L296 : passed in plugin name: elasticsearch
MainProcess MainThread newrelic_plugin_agent.agent start_plugin_polling L298 : Plugin is available
MainProcess MainThread newrelic_plugin_agent.plugins.base finish L158 : ElasticSearch poll successful, completed in 0.12 seconds
MainProcess MainThread newrelic_plugin_agent.agent send_components L250 : Sending 94 metrics to NewRelic
几个关键py 及调用流程:
1. newrelic_plugin_agent/plugin/__init__.py
'elasticsearch':'newrelic_plugin_agent.plugins.elasticsearch.ElasticSearch',
2. newrelic_plugin_agent/agent.py
====
def process(self):
    """Run one poll cycle, then compute when to wake up next.

    Called after every sleep interval: kicks off plugin polling,
    blocks until all worker threads finish, ships the collected data
    to NewRelic, and derives the next wake interval from how long the
    cycle took. Override ``run`` instead if an IOLoop-based daemon is
    wanted.
    """
    cycle_started = time.time()
    self.start_plugin_polling()
    # Block, checking once a second, until every polling thread is done
    while self.threads_running:
        time.sleep(1)
    self.threads = list()
    self.send_data_to_newrelic()
    elapsed = time.time() - cycle_started
    self.next_wake_interval = self._wake_interval - elapsed
    if self.next_wake_interval < 1:
        # The cycle overran the configured interval; sleep a full
        # interval rather than waking again immediately
        LOGGER.warning('Poll interval took greater than %i seconds',
                       elapsed)
        self.next_wake_interval = int(self._wake_interval)
    LOGGER.info('Stats processed in %.2f seconds, next wake in %i seconds',
                elapsed, self.next_wake_interval)
====
# NOTE(review): this excerpt is truncated by the blog paste -- the ``try``
# at the end has no matching ``except``/``finally`` here, and the original
# indentation was stripped. Do not treat this fragment as runnable.
def start_plugin_polling(self):
"""Iterate through each plugin and start the polling process."""
# Walk every configured application key that is not agent bookkeeping
for plugin in [key for key in self.config.application.keys()
if key not in self.IGNORE_KEYS]:
LOGGER.info('Enabling plugin: %s', plugin)
plugin_class = None
# If plugin is part of the core agent plugin list
# (debug log added by the blog author to trace plugin resolution)
LOGGER.info('passed in plugin name: %s',plugin)
if plugin in plugins.available:
LOGGER.info("Plugin is available")
plugin_class = self._get_plugin(plugins.available[plugin])
# If plugin is in config and a qualified class name
elif '.' in plugin:
plugin_class = self._get_plugin(plugin)
# If plugin class could not be imported
if not plugin_class:
LOGGER.error('Enabled plugin %s not available', plugin)
continue
try:
self.poll_plugin(plugin, plugin_class,
self.config.application.get(plugin))
===
def poll_plugin(self, plugin_name, plugin, config):
    """Kick off a background thread per configured plugin instance.

    :param str plugin_name: The configured name of the plugin
    :param newrelic_plugin_agent.plugins.base.Plugin plugin: The plugin class
    :param dict config: The config (or list of configs) for the plugin
    """
    # A single-instance config may be a bare dict; normalize to a list
    if not isinstance(config, (list, tuple)):
        config = [config]
    for instance in config:
        thread = threading.Thread(target=self.thread_process,
                                  kwargs={'config': instance,
                                          'name': plugin_name,
                                          'plugin': plugin,
                                          'poll_interval':
                                              int(self._wake_interval)})
        # BUG FIX: the original called thread.run(), which executes the
        # target synchronously in the calling thread -- which is exactly
        # why the pasted logs show every plugin running on
        # "MainProcess MainThread". Thread.start() spawns the worker.
        thread.start()
        self.threads.append(thread)
====
def thread_process(self, name, plugin, config, poll_interval):
    """Instantiate the plugin for one configured instance and poll it.

    Runs as the body of a worker thread created by ``poll_plugin``.

    :param str name: The name of the plugin
    :param newrelic_plugin_agent.plugin.Plugin plugin: The plugin class
    :param dict config: The plugin configuration
    :param int poll_interval: How often the plugin is invoked
    """
    instance_name = "%s:%s" % (name, config.get('name', 'unnamed'))
    last_interval = self.derive_last_interval.get(instance_name)
    plugin_instance = plugin(config, poll_interval, last_interval)
    # poll() is inherited from the plugin base class (plugins/base.py)
    plugin_instance.poll()
3. newrelic_plugin_agent/plugin/base.py
def poll(self):
    """Poll HTTP JSON endpoint for stats data and record datapoints."""
    self.initialize()
    payload = self.fetch_data()
    # Nothing fetched: skip datapoint processing but still finish up
    if not payload:
        self.finish()
        return
    self.add_datapoints(payload)
    self.finish()
4. newrelic_plugin_agent/plugin/elasticsearch.py
def add_datapoints(self, stats):
    """Add all of the datapoints for the Elasticsearch poll.

    Only the stats of the node whose ``host`` matches the local
    hostname are processed, so each agent reports just its own node.

    :param dict stats: The stats to process for the values
    """
    totals = dict()
    # Guard: a partial/failed stats payload may lack 'nodes' (or carry
    # None), which previously raised TypeError when iterated.
    nodes = stats.get('nodes') or {}
    for node in nodes:
        # .get('host') instead of ['host']: a node entry without the
        # key previously raised KeyError and aborted the whole poll.
        if nodes[node].get('host') == socket.gethostname():
            totals = nodes[node]
            break
    # If no node matched, totals stays {} and the helpers below
    # simply record nothing for this node.
    self.add_index_datapoints(totals)
    self.add_network_datapoints(totals)
    self.add_jvm_datapoints(totals)
    self.add_threadpool_datapoints(totals)
    self.add_transport_datapoints(totals)
    self.add_circuitbreaker_datapoints(totals)
    self.add_cluster_stats()
相关文章推荐
- python继承中的变量
- 汇编学习----使用数字
- yii中延迟加载模块
- android事件机制
- 外贸企业如何选择域名
- dfs+线段树______Snakes(hdu 5692 2016百度之星初赛A)
- Spring JPA经验汇总
- python使用Visual Studio 2015作为IDE开发django项目的环境部署
- Android学习网站
- 瑞云亮相国际动漫节,其渲染VR游戏作品吸引众人眼球
- 漫威、星战迷不可错过的国漫节超级英雄之旅
- 文博会|从超级电脑展望VR新时代
- Learn V-ray与瑞云合作,让技术激发艺术灵感
- 云渲染服务领导品牌“瑞云”惊艳亮相京交会
- 瑞云为您开启《精灵王座》的魔幻之旅,看跨界CP的燃情虐恋
- iRedmail配置手册
- js 类的简单实现与调用
- iOS TextField 使用大全
- Day1-php 语言概述
- Day2-php 变量的声明