| 1 | #!/usr/bin/python2.7 |
| 2 | """ |
| 3 | Utility for converting *_clat_hist* files generated by fio into latency statistics. |
| 4 | |
| 5 | Example usage: |
| 6 | |
| 7 | $ fiologparser_hist.py *_clat_hist* |
| 8 | end-time, samples, min, avg, median, 90%, 95%, 99%, max |
| 9 | 1000, 15, 192, 1678.107, 1788.859, 1856.076, 1880.040, 1899.208, 1888.000 |
| 10 | 2000, 43, 152, 1642.368, 1714.099, 1816.659, 1845.552, 1888.131, 1888.000 |
| 11 | 4000, 39, 1152, 1546.962, 1545.785, 1627.192, 1640.019, 1691.204, 1744 |
| 12 | ... |
| 13 | |
| 14 | @author Karl Cronburg <karl.cronburg@gmail.com> |
| 15 | """ |
| 16 | import os |
| 17 | import sys |
| 18 | import pandas |
| 19 | import numpy as np |
| 20 | |
# Shorthand for emitting warnings / errors on stderr.
err = sys.stderr.write
| 22 | |
def weighted_percentile(percs, vs, ws):
    """ Compute weighted percentile(s) using linear interpolation.

        The value / weight pairs are first ordered by value. The weighted
        cumulative distribution function (cdf) is evaluated at the midpoint
        of each sample's weight mass, and np.interp then linearly
        interpolates between the two values bracketing each requested
        percentile.

        percs  :: List of percentiles we want to calculate
        vs     :: Array of values we are computing the percentile of
        ws     :: Array of weights for our corresponding values
        return :: Array of percentiles
    """
    order = np.argsort(vs)
    sorted_vs, sorted_ws = vs[order], ws[order]
    # cdf evaluated at the center of each sample's weight:
    cdf = 100 * (sorted_ws.cumsum() - sorted_ws / 2.0) / sorted_ws.sum()
    return np.interp(percs, cdf, sorted_vs)
| 40 | |
def weights(start_ts, end_ts, start, end):
    """ Calculate weights based on fraction of sample falling in the
        given interval [start,end]. Weights computed using vector / array
        computation instead of for-loops.

        Note that samples with zero time length are effectively ignored
        (we set their weight to zero).

        start_ts :: Array of start times for a set of samples
        end_ts   :: Array of end times for a set of samples
        start    :: int
        end      :: int
        return   :: Array of weights
    """
    sbounds = np.maximum(start_ts, start).astype(float)
    ebounds = np.minimum(end_ts, end).astype(float)
    # Zero-length samples produce 0/0 = NaN here; silence numpy's runtime
    # warning and zero those weights out below instead.
    with np.errstate(invalid='ignore', divide='ignore'):
        ws = (ebounds - sbounds) / (end_ts - start_ts)
    # Compute the NaN mask once and reuse it for both the warning and the fix.
    nan_mask = np.isnan(ws)
    if np.any(nan_mask):
        err("WARNING: zero-length sample(s) detected. Log file corrupt"
            " / bad time values? Ignoring these samples.\n")
        ws[nan_mask] = 0.0
    return ws
| 63 | |
def weighted_average(vs, ws):
    """ Return the weighted mean of values vs under weights ws. """
    return np.average(vs, weights=ws)
| 66 | |
# CSV header for the statistics rows printed by print_all_stats(), and the
# percentiles reported in them (50 is printed under the "median" column).
columns = ["end-time", "samples", "min", "avg", "median", "90%", "95%", "99%", "max"]
percs = [50, 90, 95, 99]
| 69 | |
def fmt_float_list(ctx, num=1):
    """ Return a comma separated list of float formatters to the required number
        of decimal places. For instance:

          fmt_float_list(ctx.decimals=4, num=3) == "%.4f, %.4f, %.4f"
    """
    spec = "%.{:d}f".format(ctx.decimals)
    return ', '.join([spec] * num)
| 77 | |
# Default values - see beginning of main() for how we detect number columns in
# the input files:
__HIST_COLUMNS = 1216      # number of histogram-bin columns per input row
__NON_HIST_COLUMNS = 3     # leading columns before the bins (time plus two
                           # more; read_chunk() calls them rws / szs —
                           # presumably r/w direction and block size, confirm)
