您的位置:首页 > 其它

New Relic agent for Elasticsearch

2016-06-15 00:00 253 查看
流程:

MainProcess MainThread newrelic_plugin_agent.agent process L155 : Stats processed in 10.15 seconds, next wake in 49 seconds
MainProcess MainThread newrelic_plugin_agent.agent start_plugin_polling L292 : Enabling plugin: elasticsearch
MainProcess MainThread newrelic_plugin_agent.agent start_plugin_polling L296 : passed in plugin name: elasticsearch
MainProcess MainThread newrelic_plugin_agent.agent start_plugin_polling L298 : Plugin is available
MainProcess MainThread newrelic_plugin_agent.plugins.base finish L158 : ElasticSearch poll successful, completed in 0.12 seconds
MainProcess MainThread newrelic_plugin_agent.agent send_components L250 : Sending 94 metrics to NewRelic

几个关键的 .py 文件及其调用流程:

1. newrelic_plugin_agent/plugins/__init__.py

'elasticsearch':'newrelic_plugin_agent.plugins.elasticsearch.ElasticSearch',

2. newrelic_plugin_agent/agent.py

====

def process(self):
    """Run one polling cycle and work out when the next one is due.

    Invoked after every sleep interval: starts polling for every plugin,
    blocks until ``self.threads_running`` is falsy, resets
    ``self.threads``, sends the collected data to NewRelic and finally
    stores the delay until the next cycle in ``self.next_wake_interval``.
    To drive the agent from an IOLoop instead of a sleep interval based
    daemon, override the run method.

    """
    started_at = time.time()
    self.start_plugin_polling()

    # Check once per second whether any polling thread is still running
    while self.threads_running:
        time.sleep(1)

    self.threads = list()
    self.send_data_to_newrelic()

    elapsed = time.time() - started_at
    remaining = self._wake_interval - elapsed
    if remaining < 1:
        # The cycle used up (almost) the whole interval; fall back to a
        # full wake interval rather than a tiny or negative delay.
        LOGGER.warning('Poll interval took greater than %i seconds',
                       elapsed)
        remaining = int(self._wake_interval)
    self.next_wake_interval = remaining

    LOGGER.info('Stats processed in %.2f seconds, next wake in %i seconds',
                elapsed, self.next_wake_interval)

====

def start_plugin_polling(self):
"""Iterate through each plugin and start the polling process."""
# Every key of self.config.application that is not listed in
# self.IGNORE_KEYS is treated as the name of an enabled plugin.
for plugin in [key for key in self.config.application.keys()
if key not in self.IGNORE_KEYS]:
LOGGER.info('Enabling plugin: %s', plugin)
plugin_class = None

# If plugin is part of the core agent plugin list
LOGGER.info('passed in plugin name: %s',plugin)
if plugin in plugins.available:
LOGGER.info("Plugin is available")
plugin_class = self._get_plugin(plugins.available[plugin])

# If plugin is in config and a qualified class name
elif '.' in plugin:
plugin_class = self._get_plugin(plugin)

# If plugin class could not be imported
if not plugin_class:
LOGGER.error('Enabled plugin %s not available', plugin)
continue
try:
# Hand the plugin class and its configuration entry to poll_plugin.
self.poll_plugin(plugin, plugin_class,
self.config.application.get(plugin))
# NOTE(review): the excerpt ends here; the handler that closes this
# ``try`` block is not shown.

===

def poll_plugin(self, plugin_name, plugin, config):
    """Kick off a background thread per plugin instance to run the poll.

    ``config`` may be a single configuration or a list/tuple of them; a
    single value is wrapped so that every configured instance gets its
    own thread running ``self.thread_process``. Each started thread is
    appended to ``self.threads``.

    :param str plugin_name: The name of the plugin
    :param newrelic_plugin_agent.plugins.base.Plugin plugin: The plugin
    :param config: The config for the plugin, or a list/tuple of configs
    :type config: dict or list or tuple

    """
    if not isinstance(config, (list, tuple)):
        config = [config]

    # Loop-invariant: every instance is polled with the same interval.
    poll_interval = int(self._wake_interval)

    for instance in config:
        thread = threading.Thread(target=self.thread_process,
                                  kwargs={'config': instance,
                                          'name': plugin_name,
                                          'plugin': plugin,
                                          'poll_interval': poll_interval})
        # start() runs the target in a new thread. run() would execute it
        # synchronously in the calling thread, so nothing would ever run
        # in the background.
        thread.start()
        self.threads.append(thread)

====

def thread_process(self, name, plugin, config, poll_interval):
    """Instantiate the plugin for one configured instance and poll it.

    The instance is identified as ``"<name>:<config name>"`` (the config
    name defaults to ``'unnamed'``); the value stored under that key in
    ``self.derive_last_interval`` is handed to the plugin constructor.

    :param str name: The name of the plugin
    :param newrelic_plugin_agent.plugin.Plugin plugin: The plugin class
    :param dict config: The plugin configuration
    :param int poll_interval: How often the plugin is invoked

    """
    config_name = config.get('name', 'unnamed')
    instance_name = "%s:%s" % (name, config_name)
    previous_values = self.derive_last_interval.get(instance_name)
    plugin_instance = plugin(config, poll_interval, previous_values)
    # Calls poll(), which elasticsearch.py inherits from its parent class
    # in base.py
    plugin_instance.poll()

3. newrelic_plugin_agent/plugins/base.py

def poll(self):
    """Run one poll: initialize, fetch the stats data, record it, finish.

    Datapoints are only added when ``fetch_data`` returns something
    truthy; ``finish`` is called either way.

    """
    self.initialize()
    fetched = self.fetch_data()
    if fetched:
        self.add_datapoints(fetched)
    self.finish()

4. newrelic_plugin_agent/plugins/elasticsearch.py

def add_datapoints(self, stats):
    """Add all of the datapoints for the Elasticsearch poll

    Only the entry under ``stats['nodes']`` whose ``'host'`` value equals
    this machine's hostname is reported. When no entry matches (or
    ``'nodes'`` is missing or empty) an empty dict is passed on to the
    per-section helpers.

    :param dict stats: The stats to process for the values

    """
    # Resolve the hostname once instead of on every loop iteration.
    local_host = socket.gethostname()

    # A missing or null 'nodes' entry is treated as "no nodes" instead of
    # raising TypeError when iterating over None.
    nodes = stats.get('nodes') or {}

    totals = dict()
    for node_stats in nodes.values():
        # .get() tolerates node entries that carry no 'host' key.
        if node_stats.get('host') == local_host:
            totals = node_stats
            break

    self.add_index_datapoints(totals)
    self.add_network_datapoints(totals)
    self.add_jvm_datapoints(totals)
    self.add_threadpool_datapoints(totals)
    self.add_transport_datapoints(totals)
    self.add_circuitbreaker_datapoints(totals)
    self.add_cluster_stats()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: