Skip to content

Commit

Permalink
Make Google API call in the background (#5)
Browse files Browse the repository at this point in the history
Currently, we execute up to 3 API calls in the `/confirm` endpoint:
1. get entity from azure table
2. create google group membership
3. delete entity from azure table

However, we've been running into `SSLEOF` errors when calling the Google
API.

Internal reference:
https://watonomous.sentry.io/issues/5728320814/?project=4507799285989376&query=is:unresolved+issue.priority:%5Bhigh,+medium%5D+!namespace:mimir+!namespace:wato-finance-frontend+!namespace:wato-finance-backend&statsPeriod=24h&stream_index=0

<img width="2672" alt="image"
src="https://github.com/user-attachments/assets/33e9c6fc-14d0-47d7-ac0a-32d4a10070e0">


According to
[this](https://stackoverflow.com/questions/77473865/receiving-ssl-unexpected-eof-error-in-google-cloud-run-when-performing-get-api),
it's something to do with not having consistent CPU resources. Without
digging into the details, we implement something that is general enough
to handle this.

This PR offloads the Google API call to the background. Now, the
`/confirm` endpoint does only one thing:
1. Update the Azure table entity to mark it as confirmed

And we run a background job called `commit` to do the rest:
1. query confirmed entities from azure table
2. create google group memberships
3. delete entities from azure table

The background job is idempotent, so we can retry it when it fails. We
record the time of the last successful commit, and the health check
fails when that time is too far in the past.

During manual testing, we confirmed that an exception raised inside the
background job does not interrupt the job loop.

This PR changes the RowKey format and is not backwards compatible.

Tested and working in prod. It turns out that the local instance of
Azure Tables accepts an "eq null" filter (it is unclear whether it
actually works, because "ne null" certainly doesn't), while the cloud
one rejects it with the following error:
```
HttpResponseError
One of the request inputs is not valid.
RequestId:5af5611d-e002-0045-48ee-f1c4ed000000
Time:2024-08-19T04:15:00.0135139Z
ErrorCode:InvalidInput
```
  • Loading branch information
ben-z authored Aug 19, 2024
1 parent dc67415 commit 128e6b2
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 39 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ apscheduler>=3.10.4,<4
google-api-python-client>=2.141.0,<3
google-auth-httplib2>=0.2.0,<1
google-auth-oauthlib>=1.2.1,<2
python-slugify>=8.0.4,<9
113 changes: 78 additions & 35 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
from email.mime.text import MIMEText
from html.parser import HTMLParser
from smtplib import SMTP, SMTPNotSupportedError
from textwrap import dedent

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from azure.core.exceptions import ResourceNotFoundError
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from watcloud_utils.fastapi import WATcloudFastAPI
from watcloud_utils.logging import logger, set_up_logging

from google_admin_sdk_utils import DirectoryService
from utils import get_azure_table_client, random_str
from utils import get_azure_table_client, random_str, make_azure_table_key


class HTMLTextFilter(HTMLParser):
Expand All @@ -34,15 +34,20 @@ def handle_data(self, data):

@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.add_job(cleanup, trigger=CronTrigger.from_crontab("* * * * *"))
scheduler.add_job(clean_up, trigger=CronTrigger.from_crontab("* * * * *"))
scheduler.add_job(commit, trigger=CronTrigger.from_crontab("* * * * *"))
yield
scheduler.shutdown()


def healthcheck(app: WATcloudFastAPI):
cleanup_delay_threshold = 120
if time.time() - app.runtime_info["last_cleanup_time"] > cleanup_delay_threshold:
msg = f"Last cleanup was more than {cleanup_delay_threshold} seconds ago."
healthcheck_threshold_sec = 120
if time.time() - app.runtime_info["last_cleanup_time"] > healthcheck_threshold_sec:
msg = f"Last cleanup was more than {healthcheck_threshold_sec} seconds ago."
logger.error(msg)
raise HTTPException(status_code=500, detail=msg)
if time.time() - app.runtime_info["last_commit_time"] > healthcheck_threshold_sec:
msg = f"Last commit was more than {healthcheck_threshold_sec} seconds ago."
logger.error(msg)
raise HTTPException(status_code=500, detail=msg)

Expand All @@ -60,7 +65,9 @@ def healthcheck(app: WATcloudFastAPI):
"num_successful_confirms": 0,
"num_failed_confirms": 0,
"num_expired_signups": 0,
"num_successful_commits": 0,
"last_cleanup_time": time.time(),
"last_commit_time": time.time(),
},
health_fns=[healthcheck],
)
Expand All @@ -71,7 +78,7 @@ class SignUpRequest(BaseModel):
email: str


CODE_TTL_SEC = 15 * 60
CODE_TTL_SEC = 60 * 60 * 24


@app.post("/sign-up")
Expand All @@ -84,14 +91,16 @@ def sign_up(req: SignUpRequest, request: Request):
raise HTTPException(status_code=400, detail="Invalid mailing list")

# Generate a random code
code = random_str(10)
code = random_str(32)

table_client.upsert_entity(
entity={
"PartitionKey": req.mailing_list,
"RowKey": req.email,
"PartitionKey": make_azure_table_key([req.mailing_list]),
"RowKey": make_azure_table_key([req.email, code]),
"CreatedAt": time.time(),
"Code": code,
"ConfirmedAt": 0,
"MailingList": req.mailing_list,
"Email": req.email,
}
)

Expand All @@ -112,7 +121,7 @@ def sign_up(req: SignUpRequest, request: Request):
<body>
<h1>Confirm Your Subscription</h1>
<p>Thanks for signing up for updates from "{req.mailing_list}"!</p>
<p>Please confirm your subscription by clicking the button below. This confirmation email will expire in {CODE_TTL_SEC // 60} minutes.</p>
<p>Please confirm your subscription by clicking the button below. This confirmation email will expire in {int(CODE_TTL_SEC / 60 / 60)} hours.</p>
<a class="confirmation-button" href="{confirmation_url}">Confirm Email</a>
<p>If the button above does not work, please copy and paste the following URL into your browser:</p>
<pre class="monospace-text">{confirmation_url}</pre>
Expand Down Expand Up @@ -178,43 +187,41 @@ def sign_up(req: SignUpRequest, request: Request):

@app.get("/confirm/{mailing_list}/{email}/{code}")
def confirm(mailing_list: str, email: str, code: str):
from azure.core.exceptions import ResourceNotFoundError

"""
Confirm the subscription and schedule the addition to the mailing list.
We schedule the addition instead of adding it immediately to minimize the room
for error in this handler (e.g., network issues when adding to the mailing list).
"""
try:
entity = table_client.get_entity(partition_key=mailing_list, row_key=email)
# update_entity merges the new entity with the existing entity, and throws
# ResourceNotFoundError if the entity does not exist.
table_client.update_entity(
entity={
"PartitionKey": make_azure_table_key([mailing_list]),
"RowKey": make_azure_table_key([email, code]),
"ConfirmedAt": time.time(),
}
)
except ResourceNotFoundError:
app.runtime_info["num_failed_confirms"] += 1
raise HTTPException(status_code=400, detail="Code expired or invalid")

if entity["Code"] != code or time.time() - entity["CreatedAt"] > CODE_TTL_SEC:
app.runtime_info["num_failed_confirms"] += 1
raise HTTPException(status_code=400, detail="Code expired or invalid")

if not directory_service.is_whitelisted_group(mailing_list):
raise HTTPException(
status_code=500, detail="Invalid mailing list found in the database"
)

directory_service.insert_member(mailing_list, email)

# delete the entity
table_client.delete_entity(partition_key=mailing_list, row_key=email)
raise HTTPException(status_code=400, detail="Link expired or invalid. Please sign up again.")

app.runtime_info["num_successful_confirms"] += 1

return {
"status": "ok",
"message": f"Subscription confirmed! '{email}' has been added to the '{mailing_list}' mailing list.",
"message": f"Subscription confirmed! Details: {mailing_list=}, {email=}",
}


@app.post("/cleanup")
def cleanup():
@app.post("/clean-up")
def clean_up():
"""
Clean up expired signups.
"""
# find unconfirmed signups that are older than CODE_TTL_SEC
expired_entities = table_client.query_entities(
query_filter=f"CreatedAt lt @ExpiryTime",
query_filter=f"ConfirmedAt eq 0 and CreatedAt lt @ExpiryTime",
select=["PartitionKey", "RowKey"],
parameters={"ExpiryTime": time.time() - CODE_TTL_SEC},
headers={"Accept": "application/json;odata=nometadata"},
Expand All @@ -228,6 +235,42 @@ def cleanup():

app.runtime_info["num_expired_signups"] += deleted_count
app.runtime_info["last_cleanup_time"] = time.time()
msg = f"cleanup: Deleted {deleted_count} expired signup(s)."
msg = f"clean_up: Deleted {deleted_count} expired signup(s)."
logger.info(msg)
return {"status": "ok", "message": msg}

@app.post("/commit")
def commit():
    """
    Push every confirmed signup into its mailing list, then remove it from the table.

    A signup counts as confirmed once its `ConfirmedAt` field is greater than 0.
    Adding to the mailing list is idempotent, so a run that fails part-way can
    simply be retried: rows that were not deleted are picked up again.

    Raises HTTPException (500) if a stored mailing list is not whitelisted.
    """
    pending = table_client.query_entities(
        query_filter="ConfirmedAt gt 0",
        select=["PartitionKey", "RowKey", "MailingList", "Email"],
        headers={"Accept": "application/json;odata=nometadata"},
    )

    num_committed = 0
    for row in pending:
        group, address = row["MailingList"], row["Email"]

        # Sanity check: never add members to a group that is not whitelisted.
        if not directory_service.is_whitelisted_group(group):
            raise HTTPException(
                status_code=500, detail="Invalid mailing list found in the database"
            )

        # Insert first and delete second, so a failure in between leaves the
        # row in place for the next run.
        directory_service.insert_member(group, address)
        table_client.delete_entity(
            partition_key=row["PartitionKey"], row_key=row["RowKey"]
        )
        num_committed += 1

    # Only reached when every row was processed without raising.
    app.runtime_info["num_successful_commits"] += num_committed
    app.runtime_info["last_commit_time"] = time.time()

    summary = f"commit: Committed {num_committed} confirmed signup(s) to the mailing list."
    logger.info(summary)
    return {"status": "ok", "message": summary}
33 changes: 29 additions & 4 deletions src/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import os
import random
import string

from slugify import slugify
from watcloud_utils.logging import logger
from watcloud_utils.typer import app

Expand Down Expand Up @@ -42,14 +45,36 @@ def delete_azure_table(table_name: str):


@app.command()
def random_str(length: int = 32, chars: str = string.ascii_lowercase):
    """
    Generate a random string of the given length.
    The default dictionary of characters to choose from is the lowercase alphabet.

    Characters are drawn from the operating system's cryptographically secure
    source (via `secrets`) rather than the `random` module, because callers use
    the result as an unguessable email confirmation code.

    Raises IndexError if `chars` is empty and `length` is greater than 0.
    """
    # Local import keeps this block self-contained.
    import secrets

    return "".join(secrets.choice(chars) for _ in range(length))

@app.command()
def make_azure_table_key(strs: list[str]):
    r"""
    Generate an Azure Table key from the given strings.

    Each string is slugified, truncated to an equal share of the 1024-character
    budget, and the pieces are concatenated in order without a separator.

    The generated key conforms to the following requirements:
    - (azure) up to 1024 characters
    - (azure) does not contain the characters '/', '\', '#', '?', or control characters
    - (custom) the beginning of each str is guaranteed to be included in the key
    - (custom) the generated key is deterministic for the given input
    Requirements derived from:
    - https://learn.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model

    NOTE(review): because the pieces are joined without a separator (and
    slugification presumably discards some characters), different inputs can
    map to the same key -- confirm this is acceptable for every caller.

    Raises ValueError if `strs` is empty.
    """
    if not strs:
        # Without this guard the budget computation below would fail with an
        # unhelpful ZeroDivisionError.
        raise ValueError("make_azure_table_key requires at least one string")

    # Just a naive implementation for now: split the budget evenly.
    max_len_per_str = 1024 // len(strs)

    return "".join(slugify(s)[:max_len_per_str] for s in strs)


return "".join(random.choices(string.ascii_letters, k=length))


if __name__ == "__main__":
Expand Down

0 comments on commit 128e6b2

Please sign in to comment.