Mirror of Apache Kafka
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

298 lines
14 KiB

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#!/usr/bin/env python
# ===================================
# file: metrics.py
# ===================================
import inspect
import json
import logging
import os
import signal
import subprocess
import sys
import traceback
import csv
import time
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy
from pyh import *
import kafka_system_test_utils
import system_test_utils
# Logger shared by all functions in this module.
logger = logging.getLogger("namedLogger")
thisClassName = '(metrics)'
# Passed as ``extra=d`` on every logging call in this module, which attaches
# ``name_of_class`` to each emitted log record.
d = {'name_of_class': thisClassName}
# Maps the attribute names used in graph definitions (the lookup key in
# plot_graphs) to the column header that is searched for in the metrics CSV
# files.
attributeNameToNameInReportedFileMap = {
    'Min': 'min',
    'Max': 'max',
    'Mean': 'mean',
    '50thPercentile': 'median',
    'StdDev': 'stddev',
    '95thPercentile': '95%',
    '99thPercentile': '99%',
    '999thPercentile': '99.9%',
    'Count': 'count',
    'OneMinuteRate': '1 min rate',
    'MeanRate': 'mean rate',
    'FiveMinuteRate': '5 min rate',
    'FifteenMinuteRate': '15 min rate',
    'Value': 'value'
}
def getCSVFileNameFromMetricsMbeanName(mbeanName):
    """Derive the CSV file name that corresponds to a metrics mbean name.

    Every ``:type=`` and ``,name=`` separator in the mbean name is replaced
    by a dot, and the ``.csv`` extension is appended.
    """
    csvBaseName = mbeanName
    for separator in (":type=", ",name="):
        csvBaseName = csvBaseName.replace(separator, ".")
    return csvBaseName + ".csv"
def read_metrics_definition(metricsFile):
    """Load the metrics definition file and return every graph it defines.

    Args:
        metricsFile: path of a JSON file whose top-level ``dashboards`` list
            holds dashboards with a ``name`` and a ``graphs`` list; each
            graph must carry ``bean_name`` and ``attributes``.

    Returns:
        A flat list with the graph dicts of all dashboards, in file order.

    Raises:
        IOError: if the file cannot be opened or read.
        ValueError: if the file does not contain valid JSON.
        KeyError: if one of the keys listed above is missing.
    """
    # The context manager closes the handle as soon as the text is read
    # instead of leaving that to the garbage collector.
    with open(metricsFile, "r") as definitionFile:
        metricsJsonData = json.loads(definitionFile.read())

    allGraphs = []
    for dashboard in metricsJsonData['dashboards']:
        # These lookups exist only for validation: a malformed definition
        # fails here with KeyError rather than later in some consumer.
        _ = dashboard['name']
        for graph in dashboard['graphs']:
            _ = graph['bean_name'], graph['attributes']
            allGraphs.append(graph)
    return allGraphs
def get_dashboard_definition(metricsFile, role):
    """Return the dashboards of the metrics definition file for one role.

    Args:
        metricsFile: path of a JSON file with a top-level ``dashboards`` list.
        role: value compared against each dashboard's ``role`` entry.

    Returns:
        The list of dashboard dicts whose ``role`` equals ``role``, in file
        order (empty when none match).

    Raises:
        IOError: if the file cannot be opened or read.
        ValueError: if the file does not contain valid JSON.
        KeyError: if ``dashboards`` or a dashboard's ``role`` is missing.
    """
    # The context manager closes the handle as soon as the text is read
    # instead of leaving that to the garbage collector.
    with open(metricsFile, "r") as definitionFile:
        metricsJsonData = json.loads(definitionFile.read())
    return [dashboard for dashboard in metricsJsonData['dashboards']
            if dashboard['role'] == role]
def ensure_valid_headers(headers, attributes):
    """Validate a CSV header row and locate the column of one attribute.

    Args:
        headers: list of column names read from the first line of a CSV file.
        attributes: the column name to look for.

    Returns:
        The index of the first column whose name equals ``attributes``.
        Uniqueness of that column is not verified.

    Raises:
        Exception: if the header row is empty, if its first column is not
            ``# time``, or if no column is named ``attributes``.
    """
    # Guard the empty row explicitly so it is reported as a missing time
    # column rather than as a bare IndexError.
    if not headers or headers[0] != "# time":
        raise Exception("First column should be time")
    for header in headers:
        logger.debug(header, extra=d)
    try:
        return headers.index(attributes)
    except ValueError:
        raise Exception(
            "There should be exactly one column that matches attribute: {0} in headers: {1}".format(
                attributes, ",".join(headers)))
