Skip to content

Commit

Permalink
Synchronize grades back to the LMS
Browse files Browse the repository at this point in the history
- New endpoints

GET /sync

Get the last sync for an assignment.

404 If there's no existing grade sync.

POST /sync

Create a new sync for an assignment.

400 If there's already a sync in progress.

- New tasks

sync_grades

Schedules each individual grade sync for one particular GradeSync

sync_grade

Submits a grade back to the LMS. Up to two retries.

sync_grade_complete

Summarizes the status of a GradingSync
  • Loading branch information
marcospri committed Sep 24, 2024
1 parent 6cf0c09 commit ff2e9ff
Show file tree
Hide file tree
Showing 9 changed files with 528 additions and 81 deletions.
52 changes: 49 additions & 3 deletions lms/services/auto_grading.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,54 @@
from sqlalchemy import select

from lms.js_config_types import AnnotationMetrics
from lms.models import AutoGradingConfig
from lms.models import AutoGradingConfig, GradingSync, GradingSyncGrade, LMSUser


class AutoGradingService:
def __init__(self, db):
self._db = db

def get_in_progress_sync(self, assignment) -> GradingSync | None:
return self._db.scalars(
self._search_query(
assignment=assignment, statuses=["scheduled", "in_progress"]
)
).one_or_none()

def get_last_sync(self, assignment) -> GradingSync | None:
return self._db.scalars(
self._search_query(assignment=assignment).order_by(
GradingSync.created.desc()
)
).first()

def create_grade_sync(
self, assignment, created_by: LMSUser, grades: dict[LMSUser, float]
) -> GradingSync:
grading_sync = GradingSync(
assignment_id=assignment.id, created_by=created_by, status="scheduled"
)
self._db.add(grading_sync)
self._db.flush()

for lms_user, grade in grades.items():
self._db.add(
GradingSyncGrade(
grading_sync_id=grading_sync.id,
lms_user_id=lms_user.id,
grade=grade,
)
)

return grading_sync

def _search_query(self, assignment, statuses: list[str] | None = None):
query = select(GradingSync).where(GradingSync.assignment_id == assignment.id)
if statuses:
query = query.where(GradingSync.status.in_(statuses))

return query

