diff --git a/coriolis/api/middleware/auth.py b/coriolis/api/middleware/auth.py index 3219f412..52d7dab5 100644 --- a/coriolis/api/middleware/auth.py +++ b/coriolis/api/middleware/auth.py @@ -14,22 +14,36 @@ class CoriolisKeystoneContext(wsgi.Middleware): - @webob.dec.wsgify(RequestClass=wsgi.Request) - def __call__(self, req): + def _get_project_id(self, req): + if 'X_TENANT_ID' in req.headers: + # This is the new header since Keystone went to ID/Name + return req.headers['X_TENANT_ID'] + elif 'X_TENANT' in req.headers: + # This is for legacy compatibility + return req.headers['X_TENANT'] + else: + raise webob.exc.HTTPBadRequest( + explanation=_("No 'X_TENANT_ID' or 'X_TENANT' passed.")) + + def _get_user(self, req): user = req.headers.get('X_USER') user = req.headers.get('X_USER_ID', user) if user is None: LOG.debug("Neither X_USER_ID nor X_USER found in request") return webob.exc.HTTPUnauthorized() + return user + + @webob.dec.wsgify(RequestClass=wsgi.Request) + def __call__(self, req): + user = self._get_user(req) + + if isinstance(user, webob.exc.HTTPUnauthorized): + return user + + project_id = self._get_project_id(req) # get the roles roles = [r.strip() for r in req.headers.get('X_ROLE', '').split(',')] - if 'X_TENANT_ID' in req.headers: - # This is the new header since Keystone went to ID/Name - project_id = req.headers['X_TENANT_ID'] - else: - # This is for legacy compatibility - project_id = req.headers['X_TENANT'] project_name = req.headers.get('X_TENANT_NAME') project_domain_name = req.headers.get('X-Project-Domain-Name') diff --git a/coriolis/tests/api/__init__.py b/coriolis/tests/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/coriolis/tests/api/middleware/__init__.py b/coriolis/tests/api/middleware/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/coriolis/tests/api/middleware/test_auth.py b/coriolis/tests/api/middleware/test_auth.py new file mode 100644 index 00000000..3e7f3af7 --- /dev/null +++ b/coriolis/tests/api/middleware/test_auth.py @@ -0,0 +1,229 @@ +# Copyright 2022 Cloudbase Solutions Srl +# All Rights Reserved. + +from unittest import mock + +from oslo_middleware import request_id +import webob + +from coriolis.api.middleware import auth +from coriolis.api.middleware.auth import CoriolisKeystoneContext +from coriolis.api import wsgi +from coriolis import context +from coriolis.tests import test_base + + +class CoriolisTestException(Exception): + pass + + +class CoriolisKeystoneContextTestCase(test_base.CoriolisBaseTestCase): + """Test suite for the Coriolis api middleware auth.""" + + def setUp(self): + super(CoriolisKeystoneContextTestCase, self).setUp() + self.auth = auth.CoriolisKeystoneContext(wsgi.Middleware) + + @mock.patch.object(context, "RequestContext") + @mock.patch.object(CoriolisKeystoneContext, "_get_project_id") + @mock.patch.object(CoriolisKeystoneContext, "_get_user") + def test__call__( + self, + mock_get_user, + mock__get_project_id, + mock_request_context + ): + req_mock = mock.Mock() + + expected_roles = ['1', '2', '3'] + expected_project_name = 'mock_project_name' + expected_project_domain_name = 'mock_project_domain_name' + expected_user_domain_name = 'mock_user_domain_name' + expected_auth_token = 'mock_token123' + expected_remote_address = 'mock_addr' + expected_service_catalog = {"catalog1": ["service1", "service2"]} + expected_req_id = 'mock_req_id' + + req_mock.remote_addr = expected_remote_address + req_mock.environ = { + request_id.ENV_REQUEST_ID: expected_req_id.encode() + } + + req_mock.headers = { + 'X_ROLE': '1,2,3', + 'X_TENANT_NAME': expected_project_name, + 'X-Project-Domain-Name': expected_project_domain_name, + 'X-User-Domain-Name': expected_user_domain_name, + 'X_AUTH_TOKEN': expected_auth_token, + 'X_SERVICE_CATALOG': + str(expected_service_catalog).replace("'", '"'), + } + + result = self.auth(req_mock) + + self.assertEqual( + self.auth.application, + result + ) + + mock_get_user.assert_called_once_with(req_mock) + mock__get_project_id.assert_called_once_with(req_mock) + mock_request_context.assert_called_once_with( + mock_get_user.return_value, + mock__get_project_id.return_value, + project_name=expected_project_name, + project_domain_name=expected_project_domain_name, + user_domain_name=expected_user_domain_name, + roles=expected_roles, + auth_token=expected_auth_token, + remote_address=expected_remote_address, + service_catalog=expected_service_catalog, + request_id=expected_req_id + ) + + self.assertEqual( + req_mock.environ['coriolis.context'], + mock_request_context.return_value + ) + + @mock.patch.object(CoriolisKeystoneContext, "_get_project_id") + @mock.patch.object(CoriolisKeystoneContext, "_get_user") + def test__call__invalid_service_catalog( + self, + mock_get_user, + mock__get_project_id, + ): + req_mock = mock.Mock() + + invalid_service_catalog = "mock_invalid_service_catalog" + + req_mock.headers = { + 'X_SERVICE_CATALOG': invalid_service_catalog, + } + + self.assertRaises( + webob.exc.HTTPInternalServerError, + self.auth, + req_mock, + ) + + @mock.patch.object(CoriolisKeystoneContext, "_get_user") + def test__call__no_headers( + self, + mock_get_user, + ): + req_mock = mock.Mock() + mock_get_user.side_effect = webob.exc.HTTPUnauthorized() + + req_mock.headers = {} + + self.assertRaises( + webob.exc.HTTPUnauthorized, + self.auth, + req_mock + ) + + def test_get_project_id_tenant_id(self): + req_mock = mock.Mock() + + expected_result = 'mock_tenant' + + req_mock.headers = { + 'X_TENANT_ID': expected_result, + } + + result = self.auth._get_project_id(req_mock) + + self.assertEqual( + expected_result, + result + ) + + def test_get_project_id_tenant(self): + req_mock = mock.Mock() + + expected_result = 'mock_tenant' + + req_mock.headers = { + 'X_TENANT': expected_result, + } + + result = self.auth._get_project_id(req_mock) + + self.assertEqual( + expected_result, + result + ) + + def test_get_project_id_no_tenant(self): + req_mock = mock.Mock() + + req_mock.headers = {} + + self.assertRaises( + webob.exc.HTTPBadRequest, + self.auth._get_project_id, + req_mock + ) + + def test_get_user(self): + req_mock = mock.Mock() + + mock_user = 'mock_user' + + expected_result = 'mock_user_id' + + req_mock.headers = { + 'X_USER': mock_user, + 'X_USER_ID': expected_result, + } + + result = self.auth._get_user(req_mock) + + self.assertEqual( + expected_result, + result + ) + + def test_get_user_has_user(self): + req_mock = mock.Mock() + + expected_result = 'mock_user' + + req_mock.headers = { + 'X_USER': expected_result, + } + + result = self.auth._get_user(req_mock) + + self.assertEqual( + expected_result, + result + ) + + def test_get_user_has_user_id(self): + req_mock = mock.Mock() + + expected_result = 'mock_user_id' + + req_mock.headers = { + 'X_USER_ID': expected_result, + } + + result = self.auth._get_user(req_mock) + + self.assertEqual( + expected_result, + result + ) + + def test_get_user_none(self): + req_mock = mock.Mock() + + req_mock.headers = {} + + self.assertRaises( + webob.exc.HTTPUnauthorized, + self.auth._get_user, + req_mock + ) diff --git a/coriolis/tests/api/v1/__init__py b/coriolis/tests/api/v1/__init__py new file mode 100644 index 00000000..e69de29b diff --git a/coriolis/tests/api/v1/views/__init__py b/coriolis/tests/api/v1/views/__init__py new file mode 100644 index 00000000..e69de29b