tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

test_intelpowergadget.py (11165B)


      1 #!/usr/bin/env python
      2 
      3 import datetime
      4 import os
      5 import time
      6 from unittest import mock
      7 
      8 import mozunit
      9 import pytest
     10 from mozpower.intel_power_gadget import (
     11    IPGEmptyFileError,
     12    IPGMissingOutputFileError,
     13    IPGTimeoutError,
     14    IPGUnknownValueTypeError,
     15 )
     16 
     17 
     18 def test_ipg_pathsplitting(ipg_obj):
     19    """Tests that the output file path and prefix was properly split.
     20    This test assumes that it is in the same directory as the conftest.py file.
     21    """
     22    assert (
     23        ipg_obj.output_dir_path == os.path.abspath(os.path.dirname(__file__)) + "/files"
     24    )
     25    assert ipg_obj.output_file_prefix == "raptor-tp6-amazon-firefox_powerlog"
     26 
     27 
     28 def test_ipg_get_output_file_path(ipg_obj):
     29    """Tests that the output file path is constantly changing
     30    based on the file_counter value.
     31    """
     32    test_path = "/test_path/"
     33    test_ext = ".txt"
     34    ipg_obj._file_counter = 1
     35    ipg_obj._output_dir_path = test_path
     36    ipg_obj._output_file_ext = test_ext
     37 
     38    for i in range(1, 6):
     39        fpath = ipg_obj._get_output_file_path()
     40 
     41        assert fpath.startswith(test_path)
     42        assert fpath.endswith(test_ext)
     43        assert str(i) in fpath
     44 
     45 
     46 def test_ipg_start_and_stop(ipg_obj):
     47    """Tests that the IPG thread can start and stop properly."""
     48 
     49    def subprocess_side_effect(*args, **kwargs):
     50        time.sleep(1)
     51 
     52    with mock.patch("subprocess.check_output") as m:
     53        m.side_effect = subprocess_side_effect
     54 
     55        # Start recording IPG measurements
     56        ipg_obj.start_ipg()
     57        assert not ipg_obj._stop
     58 
     59        # Wait a bit for thread to start, then check it
     60        timeout = 10
     61        start = time.time()
     62        while time.time() - start < timeout and not ipg_obj._running:
     63            time.sleep(1)
     64 
     65        assert ipg_obj._running
     66        assert ipg_obj._thread.is_alive()
     67 
     68        # Stop recording IPG measurements
     69        ipg_obj.stop_ipg(wait_interval=1, timeout=30)
     70        assert ipg_obj._stop
     71        assert not ipg_obj._running
     72 
     73 
     74 def test_ipg_stopping_timeout(ipg_obj):
     75    """Tests that an IPGTimeoutError is raised when
     76    the thread is still "running" and the wait in _wait_for_ipg
     77    has exceeded the timeout value.
     78    """
     79    with pytest.raises(IPGTimeoutError):
     80        ipg_obj._running = True
     81        ipg_obj._wait_for_ipg(wait_interval=1, timeout=2)
     82 
     83 
     84 def test_ipg_rh_combine_cumulatives(ipg_rh_obj):
     85    """Tests that cumulatives are correctly combined in
     86    the _combine_cumulative_rows function.
     87    """
     88    cumulatives_to_combine = [
     89        [0, 1, 2, 3, 4, 5],
     90        [0, 1, 2, 3, 4, 5],
     91        [0, 1, 2, 3, 4, 5],
     92        [0, 1, 2, 3, 4, 5],
     93    ]
     94 
     95    combined_cumulatives = ipg_rh_obj._combine_cumulative_rows(cumulatives_to_combine)
     96 
     97    # Check that accumulation worked, final value must be the maximum
     98    assert combined_cumulatives[-1] == max(combined_cumulatives)
     99 
    100    # Check that the cumulative values are monotonically increasing
    101    for count, val in enumerate(combined_cumulatives[:-1]):
    102        assert combined_cumulatives[count + 1] - val >= 0
    103 
    104 
    105 def test_ipg_rh_clean_file(ipg_rh_obj):
    106    """Tests that IPGResultsHandler correctly cleans the data
    107    from one file.
    108    """
    109    file = ipg_rh_obj._output_files[0]
    110    linecount = 0
    111    with open(file) as f:
    112        for line in f:
    113            linecount += 1
    114 
    115    results, summary, clean_file = ipg_rh_obj._clean_ipg_file(file)
    116 
    117    # Check that each measure from the csv header
    118    # is in the results dict and that the clean file output
    119    # exists.
    120    for measure in results:
    121        assert measure in ipg_rh_obj._csv_header
    122    assert os.path.exists(clean_file)
    123 
    124    clean_rows = []
    125    with open(clean_file) as f:
    126        for line in f:
    127            if line.strip():
    128                clean_rows.append(line)
    129 
    130    # Make sure that the results and summary entries
    131    # have the expected lengths.
    132    for measure in results:
    133        # Add 6 for new lines that were removed
    134        assert len(results[measure]) + len(summary) + 6 == linecount
    135        # Subtract 1 for the csv header
    136        assert len(results[measure]) == len(clean_rows) - 1
    137 
    138 
    139 def test_ipg_rh_clean_ipg_data_no_files(ipg_rh_obj):
    140    """Tests that IPGResultsHandler correctly handles the case
    141    when no output files exist.
    142    """
    143    ipg_rh_obj._output_files = []
    144    clean_data = ipg_rh_obj.clean_ipg_data()
    145    assert clean_data is None
    146 
    147 
    148 def test_ipg_rh_clean_ipg_data(ipg_rh_obj):
    149    """Tests that IPGResultsHandler correctly handles cleaning
    150    all known files and that the results and the merged output
    151    are correct.
    152    """
    153    clean_data = ipg_rh_obj.clean_ipg_data()
    154    clean_files = ipg_rh_obj.cleaned_files
    155    merged_output_path = ipg_rh_obj.merged_output_path
    156 
    157    # Check that the expected output exists
    158    assert clean_data is not None
    159    assert len(clean_files) == len(ipg_rh_obj._output_files)
    160    assert os.path.exists(merged_output_path)
    161 
    162    # Check that the merged file length and results length
    163    # is correct, and that no lines were lost and no extra lines
    164    # were added.
    165    expected_merged_line_count = 0
    166    for file in clean_files:
    167        with open(file) as f:
    168            for count, line in enumerate(f):
    169                if count == 0:
    170                    continue
    171                if line.strip():
    172                    expected_merged_line_count += 1
    173 
    174    merged_line_count = 0
    175    with open(merged_output_path) as f:
    176        for count, line in enumerate(f):
    177            if count == 0:
    178                continue
    179            if line.strip():
    180                merged_line_count += 1
    181 
    182    assert merged_line_count == expected_merged_line_count
    183    for measure in clean_data:
    184        assert len(clean_data[measure]) == merged_line_count
    185 
    186    # Check that the clean data rows are ordered in increasing time
    187    times_in_seconds = []
    188    for sys_time in clean_data["System Time"]:
    189        split_sys_time = sys_time.split(":")
    190        hour_min_sec = ":".join(split_sys_time[:-1])
    191        millis = float(split_sys_time[-1]) / 1000
    192 
    193        timestruct = time.strptime(hour_min_sec, "%H:%M:%S")
    194        times_in_seconds.append(
    195            datetime.timedelta(
    196                hours=timestruct.tm_hour,
    197                minutes=timestruct.tm_min,
    198                seconds=timestruct.tm_sec,
    199            ).total_seconds()
    200            + millis
    201        )
    202 
    203    for count, val in enumerate(times_in_seconds[:-1]):
    204        assert times_in_seconds[count + 1] - val >= 0
    205 
    206 
    207 def test_ipg_rh_format_to_perfherder_with_no_results(ipg_rh_obj):
    208    """Tests that formatting the data to a perfherder-like format
    209    fails when clean_ipg_data was not called beforehand.
    210    """
    211    formatted_data = ipg_rh_obj.format_ipg_data_to_partial_perfherder(
    212        1000, ipg_rh_obj._output_file_prefix
    213    )
    214    assert formatted_data is None
    215 
    216 
    217 def test_ipg_rh_format_to_perfherder_without_cutoff(ipg_rh_obj):
    218    """Tests that formatting the data to a perfherder-like format
    219    works as expected.
    220    """
    221    ipg_rh_obj.clean_ipg_data()
    222    formatted_data = ipg_rh_obj.format_ipg_data_to_partial_perfherder(
    223        1000, ipg_rh_obj._output_file_prefix
    224    )
    225 
    226    # Check that the expected entries exist
    227    assert len(formatted_data.keys()) == 5
    228    assert "utilization" in formatted_data and "power-usage" in formatted_data
    229 
    230    assert (
    231        formatted_data["power-usage"]["test"]
    232        == ipg_rh_obj._output_file_prefix + "-cumulative"
    233    )
    234    assert (
    235        formatted_data["utilization"]["test"]
    236        == ipg_rh_obj._output_file_prefix + "-utilization"
    237    )
    238    assert (
    239        formatted_data["frequency-gpu"]["test"]
    240        == ipg_rh_obj._output_file_prefix + "-frequency-gpu"
    241    )
    242    assert (
    243        formatted_data["frequency-cpu"]["test"]
    244        == ipg_rh_obj._output_file_prefix + "-frequency-cpu"
    245    )
    246    assert (
    247        formatted_data["power-watts"]["test"]
    248        == ipg_rh_obj._output_file_prefix + "-watts"
    249    )
    250 
    251    for measure in formatted_data:
    252        # Make sure that the data exists
    253        assert len(formatted_data[measure]["values"]) >= 1
    254 
    255        for valkey in formatted_data[measure]["values"]:
    256            # Make sure the names were simplified
    257            assert "(" not in valkey
    258            assert ")" not in valkey
    259 
    260    # Check that gpu utilization doesn't exist but cpu does
    261    utilization_vals = formatted_data["utilization"]["values"]
    262    assert "cpu" in utilization_vals
    263    assert "gpu" not in utilization_vals
    264 
    265    expected_fields = ["processor-cores", "processor-package", "gpu", "dram"]
    266    consumption_vals = formatted_data["power-usage"]["values"]
    267 
    268    consumption_vals_measures = list(consumption_vals.keys())
    269 
    270    # This assertion ensures that the consumption values contain the expected
    271    # fields and nothing more.
    272    assert not list(set(consumption_vals_measures) - set(expected_fields))
    273 
    274 
    275 def test_ipg_rh_format_to_perfherder_with_cutoff(ipg_rh_obj):
    276    """Tests that formatting the data to a perfherder-like format
    277    works as expected.
    278    """
    279    ipg_rh_obj.clean_ipg_data()
    280    formatted_data = ipg_rh_obj.format_ipg_data_to_partial_perfherder(
    281        2.5, ipg_rh_obj._output_file_prefix
    282    )
    283 
    284    # Check that the formatted data was cutoff at the correct point,
    285    # expecting that only the first row of merged will exist.
    286    utilization_vals = formatted_data["utilization"]["values"]
    287    assert utilization_vals["cpu"] == 14
    288 
    289    # Expected vals are ordered in this way: [processor, cores, dram, gpu]
    290    expected_vals = [6.517, 5.847, 0.244, 0.006]
    291    consumption_vals = [
    292        formatted_data["power-usage"]["values"][measure]
    293        for measure in formatted_data["power-usage"]["values"]
    294    ]
    295    assert not list(set(expected_vals) - set(consumption_vals))
    296 
    297 
    298 def test_ipg_rh_missingoutputfile(ipg_rh_obj):
    299    """Tests that the IPGMissingOutputFileError is raised
    300    when a bad file path is passed to _clean_ipg_file.
    301    """
    302    bad_files = ["non-existent-file"]
    303    with pytest.raises(IPGMissingOutputFileError):
    304        ipg_rh_obj._clean_ipg_file(bad_files[0])
    305 
    306    ipg_rh_obj._output_files = bad_files
    307    with pytest.raises(IPGMissingOutputFileError):
    308        ipg_rh_obj.clean_ipg_data()
    309 
    310 
    311 def test_ipg_rh_emptyfile(ipg_rh_obj):
    312    """Tests that the empty file error is raised when
    313    a file exists, but does not contain any results in
    314    it.
    315    """
    316    base_path = os.path.abspath(os.path.dirname(__file__)) + "/files/"
    317    bad_files = [base_path + "emptyfile.txt"]
    318    with pytest.raises(IPGEmptyFileError):
    319        ipg_rh_obj._clean_ipg_file(bad_files[0])
    320 
    321    ipg_rh_obj._output_files = bad_files
    322    with pytest.raises(IPGEmptyFileError):
    323        ipg_rh_obj.clean_ipg_data()
    324 
    325 
    326 def test_ipg_rh_valuetypeerrorfile(ipg_rh_obj):
    327    """Tests that the IPGUnknownValueTypeError is raised
    328    when a bad entry is encountered in a file that is cleaned.
    329    """
    330    base_path = os.path.abspath(os.path.dirname(__file__)) + "/files/"
    331    bad_files = [base_path + "valueerrorfile.txt"]
    332    with pytest.raises(IPGUnknownValueTypeError):
    333        ipg_rh_obj._clean_ipg_file(bad_files[0])
    334 
    335    ipg_rh_obj._output_files = bad_files
    336    with pytest.raises(IPGUnknownValueTypeError):
    337        ipg_rh_obj.clean_ipg_data()
    338 
    339 
# Allow running this test module directly (outside the pytest harness).
if __name__ == "__main__":
   mozunit.main()