def _read_metric_series(inputCsvFile, attribute):
    """Read one metrics CSV file into a plottable series.

    Returns:
        A ``(startTime, x, y)`` tuple: the file's ``os.path.getctime`` value
        in milliseconds, the integer values of the first column, and the
        float values of the column reported for ``attribute``.

    Raises:
        Whatever the file access, header validation or number parsing
        raises; the caller treats any exception as "skip this file".
    """
    # "rb" is the mode the Python 2 csv module expects; the context manager
    # makes sure the handle is closed once the rows are in memory.
    with open(inputCsvFile, "rb") as csvFile:
        rows = list(csv.reader(csvFile))
    # the first line holds the column headers
    headers = rows.pop(0)
    attributeColumnIndex = ensure_valid_headers(headers, attributeNameToNameInReportedFileMap[attribute])
    logger.debug("Column index for attribute {0} is {1}".format(attribute, attributeColumnIndex), extra=d)
    startTime = int(os.path.getctime(inputCsvFile) * 1000)
    # Fail fast when there is no data row or its time value is not an integer.
    int(rows[0][0])
    x, y = [], []
    for row in rows:
        if not row:
            continue
        y.append(float(row[attributeColumnIndex]))
        x.append(int(row[0]))
    return startTime, x, y


def plot_graphs(inputCsvFiles, labels, title, xLabel, yLabel, attribute, outputGraphFile):
    """Plot one attribute from several metrics CSV files into a single graph.

    Each input file contributes one line to the graph. A file that cannot be
    read or parsed is logged and skipped; the remaining files are still
    plotted.

    Args:
        inputCsvFiles: paths of the CSV files to plot; nothing is done when
            this is empty.
        labels: legend labels, paired positionally with ``inputCsvFiles``.
        title: title of the graph.
        xLabel: label of the x axis.
        yLabel: label of the y axis.
        attribute: key of ``attributeNameToNameInReportedFileMap`` selecting
            the CSV column to plot.
        outputGraphFile: path the figure is saved to.
    """
    if not inputCsvFiles:
        return

    fig = plt.figure()
    try:
        fig.subplots_adjust(bottom=0.2)
        ax = fig.add_subplot(111)
        ax.set_xlabel(xLabel)
        ax.set_ylabel(yLabel)
        ax.grid()

        plottedLines = []
        plottedLabels = []
        allX, allY = [], []
        startTime = None
        for inputCsvFile, label in zip(inputCsvFiles, labels):
            try:
                fileStartTime, x, y = _read_metric_series(inputCsvFile, attribute)
                line = ax.plot(x, y)[0]
            except Exception as e:
                logger.error("ERROR while plotting data for {0}: {1}".format(inputCsvFile, e), extra=d)
                traceback.print_exc()
                continue
            # Record handle and label together so the legend stays aligned
            # even when some files were skipped.
            plottedLines.append(line)
            plottedLabels.append(label)
            allX.extend(x)
            allY.extend(y)
            startTime = fileStartTime

        if not allX:
            # min()/max() below would fail on empty data; there is nothing
            # meaningful to save either.
            logger.error("No data could be read for graph {0}; nothing to plot".format(outputGraphFile), extra=d)
            return

        # axis limits span the data of all plotted files
        xmin, xmax = min(allX), max(allX)
        ymin, ymax = min(allY), max(allY)
        plt.xlim(xmin, xmax)
        plt.ylim(ymin, ymax)

        # numpy.arange needs a positive step and a non-empty range; fall back
        # to a single tick otherwise.
        tickStep = 0.2 * xmax
        if tickStep > 0 and xmax > xmin:
            xticks = numpy.arange(xmin, xmax, tickStep)
        else:
            xticks = [xmin]
        # One label per tick position (not per data row), derived from the
        # tick value itself. time.localtime expects seconds, while startTime
        # is in milliseconds.
        # NOTE(review): assumes the first CSV column is a millisecond offset
        # from the file creation time, as the original arithmetic implies -
        # TODO confirm against the tool that writes these files.
        xticksLabels = [time.strftime("%H:%M:%S", time.localtime((startTime + tick) / 1000.0))
                        for tick in xticks]
        plt.xticks(xticks, xticksLabels, rotation=17)
        plt.legend(plottedLines, plottedLabels, loc=2)
        plt.title(title)
        plt.savefig(outputGraphFile)
    finally:
        # Figures created through pyplot stay registered until closed; release
        # this one so repeated calls do not accumulate figures in memory.
        plt.close(fig)
