|
1 #!/usr/bin/env python |
|
2 |
|
3 import io |
|
4 import os |
|
5 import unittest |
|
6 import proctest |
|
7 from mozprocess import processhandler |
|
8 |
|
9 here = os.path.dirname(os.path.abspath(__file__)) |
|
10 |
|
11 class ProcTestOutput(proctest.ProcTest): |
|
12 """ Class to test operations related to output handling """ |
|
13 |
|
14 def test_process_output_nonewline(self): |
|
15 """ |
|
16 Process is started, outputs data with no newline |
|
17 """ |
|
18 p = processhandler.ProcessHandler([self.python, "procnonewline.py"], |
|
19 cwd=here) |
|
20 |
|
21 p.run() |
|
22 p.processOutput(timeout=5) |
|
23 p.wait() |
|
24 |
|
25 detected, output = proctest.check_for_process("procnonewline.py") |
|
26 self.determine_status(detected, |
|
27 output, |
|
28 p.proc.returncode, |
|
29 p.didTimeout, |
|
30 False, |
|
31 ()) |
|
32 |
|
33 def test_stream_process_output(self): |
|
34 """ |
|
35 Process output stream does not buffer |
|
36 """ |
|
37 expected = '\n'.join([str(n) for n in range(0,10)]) |
|
38 |
|
39 stream = io.BytesIO() |
|
40 buf = io.BufferedRandom(stream) |
|
41 |
|
42 p = processhandler.ProcessHandler([self.python, "proccountfive.py"], |
|
43 cwd=here, |
|
44 stream=buf) |
|
45 |
|
46 p.run() |
|
47 p.wait() |
|
48 for i in range(5, 10): |
|
49 stream.write(str(i)+'\n') |
|
50 |
|
51 buf.flush() |
|
52 self.assertEquals(stream.getvalue().strip(), expected) |
|
53 |
|
54 # make sure mozprocess doesn't close the stream |
|
55 # since mozprocess didn't create it |
|
56 self.assertFalse(buf.closed) |
|
57 buf.close() |
|
58 |
|
59 detected, output = proctest.check_for_process("proccountfive.py") |
|
60 self.determine_status(detected, |
|
61 output, |
|
62 p.proc.returncode, |
|
63 p.didTimeout, |
|
64 False, |
|
65 ()) |
|
66 |
|
67 if __name__ == '__main__': |
|
68 unittest.main() |