Skip to content

Commit

Permalink
Add invalid ID tests for main.views (#1124)
Browse files Browse the repository at this point in the history
## Description of Changes
There were a handful of routes where we did not validate the initial ID
parameters. Additionally, I standardized how object querying was done
within the `main.views` file.

## Tests and Linting
- [x] This branch is up-to-date with the `develop` branch.
- [x] `pytest` passes on my local development environment.
- [x] `pre-commit` passes on my local development environment.
  • Loading branch information
michplunkett authored Oct 1, 2024
1 parent 8b12b4b commit 146cc8e
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 35 deletions.
94 changes: 60 additions & 34 deletions OpenOversight/app/main/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,14 @@ def redirect_sort_images(department_id: int):
)
@login_required
def sort_images(department_id: int):
try:
department = Department.query.filter_by(id=department_id).one()
except NoResultFound:
abort(HTTPStatus.NOT_FOUND)

# Select a random unsorted image from the database
image_query = Image.query.filter_by(contains_cops=None).filter_by(
department_id=department_id
department_id=department.id
)
image = get_random_image(image_query)

Expand All @@ -274,7 +279,7 @@ def sort_images(department_id: int):
else:
proper_path = None
return render_template(
"sort.html", image=image, path=proper_path, department_id=department_id
"sort.html", image=image, path=proper_path, department_id=department.id
)


