Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better token expiration handling #584

Merged
merged 6 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions lib/TWCManager/Control/HTTPControl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,11 +1265,8 @@ def process_save_settings(self, page="settings"):
carapi = master.getModuleByName("TeslaAPI")
if key == "carApiBearerToken":
carapi.setCarApiBearerToken(self.getFieldValue(key))
# New tokens expire after 8 hours
carapi.setCarApiTokenExpireTime(time.time() + 8 * 60 * 60)
elif key == "carApiRefreshToken":
carapi.setCarApiRefreshToken(self.getFieldValue(key))
carapi.setCarApiTokenExpireTime(time.time() + 45 * 24 * 60 * 60)

else:
# Write setting to dictionary
Expand Down
91 changes: 52 additions & 39 deletions lib/TWCManager/Vehicle/TeslaAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,45 +131,46 @@ def apiRefresh(self):
try:
req = requests.post(self.refreshURL, headers=headers, json=data)
logger.log(logging.INFO2, "Car API request" + str(req))
req.raise_for_status()
apiResponseDict = json.loads(req.text)
except requests.exceptions.RequestException:
logger.log(
logging.INFO2, "Request Exception parsing API Token Refresh Response"
)
pass
except ValueError:
pass
if req.status_code == 401:
logger.log(
logging.INFO2,
"TeslaAPI",
"ERROR: Can't access Tesla car via API. Please supply fresh tokens.",
)
self.setCarApiBearerToken("")
self.setCarApiRefreshToken("")
self.updateCarApiLastErrorTime()
# Instead of just setting carApiLastErrorTime, erase tokens to
# prevent further authorization attempts until user enters password
# on web interface. I feel this is safer than trying to log in every
# ten minutes with a bad token because Tesla might decide to block
# remote access to your car after too many authorization errors.
self.master.queue_background_task({"cmd": "saveSettings"})
return False
except json.decoder.JSONDecodeError:
logger.log(
logging.INFO2, "JSON Decode Error parsing API Token Refresh Response"
)
pass
except ValueError:
pass

try:
logger.log(logging.INFO4, "Car API auth response" + str(apiResponseDict))
self.setCarApiBearerToken(apiResponseDict["access_token"])
self.setCarApiRefreshToken(apiResponseDict["refresh_token"])
self.setCarApiTokenExpireTime(now + apiResponseDict["expires_in"])
self.master.queue_background_task({"cmd": "saveSettings"})
return True

except KeyError:
logger.log(
logging.INFO2,
"TeslaAPI",
"ERROR: Can't access Tesla car via API. Please log in again via web interface.",
)
self.updateCarApiLastErrorTime()
# Instead of just setting carApiLastErrorTime, erase tokens to
# prevent further authorization attempts until user enters password
# on web interface. I feel this is safer than trying to log in every
# ten minutes with a bad token because Tesla might decide to block
# remote access to your car after too many authorization errors.
self.setCarApiBearerToken("")
self.setCarApiRefreshToken("")
self.master.queue_background_task({"cmd": "saveSettings"})
except UnboundLocalError:
except:
pass

return False

def car_api_available(
self, email=None, password=None, charge=None, applyLimit=None
):
Expand Down Expand Up @@ -1124,29 +1125,41 @@ def setCarApiBearerToken(self, token=None):
return False
else:
self.carApiBearerToken = token
if not self.baseURL:
try:
decoded = jwt.decode(
token,
options={
"verify_signature": False,
"verify_aud": False,
"verify_exp": False,
},
)
try:
decoded = jwt.decode(
token,
options={
"verify_signature": False,
"verify_aud": False,
"verify_exp": False,
},
)
if not self.baseURL:
if "owner-api" in "".join(decoded.get("aud", "")):
self.baseURL = self.regionURL["OwnerAPI"]
elif decoded.get("ou_code", "") in self.regionURL:
self.baseURL = self.regionURL[decoded["ou_code"]]
except jwt.exceptions.DecodeError:
# Fallback to owner-api if we get an exception decoding jwt token
self.baseURL = self.regionURL["OwnerAPI"]

if "exp" in decoded:
self.setCarApiTokenExpireTime(int(decoded["exp"]))
else:
self.setCarApiTokenExpireTime(time.time() + 8 * 60 * 60)

except jwt.exceptions.DecodeError:
# Fallback to owner-api if we get an exception decoding jwt token
self.baseURL = self.regionURL["OwnerAPI"]
self.setCarApiTokenExpireTime(time.time() + 8 * 60 * 60)
return True
else:
return False

def setCarApiRefreshToken(self, token):
self.carApiRefreshToken = token
if token and not self.master.tokenSyncEnabled() and (
self.getCarApiBearerToken() == ""
or self.getCarApiTokenExpireTime() - time.time() < 60 * 60
):
return self.apiRefresh()
return True

def setCarApiTokenExpireTime(self, value):
Expand Down Expand Up @@ -1242,8 +1255,8 @@ def wakeVehicle(self, vehicle):
except requests.exceptions.RequestException:
if req.status_code == 401 and "expired" in req.text:
# If the token is expired, refresh it and try again
self.apiRefresh()
return self.wakeVehicle(vehicle)
if self.apiRefresh():
return self.wakeVehicle(vehicle)
elif req.status_code == 429:
# We're explicitly being told to back off
self.errorCount = max(30, self.errorCount)
Expand Down Expand Up @@ -1413,8 +1426,8 @@ def get_car_api(self, url, checkReady=True, provesOnline=True):
except requests.exceptions.RequestException:
if req.status_code == 401 and "expired" in req.text:
# If the token is expired, refresh it and try again
self.apiRefresh()
continue
if self.apiRefresh():
continue
elif req.status_code == 429:
# We're explicitly being told to back off
self.errorCount = max(30, self.errorCount)
Expand Down
Loading