diff --git a/tests/conftest.py b/tests/conftest.py index 2f5fa57c445b..1f943e75c495 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,6 +10,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import hashlib import os import os.path import re @@ -109,26 +110,21 @@ def metrics(): ) -@pytest.fixture +@pytest.fixture(scope="session") def remote_addr(): - return "1.2.3.4" + return "192.0.2.1" -@pytest.fixture -def remote_addr_hashed(): - """ - Static output of `hashlib.sha256(remote_addr.encode("utf8")).hexdigest()` - Created statically to prevent needing to calculate it every run. - """ - return "6694f83c9f476da31f5df6bcc520034e7e57d421d247b9d34f49edbfc84a764c" +@pytest.fixture(scope="function") +def remote_addr_hashed(remote_addr): + if remote_addr is None: + return None + return hashlib.sha256(remote_addr.encode("utf8")).hexdigest() -@pytest.fixture -def remote_addr_salted(): - """ - Output of `hashlib.sha256((remote_addr + "pepa").encode("utf8")).hexdigest()` - """ - return "a69a49383d81404e4b1df297c7baa28e1cd6c4ee1495ed5d0ab165a63a147763" +@pytest.fixture(scope="session") +def remote_addr_salted(remote_addr): + return hashlib.sha256((remote_addr + "pepa").encode("utf8")).hexdigest() @pytest.fixture diff --git a/tests/unit/accounts/test_forms.py b/tests/unit/accounts/test_forms.py index 09401d8e1b89..a86fd0163f4f 100644 --- a/tests/unit/accounts/test_forms.py +++ b/tests/unit/accounts/test_forms.py @@ -35,9 +35,9 @@ class TestLoginForm: - def test_validate(self): + def test_validate(self, remote_addr): request = pretend.stub( - remote_addr="1.2.3.4", + remote_addr=remote_addr, banned=pretend.stub( by_ip=lambda ip_address: False, ), 
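Review bot: In the second conftest hunk, `remote_addr_hashed` guards `remote_addr is None`, but `remote_addr_salted` right below it evaluates `remote_addr + "pepa"` and would raise `TypeError` for the same input; since the session-scoped `remote_addr` fixture only ever returns `"192.0.2.1"`, the guard is dead code unless a test overrides the fixture with `None`, and in that case the salted fixture breaks instead. Either drop the guard or apply the same handling to both fixtures. The two derived fixtures also get different scopes (`function` vs `session`) with no stated reason, and the docstrings that described the values went away with the constants; a one-liner such as `"""SHA-256 hex digest of the remote_addr fixture value."""` (and the salted equivalent) would restore that. The salt literal `"pepa"` now appears here and twice in tests/unit/utils/test_wsgi.py (`ip_salt="pepa"` and the `calculate_hashed_value` default), so a single constant exported from conftest would keep them from drifting apart.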
@@ -100,9 +100,9 @@ def test_validate_username_with_user(self, input_username, expected_username): assert user_service.find_userid.calls == [pretend.call(expected_username)] - def test_validate_password_no_user(self): + def test_validate_password_no_user(self, remote_addr): request = pretend.stub( - remote_addr="1.2.3.4", + remote_addr=remote_addr, banned=pretend.stub( by_ip=lambda ip_address: False, ), @@ -126,9 +126,11 @@ def test_validate_password_no_user(self): pretend.call("my_username"), ] - def test_validate_password_disabled_for_compromised_pw(self, db_session): + def test_validate_password_disabled_for_compromised_pw( + self, db_session, remote_addr + ): request = pretend.stub( - remote_addr="1.2.3.4", banned=pretend.stub(by_ip=lambda ip_address: False) + remote_addr=remote_addr, banned=pretend.stub(by_ip=lambda ip_address: False) ) user_service = pretend.stub( find_userid=pretend.call_recorder(lambda userid: 1), @@ -157,9 +159,9 @@ def test_validate_password_disabled_for_compromised_pw(self, db_session): ] assert user_service.is_disabled.calls == [pretend.call(1)] - def test_validate_password_ok(self): + def test_validate_password_ok(self, remote_addr): request = pretend.stub( - remote_addr="1.2.3.4", + remote_addr=remote_addr, banned=pretend.stub( by_ip=lambda ip_address: False, ), @@ -197,9 +199,9 @@ def test_validate_password_ok(self): pretend.call("pw", tags=["method:auth", "auth_method:login_form"]) ] - def test_validate_password_notok(self, db_session): + def test_validate_password_notok(self, db_session, remote_addr): request = pretend.stub( - remote_addr="1.2.3.4", + remote_addr=remote_addr, banned=pretend.stub( by_ip=lambda ip_address: False, ), 
@@ -240,9 +242,9 @@ def test_validate_password_notok(self, db_session): ) ] - def test_validate_password_too_many_failed(self): + def test_validate_password_too_many_failed(self, remote_addr): request = pretend.stub( - remote_addr="1.2.3.4", + remote_addr=remote_addr, banned=pretend.stub( by_ip=lambda ip_address: False, ), @@ -274,13 +276,13 @@ def test_validate_password_too_many_failed(self): assert user_service.is_disabled.calls == [] assert user_service.check_password.calls == [pretend.call(1, "pw", tags=None)] - def test_password_breached(self, monkeypatch): + def test_password_breached(self, monkeypatch, remote_addr): send_email = pretend.call_recorder(lambda *a, **kw: None) monkeypatch.setattr(forms, "send_password_compromised_email_hibp", send_email) user = pretend.stub(id=1) request = pretend.stub( - remote_addr="1.2.3.4", + remote_addr=remote_addr, banned=pretend.stub( by_ip=lambda ip_address: False, ), @@ -315,9 +317,9 @@ def test_password_breached(self, monkeypatch): ] assert send_email.calls == [pretend.call(request, user)] - def test_validate_password_ok_ip_banned(self): + def test_validate_password_ok_ip_banned(self, remote_addr): request = pretend.stub( - remote_addr="1.2.3.4", + remote_addr=remote_addr, banned=pretend.stub( by_ip=lambda ip_address: True, ), @@ -349,9 +351,9 @@ def test_validate_password_ok_ip_banned(self): assert user_service.check_password.calls == [] assert breach_service.check_password.calls == [] - def test_validate_password_notok_ip_banned(self, db_session): + def test_validate_password_notok_ip_banned(self, db_session, remote_addr): request = pretend.stub( - remote_addr="1.2.3.4", + remote_addr=remote_addr, banned=pretend.stub( by_ip=lambda ip_address: True, ), 
@@ -914,10 +916,10 @@ class TestTOTPAuthenticationForm: "123 456", ], ) - def test_validate(self, totp_value): + def test_validate(self, totp_value, remote_addr): user = pretend.stub(record_event=pretend.call_recorder(lambda *a, **kw: None)) get_user = pretend.call_recorder(lambda userid: user) - request = pretend.stub(remote_addr="1.2.3.4") + request = pretend.stub(remote_addr=remote_addr) form = forms.TOTPAuthenticationForm( formdata=MultiDict({"totp_value": totp_value}), @@ -937,10 +939,12 @@ def test_validate(self, totp_value): ("1 2 3 4 5 6 7", "TOTP code must be 6 digits."), ], ) - def test_totp_secret_not_valid(self, pyramid_config, totp_value, expected_error): + def test_totp_secret_not_valid( + self, pyramid_config, totp_value, expected_error, remote_addr + ): user = pretend.stub(record_event=pretend.call_recorder(lambda *a, **kw: None)) get_user = pretend.call_recorder(lambda userid: user) - request = pretend.stub(remote_addr="1.2.3.4") + request = pretend.stub(remote_addr=remote_addr) form = forms.TOTPAuthenticationForm( formdata=MultiDict({"totp_value": totp_value}), @@ -961,11 +965,11 @@ def test_totp_secret_not_valid(self, pyramid_config, totp_value, expected_error) ], ) def test_totp_secret_raises( - self, pyramid_config, exception, expected_error, reason + self, pyramid_config, exception, expected_error, reason, remote_addr ): user = pretend.stub(record_event=pretend.call_recorder(lambda *a, **kw: None)) get_user = pretend.call_recorder(lambda userid: user) - request = pretend.stub(remote_addr="1.2.3.4") + request = pretend.stub(remote_addr=remote_addr) user_service = pretend.stub( check_totp_value=pretend.raiser(exception), 
@@ -1099,8 +1103,8 @@ def test_validate(self): class TestRecoveryCodeForm: - def test_validate(self, monkeypatch): - request = pretend.stub(remote_addr="1.2.3.4") + def test_validate(self, monkeypatch, remote_addr): + request = pretend.stub(remote_addr=remote_addr) user = pretend.stub(id=pretend.stub(), username="foobar") user_service = pretend.stub( check_recovery_code=pretend.call_recorder(lambda *a, **kw: True), diff --git a/tests/unit/accounts/test_security_policy.py b/tests/unit/accounts/test_security_policy.py index ce7c792ed111..cb02b1dae302 100644 --- a/tests/unit/accounts/test_security_policy.py +++ b/tests/unit/accounts/test_security_policy.py @@ -99,12 +99,12 @@ def test_identity_credentials_fail(self, monkeypatch): pretend.stub( matched_route=None, banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + remote_addr="192.0.2.1", ), pretend.stub( matched_route=pretend.stub(name="an.invalid.route"), banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + remote_addr="192.0.2.1", ), ], ) @@ -186,7 +186,7 @@ def test_forget_and_remember(self, monkeypatch): pretend.call(request, userid, foo=None) ] - def test_identity_missing_route(self, monkeypatch): + def test_identity_missing_route(self, monkeypatch, remote_addr): session_helper_obj = pretend.stub() session_helper_cls = pretend.call_recorder(lambda: session_helper_obj) monkeypatch.setattr( @@ -203,7 +203,7 @@ def test_identity_missing_route(self, monkeypatch): add_response_callback=pretend.call_recorder(lambda cb: None), matched_route=None, banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + remote_addr=remote_addr, ) assert policy.identity(request) is None 
@@ -220,7 +220,7 @@ def test_identity_missing_route(self, monkeypatch): "api.echo", ], ) - def test_identity_invalid_route(self, route_name, monkeypatch): + def test_identity_invalid_route(self, route_name, monkeypatch, remote_addr): session_helper_obj = pretend.stub() session_helper_cls = pretend.call_recorder(lambda: session_helper_obj) monkeypatch.setattr( @@ -237,7 +237,7 @@ def test_identity_invalid_route(self, route_name, monkeypatch): add_response_callback=pretend.call_recorder(lambda cb: None), matched_route=pretend.stub(name=route_name), banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + remote_addr=remote_addr, ) assert policy.identity(request) is None @@ -247,7 +247,7 @@ def test_identity_invalid_route(self, route_name, monkeypatch): assert add_vary_cb.calls == [pretend.call("Cookie")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] - def test_identity_no_userid(self, monkeypatch): + def test_identity_no_userid(self, monkeypatch, remote_addr): session_helper_obj = pretend.stub( authenticated_userid=pretend.call_recorder(lambda r: None) ) @@ -266,7 +266,7 @@ def test_identity_no_userid(self, monkeypatch): add_response_callback=pretend.call_recorder(lambda cb: None), matched_route=pretend.stub(name="a.permitted.route"), banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + remote_addr=remote_addr, ) assert policy.identity(request) is None @@ -277,7 +277,7 @@ def test_identity_no_userid(self, monkeypatch): assert add_vary_cb.calls == [pretend.call("Cookie")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] - def test_identity_no_user(self, monkeypatch): + def test_identity_no_user(self, monkeypatch, remote_addr): userid = pretend.stub() session_helper_obj = pretend.stub( authenticated_userid=pretend.call_recorder(lambda r: userid) 
@@ -299,7 +299,7 @@ def test_identity_no_user(self, monkeypatch): matched_route=pretend.stub(name="a.permitted.route"), find_service=pretend.call_recorder(lambda i, **kw: user_service), banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + remote_addr=remote_addr, ) assert policy.identity(request) is None @@ -312,7 +312,7 @@ def test_identity_no_user(self, monkeypatch): assert add_vary_cb.calls == [pretend.call("Cookie")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] - def test_identity_password_outdated(self, monkeypatch): + def test_identity_password_outdated(self, monkeypatch, remote_addr): userid = pretend.stub() session_helper_obj = pretend.stub( authenticated_userid=pretend.call_recorder(lambda r: userid) @@ -345,7 +345,7 @@ def test_identity_password_outdated(self, monkeypatch): flash=pretend.call_recorder(lambda *a, **kw: None), ), banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + remote_addr=remote_addr, ) assert policy.identity(request) is None @@ -364,7 +364,7 @@ def test_identity_password_outdated(self, monkeypatch): assert add_vary_cb.calls == [pretend.call("Cookie")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] - def test_identity_is_disabled(self, monkeypatch): + def test_identity_is_disabled(self, monkeypatch, remote_addr): userid = pretend.stub() session_helper_obj = pretend.stub( authenticated_userid=pretend.call_recorder(lambda r: userid) @@ -397,7 +397,7 @@ def test_identity_is_disabled(self, monkeypatch): flash=pretend.call_recorder(lambda *a, **kw: None), ), banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + remote_addr=remote_addr, ) assert policy.identity(request) is None 
@@ -417,7 +417,7 @@ def test_identity_is_disabled(self, monkeypatch): assert add_vary_cb.calls == [pretend.call("Cookie")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] - def test_identity(self, monkeypatch): + def test_identity(self, monkeypatch, remote_addr): userid = pretend.stub() session_helper_obj = pretend.stub( authenticated_userid=pretend.call_recorder(lambda r: userid) @@ -448,7 +448,7 @@ def test_identity(self, monkeypatch): password_outdated=pretend.call_recorder(lambda ts: False) ), banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + remote_addr=remote_addr, ) assert policy.identity(request) is user @@ -463,7 +463,7 @@ def test_identity(self, monkeypatch): assert add_vary_cb.calls == [pretend.call("Cookie")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] - def test_identity_ip_banned(self, monkeypatch): + def test_identity_ip_banned(self, monkeypatch, remote_addr): userid = pretend.stub() session_helper_obj = pretend.stub( authenticated_userid=pretend.call_recorder(lambda r: userid) @@ -493,7 +493,7 @@ def test_identity_ip_banned(self, monkeypatch): password_outdated=pretend.call_recorder(lambda ts: False) ), banned=pretend.stub(by_ip=lambda ip_address: True), - remote_addr="1.2.3.4", + remote_addr=remote_addr, ) assert policy.identity(request) is None diff --git a/tests/unit/accounts/test_views.py b/tests/unit/accounts/test_views.py index bc2a80abadd3..21c3d659b020 100644 --- a/tests/unit/accounts/test_views.py +++ b/tests/unit/accounts/test_views.py @@ -2548,7 +2548,7 @@ def test_verify_organization_role( db_request.method = "POST" db_request.GET.update({"token": "RANDOM_KEY"}) db_request.route_path = pretend.call_recorder(lambda *a, **kw: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) token_service.loads = pretend.call_recorder( lambda token: { 
@@ -2687,7 +2687,7 @@ def test_verify_organization_role_revoked(self, db_request, token_service): db_request.method = "POST" db_request.GET.update({"token": "RANDOM_KEY"}) db_request.route_path = pretend.call_recorder(lambda name: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) token_service.loads = pretend.call_recorder( lambda token: { @@ -2733,7 +2733,7 @@ def test_verify_organization_role_declined( {"token": "RANDOM_KEY", "decline": "Decline", "message": message} ) db_request.route_path = pretend.call_recorder(lambda name: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) token_service.loads = pretend.call_recorder( lambda token: { @@ -2805,7 +2805,7 @@ def test_verify_fails_with_different_user(self, db_request, token_service): db_request.method = "POST" db_request.GET.update({"token": "RANDOM_KEY"}) db_request.route_path = pretend.call_recorder(lambda name: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) token_service.loads = pretend.call_recorder( lambda token: { @@ -2843,7 +2843,7 @@ def test_verify_role_get_confirmation(self, db_request, token_service): db_request.method = "GET" db_request.GET.update({"token": "RANDOM_KEY"}) db_request.route_path = pretend.call_recorder(lambda name: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) token_service.loads = pretend.call_recorder( lambda token: { 
@@ -2878,7 +2878,7 @@ def test_verify_project_role( db_request.method = "POST" db_request.GET.update({"token": "RANDOM_KEY"}) db_request.route_path = pretend.call_recorder(lambda *a, **kw: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" token_service.loads = pretend.call_recorder( lambda token: { "action": "email-project-role-verify", @@ -3024,7 +3024,7 @@ def test_verify_project_role_revoked(self, db_request, user_service, token_servi db_request.method = "POST" db_request.GET.update({"token": "RANDOM_KEY"}) db_request.route_path = pretend.call_recorder(lambda name: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" token_service.loads = pretend.call_recorder( lambda token: { "action": "email-project-role-verify", @@ -3064,7 +3064,7 @@ def test_verify_project_role_declined( db_request.method = "POST" db_request.POST.update({"token": "RANDOM_KEY", "decline": "Decline"}) db_request.route_path = pretend.call_recorder(lambda name: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" token_service.loads = pretend.call_recorder( lambda token: { "action": "email-project-role-verify", @@ -3105,7 +3105,7 @@ def test_verify_fails_with_different_user( db_request.method = "POST" db_request.GET.update({"token": "RANDOM_KEY"}) db_request.route_path = pretend.call_recorder(lambda name: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" token_service.loads = pretend.call_recorder( lambda token: { "action": "email-project-role-verify", @@ -3142,7 +3142,7 @@ def test_verify_role_get_confirmation( db_request.method = "GET" db_request.GET.update({"token": "RANDOM_KEY"}) db_request.route_path = pretend.call_recorder(lambda name: "/") - db_request.remote_addr = "192.168.1.1" + db_request.remote_addr = "198.51.100.1" token_service.loads = pretend.call_recorder( lambda token: { "action": "email-project-role-verify", 
diff --git a/tests/unit/admin/test_bans.py b/tests/unit/admin/test_bans.py index 05fc68198163..36a323312edb 100644 --- a/tests/unit/admin/test_bans.py +++ b/tests/unit/admin/test_bans.py @@ -21,7 +21,7 @@ class TestAdminFlag: def test_no_ip_not_banned(self, db_request): - assert not db_request.banned.by_ip("4.3.2.1") + assert not db_request.banned.by_ip("192.0.2.2") def test_with_ip_not_banned(self, db_request): assert not db_request.banned.by_ip(db_request.ip_address.ip_address) diff --git a/tests/unit/email/test_init.py b/tests/unit/email/test_init.py index 58547adb69fb..714eb22fa1da 100644 --- a/tests/unit/email/test_init.py +++ b/tests/unit/email/test_init.py @@ -66,11 +66,11 @@ def test_compute_recipient(user, address, expected): @pytest.mark.parametrize( ("unauthenticated_userid", "user", "remote_addr", "expected"), [ - ("the_users_id", None, "1.2.3.4", False), - ("some_other_id", None, "1.2.3.4", True), - (None, pretend.stub(id="the_users_id"), "1.2.3.4", False), - (None, pretend.stub(id="some_other_id"), "1.2.3.4", True), - (None, None, "1.2.3.4", False), + ("the_users_id", None, "192.0.2.1", False), + ("some_other_id", None, "192.0.2.1", True), + (None, pretend.stub(id="the_users_id"), "192.0.2.1", False), + (None, pretend.stub(id="some_other_id"), "192.0.2.1", True), + (None, None, "192.0.2.1", False), (None, None, "127.0.0.1", True), ], ) @@ -143,7 +143,7 @@ def test_sends_to_user_with_verified( ) pyramid_request.user = user pyramid_request.registry.settings = {"mail.sender": "noreply@example.com"} - pyramid_request.remote_addr = "10.69.10.69" + pyramid_request.remote_addr = "198.51.100.69" if address is not None: address = pretend.stub(email=address, verified=True) diff --git a/tests/unit/ip_addresses/test_models.py b/tests/unit/ip_addresses/test_models.py index 6e386caee423..96f025c34ae7 100644 --- a/tests/unit/ip_addresses/test_models.py +++ b/tests/unit/ip_addresses/test_models.py 
@@ -24,7 +24,7 @@ class TestIpAddress: def test_repr(self, db_request): ip_address = db_request.ip_address assert isinstance(repr(ip_address), str) - assert repr(ip_address) == "1.2.3.4" + assert repr(ip_address) == "192.0.2.1" def test_invalid_transformed(self, db_request): ip_address = DBIpAddressFactory(ip_address="wutang") @@ -33,21 +33,21 @@ def test_invalid_transformed(self, db_request): @pytest.mark.parametrize( "kwargs", [ - {"ip_address": "1.2.3.4", "is_banned": True}, + {"ip_address": "192.0.2.1", "is_banned": True}, { - "ip_address": "1.2.3.4", + "ip_address": "192.0.2.1", "is_banned": True, "ban_reason": BanReason.AUTHENTICATION_ATTEMPTS, }, - {"ip_address": "1.2.3.4", "is_banned": True, "ban_date": sql.func.now()}, + {"ip_address": "192.0.2.1", "is_banned": True, "ban_date": sql.func.now()}, { - "ip_address": "1.2.3.4", + "ip_address": "192.0.2.1", "is_banned": False, "ban_reason": BanReason.AUTHENTICATION_ATTEMPTS, }, - {"ip_address": "1.2.3.4", "is_banned": False, "ban_date": sql.func.now()}, + {"ip_address": "192.0.2.1", "is_banned": False, "ban_date": sql.func.now()}, { - "ip_address": "1.2.3.4", + "ip_address": "192.0.2.1", "is_banned": False, "ban_reason": BanReason.AUTHENTICATION_ATTEMPTS, "ban_date": sql.func.now(), diff --git a/tests/unit/manage/test_forms.py b/tests/unit/manage/test_forms.py index f58bb61754bc..ff5f1202cc3f 100644 --- a/tests/unit/manage/test_forms.py +++ b/tests/unit/manage/test_forms.py @@ -290,9 +290,9 @@ class TestDeleteTOTPForm: Covers ConfirmPasswordForm """ - def test_validate_confirm_password(self): + def test_validate_confirm_password(self, remote_addr): request = pretend.stub( - remote_addr="1.2.3.4", banned=pretend.stub(by_ip=lambda ip_address: False) + remote_addr=remote_addr, banned=pretend.stub(by_ip=lambda ip_address: False) ) user_service = pretend.stub( find_userid=pretend.call_recorder(lambda userid: 1), 
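Review bot: `assert repr(ip_address) == "192.0.2.1"` hard-codes the value the `remote_addr` fixture in tests/conftest.py now returns, but `db_request.ip_address` is built outside this hunk, so the assertion holds only if that fixture is what feeds `db_request`, which the diff does not show. If it is, requesting `remote_addr` in `test_repr` and writing `assert repr(ip_address) == remote_addr` ties the expectation to the fixture instead of keeping a second copy of the literal that must be edited in step with it.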
@@ -646,7 +646,7 @@ def test_validate(self): assert form.user_service is user_service assert form.validate(), str(form.errors) - def test_validate_macaroon_id_invalid(self): + def test_validate_macaroon_id_invalid(self, remote_addr): macaroon_service = pretend.stub( find_macaroon=pretend.call_recorder(lambda id: None) ) @@ -654,7 +654,7 @@ def test_validate_macaroon_id_invalid(self): find_userid=lambda *a, **kw: 1, check_password=lambda *a, **kw: True ) request = pretend.stub( - remote_addr="1.2.3.4", banned=pretend.stub(by_ip=lambda ip_address: False) + remote_addr=remote_addr, banned=pretend.stub(by_ip=lambda ip_address: False) ) form = forms.DeleteMacaroonForm( formdata=MultiDict({"macaroon_id": pretend.stub(), "password": "password"}), @@ -667,7 +667,7 @@ def test_validate_macaroon_id_invalid(self): assert not form.validate() assert form.macaroon_id.errors.pop() == "No such macaroon" - def test_validate_macaroon_id(self): + def test_validate_macaroon_id(self, remote_addr): macaroon_service = pretend.stub( find_macaroon=pretend.call_recorder(lambda id: pretend.stub()) ) @@ -675,7 +675,7 @@ def test_validate_macaroon_id(self): find_userid=lambda *a, **kw: 1, check_password=lambda *a, **kw: True ) request = pretend.stub( - remote_addr="1.2.3.4", banned=pretend.stub(by_ip=lambda ip_address: False) + remote_addr=remote_addr, banned=pretend.stub(by_ip=lambda ip_address: False) ) form = forms.DeleteMacaroonForm( formdata=MultiDict( diff --git a/tests/unit/manage/test_views.py b/tests/unit/manage/test_views.py index fa05e94e22d9..01c230f5f381 100644 --- a/tests/unit/manage/test_views.py +++ b/tests/unit/manage/test_views.py @@ -4961,7 +4961,7 @@ def test_reinvite_role_after_expiration(self, monkeypatch, db_request): ) db_request.method = "POST" db_request.POST = pretend.stub() - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_1 form_obj = pretend.stub( validate=pretend.call_recorder(lambda: True), 
@@ -5115,7 +5115,7 @@ def test_cannot_reinvite_role(self, db_request): ) db_request.method = "POST" db_request.POST = pretend.stub() - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_1 form_obj = pretend.stub( validate=pretend.call_recorder(lambda: True), @@ -5174,7 +5174,7 @@ def test_revoke_invitation(self, db_request, token_service): ) db_request.method = "POST" db_request.POST = MultiDict({"user_id": user.id, "token": "TOKEN"}) - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_user db_request.route_path = pretend.call_recorder( lambda *a, **kw: "/manage/projects" @@ -5225,7 +5225,7 @@ def test_invitation_does_not_exist(self, db_request, token_service): ) db_request.method = "POST" db_request.POST = MultiDict({"user_id": user.id, "token": "TOKEN"}) - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_user db_request.route_path = pretend.call_recorder( lambda *a, **kw: "/manage/projects" @@ -5263,7 +5263,7 @@ def test_token_expired(self, db_request, token_service): ) db_request.method = "POST" db_request.POST = MultiDict({"user_id": user.id, "token": "TOKEN"}) - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_user db_request.route_path = pretend.call_recorder( lambda *a, **kw: "/manage/projects/roles" diff --git a/tests/unit/manage/views/test_organizations.py b/tests/unit/manage/views/test_organizations.py index b830f8ab765b..aada8896dc2d 100644 --- a/tests/unit/manage/views/test_organizations.py +++ b/tests/unit/manage/views/test_organizations.py 
@@ -1983,7 +1983,7 @@ def test_cannot_reinvite_organization_role( db_request.method = "POST" db_request.POST = pretend.stub() - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_1 db_request.session = pretend.stub( flash=pretend.call_recorder(lambda *a, **kw: None) @@ -2050,7 +2050,7 @@ def test_reinvite_organization_role_after_expiration( db_request.method = "POST" db_request.POST = pretend.stub() - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_1 db_request.session = pretend.stub( flash=pretend.call_recorder(lambda *a, **kw: None) @@ -2185,7 +2185,7 @@ def test_resend_invitation( db_request.method = "POST" db_request.POST = MultiDict({"user_id": user.id}) - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_user db_request.route_path = pretend.call_recorder( lambda *a, **kw: "/manage/organizations" @@ -2283,7 +2283,7 @@ def test_resend_invitation_fails_corrupt_token( db_request.method = "POST" db_request.POST = MultiDict({"user_id": user.id}) - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_user db_request.route_path = pretend.call_recorder( lambda *a, **kw: "/manage/organizations" @@ -2328,7 +2328,7 @@ def test_resend_invitation_fails_missing_invitation( db_request.method = "POST" db_request.POST = MultiDict({"user_id": user.id}) - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_user db_request.route_path = pretend.call_recorder( lambda *a, **kw: "/manage/organizations" 
@@ -2377,7 +2377,7 @@ def test_revoke_invitation( db_request.method = "POST" db_request.POST = MultiDict({"user_id": user.id, "token": "TOKEN"}) - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_user db_request.route_path = pretend.call_recorder( lambda *a, **kw: "/manage/organizations" @@ -2456,7 +2456,7 @@ def test_invitation_does_not_exist( db_request.method = "POST" db_request.POST = MultiDict({"user_id": user.id, "token": "TOKEN"}) - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_user db_request.route_path = pretend.call_recorder( lambda *a, **kw: "/manage/organizations" @@ -2491,7 +2491,7 @@ def test_token_expired(self, db_request, token_service, enable_organizations): db_request.method = "POST" db_request.POST = MultiDict({"user_id": user.id, "token": "TOKEN"}) - db_request.remote_addr = "10.10.10.10" + db_request.remote_addr = "198.51.100.2" db_request.user = owner_user db_request.route_path = pretend.call_recorder( lambda *a, **kw: "/manage/organizations/roles" diff --git a/tests/unit/utils/test_wsgi.py b/tests/unit/utils/test_wsgi.py index 3dff7aab719d..1a13a3345f84 100644 --- a/tests/unit/utils/test_wsgi.py +++ b/tests/unit/utils/test_wsgi.py @@ -10,6 +10,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import hashlib + import pretend import pytest import sentry_sdk @@ -24,6 +26,12 @@ from ...common.db.ip_addresses import IpAddressFactory as DBIpAddressFactory +def calculate_hashed_value(remote_addr, salt="pepa"): + if salt is not None: + return hashlib.sha256((remote_addr + salt).encode("utf8")).hexdigest() + return remote_addr + + class TestProxyFixer: def test_skips_headers(self): response = pretend.stub() 
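Review bot: `calculate_hashed_value` in the tests/unit/utils/test_wsgi.py hunk re-implements the salted hash that `ProxyFixer` is under test for, and its `salt is None` branch returns the address unhashed; no call in this diff exercises that branch, and whether it matches what `ProxyFixer` emits for `ip_salt=None` is not visible here. A helper named "hashed" that can return a raw IP needs its contract stated, e.g. `"""Return sha256(remote_addr + salt) as hex, or remote_addr unchanged when salt is None."""`, or the unused branch should be removed. The default `salt="pepa"` is the third copy of the salt literal in the suite (tests/conftest.py `remote_addr_salted` and `ip_salt="pepa"` further down this file); one shared constant would keep the test's expectation and the fixture in agreement.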
@@ -32,7 +40,7 @@ def test_skips_headers(self): environ = { "HTTP_WAREHOUSE_TOKEN": "NOPE", "HTTP_WAREHOUSE_PROTO": "http", - "HTTP_WAREHOUSE_IP": "1.2.3.4", + "HTTP_WAREHOUSE_IP": "192.0.2.1", "HTTP_WAREHOUSE_HOST": "example.com", } start_response = pretend.stub() @@ -77,7 +85,7 @@ def test_accepts_warehouse_headers(self): environ = { "HTTP_WAREHOUSE_TOKEN": "1234", "HTTP_WAREHOUSE_PROTO": "http", - "HTTP_WAREHOUSE_IP": "1.2.3.4", + "HTTP_WAREHOUSE_IP": "192.0.2.1", "HTTP_WAREHOUSE_HASHED_IP": "hashbrowns", "HTTP_WAREHOUSE_HOST": "example.com", "HTTP_WAREHOUSE_CITY": "Anytown, ST", @@ -92,7 +100,7 @@ def test_accepts_warehouse_headers(self): assert app.calls == [ pretend.call( { - "REMOTE_ADDR": "1.2.3.4", + "REMOTE_ADDR": "192.0.2.1", "REMOTE_ADDR_HASHED": "hashbrowns", "HTTP_HOST": "example.com", "GEOIP_CITY": "Anytown, ST", @@ -130,13 +138,14 @@ def test_accepts_x_forwarded_headers(self, remote_addr_salted): resp = wsgi.ProxyFixer(app, token=None, ip_salt="pepa")(environ, start_response) + expected_remote_addr_hashed = calculate_hashed_value("1.2.3.4", "pepa") assert resp is response assert app.calls == [ pretend.call( { "HTTP_SOME_OTHER_HEADER": "woop", "REMOTE_ADDR": "1.2.3.4", - "REMOTE_ADDR_HASHED": remote_addr_salted, + "REMOTE_ADDR_HASHED": expected_remote_addr_hashed, "HTTP_HOST": "example.com", "wsgi.url_scheme": "http", }, @@ -148,7 +157,10 @@ def test_skips_x_forwarded_when_not_enough(self): response = pretend.stub() app = pretend.call_recorder(lambda e, s: response) - environ = {"HTTP_X_FORWARDED_FOR": "1.2.3.4", "HTTP_SOME_OTHER_HEADER": "woop"} + environ = { + "HTTP_X_FORWARDED_FOR": "192.0.2.1", + "HTTP_SOME_OTHER_HEADER": "woop", + } start_response = pretend.stub() resp = wsgi.ProxyFixer(app, token=None, ip_salt=None, num_proxies=2)( 
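Review bot: In the `@@ -130` hunk the body no longer references `remote_addr_salted`, yet the `@@` context shows `test_accepts_x_forwarded_headers(self, remote_addr_salted)` still requesting it; unless the signature was changed outside the hunk, the fixture is now an unused parameter, and the `@@ -176` hunk for `test_selects_right_x_forwarded_value` has the same leftover. Both tests also keep the literal `"1.2.3.4"` for `REMOTE_ADDR` and in `calculate_hashed_value("1.2.3.4", "pepa")` while the rest of the commit moves to RFC 5737 addresses; feeding the `remote_addr` fixture into the forwarded-for environ would let `remote_addr_salted` keep serving as the expected value and make the local helper call unnecessary. The bodies of both functions begin outside their hunks.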
@@ -176,13 +188,14 @@ def test_selects_right_x_forwarded_value(self, remote_addr_salted): environ, start_response ) + expected_remote_addr_hashed = calculate_hashed_value("1.2.3.4", "pepa") assert resp is response assert app.calls == [ pretend.call( { "HTTP_SOME_OTHER_HEADER": "woop", "REMOTE_ADDR": "1.2.3.4", - "REMOTE_ADDR_HASHED": remote_addr_salted, + "REMOTE_ADDR_HASHED": expected_remote_addr_hashed, "HTTP_HOST": "example.com", "wsgi.url_scheme": "http", },