From 15d03e2ca29bcc4cef3a8f0521fb1adc27fc1439 Mon Sep 17 00:00:00 2001 From: Andris Raugulis Date: Mon, 25 May 2020 18:04:52 +0000 Subject: [PATCH] Fix usage of client certificates, as noted in #21. --- gp-okta.py | 154 +++++++++++++++++++++++++++++------------------------ 1 file changed, 85 insertions(+), 69 deletions(-) diff --git a/gp-okta.py b/gp-okta.py index 9345843..778a827 100755 --- a/gp-okta.py +++ b/gp-okta.py @@ -197,6 +197,7 @@ def __init__(self): self._store = {} # type: Dict[str, str] self._lines = {} # type: Dict[str, int] self.debug = False + self._session = None # type: Optional[requests.Session] self.vpn_url = '' # for reassignment self.certs = '' # for filename @@ -206,6 +207,21 @@ def __getattr__(self, name): return self._store[name] return '' + def get_session(self, name): + # type: (str) -> requests.Session + if self._session is None: + raise Exception('session not defined') + name = name.lower() + if name not in ['okta', 'portal', 'gateway']: + raise Exception('unknonw session: {0}'.format(name)) + s = self._session + if name == 'okta': + if self.okta_cli_cert: + s.cert = self.okta_cli_cert + if self.vpn_cli_cert: + s.cert = self.vpn_cli_cert + return s + def get_value(self, name): # type: (str) -> str return to_n(getattr(self, name)) @@ -294,7 +310,10 @@ def from_data(cls, content): if k == 'vpn_url': setattr(conf, k, conf._store[k].strip()) conf.debug = conf._store.get('debug', '').lower() in ['1', 'true'] - return conf + s = requests.Session() + s.headers['User-Agent'] = 'PAN GlobalProtect' + conf._session = s + return conf def mfa_priority(conf, ftype, fprovider): # type: (Conf, str, str) -> int @@ -369,9 +388,10 @@ def _send_req_post(conf, r, name, can_fail=False): err('{0}.request failed.\n{1}'.format(name, rr)) dbg(conf.debug, '{0}.response'.format(name), rr) -def send_req(conf, s, name, url, data, get=False, expected_url=None, verify=True, can_fail=False): - # type: (Conf, requests.Session, str, str, Dict[str, Any], bool, Optional[str], Union[bool, str], bool) -> Tuple[int, requests.structures.CaseInsensitiveDict[str], str] +def send_req(conf, dest, name, url, data, get=False, expected_url=None, verify=True, can_fail=False): + # type: (Conf, str, str, str, Dict[str, Any], bool, Optional[str], Union[bool, str], bool) -> Tuple[int, requests.structures.CaseInsensitiveDict[str], str] _send_req_pre(conf, name, url, data, expected_url) + s = conf.get_session(dest) if get: r = s.get(url, verify=verify) else: @@ -379,10 +399,11 @@ def send_req(conf, s, name, url, data, get=False, expected_url=None, verify=True _send_req_post(conf, r, name, can_fail) return r.status_code, r.headers, r.text -def send_json_req(conf, s, name, url, data, get=False, expected_url=None, verify=True, can_fail=False): - # type: (Conf, requests.Session, str, str, Dict[str, Any], bool, Optional[str], Union[bool, str], bool) -> Tuple[int, requests.structures.CaseInsensitiveDict[str], Dict[str, Any]] +def send_json_req(conf, dest, name, url, data, get=False, expected_url=None, verify=True, can_fail=False): + # type: (Conf, str, str, str, Dict[str, Any], bool, Optional[str], Union[bool, str], bool) -> Tuple[int, requests.structures.CaseInsensitiveDict[str], Dict[str, Any]] _send_req_pre(conf, name, url, data, expected_url) headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} + s = conf.get_session(dest) if get: r = s.get(url, headers=headers, verify=verify) else: @@ -390,12 +411,14 @@ def send_json_req(conf, s, name, url, data, get=False, expected_url=None, verify _send_req_post(conf, r, name, can_fail) return r.status_code, r.headers, parse_rjson(r) -def paloalto_prelogin(conf, s, gateway_url=None): - # type: (Conf, requests.Session, Optional[str]) -> etree._Element +def paloalto_prelogin(conf, gateway_url=None): + # type: (Conf, Optional[str]) -> etree._Element verify = True # type: Union[str, bool] + dest = 'portal' if gateway_url: # 2nd round or direct gateway: use gateway log('prelogin request [gateway_url]') + dest = 'gateway' url = '{0}/ssl-vpn/prelogin.esp'.format(gateway_url) if conf.certs: verify = conf.certs @@ -406,7 +429,7 @@ def paloalto_prelogin(conf, s, gateway_url=None): log('prelogin request [vpn_url]') url = '{0}/global-protect/prelogin.esp'.format(conf.vpn_url) verify = conf.get_cert('vpn_url', True) - _, _h, c = send_req(conf, s, 'prelogin', url, {}, get=True, verify=verify) + _, _h, c = send_req(conf, dest, 'prelogin', url, {}, get=True, verify=verify) x = parse_xml(c) saml_req = x.find('.//saml-request') if saml_req is None: @@ -428,20 +451,20 @@ def paloalto_prelogin(conf, s, gateway_url=None): saml_xml = parse_html(saml_raw) return saml_xml -def okta_saml(conf, s, saml_xml): - # type: (Conf, requests.Session, str) -> str +def okta_saml(conf, saml_xml): + # type: (Conf, str) -> str log('okta saml request [okta_url]') url, data = parse_form(saml_xml) dbg_form(conf, 'okta.saml request', data) - _, _h, c = send_req(conf, s, 'saml', url, data, + _, _h, c = send_req(conf, 'okta', 'saml', url, data, expected_url=conf.okta_url, verify=conf.get_cert('okta_url', True)) redirect_url = get_redirect_url(conf, c, url) if redirect_url is None: err('did not find redirect url') return redirect_url -def okta_auth(conf, s, stateToken = None): - # type: (Conf, requests.Session, Optional[str]) -> Any +def okta_auth(conf, stateToken = None): + # type: (Conf, Optional[str]) -> Any log('okta auth request [okta_url]') url = '{0}/api/v1/authn'.format(conf.okta_url) data = { @@ -454,16 +477,15 @@ def okta_auth(conf, s, stateToken = None): } if stateToken is None else { 'stateToken': stateToken } - _, _h, j = send_json_req(conf, s, 'auth', url, data, verify=conf.get_cert('okta_url', True)) - + _, _h, j = send_json_req(conf, 'okta', 'auth', url, data, verify=conf.get_cert('okta_url', True)) while True: - ok, r = okta_transaction_state(conf, s, j) + ok, r = okta_transaction_state(conf, j) if ok: return r j = r -def okta_transaction_state(conf, s, j): - # type: (Conf, requests.Session, Dict[str, Any]) -> Tuple[bool, Dict[str, Any]] +def okta_transaction_state(conf, j): + # type: (Conf, Dict[str, Any]) -> Tuple[bool, Dict[str, Any]] # https://developer.okta.com/docs/api/resources/authn#transaction-state status = j.get('status', '').strip().lower() dbg(conf.debug, 'status', status) @@ -478,7 +500,7 @@ def okta_transaction_state(conf, s, j): if len(state_token) == 0: err('empty state token') data = {'stateToken': state_token} - _, _h, j = send_json_req(conf, s, 'skip', url, data, + _, _h, j = send_json_req(conf, 'okta', 'skip', url, data, expected_url=conf.okta_url, verify=conf.get_cert('okta_url', True)) return False, j # status: password_expired @@ -490,7 +512,7 @@ def okta_transaction_state(conf, s, j): # status: mfa_enroll_activate # status: mfa_required if status == 'mfa_required': - j = okta_mfa(conf, s, j) + j = okta_mfa(conf, j) return False, j # status: mfa_challenge # status: success @@ -502,8 +524,8 @@ def okta_transaction_state(conf, s, j): print(j) err('unknown status: {0}'.format(status)) -def okta_mfa(conf, s, j): - # type: (Conf, requests.Session, Dict[str, Any]) -> Dict[str, Any] +def okta_mfa(conf, j): + # type: (Conf, Dict[str, Any]) -> Dict[str, Any] state_token = j.get('stateToken', '').strip() if len(state_token) == 0: err('empty state token') @@ -532,19 +554,19 @@ def okta_mfa(conf, s, j): fprovider = f.get('provider') r = None # type: Optional[Dict[str, Any]] if ftype == 'token:software:totp' or (ftype, fprovider) == ('token', 'symantec'): - r = okta_mfa_totp(conf, s, f, state_token) + r = okta_mfa_totp(conf, f, state_token) elif ftype == 'sms': - r = okta_mfa_sms(conf, s, f, state_token) + r = okta_mfa_sms(conf, f, state_token) elif ftype == 'push': - r = okta_mfa_push(conf, s, f, state_token) + r = okta_mfa_push(conf, f, state_token) elif ftype == 'webauthn': - r = okta_mfa_webauthn(conf, s, f, state_token) + r = okta_mfa_webauthn(conf, f, state_token) if r is not None: return r err('no factors processed') -def okta_mfa_totp(conf, s, factor, state_token): - # type: (Conf, requests.Session, Dict[str, str], str) -> Optional[Dict[str, Any]] +def okta_mfa_totp(conf, factor, state_token): + # type: (Conf, Dict[str, str], str) -> Optional[Dict[str, Any]] provider = factor.get('provider', '') secret = conf.get_value('totp.{0}'.format(provider)) code = None @@ -564,31 +586,31 @@ def okta_mfa_totp(conf, s, factor, state_token): 'passCode': code } log('mfa {0} totp request: {1} [okta_url]'.format(provider, code)) - _, _h, j = send_json_req(conf, s, 'totp mfa', factor.get('url', ''), data, + _, _h, j = send_json_req(conf, 'okta', 'totp mfa', factor.get('url', ''), data, expected_url=conf.okta_url, verify=conf.get_cert('okta_url', True)) return j -def okta_mfa_sms(conf, s, factor, state_token): - # type: (Conf, requests.Session, Dict[str, str], str) -> Optional[Dict[str, Any]] +def okta_mfa_sms(conf, factor, state_token): + # type: (Conf, Dict[str, str], str) -> Optional[Dict[str, Any]] provider = factor.get('provider', '') data = { 'factorId': factor.get('id'), 'stateToken': state_token } log('mfa {0} sms request [okta_url]'.format(provider)) - _, _h, j = send_json_req(conf, s, 'sms mfa (1)', factor.get('url', ''), data, + _, _h, j = send_json_req(conf, 'okta', 'sms mfa (1)', factor.get('url', ''), data, expected_url=conf.okta_url, verify=conf.get_cert('okta_url', True)) code = input('{0} SMS verification code: '.format(provider)).strip() if len(code) == 0: return None data['passCode'] = code log('mfa {0} sms request [okta_url]'.format(provider)) - _, _h, j = send_json_req(conf, s, 'sms mfa (2)', factor.get('url', ''), data, + _, _h, j = send_json_req(conf, 'okta', 'sms mfa (2)', factor.get('url', ''), data, expected_url=conf.okta_url, verify=conf.get_cert('okta_url', True)) return j -def okta_mfa_push(conf, s, factor, state_token): - # type: (Conf, requests.Session, Dict[str, str], str) -> Optional[Dict[str, Any]] +def okta_mfa_push(conf, factor, state_token): + # type: (Conf, Dict[str, str], str) -> Optional[Dict[str, Any]] provider = factor.get('provider', '') data = { 'factorId': factor.get('id'), @@ -598,7 +620,7 @@ def okta_mfa_push(conf, s, factor, state_token): status = 'MFA_CHALLENGE' counter = 0 while status == 'MFA_CHALLENGE': - _, _h, j = send_json_req(conf, s, 'push mfa ({0})'.format(counter), + _, _h, j = send_json_req(conf, 'okta', 'push mfa ({0})'.format(counter), factor.get('url', ''), data, expected_url=conf.okta_url, verify=conf.get_cert('okta_url', True)) status = j.get('status', '').strip() @@ -608,8 +630,8 @@ def okta_mfa_push(conf, s, factor, state_token): counter += 1 return j -def okta_mfa_webauthn(conf, s, factor, state_token): - # type: (Conf, requests.Session, Dict[str, str], str) -> Optional[Dict[str, Any]] +def okta_mfa_webauthn(conf, factor, state_token): + # type: (Conf, Dict[str, str], str) -> Optional[Dict[str, Any]] if not have_fido: err('Need fido2 package(s) for webauthn. Consider doing `pip install fido2` (or similar)') devices = list(CtapHidDevice.list_devices()) @@ -620,7 +642,7 @@ def okta_mfa_webauthn(conf, s, factor, state_token): data = { 'stateToken': state_token } - _, _h, j = send_json_req(conf, s, 'webauthn mfa challenge', factor.get('url', ''), data, + _, _h, j = send_json_req(conf, 'okta', 'webauthn mfa challenge', factor.get('url', ''), data, expected_url=conf.okta_url, verify=conf.get_cert('okta_url', True)) rfactor = j['_embedded']['factor'] profile = rfactor['profile'] @@ -649,12 +671,12 @@ def okta_mfa_webauthn(conf, s, factor, state_token): 'authenticatorData': to_n((base64.b64encode(assertion.auth_data)).decode('ascii')) } log('mfa {0} signature request [okta_url]'.format(provider)) - _, _h, j = send_json_req(conf, s, 'uf2 mfa signature', j['_links']['next']['href'], data, + _, _h, j = send_json_req(conf, 'okta', 'uf2 mfa signature', j['_links']['next']['href'], data, expected_url=conf.okta_url, verify=conf.get_cert('okta_url', True)) return j -def okta_redirect(conf, s, session_token, redirect_url): - # type: (Conf, requests.Session, str, str) -> Tuple[str, str] +def okta_redirect(conf, session_token, redirect_url): + # type: (Conf, str, str) -> Tuple[str, str] rc = 0 form_url = None # type: Optional[str] form_data = {} # type: Dict[str, str] @@ -672,7 +694,7 @@ def okta_redirect(conf, s, session_token, redirect_url): } url = '{0}/login/sessionCookieRedirect'.format(conf.okta_url) log('okta redirect request {0} [okta_url]'.format(rc)) - _, h, c = send_req(conf, s, 'redirect', url, data, + _, h, c = send_req(conf, 'okta', 'redirect', url, data, verify=conf.get_cert('okta_url', True)) state_token = get_state_token(conf, c) rurl = get_redirect_url(conf, c, url) @@ -684,7 +706,7 @@ def okta_redirect(conf, s, session_token, redirect_url): dbg_form(conf, 'okta.redirect request {0}'.format(rc), data) if state_token is not None: log('stateToken: {0}'.format(state_token)) - okta_auth(conf, s, state_token) + okta_auth(conf, state_token) elif form_url: log('okta redirect form request [vpn_url]') purl, pexp = parse_url(form_url), parse_url(conf.vpn_url) @@ -694,9 +716,9 @@ def okta_redirect(conf, s, session_token, redirect_url): verify = True # type: Union[str, bool] if conf.certs: verify = conf.certs - _, h, c = send_req(conf, s, 'redirect form', form_url, form_data, verify=verify) + _, h, c = send_req(conf, 'gateway', 'redirect form', form_url, form_data, verify=verify) else: - _, h, c = send_req(conf, s, 'redirect form', form_url, form_data, + _, h, c = send_req(conf, 'portal', 'redirect form', form_url, form_data, expected_url=conf.vpn_url, verify=conf.get_cert('vpn_url', True)) saml_username = h.get('saml-username', '').strip() prelogin_cookie = h.get('prelogin-cookie', '').strip() @@ -706,8 +728,8 @@ def okta_redirect(conf, s, session_token, redirect_url): dbg(conf.debug, 'saml prop', [saml_auth_status, saml_slo]) return saml_username, prelogin_cookie -def paloalto_getconfig(conf, s, username = None, prelogin_cookie = None, can_fail = False): - # type: (Conf, requests.Session, Optional[str], Optional[str], bool) -> Tuple[int, str, Dict[str, str]] +def paloalto_getconfig(conf, username = None, prelogin_cookie = None, can_fail = False): + # type: (Conf, Optional[str], Optional[str], bool) -> Tuple[int, str, Dict[str, str]] log('getconfig request [vpn_url]') url = '{0}/global-protect/getconfig.esp'.format(conf.vpn_url) data = { @@ -729,7 +751,7 @@ def paloalto_getconfig(conf, s, username = None, prelogin_cookie = None, can_fai 'prelogin-cookie': prelogin_cookie or '', 'ipv6-support': 'yes' } - sc, _h, c = send_req(conf, s, 'getconfig', url, data, + sc, _h, c = send_req(conf, 'portal', 'getconfig', url, data, verify=conf.get_cert('vpn_url', True), can_fail=can_fail) if sc != 200: return sc, '', {} @@ -755,12 +777,12 @@ def paloalto_getconfig(conf, s, username = None, prelogin_cookie = None, can_fai return 200, portal_userauthcookie, gateways # Combined first half of okta_saml with second half of okta_redirect -def okta_saml_2(conf, s, gateway_url, saml_xml): - # type: (Conf, requests.Session, str, str) -> Tuple[str, str] +def okta_saml_2(conf, gateway_url, saml_xml): + # type: (Conf, str, str) -> Tuple[str, str] log('okta saml request (2) [okta_url]') url, data = parse_form(saml_xml) dbg_form(conf, 'okta.saml request(2)', data) - _, h, c = send_req(conf, s, 'okta saml request (2)', url, data, + _, h, c = send_req(conf, 'okta', 'okta saml request (2)', url, data, expected_url=conf.okta_url, verify=conf.get_cert('okta_url', True)) xhtml = parse_html(c) url, data = parse_form(xhtml) @@ -769,7 +791,7 @@ def okta_saml_2(conf, s, gateway_url, saml_xml): verify = True # type: Union[str, bool] if conf.certs: verify = conf.certs - _, h, c = send_req(conf, s, 'okta redirect form (2)', url, data, + _, h, c = send_req(conf, 'gateway', 'okta redirect form (2)', url, data, expected_url=gateway_url, verify=verify) saml_username = h.get('saml-username', '').strip() if len(saml_username) == 0: @@ -848,15 +870,9 @@ def main(): conf = Conf.from_data(config_contents) - s = requests.Session() - s.headers['User-Agent'] = 'PAN GlobalProtect' - - if conf.okta_cli_cert: - s.cert = conf.okta_cli_cert - if args.list_gateways: log('listing gateways') - sc, _, gateways = paloalto_getconfig(conf, s, can_fail=True) + sc, _, gateways = paloalto_getconfig(conf, can_fail=True) if sc == 200: output_gateways(gateways) return 0 @@ -875,24 +891,24 @@ def main(): userauthcookie = None if another_dance or not gateway_url: - saml_xml = paloalto_prelogin(conf, s) + saml_xml = paloalto_prelogin(conf) else: - saml_xml = paloalto_prelogin(conf, s, gateway_url) + saml_xml = paloalto_prelogin(conf, gateway_url) - redirect_url = okta_saml(conf, s, saml_xml) - token = okta_auth(conf, s) + redirect_url = okta_saml(conf, saml_xml) + token = okta_auth(conf) log('sessionToken: {0}'.format(token)) - saml_username, prelogin_cookie = okta_redirect(conf, s, token, redirect_url) + saml_username, prelogin_cookie = okta_redirect(conf, token, redirect_url) if args.list_gateways: log('listing gateways') - sc, _, gateways = paloalto_getconfig(conf, s, saml_username, prelogin_cookie) + sc, _, gateways = paloalto_getconfig(conf, saml_username, prelogin_cookie) if sc == 200: output_gateways(gateways) return 0 err('could not list gateways') if another_dance or not gateway_url: - _, userauthcookie, gateways = paloalto_getconfig(conf, s, saml_username, prelogin_cookie) + _, userauthcookie, gateways = paloalto_getconfig(conf, saml_username, prelogin_cookie) gateway_url = choose_gateway_url(conf, gateways) log('portal-userauthcookie: {0}'.format(userauthcookie)) @@ -902,8 +918,8 @@ def main(): if another_dance: # 1st step: dance with the portal, 2nd step: dance with the gateway - saml_xml = paloalto_prelogin(conf, s, gateway_url) - saml_username, prelogin_cookie = okta_saml_2(conf, s, gateway_url, saml_xml) + saml_xml = paloalto_prelogin(conf, gateway_url) + saml_username, prelogin_cookie = okta_saml_2(conf, gateway_url, saml_xml) log('saml-username (2): {0}'.format(saml_username)) log('prelogin-cookie (2): {0}'.format(prelogin_cookie))