Expand Down Expand Up @@ -316,10 +321,6 @@ def officer_profile(officer_id: int):
officer = Officer.query.filter_by(id=officer_id).one()
except NoResultFound:
abort(HTTPStatus.NOT_FOUND)
except: # noqa: E722
exception_type, value, full_traceback = sys.exc_info()
error_str = " ".join([str(exception_type), str(value), format_exc()])
current_app.logger.error(f"Error finding officer: {error_str}")
form.job_title.query = (
Job.query.filter_by(department_id=officer.department_id)
.order_by(Job.order.asc())
Expand Down Expand Up @@ -379,15 +380,17 @@ def redirect_add_assignment(officer_id: int):
@ac_or_admin_required
def add_assignment(officer_id: int):
form = AssignmentForm()
officer = db.session.get(Officer, officer_id)
try:
officer = Officer.query.filter_by(id=officer_id).one()
except NoResultFound:
flash("Officer not found")
abort(HTTPStatus.NOT_FOUND)

form.job_title.query = (
Job.query.filter_by(department_id=officer.department_id)
.order_by(Job.order.asc())
.all()
)
if not officer:
flash("Officer not found")
abort(HTTPStatus.NOT_FOUND)

if form.validate_on_submit():
if current_user.is_administrator or (
Expand Down Expand Up @@ -441,7 +444,11 @@ def redirect_edit_assignment(officer_id: int, assignment_id: int):
@login_required
@ac_or_admin_required
def edit_assignment(officer_id: int, assignment_id: int):
officer = db.session.get(Officer, officer_id)
try:
officer = Officer.query.filter_by(id=officer_id).one()
except NoResultFound:
flash("Officer not found")
abort(HTTPStatus.NOT_FOUND)

if current_user.is_area_coordinator and not current_user.is_administrator:
if not ac_can_edit_officer(officer, current_user):
Expand Down Expand Up @@ -491,8 +498,9 @@ def redirect_add_salary(officer_id: int):
@ac_or_admin_required
def add_salary(officer_id: int):
form = SalaryForm()
officer = db.session.get(Officer, officer_id)
if not officer:
try:
officer = Officer.query.filter_by(id=officer_id).one()
except NoResultFound:
flash("Officer not found")
abort(HTTPStatus.NOT_FOUND)

Expand Down Expand Up @@ -556,7 +564,12 @@ def redirect_edit_salary(officer_id: int, salary_id: int):
@login_required
@ac_or_admin_required
def edit_salary(officer_id: int, salary_id: int):
officer = db.session.get(Officer, officer_id)
try:
officer = Officer.query.filter_by(id=officer_id).one()
except NoResultFound:
flash("Officer not found")
abort(HTTPStatus.NOT_FOUND)

if current_user.is_area_coordinator and not current_user.is_administrator:
if not ac_can_edit_officer(officer, current_user):
abort(HTTPStatus.FORBIDDEN)
Expand Down Expand Up @@ -591,10 +604,11 @@ def redirect_display_submission(image_id: int):
@login_required
def display_submission(image_id: int):
try:
image = db.session.get(Image, image_id)
proper_path = serve_image(image.filepath)
image = Image.query.filter_by(id=image_id).one()
except NoResultFound:
abort(HTTPStatus.NOT_FOUND)

proper_path = serve_image(image.filepath)
return render_template("image.html", image=image, path=proper_path)


Expand All @@ -610,10 +624,11 @@ def redirect_display_tag(tag_id: int):
@main.route("/tags/<int:tag_id>")
def display_tag(tag_id: int):
try:
tag = db.session.get(Face, tag_id)
proper_path = serve_image(tag.original_image.filepath)
tag = Face.query.filter_by(id=tag_id).one()
except NoResultFound:
abort(HTTPStatus.NOT_FOUND)

proper_path = serve_image(tag.original_image.filepath)
return render_template(
"tag.html", face=tag, path=proper_path, jsloads=["js/tag.js"]
)
Expand Down Expand Up @@ -743,7 +758,11 @@ def redirect_edit_department(department_id: int):
@login_required
@admin_required
def edit_department(department_id: int):
department = db.get_or_404(Department, department_id)
try:
department = Department.query.filter_by(id=department_id).one()
except NoResultFound:
abort(HTTPStatus.NOT_FOUND)

previous_name = department.name
form = EditDepartmentForm(obj=department)
original_ranks = department.jobs
Expand Down Expand Up @@ -896,6 +915,11 @@ def list_officer(
current_job=None,
require_photo: Optional[bool] = None,
):
try:
department = Department.query.filter_by(id=department_id).one()
except NoResultFound:
abort(HTTPStatus.NOT_FOUND)

form = BrowseForm()
form.rank.query = (
Job.query.filter_by(department_id=department_id, is_sworn_officer=True)
Expand All @@ -916,10 +940,6 @@ def list_officer(
form_data["unique_internal_identifier"] = unique_internal_identifier
form_data["require_photo"] = require_photo

department = db.session.get(Department, department_id)
if not department:
abort(HTTPStatus.NOT_FOUND)

age_range = {ac[0] for ac in AGE_CHOICES}

# Set form data based on URL
Expand Down Expand Up @@ -1189,7 +1209,10 @@ def redirect_edit_officer(officer_id: int):
@login_required
@ac_or_admin_required
def edit_officer(officer_id: int):
officer = db.session.get(Officer, officer_id)
try:
officer = Officer.query.filter_by(id=officer_id).one()
except NoResultFound:
abort(HTTPStatus.NOT_FOUND)
form = EditOfficerForm(obj=officer)

if request.method == HTTPMethod.GET:
Expand Down Expand Up @@ -1265,9 +1288,9 @@ def redirect_delete_tag(tag_id: int):
@login_required
@ac_or_admin_required
def delete_tag(tag_id: int):
tag = db.session.get(Face, tag_id)

if not tag:
try:
tag = Face.query.filter_by(id=tag_id).one()
except NoResultFound:
flash("Tag not found")
abort(HTTPStatus.NOT_FOUND)

Expand Down Expand Up @@ -1301,9 +1324,9 @@ def redirect_set_featured_tag(tag_id: int):
@login_required
@ac_or_admin_required
def set_featured_tag(tag_id: int):
tag = db.session.get(Face, tag_id)

if not tag:
try:
tag = Face.query.filter_by(id=tag_id).one()
except NoResultFound:
flash("Tag not found")
abort(HTTPStatus.NOT_FOUND)

Expand Down Expand Up @@ -1482,9 +1505,11 @@ def redirect_complete_tagging(image_id: int):
@login_required
def complete_tagging(image_id: int):
# Select a random untagged image from the database
image = db.session.get(Image, image_id)
if not image:
try:
image = Image.query.filter_by(id=image_id).one()
except NoResultFound:
abort(HTTPStatus.NOT_FOUND)

image.is_tagged = True
image.last_updated_by = current_user.id
db.session.commit()
Expand Down Expand Up @@ -1855,8 +1880,9 @@ def redirect_upload(department_id: int, officer_id: Optional[int] = None):
@limiter.limit("250/minute")
def upload(department_id: int, officer_id: Optional[int] = None):
if officer_id:
officer = db.session.get(Officer, officer_id)
if not officer:
try:
officer = Officer.query.filter_by(id=officer_id).one()
except NoResultFound:
return jsonify(error="This officer does not exist."), HTTPStatus.NOT_FOUND
if not (
current_user.is_administrator
Expand Down
3 changes: 3 additions & 0 deletions OpenOversight/tests/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# File Mode Constants
FILE_MODE_WRITE = "w"

# Miscellaneous Constants
INVALID_ID = 4000000

# User Constants
AC_USER_EMAIL = "[email protected]"
AC_USER_PASSWORD = "horse"
Expand Down
55 changes: 54 additions & 1 deletion OpenOversight/tests/routes/test_image_tagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from OpenOversight.app.models.database import Department, Face, Image, Officer, User
from OpenOversight.app.utils.constants import ENCODING_UTF_8
from OpenOversight.tests.conftest import AC_DEPT
from OpenOversight.tests.constants import INVALID_ID
from OpenOversight.tests.routes.route_helpers import login_ac, login_admin, login_user


Expand Down Expand Up @@ -46,6 +47,14 @@ def test_route_login_required(route, client, mockdata):
assert rv.status_code == HTTPStatus.FOUND


def test_invalid_department_image_sorting(client, session):
with current_app.test_request_context():
login_user(client)

rv = client.get(url_for("main.sort_images", department_id=INVALID_ID))
assert rv.status_code == HTTPStatus.NOT_FOUND


# POST-only routes
@pytest.mark.parametrize(
"route",
Expand All @@ -71,6 +80,16 @@ def test_logged_in_user_can_access_sort_form(mockdata, client, session):
assert b"Do you see uniformed law enforcement officers in the photo" in rv.data


def test_invalid_officer_id_display_submission(client, session):
with current_app.test_request_context():
login_admin(client)

rv = client.get(
url_for("main.display_submission", image_id=INVALID_ID),
)
assert rv.status_code == HTTPStatus.NOT_FOUND


def test_user_can_view_submission(mockdata, client, session):
with current_app.test_request_context():
login_user(client)
Expand All @@ -81,6 +100,14 @@ def test_user_can_view_submission(mockdata, client, session):
assert b"Image ID" in rv.data


def test_invalid_tag_id_display_tag(client, session):
with current_app.test_request_context():
login_user(client)

rv = client.get(url_for("main.display_tag", tag_id=INVALID_ID))
assert rv.status_code == HTTPStatus.NOT_FOUND


def test_user_can_view_tag(mockdata, client, session):
with current_app.test_request_context():
login_user(client)
Expand All @@ -93,6 +120,14 @@ def test_user_can_view_tag(mockdata, client, session):
assert attribute in rv.data


def test_invalid_id_delete_tag(mockdata, client, session):
with current_app.test_request_context():
login_admin(client)

rv = client.post(url_for("main.delete_tag", tag_id=INVALID_ID))
assert rv.status_code == HTTPStatus.NOT_FOUND


def test_admin_can_delete_tag(mockdata, client, session):
with current_app.test_request_context():
login_admin(client)
Expand Down Expand Up @@ -247,7 +282,15 @@ def test_user_cannot_tag_officer_mismatched_with_department(mockdata, client, se
) in rv.data


def test_user_can_finish_tagging(mockdata, client, session):
def test_invalid_id_complete_tagging(client, session):
with current_app.test_request_context():
login_user(client)

rv = client.get(url_for("main.complete_tagging", image_id=INVALID_ID))
assert rv.status_code == HTTPStatus.NOT_FOUND


def test_complete_tagging(mockdata, client, session):
with current_app.test_request_context():
_, user = login_user(client)
image_id = 4
Expand Down Expand Up @@ -288,6 +331,16 @@ def test_user_is_redirected_to_correct_department_after_tagging(
assert department.name in rv.data.decode(ENCODING_UTF_8)


def test_invalid_id_set_featured_tag(mockdata, client, session):
with current_app.test_request_context():
login_admin(client)

rv = client.post(
url_for("main.set_featured_tag", tag_id=INVALID_ID), follow_redirects=True
)
assert rv.status_code == HTTPStatus.NOT_FOUND


def test_admin_can_set_featured_tag(mockdata, client, session):
with current_app.test_request_context():
login_admin(client)
Expand Down
Loading

0 comments on commit 146cc8e

Please sign in to comment.