From 0ee58ac2b38454d93e6a8e34868798206c7520ee Mon Sep 17 00:00:00 2001 From: Chris Wagner Date: Fri, 13 Oct 2023 18:26:28 -0700 Subject: [PATCH] Fix form.next for newer authentication mechanisms - including two-factor. Propagating 'next' when part of the request query string was working - but FS also supported 'next' as a hidden field in some endpoints such as /login and /register. form.next wasn't being propagated with 2FA. Also - newer authentication endpoints - /us-signin /wan-signin didn't have the 'next' hidden field. Also - there were NO tests for form.next. Fixed all that, added tests, combined all 'next' calculations in a single utility. closes #853 --- CHANGES.rst | 1 + flask_security/oauth_glue.py | 2 +- flask_security/tf_plugin.py | 8 +-- flask_security/unified_signin.py | 20 ++++---- flask_security/utils.py | 84 ++++++++++++++++---------------- flask_security/views.py | 11 +++-- flask_security/webauthn.py | 11 +++-- tests/test_two_factor.py | 12 +++++ tests/test_unified_signin.py | 21 ++++++++ tests/test_webauthn.py | 18 +++++++ 10 files changed, 125 insertions(+), 63 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index afbbdb65..61195048 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -23,6 +23,7 @@ Fixes - (:issue:`826`) Fix error in quickstart (codycollier) - (:pr:`835`) Update Armenian translations (amkrtchyan-tmp) - (:pr:`831`) Update German translations. (sr-verde) +- (:issue:`853`) Fix 'next' propagation when passed as form.next (thanks cariaso) Version 5.3.0 ------------- diff --git a/flask_security/oauth_glue.py b/flask_security/oauth_glue.py index b814b562..324bc993 100644 --- a/flask_security/oauth_glue.py +++ b/flask_security/oauth_glue.py @@ -115,7 +115,7 @@ def oauthresponse(name: str) -> "ResponseValue": if user: after_this_request(view_commit) response = _security.two_factor_plugins.tf_enter( - user, False, "oauth", next_loc=propagate_next(request.url) + user, False, "oauth", next_loc=propagate_next(request.url, None) ) if response: return response diff --git a/flask_security/tf_plugin.py b/flask_security/tf_plugin.py index 65555665..1172a66b 100644 --- a/flask_security/tf_plugin.py +++ b/flask_security/tf_plugin.py @@ -4,7 +4,7 @@ Flask-Security Two-Factor Plugin Module - :copyright: (c) 2022-2022 by J. Christopher Wagner (jwag). + :copyright: (c) 2022-2023 by J. Christopher Wagner (jwag). :license: MIT, see LICENSE for more details. TODO: @@ -58,7 +58,9 @@ def __init__(self, *args, **kwargs): def tf_select() -> "ResponseValue": # Ask user which MFA method they want to use. # This is used when a user has setup more than one type of 2FA. - form = build_form_from_request("two_factor_select_form") + form = t.cast( + TwoFactorSelectForm, build_form_from_request("two_factor_select_form") + ) # This endpoint is unauthenticated - make sure we're in a valid state if not all(k in session for k in ["tf_user_id", "tf_select"]): @@ -81,7 +83,7 @@ def tf_select() -> "ResponseValue": if tf_impl: json_payload = {"tf_required": True} response = tf_impl.tf_login( - user, json_payload, next_loc=propagate_next(request.url) + user, json_payload, next_loc=propagate_next(request.url, None) ) if not response: # pragma no cover # This really can't happen unless between the time the started logging in diff --git a/flask_security/unified_signin.py b/flask_security/unified_signin.py index 63784387..68a2a871 100644 --- a/flask_security/unified_signin.py +++ b/flask_security/unified_signin.py @@ -4,7 +4,7 @@ Flask-Security Unified Signin module - :copyright: (c) 2019-2022 by J. Christopher Wagner (jwag). + :copyright: (c) 2019-2023 by J. Christopher Wagner (jwag). :license: MIT, see LICENSE for more details. This implements a unified sign in endpoint - allowing @@ -24,7 +24,6 @@ fact allow the user to add a password. Is that sufficient? - This also means that there is no way to REMOVE your password once it is setup, although user can register without one. - - Any reason to support 'next' in form? xx?next=yyy works fine. - separate code validation times for SMS, email, authenticator? - token versus code versus passcode? Confusing terminology. @@ -52,6 +51,7 @@ from .decorators import anonymous_user_required, auth_required, unauth_csrf from .forms import ( Form, + NextFormMixin, Required, build_form_from_request, build_form, @@ -231,7 +231,7 @@ def validate2(self) -> bool: return False # pragma: no cover -class UnifiedSigninForm(_UnifiedPassCodeForm): +class UnifiedSigninForm(_UnifiedPassCodeForm, NextFormMixin): """A unified login form For either identity/password or request and enter code. """ @@ -414,7 +414,7 @@ def us_signin_send_code() -> "ResponseValue": This takes an identity (as configured in USER_IDENTITY_ATTRIBUTES) and a method request to send a code. """ - form: UnifiedSigninForm = build_form_from_request("us_signin_form") + form = t.cast(UnifiedSigninForm, build_form_from_request("us_signin_form")) form.submit_send_code.data = True form.submit.data = False @@ -485,7 +485,7 @@ def us_verify_send_code() -> "ResponseValue": """ Send code during verify. POST only. """ - form: UnifiedVerifyForm = build_form_from_request("us_verify_form") + form = t.cast(UnifiedVerifyForm, build_form_from_request("us_verify_form")) form.submit_send_code.data = True form.submit.data = False @@ -557,7 +557,7 @@ def us_signin() -> "ResponseValue": else: return redirect(get_post_login_redirect()) - form: UnifiedSigninForm = build_form_from_request("us_signin_form") + form = t.cast(UnifiedSigninForm, build_form_from_request("us_signin_form")) form.submit.data = True form.submit_send_code.data = False @@ -570,7 +570,7 @@ def us_signin() -> "ResponseValue": form.user, remember_me, form.authn_via, - next_loc=propagate_next(request.url), + next_loc=propagate_next(request.url, form), ) if response: return response @@ -632,7 +632,7 @@ def us_verify() -> "ResponseValue": will have filled in ?next=xxx - which we want to carefully not lose as we go through these steps. """ - form: UnifiedVerifyForm = build_form_from_request("us_verify_form") + form = t.cast(UnifiedVerifyForm, build_form_from_request("us_verify_form")) form.submit.data = True form.submit_send_code.data = False @@ -736,7 +736,7 @@ def us_verify_link() -> "ResponseValue": ) ) response = _security.two_factor_plugins.tf_enter( - user, False, "email", next_loc=propagate_next(request.url) + user, False, "email", next_loc=propagate_next(request.url, None) ) if response: return response @@ -769,7 +769,7 @@ def us_setup() -> "ResponseValue": use a timed signed token to pass along state. GET - retrieve current info (json) or form. """ - form: UnifiedSigninSetupForm = build_form_from_request("us_setup_form") + form = t.cast(UnifiedSigninSetupForm, build_form_from_request("us_setup_form")) setup_methods = _compute_setup_methods() active_methods = _compute_active_methods(current_user) diff --git a/flask_security/utils.py b/flask_security/utils.py index dea57cdc..a49090f7 100644 --- a/flask_security/utils.py +++ b/flask_security/utils.py @@ -16,7 +16,7 @@ import hmac import time import typing as t -from urllib.parse import parse_qsl, parse_qs, urlsplit, urlunsplit, urlencode +from urllib.parse import parse_qsl, urlsplit, urlunsplit, urlencode import urllib.request import urllib.error import warnings @@ -36,10 +36,11 @@ from flask_login import current_user from flask_login import COOKIE_NAME as REMEMBER_COOKIE_NAME from flask_principal import AnonymousIdentity, Identity, identity_changed, Need -from flask_wtf import csrf +from flask_wtf import csrf, FlaskForm from wtforms import ValidationError from itsdangerous import BadSignature, SignatureExpired from werkzeug.local import LocalProxy +from werkzeug.datastructures import MultiDict from .quart_compat import best, get_quart_status from .proxies import _security, _datastore, _pwd_context, _hashing_context @@ -48,7 +49,6 @@ if t.TYPE_CHECKING: # pragma: no cover from flask import Flask, Response from flask.typing import ResponseValue - from flask_wtf import FlaskForm from .datastore import User SB = t.Union[str, bytes] @@ -547,46 +547,27 @@ def validate_redirect_url(url: str) -> bool: return True -def get_post_action_redirect(config_key: str, declared: t.Optional[str] = None) -> str: - # All this nonsense due to mypy - arg_next_url = None - arg_next = request.args.get("next", None) - if arg_next: - arg_next_url = get_url(arg_next) - form_next_url = None - form_next = request.form.get("next", None) - if form_next: - form_next_url = get_url(form_next) - urls = [ - arg_next_url, - form_next_url, - find_redirect(config_key), - ] - if declared: - urls.insert(0, declared) - for url in urls: - if url and validate_redirect_url(url): - return url - raise ValueError("No valid redirect URL found - configuration error") +def get_post_action_redirect(config_key: str) -> str: + return propagate_next(find_redirect(config_key), request.form) -def get_post_login_redirect(declared: t.Optional[str] = None) -> str: - return get_post_action_redirect("SECURITY_POST_LOGIN_VIEW", declared) +def get_post_login_redirect() -> str: + return get_post_action_redirect("SECURITY_POST_LOGIN_VIEW") -def get_post_register_redirect(declared: t.Optional[str] = None) -> str: - return get_post_action_redirect("SECURITY_POST_REGISTER_VIEW", declared) +def get_post_register_redirect() -> str: + return get_post_action_redirect("SECURITY_POST_REGISTER_VIEW") -def get_post_logout_redirect(declared: t.Optional[str] = None) -> str: - return get_post_action_redirect("SECURITY_POST_LOGOUT_VIEW", declared) +def get_post_logout_redirect() -> str: + return get_post_action_redirect("SECURITY_POST_LOGOUT_VIEW") -def get_post_verify_redirect(declared: t.Optional[str] = None) -> str: - return get_post_action_redirect("SECURITY_POST_VERIFY_VIEW", declared) +def get_post_verify_redirect() -> str: + return get_post_action_redirect("SECURITY_POST_VERIFY_VIEW") -def find_redirect(key: str) -> t.Optional[str]: +def find_redirect(key: str) -> str: """Returns the URL to redirect to after a user logs in successfully. :param key: The session or application configuration key to search for @@ -599,17 +580,38 @@ def find_redirect(key: str) -> t.Optional[str]: app_url = None if app_value: app_url = get_url(app_value) - rv = session_url or app_url or current_app.config.get("APPLICATION_ROOT", "/") + rv = session_url or app_url or str(current_app.config.get("APPLICATION_ROOT", "/")) return rv -def propagate_next(url: str) -> str: - # return either URL or, if URL already has a ?next=xx, return that. - url_next = urlsplit(url) - qparams = parse_qs(url_next.query) - if "next" in qparams: - return qparams["next"][0] - return url +def propagate_next( + fallback_url: str, form: t.Optional[t.Union[FlaskForm, MultiDict]] +) -> str: + """Compute appropriate redirect URL + The application can add a 'next' query parameter or have 'next' as a form field. + If either exist, make sure they are valid (not pointing to external location) + If neither, return the fallback_url + + Can be passed either request.form (which is really a MultiDict OR a real form. + """ + form_next = None + if form and isinstance(form, FlaskForm): + if hasattr(form, "next") and form.next.data: + form_next = get_url(form.next.data) + elif form and form.get("next", None): + form_next = get_url(str(form.get("next"))) + arg_next = None + if request.args.get("next", None): + arg_next = get_url(request.args.get("next")) # type: ignore[arg-type] + urls = [ + arg_next, + form_next, + fallback_url, + ] + for url in urls: + if url and validate_redirect_url(url): + return url + raise ValueError("No valid redirect URL found - configuration error") def get_config(app: "Flask") -> t.Dict[str, t.Any]: diff --git a/flask_security/views.py b/flask_security/views.py index 79f973a1..5f8203bf 100644 --- a/flask_security/views.py +++ b/flask_security/views.py @@ -183,7 +183,10 @@ def login() -> "ResponseValue": assert form.user is not None remember_me = form.remember.data if "remember" in form else None response = _security.two_factor_plugins.tf_enter( - form.user, remember_me, "password", next_loc=propagate_next(request.url) + form.user, + remember_me, + "password", + next_loc=propagate_next(request.url, form), ) if response: return response @@ -305,7 +308,7 @@ def register() -> "ResponseValue": # signin - we adhere to historic behavior. if not _security.confirmable or cv("LOGIN_WITHOUT_CONFIRMATION"): response = _security.two_factor_plugins.tf_enter( - form.user, False, "register", next_loc=propagate_next(request.url) + form.user, False, "register", next_loc=propagate_next(request.url, form) ) if response: return response @@ -490,7 +493,7 @@ def confirm_email(token): # get the email. # Note also this goes against OWASP recommendations. response = _security.two_factor_plugins.tf_enter( - user, False, "confirm", next_loc=propagate_next(request.url) + user, False, "confirm", next_loc=propagate_next(request.url, None) ) if response: do_flash(m, c) @@ -639,7 +642,7 @@ def reset_password(token): if cv("AUTO_LOGIN_AFTER_RESET"): # backwards compat - really shouldn't do this according to OWASP response = _security.two_factor_plugins.tf_enter( - form.user, False, "reset", next_loc=propagate_next(request.url) + form.user, False, "reset", next_loc=propagate_next(request.url, None) ) if response: return response diff --git a/flask_security/webauthn.py b/flask_security/webauthn.py index 479ddf5a..9e1e883e 100644 --- a/flask_security/webauthn.py +++ b/flask_security/webauthn.py @@ -46,6 +46,7 @@ from flask import current_app as app from flask_login import current_user from wtforms import BooleanField, HiddenField, RadioField, StringField, SubmitField +from .forms import NextFormMixin try: import webauthn @@ -205,7 +206,7 @@ def validate(self, **kwargs: t.Any) -> bool: return True -class WebAuthnSigninForm(Form): +class WebAuthnSigninForm(Form, NextFormMixin): identity = StringField(get_form_field_label("identity")) remember = BooleanField(get_form_field_label("remember_me")) submit = SubmitField(label=get_form_field_xlate(_("Start")), id="wan_signin") @@ -236,7 +237,7 @@ def validate(self, **kwargs: t.Any) -> bool: return True -class WebAuthnSigninResponseForm(Form): +class WebAuthnSigninResponseForm(Form, NextFormMixin): """ This form is used both for signin (primary/first or secondary) and verify. """ @@ -621,7 +622,9 @@ def webauthn_signin() -> "ResponseValue": cv("WAN_SIGNIN_TEMPLATE"), wan_signin_form=form, wan_signin_response_form=build_form( - "wan_signin_response_form", remember=form.remember.data + "wan_signin_response_form", + remember=form.remember.data, + next=form.next.data, ), wan_state=state_token, credential_options=json.dumps(o_json), @@ -705,7 +708,7 @@ def webauthn_signin_response(token: str) -> "ResponseValue": form.user, remember_me, "webauthn", - next_loc=propagate_next(request.url), + next_loc=propagate_next(request.url, form), ) if response: return response diff --git a/tests/test_two_factor.py b/tests/test_two_factor.py index 4dc89ef4..c8944bd3 100644 --- a/tests/test_two_factor.py +++ b/tests/test_two_factor.py @@ -1246,6 +1246,18 @@ def test_propagate_next(app, client): verify_url, data=dict(code=codes[0]["login_token"]), follow_redirects=False ) assert "/im-in" in response.location + logout(client) + + # do it with next in the form + data = dict(email="gal@lp.com", password="password", next="/im-in") + response = client.post("/login", data=data, follow_redirects=True) + assert "?next=/im-in" in response.request.url + # grab URL from form to show that our template propagates ?next + verify_url = get_form_action(response) + response = client.post( + verify_url, data=dict(code=codes[1]["login_token"]), follow_redirects=False + ) + assert "/im-in" in response.location @pytest.mark.settings(freshness=timedelta(minutes=0)) diff --git a/tests/test_unified_signin.py b/tests/test_unified_signin.py index 183912b4..81866239 100644 --- a/tests/test_unified_signin.py +++ b/tests/test_unified_signin.py @@ -579,6 +579,10 @@ def test_post_already_authenticated(client, get_message): assert b"Post Login" in response.data response = client.post("/us-signin?next=/page1", data=data, follow_redirects=True) assert b"Page 1" in response.data + # should work in form as well + ndata = dict(email="matt@lp.com", password="password", next="/page1") + response = client.post("/us-signin", data=ndata, follow_redirects=True) + assert b"Page 1" in response.data headers = {"Accept": "application/json", "Content-Type": "application/json"} response = client.post("/us-signin", json=data, headers=headers) @@ -1243,6 +1247,7 @@ def test_next(app, client, get_message): assert "/post_login" in response.location logout(client) + # Test form.next response = client.post( "/us-signin", data=dict( @@ -2105,3 +2110,19 @@ def test_propagate_next_tf(app, client): follow_redirects=True, ) assert b"Profile Page" in response.data + + # Try form.next + logout(client, endpoint="/auth/logout") + + response = client.post( + "/auth/us-signin", + data=dict(identity="matt@lp.com", passcode="password", next="/im-in"), + follow_redirects=True, + ) + sendcode_url = get_form_action(response, 0) + response = client.post( + sendcode_url, + data=dict(code=sms_sender.messages[0].split()[-1]), + follow_redirects=False, + ) + assert "/im-in" in response.location diff --git a/tests/test_webauthn.py b/tests/test_webauthn.py index cdccfe16..59c74441 100644 --- a/tests/test_webauthn.py +++ b/tests/test_webauthn.py @@ -26,6 +26,7 @@ capture_flashes, get_existing_session, get_form_action, + get_form_input, json_authenticate, logout, reset_fresh, @@ -1654,3 +1655,20 @@ def test_login_next(app, client, get_message): ) assert response.status_code == 200 assert b"Profile Page" in response.data + + # Try form.next + logout(client, endpoint="/auth/logout") + reset_signcount(app, "matt@lp.com", "testr3") + + response = client.post( + "/auth/wan-signin", data=dict(identity="matt@lp.com", next="/im-in") + ) + response_url = get_form_action(response) + + next_loc = get_form_input(response, "next") + response = client.post( + response_url, + data=dict(credential=json.dumps(SIGNIN_DATA1), next=next_loc), + follow_redirects=False, + ) + assert "/im-in" in response.location