def calculate_grade(
self,
auto_grading_config: AutoGradingConfig,
Expand Down Expand Up @@ -62,5 +108,5 @@ def calculate_grade(
return round(grade, 2)


def factory(_context, _request):
return AutoGradingService()
def factory(_context, request):
return AutoGradingService(db=request.db)
6 changes: 2 additions & 4 deletions lms/services/lti_grading/_v13.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def sync_grade( # noqa: PLR0913
self,
lti_registration: LTIRegistration,
lis_outcome_service_url: str,
grade_timestamp: datetime,
grade_timestamp: str,
user_grading_id: str,
score: float,
):
Expand All @@ -106,9 +106,7 @@ def sync_grade( # noqa: PLR0913
This is very similar to `record_result` but not scoped to the request context,
taking all the necessary information as parameters.
"""
payload = self._record_score_payload(
score, user_grading_id, grade_timestamp.isoformat()
)
payload = self._record_score_payload(score, user_grading_id, grade_timestamp)
return self._ltia_service.request(
lti_registration,
"POST",
Expand Down
131 changes: 131 additions & 0 deletions lms/tasks/grading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from sqlalchemy import exists, select

from lms.models import (
ApplicationInstance,
GradingSync,
GradingSyncGrade,
Grouping,
LTIRegistration,
)
from lms.services import LTIAHTTPService
from lms.services.lti_grading.factory import LTI13GradingService
from lms.tasks.celery import app


@app.task()
def sync_grades():
"""Start processing pending (scheduled) GradingSync."""
with app.request_context() as request:
with request.tm:
scheduled_syncs = request.db.scalars(
select(GradingSync)
.where(GradingSync.status == "scheduled")
# We'll call this task once per new GradingSync but if we find more pending ones also start those.
.limit(5)
.with_for_update()
)

for sync in scheduled_syncs:
assignment = sync.assignment
assert (
assignment.lis_outcome_service_url
), "Assignment without grading URL"
lti_registration = request.db.scalars(
select(LTIRegistration)
.join(ApplicationInstance)
.join(Grouping)
.where(Grouping.id == assignment.course_id)
.order_by(LTIRegistration.updated.desc())
).first()
assert lti_registration, "No LTI registraion for LTI1.3 assignment"

for grade in sync.grades:
sync_grade.delay(
lti_registration_id=lti_registration.id,
lis_outcome_service_url=assignment.lis_outcome_service_url,
grading_sync_grade_id=grade.id,
)

sync.status = "in_progress"


@app.task(
max_retries=2,
retry_backoff=10,
autoretry_for=(Exception,),
)
def sync_grade(
*, lti_registration_id, lis_outcome_service_url: str, grading_sync_grade_id: int
):
"""Send one particular grade to the LMS."""
with app.request_context() as request:
with request.tm:
grading_sync_grade = request.db.get(GradingSyncGrade, grading_sync_grade_id)
grading_sync = grading_sync_grade.grading_sync

grading_service = LTI13GradingService(
ltia_service=request.find_service(LTIAHTTPService),
line_item_url=None,
line_item_container_url=None,
product_family=None, # type: ignore
misc_plugin=None, # type: ignore
lti_registration=None, # type: ignore
)
try:
grading_service.sync_grade(
request.db.get(LTIRegistration, lti_registration_id),
lis_outcome_service_url,
grading_sync.created.isoformat(),
grading_sync_grade.lms_user.lti_v13_user_id,
grading_sync_grade.grade,
)
except Exception as err:
if sync_grade.request.retries >= sync_grade.max_retries:
# If this is the last retry, mark this particular grade as an error
grading_sync_grade.success = False
grading_sync_grade.error_details = {"exception": str(err)}

_schedule_sync_grades_complete(grading_sync.id, countdown=1)
return

raise

grading_sync_grade.success = True
_schedule_sync_grades_complete(grading_sync.id, countdown=1)


@app.task()
def sync_grades_complete(*, grading_sync_id):
"""Summarize a GradingSync status based on the state of its children GradingSyncGrade."""
with app.request_context() as request:
with request.tm:
grading_sync = request.db.get(GradingSync, grading_sync_id)

result = request.db.execute(
select(
# Are all GradingSyncGrade completed?
~exists(
select(GradingSyncGrade).where(
GradingSyncGrade.grading_sync_id == grading_sync_id,
GradingSyncGrade.success.is_(None),
)
).label("completed"),
# Are all GradingSyncGrade scucesfully?
exists(
select(GradingSyncGrade).where(
GradingSyncGrade.grading_sync_id == grading_sync_id,
GradingSyncGrade.success.is_(False),
)
).label("failed"),
)
).one()
is_completed, is_failed = result.completed, result.failed

if is_completed:
grading_sync.status = "failed" if is_failed else "finished"


def _schedule_sync_grades_complete(grading_sync_id: int, countdown: int):
sync_grades_complete.apply_async(
(), {"grading_sync_id": grading_sync_id}, countdown=countdown
)
93 changes: 49 additions & 44 deletions lms/views/dashboard/api/grading.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
import logging
from datetime import datetime

from marshmallow import Schema, fields
from pyramid.httpexceptions import HTTPNotFound
from pyramid.view import view_config
from sqlalchemy import select

from lms.models import (
ApplicationInstance,
Grouping,
LMSUser,
LTIRegistration,
)
from lms.models import LMSUser
from lms.security import Permissions
from lms.services import LTIAHTTPService
from lms.services import AutoGradingService
from lms.services.dashboard import DashboardService
from lms.services.lti_grading.factory import LTI13GradingService
from lms.tasks.grading import sync_grades
from lms.validation._base import JSONPyramidRequestSchema

LOG = logging.getLogger(__name__)
Expand All @@ -36,6 +31,9 @@ def __init__(self, request) -> None:
self.dashboard_service: DashboardService = request.find_service(
name="dashboard"
)
self.auto_grading_service: AutoGradingService = request.find_service(
AutoGradingService
)

@view_config(
route_name="api.dashboard.assignments.grading.sync",
Expand All @@ -44,43 +42,50 @@ def __init__(self, request) -> None:
permission=Permissions.GRADE_ASSIGNMENT,
schema=AutoGradeSyncSchema,
)
def auto_grading_sync(self):
def create_grading_sync(self):
assignment = self.dashboard_service.get_request_assignment(self.request)
assert assignment.lis_outcome_service_url, "Assignment without grading URL"
lti_registration = self.db.scalars(
select(LTIRegistration)
.join(ApplicationInstance)
.join(Grouping)
.where(Grouping.id == assignment.course_id)
.order_by(LTIRegistration.updated.desc())
).first()
assert lti_registration, "No LTI registraion for LTI1.3 assignment"

sync_h_user_ids = [g["h_userid"] for g in self.request.parsed_params["grades"]]
if self.auto_grading_service.get_in_progress_sync(assignment):
self.request.response.status_int = 400
return {"message": "There's already an auto-grade sync in progress"}

sync_lms_users = self.db.execute(
select(LMSUser.h_userid, LMSUser.lti_v13_user_id).where(
LMSUser.h_userid.in_(sync_h_user_ids)
)
sync_h_user_ids = [g["h_userid"] for g in self.request.parsed_params["grades"]]
sync_lms_users = self.db.scalars(
select(LMSUser).where(LMSUser.h_userid.in_(sync_h_user_ids))
).all()
# Organize the data in a dict h_userid -> lti_user_id for easier access
sync_lms_users_by_h_userid = {r[0]: r[1] for r in sync_lms_users}

grading_service = LTI13GradingService(
ltia_service=self.request.find_service(LTIAHTTPService),
line_item_url=None,
line_item_container_url=None,
product_family=None, # type: ignore
misc_plugin=None, # type: ignore
lti_registration=None, # type: ignore
sync_lms_users_by_h_userid: dict[str, LMSUser] = {
lms_user.h_userid: lms_user for lms_user in sync_lms_users
}

lms_user_grades = {
sync_lms_users_by_h_userid[g["h_userid"]]: g["grade"]
for g in self.request.parsed_params["grades"]
if g["h_userid"] in sync_lms_users_by_h_userid
}
grading_sync = self.auto_grading_service.create_grade_sync(
assignment,
self.request.user.lms_user,
lms_user_grades,
)
# Use the same timestamp for all grades of the same sync
grade_sync_time_stamp = datetime.now()
for grade in self.request.parsed_params["grades"]:
grading_service.sync_grade(
lti_registration,
assignment.lis_outcome_service_url,
grade_sync_time_stamp,
sync_lms_users_by_h_userid[grade["h_userid"]],
grade["grade"],
)
self.request.add_finished_callback(self._start_sync_grades)
return {"status": grading_sync.status}

@view_config(
route_name="api.dashboard.assignments.grading.sync",
request_method="GET",
renderer="json",
permission=Permissions.GRADE_ASSIGNMENT,
)
def get_grading_sync(self):
assignment = self.dashboard_service.get_request_assignment(self.request)
if grading_sync := self.auto_grading_service.get_last_sync(assignment):
return {"status": grading_sync.status}

raise HTTPNotFound()

@staticmethod
def _start_sync_grades(_request) -> None:
"""Start processing a GradeSync after its creation.
We use this helper method instead of a lambda to make the test asserts easier.
""" # noqa: D205
sync_grades.delay()
Loading

0 comments on commit ff2e9ff

Please sign in to comment.