Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix form.next for newer authentication mechanisms - including two-fac… #854

Merged
merged 1 commit into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Fixes
- (:issue:`826`) Fix error in quickstart (codycollier)
- (:pr:`835`) Update Armenian translations (amkrtchyan-tmp)
- (:pr:`831`) Update German translations. (sr-verde)
- (:issue:`853`) Fix 'next' propagation when passed as form.next (thanks cariaso)

Version 5.3.0
-------------
Expand Down
2 changes: 1 addition & 1 deletion flask_security/oauth_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def oauthresponse(name: str) -> "ResponseValue":
if user:
after_this_request(view_commit)
response = _security.two_factor_plugins.tf_enter(
user, False, "oauth", next_loc=propagate_next(request.url)
user, False, "oauth", next_loc=propagate_next(request.url, None)
)
if response:
return response
Expand Down
8 changes: 5 additions & 3 deletions flask_security/tf_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Flask-Security Two-Factor Plugin Module

:copyright: (c) 2022-2022 by J. Christopher Wagner (jwag).
:copyright: (c) 2022-2023 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.

TODO:
Expand Down Expand Up @@ -58,7 +58,9 @@ def __init__(self, *args, **kwargs):
def tf_select() -> "ResponseValue":
# Ask user which MFA method they want to use.
# This is used when a user has setup more than one type of 2FA.
form = build_form_from_request("two_factor_select_form")
form = t.cast(
TwoFactorSelectForm, build_form_from_request("two_factor_select_form")
)

# This endpoint is unauthenticated - make sure we're in a valid state
if not all(k in session for k in ["tf_user_id", "tf_select"]):
Expand All @@ -81,7 +83,7 @@ def tf_select() -> "ResponseValue":
if tf_impl:
json_payload = {"tf_required": True}
response = tf_impl.tf_login(
user, json_payload, next_loc=propagate_next(request.url)
user, json_payload, next_loc=propagate_next(request.url, None)
)
if not response: # pragma no cover
# This really can't happen unless between the time the started logging in
Expand Down
20 changes: 10 additions & 10 deletions flask_security/unified_signin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Flask-Security Unified Signin module

:copyright: (c) 2019-2022 by J. Christopher Wagner (jwag).
:copyright: (c) 2019-2023 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.

This implements a unified sign in endpoint - allowing
Expand All @@ -24,7 +24,6 @@
fact allow the user to add a password. Is that sufficient?
- This also means that there is no way to REMOVE your password once it is setup,
although user can register without one.
- Any reason to support 'next' in form? xx?next=yyy works fine.
- separate code validation times for SMS, email, authenticator?
- token versus code versus passcode? Confusing terminology.

