Update test/3rdparty with the packages required to run the test suite in parallel mode. Change the short command line flag to "-j", matching make.

--HG--
rename : test/3rdparty/testscenarios-0.2/.bzrignore => test/3rdparty/testscenarios-0.4/.bzrignore
rename : test/3rdparty/testscenarios-0.2/Apache-2.0 => test/3rdparty/testscenarios-0.4/Apache-2.0
rename : test/3rdparty/testscenarios-0.2/BSD => test/3rdparty/testscenarios-0.4/BSD
rename : test/3rdparty/testscenarios-0.2/COPYING => test/3rdparty/testscenarios-0.4/COPYING
rename : test/3rdparty/testscenarios-0.2/GOALS => test/3rdparty/testscenarios-0.4/GOALS
rename : test/3rdparty/testscenarios-0.2/HACKING => test/3rdparty/testscenarios-0.4/HACKING
rename : test/3rdparty/testscenarios-0.2/MANIFEST.in => test/3rdparty/testscenarios-0.4/MANIFEST.in
rename : test/3rdparty/testscenarios-0.2/Makefile => test/3rdparty/testscenarios-0.4/Makefile
rename : test/3rdparty/testscenarios-0.2/doc/__init__.py => test/3rdparty/testscenarios-0.4/doc/__init__.py
rename : test/3rdparty/testscenarios-0.2/doc/example.py => test/3rdparty/testscenarios-0.4/doc/example.py
rename : test/3rdparty/testscenarios-0.2/doc/test_sample.py => test/3rdparty/testscenarios-0.4/doc/test_sample.py
rename : test/3rdparty/testtools-0.9.12/doc/conf.py => test/3rdparty/testtools-0.9.34/doc/conf.py
rename : test/3rdparty/testtools-0.9.12/doc/make.bat => test/3rdparty/testtools-0.9.34/doc/make.bat
rename : test/3rdparty/testtools-0.9.12/testtools/_compat2x.py => test/3rdparty/testtools-0.9.34/testtools/_compat2x.py
rename : test/3rdparty/testtools-0.9.12/testtools/_spinner.py => test/3rdparty/testtools-0.9.34/testtools/_spinner.py
rename : test/3rdparty/testtools-0.9.12/testtools/distutilscmd.py => test/3rdparty/testtools-0.9.34/testtools/distutilscmd.py
rename : test/3rdparty/testtools-0.9.12/testtools/monkey.py => test/3rdparty/testtools-0.9.34/testtools/monkey.py
rename : test/3rdparty/testtools-0.9.12/testtools/tests/test_monkey.py => test/3rdparty/testtools-0.9.34/testtools/tests/test_monkey.py
rename : test/3rdparty/testtools-0.9.12/testtools/tests/test_runtest.py => test/3rdparty/testtools-0.9.34/testtools/tests/test_runtest.py
rename : test/3rdparty/testtools-0.9.12/testtools/utils.py => test/3rdparty/testtools-0.9.34/testtools/utils.py
This commit is contained in:
Michael Cahill
2013-12-20 10:27:28 +11:00
parent e2fcf1a851
commit fa69e2a994
194 changed files with 24285 additions and 5494 deletions

View File

@@ -0,0 +1,63 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
import sys
from unittest import TestLoader
# Before the test module imports to avoid circularity.
# For testing: different pythons have different str() implementations.
if sys.version_info > (3, 0):
_remote_exception_repr = "testtools.testresult.real._StringException"
_remote_exception_str = "Traceback (most recent call last):\ntesttools.testresult.real._StringException"
_remote_exception_str_chunked = "57\r\n" + _remote_exception_str + ": boo qux\n0\r\n"
else:
_remote_exception_repr = "_StringException"
_remote_exception_str = "Traceback (most recent call last):\n_StringException"
_remote_exception_str_chunked = "3D\r\n" + _remote_exception_str + ": boo qux\n0\r\n"
from subunit.tests import (
test_chunked,
test_details,
test_filters,
test_progress_model,
test_run,
test_subunit_filter,
test_subunit_stats,
test_subunit_tags,
test_tap2subunit,
test_test_protocol,
test_test_protocol2,
test_test_results,
)
def test_suite():
loader = TestLoader()
result = loader.loadTestsFromModule(test_chunked)
result.addTest(loader.loadTestsFromModule(test_details))
result.addTest(loader.loadTestsFromModule(test_filters))
result.addTest(loader.loadTestsFromModule(test_progress_model))
result.addTest(loader.loadTestsFromModule(test_test_results))
result.addTest(loader.loadTestsFromModule(test_test_protocol))
result.addTest(loader.loadTestsFromModule(test_test_protocol2))
result.addTest(loader.loadTestsFromModule(test_tap2subunit))
result.addTest(loader.loadTestsFromModule(test_subunit_filter))
result.addTest(loader.loadTestsFromModule(test_subunit_tags))
result.addTest(loader.loadTestsFromModule(test_subunit_stats))
result.addTest(loader.loadTestsFromModule(test_run))
return result

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python
import sys
if sys.platform == "win32":
import msvcrt, os
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
if len(sys.argv) == 2:
# subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args
# uses this code path to be sure that the arguments were passed to
# sample-script.py
print("test fail")
print("error fail")
sys.exit(0)
print("test old mcdonald")
print("success old mcdonald")
print("test bing crosby")
print("failure bing crosby [")
print("foo.c:53:ERROR invalid state")
print("]")
print("test an error")
print("error an error")
sys.exit(0)

View File

@@ -0,0 +1,7 @@
#!/usr/bin/env python
import sys
print("test old mcdonald")
print("success old mcdonald")
print("test bing crosby")
print("success bing crosby")
sys.exit(0)

View File

