Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev1 #144

Merged
merged 3 commits into from
Aug 8, 2023
Merged

Dev1 #144

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions simulation-system/libs/csle-cli/src/csle_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from csle_common.util.cluster_util import ClusterUtil
from csle_common.util.general_util import GeneralUtil
from csle_cluster.cluster_manager.cluster_controller import ClusterController
from csle_cluster.cluster_manager.cluster_manager_pb2 import DockerContainerDTO, ContainerImageDTO
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
Expand Down Expand Up @@ -643,7 +643,7 @@ def stop_shell_complete(ctx, param, incomplete) -> List[str]:
emulations=MetastoreFacade.list_emulations())
emulations: List[str] = running_emulations
running_containers = ContainerController.list_all_running_containers()
containers: List[Tuple[str, str ,str]] = running_containers
containers: List[Tuple[str, str, str]] = running_containers
container_names: List[str] = list(map(lambda x: x[0], containers))
return ["prometheus", "node_exporter", "cadvisor", "pgadmin", "grafana", "flask",
"statsmanager", "all", "emulation_executions"] + emulations + container_names
Expand Down Expand Up @@ -1039,7 +1039,7 @@ def start_shell_complete(ctx, param, incomplete) -> List[str]:
emulations=MetastoreFacade.list_emulations())
emulations = stopped_emulations
stopped_containers = ContainerController.list_all_stopped_containers()
containers: List[Tuple[str, str ,str]] = stopped_containers
containers: List[Tuple[str, str, str]] = stopped_containers
container_names: List[str] = list(map(lambda x: x[0], containers))
images: List[Tuple[str, str, str, str, str]] = ContainerController.list_all_images()
image_names: List[str] = list(map(lambda x: x[0], images))
Expand Down Expand Up @@ -1313,7 +1313,7 @@ def rm_shell_complete(ctx, param, incomplete) -> List[str]:
emulations = list(map(lambda x: x.name, MetastoreFacade.list_emulations()))
running_containers = ContainerController.list_all_running_containers()
stopped_containers = ContainerController.list_all_stopped_containers()
containers: List[Tuple[str, str ,str]] = running_containers + stopped_containers
containers: List[Tuple[str, str, str]] = running_containers + stopped_containers
container_names: List[str] = list(map(lambda x: x[0], containers))
images: List[Tuple[str, str, str, str, str]] = ContainerController.list_all_images()
image_names: List[str] = list(map(lambda x: x[0], images))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
"""
from typing import Tuple
import json
from flask import Blueprint, Response, jsonify, request
import csle_common.constants.constants as constants
from csle_cluster.cluster_manager.cluster_controller import ClusterController
from csle_common.metastore.metastore_facade import MetastoreFacade
from flask import Blueprint, jsonify, request, Response
import csle_rest_api.constants.constants as api_constants
import csle_rest_api.util.rest_api_util as rest_api_util

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
"""
from typing import Tuple
import json
from flask import Blueprint, Response, jsonify, request
import csle_common.constants.constants as constants
from csle_cluster.cluster_manager.cluster_controller import ClusterController
from csle_common.metastore.metastore_facade import MetastoreFacade
from flask import Blueprint, jsonify, request, Response
import csle_rest_api.constants.constants as api_constants
import csle_rest_api.util.rest_api_util as rest_api_util

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import Union, Tuple
from flask import jsonify, Response
from typing import Tuple, Union
from flask import Response, jsonify
import csle_common.constants.constants as constants
from csle_common.dao.management.management_user import ManagementUser
from csle_common.metastore.metastore_facade import MetastoreFacade
import csle_rest_api.constants.constants as api_constants
from csle_common.dao.management.management_user import ManagementUser