Expand Down Expand Up @@ -52,6 +51,7 @@
from .decorators import anonymous_user_required, auth_required, unauth_csrf
from .forms import (
Form,
NextFormMixin,
Required,
build_form_from_request,
build_form,
Expand Down Expand Up @@ -231,7 +231,7 @@ def validate2(self) -> bool:
return False # pragma: no cover


class UnifiedSigninForm(_UnifiedPassCodeForm):
class UnifiedSigninForm(_UnifiedPassCodeForm, NextFormMixin):
"""A unified login form
For either identity/password or request and enter code.
"""
Expand Down Expand Up @@ -414,7 +414,7 @@ def us_signin_send_code() -> "ResponseValue":
This takes an identity (as configured in USER_IDENTITY_ATTRIBUTES)
and a method request to send a code.
"""
form: UnifiedSigninForm = build_form_from_request("us_signin_form")
form = t.cast(UnifiedSigninForm, build_form_from_request("us_signin_form"))
form.submit_send_code.data = True
form.submit.data = False

Expand Down Expand Up @@ -485,7 +485,7 @@ def us_verify_send_code() -> "ResponseValue":
"""
Send code during verify. POST only.
"""
form: UnifiedVerifyForm = build_form_from_request("us_verify_form")
form = t.cast(UnifiedVerifyForm, build_form_from_request("us_verify_form"))
form.submit_send_code.data = True
form.submit.data = False

Expand Down Expand Up @@ -557,7 +557,7 @@ def us_signin() -> "ResponseValue":
else:
return redirect(get_post_login_redirect())

form: UnifiedSigninForm = build_form_from_request("us_signin_form")
form = t.cast(UnifiedSigninForm, build_form_from_request("us_signin_form"))
form.submit.data = True
form.submit_send_code.data = False

Expand All @@ -570,7 +570,7 @@ def us_signin() -> "ResponseValue":
form.user,
remember_me,
form.authn_via,
next_loc=propagate_next(request.url),
next_loc=propagate_next(request.url, form),
)
if response:
return response
Expand Down Expand Up @@ -632,7 +632,7 @@ def us_verify() -> "ResponseValue":
will have filled in ?next=xxx - which we want to carefully not lose as we
go through these steps.
"""
form: UnifiedVerifyForm = build_form_from_request("us_verify_form")
form = t.cast(UnifiedVerifyForm, build_form_from_request("us_verify_form"))
form.submit.data = True
form.submit_send_code.data = False

Expand Down Expand Up @@ -736,7 +736,7 @@ def us_verify_link() -> "ResponseValue":
)
)
response = _security.two_factor_plugins.tf_enter(
user, False, "email", next_loc=propagate_next(request.url)
user, False, "email", next_loc=propagate_next(request.url, None)
)
if response:
return response
Expand Down Expand Up @@ -769,7 +769,7 @@ def us_setup() -> "ResponseValue":
use a timed signed token to pass along state.
GET - retrieve current info (json) or form.
"""
form: UnifiedSigninSetupForm = build_form_from_request("us_setup_form")
form = t.cast(UnifiedSigninSetupForm, build_form_from_request("us_setup_form"))

setup_methods = _compute_setup_methods()
active_methods = _compute_active_methods(current_user)
Expand Down
84 changes: 43 additions & 41 deletions flask_security/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import hmac
import time
import typing as t
from urllib.parse import parse_qsl, parse_qs, urlsplit, urlunsplit, urlencode
from urllib.parse import parse_qsl, urlsplit, urlunsplit, urlencode
import urllib.request
import urllib.error
import warnings
Expand All @@ -36,10 +36,11 @@
from flask_login import current_user
from flask_login import COOKIE_NAME as REMEMBER_COOKIE_NAME
from flask_principal import AnonymousIdentity, Identity, identity_changed, Need
from flask_wtf import csrf
from flask_wtf import csrf, FlaskForm
from wtforms import ValidationError
from itsdangerous import BadSignature, SignatureExpired
from werkzeug.local import LocalProxy
from werkzeug.datastructures import MultiDict

from .quart_compat import best, get_quart_status
from .proxies import _security, _datastore, _pwd_context, _hashing_context
Expand All @@ -48,7 +49,6 @@
if t.TYPE_CHECKING: # pragma: no cover
from flask import Flask, Response
from flask.typing import ResponseValue
from flask_wtf import FlaskForm
from .datastore import User

SB = t.Union[str, bytes]
Expand Down Expand Up @@ -547,46 +547,27 @@
return True


def get_post_action_redirect(config_key: str, declared: t.Optional[str] = None) -> str:
# All this nonsense due to mypy
arg_next_url = None
arg_next = request.args.get("next", None)
if arg_next:
arg_next_url = get_url(arg_next)
form_next_url = None
form_next = request.form.get("next", None)
if form_next:
form_next_url = get_url(form_next)
urls = [
arg_next_url,
form_next_url,
find_redirect(config_key),
]
if declared:
urls.insert(0, declared)
for url in urls:
if url and validate_redirect_url(url):
return url
raise ValueError("No valid redirect URL found - configuration error")
def get_post_action_redirect(config_key: str) -> str:
return propagate_next(find_redirect(config_key), request.form)


def get_post_login_redirect(declared: t.Optional[str] = None) -> str:
return get_post_action_redirect("SECURITY_POST_LOGIN_VIEW", declared)
def get_post_login_redirect() -> str:
return get_post_action_redirect("SECURITY_POST_LOGIN_VIEW")


def get_post_register_redirect(declared: t.Optional[str] = None) -> str:
return get_post_action_redirect("SECURITY_POST_REGISTER_VIEW", declared)
def get_post_register_redirect() -> str:
return get_post_action_redirect("SECURITY_POST_REGISTER_VIEW")


def get_post_logout_redirect(declared: t.Optional[str] = None) -> str:
return get_post_action_redirect("SECURITY_POST_LOGOUT_VIEW", declared)
def get_post_logout_redirect() -> str:
return get_post_action_redirect("SECURITY_POST_LOGOUT_VIEW")


def get_post_verify_redirect(declared: t.Optional[str] = None) -> str:
return get_post_action_redirect("SECURITY_POST_VERIFY_VIEW", declared)
def get_post_verify_redirect() -> str:
return get_post_action_redirect("SECURITY_POST_VERIFY_VIEW")


def find_redirect(key: str) -> t.Optional[str]:
def find_redirect(key: str) -> str:
"""Returns the URL to redirect to after a user logs in successfully.

:param key: The session or application configuration key to search for
Expand All @@ -599,17 +580,38 @@
app_url = None
if app_value:
app_url = get_url(app_value)
rv = session_url or app_url or current_app.config.get("APPLICATION_ROOT", "/")
rv = session_url or app_url or str(current_app.config.get("APPLICATION_ROOT", "/"))
return rv