__TOTAL_COLUMNS = __HIST_COLUMNS + __NON_HIST_COLUMNS
| 83 | |
def read_chunk(rdr, sz):
    """ Read the next chunk of size sz from the given reader.

        rdr    :: pandas chunked reader (or None for an empty input file)
        sz     :: chunk size the reader was created with (kept for interface
                  compatibility; the reader itself carries the chunk size)
        return :: 2D array of rows [time, bin_0 .. bin_N], or None when there
                  is nothing left to read
    """
    try:
        # StopIteration occurs when the pandas reader is exhausted, and
        # AttributeError occurs if rdr is None due to the file being empty.
        new_arr = rdr.read().values
    except (StopIteration, AttributeError):
        return None

    # Keep just the times column and the histogram bins, dropping the other
    # non-histogram columns (the original code also unpacked them into unused
    # rws / szs locals).
    times = new_arr[:, 0].reshape((-1, 1))
    hists = new_arr[:, __NON_HIST_COLUMNS:]
    return np.append(times, hists, axis=1)
| 100 | |
def get_min(fps, arrs):
    """ Find the file with the current first row with the smallest start time """
    live = [fp for fp in fps if arrs[fp] is not None]
    return min(live, key=lambda fp: arrs[fp][0][0])
| 104 | |
def histogram_generator(ctx, fps, sz):
    """ Yield histogram rows one at a time, merged across all input files in
        ascending start-time order. Each yielded row is the original row with
        the index of its source file inserted at position 1.

        ctx :: parsed command-line arguments (uses ctx.warn)
        fps :: list of open input file objects
        sz  :: number of rows to buffer from each file at a time
    """

    # Create a chunked pandas reader for each of the files:
    rdrs = {}
    for fp in fps:
        try:
            rdrs[fp] = pandas.read_csv(fp, dtype=int, header=None, chunksize=sz)
        except ValueError as e:
            # Python 3 exceptions have no .message attribute, so inspect
            # args[0] (works on Python 2 as well):
            if e.args and e.args[0] == 'No columns to parse from file':
                if ctx.warn: sys.stderr.write("WARNING: Empty input file encountered.\n")
                rdrs[fp] = None
            else:
                # Bare raise preserves the original traceback.
                raise

    # Initial histograms from disk:
    arrs = {fp: read_chunk(rdr, sz) for fp,rdr in rdrs.items()}
    while True:

        try:
            # ValueError occurs when nothing more to read
            fp = get_min(fps, arrs)
        except ValueError:
            return
        arr = arrs[fp]
        # Tag the row with its source file index before yielding it:
        yield np.insert(arr[0], 1, fps.index(fp))
        arrs[fp] = arr[1:]

        # Buffer for this file drained? Refill it from disk.
        if arrs[fp].shape[0] == 0:
            arrs[fp] = read_chunk(rdrs[fp], sz)
| 134 | |
| 135 | def _plat_idx_to_val(idx, edge=0.5, FIO_IO_U_PLAT_BITS=6, FIO_IO_U_PLAT_VAL=64): |
| 136 | """ Taken from fio's stat.c for calculating the latency value of a bin |
| 137 | from that bin's index. |
| 138 | |
| 139 | idx : the value of the index into the histogram bins |
| 140 | edge : fractional value in the range [0,1]** indicating how far into |
| 141 | the bin we wish to compute the latency value of. |
| 142 | |
| 143 | ** edge = 0.0 and 1.0 computes the lower and upper latency bounds |
| 144 | respectively of the given bin index. """ |
| 145 | |
| 146 | # MSB <= (FIO_IO_U_PLAT_BITS-1), cannot be rounded off. Use |
| 147 | # all bits of the sample as index |
| 148 | if (idx < (FIO_IO_U_PLAT_VAL << 1)): |
| 149 | return idx |
| 150 | |
| 151 | # Find the group and compute the minimum value of that group |
| 152 | error_bits = (idx >> FIO_IO_U_PLAT_BITS) - 1 |
| 153 | base = 1 << (error_bits + FIO_IO_U_PLAT_BITS) |
| 154 | |
| 155 | # Find its bucket number of the group |
| 156 | k = idx % FIO_IO_U_PLAT_VAL |
| 157 | |
| 158 | # Return the mean (if edge=0.5) of the range of the bucket |
| 159 | return base + ((k + edge) * (1 << error_bits)) |
| 160 | |
def plat_idx_to_val_coarse(idx, coarseness, edge=0.5):
    """ Converts the given *coarse* index into a non-coarse index as used by fio
        in stat.h:plat_idx_to_val(), subsequently computing the appropriate
        latency value for that bin.
    """

    # Each coarse bin covers 2**coarseness fine-grained bins.
    span = 1 << coarseness
    fine_idx = idx * span
    # Latency bounds of the whole coarse bin, then interpolate by `edge`:
    lo = _plat_idx_to_val(fine_idx, edge=0.0)
    hi = _plat_idx_to_val(fine_idx + span, edge=1.0)
    return lo + (hi - lo) * edge
