Python unittest 模块,mock() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用unittest.mock()。
def tests_event_registration(self):
"""Tests that events register correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callback
callback = Mock()
# Test that a valid event registers
self.assertTrue(
events.add_event_callback(TIMELINE.ALARM_GROUP, callback))
# Test that no event group returns false
self.assertFalse(events.add_event_callback(None, callback))
# Test that an invalid event throws exception
with self.assertRaises(abodepy.AbodeException):
events.add_event_callback("lol", callback)
def tests_timeline_registration(self):
"""Tests that timeline events register correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callback
callback = Mock()
# Test that a valid timeline event registers
self.assertTrue(
events.add_timeline_callback(
TIMELINE.CAPTURE_IMAGE, callback))
# Test that no timeline event returns false
self.assertFalse(events.add_timeline_callback(None, callback))
# Test that an invalid timeline event string throws exception
with self.assertRaises(abodepy.AbodeException):
events.add_timeline_callback("lol", callback)
# Test that an invalid timeline event dict throws exception
with self.assertRaises(abodepy.AbodeException):
events.add_timeline_callback({"lol": "lol"}, callback)
def tests_multi_events_callback(self):
"""Tests that multiple event updates callback correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callback
callback = Mock()
# Register our events
self.assertTrue(
events.add_event_callback(
[TIMELINE.ALARM_GROUP, TIMELINE.CAPTURE_GROUP],
callback))
# Call our events callback method and trigger a capture group event
# pylint: disable=protected-access
event_json = json.loads(IRCAMERA.timeline_event())
events._on_timeline_update(event_json)
# Ensure our callback was called
callback.assert_called_with(event_json)
def tests_multi_timeline_callback(self):
"""Tests that multiple timeline updates callback correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callback
callback = Mock()
# Register our events
self.assertTrue(
events.add_timeline_callback(
[TIMELINE.CAPTURE_IMAGE, TIMELINE.OPENED], callback))
# Call our events callback method and trigger a capture group event
# pylint: disable=protected-access
event_json = json.loads(IRCAMERA.timeline_event())
events._on_timeline_update(event_json)
# Ensure our callback was called
callback.assert_called_with(event_json)
def tests_automations_callback(self):
"""Tests that automation updates callback correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callbacks
automation_callback = Mock()
# Register our events
self.assertTrue(
events.add_event_callback(
TIMELINE.AUTOMATION_EDIT_GROUP, automation_callback))
# Call our events callback method and trigger a capture group event
# pylint: disable=protected-access
events._on_automation_update('{}')
# Our capture callback should get one, but our alarm should not
automation_callback.assert_called_with('{}')
def test_subscription_recording_processor_for_request(self):
replaced_subscription_id = str(uuid.uuid4())
rp = SubscriptionRecordingProcessor(replaced_subscription_id)
uri_templates = ['https://management.azure.com/subscriptions/{}/providers/Microsoft.ContainerRegistry/'
'checkNameAvailability?api-version=2017-03-01',
'https://graph.windows.net/{}/applications?api-version=1.6']
for template in uri_templates:
mock_sub_id = str(uuid.uuid4())
mock_request = mock.Mock()
mock_request.uri = template.format(mock_sub_id)
mock_request.body = self._mock_subscription_request_body(mock_sub_id)
rp.process_request(mock_request)
self.assertEqual(mock_request.uri, template.format(replaced_subscription_id))
self.assertEqual(mock_request.body,
self._mock_subscription_request_body(replaced_subscription_id))
def setUp(self):
# disable logging while testing
logging.disable(logging.CRITICAL)
self.patched = {}
if hasattr(self, 'patch_these'):
for patch_this in self.patch_these:
namespace = patch_this[0] if isinstance(patch_this, (list, set)) else patch_this
patcher = mock.patch(namespace)
mocked = patcher.start()
mocked.reset_mock()
self.patched[namespace] = mocked
if isinstance(patch_this, (list, set)) and len(patch_this) > 0:
retval = patch_this[1]
if callable(retval):
retval = retval()
mocked.return_value = retval
def test_reset_kill_reconnect(self):
client = AprsClient(aprs_user='testuser', aprs_filter='')
client.connect()
# .run() should be allowed to execute after .connect()
mock_callback = mock.MagicMock(
side_effect=lambda raw_msg: client.disconnect())
self.assertFalse(client._kill)
client.run(callback=mock_callback, autoreconnect=True)
# After .disconnect(), client._kill should be True
self.assertTrue(client._kill)
self.assertEqual(mock_callback.call_count, 1)
# After we reconnect, .run() should be able to run again
mock_callback.reset_mock()
client.connect()
client.run(callback=mock_callback, autoreconnect=True)
self.assertEqual(mock_callback.call_count, 1)
def test_normal_execution(self, mock_isfile, mock_glob):
""" Test the normal behaviour of the function.
"""
# Set the mocked function returned values.
mock_isfile.side_effect = [True]
mock_glob.return_value = ["/my/path/mock_output"]
# Test execution
fslreorient2std(**self.kwargs)
self.assertEqual([
mock.call(["which", "fslreorient2std"],
env={}, stderr=-1, stdout=-1),
mock.call(["fslreorient2std",
self.kwargs["input_image"],
self.kwargs["output_image"]],
cwd=None, env={}, stderr=-1, stdout=-1)],
self.mock_popen.call_args_list)
self.assertEqual(len(self.mock_env.call_args_list), 1)
def mock_ioctl(fd, command, msg):
print("Mocking ioctl")
assert fd == MOCK_FD
assert command is not None
# Reproduce ioctl read operations
if command == I2C_SMBUS and msg.read_write == I2C_SMBUS_READ:
offset = msg.command
if msg.size == I2C_SMBUS_BYTE_DATA:
msg.data.contents.byte = test_buffer[offset]
elif msg.size == I2C_SMBUS_WORD_DATA:
msg.data.contents.word = test_buffer[offset+1]*256 + test_buffer[offset]
elif msg.size == I2C_SMBUS_I2C_BLOCK_DATA:
for k in range(msg.data.contents.byte):
msg.data.contents.block[k+1] = test_buffer[offset+k]
# Override open, close and ioctl with our mock functions
def test_checkOtp(self, mock):
reply = {"requestId": "123", "status": "0000", "statusMessage": "Success", "credentialId": "",
"credentialType": "STANDARD_OTP", "authContext": {"params": {"Key": "authLevel.level", "Value": 1}}}
mock.SymantecUserServices.checkOtp.return_value = Mock()
mock.checkOtp.return_value.hash.return_value = reply
response = symantec_package.lib.userService.SymantecUserServices.checkOut("checkOutyou")
self.assertTrue(response.hash() != reply)
response = symantec_package.lib.userService.SymantecUserServices.checkOtp("PARAMS")
self.assertTrue((response.hash()) == reply)
self.assertTrue(response.hash()["status"] == "0000")
self.assertTrue(response.hash()['requestId'] == "123")
self.assertTrue(response.hash()['statusMessage'] == "Success")
self.assertTrue(response.hash()['credentialId'] == "")
self.assertTrue(response.hash()['credentialType'] == "STANDARD_OTP")
self.assertTrue(response.hash()['authContext']['params']['Key'] == "authLevel.level")
self.assertTrue(response.hash()['authContext']['params']['Value'] == 1)
pass
def test_poll_in_Push(self, mock):
reply = {"requestId": "ac123", "status": "6040", "statusMessage": "Mobile push request sent",
"pushDetail": {"pushCredentialId": "133709001", "pushSent": True},
"transactionId": "RealTransactionId",
"authContext": {"params": {"Key": "authLevel.level", "Value": 10}}}
mock.SymantecServices.authenticateUserWithPushThenPolling.return_value = Mock()
mock.authenticateUserWithPushThenPolling.return_value.hash.return_value = reply
response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithNothing()
self.assertTrue(response.hash() != reply)
response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithPushThenPolling("Parameters Here!")
self.assertTrue((response.hash()) == reply)
self.assertTrue(response.hash()["status"] == "6040")
self.assertTrue(response.hash()['requestId'] == "ac123")
self.assertTrue(response.hash()['statusMessage'] == "Mobile push request sent")
self.assertTrue(response.hash()["pushDetail"]['pushCredentialId'] == "133709001")
self.assertTrue(response.hash()["pushDetail"]['pushSent'] is True)
self.assertTrue(response.hash()['transactionId'] == "RealTransactionId")
self.assertTrue(response.hash()['authContext']['params']['Key'] == "authLevel.level")
self.assertTrue(response.hash()['authContext']['params']['Value'] == 10)
pass
def test_mock_create_user(self, mock_managementservices):
reply = {'requestId': 'create_123', 'status': '0000', 'statusMessage': 'Success'}
# Configure the mock to return a response with an OK status code. Also, the mock should have
# a `json()` method that returns a list of todos.
mock_managementservices.createUser.return_value = Mock()
mock_managementservices.createUser.return_value.json.return_value = reply
# Call the service, which will send a request to the server.
response = symantec_package.lib.managementService.SymantecManagementServices.createUser("create_123",
"new_user3")
print(response.json())
# If the request is sent successfully, then I expect a response to be returned.
self.assertTrue((response.json()) == reply)
self.assertTrue((response.json()["status"] == "0000"))
pass
def test_mock_add_STANDARDOTP_credential(self, mock_managementservices):
reply = {'statusMessage': "Success", 'requestId': 'add_otp_cred', 'status': '0000'}
# Configure the mock to return a response with an OK status code. Also, the mock should have
# a `json()` method that returns a list of todos.
mock_managementservices.addCredentialOtp.return_value = Mock()
mock_managementservices.addCredentialOtp.return_value.json.return_value = reply
response = symantec_package.lib.managementService.SymantecManagementServices.addCredentialOtp("add_otp_cred", "new_user3",
"", "STANDARD_OTP", \
"678066") # change with what's on your device
print(response.json())
# If the request is sent successfully, then I expect a response to be returned.
self.assertTrue((response.json()) == reply)
self.assertTrue((response.json()["status"] == "0000"))
pass
def test_mock_update_STANDARDOTP_credential(self, mock_managementservices):
reply = {'statusMessage': 'Success', 'requestId': 'update_123', 'status': '0000'}
# Configure the mock to return a response with an OK status code. Also, the mock should have
# a `json()` method that returns a list of todos.
mock_managementservices.updateCredential.return_value = Mock()
mock_managementservices.updateCredential.return_value.json.return_value = reply
response = symantec_package.lib.managementService.SymantecManagementServices.updateCredential("update_123", "gabe_phone",
"", "STANDARD_OTP",
"My personal cell phone")
print(response.json())
# If the request is sent successfully, then I expect a response to be returned.
self.assertTrue((response.json()) == reply)
self.assertTrue((response.json()["status"] == "0000"))
pass
def test_mock_setTemporaryPasswordSMSDelivery(self, mock_managementservices):
reply = {'status': '0000', 'requestId': 'setTempPWD', 'statusMessage': 'Success', 'temporaryPassword': '998241'}
# Configure the mock to return a response with an OK status code. Also, the mock should have
# a `json()` method that returns a list of todos.
mock_managementservices.setTemporaryPasswordSMSDelivery.return_value = Mock()
mock_managementservices.setTemporaryPasswordSMSDelivery.return_value.json.return_value = reply
response = symantec_package.lib.managementService.SymantecManagementServices.setTemporaryPasswordSMSDelivery("setTempPWD",
"gabe_phone",
"12313608781",
"17879481605")
print(response.json())
# If the request is sent successfully, then I expect a response to be returned.
self.assertTrue((response.json()) == reply)
self.assertTrue((response.json()["status"] == "0000"))
pass
def request_callback(self, request):
logger.debug('Mock request {} {}'.format(request.method, request.url))
path = self.build_path(request.method, request.url)
if os.path.exists(path):
# Load local file
logger.info('Using mock file {}'.format(path))
with gzip.open(path, 'rb') as f:
response = pickle.load(f)
else:
# Build from actual request
logger.info('Building mock file {}'.format(path))
response = self.real_request(request)
# Save in local file for future use
with gzip.open(path, 'wb') as f:
# Use old pickle ascii protocol (default)
# to be compatible with Python 2
f.write(pickle.dumps(response, protocol=2))
return (
response['status'],
response['headers'],
response['body'],
)
def test_mkdir_p(self, path_mocker_stopall):
# mock
mock_logger_info = path_mocker_stopall.MagicMock(name="mock_logger_info")
mock_dir_exists = path_mocker_stopall.MagicMock(name="mock_dir_exists")
mock_path = path_mocker_stopall.MagicMock(name="mock_path")
# patch
path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'info', mock_logger_info)
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'dir_exists', mock_dir_exists)
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'Path', mock_path)
path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'
mock_dir_exists.return_value = True
# run test
s_path.mkdir_p(path)
# assert
assert mock_logger_info.call_count == 1
mock_path.assert_called_once_with(path)
# from scarlett_os.internal.debugger import dump
mock_path().mkdir.assert_any_call(parents=True, exist_ok=True)
mock_logger_info.assert_any_call("Verify mkdir_p ran: {}".format(mock_dir_exists.return_value))
def test_dir_exists_false(self, path_mocker_stopall):
# mock
mock_logger_error = path_mocker_stopall.MagicMock(name="mock_logger_error")
mock_path = path_mocker_stopall.MagicMock(name="mock_path")
# patch
path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'error', mock_logger_error)
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'Path', mock_path)
path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'
mock_path_instance = mock_path()
mock_path_instance.is_dir.return_value = False
# run test
s_path.dir_exists(path)
# assert
assert mock_logger_error.call_count == 1
assert mock_path_instance.is_dir.call_count == 2
mock_logger_error.assert_any_call("This is not a dir: {}".format(path))
def test_dir_exists_true(self, path_mocker_stopall):
# mock
mock_logger_error = path_mocker_stopall.MagicMock(name="mock_logger_error")
mock_path = path_mocker_stopall.MagicMock(name="mock_path")
# patch
path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'error', mock_logger_error)
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'Path', mock_path)
path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'
mock_path_instance = mock_path()
#
mock_path_instance.is_dir.return_value = True
# run test
s_path.dir_exists(path)
# assert
assert mock_logger_error.call_count == 0
assert mock_path_instance.is_dir.call_count == 2
mock_logger_error.assert_not_called()
def test_mkdir_if_does_not_exist_false(self, path_mocker_stopall):
# mock
mock_mkdir_p = path_mocker_stopall.MagicMock(name="mock_mkdir_p")
mock_dir_exists = path_mocker_stopall.MagicMock(name="mock_dir_exists", return_value=False)
# patch
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'mkdir_p', mock_mkdir_p)
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'dir_exists', mock_dir_exists)
path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'
# run test
result = s_path.mkdir_if_does_not_exist(path)
# assert
assert mock_mkdir_p.call_count == 1
assert mock_dir_exists.call_count == 1
assert result == True
def test_mkdir_if_does_not_exist_true(self, path_mocker_stopall):
# mock
mock_mkdir_p = path_mocker_stopall.MagicMock(name="mock_mkdir_p")
mock_dir_exists = path_mocker_stopall.MagicMock(name="mock_dir_exists", return_value=True)
# patch
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'mkdir_p', mock_mkdir_p)
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'dir_exists', mock_dir_exists)
path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'
# run test
result = s_path.mkdir_if_does_not_exist(path)
# assert
assert mock_mkdir_p.call_count == 0
assert mock_dir_exists.call_count == 1
assert result == False
def test_fname_exists_false(self, path_mocker_stopall):
# mock
mock_path = path_mocker_stopall.MagicMock(name="mock_path")
# patch
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'Path', mock_path)
path = '/home/pi/dev/bossjones-github/scarlett_os/_debug/generator.dot'
mock_path_instance = mock_path()
mock_path_instance.exists.return_value = False
# run test
result = s_path.fname_exists(path)
assert mock_path_instance.exists.call_count == 1
mock_path_instance.exists.assert_called_once_with()
mock_path.assert_any_call(path)
assert result == False
def test_dir_isWritable(self, path_mocker_stopall):
# mock
mock_os_access = path_mocker_stopall.MagicMock(name="mock_os_access")
mock_os_path_isdir = path_mocker_stopall.MagicMock(name="mock_os_path_isdir")
# patch
path_mocker_stopall.patch.object(scarlett_os.internal.path.os, 'access', mock_os_access)
path_mocker_stopall.patch.object(scarlett_os.internal.path.os.path, 'isdir', mock_os_path_isdir)
path = 'file:///tmp'
# patch return values
mock_os_path_isdir.return_value = True
mock_os_access.return_value = True
# run test
result = s_path.isWritable(path)
# tests
mock_os_path_isdir.assert_called_once_with('file:///tmp')
mock_os_access.assert_called_once_with('file:///tmp', os.W_OK)
assert result == True
def test_unicode_decode_error_isWritable(self, path_mocker_stopall):
# mock
mock_os_path_isdir = path_mocker_stopall.MagicMock(name="mock_os_path_isdir")
mock_os_access = path_mocker_stopall.MagicMock(name="mock_os_access")
mock_unicode_error_dialog = path_mocker_stopall.MagicMock(name="mock_unicode_error_dialog")
mock_logger_error = path_mocker_stopall.MagicMock(name="mock_logger_error")
# patch
path_mocker_stopall.patch.object(scarlett_os.internal.path.os.path, 'isdir', mock_os_path_isdir)
path_mocker_stopall.patch.object(scarlett_os.internal.path.os, 'access', mock_os_access)
path_mocker_stopall.patch.object(scarlett_os.internal.path, 'unicode_error_dialog', mock_unicode_error_dialog)
path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'error', mock_logger_error)
path = b'file:///tmp/fake_file'
# patch return values
mock_os_path_isdir.side_effect = UnicodeDecodeError('', b'', 1, 0, '')
s_path.isWritable(path)
assert mock_unicode_error_dialog.call_count == 1
def test_unicode_error_dialog(self, path_mocker_stopall):
# mock
mock_logger_error = path_mocker_stopall.MagicMock(name="mock_logger_error")
# patch
path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'error', mock_logger_error)
s_path.unicode_error_dialog()
assert mock_logger_error.call_count == 1
_message = _("The system's locale that you are using is not UTF-8 capable. "
"Unicode support is required for Python3 software like Pitivi. "
"Please correct your system settings; if you try to use Pitivi "
"with a broken locale, weird bugs will happen.")
mock_logger_error.assert_any_call(_message)
def __get_result_set_from_mock(mock, query):
result_wrapper = Mock()
influx_points = []
result = re.search(r"(\d{19}).*?(\d{19})", query)
start, end = result.groups()
# extract range
points = list(filter(lambda point: point["time"] > int(start) and point["time"] < int(end),
mock.points))
country_result = re.search(r"\"country\"=\'(\w+)\'", query)
if country_result:
country = country_result.groups()[0]
points = list(filter(lambda point: point["tags"]["country"] == country, points))
for point in points:
d = {**point["fields"], **point.get("tags", {})}
d["time"] = datetime.utcfromtimestamp(point["time"] // 1000000000).strftime('%Y-%m-%dT%H:%M:%SZ')
influx_points.append(d)
result_wrapper.get_points.return_value = influx_points
return result_wrapper
def test_get_all(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name
))
) as mock:
ret = getattr(nb, self.name).all()
self.assertTrue(ret)
self.assertTrue(isinstance(ret, list))
self.assertTrue(isinstance(ret[0], self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_filter(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name
))
) as mock:
ret = getattr(nb, self.name).filter(pk=1)
self.assertTrue(ret)
self.assertTrue(isinstance(ret, list))
self.assertTrue(isinstance(ret[0], self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/?pk=1'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_get(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name[:-1]
))
) as mock:
ret = getattr(nb, self.name).get(1)
self.assertTrue(ret)
self.assertTrue(isinstance(ret, self.ret))
self.assertTrue(isinstance(str(ret), str))
self.assertTrue(isinstance(dict(ret), dict))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/1/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_delete(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name[:-1]
))
) as mock, patch('pynetbox.lib.query.requests.delete') as delete:
ret = getattr(nb, self.name).get(1)
self.assertTrue(ret.delete())
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/1/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
delete.assert_called_with(
'http://localhost:8000/api/{}/{}/1/'.format(
self.app,
self.name.replace('_', '-')
),
headers=AUTH_HEADERS
)
def test_create_device_bulk(self, mock):
data = [
{
'name': 'test-device',
'site': 1,
'device_type': 1,
'device_role': 1,
},
{
'name': 'test-device1',
'site': 1,
'device_type': 1,
'device_role': 1,
},
]
ret = nb.devices.create(data)
self.assertTrue(ret)
self.assertTrue(len(ret), 2)
def test_get_all(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name
))
) as mock:
ret = getattr(nb, self.name).all()
self.assertTrue(ret)
self.assertTrue(isinstance(ret, list))
self.assertTrue(isinstance(ret[0], self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_get(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name[:-1]
))
) as mock:
ret = getattr(nb, self.name).get(1)
self.assertTrue(ret)
self.assertTrue(isinstance(ret, self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/1/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_get_all(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name
))
) as mock:
ret = getattr(nb, self.name).all()
self.assertTrue(ret)
self.assertTrue(isinstance(ret, list))
self.assertTrue(isinstance(ret[0], self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_filter(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name
))
) as mock:
ret = getattr(nb, self.name).filter(pk=1)
self.assertTrue(ret)
self.assertTrue(isinstance(ret, list))
self.assertTrue(isinstance(ret[0], self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/?pk=1'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_get(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name[:-1]
))
) as mock:
ret = getattr(nb, self.name).get(1)
self.assertTrue(ret)
self.assertTrue(isinstance(ret, self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/1/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_get_all(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name
))
) as mock:
ret = getattr(nb, self.name).all()
self.assertTrue(ret)
self.assertTrue(isinstance(ret, list))
self.assertTrue(isinstance(ret[0], self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_get(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name[:-1]
))
) as mock:
ret = getattr(nb, self.name).get(1)
self.assertTrue(ret)
self.assertTrue(isinstance(ret, self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/1/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_get_all(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name
))
) as mock:
ret = getattr(nb, self.name).all()
self.assertTrue(ret)
self.assertTrue(isinstance(ret, list))
self.assertTrue(isinstance(ret[0], self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_filter(self):
with patch(
'pynetbox.lib.query.requests.get',
return_value=Response(fixture='{}/{}.json'.format(
self.app,
self.name
))
) as mock:
ret = getattr(nb, self.name).filter(pk=1)
self.assertTrue(ret)
self.assertTrue(isinstance(ret, list))
self.assertTrue(isinstance(ret[0], self.ret))
mock.assert_called_with(
'http://localhost:8000/api/{}/{}/?pk=1'.format(
self.app,
self.name.replace('_', '-')
),
headers=HEADERS
)
def test_update_no_active_player_makes_next_active(self):
# Setup, add some players to waiting list.
laser_players = players.Players(1)
laser_players.add_player('192.168.0.1')
# Check none are yet active.
self.assertEqual(None, laser_players.active_player())
# Setup, create mock callbacks that will remember how they were called.
start_active = unittest.mock.Mock()
end_active = unittest.mock.Mock()
# Call update with 1 second of time elapsed.
laser_players.update(0.5, start_active, end_active)
# Check start active was called with the first in line IP, and that the
# currently active player is the first one added (with 1 second of playtime
# left).
start_active.assert_called_once_with('192.168.0.1')
end_active.assert_not_called()
self.assertEqual(('192.168.0.1', 1), laser_players.active_player())
def test_update_moves_to_next_after_playetime_elapsed(self):
# Setup, add two players and make the first active.
laser_players = players.Players(1)
laser_players.add_player('192.168.0.1')
laser_players.add_player('192.168.0.2')
start_active = unittest.mock.Mock()
end_active = unittest.mock.Mock()
laser_players.update(1, start_active, end_active)
start_active.reset_mock()
end_active.reset_mock()
# Update with two seconds of time so the first player time is up, then
# check next player is active.
laser_players.update(2, start_active, end_active)
end_active.assert_called_once_with('192.168.0.1')
start_active.assert_called_once_with('192.168.0.2')
self.assertEqual(('192.168.0.2', 1), laser_players.active_player())
def test_uploading_syslog(self):
'''Tests syslog upload command'''
syslog_uploader_cli = ["syslog_uploader", "-c", self._ndr_config_file, TEST_SYSLOG_DATA]
with unittest.mock.patch.object(sys, 'argv', syslog_uploader_cli):
ndr.tools.syslog_uploader.main()
# Make sure there's only one file in the queue
outbound_queue = os.listdir(self._ncc.outgoing_upload_spool)
self.assertEqual(len(outbound_queue), 1)
this_msg = self._ncc.outgoing_upload_spool + "/" + outbound_queue[0]
loaded_msg = ndr.IngestMessage.verify_and_load_message(
self._ncc, this_msg, only_accept_cn="ndr_test_suite")
os.remove(this_msg)
self.assertEqual(loaded_msg.message_type, ndr.IngestMessageTypes.SYSLOG_UPLOAD)
syslog = ndr.SyslogUploadMessage().from_message(loaded_msg)
def test_uploading_status(self):
'''Tests uploading status messages'''
status_uploader_cli = ["status_uploader", "-c", self._ndr_config_file]
with unittest.mock.patch.object(sys, 'argv', status_uploader_cli):
ndr.tools.status.main()
# Make sure there's only one file in the queue
outbound_queue = os.listdir(self._ncc.outgoing_upload_spool)
self.assertEqual(len(outbound_queue), 1)
this_msg = self._ncc.outgoing_upload_spool + "/" + outbound_queue[0]
loaded_msg = ndr.IngestMessage.verify_and_load_message(
self._ncc, this_msg, only_accept_cn="ndr_test_suite")
os.remove(this_msg)
self.assertEqual(loaded_msg.message_type, ndr.IngestMessageTypes.STATUS)
def test_alert_tester(self):
'''Tests alert tester messages'''
alert_tester_cli = ["alert_tester", "-c", self._ndr_config_file]
with unittest.mock.patch.object(sys, 'argv', alert_tester_cli):
ndr.tools.alert_tester.main()
# Make sure there's only one file in the queue
outbound_queue = os.listdir(self._ncc.outgoing_upload_spool)
self.assertEqual(len(outbound_queue), 1)
this_msg = self._ncc.outgoing_upload_spool + "/" + outbound_queue[0]
loaded_msg = ndr.IngestMessage.verify_and_load_message(
self._ncc, this_msg, only_accept_cn="ndr_test_suite")
os.remove(this_msg)
self.assertEqual(loaded_msg.message_type, ndr.IngestMessageTypes.TEST_ALERT)
def test_pcap_processing(self):
'''Tests uploading and processing a pcap message'''
alert_tester_cli = ["pcap_processor", "-k", "-c", self._ndr_config_file, TSHARK_PCAP]
with unittest.mock.patch.object(sys, 'argv', alert_tester_cli):
ndr.tools.pcap_to_traffic_report.main()
# Make sure there's only one file in the queue
outbound_queue = os.listdir(self._ncc.outgoing_upload_spool)
self.assertEqual(len(outbound_queue), 1)
this_msg = self._ncc.outgoing_upload_spool + "/" + outbound_queue[0]
loaded_msg = ndr.IngestMessage.verify_and_load_message(
self._ncc, this_msg, only_accept_cn="ndr_test_suite")
os.remove(this_msg)
self.assertEqual(loaded_msg.message_type, ndr.IngestMessageTypes.TRAFFIC_REPORT)
def test_recursive_query_basic_failure(self):
resolver = dns.resolver.Resolver()
domain = dns.name.from_text('example.com.')
record_type = 'NS'
with unittest.mock.patch.object(fierce, 'query', return_value=None) as mock_method:
result = fierce.recursive_query(resolver, domain, record_type=record_type)
expected = [
unittest.mock.call(resolver, 'example.com.', record_type),
unittest.mock.call(resolver, 'com.', record_type),
unittest.mock.call(resolver, '', record_type),
]
mock_method.assert_has_calls(expected)
self.assertIsNone(result)
def test_recursive_query_long_domain_failure(self):
resolver = dns.resolver.Resolver()
domain = dns.name.from_text('sd1.sd2.example.com.')
record_type = 'NS'
with unittest.mock.patch.object(fierce, 'query', return_value=None) as mock_method:
result = fierce.recursive_query(resolver, domain, record_type=record_type)
expected = [
unittest.mock.call(resolver, 'sd1.sd2.example.com.', record_type),
unittest.mock.call(resolver, 'sd2.example.com.', record_type),
unittest.mock.call(resolver, 'example.com.', record_type),
unittest.mock.call(resolver, 'com.', record_type),
unittest.mock.call(resolver, '', record_type),
]
mock_method.assert_has_calls(expected)
self.assertIsNone(result)
def test_recursive_query_basic_success(self):
resolver = dns.resolver.Resolver()
domain = dns.name.from_text('example.com.')
record_type = 'NS'
good_response = unittest.mock.MagicMock()
side_effect = [
None,
good_response,
None,
]
with unittest.mock.patch.object(fierce, 'query', side_effect=side_effect) as mock_method:
result = fierce.recursive_query(resolver, domain, record_type=record_type)
expected = [
unittest.mock.call(resolver, 'example.com.', record_type),
unittest.mock.call(resolver, 'com.', record_type),
]
mock_method.assert_has_calls(expected)
self.assertEqual(result, good_response)