def propagate_next(url: str) -> str:
# return either URL or, if URL already has a ?next=xx, return that.
url_next = urlsplit(url)
qparams = parse_qs(url_next.query)
if "next" in qparams:
return qparams["next"][0]
return url
def propagate_next(
fallback_url: str, form: t.Optional[t.Union[FlaskForm, MultiDict]]
) -> str:
"""Compute appropriate redirect URL
The application can add a 'next' query parameter or have 'next' as a form field.
If either exist, make sure they are valid (not pointing to external location)
If neither, return the fallback_url

Can be passed either request.form (which is really a MultiDict OR a real form.
"""
form_next = None
if form and isinstance(form, FlaskForm):
if hasattr(form, "next") and form.next.data:
form_next = get_url(form.next.data)
elif form and form.get("next", None):
form_next = get_url(str(form.get("next")))
arg_next = None
if request.args.get("next", None):
arg_next = get_url(request.args.get("next")) # type: ignore[arg-type]
urls = [
arg_next,
form_next,
fallback_url,
]
for url in urls:
if url and validate_redirect_url(url):
return url
raise ValueError("No valid redirect URL found - configuration error")

Check warning on line 614 in flask_security/utils.py

View check run for this annotation

Codecov / codecov/patch

flask_security/utils.py#L614

Added line #L614 was not covered by tests


def get_config(app: "Flask") -> t.Dict[str, t.Any]:
Expand Down
11 changes: 7 additions & 4 deletions flask_security/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ def login() -> "ResponseValue":
assert form.user is not None
remember_me = form.remember.data if "remember" in form else None
response = _security.two_factor_plugins.tf_enter(
form.user, remember_me, "password", next_loc=propagate_next(request.url)
form.user,
remember_me,
"password",
next_loc=propagate_next(request.url, form),
)
if response:
return response
Expand Down Expand Up @@ -305,7 +308,7 @@ def register() -> "ResponseValue":
# signin - we adhere to historic behavior.
if not _security.confirmable or cv("LOGIN_WITHOUT_CONFIRMATION"):
response = _security.two_factor_plugins.tf_enter(
form.user, False, "register", next_loc=propagate_next(request.url)
form.user, False, "register", next_loc=propagate_next(request.url, form)
)
if response:
return response
Expand Down Expand Up @@ -490,7 +493,7 @@ def confirm_email(token):
# get the email.
# Note also this goes against OWASP recommendations.
response = _security.two_factor_plugins.tf_enter(
user, False, "confirm", next_loc=propagate_next(request.url)
user, False, "confirm", next_loc=propagate_next(request.url, None)
)
if response:
do_flash(m, c)
Expand Down Expand Up @@ -639,7 +642,7 @@ def reset_password(token):
if cv("AUTO_LOGIN_AFTER_RESET"):
# backwards compat - really shouldn't do this according to OWASP
response = _security.two_factor_plugins.tf_enter(
form.user, False, "reset", next_loc=propagate_next(request.url)
form.user, False, "reset", next_loc=propagate_next(request.url, None)
)
if response:
return response
Expand Down
11 changes: 7 additions & 4 deletions flask_security/webauthn.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from flask import current_app as app
from flask_login import current_user
from wtforms import BooleanField, HiddenField, RadioField, StringField, SubmitField
from .forms import NextFormMixin

try:
import webauthn
Expand Down Expand Up @@ -205,7 +206,7 @@ def validate(self, **kwargs: t.Any) -> bool:
return True


class WebAuthnSigninForm(Form):
class WebAuthnSigninForm(Form, NextFormMixin):
identity = StringField(get_form_field_label("identity"))
remember = BooleanField(get_form_field_label("remember_me"))
submit = SubmitField(label=get_form_field_xlate(_("Start")), id="wan_signin")
Expand Down Expand Up @@ -236,7 +237,7 @@ def validate(self, **kwargs: t.Any) -> bool:
return True


class WebAuthnSigninResponseForm(Form):
class WebAuthnSigninResponseForm(Form, NextFormMixin):
"""
This form is used both for signin (primary/first or secondary) and verify.
"""
Expand Down Expand Up @@ -621,7 +622,9 @@ def webauthn_signin() -> "ResponseValue":
cv("WAN_SIGNIN_TEMPLATE"),
wan_signin_form=form,
wan_signin_response_form=build_form(
"wan_signin_response_form", remember=form.remember.data
"wan_signin_response_form",
remember=form.remember.data,
next=form.next.data,
),
wan_state=state_token,
credential_options=json.dumps(o_json),
Expand Down Expand Up @@ -705,7 +708,7 @@ def webauthn_signin_response(token: str) -> "ResponseValue":
form.user,
remember_me,
"webauthn",
next_loc=propagate_next(request.url),
next_loc=propagate_next(request.url, form),
)
if response:
return response
Expand Down
12 changes: 12 additions & 0 deletions tests/test_two_factor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,18 @@ def test_propagate_next(app, client):
verify_url, data=dict(code=codes[0]["login_token"]), follow_redirects=False
)
assert "/im-in" in response.location
logout(client)

# do it with next in the form
data = dict(email="[email protected]", password="password", next="/im-in")
response = client.post("/login", data=data, follow_redirects=True)
assert "?next=/im-in" in response.request.url
# grab URL from form to show that our template propagates ?next
verify_url = get_form_action(response)
response = client.post(
verify_url, data=dict(code=codes[1]["login_token"]), follow_redirects=False
)
assert "/im-in" in response.location


@pytest.mark.settings(freshness=timedelta(minutes=0))
Expand Down
Loading