| 174 | |
def print_all_stats(ctx, end, mn, ss_cnt, vs, ws, mx):
    """ Compute weighted statistics for one interval and print a CSV row.

        ctx    :: parsed command-line arguments (uses ctx.divisor, ctx.decimals)
        end    :: end time of the interval
        mn, mx :: minimum / maximum latency value seen in the interval
        ss_cnt :: number of samples in the interval
        vs, ws :: latency values and their weights (the interval histogram)
    """
    ps = weighted_percentile(percs, vs, ws)

    avg = weighted_average(vs, ws)
    values = [mn, avg] + list(ps) + [mx]
    # List comprehension rather than a bare map(): on Python 3 map() returns
    # an iterator, which cannot be concatenated to a list.
    row = [end, ss_cnt] + [float(x) / ctx.divisor for x in values]
    fmt = "%d, %d, %d, " + fmt_float_list(ctx, 5) + ", %d"
    print (fmt % tuple(row))
| 183 | |
def update_extreme(val, fncn, new_val):
    """ Calculate min / max in the presence of None values """
    return new_val if val is None else fncn(val, new_val)
| 188 | |
# See beginning of main() for how bin_vals are computed. These start out as
# empty lists and are replaced with numpy arrays (one latency value per
# histogram bin) once the input column count / coarseness is known.
bin_vals = []
lower_bin_vals = [] # lower edge of each bin
upper_bin_vals = [] # upper edge of each bin
| 193 | |
def process_interval(ctx, samples, iStart, iEnd):
    """ Construct the weighted histogram for the given interval by scanning
        through all the histograms and figuring out which of their bins have
        samples with latencies which overlap with the given interval
        [iStart,iEnd].

        ctx     :: parsed command-line arguments (uses ctx.interval)
        samples :: rows of [end_time, file_index, bin_0 .. bin_N], as produced
                   by histogram_generator()
        iStart  :: start time of the output interval
        iEnd    :: end time of the output interval

        NOTE(review): reads the module-level bin_vals / lower_bin_vals /
        upper_bin_vals arrays, which main() populates before calling this.
    """

    # Split each row into sample end time, source-file index, and bin counts:
    times, files, hists = samples[:,0], samples[:,1], samples[:,2:]
    iHist = np.zeros(__HIST_COLUMNS)  # accumulated weighted histogram
    ss_cnt = 0 # number of samples affecting this interval
    mn_bin_val, mx_bin_val = None, None

    for end_time,file,hist in zip(times,files,hists):

        # Only look at bins of the current histogram sample which
        # started before the end of the current time interval [start,end].
        # A bin's start time is the sample's end time minus half a logging
        # interval, minus the bin's latency (divided by 1000 — presumably a
        # usec -> msec conversion; confirm against fio's log units):
        start_times = (end_time - 0.5 * ctx.interval) - bin_vals / 1000.0
        idx = np.where(start_times < iEnd)
        s_ts, l_bvs, u_bvs, hs = start_times[idx], lower_bin_vals[idx], upper_bin_vals[idx], hist[idx]

        # Increment current interval histogram by weighted values of future histogram:
        ws = hs * weights(s_ts, end_time, iStart, iEnd)
        iHist[idx] += ws

        # Update total number of samples affecting current interval histogram:
        ss_cnt += np.sum(hs)

        # Update min and max bin values seen if necessary, taking the edge of
        # the neighbouring bin as a conservative bound:
        idx = np.where(hs != 0)[0]
        if idx.size > 0:
            mn_bin_val = update_extreme(mn_bin_val, min, l_bvs[max(0, idx[0] - 1)])
            mx_bin_val = update_extreme(mx_bin_val, max, u_bvs[min(len(hs) - 1, idx[-1] + 1)])

    # Intervals with no overlapping samples produce no output row:
    if ss_cnt > 0: print_all_stats(ctx, iEnd, mn_bin_val, ss_cnt, bin_vals, iHist, mx_bin_val)