def draw_all_graphs(metricsDescriptionFile, testcaseEnv, clusterConfig):
    """Plot the graphs of every dashboard for each role in the cluster config.

    The distinct ``role`` values of ``clusterConfig`` are collected; for each
    one the role's dashboards are read from ``metricsDescriptionFile`` and
    their graphs are drawn for all entities of that role.
    """
    distinctRoles = set(entityConfig['role'] for entityConfig in clusterConfig)
    for role in distinctRoles:
        roleDashboards = get_dashboard_definition(metricsDescriptionFile, role)
        roleEntities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
        for roleDashboard in roleDashboards:
            # one call draws each graph of the dashboard for all entities
            draw_graph_for_role(roleDashboard['graphs'], roleEntities, role, testcaseEnv)
def draw_graph_for_role(graphs, entities, role, testcaseEnv):
    """Plot every graph of a role, one output file per mbean attribute.

    For each graph, the metrics CSV file of every entity is located in that
    entity's metrics directory; files that do not exist are logged and left
    out. Each comma-separated attribute of the graph is then plotted to
    ``<testCaseDashboardsDir>/<role>/<bean_name>:<attribute>.svg``. A failure
    while plotting a graph is logged and the next graph is processed.

    Args:
        graphs: graph dicts with ``graph_name``, ``y_label``, ``bean_name``
            and ``attributes`` entries.
        entities: entity dicts; ``entity_id`` is read from each.
        role: role name, used in legend labels and output paths.
        testcaseEnv: test case environment; ``testCaseDashboardsDir`` is read
            from it and it is handed to the log-directory lookup.

    Raises:
        KeyError: if a graph lacks ``graph_name`` or ``y_label``.
    """
    for graph in graphs:
        graphName = graph['graph_name']
        yLabel = graph['y_label']
        inputCsvFiles = []
        graphLegendLabels = []
        for entity in entities:
            entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(
                testcaseEnv, role, entity['entity_id'], "metrics")
            entityMetricCsvFile = entityMetricsDir + "/" + getCSVFileNameFromMetricsMbeanName(graph['bean_name'])
            if os.path.exists(entityMetricCsvFile):
                inputCsvFiles.append(entityMetricCsvFile)
                graphLegendLabels.append(role + "-" + entity['entity_id'])
            else:
                logger.warning("The file {0} does not exist for plotting".format(entityMetricCsvFile), extra=d)

        # Bound before the try block so the error handler can always report
        # it, even when the failure happens before the first output path is
        # built (and so a path from a previous graph is never reported).
        outputGraphFile = None
        try:
            # plot one graph per mbean attribute
            attributes = graph['attributes'].split(',')
            labels = yLabel.split(',')
            for label, attribute in zip(labels, attributes):
                fullyQualifiedAttributeName = graph['bean_name'] + ':' + attribute
                outputGraphFile = testcaseEnv.testCaseDashboardsDir + "/" + role + "/" + fullyQualifiedAttributeName + ".svg"
                plot_graphs(inputCsvFiles, graphLegendLabels, graphName + '-' + attribute,
                            "time", label, attribute, outputGraphFile)
        except Exception as e:
            logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)
            traceback.print_exc()
def build_all_dashboards(metricsDefinitionFile, testcaseDashboardsDir, clusterConfig):
    """Write the central metrics page that links to one dashboard per role.

    A dashboard page is built for every distinct ``role`` found in
    ``clusterConfig``; the central page, written to
    ``<testcaseDashboardsDir>/metrics.html``, gets one link per role pointing
    at the path returned for that role's page.
    """
    centralPagePath = testcaseDashboardsDir + "/metrics.html"
    pageTitle = 'Kafka Metrics Dashboard'
    centralPage = PyH(pageTitle)
    centralPage << h1(pageTitle, cl='center')
    distinctRoles = set(entityConfig['role'] for entityConfig in clusterConfig)
    for role in distinctRoles:
        roleEntities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
        rolePagePath = build_dashboard_for_role(
            metricsDefinitionFile, role, roleEntities, testcaseDashboardsDir)
        # one link per role, each followed by a line break
        centralPage << a(role, href = rolePagePath)
        centralPage << br()
    centralPage.printOut(centralPagePath)
def build_dashboard_for_role(metricsDefinitionFile, role, entities, testcaseDashboardsDir):
    """Write the dashboard page of one role and return its path.

    The page embeds, for every graph of every dashboard defined for ``role``,
    one svg file per comma-separated attribute, located at
    ``<testcaseDashboardsDir>/<role>/<bean_name>:<attribute>.svg``.
    ``entities`` is accepted but not read here.

    Returns:
        The path of the written page,
        ``<testcaseDashboardsDir>/<role>-dashboards.html``.
    """
    roleDashboards = get_dashboard_definition(metricsDefinitionFile, role)
    heading = 'Kafka Metrics Dashboard for ' + role
    rolePage = PyH(heading)
    rolePage << h1(heading, cl='center')
    rolePagePath = testcaseDashboardsDir + "/" + role + "-dashboards.html"
    for roleDashboard in roleDashboards:
        # place the graph svg files of this dashboard on the page
        for graph in roleDashboard['graphs']:
            qualifiedAttributes = [graph['bean_name'] + ':' + attributeName
                                   for attributeName in graph['attributes'].split(',')]
            for qualifiedAttribute in qualifiedAttributes:
                svgPath = testcaseDashboardsDir + "/" + role + "/" + qualifiedAttribute + ".svg"
                rolePage << embed(src = svgPath, type = "image/svg+xml")
    rolePage.printOut(rolePagePath)
    return rolePagePath
