1# mock.py 2# Test tools for mocking and patching. 3# Maintained by Michael Foord 4# Backport for other versions of Python available from 5# https://pypi.org/project/mock 6 7__all__ = ( 8 'Mock', 9 'MagicMock', 10 'patch', 11 'sentinel', 12 'DEFAULT', 13 'ANY', 14 'call', 15 'create_autospec', 16 'AsyncMock', 17 'FILTER_DIR', 18 'NonCallableMock', 19 'NonCallableMagicMock', 20 'mock_open', 21 'PropertyMock', 22 'seal', 23) 24 25 26import asyncio 27import contextlib 28import io 29import inspect 30import pprint 31import sys 32import builtins 33import pkgutil 34from asyncio import iscoroutinefunction 35from types import CodeType, ModuleType, MethodType 36from unittest.util import safe_repr 37from functools import wraps, partial 38from threading import RLock 39 40 41class InvalidSpecError(Exception): 42 """Indicates that an invalid value was used as a mock spec.""" 43 44 45_builtins = {name for name in dir(builtins) if not name.startswith('_')} 46 47FILTER_DIR = True 48 49# Workaround for issue #12370 50# Without this, the __class__ properties wouldn't be set correctly 51_safe_super = super 52 53def _is_async_obj(obj): 54 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock): 55 return False 56 if hasattr(obj, '__func__'): 57 obj = getattr(obj, '__func__') 58 return iscoroutinefunction(obj) or inspect.isawaitable(obj) 59 60 61def _is_async_func(func): 62 if getattr(func, '__code__', None): 63 return iscoroutinefunction(func) 64 else: 65 return False 66 67 68def _is_instance_mock(obj): 69 # can't use isinstance on Mock objects because they override __class__ 70 # The base class for all mocks is NonCallableMock 71 return issubclass(type(obj), NonCallableMock) 72 73 74def _is_exception(obj): 75 return ( 76 isinstance(obj, BaseException) or 77 isinstance(obj, type) and issubclass(obj, BaseException) 78 ) 79 80 81def _extract_mock(obj): 82 # Autospecced functions will return a FunctionType with "mock" attribute 83 # which is the actual mock object that needs to be used. 84 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'): 85 return obj.mock 86 else: 87 return obj 88 89 90def _get_signature_object(func, as_instance, eat_self): 91 """ 92 Given an arbitrary, possibly callable object, try to create a suitable 93 signature object. 94 Return a (reduced func, signature) tuple, or None. 95 """ 96 if isinstance(func, type) and not as_instance: 97 # If it's a type and should be modelled as a type, use __init__. 98 func = func.__init__ 99 # Skip the `self` argument in __init__ 100 eat_self = True 101 elif isinstance(func, (classmethod, staticmethod)): 102 if isinstance(func, classmethod): 103 # Skip the `cls` argument of a class method 104 eat_self = True 105 # Use the original decorated method to extract the correct function signature 106 func = func.__func__ 107 elif not isinstance(func, FunctionTypes): 108 # If we really want to model an instance of the passed type, 109 # __call__ should be looked up, not __init__. 110 try: 111 func = func.__call__ 112 except AttributeError: 113 return None 114 if eat_self: 115 sig_func = partial(func, None) 116 else: 117 sig_func = func 118 try: 119 return func, inspect.signature(sig_func) 120 except ValueError: 121 # Certain callable types are not supported by inspect.signature() 122 return None 123 124 125def _check_signature(func, mock, skipfirst, instance=False): 126 sig = _get_signature_object(func, instance, skipfirst) 127 if sig is None: 128 return 129 func, sig = sig 130 def checksig(self, /, *args, **kwargs): 131 sig.bind(*args, **kwargs) 132 _copy_func_details(func, checksig) 133 type(mock)._mock_check_sig = checksig 134 type(mock).__signature__ = sig 135 136 137def _copy_func_details(func, funcopy): 138 # we explicitly don't copy func.__dict__ into this copy as it would 139 # expose original attributes that should be mocked 140 for attribute in ( 141 '__name__', '__doc__', '__text_signature__', 142 '__module__', '__defaults__', '__kwdefaults__', 143 ): 144 try: 145 setattr(funcopy, attribute, getattr(func, attribute)) 146 except AttributeError: 147 pass 148 149 150def _callable(obj): 151 if isinstance(obj, type): 152 return True 153 if isinstance(obj, (staticmethod, classmethod, MethodType)): 154 return _callable(obj.__func__) 155 if getattr(obj, '__call__', None) is not None: 156 return True 157 return False 158 159 160def _is_list(obj): 161 # checks for list or tuples 162 # XXXX badly named! 163 return type(obj) in (list, tuple) 164 165 166def _instance_callable(obj): 167 """Given an object, return True if the object is callable. 168 For classes, return True if instances would be callable.""" 169 if not isinstance(obj, type): 170 # already an instance 171 return getattr(obj, '__call__', None) is not None 172 173 # *could* be broken by a class overriding __mro__ or __dict__ via 174 # a metaclass 175 for base in (obj,) + obj.__mro__: 176 if base.__dict__.get('__call__') is not None: 177 return True 178 return False 179 180 181def _set_signature(mock, original, instance=False): 182 # creates a function with signature (*args, **kwargs) that delegates to a 183 # mock. It still does signature checking by calling a lambda with the same 184 # signature as the original. 185 186 skipfirst = isinstance(original, type) 187 result = _get_signature_object(original, instance, skipfirst) 188 if result is None: 189 return mock 190 func, sig = result 191 def checksig(*args, **kwargs): 192 sig.bind(*args, **kwargs) 193 _copy_func_details(func, checksig) 194 195 name = original.__name__ 196 if not name.isidentifier(): 197 name = 'funcopy' 198 context = {'_checksig_': checksig, 'mock': mock} 199 src = """def %s(*args, **kwargs): 200 _checksig_(*args, **kwargs) 201 return mock(*args, **kwargs)""" % name 202 exec (src, context) 203 funcopy = context[name] 204 _setup_func(funcopy, mock, sig) 205 return funcopy 206 207 208def _setup_func(funcopy, mock, sig): 209 funcopy.mock = mock 210 211 def assert_called_with(*args, **kwargs): 212 return mock.assert_called_with(*args, **kwargs) 213 def assert_called(*args, **kwargs): 214 return mock.assert_called(*args, **kwargs) 215 def assert_not_called(*args, **kwargs): 216 return mock.assert_not_called(*args, **kwargs) 217 def assert_called_once(*args, **kwargs): 218 return mock.assert_called_once(*args, **kwargs) 219 def assert_called_once_with(*args, **kwargs): 220 return mock.assert_called_once_with(*args, **kwargs) 221 def assert_has_calls(*args, **kwargs): 222 return mock.assert_has_calls(*args, **kwargs) 223 def assert_any_call(*args, **kwargs): 224 return mock.assert_any_call(*args, **kwargs) 225 def reset_mock(): 226 funcopy.method_calls = _CallList() 227 funcopy.mock_calls = _CallList() 228 mock.reset_mock() 229 ret = funcopy.return_value 230 if _is_instance_mock(ret) and not ret is mock: 231 ret.reset_mock() 232 233 funcopy.called = False 234 funcopy.call_count = 0 235 funcopy.call_args = None 236 funcopy.call_args_list = _CallList() 237 funcopy.method_calls = _CallList() 238 funcopy.mock_calls = _CallList() 239 240 funcopy.return_value = mock.return_value 241 funcopy.side_effect = mock.side_effect 242 funcopy._mock_children = mock._mock_children 243 244 funcopy.assert_called_with = assert_called_with 245 funcopy.assert_called_once_with = assert_called_once_with 246 funcopy.assert_has_calls = assert_has_calls 247 funcopy.assert_any_call = assert_any_call 248 funcopy.reset_mock = reset_mock 249 funcopy.assert_called = assert_called 250 funcopy.assert_not_called = assert_not_called 251 funcopy.assert_called_once = assert_called_once 252 funcopy.__signature__ = sig 253 254 mock._mock_delegate = funcopy 255 256 257def _setup_async_mock(mock): 258 mock._is_coroutine = asyncio.coroutines._is_coroutine 259 mock.await_count = 0 260 mock.await_args = None 261 mock.await_args_list = _CallList() 262 263 # Mock is not configured yet so the attributes are set 264 # to a function and then the corresponding mock helper function 265 # is called when the helper is accessed similar to _setup_func. 266 def wrapper(attr, /, *args, **kwargs): 267 return getattr(mock.mock, attr)(*args, **kwargs) 268 269 for attribute in ('assert_awaited', 270 'assert_awaited_once', 271 'assert_awaited_with', 272 'assert_awaited_once_with', 273 'assert_any_await', 274 'assert_has_awaits', 275 'assert_not_awaited'): 276 277 # setattr(mock, attribute, wrapper) causes late binding 278 # hence attribute will always be the last value in the loop 279 # Use partial(wrapper, attribute) to ensure the attribute is bound 280 # correctly. 281 setattr(mock, attribute, partial(wrapper, attribute)) 282 283 284def _is_magic(name): 285 return '__%s__' % name[2:-2] == name 286 287 288class _SentinelObject(object): 289 "A unique, named, sentinel object." 290 def __init__(self, name): 291 self.name = name 292 293 def __repr__(self): 294 return 'sentinel.%s' % self.name 295 296 def __reduce__(self): 297 return 'sentinel.%s' % self.name 298 299 300class _Sentinel(object): 301 """Access attributes to return a named object, usable as a sentinel.""" 302 def __init__(self): 303 self._sentinels = {} 304 305 def __getattr__(self, name): 306 if name == '__bases__': 307 # Without this help(unittest.mock) raises an exception 308 raise AttributeError 309 return self._sentinels.setdefault(name, _SentinelObject(name)) 310 311 def __reduce__(self): 312 return 'sentinel' 313 314 315sentinel = _Sentinel() 316 317DEFAULT = sentinel.DEFAULT 318_missing = sentinel.MISSING 319_deleted = sentinel.DELETED 320 321 322_allowed_names = { 323 'return_value', '_mock_return_value', 'side_effect', 324 '_mock_side_effect', '_mock_parent', '_mock_new_parent', 325 '_mock_name', '_mock_new_name' 326} 327 328 329def _delegating_property(name): 330 _allowed_names.add(name) 331 _the_name = '_mock_' + name 332 def _get(self, name=name, _the_name=_the_name): 333 sig = self._mock_delegate 334 if sig is None: 335 return getattr(self, _the_name) 336 return getattr(sig, name) 337 def _set(self, value, name=name, _the_name=_the_name): 338 sig = self._mock_delegate 339 if sig is None: 340 self.__dict__[_the_name] = value 341 else: 342 setattr(sig, name, value) 343 344 return property(_get, _set) 345 346 347 348class _CallList(list): 349 350 def __contains__(self, value): 351 if not isinstance(value, list): 352 return list.__contains__(self, value) 353 len_value = len(value) 354 len_self = len(self) 355 if len_value > len_self: 356 return False 357 358 for i in range(0, len_self - len_value + 1): 359 sub_list = self[i:i+len_value] 360 if sub_list == value: 361 return True 362 return False 363 364 def __repr__(self): 365 return pprint.pformat(list(self)) 366 367 368def _check_and_set_parent(parent, value, name, new_name): 369 value = _extract_mock(value) 370 371 if not _is_instance_mock(value): 372 return False 373 if ((value._mock_name or value._mock_new_name) or 374 (value._mock_parent is not None) or 375 (value._mock_new_parent is not None)): 376 return False 377 378 _parent = parent 379 while _parent is not None: 380 # setting a mock (value) as a child or return value of itself 381 # should not modify the mock 382 if _parent is value: 383 return False 384 _parent = _parent._mock_new_parent 385 386 if new_name: 387 value._mock_new_parent = parent 388 value._mock_new_name = new_name 389 if name: 390 value._mock_parent = parent 391 value._mock_name = name 392 return True 393 394# Internal class to identify if we wrapped an iterator object or not. 395class _MockIter(object): 396 def __init__(self, obj): 397 self.obj = iter(obj) 398 def __next__(self): 399 return next(self.obj) 400 401class Base(object): 402 _mock_return_value = DEFAULT 403 _mock_side_effect = None 404 def __init__(self, /, *args, **kwargs): 405 pass 406 407 408 409class NonCallableMock(Base): 410 """A non-callable version of `Mock`""" 411 412 # Store a mutex as a class attribute in order to protect concurrent access 413 # to mock attributes. Using a class attribute allows all NonCallableMock 414 # instances to share the mutex for simplicity. 415 # 416 # See https://github.com/python/cpython/issues/98624 for why this is 417 # necessary. 418 _lock = RLock() 419 420 def __new__(cls, /, *args, **kw): 421 # every instance has its own class 422 # so we can create magic methods on the 423 # class without stomping on other mocks 424 bases = (cls,) 425 if not issubclass(cls, AsyncMockMixin): 426 # Check if spec is an async object or function 427 bound_args = _MOCK_SIG.bind_partial(cls, *args, **kw).arguments 428 spec_arg = bound_args.get('spec_set', bound_args.get('spec')) 429 if spec_arg is not None and _is_async_obj(spec_arg): 430 bases = (AsyncMockMixin, cls) 431 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 432 instance = _safe_super(NonCallableMock, cls).__new__(new) 433 return instance 434 435 436 def __init__( 437 self, spec=None, wraps=None, name=None, spec_set=None, 438 parent=None, _spec_state=None, _new_name='', _new_parent=None, 439 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 440 ): 441 if _new_parent is None: 442 _new_parent = parent 443 444 __dict__ = self.__dict__ 445 __dict__['_mock_parent'] = parent 446 __dict__['_mock_name'] = name 447 __dict__['_mock_new_name'] = _new_name 448 __dict__['_mock_new_parent'] = _new_parent 449 __dict__['_mock_sealed'] = False 450 451 if spec_set is not None: 452 spec = spec_set 453 spec_set = True 454 if _eat_self is None: 455 _eat_self = parent is not None 456 457 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 458 459 __dict__['_mock_children'] = {} 460 __dict__['_mock_wraps'] = wraps 461 __dict__['_mock_delegate'] = None 462 463 __dict__['_mock_called'] = False 464 __dict__['_mock_call_args'] = None 465 __dict__['_mock_call_count'] = 0 466 __dict__['_mock_call_args_list'] = _CallList() 467 __dict__['_mock_mock_calls'] = _CallList() 468 469 __dict__['method_calls'] = _CallList() 470 __dict__['_mock_unsafe'] = unsafe 471 472 if kwargs: 473 self.configure_mock(**kwargs) 474 475 _safe_super(NonCallableMock, self).__init__( 476 spec, wraps, name, spec_set, parent, 477 _spec_state 478 ) 479 480 481 def attach_mock(self, mock, attribute): 482 """ 483 Attach a mock as an attribute of this one, replacing its name and 484 parent. Calls to the attached mock will be recorded in the 485 `method_calls` and `mock_calls` attributes of this one.""" 486 inner_mock = _extract_mock(mock) 487 488 inner_mock._mock_parent = None 489 inner_mock._mock_new_parent = None 490 inner_mock._mock_name = '' 491 inner_mock._mock_new_name = None 492 493 setattr(self, attribute, mock) 494 495 496 def mock_add_spec(self, spec, spec_set=False): 497 """Add a spec to a mock. `spec` can either be an object or a 498 list of strings. Only attributes on the `spec` can be fetched as 499 attributes from the mock. 500 501 If `spec_set` is True then only attributes on the spec can be set.""" 502 self._mock_add_spec(spec, spec_set) 503 504 505 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 506 _eat_self=False): 507 if _is_instance_mock(spec): 508 raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]') 509 510 _spec_class = None 511 _spec_signature = None 512 _spec_asyncs = [] 513 514 for attr in dir(spec): 515 if iscoroutinefunction(getattr(spec, attr, None)): 516 _spec_asyncs.append(attr) 517 518 if spec is not None and not _is_list(spec): 519 if isinstance(spec, type): 520 _spec_class = spec 521 else: 522 _spec_class = type(spec) 523 res = _get_signature_object(spec, 524 _spec_as_instance, _eat_self) 525 _spec_signature = res and res[1] 526 527 spec = dir(spec) 528 529 __dict__ = self.__dict__ 530 __dict__['_spec_class'] = _spec_class 531 __dict__['_spec_set'] = spec_set 532 __dict__['_spec_signature'] = _spec_signature 533 __dict__['_mock_methods'] = spec 534 __dict__['_spec_asyncs'] = _spec_asyncs 535 536 def __get_return_value(self): 537 ret = self._mock_return_value 538 if self._mock_delegate is not None: 539 ret = self._mock_delegate.return_value 540 541 if ret is DEFAULT: 542 ret = self._get_child_mock( 543 _new_parent=self, _new_name='()' 544 ) 545 self.return_value = ret 546 return ret 547 548 549 def __set_return_value(self, value): 550 if self._mock_delegate is not None: 551 self._mock_delegate.return_value = value 552 else: 553 self._mock_return_value = value 554 _check_and_set_parent(self, value, None, '()') 555 556 __return_value_doc = "The value to be returned when the mock is called." 557 return_value = property(__get_return_value, __set_return_value, 558 __return_value_doc) 559 560 561 @property 562 def __class__(self): 563 if self._spec_class is None: 564 return type(self) 565 return self._spec_class 566 567 called = _delegating_property('called') 568 call_count = _delegating_property('call_count') 569 call_args = _delegating_property('call_args') 570 call_args_list = _delegating_property('call_args_list') 571 mock_calls = _delegating_property('mock_calls') 572 573 574 def __get_side_effect(self): 575 delegated = self._mock_delegate 576 if delegated is None: 577 return self._mock_side_effect 578 sf = delegated.side_effect 579 if (sf is not None and not callable(sf) 580 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 581 sf = _MockIter(sf) 582 delegated.side_effect = sf 583 return sf 584 585 def __set_side_effect(self, value): 586 value = _try_iter(value) 587 delegated = self._mock_delegate 588 if delegated is None: 589 self._mock_side_effect = value 590 else: 591 delegated.side_effect = value 592 593 side_effect = property(__get_side_effect, __set_side_effect) 594 595 596 def reset_mock(self, visited=None,*, return_value=False, side_effect=False): 597 "Restore the mock object to its initial state." 598 if visited is None: 599 visited = [] 600 if id(self) in visited: 601 return 602 visited.append(id(self)) 603 604 self.called = False 605 self.call_args = None 606 self.call_count = 0 607 self.mock_calls = _CallList() 608 self.call_args_list = _CallList() 609 self.method_calls = _CallList() 610 611 if return_value: 612 self._mock_return_value = DEFAULT 613 if side_effect: 614 self._mock_side_effect = None 615 616 for child in self._mock_children.values(): 617 if isinstance(child, _SpecState) or child is _deleted: 618 continue 619 child.reset_mock(visited, return_value=return_value, side_effect=side_effect) 620 621 ret = self._mock_return_value 622 if _is_instance_mock(ret) and ret is not self: 623 ret.reset_mock(visited) 624 625 626 def configure_mock(self, /, **kwargs): 627 """Set attributes on the mock through keyword arguments. 628 629 Attributes plus return values and side effects can be set on child 630 mocks using standard dot notation and unpacking a dictionary in the 631 method call: 632 633 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 634 >>> mock.configure_mock(**attrs)""" 635 for arg, val in sorted(kwargs.items(), 636 # we sort on the number of dots so that 637 # attributes are set before we set attributes on 638 # attributes 639 key=lambda entry: entry[0].count('.')): 640 args = arg.split('.') 641 final = args.pop() 642 obj = self 643 for entry in args: 644 obj = getattr(obj, entry) 645 setattr(obj, final, val) 646 647 648 def __getattr__(self, name): 649 if name in {'_mock_methods', '_mock_unsafe'}: 650 raise AttributeError(name) 651 elif self._mock_methods is not None: 652 if name not in self._mock_methods or name in _all_magics: 653 raise AttributeError("Mock object has no attribute %r" % name) 654 elif _is_magic(name): 655 raise AttributeError(name) 656 if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods): 657 if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')): 658 raise AttributeError( 659 f"{name!r} is not a valid assertion. Use a spec " 660 f"for the mock if {name!r} is meant to be an attribute.") 661 662 with NonCallableMock._lock: 663 result = self._mock_children.get(name) 664 if result is _deleted: 665 raise AttributeError(name) 666 elif result is None: 667 wraps = None 668 if self._mock_wraps is not None: 669 # XXXX should we get the attribute without triggering code 670 # execution? 671 wraps = getattr(self._mock_wraps, name) 672 673 result = self._get_child_mock( 674 parent=self, name=name, wraps=wraps, _new_name=name, 675 _new_parent=self 676 ) 677 self._mock_children[name] = result 678 679 elif isinstance(result, _SpecState): 680 try: 681 result = create_autospec( 682 result.spec, result.spec_set, result.instance, 683 result.parent, result.name 684 ) 685 except InvalidSpecError: 686 target_name = self.__dict__['_mock_name'] or self 687 raise InvalidSpecError( 688 f'Cannot autospec attr {name!r} from target ' 689 f'{target_name!r} as it has already been mocked out. ' 690 f'[target={self!r}, attr={result.spec!r}]') 691 self._mock_children[name] = result 692 693 return result 694 695 696 def _extract_mock_name(self): 697 _name_list = [self._mock_new_name] 698 _parent = self._mock_new_parent 699 last = self 700 701 dot = '.' 702 if _name_list == ['()']: 703 dot = '' 704 705 while _parent is not None: 706 last = _parent 707 708 _name_list.append(_parent._mock_new_name + dot) 709 dot = '.' 710 if _parent._mock_new_name == '()': 711 dot = '' 712 713 _parent = _parent._mock_new_parent 714 715 _name_list = list(reversed(_name_list)) 716 _first = last._mock_name or 'mock' 717 if len(_name_list) > 1: 718 if _name_list[1] not in ('()', '().'): 719 _first += '.' 720 _name_list[0] = _first 721 return ''.join(_name_list) 722 723 def __repr__(self): 724 name = self._extract_mock_name() 725 726 name_string = '' 727 if name not in ('mock', 'mock.'): 728 name_string = ' name=%r' % name 729 730 spec_string = '' 731 if self._spec_class is not None: 732 spec_string = ' spec=%r' 733 if self._spec_set: 734 spec_string = ' spec_set=%r' 735 spec_string = spec_string % self._spec_class.__name__ 736 return "<%s%s%s id='%s'>" % ( 737 type(self).__name__, 738 name_string, 739 spec_string, 740 id(self) 741 ) 742 743 744 def __dir__(self): 745 """Filter the output of `dir(mock)` to only useful members.""" 746 if not FILTER_DIR: 747 return object.__dir__(self) 748 749 extras = self._mock_methods or [] 750 from_type = dir(type(self)) 751 from_dict = list(self.__dict__) 752 from_child_mocks = [ 753 m_name for m_name, m_value in self._mock_children.items() 754 if m_value is not _deleted] 755 756 from_type = [e for e in from_type if not e.startswith('_')] 757 from_dict = [e for e in from_dict if not e.startswith('_') or 758 _is_magic(e)] 759 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 760 761 762 def __setattr__(self, name, value): 763 if name in _allowed_names: 764 # property setters go through here 765 return object.__setattr__(self, name, value) 766 elif (self._spec_set and self._mock_methods is not None and 767 name not in self._mock_methods and 768 name not in self.__dict__): 769 raise AttributeError("Mock object has no attribute '%s'" % name) 770 elif name in _unsupported_magics: 771 msg = 'Attempting to set unsupported magic method %r.' % name 772 raise AttributeError(msg) 773 elif name in _all_magics: 774 if self._mock_methods is not None and name not in self._mock_methods: 775 raise AttributeError("Mock object has no attribute '%s'" % name) 776 777 if not _is_instance_mock(value): 778 setattr(type(self), name, _get_method(name, value)) 779 original = value 780 value = lambda *args, **kw: original(self, *args, **kw) 781 else: 782 # only set _new_name and not name so that mock_calls is tracked 783 # but not method calls 784 _check_and_set_parent(self, value, None, name) 785 setattr(type(self), name, value) 786 self._mock_children[name] = value 787 elif name == '__class__': 788 self._spec_class = value 789 return 790 else: 791 if _check_and_set_parent(self, value, name, name): 792 self._mock_children[name] = value 793 794 if self._mock_sealed and not hasattr(self, name): 795 mock_name = f'{self._extract_mock_name()}.{name}' 796 raise AttributeError(f'Cannot set {mock_name}') 797 798 return object.__setattr__(self, name, value) 799 800 801 def __delattr__(self, name): 802 if name in _all_magics and name in type(self).__dict__: 803 delattr(type(self), name) 804 if name not in self.__dict__: 805 # for magic methods that are still MagicProxy objects and 806 # not set on the instance itself 807 return 808 809 obj = self._mock_children.get(name, _missing) 810 if name in self.__dict__: 811 _safe_super(NonCallableMock, self).__delattr__(name) 812 elif obj is _deleted: 813 raise AttributeError(name) 814 if obj is not _missing: 815 del self._mock_children[name] 816 self._mock_children[name] = _deleted 817 818 819 def _format_mock_call_signature(self, args, kwargs): 820 name = self._mock_name or 'mock' 821 return _format_call_signature(name, args, kwargs) 822 823 824 def _format_mock_failure_message(self, args, kwargs, action='call'): 825 message = 'expected %s not found.\nExpected: %s\nActual: %s' 826 expected_string = self._format_mock_call_signature(args, kwargs) 827 call_args = self.call_args 828 actual_string = self._format_mock_call_signature(*call_args) 829 return message % (action, expected_string, actual_string) 830 831 832 def _get_call_signature_from_name(self, name): 833 """ 834 * If call objects are asserted against a method/function like obj.meth1 835 then there could be no name for the call object to lookup. Hence just 836 return the spec_signature of the method/function being asserted against. 837 * If the name is not empty then remove () and split by '.' to get 838 list of names to iterate through the children until a potential 839 match is found. A child mock is created only during attribute access 840 so if we get a _SpecState then no attributes of the spec were accessed 841 and can be safely exited. 842 """ 843 if not name: 844 return self._spec_signature 845 846 sig = None 847 names = name.replace('()', '').split('.') 848 children = self._mock_children 849 850 for name in names: 851 child = children.get(name) 852 if child is None or isinstance(child, _SpecState): 853 break 854 else: 855 # If an autospecced object is attached using attach_mock the 856 # child would be a function with mock object as attribute from 857 # which signature has to be derived. 858 child = _extract_mock(child) 859 children = child._mock_children 860 sig = child._spec_signature 861 862 return sig 863 864 865 def _call_matcher(self, _call): 866 """ 867 Given a call (or simply an (args, kwargs) tuple), return a 868 comparison key suitable for matching with other calls. 869 This is a best effort method which relies on the spec's signature, 870 if available, or falls back on the arguments themselves. 871 """ 872 873 if isinstance(_call, tuple) and len(_call) > 2: 874 sig = self._get_call_signature_from_name(_call[0]) 875 else: 876 sig = self._spec_signature 877 878 if sig is not None: 879 if len(_call) == 2: 880 name = '' 881 args, kwargs = _call 882 else: 883 name, args, kwargs = _call 884 try: 885 bound_call = sig.bind(*args, **kwargs) 886 return call(name, bound_call.args, bound_call.kwargs) 887 except TypeError as e: 888 return e.with_traceback(None) 889 else: 890 return _call 891 892 def assert_not_called(self): 893 """assert that the mock was never called. 894 """ 895 if self.call_count != 0: 896 msg = ("Expected '%s' to not have been called. Called %s times.%s" 897 % (self._mock_name or 'mock', 898 self.call_count, 899 self._calls_repr())) 900 raise AssertionError(msg) 901 902 def assert_called(self): 903 """assert that the mock was called at least once 904 """ 905 if self.call_count == 0: 906 msg = ("Expected '%s' to have been called." % 907 (self._mock_name or 'mock')) 908 raise AssertionError(msg) 909 910 def assert_called_once(self): 911 """assert that the mock was called only once. 912 """ 913 if not self.call_count == 1: 914 msg = ("Expected '%s' to have been called once. Called %s times.%s" 915 % (self._mock_name or 'mock', 916 self.call_count, 917 self._calls_repr())) 918 raise AssertionError(msg) 919 920 def assert_called_with(self, /, *args, **kwargs): 921 """assert that the last call was made with the specified arguments. 922 923 Raises an AssertionError if the args and keyword args passed in are 924 different to the last call to the mock.""" 925 if self.call_args is None: 926 expected = self._format_mock_call_signature(args, kwargs) 927 actual = 'not called.' 928 error_message = ('expected call not found.\nExpected: %s\nActual: %s' 929 % (expected, actual)) 930 raise AssertionError(error_message) 931 932 def _error_message(): 933 msg = self._format_mock_failure_message(args, kwargs) 934 return msg 935 expected = self._call_matcher(_Call((args, kwargs), two=True)) 936 actual = self._call_matcher(self.call_args) 937 if actual != expected: 938 cause = expected if isinstance(expected, Exception) else None 939 raise AssertionError(_error_message()) from cause 940 941 942 def assert_called_once_with(self, /, *args, **kwargs): 943 """assert that the mock was called exactly once and that that call was 944 with the specified arguments.""" 945 if not self.call_count == 1: 946 msg = ("Expected '%s' to be called once. Called %s times.%s" 947 % (self._mock_name or 'mock', 948 self.call_count, 949 self._calls_repr())) 950 raise AssertionError(msg) 951 return self.assert_called_with(*args, **kwargs) 952 953 954 def assert_has_calls(self, calls, any_order=False): 955 """assert the mock has been called with the specified calls. 956 The `mock_calls` list is checked for the calls. 957 958 If `any_order` is False (the default) then the calls must be 959 sequential. There can be extra calls before or after the 960 specified calls. 961 962 If `any_order` is True then the calls can be in any order, but 963 they must all appear in `mock_calls`.""" 964 expected = [self._call_matcher(c) for c in calls] 965 cause = next((e for e in expected if isinstance(e, Exception)), None) 966 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) 967 if not any_order: 968 if expected not in all_calls: 969 if cause is None: 970 problem = 'Calls not found.' 971 else: 972 problem = ('Error processing expected calls.\n' 973 'Errors: {}').format( 974 [e if isinstance(e, Exception) else None 975 for e in expected]) 976 raise AssertionError( 977 f'{problem}\n' 978 f'Expected: {_CallList(calls)}' 979 f'{self._calls_repr(prefix="Actual").rstrip(".")}' 980 ) from cause 981 return 982 983 all_calls = list(all_calls) 984 985 not_found = [] 986 for kall in expected: 987 try: 988 all_calls.remove(kall) 989 except ValueError: 990 not_found.append(kall) 991 if not_found: 992 raise AssertionError( 993 '%r does not contain all of %r in its call list, ' 994 'found %r instead' % (self._mock_name or 'mock', 995 tuple(not_found), all_calls) 996 ) from cause 997 998 999 def assert_any_call(self, /, *args, **kwargs): 1000 """assert the mock has been called with the specified arguments. 1001 1002 The assert passes if the mock has *ever* been called, unlike 1003 `assert_called_with` and `assert_called_once_with` that only pass if 1004 the call is the most recent one.""" 1005 expected = self._call_matcher(_Call((args, kwargs), two=True)) 1006 cause = expected if isinstance(expected, Exception) else None 1007 actual = [self._call_matcher(c) for c in self.call_args_list] 1008 if cause or expected not in _AnyComparer(actual): 1009 expected_string = self._format_mock_call_signature(args, kwargs) 1010 raise AssertionError( 1011 '%s call not found' % expected_string 1012 ) from cause 1013 1014 1015 def _get_child_mock(self, /, **kw): 1016 """Create the child mocks for attributes and return value. 1017 By default child mocks will be the same type as the parent. 1018 Subclasses of Mock may want to override this to customize the way 1019 child mocks are made. 1020 1021 For non-callable mocks the callable variant will be used (rather than 1022 any custom subclass).""" 1023 if self._mock_sealed: 1024 attribute = f".{kw['name']}" if "name" in kw else "()" 1025 mock_name = self._extract_mock_name() + attribute 1026 raise AttributeError(mock_name) 1027 1028 _new_name = kw.get("_new_name") 1029 if _new_name in self.__dict__['_spec_asyncs']: 1030 return AsyncMock(**kw) 1031 1032 _type = type(self) 1033 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 1034 # Any asynchronous magic becomes an AsyncMock 1035 klass = AsyncMock 1036 elif issubclass(_type, AsyncMockMixin): 1037 if (_new_name in _all_sync_magics or 1038 self._mock_methods and _new_name in self._mock_methods): 1039 # Any synchronous method on AsyncMock becomes a MagicMock 1040 klass = MagicMock 1041 else: 1042 klass = AsyncMock 1043 elif not issubclass(_type, CallableMixin): 1044 if issubclass(_type, NonCallableMagicMock): 1045 klass = MagicMock 1046 elif issubclass(_type, NonCallableMock): 1047 klass = Mock 1048 else: 1049 klass = _type.__mro__[1] 1050 return klass(**kw) 1051 1052 1053 def _calls_repr(self, prefix="Calls"): 1054 """Renders self.mock_calls as a string. 1055 1056 Example: "\nCalls: [call(1), call(2)]." 1057 1058 If self.mock_calls is empty, an empty string is returned. The 1059 output will be truncated if very long. 1060 """ 1061 if not self.mock_calls: 1062 return "" 1063 return f"\n{prefix}: {safe_repr(self.mock_calls)}." 1064 1065 1066_MOCK_SIG = inspect.signature(NonCallableMock.__init__) 1067 1068 1069class _AnyComparer(list): 1070 """A list which checks if it contains a call which may have an 1071 argument of ANY, flipping the components of item and self from 1072 their traditional locations so that ANY is guaranteed to be on 1073 the left.""" 1074 def __contains__(self, item): 1075 for _call in self: 1076 assert len(item) == len(_call) 1077 if all([ 1078 expected == actual 1079 for expected, actual in zip(item, _call) 1080 ]): 1081 return True 1082 return False 1083 1084 1085def _try_iter(obj): 1086 if obj is None: 1087 return obj 1088 if _is_exception(obj): 1089 return obj 1090 if _callable(obj): 1091 return obj 1092 try: 1093 return iter(obj) 1094 except TypeError: 1095 # XXXX backwards compatibility 1096 # but this will blow up on first call - so maybe we should fail early? 1097 return obj 1098 1099 1100class CallableMixin(Base): 1101 1102 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, 1103 wraps=None, name=None, spec_set=None, parent=None, 1104 _spec_state=None, _new_name='', _new_parent=None, **kwargs): 1105 self.__dict__['_mock_return_value'] = return_value 1106 _safe_super(CallableMixin, self).__init__( 1107 spec, wraps, name, spec_set, parent, 1108 _spec_state, _new_name, _new_parent, **kwargs 1109 ) 1110 1111 self.side_effect = side_effect 1112 1113 1114 def _mock_check_sig(self, /, *args, **kwargs): 1115 # stub method that can be replaced with one with a specific signature 1116 pass 1117 1118 1119 def __call__(self, /, *args, **kwargs): 1120 # can't use self in-case a function / method we are mocking uses self 1121 # in the signature 1122 self._mock_check_sig(*args, **kwargs) 1123 self._increment_mock_call(*args, **kwargs) 1124 return self._mock_call(*args, **kwargs) 1125 1126 1127 def _mock_call(self, /, *args, **kwargs): 1128 return self._execute_mock_call(*args, **kwargs) 1129 1130 def _increment_mock_call(self, /, *args, **kwargs): 1131 self.called = True 1132 self.call_count += 1 1133 1134 # handle call_args 1135 # needs to be set here so assertions on call arguments pass before 1136 # execution in the case of awaited calls 1137 _call = _Call((args, kwargs), two=True) 1138 self.call_args = _call 1139 self.call_args_list.append(_call) 1140 1141 # initial stuff for method_calls: 1142 do_method_calls = self._mock_parent is not None 1143 method_call_name = self._mock_name 1144 1145 # initial stuff for mock_calls: 1146 mock_call_name = self._mock_new_name 1147 is_a_call = mock_call_name == '()' 1148 self.mock_calls.append(_Call(('', args, kwargs))) 1149 1150 # follow up the chain of mocks: 1151 _new_parent = self._mock_new_parent 1152 while _new_parent is not None: 1153 1154 # handle method_calls: 1155 if do_method_calls: 1156 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs))) 1157 do_method_calls = _new_parent._mock_parent is not None 1158 if do_method_calls: 1159 method_call_name = _new_parent._mock_name + '.' + method_call_name 1160 1161 # handle mock_calls: 1162 this_mock_call = _Call((mock_call_name, args, kwargs)) 1163 _new_parent.mock_calls.append(this_mock_call) 1164 1165 if _new_parent._mock_new_name: 1166 if is_a_call: 1167 dot = '' 1168 else: 1169 dot = '.' 1170 is_a_call = _new_parent._mock_new_name == '()' 1171 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name 1172 1173 # follow the parental chain: 1174 _new_parent = _new_parent._mock_new_parent 1175 1176 def _execute_mock_call(self, /, *args, **kwargs): 1177 # separate from _increment_mock_call so that awaited functions are 1178 # executed separately from their call, also AsyncMock overrides this method 1179 1180 effect = self.side_effect 1181 if effect is not None: 1182 if _is_exception(effect): 1183 raise effect 1184 elif not _callable(effect): 1185 result = next(effect) 1186 if _is_exception(result): 1187 raise result 1188 else: 1189 result = effect(*args, **kwargs) 1190 1191 if result is not DEFAULT: 1192 return result 1193 1194 if self._mock_return_value is not DEFAULT: 1195 return self.return_value 1196 1197 if self._mock_wraps is not None: 1198 return self._mock_wraps(*args, **kwargs) 1199 1200 return self.return_value 1201 1202 1203 1204class Mock(CallableMixin, NonCallableMock): 1205 """ 1206 Create a new `Mock` object. `Mock` takes several optional arguments 1207 that specify the behaviour of the Mock object: 1208 1209 * `spec`: This can be either a list of strings or an existing object (a 1210 class or instance) that acts as the specification for the mock object. If 1211 you pass in an object then a list of strings is formed by calling dir on 1212 the object (excluding unsupported magic attributes and methods). Accessing 1213 any attribute not in this list will raise an `AttributeError`. 1214 1215 If `spec` is an object (rather than a list of strings) then 1216 `mock.__class__` returns the class of the spec object. This allows mocks 1217 to pass `isinstance` tests. 1218 1219 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set* 1220 or get an attribute on the mock that isn't on the object passed as 1221 `spec_set` will raise an `AttributeError`. 1222 1223 * `side_effect`: A function to be called whenever the Mock is called. See 1224 the `side_effect` attribute. Useful for raising exceptions or 1225 dynamically changing return values. The function is called with the same 1226 arguments as the mock, and unless it returns `DEFAULT`, the return 1227 value of this function is used as the return value. 1228 1229 If `side_effect` is an iterable then each call to the mock will return 1230 the next value from the iterable. If any of the members of the iterable 1231 are exceptions they will be raised instead of returned. 1232 1233 * `return_value`: The value returned when the mock is called. By default 1234 this is a new Mock (created on first access). See the 1235 `return_value` attribute. 1236 1237 * `unsafe`: By default, accessing any attribute whose name starts with 1238 *assert*, *assret*, *asert*, *aseert* or *assrt* will raise an 1239 AttributeError. Passing `unsafe=True` will allow access to 1240 these attributes. 1241 1242 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then 1243 calling the Mock will pass the call through to the wrapped object 1244 (returning the real result). Attribute access on the mock will return a 1245 Mock object that wraps the corresponding attribute of the wrapped object 1246 (so attempting to access an attribute that doesn't exist will raise an 1247 `AttributeError`). 1248 1249 If the mock has an explicit `return_value` set then calls are not passed 1250 to the wrapped object and the `return_value` is returned instead. 1251 1252 * `name`: If the mock has a name then it will be used in the repr of the 1253 mock. This can be useful for debugging. The name is propagated to child 1254 mocks. 1255 1256 Mocks can also be called with arbitrary keyword arguments. These will be 1257 used to set attributes on the mock after it is created. 1258 """ 1259 1260 1261# _check_spec_arg_typos takes kwargs from commands like patch and checks that 1262# they don't contain common misspellings of arguments related to autospeccing. 1263def _check_spec_arg_typos(kwargs_to_check): 1264 typos = ("autospect", "auto_spec", "set_spec") 1265 for typo in typos: 1266 if typo in kwargs_to_check: 1267 raise RuntimeError( 1268 f"{typo!r} might be a typo; use unsafe=True if this is intended" 1269 ) 1270 1271 1272class _patch(object): 1273 1274 attribute_name = None 1275 _active_patches = [] 1276 1277 def __init__( 1278 self, getter, attribute, new, spec, create, 1279 spec_set, autospec, new_callable, kwargs, *, unsafe=False 1280 ): 1281 if new_callable is not None: 1282 if new is not DEFAULT: 1283 raise ValueError( 1284 "Cannot use 'new' and 'new_callable' together" 1285 ) 1286 if autospec is not None: 1287 raise ValueError( 1288 "Cannot use 'autospec' and 'new_callable' together" 1289 ) 1290 if not unsafe: 1291 _check_spec_arg_typos(kwargs) 1292 if _is_instance_mock(spec): 1293 raise InvalidSpecError( 1294 f'Cannot spec attr {attribute!r} as the spec ' 1295 f'has already been mocked out. [spec={spec!r}]') 1296 if _is_instance_mock(spec_set): 1297 raise InvalidSpecError( 1298 f'Cannot spec attr {attribute!r} as the spec_set ' 1299 f'target has already been mocked out. [spec_set={spec_set!r}]') 1300 1301 self.getter = getter 1302 self.attribute = attribute 1303 self.new = new 1304 self.new_callable = new_callable 1305 self.spec = spec 1306 self.create = create 1307 self.has_local = False 1308 self.spec_set = spec_set 1309 self.autospec = autospec 1310 self.kwargs = kwargs 1311 self.additional_patchers = [] 1312 1313 1314 def copy(self): 1315 patcher = _patch( 1316 self.getter, self.attribute, self.new, self.spec, 1317 self.create, self.spec_set, 1318 self.autospec, self.new_callable, self.kwargs 1319 ) 1320 patcher.attribute_name = self.attribute_name 1321 patcher.additional_patchers = [ 1322 p.copy() for p in self.additional_patchers 1323 ] 1324 return patcher 1325 1326 1327 def __call__(self, func): 1328 if isinstance(func, type): 1329 return self.decorate_class(func) 1330 if inspect.iscoroutinefunction(func): 1331 return self.decorate_async_callable(func) 1332 return self.decorate_callable(func) 1333 1334 1335 def decorate_class(self, klass): 1336 for attr in dir(klass): 1337 if not attr.startswith(patch.TEST_PREFIX): 1338 continue 1339 1340 attr_value = getattr(klass, attr) 1341 if not hasattr(attr_value, "__call__"): 1342 continue 1343 1344 patcher = self.copy() 1345 setattr(klass, attr, patcher(attr_value)) 1346 return klass 1347 1348 1349 @contextlib.contextmanager 1350 def decoration_helper(self, patched, args, keywargs): 1351 extra_args = [] 1352 with contextlib.ExitStack() as exit_stack: 1353 for patching in patched.patchings: 1354 arg = exit_stack.enter_context(patching) 1355 if patching.attribute_name is not None: 1356 keywargs.update(arg) 1357 elif patching.new is DEFAULT: 1358 extra_args.append(arg) 1359 1360 args += tuple(extra_args) 1361 yield (args, keywargs) 1362 1363 1364 def decorate_callable(self, func): 1365 # NB. Keep the method in sync with decorate_async_callable() 1366 if hasattr(func, 'patchings'): 1367 func.patchings.append(self) 1368 return func 1369 1370 @wraps(func) 1371 def patched(*args, **keywargs): 1372 with self.decoration_helper(patched, 1373 args, 1374 keywargs) as (newargs, newkeywargs): 1375 return func(*newargs, **newkeywargs) 1376 1377 patched.patchings = [self] 1378 return patched 1379 1380 1381 def decorate_async_callable(self, func): 1382 # NB. Keep the method in sync with decorate_callable() 1383 if hasattr(func, 'patchings'): 1384 func.patchings.append(self) 1385 return func 1386 1387 @wraps(func) 1388 async def patched(*args, **keywargs): 1389 with self.decoration_helper(patched, 1390 args, 1391 keywargs) as (newargs, newkeywargs): 1392 return await func(*newargs, **newkeywargs) 1393 1394 patched.patchings = [self] 1395 return patched 1396 1397 1398 def get_original(self): 1399 target = self.getter() 1400 name = self.attribute 1401 1402 original = DEFAULT 1403 local = False 1404 1405 try: 1406 original = target.__dict__[name] 1407 except (AttributeError, KeyError): 1408 original = getattr(target, name, DEFAULT) 1409 else: 1410 local = True 1411 1412 if name in _builtins and isinstance(target, ModuleType): 1413 self.create = True 1414 1415 if not self.create and original is DEFAULT: 1416 raise AttributeError( 1417 "%s does not have the attribute %r" % (target, name) 1418 ) 1419 return original, local 1420 1421 1422 def __enter__(self): 1423 """Perform the patch.""" 1424 new, spec, spec_set = self.new, self.spec, self.spec_set 1425 autospec, kwargs = self.autospec, self.kwargs 1426 new_callable = self.new_callable 1427 self.target = self.getter() 1428 1429 # normalise False to None 1430 if spec is False: 1431 spec = None 1432 if spec_set is False: 1433 spec_set = None 1434 if autospec is False: 1435 autospec = None 1436 1437 if spec is not None and autospec is not None: 1438 raise TypeError("Can't specify spec and autospec") 1439 if ((spec is not None or autospec is not None) and 1440 spec_set not in (True, None)): 1441 raise TypeError("Can't provide explicit spec_set *and* spec or autospec") 1442 1443 original, local = self.get_original() 1444 1445 if new is DEFAULT and autospec is None: 1446 inherit = False 1447 if spec is True: 1448 # set spec to the object we are replacing 1449 spec = original 1450 if spec_set is True: 1451 spec_set = original 1452 spec = None 1453 elif spec is not None: 1454 if spec_set is True: 1455 spec_set = spec 1456 spec = None 1457 elif spec_set is True: 1458 spec_set = original 1459 1460 if spec is not None or spec_set is not None: 1461 if original is DEFAULT: 1462 raise TypeError("Can't use 'spec' with create=True") 1463 if isinstance(original, type): 1464 # If we're patching out a class and there is a spec 1465 inherit = True 1466 if spec is None and _is_async_obj(original): 1467 Klass = AsyncMock 1468 else: 1469 Klass = MagicMock 1470 _kwargs = {} 1471 if new_callable is not None: 1472 Klass = new_callable 1473 elif spec is not None or spec_set is not None: 1474 this_spec = spec 1475 if spec_set is not None: 1476 this_spec = spec_set 1477 if _is_list(this_spec): 1478 not_callable = '__call__' not in this_spec 1479 else: 1480 not_callable = not callable(this_spec) 1481 if _is_async_obj(this_spec): 1482 Klass = AsyncMock 1483 elif not_callable: 1484 Klass = NonCallableMagicMock 1485 1486 if spec is not None: 1487 _kwargs['spec'] = spec 1488 if spec_set is not None: 1489 _kwargs['spec_set'] = spec_set 1490 1491 # add a name to mocks 1492 if (isinstance(Klass, type) and 1493 issubclass(Klass, NonCallableMock) and self.attribute): 1494 _kwargs['name'] = self.attribute 1495 1496 _kwargs.update(kwargs) 1497 new = Klass(**_kwargs) 1498 1499 if inherit and _is_instance_mock(new): 1500 # we can only tell if the instance should be callable if the 1501 # spec is not a list 1502 this_spec = spec 1503 if spec_set is not None: 1504 this_spec = spec_set 1505 if (not _is_list(this_spec) and not 1506 _instance_callable(this_spec)): 1507 Klass = NonCallableMagicMock 1508 1509 _kwargs.pop('name') 1510 new.return_value = Klass(_new_parent=new, _new_name='()', 1511 **_kwargs) 1512 elif autospec is not None: 1513 # spec is ignored, new *must* be default, spec_set is treated 1514 # as a boolean. Should we check spec is not None and that spec_set 1515 # is a bool? 1516 if new is not DEFAULT: 1517 raise TypeError( 1518 "autospec creates the mock for you. Can't specify " 1519 "autospec and new." 1520 ) 1521 if original is DEFAULT: 1522 raise TypeError("Can't use 'autospec' with create=True") 1523 spec_set = bool(spec_set) 1524 if autospec is True: 1525 autospec = original 1526 1527 if _is_instance_mock(self.target): 1528 raise InvalidSpecError( 1529 f'Cannot autospec attr {self.attribute!r} as the patch ' 1530 f'target has already been mocked out. ' 1531 f'[target={self.target!r}, attr={autospec!r}]') 1532 if _is_instance_mock(autospec): 1533 target_name = getattr(self.target, '__name__', self.target) 1534 raise InvalidSpecError( 1535 f'Cannot autospec attr {self.attribute!r} from target ' 1536 f'{target_name!r} as it has already been mocked out. ' 1537 f'[target={self.target!r}, attr={autospec!r}]') 1538 1539 new = create_autospec(autospec, spec_set=spec_set, 1540 _name=self.attribute, **kwargs) 1541 elif kwargs: 1542 # can't set keyword args when we aren't creating the mock 1543 # XXXX If new is a Mock we could call new.configure_mock(**kwargs) 1544 raise TypeError("Can't pass kwargs to a mock we aren't creating") 1545 1546 new_attr = new 1547 1548 self.temp_original = original 1549 self.is_local = local 1550 self._exit_stack = contextlib.ExitStack() 1551 try: 1552 setattr(self.target, self.attribute, new_attr) 1553 if self.attribute_name is not None: 1554 extra_args = {} 1555 if self.new is DEFAULT: 1556 extra_args[self.attribute_name] = new 1557 for patching in self.additional_patchers: 1558 arg = self._exit_stack.enter_context(patching) 1559 if patching.new is DEFAULT: 1560 extra_args.update(arg) 1561 return extra_args 1562 1563 return new 1564 except: 1565 if not self.__exit__(*sys.exc_info()): 1566 raise 1567 1568 def __exit__(self, *exc_info): 1569 """Undo the patch.""" 1570 if self.is_local and self.temp_original is not DEFAULT: 1571 setattr(self.target, self.attribute, self.temp_original) 1572 else: 1573 delattr(self.target, self.attribute) 1574 if not self.create and (not hasattr(self.target, self.attribute) or 1575 self.attribute in ('__doc__', '__module__', 1576 '__defaults__', '__annotations__', 1577 '__kwdefaults__')): 1578 # needed for proxy objects like django settings 1579 setattr(self.target, self.attribute, self.temp_original) 1580 1581 del self.temp_original 1582 del self.is_local 1583 del self.target 1584 exit_stack = self._exit_stack 1585 del self._exit_stack 1586 return exit_stack.__exit__(*exc_info) 1587 1588 1589 def start(self): 1590 """Activate a patch, returning any created mock.""" 1591 result = self.__enter__() 1592 self._active_patches.append(self) 1593 return result 1594 1595 1596 def stop(self): 1597 """Stop an active patch.""" 1598 try: 1599 self._active_patches.remove(self) 1600 except ValueError: 1601 # If the patch hasn't been started this will fail 1602 return None 1603 1604 return self.__exit__(None, None, None) 1605 1606 1607 1608def _get_target(target): 1609 try: 1610 target, attribute = target.rsplit('.', 1) 1611 except (TypeError, ValueError, AttributeError): 1612 raise TypeError( 1613 f"Need a valid target to patch. You supplied: {target!r}") 1614 return partial(pkgutil.resolve_name, target), attribute 1615 1616 1617def _patch_object( 1618 target, attribute, new=DEFAULT, spec=None, 1619 create=False, spec_set=None, autospec=None, 1620 new_callable=None, *, unsafe=False, **kwargs 1621 ): 1622 """ 1623 patch the named member (`attribute`) on an object (`target`) with a mock 1624 object. 1625 1626 `patch.object` can be used as a decorator, class decorator or a context 1627 manager. Arguments `new`, `spec`, `create`, `spec_set`, 1628 `autospec` and `new_callable` have the same meaning as for `patch`. Like 1629 `patch`, `patch.object` takes arbitrary keyword arguments for configuring 1630 the mock object it creates. 1631 1632 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX` 1633 for choosing which methods to wrap. 1634 """ 1635 if type(target) is str: 1636 raise TypeError( 1637 f"{target!r} must be the actual object to be patched, not a str" 1638 ) 1639 getter = lambda: target 1640 return _patch( 1641 getter, attribute, new, spec, create, 1642 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 1643 ) 1644 1645 1646def _patch_multiple(target, spec=None, create=False, spec_set=None, 1647 autospec=None, new_callable=None, **kwargs): 1648 """Perform multiple patches in a single call. It takes the object to be 1649 patched (either as an object or a string to fetch the object by importing) 1650 and keyword arguments for the patches:: 1651 1652 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 1653 ... 1654 1655 Use `DEFAULT` as the value if you want `patch.multiple` to create 1656 mocks for you. In this case the created mocks are passed into a decorated 1657 function by keyword, and a dictionary is returned when `patch.multiple` is 1658 used as a context manager. 1659 1660 `patch.multiple` can be used as a decorator, class decorator or a context 1661 manager. The arguments `spec`, `spec_set`, `create`, 1662 `autospec` and `new_callable` have the same meaning as for `patch`. These 1663 arguments will be applied to *all* patches done by `patch.multiple`. 1664 1665 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 1666 for choosing which methods to wrap. 1667 """ 1668 if type(target) is str: 1669 getter = partial(pkgutil.resolve_name, target) 1670 else: 1671 getter = lambda: target 1672 1673 if not kwargs: 1674 raise ValueError( 1675 'Must supply at least one keyword argument with patch.multiple' 1676 ) 1677 # need to wrap in a list for python 3, where items is a view 1678 items = list(kwargs.items()) 1679 attribute, new = items[0] 1680 patcher = _patch( 1681 getter, attribute, new, spec, create, spec_set, 1682 autospec, new_callable, {} 1683 ) 1684 patcher.attribute_name = attribute 1685 for attribute, new in items[1:]: 1686 this_patcher = _patch( 1687 getter, attribute, new, spec, create, spec_set, 1688 autospec, new_callable, {} 1689 ) 1690 this_patcher.attribute_name = attribute 1691 patcher.additional_patchers.append(this_patcher) 1692 return patcher 1693 1694 1695def patch( 1696 target, new=DEFAULT, spec=None, create=False, 1697 spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs 1698 ): 1699 """ 1700 `patch` acts as a function decorator, class decorator or a context 1701 manager. Inside the body of the function or with statement, the `target` 1702 is patched with a `new` object. When the function/with statement exits 1703 the patch is undone. 1704 1705 If `new` is omitted, then the target is replaced with an 1706 `AsyncMock if the patched object is an async function or a 1707 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is 1708 omitted, the created mock is passed in as an extra argument to the 1709 decorated function. If `patch` is used as a context manager the created 1710 mock is returned by the context manager. 1711 1712 `target` should be a string in the form `'package.module.ClassName'`. The 1713 `target` is imported and the specified object replaced with the `new` 1714 object, so the `target` must be importable from the environment you are 1715 calling `patch` from. The target is imported when the decorated function 1716 is executed, not at decoration time. 1717 1718 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock` 1719 if patch is creating one for you. 1720 1721 In addition you can pass `spec=True` or `spec_set=True`, which causes 1722 patch to pass in the object being mocked as the spec/spec_set object. 1723 1724 `new_callable` allows you to specify a different class, or callable object, 1725 that will be called to create the `new` object. By default `AsyncMock` is 1726 used for async functions and `MagicMock` for the rest. 1727 1728 A more powerful form of `spec` is `autospec`. If you set `autospec=True` 1729 then the mock will be created with a spec from the object being replaced. 1730 All attributes of the mock will also have the spec of the corresponding 1731 attribute of the object being replaced. Methods and functions being 1732 mocked will have their arguments checked and will raise a `TypeError` if 1733 they are called with the wrong signature. For mocks replacing a class, 1734 their return value (the 'instance') will have the same spec as the class. 1735 1736 Instead of `autospec=True` you can pass `autospec=some_object` to use an 1737 arbitrary object as the spec instead of the one being replaced. 1738 1739 By default `patch` will fail to replace attributes that don't exist. If 1740 you pass in `create=True`, and the attribute doesn't exist, patch will 1741 create the attribute for you when the patched function is called, and 1742 delete it again afterwards. This is useful for writing tests against 1743 attributes that your production code creates at runtime. It is off by 1744 default because it can be dangerous. With it switched on you can write 1745 passing tests against APIs that don't actually exist! 1746 1747 Patch can be used as a `TestCase` class decorator. It works by 1748 decorating each test method in the class. This reduces the boilerplate 1749 code when your test methods share a common patchings set. `patch` finds 1750 tests by looking for method names that start with `patch.TEST_PREFIX`. 1751 By default this is `test`, which matches the way `unittest` finds tests. 1752 You can specify an alternative prefix by setting `patch.TEST_PREFIX`. 1753 1754 Patch can be used as a context manager, with the with statement. Here the 1755 patching applies to the indented block after the with statement. If you 1756 use "as" then the patched object will be bound to the name after the 1757 "as"; very useful if `patch` is creating a mock object for you. 1758 1759 Patch will raise a `RuntimeError` if passed some common misspellings of 1760 the arguments autospec and spec_set. Pass the argument `unsafe` with the 1761 value True to disable that check. 1762 1763 `patch` takes arbitrary keyword arguments. These will be passed to 1764 `AsyncMock` if the patched object is asynchronous, to `MagicMock` 1765 otherwise or to `new_callable` if specified. 1766 1767 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are 1768 available for alternate use-cases. 1769 """ 1770 getter, attribute = _get_target(target) 1771 return _patch( 1772 getter, attribute, new, spec, create, 1773 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 1774 ) 1775 1776 1777class _patch_dict(object): 1778 """ 1779 Patch a dictionary, or dictionary like object, and restore the dictionary 1780 to its original state after the test. 1781 1782 `in_dict` can be a dictionary or a mapping like container. If it is a 1783 mapping then it must at least support getting, setting and deleting items 1784 plus iterating over keys. 1785 1786 `in_dict` can also be a string specifying the name of the dictionary, which 1787 will then be fetched by importing it. 1788 1789 `values` can be a dictionary of values to set in the dictionary. `values` 1790 can also be an iterable of `(key, value)` pairs. 1791 1792 If `clear` is True then the dictionary will be cleared before the new 1793 values are set. 1794 1795 `patch.dict` can also be called with arbitrary keyword arguments to set 1796 values in the dictionary:: 1797 1798 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()): 1799 ... 1800 1801 `patch.dict` can be used as a context manager, decorator or class 1802 decorator. When used as a class decorator `patch.dict` honours 1803 `patch.TEST_PREFIX` for choosing which methods to wrap. 1804 """ 1805 1806 def __init__(self, in_dict, values=(), clear=False, **kwargs): 1807 self.in_dict = in_dict 1808 # support any argument supported by dict(...) constructor 1809 self.values = dict(values) 1810 self.values.update(kwargs) 1811 self.clear = clear 1812 self._original = None 1813 1814 1815 def __call__(self, f): 1816 if isinstance(f, type): 1817 return self.decorate_class(f) 1818 if inspect.iscoroutinefunction(f): 1819 return self.decorate_async_callable(f) 1820 return self.decorate_callable(f) 1821 1822 1823 def decorate_callable(self, f): 1824 @wraps(f) 1825 def _inner(*args, **kw): 1826 self._patch_dict() 1827 try: 1828 return f(*args, **kw) 1829 finally: 1830 self._unpatch_dict() 1831 1832 return _inner 1833 1834 1835 def decorate_async_callable(self, f): 1836 @wraps(f) 1837 async def _inner(*args, **kw): 1838 self._patch_dict() 1839 try: 1840 return await f(*args, **kw) 1841 finally: 1842 self._unpatch_dict() 1843 1844 return _inner 1845 1846 1847 def decorate_class(self, klass): 1848 for attr in dir(klass): 1849 attr_value = getattr(klass, attr) 1850 if (attr.startswith(patch.TEST_PREFIX) and 1851 hasattr(attr_value, "__call__")): 1852 decorator = _patch_dict(self.in_dict, self.values, self.clear) 1853 decorated = decorator(attr_value) 1854 setattr(klass, attr, decorated) 1855 return klass 1856 1857 1858 def __enter__(self): 1859 """Patch the dict.""" 1860 self._patch_dict() 1861 return self.in_dict 1862 1863 1864 def _patch_dict(self): 1865 values = self.values 1866 if isinstance(self.in_dict, str): 1867 self.in_dict = pkgutil.resolve_name(self.in_dict) 1868 in_dict = self.in_dict 1869 clear = self.clear 1870 1871 try: 1872 original = in_dict.copy() 1873 except AttributeError: 1874 # dict like object with no copy method 1875 # must support iteration over keys 1876 original = {} 1877 for key in in_dict: 1878 original[key] = in_dict[key] 1879 self._original = original 1880 1881 if clear: 1882 _clear_dict(in_dict) 1883 1884 try: 1885 in_dict.update(values) 1886 except AttributeError: 1887 # dict like object with no update method 1888 for key in values: 1889 in_dict[key] = values[key] 1890 1891 1892 def _unpatch_dict(self): 1893 in_dict = self.in_dict 1894 original = self._original 1895 1896 _clear_dict(in_dict) 1897 1898 try: 1899 in_dict.update(original) 1900 except AttributeError: 1901 for key in original: 1902 in_dict[key] = original[key] 1903 1904 1905 def __exit__(self, *args): 1906 """Unpatch the dict.""" 1907 if self._original is not None: 1908 self._unpatch_dict() 1909 return False 1910 1911 1912 def start(self): 1913 """Activate a patch, returning any created mock.""" 1914 result = self.__enter__() 1915 _patch._active_patches.append(self) 1916 return result 1917 1918 1919 def stop(self): 1920 """Stop an active patch.""" 1921 try: 1922 _patch._active_patches.remove(self) 1923 except ValueError: 1924 # If the patch hasn't been started this will fail 1925 return None 1926 1927 return self.__exit__(None, None, None) 1928 1929 1930def _clear_dict(in_dict): 1931 try: 1932 in_dict.clear() 1933 except AttributeError: 1934 keys = list(in_dict) 1935 for key in keys: 1936 del in_dict[key] 1937 1938 1939def _patch_stopall(): 1940 """Stop all active patches. LIFO to unroll nested patches.""" 1941 for patch in reversed(_patch._active_patches): 1942 patch.stop() 1943 1944 1945patch.object = _patch_object 1946patch.dict = _patch_dict 1947patch.multiple = _patch_multiple 1948patch.stopall = _patch_stopall 1949patch.TEST_PREFIX = 'test' 1950 1951magic_methods = ( 1952 "lt le gt ge eq ne " 1953 "getitem setitem delitem " 1954 "len contains iter " 1955 "hash str sizeof " 1956 "enter exit " 1957 # we added divmod and rdivmod here instead of numerics 1958 # because there is no idivmod 1959 "divmod rdivmod neg pos abs invert " 1960 "complex int float index " 1961 "round trunc floor ceil " 1962 "bool next " 1963 "fspath " 1964 "aiter " 1965) 1966 1967numerics = ( 1968 "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow" 1969) 1970inplace = ' '.join('i%s' % n for n in numerics.split()) 1971right = ' '.join('r%s' % n for n in numerics.split()) 1972 1973# not including __prepare__, __instancecheck__, __subclasscheck__ 1974# (as they are metaclass methods) 1975# __del__ is not supported at all as it causes problems if it exists 1976 1977_non_defaults = { 1978 '__get__', '__set__', '__delete__', '__reversed__', '__missing__', 1979 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', 1980 '__getstate__', '__setstate__', '__getformat__', 1981 '__repr__', '__dir__', '__subclasses__', '__format__', 1982 '__getnewargs_ex__', 1983} 1984 1985 1986def _get_method(name, func): 1987 "Turns a callable object (like a mock) into a real function" 1988 def method(self, /, *args, **kw): 1989 return func(self, *args, **kw) 1990 method.__name__ = name 1991 return method 1992 1993 1994_magics = { 1995 '__%s__' % method for method in 1996 ' '.join([magic_methods, numerics, inplace, right]).split() 1997} 1998 1999# Magic methods used for async `with` statements 2000_async_method_magics = {"__aenter__", "__aexit__", "__anext__"} 2001# Magic methods that are only used with async calls but are synchronous functions themselves 2002_sync_async_magics = {"__aiter__"} 2003_async_magics = _async_method_magics | _sync_async_magics 2004 2005_all_sync_magics = _magics | _non_defaults 2006_all_magics = _all_sync_magics | _async_magics 2007 2008_unsupported_magics = { 2009 '__getattr__', '__setattr__', 2010 '__init__', '__new__', '__prepare__', 2011 '__instancecheck__', '__subclasscheck__', 2012 '__del__' 2013} 2014 2015_calculate_return_value = { 2016 '__hash__': lambda self: object.__hash__(self), 2017 '__str__': lambda self: object.__str__(self), 2018 '__sizeof__': lambda self: object.__sizeof__(self), 2019 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}", 2020} 2021 2022_return_values = { 2023 '__lt__': NotImplemented, 2024 '__gt__': NotImplemented, 2025 '__le__': NotImplemented, 2026 '__ge__': NotImplemented, 2027 '__int__': 1, 2028 '__contains__': False, 2029 '__len__': 0, 2030 '__exit__': False, 2031 '__complex__': 1j, 2032 '__float__': 1.0, 2033 '__bool__': True, 2034 '__index__': 1, 2035 '__aexit__': False, 2036} 2037 2038 2039def _get_eq(self): 2040 def __eq__(other): 2041 ret_val = self.__eq__._mock_return_value 2042 if ret_val is not DEFAULT: 2043 return ret_val 2044 if self is other: 2045 return True 2046 return NotImplemented 2047 return __eq__ 2048 2049def _get_ne(self): 2050 def __ne__(other): 2051 if self.__ne__._mock_return_value is not DEFAULT: 2052 return DEFAULT 2053 if self is other: 2054 return False 2055 return NotImplemented 2056 return __ne__ 2057 2058def _get_iter(self): 2059 def __iter__(): 2060 ret_val = self.__iter__._mock_return_value 2061 if ret_val is DEFAULT: 2062 return iter([]) 2063 # if ret_val was already an iterator, then calling iter on it should 2064 # return the iterator unchanged 2065 return iter(ret_val) 2066 return __iter__ 2067 2068def _get_async_iter(self): 2069 def __aiter__(): 2070 ret_val = self.__aiter__._mock_return_value 2071 if ret_val is DEFAULT: 2072 return _AsyncIterator(iter([])) 2073 return _AsyncIterator(iter(ret_val)) 2074 return __aiter__ 2075 2076_side_effect_methods = { 2077 '__eq__': _get_eq, 2078 '__ne__': _get_ne, 2079 '__iter__': _get_iter, 2080 '__aiter__': _get_async_iter 2081} 2082 2083 2084 2085def _set_return_value(mock, method, name): 2086 fixed = _return_values.get(name, DEFAULT) 2087 if fixed is not DEFAULT: 2088 method.return_value = fixed 2089 return 2090 2091 return_calculator = _calculate_return_value.get(name) 2092 if return_calculator is not None: 2093 return_value = return_calculator(mock) 2094 method.return_value = return_value 2095 return 2096 2097 side_effector = _side_effect_methods.get(name) 2098 if side_effector is not None: 2099 method.side_effect = side_effector(mock) 2100 2101 2102 2103class MagicMixin(Base): 2104 def __init__(self, /, *args, **kw): 2105 self._mock_set_magics() # make magic work for kwargs in init 2106 _safe_super(MagicMixin, self).__init__(*args, **kw) 2107 self._mock_set_magics() # fix magic broken by upper level init 2108 2109 2110 def _mock_set_magics(self): 2111 orig_magics = _magics | _async_method_magics 2112 these_magics = orig_magics 2113 2114 if getattr(self, "_mock_methods", None) is not None: 2115 these_magics = orig_magics.intersection(self._mock_methods) 2116 2117 remove_magics = set() 2118 remove_magics = orig_magics - these_magics 2119 2120 for entry in remove_magics: 2121 if entry in type(self).__dict__: 2122 # remove unneeded magic methods 2123 delattr(self, entry) 2124 2125 # don't overwrite existing attributes if called a second time 2126 these_magics = these_magics - set(type(self).__dict__) 2127 2128 _type = type(self) 2129 for entry in these_magics: 2130 setattr(_type, entry, MagicProxy(entry, self)) 2131 2132 2133 2134class NonCallableMagicMock(MagicMixin, NonCallableMock): 2135 """A version of `MagicMock` that isn't callable.""" 2136 def mock_add_spec(self, spec, spec_set=False): 2137 """Add a spec to a mock. `spec` can either be an object or a 2138 list of strings. Only attributes on the `spec` can be fetched as 2139 attributes from the mock. 2140 2141 If `spec_set` is True then only attributes on the spec can be set.""" 2142 self._mock_add_spec(spec, spec_set) 2143 self._mock_set_magics() 2144 2145 2146class AsyncMagicMixin(MagicMixin): 2147 def __init__(self, /, *args, **kw): 2148 self._mock_set_magics() # make magic work for kwargs in init 2149 _safe_super(AsyncMagicMixin, self).__init__(*args, **kw) 2150 self._mock_set_magics() # fix magic broken by upper level init 2151 2152class MagicMock(MagicMixin, Mock): 2153 """ 2154 MagicMock is a subclass of Mock with default implementations 2155 of most of the magic methods. You can use MagicMock without having to 2156 configure the magic methods yourself. 2157 2158 If you use the `spec` or `spec_set` arguments then *only* magic 2159 methods that exist in the spec will be created. 2160 2161 Attributes and the return value of a `MagicMock` will also be `MagicMocks`. 2162 """ 2163 def mock_add_spec(self, spec, spec_set=False): 2164 """Add a spec to a mock. `spec` can either be an object or a 2165 list of strings. Only attributes on the `spec` can be fetched as 2166 attributes from the mock. 2167 2168 If `spec_set` is True then only attributes on the spec can be set.""" 2169 self._mock_add_spec(spec, spec_set) 2170 self._mock_set_magics() 2171 2172 2173 2174class MagicProxy(Base): 2175 def __init__(self, name, parent): 2176 self.name = name 2177 self.parent = parent 2178 2179 def create_mock(self): 2180 entry = self.name 2181 parent = self.parent 2182 m = parent._get_child_mock(name=entry, _new_name=entry, 2183 _new_parent=parent) 2184 setattr(parent, entry, m) 2185 _set_return_value(parent, m, entry) 2186 return m 2187 2188 def __get__(self, obj, _type=None): 2189 return self.create_mock() 2190 2191 2192class AsyncMockMixin(Base): 2193 await_count = _delegating_property('await_count') 2194 await_args = _delegating_property('await_args') 2195 await_args_list = _delegating_property('await_args_list') 2196 2197 def __init__(self, /, *args, **kwargs): 2198 super().__init__(*args, **kwargs) 2199 # iscoroutinefunction() checks _is_coroutine property to say if an 2200 # object is a coroutine. Without this check it looks to see if it is a 2201 # function/method, which in this case it is not (since it is an 2202 # AsyncMock). 2203 # It is set through __dict__ because when spec_set is True, this 2204 # attribute is likely undefined. 2205 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine 2206 self.__dict__['_mock_await_count'] = 0 2207 self.__dict__['_mock_await_args'] = None 2208 self.__dict__['_mock_await_args_list'] = _CallList() 2209 code_mock = NonCallableMock(spec_set=CodeType) 2210 code_mock.co_flags = ( 2211 inspect.CO_COROUTINE 2212 + inspect.CO_VARARGS 2213 + inspect.CO_VARKEYWORDS 2214 ) 2215 code_mock.co_argcount = 0 2216 code_mock.co_varnames = ('args', 'kwargs') 2217 code_mock.co_posonlyargcount = 0 2218 code_mock.co_kwonlyargcount = 0 2219 self.__dict__['__code__'] = code_mock 2220 self.__dict__['__name__'] = 'AsyncMock' 2221 self.__dict__['__defaults__'] = tuple() 2222 self.__dict__['__kwdefaults__'] = {} 2223 self.__dict__['__annotations__'] = None 2224 2225 async def _execute_mock_call(self, /, *args, **kwargs): 2226 # This is nearly just like super(), except for special handling 2227 # of coroutines 2228 2229 _call = _Call((args, kwargs), two=True) 2230 self.await_count += 1 2231 self.await_args = _call 2232 self.await_args_list.append(_call) 2233 2234 effect = self.side_effect 2235 if effect is not None: 2236 if _is_exception(effect): 2237 raise effect 2238 elif not _callable(effect): 2239 try: 2240 result = next(effect) 2241 except StopIteration: 2242 # It is impossible to propagate a StopIteration 2243 # through coroutines because of PEP 479 2244 raise StopAsyncIteration 2245 if _is_exception(result): 2246 raise result 2247 elif iscoroutinefunction(effect): 2248 result = await effect(*args, **kwargs) 2249 else: 2250 result = effect(*args, **kwargs) 2251 2252 if result is not DEFAULT: 2253 return result 2254 2255 if self._mock_return_value is not DEFAULT: 2256 return self.return_value 2257 2258 if self._mock_wraps is not None: 2259 if iscoroutinefunction(self._mock_wraps): 2260 return await self._mock_wraps(*args, **kwargs) 2261 return self._mock_wraps(*args, **kwargs) 2262 2263 return self.return_value 2264 2265 def assert_awaited(self): 2266 """ 2267 Assert that the mock was awaited at least once. 2268 """ 2269 if self.await_count == 0: 2270 msg = f"Expected {self._mock_name or 'mock'} to have been awaited." 2271 raise AssertionError(msg) 2272 2273 def assert_awaited_once(self): 2274 """ 2275 Assert that the mock was awaited exactly once. 2276 """ 2277 if not self.await_count == 1: 2278 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2279 f" Awaited {self.await_count} times.") 2280 raise AssertionError(msg) 2281 2282 def assert_awaited_with(self, /, *args, **kwargs): 2283 """ 2284 Assert that the last await was with the specified arguments. 2285 """ 2286 if self.await_args is None: 2287 expected = self._format_mock_call_signature(args, kwargs) 2288 raise AssertionError(f'Expected await: {expected}\nNot awaited') 2289 2290 def _error_message(): 2291 msg = self._format_mock_failure_message(args, kwargs, action='await') 2292 return msg 2293 2294 expected = self._call_matcher(_Call((args, kwargs), two=True)) 2295 actual = self._call_matcher(self.await_args) 2296 if actual != expected: 2297 cause = expected if isinstance(expected, Exception) else None 2298 raise AssertionError(_error_message()) from cause 2299 2300 def assert_awaited_once_with(self, /, *args, **kwargs): 2301 """ 2302 Assert that the mock was awaited exactly once and with the specified 2303 arguments. 2304 """ 2305 if not self.await_count == 1: 2306 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 2307 f" Awaited {self.await_count} times.") 2308 raise AssertionError(msg) 2309 return self.assert_awaited_with(*args, **kwargs) 2310 2311 def assert_any_await(self, /, *args, **kwargs): 2312 """ 2313 Assert the mock has ever been awaited with the specified arguments. 2314 """ 2315 expected = self._call_matcher(_Call((args, kwargs), two=True)) 2316 cause = expected if isinstance(expected, Exception) else None 2317 actual = [self._call_matcher(c) for c in self.await_args_list] 2318 if cause or expected not in _AnyComparer(actual): 2319 expected_string = self._format_mock_call_signature(args, kwargs) 2320 raise AssertionError( 2321 '%s await not found' % expected_string 2322 ) from cause 2323 2324 def assert_has_awaits(self, calls, any_order=False): 2325 """ 2326 Assert the mock has been awaited with the specified calls. 2327 The :attr:`await_args_list` list is checked for the awaits. 2328 2329 If `any_order` is False (the default) then the awaits must be 2330 sequential. There can be extra calls before or after the 2331 specified awaits. 2332 2333 If `any_order` is True then the awaits can be in any order, but 2334 they must all appear in :attr:`await_args_list`. 2335 """ 2336 expected = [self._call_matcher(c) for c in calls] 2337 cause = next((e for e in expected if isinstance(e, Exception)), None) 2338 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list) 2339 if not any_order: 2340 if expected not in all_awaits: 2341 if cause is None: 2342 problem = 'Awaits not found.' 2343 else: 2344 problem = ('Error processing expected awaits.\n' 2345 'Errors: {}').format( 2346 [e if isinstance(e, Exception) else None 2347 for e in expected]) 2348 raise AssertionError( 2349 f'{problem}\n' 2350 f'Expected: {_CallList(calls)}\n' 2351 f'Actual: {self.await_args_list}' 2352 ) from cause 2353 return 2354 2355 all_awaits = list(all_awaits) 2356 2357 not_found = [] 2358 for kall in expected: 2359 try: 2360 all_awaits.remove(kall) 2361 except ValueError: 2362 not_found.append(kall) 2363 if not_found: 2364 raise AssertionError( 2365 '%r not all found in await list' % (tuple(not_found),) 2366 ) from cause 2367 2368 def assert_not_awaited(self): 2369 """ 2370 Assert that the mock was never awaited. 2371 """ 2372 if self.await_count != 0: 2373 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited." 2374 f" Awaited {self.await_count} times.") 2375 raise AssertionError(msg) 2376 2377 def reset_mock(self, /, *args, **kwargs): 2378 """ 2379 See :func:`.Mock.reset_mock()` 2380 """ 2381 super().reset_mock(*args, **kwargs) 2382 self.await_count = 0 2383 self.await_args = None 2384 self.await_args_list = _CallList() 2385 2386 2387class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock): 2388 """ 2389 Enhance :class:`Mock` with features allowing to mock 2390 an async function. 2391 2392 The :class:`AsyncMock` object will behave so the object is 2393 recognized as an async function, and the result of a call is an awaitable: 2394 2395 >>> mock = AsyncMock() 2396 >>> iscoroutinefunction(mock) 2397 True 2398 >>> inspect.isawaitable(mock()) 2399 True 2400 2401 2402 The result of ``mock()`` is an async function which will have the outcome 2403 of ``side_effect`` or ``return_value``: 2404 2405 - if ``side_effect`` is a function, the async function will return the 2406 result of that function, 2407 - if ``side_effect`` is an exception, the async function will raise the 2408 exception, 2409 - if ``side_effect`` is an iterable, the async function will return the 2410 next value of the iterable, however, if the sequence of result is 2411 exhausted, ``StopIteration`` is raised immediately, 2412 - if ``side_effect`` is not defined, the async function will return the 2413 value defined by ``return_value``, hence, by default, the async function 2414 returns a new :class:`AsyncMock` object. 2415 2416 If the outcome of ``side_effect`` or ``return_value`` is an async function, 2417 the mock async function obtained when the mock object is called will be this 2418 async function itself (and not an async function returning an async 2419 function). 2420 2421 The test author can also specify a wrapped object with ``wraps``. In this 2422 case, the :class:`Mock` object behavior is the same as with an 2423 :class:`.Mock` object: the wrapped object may have methods 2424 defined as async function functions. 2425 2426 Based on Martin Richard's asynctest project. 2427 """ 2428 2429 2430class _ANY(object): 2431 "A helper object that compares equal to everything." 2432 2433 def __eq__(self, other): 2434 return True 2435 2436 def __ne__(self, other): 2437 return False 2438 2439 def __repr__(self): 2440 return '<ANY>' 2441 2442ANY = _ANY() 2443 2444 2445 2446def _format_call_signature(name, args, kwargs): 2447 message = '%s(%%s)' % name 2448 formatted_args = '' 2449 args_string = ', '.join([repr(arg) for arg in args]) 2450 kwargs_string = ', '.join([ 2451 '%s=%r' % (key, value) for key, value in kwargs.items() 2452 ]) 2453 if args_string: 2454 formatted_args = args_string 2455 if kwargs_string: 2456 if formatted_args: 2457 formatted_args += ', ' 2458 formatted_args += kwargs_string 2459 2460 return message % formatted_args 2461 2462 2463 2464class _Call(tuple): 2465 """ 2466 A tuple for holding the results of a call to a mock, either in the form 2467 `(args, kwargs)` or `(name, args, kwargs)`. 2468 2469 If args or kwargs are empty then a call tuple will compare equal to 2470 a tuple without those values. This makes comparisons less verbose:: 2471 2472 _Call(('name', (), {})) == ('name',) 2473 _Call(('name', (1,), {})) == ('name', (1,)) 2474 _Call(((), {'a': 'b'})) == ({'a': 'b'},) 2475 2476 The `_Call` object provides a useful shortcut for comparing with call:: 2477 2478 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3) 2479 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3) 2480 2481 If the _Call has no name then it will match any name. 2482 """ 2483 def __new__(cls, value=(), name='', parent=None, two=False, 2484 from_kall=True): 2485 args = () 2486 kwargs = {} 2487 _len = len(value) 2488 if _len == 3: 2489 name, args, kwargs = value 2490 elif _len == 2: 2491 first, second = value 2492 if isinstance(first, str): 2493 name = first 2494 if isinstance(second, tuple): 2495 args = second 2496 else: 2497 kwargs = second 2498 else: 2499 args, kwargs = first, second 2500 elif _len == 1: 2501 value, = value 2502 if isinstance(value, str): 2503 name = value 2504 elif isinstance(value, tuple): 2505 args = value 2506 else: 2507 kwargs = value 2508 2509 if two: 2510 return tuple.__new__(cls, (args, kwargs)) 2511 2512 return tuple.__new__(cls, (name, args, kwargs)) 2513 2514 2515 def __init__(self, value=(), name=None, parent=None, two=False, 2516 from_kall=True): 2517 self._mock_name = name 2518 self._mock_parent = parent 2519 self._mock_from_kall = from_kall 2520 2521 2522 def __eq__(self, other): 2523 try: 2524 len_other = len(other) 2525 except TypeError: 2526 return NotImplemented 2527 2528 self_name = '' 2529 if len(self) == 2: 2530 self_args, self_kwargs = self 2531 else: 2532 self_name, self_args, self_kwargs = self 2533 2534 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None) 2535 and self._mock_parent != other._mock_parent): 2536 return False 2537 2538 other_name = '' 2539 if len_other == 0: 2540 other_args, other_kwargs = (), {} 2541 elif len_other == 3: 2542 other_name, other_args, other_kwargs = other 2543 elif len_other == 1: 2544 value, = other 2545 if isinstance(value, tuple): 2546 other_args = value 2547 other_kwargs = {} 2548 elif isinstance(value, str): 2549 other_name = value 2550 other_args, other_kwargs = (), {} 2551 else: 2552 other_args = () 2553 other_kwargs = value 2554 elif len_other == 2: 2555 # could be (name, args) or (name, kwargs) or (args, kwargs) 2556 first, second = other 2557 if isinstance(first, str): 2558 other_name = first 2559 if isinstance(second, tuple): 2560 other_args, other_kwargs = second, {} 2561 else: 2562 other_args, other_kwargs = (), second 2563 else: 2564 other_args, other_kwargs = first, second 2565 else: 2566 return False 2567 2568 if self_name and other_name != self_name: 2569 return False 2570 2571 # this order is important for ANY to work! 2572 return (other_args, other_kwargs) == (self_args, self_kwargs) 2573 2574 2575 __ne__ = object.__ne__ 2576 2577 2578 def __call__(self, /, *args, **kwargs): 2579 if self._mock_name is None: 2580 return _Call(('', args, kwargs), name='()') 2581 2582 name = self._mock_name + '()' 2583 return _Call((self._mock_name, args, kwargs), name=name, parent=self) 2584 2585 2586 def __getattr__(self, attr): 2587 if self._mock_name is None: 2588 return _Call(name=attr, from_kall=False) 2589 name = '%s.%s' % (self._mock_name, attr) 2590 return _Call(name=name, parent=self, from_kall=False) 2591 2592 2593 def __getattribute__(self, attr): 2594 if attr in tuple.__dict__: 2595 raise AttributeError 2596 return tuple.__getattribute__(self, attr) 2597 2598 2599 def _get_call_arguments(self): 2600 if len(self) == 2: 2601 args, kwargs = self 2602 else: 2603 name, args, kwargs = self 2604 2605 return args, kwargs 2606 2607 @property 2608 def args(self): 2609 return self._get_call_arguments()[0] 2610 2611 @property 2612 def kwargs(self): 2613 return self._get_call_arguments()[1] 2614 2615 def __repr__(self): 2616 if not self._mock_from_kall: 2617 name = self._mock_name or 'call' 2618 if name.startswith('()'): 2619 name = 'call%s' % name 2620 return name 2621 2622 if len(self) == 2: 2623 name = 'call' 2624 args, kwargs = self 2625 else: 2626 name, args, kwargs = self 2627 if not name: 2628 name = 'call' 2629 elif not name.startswith('()'): 2630 name = 'call.%s' % name 2631 else: 2632 name = 'call%s' % name 2633 return _format_call_signature(name, args, kwargs) 2634 2635 2636 def call_list(self): 2637 """For a call object that represents multiple calls, `call_list` 2638 returns a list of all the intermediate calls as well as the 2639 final call.""" 2640 vals = [] 2641 thing = self 2642 while thing is not None: 2643 if thing._mock_from_kall: 2644 vals.append(thing) 2645 thing = thing._mock_parent 2646 return _CallList(reversed(vals)) 2647 2648 2649call = _Call(from_kall=False) 2650 2651 2652def create_autospec(spec, spec_set=False, instance=False, _parent=None, 2653 _name=None, *, unsafe=False, **kwargs): 2654 """Create a mock object using another object as a spec. Attributes on the 2655 mock will use the corresponding attribute on the `spec` object as their 2656 spec. 2657 2658 Functions or methods being mocked will have their arguments checked 2659 to check that they are called with the correct signature. 2660 2661 If `spec_set` is True then attempting to set attributes that don't exist 2662 on the spec object will raise an `AttributeError`. 2663 2664 If a class is used as a spec then the return value of the mock (the 2665 instance of the class) will have the same spec. You can use a class as the 2666 spec for an instance object by passing `instance=True`. The returned mock 2667 will only be callable if instances of the mock are callable. 2668 2669 `create_autospec` will raise a `RuntimeError` if passed some common 2670 misspellings of the arguments autospec and spec_set. Pass the argument 2671 `unsafe` with the value True to disable that check. 2672 2673 `create_autospec` also takes arbitrary keyword arguments that are passed to 2674 the constructor of the created mock.""" 2675 if _is_list(spec): 2676 # can't pass a list instance to the mock constructor as it will be 2677 # interpreted as a list of strings 2678 spec = type(spec) 2679 2680 is_type = isinstance(spec, type) 2681 if _is_instance_mock(spec): 2682 raise InvalidSpecError(f'Cannot autospec a Mock object. ' 2683 f'[object={spec!r}]') 2684 is_async_func = _is_async_func(spec) 2685 _kwargs = {'spec': spec} 2686 if spec_set: 2687 _kwargs = {'spec_set': spec} 2688 elif spec is None: 2689 # None we mock with a normal mock without a spec 2690 _kwargs = {} 2691 if _kwargs and instance: 2692 _kwargs['_spec_as_instance'] = True 2693 if not unsafe: 2694 _check_spec_arg_typos(kwargs) 2695 2696 _kwargs.update(kwargs) 2697 2698 Klass = MagicMock 2699 if inspect.isdatadescriptor(spec): 2700 # descriptors don't have a spec 2701 # because we don't know what type they return 2702 _kwargs = {} 2703 elif is_async_func: 2704 if instance: 2705 raise RuntimeError("Instance can not be True when create_autospec " 2706 "is mocking an async function") 2707 Klass = AsyncMock 2708 elif not _callable(spec): 2709 Klass = NonCallableMagicMock 2710 elif is_type and instance and not _instance_callable(spec): 2711 Klass = NonCallableMagicMock 2712 2713 _name = _kwargs.pop('name', _name) 2714 2715 _new_name = _name 2716 if _parent is None: 2717 # for a top level object no _new_name should be set 2718 _new_name = '' 2719 2720 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, 2721 name=_name, **_kwargs) 2722 2723 if isinstance(spec, FunctionTypes): 2724 # should only happen at the top level because we don't 2725 # recurse for functions 2726 mock = _set_signature(mock, spec) 2727 if is_async_func: 2728 _setup_async_mock(mock) 2729 else: 2730 _check_signature(spec, mock, is_type, instance) 2731 2732 if _parent is not None and not instance: 2733 _parent._mock_children[_name] = mock 2734 2735 if is_type and not instance and 'return_value' not in kwargs: 2736 mock.return_value = create_autospec(spec, spec_set, instance=True, 2737 _name='()', _parent=mock) 2738 2739 for entry in dir(spec): 2740 if _is_magic(entry): 2741 # MagicMock already does the useful magic methods for us 2742 continue 2743 2744 # XXXX do we need a better way of getting attributes without 2745 # triggering code execution (?) Probably not - we need the actual 2746 # object to mock it so we would rather trigger a property than mock 2747 # the property descriptor. Likewise we want to mock out dynamically 2748 # provided attributes. 2749 # XXXX what about attributes that raise exceptions other than 2750 # AttributeError on being fetched? 2751 # we could be resilient against it, or catch and propagate the 2752 # exception when the attribute is fetched from the mock 2753 try: 2754 original = getattr(spec, entry) 2755 except AttributeError: 2756 continue 2757 2758 kwargs = {'spec': original} 2759 if spec_set: 2760 kwargs = {'spec_set': original} 2761 2762 if not isinstance(original, FunctionTypes): 2763 new = _SpecState(original, spec_set, mock, entry, instance) 2764 mock._mock_children[entry] = new 2765 else: 2766 parent = mock 2767 if isinstance(spec, FunctionTypes): 2768 parent = mock.mock 2769 2770 skipfirst = _must_skip(spec, entry, is_type) 2771 kwargs['_eat_self'] = skipfirst 2772 if iscoroutinefunction(original): 2773 child_klass = AsyncMock 2774 else: 2775 child_klass = MagicMock 2776 new = child_klass(parent=parent, name=entry, _new_name=entry, 2777 _new_parent=parent, 2778 **kwargs) 2779 mock._mock_children[entry] = new 2780 _check_signature(original, new, skipfirst=skipfirst) 2781 2782 # so functions created with _set_signature become instance attributes, 2783 # *plus* their underlying mock exists in _mock_children of the parent 2784 # mock. Adding to _mock_children may be unnecessary where we are also 2785 # setting as an instance attribute? 2786 if isinstance(new, FunctionTypes): 2787 setattr(mock, entry, new) 2788 2789 return mock 2790 2791 2792def _must_skip(spec, entry, is_type): 2793 """ 2794 Return whether we should skip the first argument on spec's `entry` 2795 attribute. 2796 """ 2797 if not isinstance(spec, type): 2798 if entry in getattr(spec, '__dict__', {}): 2799 # instance attribute - shouldn't skip 2800 return False 2801 spec = spec.__class__ 2802 2803 for klass in spec.__mro__: 2804 result = klass.__dict__.get(entry, DEFAULT) 2805 if result is DEFAULT: 2806 continue 2807 if isinstance(result, (staticmethod, classmethod)): 2808 return False 2809 elif isinstance(result, FunctionTypes): 2810 # Normal method => skip if looked up on type 2811 # (if looked up on instance, self is already skipped) 2812 return is_type 2813 else: 2814 return False 2815 2816 # function is a dynamically provided attribute 2817 return is_type 2818 2819 2820class _SpecState(object): 2821 2822 def __init__(self, spec, spec_set=False, parent=None, 2823 name=None, ids=None, instance=False): 2824 self.spec = spec 2825 self.ids = ids 2826 self.spec_set = spec_set 2827 self.parent = parent 2828 self.instance = instance 2829 self.name = name 2830 2831 2832FunctionTypes = ( 2833 # python function 2834 type(create_autospec), 2835 # instance method 2836 type(ANY.__eq__), 2837) 2838 2839 2840file_spec = None 2841open_spec = None 2842 2843 2844def _to_stream(read_data): 2845 if isinstance(read_data, bytes): 2846 return io.BytesIO(read_data) 2847 else: 2848 return io.StringIO(read_data) 2849 2850 2851def mock_open(mock=None, read_data=''): 2852 """ 2853 A helper function to create a mock to replace the use of `open`. It works 2854 for `open` called directly or used as a context manager. 2855 2856 The `mock` argument is the mock object to configure. If `None` (the 2857 default) then a `MagicMock` will be created for you, with the API limited 2858 to methods or attributes available on standard file handles. 2859 2860 `read_data` is a string for the `read`, `readline` and `readlines` of the 2861 file handle to return. This is an empty string by default. 2862 """ 2863 _read_data = _to_stream(read_data) 2864 _state = [_read_data, None] 2865 2866 def _readlines_side_effect(*args, **kwargs): 2867 if handle.readlines.return_value is not None: 2868 return handle.readlines.return_value 2869 return _state[0].readlines(*args, **kwargs) 2870 2871 def _read_side_effect(*args, **kwargs): 2872 if handle.read.return_value is not None: 2873 return handle.read.return_value 2874 return _state[0].read(*args, **kwargs) 2875 2876 def _readline_side_effect(*args, **kwargs): 2877 yield from _iter_side_effect() 2878 while True: 2879 yield _state[0].readline(*args, **kwargs) 2880 2881 def _iter_side_effect(): 2882 if handle.readline.return_value is not None: 2883 while True: 2884 yield handle.readline.return_value 2885 for line in _state[0]: 2886 yield line 2887 2888 def _next_side_effect(): 2889 if handle.readline.return_value is not None: 2890 return handle.readline.return_value 2891 return next(_state[0]) 2892 2893 global file_spec 2894 if file_spec is None: 2895 import _io 2896 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) 2897 2898 global open_spec 2899 if open_spec is None: 2900 import _io 2901 open_spec = list(set(dir(_io.open))) 2902 if mock is None: 2903 mock = MagicMock(name='open', spec=open_spec) 2904 2905 handle = MagicMock(spec=file_spec) 2906 handle.__enter__.return_value = handle 2907 2908 handle.write.return_value = None 2909 handle.read.return_value = None 2910 handle.readline.return_value = None 2911 handle.readlines.return_value = None 2912 2913 handle.read.side_effect = _read_side_effect 2914 _state[1] = _readline_side_effect() 2915 handle.readline.side_effect = _state[1] 2916 handle.readlines.side_effect = _readlines_side_effect 2917 handle.__iter__.side_effect = _iter_side_effect 2918 handle.__next__.side_effect = _next_side_effect 2919 2920 def reset_data(*args, **kwargs): 2921 _state[0] = _to_stream(read_data) 2922 if handle.readline.side_effect == _state[1]: 2923 # Only reset the side effect if the user hasn't overridden it. 2924 _state[1] = _readline_side_effect() 2925 handle.readline.side_effect = _state[1] 2926 return DEFAULT 2927 2928 mock.side_effect = reset_data 2929 mock.return_value = handle 2930 return mock 2931 2932 2933class PropertyMock(Mock): 2934 """ 2935 A mock intended to be used as a property, or other descriptor, on a class. 2936 `PropertyMock` provides `__get__` and `__set__` methods so you can specify 2937 a return value when it is fetched. 2938 2939 Fetching a `PropertyMock` instance from an object calls the mock, with 2940 no args. Setting it calls the mock with the value being set. 2941 """ 2942 def _get_child_mock(self, /, **kwargs): 2943 return MagicMock(**kwargs) 2944 2945 def __get__(self, obj, obj_type=None): 2946 return self() 2947 def __set__(self, obj, val): 2948 self(val) 2949 2950 2951def seal(mock): 2952 """Disable the automatic generation of child mocks. 2953 2954 Given an input Mock, seals it to ensure no further mocks will be generated 2955 when accessing an attribute that was not already defined. 2956 2957 The operation recursively seals the mock passed in, meaning that 2958 the mock itself, any mocks generated by accessing one of its attributes, 2959 and all assigned mocks without a name or spec will be sealed. 2960 """ 2961 mock._mock_sealed = True 2962 for attr in dir(mock): 2963 try: 2964 m = getattr(mock, attr) 2965 except AttributeError: 2966 continue 2967 if not isinstance(m, NonCallableMock): 2968 continue 2969 if isinstance(m._mock_children.get(attr), _SpecState): 2970 continue 2971 if m._mock_new_parent is mock: 2972 seal(m) 2973 2974 2975class _AsyncIterator: 2976 """ 2977 Wraps an iterator in an asynchronous iterator. 2978 """ 2979 def __init__(self, iterator): 2980 self.iterator = iterator 2981 code_mock = NonCallableMock(spec_set=CodeType) 2982 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 2983 self.__dict__['__code__'] = code_mock 2984 2985 async def __anext__(self): 2986 try: 2987 return next(self.iterator) 2988 except StopIteration: 2989 pass 2990 raise StopAsyncIteration 2991