| 228 | |
def guess_max_from_bins(ctx, hist_cols):
    """ Try to guess the GROUP_NR from given # of histogram
        columns seen in an input file.

        ctx       :: parsed command-line arguments (uses ctx.group_nr, ctx.FILE)
        hist_cols :: number of histogram-bin columns found in the input file
        return    :: the non-coarse bin count the file was generated from
        exits     :: with status 1 (via sys.exit) when no candidate matches
    """
    max_coarse = 8
    if ctx.group_nr < 19 or ctx.group_nr > 26:
        # Non-standard GROUP_NR given on the command line: trust it.
        bins = [ctx.group_nr * (1 << 6)]
    else:
        # Candidate bin counts for GROUP_NR in [19, 26] (GROUP_NR * 64):
        bins = [1216,1280,1344,1408,1472,1536,1600,1664]
    coarses = range(max_coarse + 1)
    # For each candidate bin count, the column count it would produce at each
    # coarseness level; -10 marks impossible (non-divisible) combinations.
    fncn = lambda z: list(map(lambda x: z/2**x if z % 2**x == 0 else -10, coarses))

    arr = np.transpose(list(map(fncn, bins)))
    idx = np.where(arr == hist_cols)
    if len(idx[1]) == 0:
        table = repr(arr.astype(int)).replace('-10', 'N/A').replace('array',' ')
        err("Unable to determine bin values from input clat_hist files. Namely \n"
            "the first line of file '%s' " % ctx.FILE[0] + "has %d \n" % (__TOTAL_COLUMNS,) +
            "columns of which we assume %d " % (hist_cols,) + "correspond to histogram bins. \n"
            "This number needs to be equal to one of the following numbers:\n\n"
            + table + "\n\n"
            "Possible reasons and corresponding solutions:\n"
            "  - Input file(s) does not contain histograms.\n"
            "  - You recompiled fio with a different GROUP_NR. If so please specify this\n"
            "    new GROUP_NR on the command line with --group_nr\n")
        # sys.exit rather than the bare exit() builtin: exit() is injected by
        # the site module and is not guaranteed outside interactive sessions.
        sys.exit(1)
    return bins[idx[1][0]]