def start_metrics_collection(jmxHost, jmxPort, role, entityId, systemTestEnv, testcaseEnv):
    """Start one JmxTool process per mbean of a role and record its pid.

    For every mbean named by the role's dashboards, a command is run over ssh
    on ``jmxHost`` that launches ``kafka.tools.JmxTool`` in the background,
    redirects its output to ``<entity metrics dir>/<mbean>.csv`` and writes
    ``pid:<pid>`` to ``<entity metrics dir>/entity_pid``. That file is then
    read back and the pid is appended to
    ``testcaseEnv.entityJmxParentPidDict[entityId]``.
    """
    logger.info("starting metrics collection on jmx port : " + jmxPort, extra=d)
    jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
    clusterConfig = systemTestEnv.clusterEntityConfigDictList
    metricsDefinitionFile = systemTestEnv.METRICS_PATHNAME
    entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(
        testcaseEnv, role, entityId, "metrics")
    mbeansForRole = get_mbeans_for_role(get_dashboard_definition(metricsDefinitionFile, role))
    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "kafka_home")
    javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "java_home")

    for mbean in mbeansForRole:
        csvOutputPath = entityMetricsDir + "/" + mbean + ".csv"
        # The fragments are joined with single spaces into one remote shell
        # command; JMX_PORT is set to the empty string for the launched tool.
        launchCommand = " ".join((
            "ssh " + jmxHost,
            "'JAVA_HOME=" + javaHome,
            "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
            "--jmx-url " + jmxUrl,
            "--object-name " + mbean + " 1> ",
            csvOutputPath + " & echo pid:$! > ",
            entityMetricsDir + "/entity_pid'",
        ))
        logger.debug("executing command: [" + launchCommand + "]", extra=d)
        system_test_utils.async_sys_call(launchCommand)
        time.sleep(1)

        readPidCommand = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid' 2> /dev/null"
        logger.debug("executing command: [" + readPidCommand + "]", extra=d)
        pidReader = system_test_utils.sys_call_return_subproc(readPidCommand)

        # testcaseEnv.entityJmxParentPidDict keeps, per entity_id, the list
        # of JMX ppid started for that entity:
        #   { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
        for rawLine in pidReader.stdout.readlines():
            pidLine = rawLine.rstrip('\n')
            logger.debug("line: [" + pidLine + "]", extra=d)
            if not pidLine.startswith("pid"):
                continue
            logger.debug("found pid line: [" + pidLine + "]", extra=d)
            jmxPid = pidLine.split(':')[1]
            if entityId not in testcaseEnv.entityJmxParentPidDict:
                testcaseEnv.entityJmxParentPidDict[entityId] = []
            testcaseEnv.entityJmxParentPidDict[entityId].append(jmxPid)
def stop_metrics_collection(jmxHost, jmxPort):
    """Kill the JmxTool processes whose ``ps -ef`` line mentions ``jmxPort``.

    ``jmxHost`` is only used in the log message; the kill pipeline itself is
    handed to ``system_test_utils.sys_call`` as a plain shell command.
    """
    logger.info("stopping metrics collection on " + jmxHost + ":" + jmxPort, extra=d)
    killPipeline = " | ".join([
        "ps -ef",
        "grep JmxTool",
        "grep -v grep",
        "grep " + jmxPort,
        "awk '{print $2}'",
        "xargs kill -9",
    ])
    system_test_utils.sys_call(killPipeline)
def get_mbeans_for_role(dashboardsForRole):
    """Return the set of mbean names referenced by a role's dashboards.

    Args:
        dashboardsForRole: list of dashboard dicts, each with a ``graphs``
            list whose entries carry a ``bean_name``.

    Returns:
        The distinct ``bean_name`` values of all graphs; an empty set when
        there are no dashboards or no graphs.
    """
    # A nested generator handles the empty list naturally, where folding the
    # graph lists without an initial value would raise TypeError.
    return set(graph['bean_name']
               for dashboard in dashboardsForRole
               for graph in dashboard['graphs'])