@@ -0,0 +1,146 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
import unittest
from testtools.compat import _b, BytesIO
import subunit.chunked
class TestDecode(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.output = BytesIO()
self.decoder = subunit.chunked.Decoder(self.output)
def test_close_read_length_short_errors(self):
self.assertRaises(ValueError, self.decoder.close)
def test_close_body_short_errors(self):
self.assertEqual(None, self.decoder.write(_b('2\r\na')))
self.assertRaises(ValueError, self.decoder.close)
def test_close_body_buffered_data_errors(self):
self.assertEqual(None, self.decoder.write(_b('2\r')))
self.assertRaises(ValueError, self.decoder.close)
def test_close_after_finished_stream_safe(self):
self.assertEqual(None, self.decoder.write(_b('2\r\nab')))
self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
self.decoder.close()
def test_decode_nothing(self):
self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
self.assertEqual(_b(''), self.output.getvalue())
def test_decode_serialised_form(self):
self.assertEqual(None, self.decoder.write(_b("F\r\n")))
self.assertEqual(None, self.decoder.write(_b("serialised\n")))
self.assertEqual(_b(''), self.decoder.write(_b("form0\r\n")))
def test_decode_short(self):
self.assertEqual(_b(''), self.decoder.write(_b('3\r\nabc0\r\n')))
self.assertEqual(_b('abc'), self.output.getvalue())
def test_decode_combines_short(self):
self.assertEqual(_b(''), self.decoder.write(_b('6\r\nabcdef0\r\n')))
self.assertEqual(_b('abcdef'), self.output.getvalue())
def test_decode_excess_bytes_from_write(self):
self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
self.assertEqual(_b('abc'), self.output.getvalue())
def test_decode_write_after_finished_errors(self):
self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
self.assertRaises(ValueError, self.decoder.write, _b(''))
def test_decode_hex(self):
self.assertEqual(_b(''), self.decoder.write(_b('A\r\n12345678900\r\n')))
self.assertEqual(_b('1234567890'), self.output.getvalue())
def test_decode_long_ranges(self):
self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
self.assertEqual(None, self.decoder.write(_b('1' * 65536)))
self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
self.assertEqual(None, self.decoder.write(_b('2' * 65536)))
self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
self.assertEqual(_b('1' * 65536 + '2' * 65536), self.output.getvalue())
def test_decode_newline_nonstrict(self):
"""Tolerate chunk markers with no CR character."""
# From <http://pad.lv/505078>
self.decoder = subunit.chunked.Decoder(self.output, strict=False)
self.assertEqual(None, self.decoder.write(_b('a\n')))
self.assertEqual(None, self.decoder.write(_b('abcdeabcde')))
self.assertEqual(_b(''), self.decoder.write(_b('0\n')))
self.assertEqual(_b('abcdeabcde'), self.output.getvalue())
def test_decode_strict_newline_only(self):
"""Reject chunk markers with no CR character in strict mode."""
# From <http://pad.lv/505078>
self.assertRaises(ValueError,
self.decoder.write, _b('a\n'))
def test_decode_strict_multiple_crs(self):
self.assertRaises(ValueError,
self.decoder.write, _b('a\r\r\n'))
def test_decode_short_header(self):
self.assertRaises(ValueError,
self.decoder.write, _b('\n'))
class TestEncode(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.output = BytesIO()
self.encoder = subunit.chunked.Encoder(self.output)
def test_encode_nothing(self):
self.encoder.close()
self.assertEqual(_b('0\r\n'), self.output.getvalue())
def test_encode_empty(self):
self.encoder.write(_b(''))
self.encoder.close()
self.assertEqual(_b('0\r\n'), self.output.getvalue())
def test_encode_short(self):
self.encoder.write(_b('abc'))
self.encoder.close()
self.assertEqual(_b('3\r\nabc0\r\n'), self.output.getvalue())
def test_encode_combines_short(self):
self.encoder.write(_b('abc'))
self.encoder.write(_b('def'))
self.encoder.close()
self.assertEqual(_b('6\r\nabcdef0\r\n'), self.output.getvalue())
def test_encode_over_9_is_in_hex(self):
self.encoder.write(_b('1234567890'))
self.encoder.close()
self.assertEqual(_b('A\r\n12345678900\r\n'), self.output.getvalue())
def test_encode_long_ranges_not_combined(self):
self.encoder.write(_b('1' * 65536))
self.encoder.write(_b('2' * 65536))
self.encoder.close()
self.assertEqual(_b('10000\r\n' + '1' * 65536 + '10000\r\n' +
'2' * 65536 + '0\r\n'), self.output.getvalue())

View File

@@ -0,0 +1,106 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
import unittest
from testtools.compat import _b, StringIO
import subunit.tests
from subunit import content, content_type, details
class TestSimpleDetails(unittest.TestCase):
def test_lineReceived(self):
parser = details.SimpleDetailsParser(None)
parser.lineReceived(_b("foo\n"))
parser.lineReceived(_b("bar\n"))
self.assertEqual(_b("foo\nbar\n"), parser._message)
def test_lineReceived_escaped_bracket(self):
parser = details.SimpleDetailsParser(None)
parser.lineReceived(_b("foo\n"))
parser.lineReceived(_b(" ]are\n"))
parser.lineReceived(_b("bar\n"))
self.assertEqual(_b("foo\n]are\nbar\n"), parser._message)
def test_get_message(self):
parser = details.SimpleDetailsParser(None)
self.assertEqual(_b(""), parser.get_message())
def test_get_details(self):
parser = details.SimpleDetailsParser(None)
traceback = ""
expected = {}
expected['traceback'] = content.Content(
content_type.ContentType("text", "x-traceback",
{'charset': 'utf8'}),
lambda:[_b("")])
found = parser.get_details()
self.assertEqual(expected.keys(), found.keys())
self.assertEqual(expected['traceback'].content_type,
found['traceback'].content_type)
self.assertEqual(_b('').join(expected['traceback'].iter_bytes()),
_b('').join(found['traceback'].iter_bytes()))
def test_get_details_skip(self):
parser = details.SimpleDetailsParser(None)
traceback = ""
expected = {}
expected['reason'] = content.Content(
content_type.ContentType("text", "plain"),
lambda:[_b("")])
found = parser.get_details("skip")
self.assertEqual(expected, found)
def test_get_details_success(self):
parser = details.SimpleDetailsParser(None)
traceback = ""
expected = {}
expected['message'] = content.Content(
content_type.ContentType("text", "plain"),
lambda:[_b("")])
found = parser.get_details("success")
self.assertEqual(expected, found)
class TestMultipartDetails(unittest.TestCase):
def test_get_message_is_None(self):
parser = details.MultipartDetailsParser(None)
self.assertEqual(None, parser.get_message())
def test_get_details(self):
parser = details.MultipartDetailsParser(None)
self.assertEqual({}, parser.get_details())
def test_parts(self):
parser = details.MultipartDetailsParser(None)
parser.lineReceived(_b("Content-Type: text/plain\n"))
parser.lineReceived(_b("something\n"))
parser.lineReceived(_b("F\r\n"))
parser.lineReceived(_b("serialised\n"))
parser.lineReceived(_b("form0\r\n"))
expected = {}
expected['something'] = content.Content(
content_type.ContentType("text", "plain"),
lambda:[_b("serialised\nform")])
found = parser.get_details()
self.assertEqual(expected.keys(), found.keys())
self.assertEqual(expected['something'].content_type,
found['something'].content_type)
self.assertEqual(_b('').join(expected['something'].iter_bytes()),
_b('').join(found['something'].iter_bytes()))

View File

@@ -0,0 +1,35 @@
#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2013 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
import sys
from tempfile import NamedTemporaryFile
from testtools import TestCase
from subunit.filters import find_stream
class TestFindStream(TestCase):
def test_no_argv(self):
self.assertEqual('foo', find_stream('foo', []))
def test_opens_file(self):
f = NamedTemporaryFile()
f.write(b'foo')
f.flush()
stream = find_stream('bar', [f.name])
self.assertEqual(b'foo', stream.read())

View File

@@ -0,0 +1,112 @@
#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
import unittest
import subunit
from subunit.progress_model import ProgressModel
class TestProgressModel(unittest.TestCase):
def assertProgressSummary(self, pos, total, progress):
"""Assert that a progress model has reached a particular point."""
self.assertEqual(pos, progress.pos())
self.assertEqual(total, progress.width())
def test_new_progress_0_0(self):
progress = ProgressModel()
self.assertProgressSummary(0, 0, progress)
def test_advance_0_0(self):
progress = ProgressModel()
progress.advance()
self.assertProgressSummary(1, 0, progress)
def test_advance_1_0(self):
progress = ProgressModel()
progress.advance()
self.assertProgressSummary(1, 0, progress)
def test_set_width_absolute(self):
progress = ProgressModel()
progress.set_width(10)
self.assertProgressSummary(0, 10, progress)
def test_set_width_absolute_preserves_pos(self):
progress = ProgressModel()
progress.advance()
progress.set_width(2)
self.assertProgressSummary(1, 2, progress)
def test_adjust_width(self):
progress = ProgressModel()
progress.adjust_width(10)
self.assertProgressSummary(0, 10, progress)
progress.adjust_width(-10)
self.assertProgressSummary(0, 0, progress)
def test_adjust_width_preserves_pos(self):
progress = ProgressModel()
progress.advance()
progress.adjust_width(10)
self.assertProgressSummary(1, 10, progress)
progress.adjust_width(-10)
self.assertProgressSummary(1, 0, progress)
def test_push_preserves_progress(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
self.assertProgressSummary(1, 3, progress)
def test_advance_advances_substack(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
progress.adjust_width(1)
progress.advance()
self.assertProgressSummary(2, 3, progress)
def test_adjust_width_adjusts_substack(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
progress.adjust_width(2)
progress.advance()
self.assertProgressSummary(3, 6, progress)
def test_set_width_adjusts_substack(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
progress.set_width(2)
progress.advance()
self.assertProgressSummary(3, 6, progress)
def test_pop_restores_progress(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
progress.adjust_width(1)
progress.advance()
progress.pop()
self.assertProgressSummary(1, 3, progress)

View File

@@ -0,0 +1,64 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2011 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
from testtools.compat import BytesIO
import unittest
from testtools import PlaceHolder, TestCase
from testtools.testresult.doubles import StreamResult
import subunit
from subunit import run
from subunit.run import SubunitTestRunner
class TestSubunitTestRunner(TestCase):
def test_includes_timing_output(self):
io = BytesIO()
runner = SubunitTestRunner(stream=io)
test = PlaceHolder('name')
runner.run(test)
io.seek(0)
eventstream = StreamResult()
subunit.ByteStreamToStreamResult(io).run(eventstream)
timestamps = [event[-1] for event in eventstream._events
if event is not None]
self.assertNotEqual([], timestamps)
def test_enumerates_tests_before_run(self):
io = BytesIO()
runner = SubunitTestRunner(stream=io)
test1 = PlaceHolder('name1')
test2 = PlaceHolder('name2')
case = unittest.TestSuite([test1, test2])
runner.run(case)
io.seek(0)
eventstream = StreamResult()
subunit.ByteStreamToStreamResult(io).run(eventstream)
self.assertEqual([
('status', 'name1', 'exists'),
('status', 'name2', 'exists'),
], [event[:3] for event in eventstream._events[:2]])
def test_list_errors_if_errors_from_list_test(self):
io = BytesIO()
runner = SubunitTestRunner(stream=io)
def list_test(test):
return [], ['failed import']
self.patch(run, 'list_test', list_test)
exc = self.assertRaises(SystemExit, runner.list, None)
self.assertEqual((2,), exc.args)

View File

@@ -0,0 +1,346 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Tests for subunit.TestResultFilter."""
from datetime import datetime
import os
import subprocess
import sys
from subunit import iso8601
import unittest
from testtools import TestCase
from testtools.compat import _b, BytesIO
from testtools.testresult.doubles import ExtendedTestResult, StreamResult
import subunit
from subunit.test_results import make_tag_filter, TestResultFilter
from subunit import ByteStreamToStreamResult, StreamResultToBytes
class TestTestResultFilter(TestCase):
"""Test for TestResultFilter, a TestResult object which filters tests."""
# While TestResultFilter works on python objects, using a subunit stream
# is an easy pithy way of getting a series of test objects to call into
# the TestResult, and as TestResultFilter is intended for use with subunit
# also has the benefit of detecting any interface skew issues.
example_subunit_stream = _b("""\
tags: global
test passed
success passed
test failed
tags: local
failure failed
test error
error error [
error details
]
test skipped
skip skipped
test todo
xfail todo
""")
def run_tests(self, result_filter, input_stream=None):
"""Run tests through the given filter.
:param result_filter: A filtering TestResult object.
:param input_stream: Bytes of subunit stream data. If not provided,
uses TestTestResultFilter.example_subunit_stream.
"""
if input_stream is None:
input_stream = self.example_subunit_stream
test = subunit.ProtocolTestCase(BytesIO(input_stream))
test.run(result_filter)
def test_default(self):
"""The default is to exclude success and include everything else."""
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result)
self.run_tests(result_filter)
# skips are seen as success by default python TestResult.
self.assertEqual(['error'],
[error[0].id() for error in filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
filtered_result.failures])
self.assertEqual(4, filtered_result.testsRun)
def test_tag_filter(self):
tag_filter = make_tag_filter(['global'], ['local'])
result = ExtendedTestResult()
result_filter = TestResultFilter(
result, filter_success=False, filter_predicate=tag_filter)
self.run_tests(result_filter)
tests_included = [
event[1] for event in result._events if event[0] == 'startTest']
tests_expected = list(map(
subunit.RemotedTestCase,
['passed', 'error', 'skipped', 'todo']))
self.assertEquals(tests_expected, tests_included)
def test_tags_tracked_correctly(self):
tag_filter = make_tag_filter(['a'], [])
result = ExtendedTestResult()
result_filter = TestResultFilter(
result, filter_success=False, filter_predicate=tag_filter)
input_stream = _b(
"test: foo\n"
"tags: a\n"
"successful: foo\n"
"test: bar\n"
"successful: bar\n")
self.run_tests(result_filter, input_stream)
foo = subunit.RemotedTestCase('foo')
self.assertEquals(
[('startTest', foo),
('tags', set(['a']), set()),
('addSuccess', foo),
('stopTest', foo),
],
result._events)
def test_exclude_errors(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_error=True)
self.run_tests(result_filter)
# skips are seen as errors by default python TestResult.
self.assertEqual([], filtered_result.errors)
self.assertEqual(['failed'],
[failure[0].id() for failure in
filtered_result.failures])
self.assertEqual(3, filtered_result.testsRun)
def test_fixup_expected_failures(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result,
fixup_expected_failures=set(["failed"]))
self.run_tests(result_filter)
self.assertEqual(['failed', 'todo'],
[failure[0].id() for failure in filtered_result.expectedFailures])
self.assertEqual([], filtered_result.failures)
self.assertEqual(4, filtered_result.testsRun)
def test_fixup_expected_errors(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result,
fixup_expected_failures=set(["error"]))
self.run_tests(result_filter)
self.assertEqual(['error', 'todo'],
[failure[0].id() for failure in filtered_result.expectedFailures])
self.assertEqual([], filtered_result.errors)
self.assertEqual(4, filtered_result.testsRun)
def test_fixup_unexpected_success(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_success=False,
fixup_expected_failures=set(["passed"]))
self.run_tests(result_filter)
self.assertEqual(['passed'],
[passed.id() for passed in filtered_result.unexpectedSuccesses])
self.assertEqual(5, filtered_result.testsRun)
def test_exclude_failure(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_failure=True)
self.run_tests(result_filter)
self.assertEqual(['error'],
[error[0].id() for error in filtered_result.errors])
self.assertEqual([],
[failure[0].id() for failure in
filtered_result.failures])
self.assertEqual(3, filtered_result.testsRun)
def test_exclude_skips(self):
filtered_result = subunit.TestResultStats(None)
result_filter = TestResultFilter(filtered_result, filter_skip=True)
self.run_tests(result_filter)
self.assertEqual(0, filtered_result.skipped_tests)
self.assertEqual(2, filtered_result.failed_tests)
self.assertEqual(3, filtered_result.testsRun)
def test_include_success(self):
"""Successes can be included if requested."""
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result,
filter_success=False)
self.run_tests(result_filter)
self.assertEqual(['error'],
[error[0].id() for error in filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
filtered_result.failures])
self.assertEqual(5, filtered_result.testsRun)
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
# 0.0.7 and earlier did not support the 'tags' parameter, so we need
# to test that we still support behaviour without it.
filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
result_filter = TestResultFilter(filtered_result,
filter_predicate=filter_cb,
filter_success=False)
self.run_tests(result_filter)
# Only success should pass
self.assertEqual(1, filtered_result.testsRun)
def test_filter_predicate_with_tags(self):
"""You can filter by predicate callbacks that accept tags"""
filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details, tags):
return outcome == 'success'
result_filter = TestResultFilter(filtered_result,
filter_predicate=filter_cb,
filter_success=False)
self.run_tests(result_filter)
# Only success should pass
self.assertEqual(1, filtered_result.testsRun)
def test_time_ordering_preserved(self):
# Passing a subunit stream through TestResultFilter preserves the
# relative ordering of 'time' directives and any other subunit
# directives that are still included.
date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
subunit_stream = _b('\n'.join([
"time: %s",
"test: foo",
"time: %s",
"error: foo",
"time: %s",
""]) % (date_a, date_b, date_c))
result = ExtendedTestResult()
result_filter = TestResultFilter(result)
self.run_tests(result_filter, subunit_stream)
foo = subunit.RemotedTestCase('foo')
self.maxDiff = None
self.assertEqual(
[('time', date_a),
('time', date_b),
('startTest', foo),
('addError', foo, {}),
('stopTest', foo),
('time', date_c)], result._events)
def test_time_passes_through_filtered_tests(self):
# Passing a subunit stream through TestResultFilter preserves 'time'
# directives even if a specific test is filtered out.
date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
subunit_stream = _b('\n'.join([
"time: %s",
"test: foo",
"time: %s",
"success: foo",
"time: %s",
""]) % (date_a, date_b, date_c))
result = ExtendedTestResult()
result_filter = TestResultFilter(result)
result_filter.startTestRun()
self.run_tests(result_filter, subunit_stream)
result_filter.stopTestRun()
foo = subunit.RemotedTestCase('foo')
self.maxDiff = None
self.assertEqual(
[('startTestRun',),
('time', date_a),
('time', date_c),
('stopTestRun',),], result._events)
def test_skip_preserved(self):
subunit_stream = _b('\n'.join([
"test: foo",
"skip: foo",
""]))
result = ExtendedTestResult()
result_filter = TestResultFilter(result)
self.run_tests(result_filter, subunit_stream)
foo = subunit.RemotedTestCase('foo')
self.assertEquals(
[('startTest', foo),
('addSkip', foo, {}),
('stopTest', foo), ], result._events)
if sys.version_info < (2, 7):
# These tests require Python >=2.7.
del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success
class TestFilterCommand(TestCase):
def run_command(self, args, stream):
root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
script_path = os.path.join(root, 'filters', 'subunit-filter')
command = [sys.executable, script_path] + list(args)
ps = subprocess.Popen(
command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = ps.communicate(stream)
if ps.returncode != 0:
raise RuntimeError("%s failed: %s" % (command, err))
return out
def test_default(self):
byte_stream = BytesIO()
stream = StreamResultToBytes(byte_stream)
stream.status(test_id="foo", test_status="inprogress")
stream.status(test_id="foo", test_status="skip")
output = self.run_command([], byte_stream.getvalue())
events = StreamResult()
ByteStreamToStreamResult(BytesIO(output)).run(events)
ids = set(event[1] for event in events._events)
self.assertEqual([
('status', 'foo', 'inprogress'),
('status', 'foo', 'skip'),
], [event[:3] for event in events._events])
def test_tags(self):
byte_stream = BytesIO()
stream = StreamResultToBytes(byte_stream)
stream.status(
test_id="foo", test_status="inprogress", test_tags=set(["a"]))
stream.status(
test_id="foo", test_status="success", test_tags=set(["a"]))
stream.status(test_id="bar", test_status="inprogress")
stream.status(test_id="bar", test_status="inprogress")
stream.status(
test_id="baz", test_status="inprogress", test_tags=set(["a"]))
stream.status(
test_id="baz", test_status="success", test_tags=set(["a"]))
output = self.run_command(
['-s', '--with-tag', 'a'], byte_stream.getvalue())
events = StreamResult()
ByteStreamToStreamResult(BytesIO(output)).run(events)
ids = set(event[1] for event in events._events)
self.assertEqual(set(['foo', 'baz']), ids)
def test_no_passthrough(self):
output = self.run_command(['--no-passthrough'], b'hi thar')
self.assertEqual(b'', output)
def test_passthrough(self):
output = self.run_command([], b'hi thar')
byte_stream = BytesIO()
stream = StreamResultToBytes(byte_stream)
stream.status(file_name="stdout", file_bytes=b'hi thar')
self.assertEqual(byte_stream.getvalue(), output)

View File

@@ -0,0 +1,78 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Tests for subunit.TestResultStats."""
import unittest
from testtools.compat import _b, BytesIO, StringIO
import subunit
class TestTestResultStats(unittest.TestCase):
"""Test for TestResultStats, a TestResult object that generates stats."""
def setUp(self):
self.output = StringIO()
self.result = subunit.TestResultStats(self.output)
self.input_stream = BytesIO()
self.test = subunit.ProtocolTestCase(self.input_stream)
def test_stats_empty(self):
self.test.run(self.result)
self.assertEqual(0, self.result.total_tests)
self.assertEqual(0, self.result.passed_tests)
self.assertEqual(0, self.result.failed_tests)
self.assertEqual(set(), self.result.seen_tags)
def setUpUsedStream(self):
self.input_stream.write(_b("""tags: global
test passed
success passed
test failed
tags: local
failure failed
test error
error error
test skipped
skip skipped
test todo
xfail todo
"""))
self.input_stream.seek(0)
self.test.run(self.result)
def test_stats_smoke_everything(self):
# Statistics are calculated usefully.
self.setUpUsedStream()
self.assertEqual(5, self.result.total_tests)
self.assertEqual(2, self.result.passed_tests)
self.assertEqual(2, self.result.failed_tests)
self.assertEqual(1, self.result.skipped_tests)
self.assertEqual(set(["global", "local"]), self.result.seen_tags)
def test_stat_formatting(self):
expected = ("""
Total tests: 5
Passed tests: 2
Failed tests: 2
Skipped tests: 1
Seen tags: global, local
""")[1:]
self.setUpUsedStream()
self.result.formatStats()
self.assertEqual(expected, self.output.getvalue())

View File

@@ -0,0 +1,85 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Tests for subunit.tag_stream."""
from io import BytesIO
import testtools
from testtools.matchers import Contains
import subunit
import subunit.test_results
class TestSubUnitTags(testtools.TestCase):
def setUp(self):
super(TestSubUnitTags, self).setUp()
self.original = BytesIO()
self.filtered = BytesIO()
def test_add_tag(self):
# Literal values to avoid set sort-order dependencies. Python code show
# derivation.
# reference = BytesIO()
# stream = subunit.StreamResultToBytes(reference)
# stream.status(
# test_id='test', test_status='inprogress', test_tags=set(['quux', 'foo']))
# stream.status(
# test_id='test', test_status='success', test_tags=set(['bar', 'quux', 'foo']))
reference = [
b'\xb3)\x82\x17\x04test\x02\x04quux\x03foo\x05\x97n\x86\xb3)'
b'\x83\x1b\x04test\x03\x03bar\x04quux\x03fooqn\xab)',
b'\xb3)\x82\x17\x04test\x02\x04quux\x03foo\x05\x97n\x86\xb3)'
b'\x83\x1b\x04test\x03\x04quux\x03foo\x03bar\xaf\xbd\x9d\xd6',
b'\xb3)\x82\x17\x04test\x02\x04quux\x03foo\x05\x97n\x86\xb3)'
b'\x83\x1b\x04test\x03\x04quux\x03bar\x03foo\x03\x04b\r',
b'\xb3)\x82\x17\x04test\x02\x04quux\x03foo\x05\x97n\x86\xb3)'
b'\x83\x1b\x04test\x03\x03bar\x03foo\x04quux\xd2\x18\x1bC',
b'\xb3)\x82\x17\x04test\x02\x03foo\x04quux\xa6\xe1\xde\xec\xb3)'
b'\x83\x1b\x04test\x03\x03foo\x04quux\x03bar\x08\xc2X\x83',
b'\xb3)\x82\x17\x04test\x02\x03foo\x04quux\xa6\xe1\xde\xec\xb3)'
b'\x83\x1b\x04test\x03\x03bar\x03foo\x04quux\xd2\x18\x1bC',
b'\xb3)\x82\x17\x04test\x02\x03foo\x04quux\xa6\xe1\xde\xec\xb3)'
b'\x83\x1b\x04test\x03\x03foo\x03bar\x04quux:\x05e\x80',
]
stream = subunit.StreamResultToBytes(self.original)
stream.status(
test_id='test', test_status='inprogress', test_tags=set(['foo']))
stream.status(
test_id='test', test_status='success', test_tags=set(['foo', 'bar']))
self.original.seek(0)
self.assertEqual(
0, subunit.tag_stream(self.original, self.filtered, ["quux"]))
self.assertThat(reference, Contains(self.filtered.getvalue()))
def test_remove_tag(self):
reference = BytesIO()
stream = subunit.StreamResultToBytes(reference)
stream.status(
test_id='test', test_status='inprogress', test_tags=set(['foo']))
stream.status(
test_id='test', test_status='success', test_tags=set(['foo']))
stream = subunit.StreamResultToBytes(self.original)
stream.status(
test_id='test', test_status='inprogress', test_tags=set(['foo']))
stream.status(
test_id='test', test_status='success', test_tags=set(['foo', 'bar']))
self.original.seek(0)
self.assertEqual(
0, subunit.tag_stream(self.original, self.filtered, ["-bar"]))
self.assertEqual(reference.getvalue(), self.filtered.getvalue())

View File

@@ -0,0 +1,387 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Tests for TAP2SubUnit."""
from io import BytesIO, StringIO
import unittest
from testtools import TestCase
from testtools.compat import _u
from testtools.testresult.doubles import StreamResult
import subunit
UTF8_TEXT = 'text/plain; charset=UTF8'
class TestTAP2SubUnit(TestCase):
"""Tests for TAP2SubUnit.
These tests test TAP string data in, and subunit string data out.
This is ok because the subunit protocol is intended to be stable,
but it might be easier/pithier to write tests against TAP string in,
parsed subunit objects out (by hooking the subunit stream to a subunit
protocol server.
"""
def setUp(self):
super(TestTAP2SubUnit, self).setUp()
self.tap = StringIO()
self.subunit = BytesIO()
def test_skip_entire_file(self):
# A file
# 1..- # Skipped: comment
# results in a single skipped test.
self.tap.write(_u("1..0 # Skipped: entire file skipped\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'file skip', 'skip', None, True,
'tap comment', b'Skipped: entire file skipped', True, None, None,
None)])
def test_ok_test_pass(self):
# A file
# ok
# results in a passed test with name 'test 1' (a synthetic name as tap
# does not require named fixtures - it is the first test in the tap
# stream).
self.tap.write(_u("ok\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1', 'success', None, False, None,
None, True, None, None, None)])
def test_ok_test_number_pass(self):
# A file
# ok 1
# results in a passed test with name 'test 1'
self.tap.write(_u("ok 1\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1', 'success', None, False, None,
None, True, None, None, None)])
def test_ok_test_number_description_pass(self):
# A file
# ok 1 - There is a description
# results in a passed test with name 'test 1 - There is a description'
self.tap.write(_u("ok 1 - There is a description\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1 - There is a description',
'success', None, False, None, None, True, None, None, None)])
def test_ok_test_description_pass(self):
# A file
# ok There is a description
# results in a passed test with name 'test 1 There is a description'
self.tap.write(_u("ok There is a description\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1 There is a description',
'success', None, False, None, None, True, None, None, None)])
def test_ok_SKIP_skip(self):
# A file
# ok # SKIP
# results in a skkip test with name 'test 1'
self.tap.write(_u("ok # SKIP\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1', 'skip', None, False, None,
None, True, None, None, None)])
def test_ok_skip_number_comment_lowercase(self):
self.tap.write(_u("ok 1 # skip no samba environment available, skipping compilation\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1', 'skip', None, False, 'tap comment',
b'no samba environment available, skipping compilation', True,
'text/plain; charset=UTF8', None, None)])
def test_ok_number_description_SKIP_skip_comment(self):
# A file
# ok 1 foo # SKIP Not done yet
# results in a skip test with name 'test 1 foo' and a log of
# Not done yet
self.tap.write(_u("ok 1 foo # SKIP Not done yet\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1 foo', 'skip', None, False,
'tap comment', b'Not done yet', True, 'text/plain; charset=UTF8',
None, None)])
def test_ok_SKIP_skip_comment(self):
# A file
# ok # SKIP Not done yet
# results in a skip test with name 'test 1' and a log of Not done yet
self.tap.write(_u("ok # SKIP Not done yet\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1', 'skip', None, False,
'tap comment', b'Not done yet', True, 'text/plain; charset=UTF8',
None, None)])
def test_ok_TODO_xfail(self):
# A file
# ok # TODO
# results in a xfail test with name 'test 1'
self.tap.write(_u("ok # TODO\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1', 'xfail', None, False, None,
None, True, None, None, None)])
def test_ok_TODO_xfail_comment(self):
# A file
# ok # TODO Not done yet
# results in a xfail test with name 'test 1' and a log of Not done yet
self.tap.write(_u("ok # TODO Not done yet\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([('status', 'test 1', 'xfail', None, False,
'tap comment', b'Not done yet', True, 'text/plain; charset=UTF8',
None, None)])
def test_bail_out_errors(self):
# A file with line in it
# Bail out! COMMENT
# is treated as an error
self.tap.write(_u("ok 1 foo\n"))
self.tap.write(_u("Bail out! Lifejacket engaged\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([
('status', 'test 1 foo', 'success', None, False, None, None, True,
None, None, None),
('status', 'Bail out! Lifejacket engaged', 'fail', None, False,
None, None, True, None, None, None)])
def test_missing_test_at_end_with_plan_adds_error(self):
# A file
# 1..3
# ok first test
# not ok third test
# results in three tests, with the third being created
self.tap.write(_u('1..3\n'))
self.tap.write(_u('ok first test\n'))
self.tap.write(_u('not ok second test\n'))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([
('status', 'test 1 first test', 'success', None, False, None,
None, True, None, None, None),
('status', 'test 2 second test', 'fail', None, False, None, None,
True, None, None, None),
('status', 'test 3', 'fail', None, False, 'tap meta',
b'test missing from TAP output', True, 'text/plain; charset=UTF8',
None, None)])
def test_missing_test_with_plan_adds_error(self):
# A file
# 1..3
# ok first test
# not ok 3 third test
# results in three tests, with the second being created
self.tap.write(_u('1..3\n'))
self.tap.write(_u('ok first test\n'))
self.tap.write(_u('not ok 3 third test\n'))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([
('status', 'test 1 first test', 'success', None, False, None, None,
True, None, None, None),
('status', 'test 2', 'fail', None, False, 'tap meta',
b'test missing from TAP output', True, 'text/plain; charset=UTF8',
None, None),
('status', 'test 3 third test', 'fail', None, False, None, None,
True, None, None, None)])
def test_missing_test_no_plan_adds_error(self):
# A file
# ok first test
# not ok 3 third test
# results in three tests, with the second being created
self.tap.write(_u('ok first test\n'))
self.tap.write(_u('not ok 3 third test\n'))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([
('status', 'test 1 first test', 'success', None, False, None, None,
True, None, None, None),
('status', 'test 2', 'fail', None, False, 'tap meta',
b'test missing from TAP output', True, 'text/plain; charset=UTF8',
None, None),
('status', 'test 3 third test', 'fail', None, False, None, None,
True, None, None, None)])
def test_four_tests_in_a_row_trailing_plan(self):
# A file
# ok 1 - first test in a script with no plan at all
# not ok 2 - second
# ok 3 - third
# not ok 4 - fourth
# 1..4
# results in four tests numbered and named
self.tap.write(_u('ok 1 - first test in a script with trailing plan\n'))
self.tap.write(_u('not ok 2 - second\n'))
self.tap.write(_u('ok 3 - third\n'))
self.tap.write(_u('not ok 4 - fourth\n'))
self.tap.write(_u('1..4\n'))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([
('status', 'test 1 - first test in a script with trailing plan',
'success', None, False, None, None, True, None, None, None),
('status', 'test 2 - second', 'fail', None, False, None, None,
True, None, None, None),
('status', 'test 3 - third', 'success', None, False, None, None,
True, None, None, None),
('status', 'test 4 - fourth', 'fail', None, False, None, None,
True, None, None, None)])
def test_four_tests_in_a_row_with_plan(self):
# A file
# 1..4
# ok 1 - first test in a script with no plan at all
# not ok 2 - second
# ok 3 - third
# not ok 4 - fourth
# results in four tests numbered and named
self.tap.write(_u('1..4\n'))
self.tap.write(_u('ok 1 - first test in a script with a plan\n'))
self.tap.write(_u('not ok 2 - second\n'))
self.tap.write(_u('ok 3 - third\n'))
self.tap.write(_u('not ok 4 - fourth\n'))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([
('status', 'test 1 - first test in a script with a plan',
'success', None, False, None, None, True, None, None, None),
('status', 'test 2 - second', 'fail', None, False, None, None,
True, None, None, None),
('status', 'test 3 - third', 'success', None, False, None, None,
True, None, None, None),
('status', 'test 4 - fourth', 'fail', None, False, None, None,
True, None, None, None)])
def test_four_tests_in_a_row_no_plan(self):
# A file
# ok 1 - first test in a script with no plan at all
# not ok 2 - second
# ok 3 - third
# not ok 4 - fourth
# results in four tests numbered and named
self.tap.write(_u('ok 1 - first test in a script with no plan at all\n'))
self.tap.write(_u('not ok 2 - second\n'))
self.tap.write(_u('ok 3 - third\n'))
self.tap.write(_u('not ok 4 - fourth\n'))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([
('status', 'test 1 - first test in a script with no plan at all',
'success', None, False, None, None, True, None, None, None),
('status', 'test 2 - second', 'fail', None, False, None, None,
True, None, None, None),
('status', 'test 3 - third', 'success', None, False, None, None,
True, None, None, None),
('status', 'test 4 - fourth', 'fail', None, False, None, None,
True, None, None, None)])
def test_todo_and_skip(self):
# A file
# not ok 1 - a fail but # TODO but is TODO
# not ok 2 - another fail # SKIP instead
# results in two tests, numbered and commented.
self.tap.write(_u("not ok 1 - a fail but # TODO but is TODO\n"))
self.tap.write(_u("not ok 2 - another fail # SKIP instead\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.subunit.seek(0)
events = StreamResult()
subunit.ByteStreamToStreamResult(self.subunit).run(events)
self.check_events([
('status', 'test 1 - a fail but', 'xfail', None, False,
'tap comment', b'but is TODO', True, 'text/plain; charset=UTF8',
None, None),
('status', 'test 2 - another fail', 'skip', None, False,
'tap comment', b'instead', True, 'text/plain; charset=UTF8',
None, None)])
def test_leading_comments_add_to_next_test_log(self):
# A file
# # comment
# ok
# ok
# results in a single test with the comment included
# in the first test and not the second.
self.tap.write(_u("# comment\n"))
self.tap.write(_u("ok\n"))
self.tap.write(_u("ok\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([
('status', 'test 1', 'success', None, False, 'tap comment',
b'# comment', True, 'text/plain; charset=UTF8', None, None),
('status', 'test 2', 'success', None, False, None, None, True,
None, None, None)])
def test_trailing_comments_are_included_in_last_test_log(self):
# A file
# ok foo
# ok foo
# # comment
# results in a two tests, with the second having the comment
# attached to its log.
self.tap.write(_u("ok\n"))
self.tap.write(_u("ok\n"))
self.tap.write(_u("# comment\n"))
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.check_events([
('status', 'test 1', 'success', None, False, None, None, True,
None, None, None),
('status', 'test 2', 'success', None, False, 'tap comment',
b'# comment', True, 'text/plain; charset=UTF8', None, None)])
def check_events(self, events):
self.subunit.seek(0)
eventstream = StreamResult()
subunit.ByteStreamToStreamResult(self.subunit).run(eventstream)
self.assertEqual(events, eventstream._events)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,436 @@
#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2013 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
from io import BytesIO
import datetime
from testtools import TestCase
from testtools.matchers import Contains, HasLength
from testtools.tests.test_testresult import TestStreamResultContract
from testtools.testresult.doubles import StreamResult
import subunit
import subunit.iso8601 as iso8601
CONSTANT_ENUM = b'\xb3)\x01\x0c\x03foo\x08U_\x1b'
CONSTANT_INPROGRESS = b'\xb3)\x02\x0c\x03foo\x8e\xc1-\xb5'
CONSTANT_SUCCESS = b'\xb3)\x03\x0c\x03fooE\x9d\xfe\x10'
CONSTANT_UXSUCCESS = b'\xb3)\x04\x0c\x03fooX\x98\xce\xa8'
CONSTANT_SKIP = b'\xb3)\x05\x0c\x03foo\x93\xc4\x1d\r'
CONSTANT_FAIL = b'\xb3)\x06\x0c\x03foo\x15Po\xa3'
CONSTANT_XFAIL = b'\xb3)\x07\x0c\x03foo\xde\x0c\xbc\x06'
CONSTANT_EOF = b'\xb3!\x10\x08S\x15\x88\xdc'
CONSTANT_FILE_CONTENT = b'\xb3!@\x13\x06barney\x03wooA5\xe3\x8c'
CONSTANT_MIME = b'\xb3! #\x1aapplication/foo; charset=1x3Q\x15'
CONSTANT_TIMESTAMP = b'\xb3+\x03\x13<\x17T\xcf\x80\xaf\xc8\x03barI\x96>-'
CONSTANT_ROUTE_CODE = b'\xb3-\x03\x13\x03bar\x06source\x9cY9\x19'
CONSTANT_RUNNABLE = b'\xb3(\x03\x0c\x03foo\xe3\xea\xf5\xa4'
CONSTANT_TAGS = [
b'\xb3)\x80\x15\x03bar\x02\x03foo\x03barTHn\xb4',
b'\xb3)\x80\x15\x03bar\x02\x03bar\x03foo\xf8\xf1\x91o',
]
class TestStreamResultToBytesContract(TestCase, TestStreamResultContract):
"""Check that StreamResult behaves as testtools expects."""
def _make_result(self):
return subunit.StreamResultToBytes(BytesIO())
class TestStreamResultToBytes(TestCase):
def _make_result(self):
output = BytesIO()
return subunit.StreamResultToBytes(output), output
def test_numbers(self):
result = subunit.StreamResultToBytes(BytesIO())
packet = []
self.assertRaises(Exception, result._write_number, -1, packet)
self.assertEqual([], packet)
result._write_number(0, packet)
self.assertEqual([b'\x00'], packet)
del packet[:]
result._write_number(63, packet)
self.assertEqual([b'\x3f'], packet)
del packet[:]
result._write_number(64, packet)
self.assertEqual([b'\x40\x40'], packet)
del packet[:]
result._write_number(16383, packet)
self.assertEqual([b'\x7f\xff'], packet)
del packet[:]
result._write_number(16384, packet)
self.assertEqual([b'\x80\x40', b'\x00'], packet)
del packet[:]
result._write_number(4194303, packet)
self.assertEqual([b'\xbf\xff', b'\xff'], packet)
del packet[:]
result._write_number(4194304, packet)
self.assertEqual([b'\xc0\x40\x00\x00'], packet)
del packet[:]
result._write_number(1073741823, packet)
self.assertEqual([b'\xff\xff\xff\xff'], packet)
del packet[:]
self.assertRaises(Exception, result._write_number, 1073741824, packet)
self.assertEqual([], packet)
def test_volatile_length(self):
# if the length of the packet data before the length itself is
# considered is right on the boundary for length's variable length
# encoding, it is easy to get the length wrong by not accounting for
# length itself.
# that is, the encoder has to ensure that length == sum (length_of_rest
# + length_of_length)
result, output = self._make_result()
# 1 byte short:
result.status(file_name="", file_bytes=b'\xff'*0)
self.assertThat(output.getvalue(), HasLength(10))
self.assertEqual(b'\x0a', output.getvalue()[3:4])
output.seek(0)
output.truncate()
# 1 byte long:
result.status(file_name="", file_bytes=b'\xff'*53)
self.assertThat(output.getvalue(), HasLength(63))
self.assertEqual(b'\x3f', output.getvalue()[3:4])
output.seek(0)
output.truncate()
# 2 bytes short
result.status(file_name="", file_bytes=b'\xff'*54)
self.assertThat(output.getvalue(), HasLength(65))
self.assertEqual(b'\x40\x41', output.getvalue()[3:5])
output.seek(0)
output.truncate()
# 2 bytes long
result.status(file_name="", file_bytes=b'\xff'*16371)
self.assertThat(output.getvalue(), HasLength(16383))
self.assertEqual(b'\x7f\xff', output.getvalue()[3:5])
output.seek(0)
output.truncate()
# 3 bytes short
result.status(file_name="", file_bytes=b'\xff'*16372)
self.assertThat(output.getvalue(), HasLength(16385))
self.assertEqual(b'\x80\x40\x01', output.getvalue()[3:6])
output.seek(0)
output.truncate()
# 3 bytes long
result.status(file_name="", file_bytes=b'\xff'*4194289)
self.assertThat(output.getvalue(), HasLength(4194303))
self.assertEqual(b'\xbf\xff\xff', output.getvalue()[3:6])
output.seek(0)
output.truncate()
self.assertRaises(Exception, result.status, file_name="",
file_bytes=b'\xff'*4194290)
def test_trivial_enumeration(self):
result, output = self._make_result()
result.status("foo", 'exists')
self.assertEqual(CONSTANT_ENUM, output.getvalue())
def test_inprogress(self):
result, output = self._make_result()
result.status("foo", 'inprogress')
self.assertEqual(CONSTANT_INPROGRESS, output.getvalue())
def test_success(self):
result, output = self._make_result()
result.status("foo", 'success')
self.assertEqual(CONSTANT_SUCCESS, output.getvalue())
def test_uxsuccess(self):
result, output = self._make_result()
result.status("foo", 'uxsuccess')
self.assertEqual(CONSTANT_UXSUCCESS, output.getvalue())
def test_skip(self):
result, output = self._make_result()
result.status("foo", 'skip')
self.assertEqual(CONSTANT_SKIP, output.getvalue())
def test_fail(self):
result, output = self._make_result()
result.status("foo", 'fail')
self.assertEqual(CONSTANT_FAIL, output.getvalue())
def test_xfail(self):
result, output = self._make_result()
result.status("foo", 'xfail')
self.assertEqual(CONSTANT_XFAIL, output.getvalue())
def test_unknown_status(self):
result, output = self._make_result()
self.assertRaises(Exception, result.status, "foo", 'boo')
self.assertEqual(b'', output.getvalue())
def test_eof(self):
result, output = self._make_result()
result.status(eof=True)
self.assertEqual(CONSTANT_EOF, output.getvalue())
def test_file_content(self):
result, output = self._make_result()
result.status(file_name="barney", file_bytes=b"woo")
self.assertEqual(CONSTANT_FILE_CONTENT, output.getvalue())
def test_mime(self):
result, output = self._make_result()
result.status(mime_type="application/foo; charset=1")
self.assertEqual(CONSTANT_MIME, output.getvalue())
def test_route_code(self):
result, output = self._make_result()
result.status(test_id="bar", test_status='success',
route_code="source")
self.assertEqual(CONSTANT_ROUTE_CODE, output.getvalue())
def test_runnable(self):
result, output = self._make_result()
result.status("foo", 'success', runnable=False)
self.assertEqual(CONSTANT_RUNNABLE, output.getvalue())
def test_tags(self):
result, output = self._make_result()
result.status(test_id="bar", test_tags=set(['foo', 'bar']))
self.assertThat(CONSTANT_TAGS, Contains(output.getvalue()))
def test_timestamp(self):
timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45,
iso8601.Utc())
result, output = self._make_result()
result.status(test_id="bar", test_status='success', timestamp=timestamp)
self.assertEqual(CONSTANT_TIMESTAMP, output.getvalue())
class TestByteStreamToStreamResult(TestCase):
def test_non_subunit_encapsulated(self):
source = BytesIO(b"foo\nbar\n")
result = StreamResult()
subunit.ByteStreamToStreamResult(
source, non_subunit_name="stdout").run(result)
self.assertEqual([
('status', None, None, None, True, 'stdout', b'f', False, None, None, None),
('status', None, None, None, True, 'stdout', b'o', False, None, None, None),
('status', None, None, None, True, 'stdout', b'o', False, None, None, None),
('status', None, None, None, True, 'stdout', b'\n', False, None, None, None),
('status', None, None, None, True, 'stdout', b'b', False, None, None, None),
('status', None, None, None, True, 'stdout', b'a', False, None, None, None),
('status', None, None, None, True, 'stdout', b'r', False, None, None, None),
('status', None, None, None, True, 'stdout', b'\n', False, None, None, None),
], result._events)
self.assertEqual(b'', source.read())
def test_signature_middle_utf8_char(self):
utf8_bytes = b'\xe3\xb3\x8a'
source = BytesIO(utf8_bytes)
# Should be treated as one character (it is u'\u3cca') and wrapped
result = StreamResult()
subunit.ByteStreamToStreamResult(
source, non_subunit_name="stdout").run(
result)
self.assertEqual([
('status', None, None, None, True, 'stdout', b'\xe3', False, None, None, None),
('status', None, None, None, True, 'stdout', b'\xb3', False, None, None, None),
('status', None, None, None, True, 'stdout', b'\x8a', False, None, None, None),
], result._events)
def test_non_subunit_disabled_raises(self):
source = BytesIO(b"foo\nbar\n")
result = StreamResult()
case = subunit.ByteStreamToStreamResult(source)
e = self.assertRaises(Exception, case.run, result)
self.assertEqual(b'f', e.args[1])
self.assertEqual(b'oo\nbar\n', source.read())
self.assertEqual([], result._events)
def test_trivial_enumeration(self):
source = BytesIO(CONSTANT_ENUM)
result = StreamResult()
subunit.ByteStreamToStreamResult(
source, non_subunit_name="stdout").run(result)
self.assertEqual(b'', source.read())
self.assertEqual([
('status', 'foo', 'exists', None, True, None, None, False, None, None, None),
], result._events)
def test_multiple_events(self):
source = BytesIO(CONSTANT_ENUM + CONSTANT_ENUM)
result = StreamResult()
subunit.ByteStreamToStreamResult(
source, non_subunit_name="stdout").run(result)
self.assertEqual(b'', source.read())
self.assertEqual([
('status', 'foo', 'exists', None, True, None, None, False, None, None, None),
('status', 'foo', 'exists', None, True, None, None, False, None, None, None),
], result._events)
def test_inprogress(self):
self.check_event(CONSTANT_INPROGRESS, 'inprogress')
def test_success(self):
self.check_event(CONSTANT_SUCCESS, 'success')
def test_uxsuccess(self):
self.check_event(CONSTANT_UXSUCCESS, 'uxsuccess')
def test_skip(self):
self.check_event(CONSTANT_SKIP, 'skip')
def test_fail(self):
self.check_event(CONSTANT_FAIL, 'fail')
def test_xfail(self):
self.check_event(CONSTANT_XFAIL, 'xfail')
def check_events(self, source_bytes, events):
source = BytesIO(source_bytes)
result = StreamResult()
subunit.ByteStreamToStreamResult(
source, non_subunit_name="stdout").run(result)
self.assertEqual(b'', source.read())
self.assertEqual(events, result._events)
#- any file attachments should be byte contents [as users assume that].
for event in result._events:
if event[5] is not None:
self.assertIsInstance(event[6], bytes)
def check_event(self, source_bytes, test_status=None, test_id="foo",
route_code=None, timestamp=None, tags=None, mime_type=None,
file_name=None, file_bytes=None, eof=False, runnable=True):
event = self._event(test_id=test_id, test_status=test_status,
tags=tags, runnable=runnable, file_name=file_name,
file_bytes=file_bytes, eof=eof, mime_type=mime_type,
route_code=route_code, timestamp=timestamp)
self.check_events(source_bytes, [event])
def _event(self, test_status=None, test_id=None, route_code=None,
timestamp=None, tags=None, mime_type=None, file_name=None,
file_bytes=None, eof=False, runnable=True):
return ('status', test_id, test_status, tags, runnable, file_name,
file_bytes, eof, mime_type, route_code, timestamp)
def test_eof(self):
self.check_event(CONSTANT_EOF, test_id=None, eof=True)
def test_file_content(self):
self.check_event(CONSTANT_FILE_CONTENT,
test_id=None, file_name="barney", file_bytes=b"woo")
def test_file_content_length_into_checksum(self):
# A bad file content length which creeps into the checksum.
bad_file_length_content = b'\xb3!@\x13\x06barney\x04woo\xdc\xe2\xdb\x35'
self.check_events(bad_file_length_content, [
self._event(test_id="subunit.parser", eof=True,
file_name="Packet data", file_bytes=bad_file_length_content,
mime_type="application/octet-stream"),
self._event(test_id="subunit.parser", test_status="fail", eof=True,
file_name="Parser Error",
file_bytes=b"File content extends past end of packet: claimed 4 bytes, 3 available",
mime_type="text/plain;charset=utf8"),
])
def test_packet_length_4_word_varint(self):
packet_data = b'\xb3!@\xc0\x00\x11'
self.check_events(packet_data, [
self._event(test_id="subunit.parser", eof=True,
file_name="Packet data", file_bytes=packet_data,
mime_type="application/octet-stream"),
self._event(test_id="subunit.parser", test_status="fail", eof=True,
file_name="Parser Error",
file_bytes=b"3 byte maximum given but 4 byte value found.",
mime_type="text/plain;charset=utf8"),
])
def test_mime(self):
self.check_event(CONSTANT_MIME,
test_id=None, mime_type='application/foo; charset=1')
def test_route_code(self):
self.check_event(CONSTANT_ROUTE_CODE,
'success', route_code="source", test_id="bar")
def test_runnable(self):
self.check_event(CONSTANT_RUNNABLE,
test_status='success', runnable=False)
def test_tags(self):
self.check_event(CONSTANT_TAGS[0],
None, tags=set(['foo', 'bar']), test_id="bar")
def test_timestamp(self):
timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45,
iso8601.Utc())
self.check_event(CONSTANT_TIMESTAMP,
'success', test_id='bar', timestamp=timestamp)
def test_bad_crc_errors_via_status(self):
file_bytes = CONSTANT_MIME[:-1] + b'\x00'
self.check_events( file_bytes, [
self._event(test_id="subunit.parser", eof=True,
file_name="Packet data", file_bytes=file_bytes,
mime_type="application/octet-stream"),
self._event(test_id="subunit.parser", test_status="fail", eof=True,
file_name="Parser Error",
file_bytes=b'Bad checksum - calculated (0x78335115), '
b'stored (0x78335100)',
mime_type="text/plain;charset=utf8"),
])
def test_not_utf8_in_string(self):
file_bytes = CONSTANT_ROUTE_CODE[:5] + b'\xb4' + CONSTANT_ROUTE_CODE[6:-4] + b'\xce\x56\xc6\x17'
self.check_events(file_bytes, [
self._event(test_id="subunit.parser", eof=True,
file_name="Packet data", file_bytes=file_bytes,
mime_type="application/octet-stream"),
self._event(test_id="subunit.parser", test_status="fail", eof=True,
file_name="Parser Error",
file_bytes=b'UTF8 string at offset 2 is not UTF8',
mime_type="text/plain;charset=utf8"),
])
def test_NULL_in_string(self):
file_bytes = CONSTANT_ROUTE_CODE[:6] + b'\x00' + CONSTANT_ROUTE_CODE[7:-4] + b'\xd7\x41\xac\xfe'
self.check_events(file_bytes, [
self._event(test_id="subunit.parser", eof=True,
file_name="Packet data", file_bytes=file_bytes,
mime_type="application/octet-stream"),
self._event(test_id="subunit.parser", test_status="fail", eof=True,
file_name="Parser Error",
file_bytes=b'UTF8 string at offset 2 contains NUL byte',
mime_type="text/plain;charset=utf8"),
])
def test_bad_utf8_stringlength(self):
file_bytes = CONSTANT_ROUTE_CODE[:4] + b'\x3f' + CONSTANT_ROUTE_CODE[5:-4] + b'\xbe\x29\xe0\xc2'
self.check_events(file_bytes, [
self._event(test_id="subunit.parser", eof=True,
file_name="Packet data", file_bytes=file_bytes,
mime_type="application/octet-stream"),
self._event(test_id="subunit.parser", test_status="fail", eof=True,
file_name="Parser Error",
file_bytes=b'UTF8 string at offset 2 extends past end of '
b'packet: claimed 63 bytes, 10 available',
mime_type="text/plain;charset=utf8"),
])
def test_route_code_and_file_content(self):
content = BytesIO()
subunit.StreamResultToBytes(content).status(
route_code='0', mime_type='text/plain', file_name='bar',
file_bytes=b'foo')
self.check_event(content.getvalue(), test_id=None, file_name='bar',
route_code='0', mime_type='text/plain', file_bytes=b'foo')

