return file_data, success
+ def get_file_fail(self, filename):
+ """Safely read a file and fail the test upon error."""
+ file_data = None
+
+ try:
+ with open(filename, "r") as output_file:
+ file_data = output_file.read()
+ except OSError:
+ self.failure_reason += " unable to read file {0}".format(filename)
+ self.passed = False
+
+ return file_data
+
def check_result(self):
"""Check fio job results."""
if 'json' not in self.output_format:
return
- file_data, success = self.get_file(os.path.join(self.test_dir, self.fio_output))
- if not success:
- self.failure_reason = "{0} unable to open output file,".format(self.failure_reason)
- self.passed = False
+ file_data = self.get_file_fail(os.path.join(self.test_dir, self.fio_output))
+ if not file_data:
return
#
for i in range(1, 4):
filename = os.path.join(self.test_dir, "{0}_iops.{1}.log".format(os.path.basename(
self.fio_job), i))
- file_data, success = self.get_file(filename)
-
- if not success:
- self.failure_reason = "{0} unable to open output file,".format(self.failure_reason)
- self.passed = False
+ file_data = self.get_file_fail(filename)
+ if not file_data:
return
iops_files.append(file_data.splitlines())
for i in range(1, 4):
filename = os.path.join(self.test_dir, "{0}_iops.{1}.log".format(os.path.basename(
self.fio_job), i))
- file_data, success = self.get_file(filename)
-
- if not success:
- self.failure_reason = "{0} unable to open output file,".format(self.failure_reason)
- self.passed = False
+ file_data = self.get_file_fail(filename)
+ if not file_data:
return
iops_files.append(file_data.splitlines())
super(FioJobTest_t0019, self).check_result()
bw_log_filename = os.path.join(self.test_dir, "test_bw.log")
- file_data, success = self.get_file(bw_log_filename)
- if not success:
- self.failure_reason += " unable to open output file {0}".format(bw_log_filename)
- self.passed = False
+ file_data = self.get_file_fail(bw_log_filename)
+ if not file_data:
return
log_lines = file_data.split('\n')
super(FioJobTest_t0020, self).check_result()
bw_log_filename = os.path.join(self.test_dir, "test_bw.log")
- file_data, success = self.get_file(bw_log_filename)
- if not success:
- self.failure_reason += " unable to open output file {0}".format(bw_log_filename)
- self.passed = False
+ file_data = self.get_file_fail(bw_log_filename)
+ if not file_data:
return
log_lines = file_data.split('\n')
super(FioJobTest_t0022, self).check_result()
bw_log_filename = os.path.join(self.test_dir, "test_bw.log")
- file_data, success = self.get_file(bw_log_filename)
- if not success:
- self.failure_reason += " unable to open output file {0}".format(bw_log_filename)
- self.passed = False
+ file_data = self.get_file_fail(bw_log_filename)
+ if not file_data:
return
log_lines = file_data.split('\n')
"""Make sure that trims are followed by writes of the same size at the same offset."""
bw_log_filename = os.path.join(self.test_dir, filename)
- file_data, success = self.get_file(bw_log_filename)
- if not success:
- self.failure_reason += " unable to open output file {0}".format(bw_log_filename)
- self.passed = False
+ file_data = self.get_file_fail(bw_log_filename)
+ if not file_data:
return
log_lines = file_data.split('\n')
def check_all_offsets(self, filename, sectorsize, filesize):
"""Make sure all offsets were touched."""
- file_data, success = self.get_file(os.path.join(self.test_dir, filename))
- if not success:
- self.passed = False
- self.failure_reason = " could not open {0}".format(filename)
+ file_data = self.get_file_fail(os.path.join(self.test_dir, filename))
+ if not file_data:
return
log_lines = file_data.split('\n')