tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

test_output.py (1930B)


      1 #!/usr/bin/env python
      2 
      3 import io
      4 import os
      5 
      6 import mozunit
      7 import proctest
      8 from mozprocess import processhandler
      9 
     10 here = os.path.dirname(os.path.abspath(__file__))
     11 
     12 
     13 class ProcTestOutput(proctest.ProcTest):
     14    """Class to test operations related to output handling"""
     15 
     16    def test_process_output_twice(self):
     17        """
     18        Process is started, then processOutput is called a second time explicitly
     19        """
     20        p = processhandler.ProcessHandler(
     21            [self.python, self.proclaunch, "process_waittimeout_10s.ini"], cwd=here
     22        )
     23 
     24        p.run()
     25        p.processOutput(timeout=5)
     26        p.wait()
     27 
     28        self.determine_status(p, False, ())
     29 
     30    def test_process_output_nonewline(self):
     31        """
     32        Process is started, outputs data with no newline
     33        """
     34        p = processhandler.ProcessHandler(
     35            [self.python, os.path.join("scripts", "procnonewline.py")], cwd=here
     36        )
     37 
     38        p.run()
     39        p.processOutput(timeout=5)
     40        p.wait()
     41 
     42        self.determine_status(p, False, ())
     43 
     44    def test_stream_process_output(self):
     45        """
     46        Process output stream does not buffer
     47        """
     48        expected = "\n".join([str(n) for n in range(0, 10)])
     49 
     50        stream = io.BytesIO()
     51        buf = io.BufferedRandom(stream)
     52 
     53        p = processhandler.ProcessHandler(
     54            [self.python, os.path.join("scripts", "proccountfive.py")],
     55            cwd=here,
     56            stream=buf,
     57        )
     58 
     59        p.run()
     60        p.wait()
     61        for i in range(5, 10):
     62            stream.write(str(i).encode("utf8") + b"\n")
     63 
     64        buf.flush()
     65        self.assertEqual(stream.getvalue().strip().decode("utf8"), expected)
     66 
     67        # make sure mozprocess doesn't close the stream
     68        # since mozprocess didn't create it
     69        self.assertFalse(buf.closed)
     70        buf.close()
     71 
     72        self.determine_status(p, False, ())
     73 
     74 
     75 if __name__ == "__main__":
     76    mozunit.main()