View File

@@ -0,0 +1,566 @@
#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
import csv
import datetime
import sys
import unittest
from testtools import TestCase
from testtools.compat import StringIO
from testtools.content import (
text_content,
TracebackContent,
)
from testtools.testresult.doubles import ExtendedTestResult
import subunit
import subunit.iso8601 as iso8601
import subunit.test_results
import testtools
class LoggingDecorator(subunit.test_results.HookedTestResultDecorator):
def __init__(self, decorated):
self._calls = 0
super(LoggingDecorator, self).__init__(decorated)
def _before_event(self):
self._calls += 1
class AssertBeforeTestResult(LoggingDecorator):
"""A TestResult for checking preconditions."""
def __init__(self, decorated, test):
self.test = test
super(AssertBeforeTestResult, self).__init__(decorated)
def _before_event(self):
self.test.assertEqual(1, self.earlier._calls)
super(AssertBeforeTestResult, self)._before_event()
class TimeCapturingResult(unittest.TestResult):
def __init__(self):
super(TimeCapturingResult, self).__init__()
self._calls = []
self.failfast = False
def time(self, a_datetime):
self._calls.append(a_datetime)
class TestHookedTestResultDecorator(unittest.TestCase):
def setUp(self):
# An end to the chain
terminal = unittest.TestResult()
# Asserts that the call was made to self.result before asserter was
# called.
asserter = AssertBeforeTestResult(terminal, self)
# The result object we call, which much increase its call count.
self.result = LoggingDecorator(asserter)
asserter.earlier = self.result
self.decorated = asserter
def tearDown(self):
# The hook in self.result must have been called
self.assertEqual(1, self.result._calls)
# The hook in asserter must have been called too, otherwise the
# assertion about ordering won't have completed.
self.assertEqual(1, self.decorated._calls)
def test_startTest(self):
self.result.startTest(self)
def test_startTestRun(self):
self.result.startTestRun()
def test_stopTest(self):
self.result.stopTest(self)
def test_stopTestRun(self):
self.result.stopTestRun()
def test_addError(self):
self.result.addError(self, subunit.RemoteError())
def test_addError_details(self):
self.result.addError(self, details={})
def test_addFailure(self):
self.result.addFailure(self, subunit.RemoteError())
def test_addFailure_details(self):
self.result.addFailure(self, details={})
def test_addSuccess(self):
self.result.addSuccess(self)
def test_addSuccess_details(self):
self.result.addSuccess(self, details={})
def test_addSkip(self):
self.result.addSkip(self, "foo")
def test_addSkip_details(self):
self.result.addSkip(self, details={})
def test_addExpectedFailure(self):
self.result.addExpectedFailure(self, subunit.RemoteError())
def test_addExpectedFailure_details(self):
self.result.addExpectedFailure(self, details={})
def test_addUnexpectedSuccess(self):
self.result.addUnexpectedSuccess(self)
def test_addUnexpectedSuccess_details(self):
self.result.addUnexpectedSuccess(self, details={})
def test_progress(self):
self.result.progress(1, subunit.PROGRESS_SET)
def test_wasSuccessful(self):
self.result.wasSuccessful()
def test_shouldStop(self):
self.result.shouldStop
def test_stop(self):
self.result.stop()
def test_time(self):
self.result.time(None)
class TestAutoTimingTestResultDecorator(unittest.TestCase):
def setUp(self):
# And end to the chain which captures time events.
terminal = TimeCapturingResult()
# The result object under test.
self.result = subunit.test_results.AutoTimingTestResultDecorator(
terminal)
self.decorated = terminal
def test_without_time_calls_time_is_called_and_not_None(self):
self.result.startTest(self)
self.assertEqual(1, len(self.decorated._calls))
self.assertNotEqual(None, self.decorated._calls[0])
def test_no_time_from_progress(self):
self.result.progress(1, subunit.PROGRESS_CUR)
self.assertEqual(0, len(self.decorated._calls))
def test_no_time_from_shouldStop(self):
self.decorated.stop()
self.result.shouldStop
self.assertEqual(0, len(self.decorated._calls))
def test_calling_time_inhibits_automatic_time(self):
# Calling time() outputs a time signal immediately and prevents
# automatically adding one when other methods are called.
time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())
self.result.time(time)
self.result.startTest(self)
self.result.stopTest(self)
self.assertEqual(1, len(self.decorated._calls))
self.assertEqual(time, self.decorated._calls[0])
def test_calling_time_None_enables_automatic_time(self):
time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())
self.result.time(time)
self.assertEqual(1, len(self.decorated._calls))
self.assertEqual(time, self.decorated._calls[0])
# Calling None passes the None through, in case other results care.
self.result.time(None)
self.assertEqual(2, len(self.decorated._calls))
self.assertEqual(None, self.decorated._calls[1])
# Calling other methods doesn't generate an automatic time event.
self.result.startTest(self)
self.assertEqual(3, len(self.decorated._calls))
self.assertNotEqual(None, self.decorated._calls[2])
def test_set_failfast_True(self):
self.assertFalse(self.decorated.failfast)
self.result.failfast = True
self.assertTrue(self.decorated.failfast)
class TestTagCollapsingDecorator(TestCase):
def test_tags_collapsed_outside_of_tests(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
tag_collapser.tags(set(['a']), set())
tag_collapser.tags(set(['b']), set())
tag_collapser.startTest(self)
self.assertEquals(
[('tags', set(['a', 'b']), set([])),
('startTest', self),
], result._events)
def test_tags_collapsed_outside_of_tests_are_flushed(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
tag_collapser.startTestRun()
tag_collapser.tags(set(['a']), set())
tag_collapser.tags(set(['b']), set())
tag_collapser.startTest(self)
tag_collapser.addSuccess(self)
tag_collapser.stopTest(self)
tag_collapser.stopTestRun()
self.assertEquals(
[('startTestRun',),
('tags', set(['a', 'b']), set([])),
('startTest', self),
('addSuccess', self),
('stopTest', self),
('stopTestRun',),
], result._events)
def test_tags_forwarded_after_tests(self):
test = subunit.RemotedTestCase('foo')
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
tag_collapser.startTestRun()
tag_collapser.startTest(test)
tag_collapser.addSuccess(test)
tag_collapser.stopTest(test)
tag_collapser.tags(set(['a']), set(['b']))
tag_collapser.stopTestRun()
self.assertEqual(
[('startTestRun',),
('startTest', test),
('addSuccess', test),
('stopTest', test),
('tags', set(['a']), set(['b'])),
('stopTestRun',),
],
result._events)
def test_tags_collapsed_inside_of_tests(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
test = subunit.RemotedTestCase('foo')
tag_collapser.startTest(test)
tag_collapser.tags(set(['a']), set())
tag_collapser.tags(set(['b']), set(['a']))
tag_collapser.tags(set(['c']), set())
tag_collapser.stopTest(test)
self.assertEquals(
[('startTest', test),
('tags', set(['b', 'c']), set(['a'])),
('stopTest', test)],
result._events)
def test_tags_collapsed_inside_of_tests_different_ordering(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
test = subunit.RemotedTestCase('foo')
tag_collapser.startTest(test)
tag_collapser.tags(set(), set(['a']))
tag_collapser.tags(set(['a', 'b']), set())
tag_collapser.tags(set(['c']), set())
tag_collapser.stopTest(test)
self.assertEquals(
[('startTest', test),
('tags', set(['a', 'b', 'c']), set()),
('stopTest', test)],
result._events)
def test_tags_sent_before_result(self):
# Because addSuccess and friends tend to send subunit output
# immediately, and because 'tags:' before a result line means
# something different to 'tags:' after a result line, we need to be
# sure that tags are emitted before 'addSuccess' (or whatever).
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
test = subunit.RemotedTestCase('foo')
tag_collapser.startTest(test)
tag_collapser.tags(set(['a']), set())
tag_collapser.addSuccess(test)
tag_collapser.stopTest(test)
self.assertEquals(
[('startTest', test),
('tags', set(['a']), set()),
('addSuccess', test),
('stopTest', test)],
result._events)
class TestTimeCollapsingDecorator(TestCase):
def make_time(self):
# Heh heh.
return datetime.datetime(
2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC)
def test_initial_time_forwarded(self):
# We always forward the first time event we see.
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
a_time = self.make_time()
tag_collapser.time(a_time)
self.assertEquals([('time', a_time)], result._events)
def test_time_collapsed_to_first_and_last(self):
# If there are many consecutive time events, only the first and last
# are sent through.
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
times = [self.make_time() for i in range(5)]
for a_time in times:
tag_collapser.time(a_time)
tag_collapser.startTest(subunit.RemotedTestCase('foo'))
self.assertEquals(
[('time', times[0]), ('time', times[-1])], result._events[:-1])
def test_only_one_time_sent(self):
# If we receive a single time event followed by a non-time event, we
# send exactly one time event.
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
a_time = self.make_time()
tag_collapser.time(a_time)
tag_collapser.startTest(subunit.RemotedTestCase('foo'))
self.assertEquals([('time', a_time)], result._events[:-1])
def test_duplicate_times_not_sent(self):
# Many time events with the exact same time are collapsed into one
# time event.
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
a_time = self.make_time()
for i in range(5):
tag_collapser.time(a_time)
tag_collapser.startTest(subunit.RemotedTestCase('foo'))
self.assertEquals([('time', a_time)], result._events[:-1])
def test_no_times_inserted(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
a_time = self.make_time()
tag_collapser.time(a_time)
foo = subunit.RemotedTestCase('foo')
tag_collapser.startTest(foo)
tag_collapser.addSuccess(foo)
tag_collapser.stopTest(foo)
self.assertEquals(
[('time', a_time),
('startTest', foo),
('addSuccess', foo),
('stopTest', foo)], result._events)
class TestByTestResultTests(testtools.TestCase):
def setUp(self):
super(TestByTestResultTests, self).setUp()
self.log = []
self.result = subunit.test_results.TestByTestResult(self.on_test)
if sys.version_info >= (3, 0):
self.result._now = iter(range(5)).__next__
else:
self.result._now = iter(range(5)).next
def assertCalled(self, **kwargs):
defaults = {
'test': self,
'tags': set(),
'details': None,
'start_time': 0,
'stop_time': 1,
}
defaults.update(kwargs)
self.assertEqual([defaults], self.log)
def on_test(self, **kwargs):
self.log.append(kwargs)
def test_no_tests_nothing_reported(self):
self.result.startTestRun()
self.result.stopTestRun()
self.assertEqual([], self.log)
def test_add_success(self):
self.result.startTest(self)
self.result.addSuccess(self)
self.result.stopTest(self)
self.assertCalled(status='success')
def test_add_success_details(self):
self.result.startTest(self)
details = {'foo': 'bar'}
self.result.addSuccess(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='success', details=details)
def test_tags(self):
if not getattr(self.result, 'tags', None):
self.skipTest("No tags in testtools")
self.result.tags(['foo'], [])
self.result.startTest(self)
self.result.addSuccess(self)
self.result.stopTest(self)
self.assertCalled(status='success', tags=set(['foo']))
def test_add_error(self):
self.result.startTest(self)
try:
1/0
except ZeroDivisionError:
error = sys.exc_info()
self.result.addError(self, error)
self.result.stopTest(self)
self.assertCalled(
status='error',
details={'traceback': TracebackContent(error, self)})
def test_add_error_details(self):
self.result.startTest(self)
details = {"foo": text_content("bar")}
self.result.addError(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='error', details=details)
def test_add_failure(self):
self.result.startTest(self)
try:
self.fail("intentional failure")
except self.failureException:
failure = sys.exc_info()
self.result.addFailure(self, failure)
self.result.stopTest(self)
self.assertCalled(
status='failure',
details={'traceback': TracebackContent(failure, self)})
def test_add_failure_details(self):
self.result.startTest(self)
details = {"foo": text_content("bar")}
self.result.addFailure(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='failure', details=details)
def test_add_xfail(self):
self.result.startTest(self)
try:
1/0
except ZeroDivisionError:
error = sys.exc_info()
self.result.addExpectedFailure(self, error)
self.result.stopTest(self)
self.assertCalled(
status='xfail',
details={'traceback': TracebackContent(error, self)})
def test_add_xfail_details(self):
self.result.startTest(self)
details = {"foo": text_content("bar")}
self.result.addExpectedFailure(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='xfail', details=details)
def test_add_unexpected_success(self):
self.result.startTest(self)
details = {'foo': 'bar'}
self.result.addUnexpectedSuccess(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='success', details=details)
def test_add_skip_reason(self):
self.result.startTest(self)
reason = self.getUniqueString()
self.result.addSkip(self, reason)
self.result.stopTest(self)
self.assertCalled(
status='skip', details={'reason': text_content(reason)})
def test_add_skip_details(self):
self.result.startTest(self)
details = {'foo': 'bar'}
self.result.addSkip(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='skip', details=details)
def test_twice(self):
self.result.startTest(self)
self.result.addSuccess(self, details={'foo': 'bar'})
self.result.stopTest(self)
self.result.startTest(self)
self.result.addSuccess(self)
self.result.stopTest(self)
self.assertEqual(
[{'test': self,
'status': 'success',
'start_time': 0,
'stop_time': 1,
'tags': set(),
'details': {'foo': 'bar'}},
{'test': self,
'status': 'success',
'start_time': 2,
'stop_time': 3,
'tags': set(),
'details': None},
],
self.log)
class TestCsvResult(testtools.TestCase):
def parse_stream(self, stream):
stream.seek(0)
reader = csv.reader(stream)
return list(reader)
def test_csv_output(self):
stream = StringIO()
result = subunit.test_results.CsvResult(stream)
if sys.version_info >= (3, 0):
result._now = iter(range(5)).__next__
else:
result._now = iter(range(5)).next
result.startTestRun()
result.startTest(self)
result.addSuccess(self)
result.stopTest(self)
result.stopTestRun()
self.assertEqual(
[['test', 'status', 'start_time', 'stop_time'],
[self.id(), 'success', '0', '1'],
],
self.parse_stream(stream))
def test_just_header_when_no_tests(self):
stream = StringIO()
result = subunit.test_results.CsvResult(stream)
result.startTestRun()
result.stopTestRun()
self.assertEqual(
[['test', 'status', 'start_time', 'stop_time']],
self.parse_stream(stream))
def test_no_output_before_events(self):
stream = StringIO()
subunit.test_results.CsvResult(stream)
self.assertEqual([], self.parse_stream(stream))