| 255 | |
def main(ctx):
    """ Program entry point: detect input geometry and interval width, then
        stream the histogram logs and print one statistics row per interval.

        ctx :: parsed command-line arguments (see the argparse setup at the
               bottom of this file)
    """

    if ctx.job_file:
        try:
            # Python 3: SafeConfigParser is a deprecated alias of ConfigParser
            # and was removed in 3.12, so import ConfigParser directly.
            from configparser import ConfigParser, NoOptionError
        except ImportError:
            from ConfigParser import SafeConfigParser as ConfigParser, NoOptionError

        cp = ConfigParser(allow_no_value=True)
        with open(ctx.job_file, 'r') as fp:
            if hasattr(cp, 'read_file'):
                cp.read_file(fp)
            else:
                # Python 2 only provides the readfp() spelling.
                cp.readfp(fp)

        if ctx.interval is None:
            # Auto detect --interval value from the job file:
            for s in cp.sections():
                try:
                    hist_msec = cp.get(s, 'log_hist_msec')
                    if hist_msec is not None:
                        ctx.interval = int(hist_msec)
                except NoOptionError:
                    pass

    if ctx.interval is None:
        ctx.interval = 1000

    # Automatically detect how many columns are in the input files,
    # calculate the corresponding 'coarseness' parameter used to generate
    # those files, and calculate the appropriate bin latency values:
    with open(ctx.FILE[0], 'r') as fp:
        global bin_vals,lower_bin_vals,upper_bin_vals,__HIST_COLUMNS,__TOTAL_COLUMNS
        __TOTAL_COLUMNS = len(fp.readline().split(','))
        __HIST_COLUMNS = __TOTAL_COLUMNS - __NON_HIST_COLUMNS

        max_cols = guess_max_from_bins(ctx, __HIST_COLUMNS)
        coarseness = int(np.log2(float(max_cols) / __HIST_COLUMNS))
        # List comprehensions rather than bare map(): on Python 3 np.array()
        # would otherwise receive an unevaluated map object.
        idxs = np.arange(__HIST_COLUMNS)
        bin_vals = np.array([plat_idx_to_val_coarse(x, coarseness) for x in idxs], dtype=float)
        lower_bin_vals = np.array([plat_idx_to_val_coarse(x, coarseness, 0.0) for x in idxs], dtype=float)
        upper_bin_vals = np.array([plat_idx_to_val_coarse(x, coarseness, 1.0) for x in idxs], dtype=float)

    fps = [open(f, 'r') for f in ctx.FILE]
    gen = histogram_generator(ctx, fps, ctx.buff_size)

    print(', '.join(columns))

    try:
        start, end = 0, ctx.interval
        arr = np.empty(shape=(0,__TOTAL_COLUMNS - 1))
        more_data = True
        while more_data or len(arr) > 0:

            # Read up to ctx.max_latency (default 20 seconds) of data from end of current interval.
            while len(arr) == 0 or arr[-1][0] < ctx.max_latency * 1000 + end:
                try:
                    new_arr = next(gen)
                except StopIteration:
                    more_data = False
                    break
                arr = np.append(arr, new_arr.reshape((1,__TOTAL_COLUMNS - 1)), axis=0)
            arr = arr.astype(int)

            if arr.size > 0:
                # Jump immediately to the start of the input, rounding
                # down to the nearest multiple of the interval (useful when --log_unix_epoch
                # was used to create these histograms):
                if start == 0 and arr[0][0] - ctx.max_latency > end:
                    start = arr[0][0] - ctx.max_latency
                    start = start - (start % ctx.interval)
                    end = start + ctx.interval

                process_interval(ctx, arr, start, end)

                # Update arr to throw away samples we no longer need - samples which
                # end before the start of the next interval, i.e. the end of the
                # current interval:
                idx = np.where(arr[:,0] > end)
                arr = arr[idx]

            start += ctx.interval
            end = start + ctx.interval
    finally:
        # Explicit loop rather than map(): on Python 3 a bare map() is lazy
        # and would never actually close the files.
        for f in fps:
            f.close()
| 337 | |
| 338 | |
if __name__ == '__main__':
    # Command-line interface: parse arguments and hand them to main().
    import argparse
    p = argparse.ArgumentParser()
    arg = p.add_argument
    arg("FILE", help='space separated list of latency log filenames', nargs='+')
    arg('--buff_size',
        default=10000,
        type=int,
        help='number of samples to buffer into numpy at a time')

    arg('--max_latency',
        default=20,
        type=float,
        help='number of seconds of data to process at a time')

    arg('-i', '--interval',
        type=int,
        help='interval width (ms), default 1000 ms')

    arg('-d', '--divisor',
        required=False,
        type=int,
        default=1,
        help='divide the results by this value.')

    arg('--decimals',
        default=3,
        type=int,
        help='number of decimal places to print floats to')

    arg('--warn',
        dest='warn',
        action='store_true',
        default=False,
        help='print warning messages to stderr')

    arg('--group_nr',
        default=29,
        type=int,
        help='FIO_IO_U_PLAT_GROUP_NR as defined in stat.h')

    arg('--job-file',
        default=None,
        type=str,
        help='Optional argument pointing to the job file used to create the '
             'given histogram files. Useful for auto-detecting --log_hist_msec and '
             '--log_unix_epoch (in fio) values.')

    main(p.parse_args())
| 388 | |