def check_if_user_is_authorized(request, requires_admin: bool = False) -> Union[None, Tuple[Response, int]]:
Expand Down Expand Up @@ -48,7 +48,9 @@ def check_if_user_edit_is_authorized(request, user: ManagementUser) -> Union[Non
# Extract token and check if user is authorized
token = request.args.get(api_constants.MGMT_WEBAPP.TOKEN_QUERY_PARAM)
token_obj = MetastoreFacade.get_session_token_metadata(token=token)
request_user = MetastoreFacade.get_management_user_by_username(username=token_obj.username)
request_user = None
if token_obj is not None:
request_user = MetastoreFacade.get_management_user_by_username(username=token_obj.username)
if token_obj is None or token_obj.expired(valid_length_hours=api_constants.SESSION_TOKENS.EXPIRE_TIME_HOURS) \
or request_user is None or (not request_user.admin and request_user.username != user.username):
if token_obj is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import pytest_mock
import csle_common.constants.constants as constants
from csle_cluster.cluster_manager.cluster_manager_pb2 import NodeStatusDTO
from csle_common.dao.emulation_config.config import Config
import csle_rest_api.constants.constants as api_constants
from csle_rest_api.rest_api import create_app
from csle_common.dao.emulation_config.config import Config


class TestResourcespgAdminSuite():
class TestResourcespgAdminSuite:
"""
Test suite for /pgadmin resource
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import pytest_mock
import csle_common.constants.constants as constants
from csle_cluster.cluster_manager.cluster_manager_pb2 import NodeStatusDTO
from csle_common.dao.emulation_config.config import Config
import csle_rest_api.constants.constants as api_constants
from csle_rest_api.rest_api import create_app
from csle_common.dao.emulation_config.config import Config


class TestResourcesPrometheusSuite:
Expand Down
243 changes: 243 additions & 0 deletions simulation-system/libs/csle-rest-api/tests/test_rest_api_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
import json
import numpy as np
import pytest
import pytest_mock
from flask import Flask
import csle_common.constants.constants as constants
from csle_common.dao.management.management_user import ManagementUser
from csle_common.dao.management.session_token import SessionToken
import csle_rest_api.constants.constants as api_constants
import csle_rest_api.util.rest_api_util as rest_api_util
from csle_rest_api.rest_api import create_app


class TestRestAPIUtilSuite:
"""
Test suite for /experiments url
"""

class SyntReq:
"""
Mock class for synt reg
"""
def __init__(self, args) -> None:
"""
Initializes the object

:param args: the arguments for syntreg
"""
self.args = args

class Args:
"""
Mock class for syntehtic arguments to a request
"""
def __init__(self) -> None:
"""
Initializes the object
"""
pass

def get(self, token: str) -> None:
"""
Mocks the get method of the arguments

:param token: the token for authentication
:return: None
"""
return None

@pytest.fixture
def flask_app(self):
"""
Gets the Flask app

:return: the flask app fixture representing the webserver
"""
return create_app(static_folder="../../../../../management-system/csle-mgmt-webapp/build")

@pytest.fixture
def session_token(self, mocker: pytest_mock.MockFixture):
"""
Pytest fixture for mocking the get_session_token_metadata method

:param mocker: the pytest mocker object
:return: the mocked function
"""
def get_session_token_metadata(token: str) -> SessionToken:
return SessionToken(token="null", timestamp=1.5, username="JDoe")
get_session_token_metadata_mocker = mocker.MagicMock(side_effect=get_session_token_metadata)
return get_session_token_metadata_mocker

@pytest.fixture
def session_token_exp(self, mocker: pytest_mock.MockFixture):
"""
Pytest fixture for mocking the get_session_token_metadata method

:param mocker: the pytest mocker object
:return: the mocked function
"""
def get_session_token_metadata(token: str) -> SessionToken:
ses_tok = SessionToken(token="null", timestamp=1.5, username="JDoe")
api_constants.SESSION_TOKENS.EXPIRE_TIME_HOURS = np.iinfo(np.int32).max
return ses_tok
get_session_token_metadata_mocker = mocker.MagicMock(side_effect=get_session_token_metadata)
return get_session_token_metadata_mocker

@pytest.fixture
def management_user(self, mocker: pytest_mock.MockFixture):
"""
Pytest fixture for mocking the get_management_user_by_username method

:param mocker: the pytest mocker object
:return: the mocked function
"""
def get_management_user_by_username(username: str) -> ManagementUser:
mng_user = TestRestAPIUtilSuite.get_synthetic_mng_user()
return mng_user

get_management_user_by_username_mocker = mocker.MagicMock(side_effect=get_management_user_by_username)
return get_management_user_by_username_mocker

@pytest.fixture
def management_user_none(self, mocker: pytest_mock.MockFixture):
"""
Pytest fixture for mocking the get_management_user_by_username method

:param mocker: the pytest mocker object
:return: the mocked function
"""
def get_management_user_by_username(username: str) -> None:
return None

get_management_user_by_username_mocker = mocker.MagicMock(side_effect=get_management_user_by_username)
return get_management_user_by_username_mocker

@pytest.fixture
def remove(self, mocker: pytest_mock.MockFixture):
"""
Pytest fixture for mocking the reomve_session_token method

:param mocker: the pytest mocker object
:return: the mocked function
"""
def remove_session_token(session_token: SessionToken) -> None:
return None
remove_session_token_mocker = mocker.MagicMock(side_effect=remove_session_token)
return remove_session_token_mocker

@staticmethod
def get_args() -> Args:
"""
Returns a mock request argument

:return: the mocked request argument
"""
return TestRestAPIUtilSuite.Args()

@staticmethod
def get_synthetic_request(args) -> SyntReq:
"""
Static help method for returning a synthetic/mocked request, customized to work for testing without the use
of blueprint or flask app

:param args: the arguments for the mock request
:return: the syntethic request
"""
return TestRestAPIUtilSuite.SyntReq(args)

@staticmethod
def get_synthetic_mng_user() -> ManagementUser:
"""
Static help method for returning a synthetic/mocked management user

:return: the mocked management user
"""
mng_user = ManagementUser(username="JDoe", password="JDoe", email="[email protected]",
first_name="John", last_name="Doe", organization="CSLE",
admin=False, salt="null")
return mng_user

def test_util(self, flask_app, mocker: pytest_mock.MockFixture,
session_token, session_token_exp,
management_user, management_user_none, remove):
"""
Test method for the rest-api util

:param flask_app: the flask_app fixture
:param mocker: the pytest mocker object
:param session_token: the session_token fixture
:param management_user: the management_user fixture
:param remove: the remove fixture
"""
mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_session_token_metadata",
side_effect=session_token)
mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_management_user_by_username",
side_effect=management_user)
mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.remove_session_token", side_effect=remove)
app = Flask(__name__)
mng_user = TestRestAPIUtilSuite.get_synthetic_mng_user()
args = TestRestAPIUtilSuite.get_args()
req = TestRestAPIUtilSuite.get_synthetic_request(args)
with app.app_context():
response = rest_api_util.check_if_user_is_authorized(request=req)
response1 = rest_api_util.check_if_user_edit_is_authorized(request=req, user=mng_user)
assert response is not None
response_data = response[0].data.decode("utf-8")
response_data_dict = json.loads(response_data)
response_status_code = response[1]
assert response_data_dict == {}
assert response_status_code == constants.HTTPS.UNAUTHORIZED_STATUS_CODE
assert response1 is not None
response_data1 = response1[0].data.decode("utf-8")
response_data_dict1 = json.loads(response_data1)
response_status_code1 = response1[1]
assert response_data_dict1 == {}
assert response_status_code1 == constants.HTTPS.UNAUTHORIZED_STATUS_CODE
mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_management_user_by_username",
side_effect=management_user_none)
with app.app_context():
response1 = rest_api_util.check_if_user_edit_is_authorized(request=req, user=mng_user)
assert response1 is not None
response_data1 = response1[0].data.decode("utf-8")
response_data_dict1 = json.loads(response_data1)
response_status_code1 = response1[1]
assert response_data_dict1 == {}
assert response_status_code1 == constants.HTTPS.UNAUTHORIZED_STATUS_CODE
mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_management_user_by_username",
side_effect=management_user)
with app.app_context():
response = rest_api_util.check_if_user_is_authorized(request=req)
assert response is not None
response_data = response[0].data.decode("utf-8")
response_data_dict = json.loads(response_data)
response_status_code = response[1]
assert response_data_dict == {}
assert response_status_code == constants.HTTPS.UNAUTHORIZED_STATUS_CODE
mocker.patch("csle_common.metastore.metastore_facade.MetastoreFacade.get_session_token_metadata",
side_effect=session_token_exp)
with app.app_context():
response = rest_api_util.check_if_user_is_authorized(request=req)
response1 = rest_api_util.check_if_user_edit_is_authorized(request=req, user=mng_user)
assert response is None
assert response1 is not None
assert isinstance(response1, ManagementUser)
ex_mng_user = TestRestAPIUtilSuite.get_synthetic_mng_user()
assert response1.username == ex_mng_user.username
assert response1.password == ex_mng_user.password
assert response1.admin == ex_mng_user.admin
assert response1.id == ex_mng_user.id
assert response1.salt == ex_mng_user.salt
assert response1.email == ex_mng_user.email
assert response1.first_name == ex_mng_user.first_name
assert response1.last_name == ex_mng_user.last_name
assert response1.organization == ex_mng_user.organization
assert response_status_code1 == constants.HTTPS.UNAUTHORIZED_STATUS_CODE
with app.app_context():
response = rest_api_util.check_if_user_is_authorized(request=req, requires_admin=True)
assert response is not None
response_data = response[0].data.decode("utf-8")
response_data_dict = json.loads(response_data)
response_status_code = response[1]
assert response_data_dict == {}
assert response_status_code == constants.HTTPS.UNAUTHORIZED_STATUS_CODE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Tuple, Optional
from typing import List, Tuple
import time
import os
import sys
Expand Down
Loading