diff --git a/application/cmd/cre_main.py b/application/cmd/cre_main.py index c5ad108b2..ef724b374 100644 --- a/application/cmd/cre_main.py +++ b/application/cmd/cre_main.py @@ -93,7 +93,7 @@ def register_node(node: defs.Node, collection: db.Node_collection) -> db.Node: register_node(node=link.document, collection=collection) elif type(link.document).__name__ == defs.CRE.__name__: - # dbcre = register_cre(link.document, collection) # CREs are idempotent + # dbcre,_ = register_cre(link.document, collection) # CREs are idempotent c = collection.get_CREs(name=link.document.name)[0] dbcre = db.dbCREfromCRE(c) collection.add_link(dbcre, linked_node, type=link.ltype) @@ -109,14 +109,19 @@ def register_node(node: defs.Node, collection: db.Node_collection) -> db.Node: return linked_node -def register_cre(cre: defs.CRE, collection: db.Node_collection) -> db.CRE: +def register_cre(cre: defs.CRE, collection: db.Node_collection) -> Tuple[db.CRE, bool]: + existing = False + if collection.get_CREs(name=cre.id): + existing = True + dbcre: db.CRE = collection.add_cre(cre) for link in cre.links: if type(link.document) == defs.CRE: logger.info(f"{link.document.id} {link.ltype} {cre.id}") + lower_cre, _ = register_cre(link.document, collection) collection.add_internal_link( higher=dbcre, - lower=register_cre(link.document, collection), + lower=lower_cre, type=link.ltype, ) else: @@ -125,7 +130,7 @@ def register_cre(cre: defs.CRE, collection: db.Node_collection) -> db.CRE: node=register_node(node=link.document, collection=collection), type=link.ltype, ) - return dbcre + return dbcre, existing def parse_file( @@ -209,7 +214,7 @@ def parse_file( def register_standard( standard_entries: List[defs.Standard], - collection: db.Node_collection, + collection: db.Node_collection = None, generate_embeddings=True, calculate_gap_analysis=True, db_connection_str: str = "", @@ -218,15 +223,17 @@ def register_standard( generate_embeddings = False if not standard_entries: - logger.warning("register_standard() calleed with no standard_entries") + logger.warning("register_standard() called with no standard_entries") return - if not collection: + + if collection is None: collection = db_connect(path=db_connection_str) + conn = redis.connect() ph = prompt_client.PromptHandler(database=collection) importing_name = standard_entries[0].name standard_hash = gap_analysis.make_resources_key([importing_name]) - if conn.get(standard_hash): + if calculate_gap_analysis and conn.get(standard_hash): logger.info( f"Standard importing job with info-hash {standard_hash} has already returned, skipping" ) @@ -248,11 +255,12 @@ def register_standard( generate_embeddings = False if generate_embeddings and importing_name: ph.generate_embeddings_for(importing_name) - populate_neo4j_db(collection) - # calculate gap analysis - jobs = [] - pending_stadards = collection.standards() + if calculate_gap_analysis and not os.environ.get("CRE_NO_CALCULATE_GAP_ANALYSIS"): + # calculate gap analysis + populate_neo4j_db(collection) + jobs = [] + pending_stadards = collection.standards() for standard_name in pending_stadards: if standard_name == importing_name: continue @@ -297,13 +305,7 @@ def parse_standards_from_spreadsheeet( ) -> None: """given a yaml with standards, build a list of standards in the db""" collection = db_connect(cache_location) - if "CRE:name" in cre_file[0].keys(): - collection = collection.with_graph() - documents = spreadsheet_parsers.parse_export_format(cre_file) - register_cre(documents, collection) - pass - - elif any(key.startswith("CRE hierarchy") for key in cre_file[0].keys()): + if any(key.startswith("CRE hierarchy") for key in cre_file[0].keys()): conn = redis.connect() collection = collection.with_graph() redis.empty_queues(conn) @@ -659,9 +661,7 @@ def create_spreadsheet( ) -> Any: """Reads cre docs exported from a standards_collection.export() dumps each doc into a workbook""" - flat_dicts = sheet_utils.prepare_spreadsheet( - collection=collection, docs=exported_documents - ) + flat_dicts = sheet_utils.prepare_spreadsheet(docs=exported_documents) return sheet_utils.write_spreadsheet( title=title, docs=flat_dicts, emails=share_with ) diff --git a/application/database/db.py b/application/database/db.py index 00aaee600..de2b24f10 100644 --- a/application/database/db.py +++ b/application/database/db.py @@ -1,6 +1,26 @@ +import networkx as nx +import uuid +import neo4j +import os +import logging +import re +import yaml + +from pprint import pprint + +from collections import Counter +from itertools import permutations +from typing import Any, Dict, List, Optional, Tuple, cast +from neomodel.exceptions import ( + DoesNotExist, + FeatureNotSupported, + NodeClassAlreadyDefined, +) from flask import json as flask_json -import json -from application.utils import redis +from sqlalchemy.orm import aliased +from flask_sqlalchemy.model import DefaultMeta +from sqlalchemy import func, delete + from neomodel import ( config, StructuredNode, @@ -13,27 +33,10 @@ StructuredRel, db, ) -from neomodel.exceptions import ( - DoesNotExist, - FeatureNotSupported, - NodeClassAlreadyDefined, -) -import neo4j -from sqlalchemy.orm import aliased -import os -import logging -import re -from collections import Counter -from itertools import permutations -from typing import Any, Dict, List, Optional, Tuple, cast -import networkx as nx -import yaml +from application.database import inmemory_graph +from application.utils import redis from application.defs import cre_defs from application.utils import file -from flask_sqlalchemy.model import DefaultMeta -from sqlalchemy import func, delete -import uuid - from application.utils.gap_analysis import ( get_path_score, make_resources_key, @@ -669,78 +672,6 @@ def parse_node_no_links(node: NeoDocument) -> cre_defs.Document: return node.to_cre_def(node, parse_links=False) -class CRE_Graph: - graph: nx.Graph = None - __instance = None - - @classmethod - def instance(cls, session): - if cls.__instance is None: - cls.__instance = cls.__new__(cls) - cls.graph = cls.load_cre_graph(session) - return cls.__instance - - def __init__(sel): - raise ValueError("CRE_Graph is a singleton, please call instance() instead") - - def add_edge(self, *args, **kwargs): - return self.graph.add_edge(*args, **kwargs) - - def add_node(self, *args, **kwargs): - return self.graph.add_node(*args, **kwargs) - - @classmethod - def add_cre(cls, dbcre: CRE, graph: nx.DiGraph) -> nx.DiGraph: - if dbcre: - graph.add_node( - f"CRE: {dbcre.id}", internal_id=dbcre.id, external_id=dbcre.external_id - ) - else: - logger.error("Called with dbcre being none") - return graph - - @classmethod - def add_dbnode(cls, dbnode: Node, graph: nx.DiGraph) -> nx.DiGraph: - if dbnode: - # coma separated tags - - graph.add_node( - "Node: " + str(dbnode.id), - internal_id=dbnode.id, - ) - else: - logger.error("Called with dbnode being none") - return graph - - @classmethod - def load_cre_graph(cls, session) -> nx.Graph: - graph = nx.DiGraph() - for il in session.query(InternalLinks).all(): - group = session.query(CRE).filter(CRE.id == il.group).first() - if not group: - logger.error(f"CRE {il.group} does not exist?") - graph = cls.add_cre(dbcre=group, graph=graph) - - cre = session.query(CRE).filter(CRE.id == il.cre).first() - if not cre: - logger.error(f"CRE {il.cre} does not exist?") - graph = cls.add_cre(dbcre=cre, graph=graph) - - graph.add_edge(f"CRE: {il.group}", f"CRE: {il.cre}", ltype=il.type) - - for lnk in session.query(Links).all(): - node = session.query(Node).filter(Node.id == lnk.node).first() - if not node: - logger.error(f"Node {lnk.node} does not exist?") - graph = cls.add_dbnode(dbnode=node, graph=graph) - - cre = session.query(CRE).filter(CRE.id == lnk.cre).first() - graph = cls.add_cre(dbcre=cre, graph=graph) - - graph.add_edge(f"CRE: {lnk.cre}", f"Node: {str(lnk.node)}", ltype=lnk.type) - return graph - - class Node_collection: graph: nx.Graph = None neo_db: NEO_DB = None @@ -751,9 +682,11 @@ def __init__(self) -> None: self.neo_db = NEO_DB.instance() self.session = sqla.session - def with_graph(self): + def with_graph(self) -> "Node_collection": logger.info("Loading CRE graph in memory, memory-heavy operation!") - self.graph = CRE_Graph.instance(sqla.session) + self.graph = inmemory_graph.CRE_Graph.instance( + documents=self.__get_all_nodes_and_cres() + ) return self def __get_external_links(self) -> List[Tuple[CRE, Node, str]]: @@ -803,6 +736,19 @@ def __get_unlinked_cres(self) -> List[CRE]: ) return cres + def __get_all_nodes_and_cres(self) -> List[cre_defs.Document]: + result = [] + nodes = [] + cres = [] + node_ids = self.session.query(Node.id).all() + for nid in node_ids: + result.extend(self.get_nodes(db_id=nid[0])) + + cre_ids = self.session.query(CRE.id).all() + for cid in cre_ids: + result.append(self.get_cre_by_db_id(cid[0])) + return result + def __introduces_cycle(self, node_from: str, node_to: str) -> Any: if not self.graph: logger.error("graph is null") @@ -1035,19 +981,24 @@ def get_nodes( description: Optional[str] = None, ntype: str = cre_defs.Standard.__name__, sectionID: Optional[str] = None, + db_id: Optional[str] = None, ) -> Optional[List[cre_defs.Node]]: nodes = [] - nodes_query = self.__get_nodes_query__( - name=name, - section=section, - subsection=subsection, - link=link, - version=version, - partial=partial, - ntype=ntype, - description=description, - sectionID=sectionID, - ) + nodes_query = None + if db_id: + nodes_query = self.session.query(Node).filter(Node.id == db_id) + else: + nodes_query = self.__get_nodes_query__( + name=name, + section=section, + subsection=subsection, + link=link, + version=version, + partial=partial, + ntype=ntype, + description=description, + sectionID=sectionID, + ) dbnodes = nodes_query.all() if dbnodes: for dbnode in dbnodes: @@ -1084,11 +1035,20 @@ def get_nodes( return [] - def get_node_by_db_id(self, id: str) -> cre_defs.Node: - return nodeFromDB(self.session.query(Node).filter(Node.id == id).first()) - def get_cre_by_db_id(self, id: str) -> cre_defs.CRE: - return CREfromDB(self.session.query(CRE).filter(CRE.id == id).first()) + """internal method, returns a shallow cre (no links) by its database id + + Args: + id (str): the uuid of the cre + + Returns: + cre_defs.CRE: _description_ + """ + external_id = self.session.query(CRE.external_id).filter(CRE.id == id).first() + if not external_id: + logger.error(f"CRE {id} does not exist in the db") + return None + return self.get_CREs(external_id=external_id[0])[0] def list_node_ids_by_ntype(self, ntype: str) -> List[str]: return self.session.query(Node.id).filter(Node.ntype == ntype).all() @@ -1221,12 +1181,13 @@ def get_CREs( for ls in linked_nodes: nd = self.session.query(Node).filter(Node.id == ls.node).first() if not include_only or (include_only and nd.name in include_only): - cre.add_link( - cre_defs.Link( - document=nodeFromDB(nd), - ltype=cre_defs.LinkTypes.from_str(ls.type), + n = nodeFromDB(nd) + if not cre.link_exists(n): + cre.add_link( + cre_defs.Link( + document=n, ltype=cre_defs.LinkTypes.from_str(ls.type) + ) ) - ) # TODO figure the query to merge the following two internal_links = ( self.session.query(InternalLinks) @@ -1258,7 +1219,9 @@ def get_CREs( elif il.group == dbcre.id: res = q.filter(CRE.id == il.cre).first() ltype = cre_defs.LinkTypes.from_str(il.type) - cre.add_link(cre_defs.Link(document=CREfromDB(res), ltype=ltype)) + c = CREfromDB(res) + if not cre.link_exists(c): + cre.add_link(cre_defs.Link(document=c, ltype=ltype)) cres.append(cre) return cres @@ -1366,6 +1329,51 @@ def all_cres_with_pagination( result.extend(self.get_CREs(external_id=cre.external_id)) return result, page, total_pages + def get_cre_path(self, fromID: str, toID: str) -> List[cre_defs.Document]: + if not self.graph: + self.with_graph() + + fromDbID = ( + self.session.query(CRE.id).filter(CRE.external_id == fromID).first()[0] + ) + toDbID = self.session.query(CRE.id).filter(CRE.external_id == toID).first()[0] + + forwardPath = self.graph.get_path(f"CRE: {fromDbID}", f"CRE: {toDbID}") + backwardsPath = self.graph.get_path(f"CRE: {toDbID}", f"CRE: {fromDbID}") + cres = [] + path = [] + if forwardPath: # our graph is directed, so we need to check both paths + path = forwardPath + else: + path = backwardsPath + + for entry in path: + entryID = entry.replace("CRE: ", "") + shallow_CRE = self.get_cre_by_db_id(entryID) + + if shallow_CRE: + cres.append(self.get_CREs(external_id=shallow_CRE.id)[0]) + return cres + + def get_cre_hierarchy(self, cre: cre_defs.CRE) -> int: + if not self.graph: + self.with_graph() + roots = self.get_root_cres() + root_cre_db_ids = [] + for r in roots: + dbid = self.session.query(CRE.id).filter(CRE.external_id == r.id).first()[0] + root_cre_db_ids.append(dbid) + + credbid = self.session.query(CRE.id).filter(CRE.external_id == cre.id).first() + if not credbid: + raise ValueError(f"CRE {cre.id} does not exist in the database") + credbid = credbid[0] + + if len(self.graph.graph.edges) == 0: + logger.error("graph is empty") + return -1 + return self.graph.get_hierarchy(rootIDs=root_cre_db_ids, creID=credbid) + # def all_nodes_with_pagination( # self, page: int = 1, per_page: int = 10 # ) -> List[cre_defs.Document]: diff --git a/application/database/inmemory_graph.py b/application/database/inmemory_graph.py new file mode 100644 index 000000000..c8c4f1e8b --- /dev/null +++ b/application/database/inmemory_graph.py @@ -0,0 +1,135 @@ +import sys +import networkx as nx +from typing import List, Tuple +from pprint import pprint +from application.defs import cre_defs as defs + + +class CRE_Graph: + graph: nx.Graph = None + __parent_child_subgraph = None + __instance = None + + @classmethod + def instance(cls, documents: List[defs.Document] = None) -> "CRE_Graph": + if cls.__instance is None: + cls.__instance = cls.__new__(cls) + cls.graph = nx.DiGraph() + cls.graph = cls.__load_cre_graph(documents=documents) + return cls.__instance + + def __init__(sel): + raise ValueError("CRE_Graph is a singleton, please call instance() instead") + + def add_edge(self, *args, **kwargs): + return self.graph.add_edge(*args, **kwargs) + + def add_node(self, *args, **kwargs): + return self.graph.add_node(*args, **kwargs) + + def get_hierarchy(self, rootIDs: List[str], creID: str): + if creID in rootIDs: + return 0 + + if self.__parent_child_subgraph == None: + if len(self.graph.edges) == 0: + raise ValueError("Graph has no edges") + include_cres = [] + for el in self.graph.edges: + edge_data = self.graph.get_edge_data(*el) + if ( + el[0].startswith("CRE") + and el[1].startswith("CRE") + and ( + edge_data["ltype"] == defs.LinkTypes.Contains + or edge_data["ltype"] == defs.LinkTypes.PartOf + ) + ): + include_cres.append(el[0]) + include_cres.append(el[1]) + + for el in rootIDs: + if ( + el not in include_cres + ): # If the root is not in the parent/children graph, add it to prevent an error and continue, there is not path to our CRE anyway + include_cres.append(f"CRE: {el}") + self.__parent_child_subgraph = self.graph.subgraph(set(include_cres)) + + shortest_path = sys.maxsize + for root in rootIDs: + try: + shortest_path = min( + shortest_path, + len( + nx.shortest_path( + self.__parent_child_subgraph, + f"CRE: {root}", + f"CRE: {creID}", + ) + ) + - 1, + ) + except ( + nx.NodeNotFound + ) as nnf: # If the CRE is not in the parent/children graph it means that it's a lone CRE, so it's a root and we return 0 + return 0 + except ( + nx.NetworkXNoPath + ) as nxnp: # If there is no path to the CRE, continue + continue + return shortest_path + + def get_path(self, start: str, end: str) -> List[Tuple[str, str]]: + try: + return nx.shortest_path(self.graph, start, end) + except nx.NetworkXNoPath: + return [] + + @classmethod + def add_cre(cls, dbcre: defs.CRE, graph: nx.DiGraph) -> nx.DiGraph: + if dbcre: + cls.graph.add_node(f"CRE: {dbcre.id}", internal_id=dbcre.id) + else: + logger.error("Called with dbcre being none") + return graph + + @classmethod + def add_dbnode(cls, dbnode: defs.Node, graph: nx.DiGraph) -> nx.DiGraph: + if dbnode: + cls.graph.add_node( + "Node: " + str(dbnode.id), + internal_id=dbnode.id, + ) + else: + logger.error("Called with dbnode being none") + return graph + + @classmethod + def __load_cre_graph(cls, documents: List[defs.Document]) -> nx.Graph: + graph = cls.graph + if not graph: + graph = nx.DiGraph() + + for doc in documents: + from_doctype = None + if doc.doctype == defs.Credoctypes.CRE: + graph = cls.add_cre(dbcre=doc, graph=graph) + from_doctype = defs.Credoctypes.CRE + else: + graph = cls.add_dbnode(dbnode=doc, graph=graph) + from_doctype = doc.doctype + for link in doc.links: + to_doctype = None + if link.document.doctype == defs.Credoctypes.CRE: + graph = cls.add_cre(dbcre=link.document, graph=graph) + to_doctype = defs.Credoctypes.CRE + else: + graph = cls.add_dbnode(dbnode=link.document, graph=graph) + to_doctype = "Node" + graph.add_edge( + f"{from_doctype}: {doc.id}", + f"{to_doctype}: {link.document.id}", + ltype=link.ltype, + ) + cls.graph = graph + return graph diff --git a/application/defs/cre_defs.py b/application/defs/cre_defs.py index 4380d82d2..216ec67f5 100644 --- a/application/defs/cre_defs.py +++ b/application/defs/cre_defs.py @@ -7,22 +7,20 @@ from application.defs import cre_exceptions -class ExportFormat( - Enum -): # TODO: this can likely be replaced with a method that iterates over an object's vars and formats headers to +class ExportFormat: # TODO: this can likely be replaced with a method that iterates over an object's vars and formats headers to # :: separator = "|" - section = "section" - subsection = "subsection" + section = "name" + subsection = "section" hyperlink = "hyperlink" link_type = "link_type" - name = "name" + # name = "name" id = "id" description = "description" cre_link = "Linked_CRE_" cre = "CRE" tooltype = "ToolType" - sectionID = "SectionID" + sectionID = "id" @staticmethod def get_doctype(header: str) -> Optional["Credoctypes"]: @@ -40,144 +38,135 @@ def get_doctype(header: str) -> Optional["Credoctypes"]: def node_name_key(sname: str) -> str: """returns :: used mostly for matching""" return "%s%s%s" % ( - ExportFormat.separator.value, + ExportFormat.separator, sname, - ExportFormat.separator.value, + ExportFormat.separator, ) - @staticmethod - def tooltype_key(sname: str, doctype: "Credoctypes") -> str: - "returns ::tooltype" - return "%s%s%s%s%s" % ( - doctype.value, - ExportFormat.separator.value, - sname, - ExportFormat.separator.value, - ExportFormat.tooltype.value, - ) + # @staticmethod + # def tooltype_key(sname: str, doctype: "Credoctypes") -> str: + # "returns ::tooltype" + # return "%s%s%s%s%s" % ( + # doctype, + # ExportFormat.separator, + # sname, + # ExportFormat.separator, + # ExportFormat.tooltype, + # ) @staticmethod - def sectionID_key(sname: str, doctype: "Credoctypes") -> str: - "returns ::sectionID" - return "%s%s%s%s%s" % ( - doctype.value, - ExportFormat.separator.value, - sname, - ExportFormat.separator.value, - ExportFormat.sectionID.value, - ) - - @staticmethod - def description_key(sname: str, doctype: "Credoctypes") -> str: - "returns ::description" - return "%s%s%s%s%s" % ( - doctype.value, - ExportFormat.separator.value, - sname, - ExportFormat.separator.value, - ExportFormat.description.value, - ) - - @staticmethod - def section_key(sname: str, doctype: "Credoctypes") -> str: - "returns ::section" - return "%s%s%s%s%s" % ( - doctype.value, - ExportFormat.separator.value, - sname, - ExportFormat.separator.value, - ExportFormat.section.value, - ) - - @staticmethod - def subsection_key(sname: str, doctype: "Credoctypes") -> str: - "returns ::subsection" - return "%s%s%s%s%s" % ( - doctype.value, - ExportFormat.separator.value, + def sectionID_key(sname: str) -> str: + "returns |id" + return "%s%s%s" % ( sname, - ExportFormat.separator.value, - ExportFormat.subsection.value, + ExportFormat.separator, + ExportFormat.sectionID, ) @staticmethod - def hyperlink_key(sname: str, doctype: "Credoctypes") -> str: - "returns :hyperlink" - return "%s%s%s%s%s" % ( - doctype.value, - ExportFormat.separator.value, + def description_key(sname: str) -> str: + "returns |description" + return "%s%s%s" % ( sname, - ExportFormat.separator.value, - ExportFormat.hyperlink.value, + ExportFormat.separator, + ExportFormat.description, ) @staticmethod - def link_type_key(sname: str, doctype: "Credoctypes") -> str: - "returns :link_type" - return "%s%s%s%s%s" % ( - doctype.value, - ExportFormat.separator.value, + def section_key(sname: str) -> str: + "returns |name" + return "%s%s%s" % ( sname, - ExportFormat.separator.value, - ExportFormat.link_type.value, - ) - - @staticmethod - def linked_cre_id_key(name: str) -> str: - "returns Linked_CRE_:id" - return "%s%s%s%s" % ( - ExportFormat.cre_link.value, - name, - ExportFormat.separator.value, - ExportFormat.id.value, - ) - - @staticmethod - def linked_cre_name_key(name: str) -> str: - "returns Linked_CRE_:name" - return "%s%s%s%s" % ( - ExportFormat.cre_link.value, - name, - ExportFormat.separator.value, - ExportFormat.name.value, - ) - - @staticmethod - def linked_cre_link_type_key(name: str) -> str: - "returns Linked_CRE_:link_type" - return "%s%s%s%s" % ( - ExportFormat.cre_link.value, - name, - ExportFormat.separator.value, - ExportFormat.link_type.value, + ExportFormat.separator, + ExportFormat.section, ) @staticmethod - def cre_id_key() -> str: - "returns CRE:id" + def subsection_key(sname: str) -> str: + "returns |section" return "%s%s%s" % ( - ExportFormat.cre.value, - ExportFormat.separator.value, - ExportFormat.id.value, + sname, + ExportFormat.separator, + ExportFormat.subsection, ) @staticmethod - def cre_name_key() -> str: - "returns CRE:name" + def hyperlink_key(sname: str) -> str: + "returns |hyperlink" return "%s%s%s" % ( - ExportFormat.cre.value, - ExportFormat.separator.value, - ExportFormat.name.value, + sname, + ExportFormat.separator, + ExportFormat.hyperlink, ) - @staticmethod - def cre_description_key() -> str: - "returns CRE:description" - return "%s%s%s" % ( - ExportFormat.cre.value, - ExportFormat.separator.value, - ExportFormat.description.value, - ) + # Todo(northdpole): the following to be uncommented when we import complex linktypes + # @staticmethod + # def link_type_key(sname: str, doctype: "Credoctypes") -> str: + # "returns :link_type" + # return "%s%s%s%s%s" % ( + # doctype.value, + # ExportFormat.separator, + # sname, + # ExportFormat.separator, + # ExportFormat.link_type, + # ) + + # @staticmethod + # def linked_cre_id_key(name: str) -> str: + # "returns Linked_CRE_:id" + # return "%s%s%s%s" % ( + # ExportFormat.cre_link, + # name, + # ExportFormat.separator, + # ExportFormat.id, + # ) + + # @staticmethod + # def linked_cre_name_key(name: str) -> str: + # "returns Linked_CRE_:name" + # return "%s%s%s%s" % ( + # ExportFormat.cre_link, + # name, + # ExportFormat.separator, + # ExportFormat.name, + # ) + + # @staticmethod + # def linked_cre_link_type_key(name: str) -> str: + # "returns Linked_CRE_:link_type" + # return "%s%s%s%s" % ( + # ExportFormat.cre_link, + # name, + # ExportFormat.separator, + # ExportFormat.link_type, + # ) + + # @staticmethod + # def cre_id_key() -> str: + # "returns CRE:id" + # return "%s%s%s" % ( + # ExportFormat.cre, + # ExportFormat.separator, + # ExportFormat.id, + # ) + + # @staticmethod + # def cre_name_key() -> str: + # "returns CRE:name" + # return "%s%s%s" % ( + # ExportFormat.cre, + # ExportFormat.separator, + # ExportFormat.name, + # ) + + # @staticmethod + # def cre_description_key() -> str: + # "returns CRE:description" + # return "%s%s%s" % ( + # ExportFormat.cre, + # ExportFormat.separator, + # ExportFormat.description, + # ) class EnumMetaWithContains(EnumMeta): @@ -381,10 +370,17 @@ def add_link(self, link: Link) -> "Document": self.links = [] if not isinstance(link, Link): raise ValueError("add_link only takes Link() types") + if link.document.id == self.id: + raise ValueError("Cannot link a document to itself") + if link.document.id in [l.document.id for l in self.links]: + raise ValueError("Cannot link the same document twice") self.links.append(link) return self + def link_exists(self, doc: "Document") -> bool: + return doc.id in [l.document.id for l in self.links] + def __post_init__(self): if not len(self.name) > 1: raise cre_exceptions.InvalidDocumentNameException(self) diff --git a/application/prompt_client/prompt_client.py b/application/prompt_client/prompt_client.py index 83b811935..c3838298a 100644 --- a/application/prompt_client/prompt_client.py +++ b/application/prompt_client/prompt_client.py @@ -148,7 +148,7 @@ def generate_embeddings( logger.info(f"generating {len(missing_embeddings)} embeddings") for id in missing_embeddings: cre = database.get_cre_by_db_id(id) - node = database.get_node_by_db_id(id) + node = database.get_nodes(db_id=id) content = "" if node: if is_valid_url(node.hyperlink): @@ -464,7 +464,7 @@ def generate_text(self, prompt: str) -> Dict[str, str]: ) closest_object = None if closest_id: - closest_object = self.database.get_node_by_db_id(closest_id) + closest_object = self.database.get_nodes(db_id=closest_id) logger.info( f"The prompt {prompt}, was most similar to object \n{closest_object}\n, with similarity:{similarity}" diff --git a/application/tests/cre_main_test.py b/application/tests/cre_main_test.py index 57e08bacc..2a55b461a 100644 --- a/application/tests/cre_main_test.py +++ b/application/tests/cre_main_test.py @@ -205,8 +205,10 @@ def test_register_cre(self) -> None: tags=["CREt1", "CREt2"], metadata={"tags": ["CREl1", "CREl2"]}, ) - self.assertEqual(main.register_cre(cre, self.collection).name, cre.name) - self.assertEqual(main.register_cre(cre, self.collection).external_id, cre.id) + c, _ = main.register_cre(cre, self.collection) + self.assertEqual(c.name, cre.name) + + self.assertEqual(c.external_id, cre.id) self.assertEqual( len(self.collection.session.query(db.CRE).all()), 1 ) # 1 cre in the db diff --git a/application/tests/db_test.py b/application/tests/db_test.py index 5ce4f6c92..588936618 100644 --- a/application/tests/db_test.py +++ b/application/tests/db_test.py @@ -1,3 +1,4 @@ +from pprint import pprint from application.utils.gap_analysis import make_resources_key, make_subresources_key import string import random @@ -14,6 +15,7 @@ from flask import json as flask_json import yaml +from application.tests.utils.data_gen import export_format_data from application import create_app, sqla # type: ignore from application.database import db from application.defs import cre_defs as defs @@ -33,7 +35,6 @@ def setUp(self) -> None: self.collection = db.Node_collection().with_graph() collection = self.collection - # collection.graph.graph = db.CRE_Graph.load_cre_graph(sqla.session) dbcre = collection.add_cre( defs.CRE(id="111-000", description="CREdesc", name="CREname") @@ -2140,6 +2141,53 @@ def test_all_cres_with_pagination(self): self.assertEqual(page, 1) self.assertEqual(total_pages, 4) - -if __name__ == "__main__": - unittest.main() + def test_get_cre_hierarchy(self) -> None: + collection = db.Node_collection().with_graph() + _, inputDocs = export_format_data() + importItems = [] + for name, items in inputDocs.items(): + for item in items: + importItems.append(item) + if name == defs.Credoctypes.CRE: + dbitem = collection.add_cre(item) + else: + dbitem = collection.add_node(item) + for link in item.links: + if link.document.doctype == defs.Credoctypes.CRE: + linked_item = collection.add_cre(link.document) + if item.doctype == defs.Credoctypes.CRE: + collection.add_internal_link( + dbitem, linked_item, type=link.ltype + ) + else: + collection.add_link( + node=dbitem, cre=linked_item, type=link.ltype + ) + else: + linked_item = collection.add_node(link.document) + if item.doctype == defs.Credoctypes.CRE: + collection.add_link( + cre=dbitem, node=linked_item, type=link.ltype + ) + else: + collection.add_internal_link( + cre=linked_item, node=dbitem, type=link.ltype + ) + + cres = inputDocs[defs.Credoctypes.CRE] + c0 = [c for c in cres if c.name == "C0"][0] + self.assertEqual(collection.get_cre_hierarchy(c0), 0) + c2 = [c for c in cres if c.name == "C2"][0] + self.assertEqual(collection.get_cre_hierarchy(c2), 1) + c3 = [c for c in cres if c.name == "C3"][0] + self.assertEqual(collection.get_cre_hierarchy(c3), 2) + c4 = [c for c in cres if c.name == "C4"][0] + self.assertEqual(collection.get_cre_hierarchy(c4), 3) + c5 = [c for c in cres if c.name == "C5"][0] + self.assertEqual(collection.get_cre_hierarchy(c5), 4) + c6 = [c for c in cres if c.name == "C6"][0] + self.assertEqual(collection.get_cre_hierarchy(c6), 0) + c7 = [c for c in cres if c.name == "C7"][0] + self.assertEqual(collection.get_cre_hierarchy(c7), 0) + c8 = [c for c in cres if c.name == "C8"][0] + self.assertEqual(collection.get_cre_hierarchy(c8), 0) diff --git a/application/tests/defs_test.py b/application/tests/defs_test.py index 0f8707e1d..b41c3f2b4 100644 --- a/application/tests/defs_test.py +++ b/application/tests/defs_test.py @@ -202,15 +202,6 @@ def test_doc_equality(self) -> None: s1_with_link = copy.deepcopy(d1).add_link(defs.Link(document=s2)) self.assertNotEqual(s1_with_link, d1) - # assert recursive link equality works - s1_with_link.links[0].document.add_link(defs.Link(document=c[0])) - self.assertEqual(s1_with_link, copy.deepcopy(s1_with_link)) - s1_with_link_copy = copy.deepcopy(s1_with_link) - s1_with_link_copy.links[0].document.links[0].document.add_link( - defs.Link(document=c[1]) - ) - self.assertFalse(s1_with_link.__eq__(s1_with_link_copy)) - def test_standards_equality(self) -> None: s1 = defs.Standard( name="s1", @@ -274,17 +265,6 @@ def test_standards_equality(self) -> None: s1_with_link = copy.deepcopy(s1).add_link(defs.Link(document=s2)) self.assertNotEqual(s1_with_link, s1) - # assert recursive link equality works - s1_with_link.links[0].document.add_link( - defs.Link(document=s[list(s.keys())[0]]) - ) - self.assertEqual(s1_with_link, copy.deepcopy(s1_with_link)) - s1_with_link_copy = copy.deepcopy(s1_with_link) - s1_with_link_copy.links[0].document.links[0].document.add_link( - defs.Link(document=s[list(s.keys())[1]]) - ) - self.assertFalse(s1_with_link.__eq__(s1_with_link_copy)) - def test_add_link(self) -> None: tool = defs.Tool(name="mctoolface") tool2 = defs.Tool(name="mctoolface2") diff --git a/application/tests/spreadsheet_parsers_test.py b/application/tests/spreadsheet_parsers_test.py index 324fa481f..800f1b5ce 100644 --- a/application/tests/spreadsheet_parsers_test.py +++ b/application/tests/spreadsheet_parsers_test.py @@ -1,3 +1,5 @@ +import json +from pprint import pprint import unittest from application.tests.utils import data_gen from application.defs import cre_defs as defs @@ -8,30 +10,18 @@ class TestParsers(unittest.TestCase): + def test_parse_export_format(self) -> None: - """Given - * CRE "C1" -> Standard "S1" section "SE1" - * CRE "C2" -> CRE "C3" linktype contains - * CRE "C3" -> "C2" (linktype is part of), Standard "S3" section "SE3" - * CRE "C5" -> Standard "S1" section "SE1" subsection "SBE1" - * CRE "C5" -> Standard "S1" section "SE1" subsection "SBE11" - * CRE "C6" -> Standard "S1" section "SE11", Standard "S2" section "SE22", CRE "C7"(linktype contains) , CRE "C8" (linktype contains) - * Standard "SL" - * Standard "SL2" -> Standard "SLL" - # * CRE "C9" - Expect: - 9 CRES - 9 standards - appropriate links among them based on the arrows above - """ + input_data, expected = data_gen.export_format_data() - result = parse_export_format(input_data) + documents = parse_export_format(input_data) + actual_cres = documents.pop(defs.Credoctypes.CRE.value) + standards = documents self.maxDiff = None - for key, val in result.items(): - # self.assertDictEqual(expected[key].todict(), val.todict()) - expected[key].links = [] - val.links = [] - self.assertDictEqual(val.todict(), expected[key].todict()) + + expected_cres = expected.pop(defs.Credoctypes.CRE) + self.assertListEqual(list(actual_cres), list(expected_cres)) + self.assertDictEqual(expected, standards) def test_parse_hierarchical_export_format(self) -> None: # TODO(northdpole): add a tags linking test diff --git a/application/tests/spreadsheet_test.py b/application/tests/spreadsheet_test.py index 35cc9786f..760547027 100644 --- a/application/tests/spreadsheet_test.py +++ b/application/tests/spreadsheet_test.py @@ -1,13 +1,13 @@ -import tempfile import unittest +import io +import csv +from pprint import pprint from application import create_app, sqla # type: ignore from application.database import db from application.defs import cre_defs as defs -from application.utils.spreadsheet import ( - prepare_spreadsheet, - generate_mapping_template_file, -) +from application.utils.spreadsheet import * +from application.tests.utils.data_gen import export_format_data class TestDB(unittest.TestCase): @@ -21,572 +21,87 @@ def setUp(self) -> None: self.app_context = self.app.app_context() self.app_context.push() sqla.create_all() - self.collection = db.Node_collection() - - def test_prepare_spreadsheet_standards(self) -> None: - """ - Given: - * 1 CRE "CREname" that links to - ** 2 subsections of Standard "ConflictStandName" - ** 2 subsections in standards "NormalStand1" and "NormalStand2" - ** CRE "CREGroup" - * 1 CRE "CREGroup" that links to - ** CRE "CREname" - ** 1 subsection in standard "GroupStand2" - * 1 Standard "LoneStand" - Expect: an array with 5 elements - * 1 element contains the mappings of "CREname" to "NormalStand1", "NormalStand2", "CREGroup" and 1 subsection of "ConflictStandName" - * 1 element contains ONLY the mapping of "CREname" to the remaining subsection of "ConflictStandName" - * 1 element contains the mappings of "CREGroup" to "CREname" and "GroupStand2" - * 1 element contains the entry of "LoneStand" without any mappings - * 1 element contains the entry of "OtherLoneStand" without any mappings - """ + self.collection = db.Node_collection().with_graph() + def test_prepare_spreadsheet_one_cre(self) -> None: collection = self.collection - - dbcre = db.CRE(description="CREdesc", name="CREname", external_id="060-060") - dbgroup = db.CRE( - description="CREGroupDesc", name="CREGroup", external_id="999-999" - ) - collection.session.add(dbcre) - collection.session.add(dbgroup) - collection.session.commit() - collection.session.add(db.InternalLinks(cre=dbcre.id, group=dbgroup.id)) - - conflict1 = db.Node( - subsection="4.5.1", - section="ConflictStandSection", - name="ConflictStandName", - link="https://example.com/1", - ntype="Standard", - ) - conflict2 = db.Node( - subsection="4.5.2", - section="ConflictStandSection", - name="ConflictStandName", - link="https://example.com/2", - ntype="Standard", - ) - collection.session.add(conflict1) - collection.session.add(conflict2) - collection.session.commit() - collection.session.add(db.Links(cre=dbcre.id, node=conflict1.id)) - collection.session.add(db.Links(cre=dbcre.id, node=conflict2.id)) - - dbs1 = db.Node( - subsection="4.5.1", - section="NormalStandSection1", - name="NormalStand1", - link="https://example.com/1", - ntype="Standard", - ) - dbs2 = db.Node( - subsection="4.5.2", - section="NormalStandSection2", - name="NormalStand2", - link="https://example.com/2", - ntype="Standard", - ) - dbsg = db.Node( - subsection="4.5.2", - section="GroupStandSection2", - name="GroupStand2", - link="https://example.com/g2", - ntype="Standard", - ) - dbls1 = db.Node( - subsection="4.5.2", - section="LoneStandSection", - name="LoneStand", - link="https://example.com/ls1", - ntype="Standard", - ) - dbls2 = db.Node( - subsection="4.5.2", - section="OtherLoneStandSection", - name="OtherLoneStand", - link="https://example.com/ls2", - ntype="Standard", - ) - collection.session.add(dbs1) - collection.session.add(dbs2) - collection.session.add(dbsg) - collection.session.add(dbls1) - collection.session.add(dbls2) - collection.session.commit() - collection.session.add(db.Links(cre=dbcre.id, node=dbs1.id)) - collection.session.add(db.Links(cre=dbcre.id, node=dbs2.id)) - collection.session.add(db.Links(cre=dbgroup.id, node=dbsg.id)) - collection.session.commit() - - expected = [ - { - "CRE:description": "CREGroupDesc", - "CRE:id": "999-999", - "CRE:name": "CREGroup", - "Standard:ConflictStandName:hyperlink": None, - "Standard:ConflictStandName:link_type": None, - "Standard:ConflictStandName:section": None, - "Standard:ConflictStandName:subsection": None, - "Standard:GroupStand2:hyperlink": "https://example.com/g2", - "Standard:GroupStand2:link_type": "SAME", - "Standard:GroupStand2:section": "GroupStandSection2", - "Standard:GroupStand2:subsection": "4.5.2", - "Linked_CRE_0:id": "060-060", - "Linked_CRE_0:link_type": "SAME", - "Linked_CRE_0:name": "CREname", - "Standard:LoneStand:hyperlink": None, - "Standard:LoneStand:link_type": None, - "Standard:LoneStand:section": None, - "Standard:LoneStand:subsection": None, - "Standard:NormalStand1:hyperlink": None, - "Standard:NormalStand1:link_type": None, - "Standard:NormalStand1:section": None, - "Standard:NormalStand1:subsection": None, - "Standard:NormalStand2:hyperlink": None, - "Standard:NormalStand2:link_type": None, - "Standard:NormalStand2:section": None, - "Standard:NormalStand2:subsection": None, - "Standard:OtherLoneStand:hyperlink": None, - "Standard:OtherLoneStand:link_type": None, - "Standard:OtherLoneStand:section": None, - "Standard:OtherLoneStand:subsection": None, - }, - { - "CRE:description": "CREdesc", - "CRE:id": "060-060", - "CRE:name": "CREname", - "Standard:ConflictStandName:hyperlink": "https://example.com/1", - "Standard:ConflictStandName:link_type": "SAME", - "Standard:ConflictStandName:section": "ConflictStandSection", - "Standard:ConflictStandName:subsection": "4.5.1", - "Standard:GroupStand2:hyperlink": None, - "Standard:GroupStand2:link_type": None, - "Standard:GroupStand2:section": None, - "Standard:GroupStand2:subsection": None, - "Linked_CRE_0:id": "999-999", - "Linked_CRE_0:link_type": "SAME", - "Linked_CRE_0:name": "CREGroup", - "Standard:LoneStand:hyperlink": None, - "Standard:LoneStand:link_type": None, - "Standard:LoneStand:section": None, - "Standard:LoneStand:subsection": None, - "Standard:NormalStand1:hyperlink": "https://example.com/1", - "Standard:NormalStand1:link_type": "SAME", - "Standard:NormalStand1:section": "NormalStandSection1", - "Standard:NormalStand1:subsection": "4.5.1", - "Standard:NormalStand2:hyperlink": "https://example.com/2", - "Standard:NormalStand2:link_type": "SAME", - "Standard:NormalStand2:section": "NormalStandSection2", - "Standard:NormalStand2:subsection": "4.5.2", - "Standard:OtherLoneStand:hyperlink": None, - "Standard:OtherLoneStand:link_type": None, - "Standard:OtherLoneStand:section": None, - "Standard:OtherLoneStand:subsection": None, - }, - { - "CRE:description": "CREdesc", - "CRE:id": "060-060", - "CRE:name": "CREname", - "Standard:ConflictStandName:hyperlink": "https://example.com/2", - "Standard:ConflictStandName:link_type": "SAME", - "Standard:ConflictStandName:section": "ConflictStandSection", - "Standard:ConflictStandName:subsection": "4.5.2", - "Standard:GroupStand2:hyperlink": None, - "Standard:GroupStand2:link_type": None, - "Standard:GroupStand2:section": None, - "Standard:GroupStand2:subsection": None, - "Linked_CRE_0:id": None, - "Linked_CRE_0:link_type": None, - "Linked_CRE_0:name": None, - "Standard:LoneStand:hyperlink": None, - "Standard:LoneStand:link_type": None, - "Standard:LoneStand:section": None, - "Standard:LoneStand:subsection": None, - "Standard:NormalStand1:hyperlink": None, - "Standard:NormalStand1:link_type": None, - "Standard:NormalStand1:section": None, - "Standard:NormalStand1:subsection": None, - "Standard:NormalStand2:hyperlink": None, - "Standard:NormalStand2:link_type": None, - "Standard:NormalStand2:section": None, - "Standard:NormalStand2:subsection": None, - "Standard:OtherLoneStand:hyperlink": None, - "Standard:OtherLoneStand:link_type": None, - "Standard:OtherLoneStand:section": None, - "Standard:OtherLoneStand:subsection": None, - }, - { - "CRE:description": None, - "CRE:id": None, - "CRE:name": None, - "Standard:ConflictStandName:hyperlink": None, - "Standard:ConflictStandName:link_type": None, - "Standard:ConflictStandName:section": None, - "Standard:ConflictStandName:subsection": None, - "Standard:GroupStand2:hyperlink": None, - "Standard:GroupStand2:link_type": None, - "Standard:GroupStand2:section": None, - "Standard:GroupStand2:subsection": None, - "Linked_CRE_0:id": None, - "Linked_CRE_0:link_type": None, - "Linked_CRE_0:name": None, - "Standard:LoneStand:hyperlink": "https://example.com/ls1", - "Standard:LoneStand:link_type": None, - "Standard:LoneStand:section": "LoneStandSection", - "Standard:LoneStand:subsection": "4.5.2", - "Standard:NormalStand1:hyperlink": None, - "Standard:NormalStand1:link_type": None, - "Standard:NormalStand1:section": None, - "Standard:NormalStand1:subsection": None, - "Standard:NormalStand2:hyperlink": None, - "Standard:NormalStand2:link_type": None, - "Standard:NormalStand2:section": None, - "Standard:NormalStand2:subsection": None, - "Standard:OtherLoneStand:hyperlink": None, - "Standard:OtherLoneStand:link_type": None, - "Standard:OtherLoneStand:section": None, - "Standard:OtherLoneStand:subsection": None, - }, - { - "CRE:description": None, - "CRE:id": None, - "CRE:name": None, - "Standard:ConflictStandName:hyperlink": None, - "Standard:ConflictStandName:link_type": None, - "Standard:ConflictStandName:section": None, - "Standard:ConflictStandName:subsection": None, - "Standard:GroupStand2:hyperlink": None, - "Standard:GroupStand2:link_type": None, - "Standard:GroupStand2:section": None, - "Standard:GroupStand2:subsection": None, - "Linked_CRE_0:id": None, - "Linked_CRE_0:link_type": None, - "Linked_CRE_0:name": None, - "Standard:LoneStand:hyperlink": None, - "Standard:LoneStand:link_type": None, - "Standard:LoneStand:section": None, - "Standard:LoneStand:subsection": None, - "Standard:NormalStand1:hyperlink": None, - "Standard:NormalStand1:link_type": None, - "Standard:NormalStand1:section": None, - "Standard:NormalStand1:subsection": None, - "Standard:NormalStand2:hyperlink": None, - "Standard:NormalStand2:link_type": None, - "Standard:NormalStand2:section": None, - "Standard:NormalStand2:subsection": None, - "Standard:OtherLoneStand:hyperlink": "https://example.com/ls2", - "Standard:OtherLoneStand:link_type": None, - "Standard:OtherLoneStand:section": "OtherLoneStandSection", - "Standard:OtherLoneStand:subsection": "4.5.2", - }, - ] - - result = prepare_spreadsheet( - collection, collection.export(dir=tempfile.mkdtemp()) - ) - self.assertCountEqual(result, expected) - - def test_prepare_spreadsheet_groups(self) -> None: - """Given: - * 1 CRE "CREname" that links to - ** 2 subsections of Standard "ConflictStandName" - ** 2 subsections in standards "NormalStand1" and "NormalStand2" - ** CRE "CREGroup" - * 1 CRE "CREGroup" that links to - ** CRE "CREname" - ** 1 subsection in standard "GroupStand2" - Expect: an array with 3 elements - * 1 element contains the mappings of "CREname" to "NormalStand1", "NormalStand2", "CREGroup" and 1 subsection of "ConflictStandName" - * 1 element contains ONLY the mapping of "CREname" to the remaining subsection of "ConflictStandName" - * 1 element contains the mappings of "CREGroup" to "CREname" and "GroupStand2" - """ - collection = self.collection - - dbcre = db.CRE(description="CREdesc", name="CREname", external_id="060-060") - dbgroup = db.CRE( - description="CREGroupDesc", name="CREGroup", external_id="999-999" - ) - collection.session.add(dbcre) - collection.session.add(dbgroup) - collection.session.commit() - collection.session.add(db.InternalLinks(cre=dbcre.id, group=dbgroup.id)) - - conflict1 = db.Node( - subsection="4.5.1", - section="ConflictStandSection", - name="ConflictStandName", - link="https://example.com/1", - ntype="Standard", - ) - conflict2 = db.Node( - subsection="4.5.2", - section="ConflictStandSection", - name="ConflictStandName", - link="https://example.com/2", - ntype="Standard", - ) - collection.session.add(conflict1) - collection.session.add(conflict2) - collection.session.commit() - collection.session.add(db.Links(cre=dbcre.id, node=conflict1.id)) - collection.session.add(db.Links(cre=dbcre.id, node=conflict2.id)) - - dbs1 = db.Node( - subsection="4.5.1", - section="NormalStandSection1", - name="NormalStand1", - link="https://example.com/1", - ntype="Standard", - ) - dbs2 = db.Node( - subsection="4.5.2", - section="NormalStandSection2", - name="NormalStand2", - link="https://example.com/2", - ntype="Standard", - ) - dbsg = db.Node( - subsection="4.5.2", - section="GroupStandSection2", - name="GroupStand2", - link="https://example.com/g2", - ntype="Standard", - ) - collection.session.add(dbs1) - collection.session.add(dbs2) - collection.session.add(dbsg) - collection.session.commit() - collection.session.add(db.Links(cre=dbcre.id, node=dbs1.id)) - collection.session.add(db.Links(cre=dbcre.id, node=dbs2.id)) - collection.session.add(db.Links(cre=dbgroup.id, node=dbsg.id)) - collection.session.commit() - expected = [ { - "CRE:description": "CREGroupDesc", - "CRE:id": "999-999", - "CRE:name": "CREGroup", - "Standard:ConflictStandName:hyperlink": None, - "Standard:ConflictStandName:link_type": None, - "Standard:ConflictStandName:section": None, - "Standard:ConflictStandName:subsection": None, - "Standard:GroupStand2:hyperlink": "https://example.com/g2", - "Standard:GroupStand2:link_type": "SAME", - "Standard:GroupStand2:section": "GroupStandSection2", - "Standard:GroupStand2:subsection": "4.5.2", - "Linked_CRE_0:id": "060-060", - "Linked_CRE_0:link_type": "SAME", - "Linked_CRE_0:name": "CREname", - "Standard:NormalStand1:hyperlink": None, - "Standard:NormalStand1:link_type": None, - "Standard:NormalStand1:section": None, - "Standard:NormalStand1:subsection": None, - "Standard:NormalStand2:hyperlink": None, - "Standard:NormalStand2:link_type": None, - "Standard:NormalStand2:section": None, - "Standard:NormalStand2:subsection": None, - }, - { - "CRE:description": "CREdesc", - "CRE:id": "060-060", - "CRE:name": "CREname", - "Standard:ConflictStandName:hyperlink": "https://example.com/1", - "Standard:ConflictStandName:link_type": "SAME", - "Standard:ConflictStandName:section": "ConflictStandSection", - "Standard:ConflictStandName:subsection": "4.5.1", - "Standard:GroupStand2:hyperlink": None, - "Standard:GroupStand2:link_type": None, - "Standard:GroupStand2:section": None, - "Standard:GroupStand2:subsection": None, - "Linked_CRE_0:id": "999-999", - "Linked_CRE_0:link_type": "SAME", - "Linked_CRE_0:name": "CREGroup", - "Standard:NormalStand1:hyperlink": "https://example.com/1", - "Standard:NormalStand1:link_type": "SAME", - "Standard:NormalStand1:section": "NormalStandSection1", - "Standard:NormalStand1:subsection": "4.5.1", - "Standard:NormalStand2:hyperlink": "https://example.com/2", - "Standard:NormalStand2:link_type": "SAME", - "Standard:NormalStand2:section": "NormalStandSection2", - "Standard:NormalStand2:subsection": "4.5.2", + "CRE 0": "444-444|CC", }, { - "CRE:description": "CREdesc", - "CRE:id": "060-060", - "CRE:name": "CREname", - "Standard:ConflictStandName:hyperlink": "https://example.com/2", - "Standard:ConflictStandName:link_type": "SAME", - "Standard:ConflictStandName:section": "ConflictStandSection", - "Standard:ConflictStandName:subsection": "4.5.2", - "Standard:GroupStand2:hyperlink": None, - "Standard:GroupStand2:link_type": None, - "Standard:GroupStand2:section": None, - "Standard:GroupStand2:subsection": None, - "Linked_CRE_0:id": None, - "Linked_CRE_0:link_type": None, - "Linked_CRE_0:name": None, - "Standard:NormalStand1:hyperlink": None, - "Standard:NormalStand1:link_type": None, - "Standard:NormalStand1:section": None, - "Standard:NormalStand1:subsection": None, - "Standard:NormalStand2:hyperlink": None, - "Standard:NormalStand2:link_type": None, - "Standard:NormalStand2:section": None, - "Standard:NormalStand2:subsection": None, + "CRE 1": "222-222|CD", }, ] - - result = prepare_spreadsheet( - collection, collection.export(dir=tempfile.mkdtemp()) + cd = defs.CRE(name="CD", description="CD", tags=["td"], id="222-222") + cc = defs.CRE( + name="CC", + description="CC", + links=[ + defs.Link( + document=cd, + ltype=defs.LinkTypes.Contains, + ) + ], + tags=["tc"], + metadata={}, + id="444-444", ) - self.assertDictEqual(result[0], expected[0]) - - def test_prepare_spreadsheet_simple(self) -> None: - """Given: - * 1 CRE "CREname" that links to - ** 2 subsections of Standard "ConflictStandName" - ** 2 subsections in standards "NormalStand0" and "NormalStand1" - Expect: an array with 2 elements - * 1 element contains the mappings of "CREname" to "NormalStand1", "NormalStand0" and 1 subsection of "ConflictStandName" - * 1 element contains ONLY the mapping of "CREname" to the remaining subsection of "ConflictStandName" - """ - # empty string means temporary db - collection = db.Node_collection() - # test 0, single CRE, connects to several standards - # 1 cre maps to the same standard in multiple sections/subsections - cre = defs.CRE(description="CREdesc", name="CREname", id="123-321-0") - conflict0 = defs.Standard( - subsection="4.5.0", - section="ConflictStandSection", - name="ConflictStandName", - hyperlink="https://example.com/0", + collection.add_internal_link( + collection.add_cre(cc), collection.add_cre(cd), type=defs.LinkTypes.Contains ) - conflict1 = defs.Standard( - subsection="4.5.1", - section="ConflictStandSection", - name="ConflictStandName", - hyperlink="https://example.com/1", - ) - s0 = defs.Standard( - subsection="4.5.0", - section="NormalStandSection0", - name="NormalStand0", - hyperlink="https://example.com/0", - ) - s1 = defs.Standard( - subsection="4.5.1", - section="NormalStandSection1", - name="NormalStand1", - hyperlink="https://example.com/1", - ) - dbcre = collection.add_cre(cre) - dbc0 = collection.add_node(conflict0) - dbc1 = collection.add_node(conflict1) - dbs0 = collection.add_node(s0) - dbs1 = collection.add_node(s1) - collection.add_link(dbcre, dbc0) - collection.add_link(dbcre, dbc1) - collection.add_link(dbcre, dbs0) - collection.add_link(dbcre, dbs1) - - expected = [ - { - "CRE:name": "CREname", - "CRE:id": "123-321-0", - "CRE:description": "CREdesc", - "Standard:ConflictStandName:hyperlink": "https://example.com/0", - "Standard:ConflictStandName:link_type": "SAME", - "Standard:ConflictStandName:section": "ConflictStandSection", - "Standard:ConflictStandName:subsection": "4.5.0", - "Standard:NormalStand0:hyperlink": "https://example.com/0", - "Standard:NormalStand0:link_type": "SAME", - "Standard:NormalStand0:section": "NormalStandSection0", - "Standard:NormalStand0:subsection": "4.5.0", - "Standard:NormalStand1:hyperlink": "https://example.com/1", - "Standard:NormalStand1:link_type": "SAME", - "Standard:NormalStand1:section": "NormalStandSection1", - "Standard:NormalStand1:subsection": "4.5.1", - }, - { - "CRE:name": "CREname", - "CRE:id": "123-321-0", - "CRE:description": "CREdesc", - "Standard:ConflictStandName:hyperlink": "https://example.com/1", - "Standard:ConflictStandName:link_type": "SAME", - "Standard:ConflictStandName:section": "ConflictStandSection", - "Standard:ConflictStandName:subsection": "4.5.1", - "Standard:NormalStand0:hyperlink": None, - "Standard:NormalStand0:link_type": None, - "Standard:NormalStand0:section": None, - "Standard:NormalStand0:subsection": None, - "Standard:NormalStand1:hyperlink": None, - "Standard:NormalStand1:link_type": None, - "Standard:NormalStand1:section": None, - "Standard:NormalStand1:subsection": None, - }, - ] - export = collection.export(dry_run=True) - result = prepare_spreadsheet(collection, export) - self.maxDiff = None + result = ExportSheet().prepare_spreadsheet(storage=collection, docs=[cc, cd]) self.assertCountEqual(result, expected) - def test_generate_mapping_template_file(self) -> None: - """ - Given: a CRE structure with 4 depth levels and 2 root cres - prepare a staggered csv accordingly - """ - # empty string means temporary db - collection = db.Node_collection().with_graph() - roots = [] - for j in range(2): - root = defs.CRE(description=f"root{j}", name=f"root{j}", id=f"123-30{j}") - db_root = collection.add_cre(root) - roots.append(root) - previous_db = db_root - previous_cre = root - - for i in range(4): - c = defs.CRE( - description=f"CREdesc{j}-{i}", - name=f"CREname{j}-{i}", - id=f"123-4{j}{i}", - ) - dbcre = collection.add_cre(c) - collection.add_internal_link( - higher=previous_db, lower=dbcre, type=defs.LinkTypes.Contains - ) - previous_cre.add_link( - defs.Link(document=c, ltype=defs.LinkTypes.Contains) - ) - previous_cre = c - previous_db = dbcre - csv = generate_mapping_template_file(database=collection, docs=roots) - self.assertEqual( - csv, - [ - { - "CRE 0": "", - "CRE 1": "", - "CRE 2": "", - "CRE 3": "", - "CRE 4": "", - }, - {"CRE 0": "123-300|root0"}, - {"CRE 1": "123-400|CREname0-0"}, - {"CRE 2": "123-401|CREname0-1"}, - {"CRE 3": "123-402|CREname0-2"}, - {"CRE 4": "123-403|CREname0-3"}, - {"CRE 0": "123-301|root1"}, - {"CRE 1": "123-410|CREname1-0"}, - {"CRE 2": "123-411|CREname1-1"}, - {"CRE 3": "123-412|CREname1-2"}, - {"CRE 4": "123-413|CREname1-3"}, - ], - ) + def test_prepare_spreadsheet_empty(self) -> None: + collection = self.collection + expected = [] + result = ExportSheet().prepare_spreadsheet(storage=collection, docs=[]) + self.assertCountEqual(result, expected) + def test_prepare_spreadsheet(self) -> None: + collection = self.collection + expected, inputDocs = export_format_data() + importItems = [] + for name, items in inputDocs.items(): + for item in items: + importItems.append(item) + if name == defs.Credoctypes.CRE: + dbitem = collection.add_cre(item) + else: + dbitem = collection.add_node(item) + for link in item.links: + if link.document.doctype == defs.Credoctypes.CRE: + linked_item = collection.add_cre(link.document) + if item.doctype == defs.Credoctypes.CRE: + collection.add_internal_link( + dbitem, linked_item, type=link.ltype + ) + else: + collection.add_link( + node=dbitem, cre=linked_item, type=link.ltype + ) + else: + linked_item = collection.add_node(link.document) + if item.doctype == defs.Credoctypes.CRE: + collection.add_link( + cre=dbitem, node=linked_item, type=link.ltype + ) + else: + collection.add_internal_link( + cre=linked_item, node=dbitem, type=link.ltype + ) + result = ExportSheet().prepare_spreadsheet(docs=importItems, storage=collection) + + output = io.StringIO() + header = expected[0].keys() + writer = csv.DictWriter(output, fieldnames=header) + writer.writeheader() + for row in result: + writer.writerow(row) + out = output.getvalue().splitlines() + result = list(csv.DictReader(out)) -if __name__ == "__main__": - unittest.main() + self.assertCountEqual(result, expected) diff --git a/application/tests/utils/data_gen.py b/application/tests/utils/data_gen.py index 369dfb5a4..c86da45b2 100644 --- a/application/tests/utils/data_gen.py +++ b/application/tests/utils/data_gen.py @@ -516,431 +516,641 @@ def root_csv_minimum_data(): def export_format_data(): input_data = [ { - "CRE:description": "C1 description", - "CRE:id": "111-111", - "CRE:name": "C1", - "Standard:S1:hyperlink": "https://example.com/S1", - "Standard:S1:link_type": "Linked To", - "Standard:S1:section": "SE1", - "Standard:S1:subsection": "SBE1", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "", - "Linked_CRE_0:link_type": "", - "Linked_CRE_0:name": "", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 0": "000-001|C0", + "CRE 1": "", + "CRE 2": "", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "SE1 description", + "S1|hyperlink": "https://example.com/S1", + "S1|id": "id1", + "S1|name": "SE1", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", }, { - "CRE:description": "C2 description", - "CRE:id": "222-222", - "CRE:name": "C2", - "Standard:S1:hyperlink": "", - "Standard:S1:link_type": "", - "Standard:S1:section": "", - "Standard:S1:subsection": "", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "333-333", - "Linked_CRE_0:link_type": "Contains", - "Linked_CRE_0:name": "C3", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 0": "", + "CRE 1": "222-222|C2", + "CRE 2": "", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "", + "S1|hyperlink": "", + "S1|id": "", + "S1|name": "", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", }, { - "CRE:description": "C3 description", - "CRE:id": "333-333", - "CRE:name": "C3", - "Standard:S1:hyperlink": "", - "Standard:S1:link_type": "", - "Standard:S1:section": "", - "Standard:S1:subsection": "", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "https://example.com/S3", - "Code:S3:link_type": "Linked To", - "Code:S3:description": "SE3", - "Linked_CRE_0:id": "222-222", - "Linked_CRE_0:link_type": "Is Part Of", - "Linked_CRE_0:name": "C2", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 0": "", + "CRE 1": "", + "CRE 2": "333-333|C3", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "", + "S1|hyperlink": "", + "S1|id": "", + "S1|name": "", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "SE3", + "S3|hyperlink": "https://example.com/S3", + "S3|id": "5.3", + "S3|name": "SE3", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", }, { - "CRE:description": "C5 description", - "CRE:id": "555-555", - "CRE:name": "C5", - "Standard:S1:hyperlink": "https://example.com/S1", - "Standard:S1:link_type": "Linked To", - "Standard:S1:section": "SE1", - "Standard:S1:subsection": "SBE1", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "", - "Linked_CRE_0:link_type": "", - "Linked_CRE_0:name": "", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 0": "", + "CRE 1": "", + "CRE 2": "333-333|C3", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "", + "S1|hyperlink": "", + "S1|id": "", + "S1|name": "", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "SE3 double", + "S3|hyperlink": "https://example.com/S3.4", + "S3|id": "5.3.4", + "S3|name": "SE3.4", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", }, { - "CRE:description": "C5 description", - "CRE:id": "555-555", - "CRE:name": "C5", - "Standard:S1:hyperlink": "https://example.com/S1", - "Standard:S1:link_type": "Linked To", - "Standard:S1:section": "SE1", - "Standard:S1:subsection": "SBE11", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "", - "Linked_CRE_0:link_type": "", - "Linked_CRE_0:name": "", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 0": "", + "CRE 1": "", + "CRE 2": "", + "CRE 3": "444-444|C4", + "CRE 4": "", + "CRE 5": "", + "S1|description": "SE1 description", + "S1|hyperlink": "https://example.com/S1", + "S1|id": "id1", + "S1|name": "SE1", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", }, { - "CRE:description": "C6 description", - "CRE:id": "666-666", - "CRE:name": "C6", - "Standard:S1:hyperlink": "https://example.com/S1", - "Standard:S1:link_type": "Linked To", - "Standard:S1:section": "SE1", - "Standard:S1:subsection": "SBE11", - "Tool:S2:hyperlink": "https://example.com/S2", - "Tool:S2:link_type": "Linked To", - "Tool:S2:description": "SE2", - "Tool:S2:ToolType": "Offensive", - "Tool:S2:SectionID": "0", - "Tool:S2:section": "rule-0", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "777-777", - "Linked_CRE_0:link_type": "Contains", - "Linked_CRE_0:name": "C7", - "Linked_CRE_1:id": "888-888", - "Linked_CRE_1:link_type": "Contains", - "Linked_CRE_1:name": "C8", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 0": "", + "CRE 1": "", + "CRE 2": "", + "CRE 3": "", + "CRE 4": "555-555|C5", + "CRE 5": "", + "S1|description": "SE1 description", + "S1|hyperlink": "https://example.com/S1", + "S1|id": "id1", + "S1|name": "SE1", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", }, { - "CRE:description": "", - "CRE:id": "", - "CRE:name": "", - "Standard:S1:hyperlink": "", - "Standard:S1:link_type": "", - "Standard:S1:section": "", - "Standard:S1:subsection": "", - "S2:hyperlink": "", - "S2:link_type": "", - "S2:section": "", - "S2:subsection": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "", - "Linked_CRE_0:link_type": "", - "Linked_CRE_0:name": "", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "https://example.com/SL", - "SL:link_type": "", - "SL:section": "SSL", - "SL:subsection": "SBESL", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 0": "666-666|C6", + "CRE 1": "", + "CRE 2": "", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "", + "S1|hyperlink": "", + "S1|id": "", + "S1|name": "", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", }, { - "CRE:description": "", - "CRE:id": "", - "CRE:name": "", - "Standard:S1:hyperlink": "", - "Standard:S1:link_type": "", - "Standard:S1:section": "", - "Standard:S1:subsection": "", - "S2:hyperlink": "", - "S2:link_type": "", - "S2:section": "", - "S2:subsection": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "", - "Linked_CRE_0:link_type": "", - "Linked_CRE_0:name": "", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "SESL", - "SL2:hyperlink": "https://example.com/SL2", - "SL2:link_type": "", - "SL2:section": "SSL2", - "SL2:subsection": "SBESL2", - "SLL:hyperlink": "https://example.com/SLL", - "SLL:link_type": "SAM", - "SLL:section": "SSLL", - "SLL:subsection": "SBESLL", + "CRE 0": "777-777|C7", + "CRE 1": "", + "CRE 2": "", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "", + "S1|hyperlink": "", + "S1|id": "", + "S1|name": "", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", + }, + { + "CRE 0": "888-888|C8", + "CRE 1": "", + "CRE 2": "", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "", + "S1|hyperlink": "", + "S1|id": "", + "S1|name": "", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", + }, + { + "CRE 0": "", + "CRE 1": "", + "CRE 2": "", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "", + "S1|hyperlink": "", + "S1|id": "", + "S1|name": "", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "https://example.com/SL", + "SL|id": "slid", + "SL|name": "SSL", + }, + { + "CRE 0": "", + "CRE 1": "", + "CRE 2": "", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "", + "S1|hyperlink": "", + "S1|id": "", + "S1|name": "", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "https://example.com/SL2", + "SL2|id": "sl2id", + "SL2|name": "SSL2", + "SLL|hyperlink": "", + "SLL|id": "", + "SLL|name": "", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", + }, + { + "CRE 0": "", + "CRE 1": "", + "CRE 2": "", + "CRE 3": "", + "CRE 4": "", + "CRE 5": "", + "S1|description": "", + "S1|hyperlink": "", + "S1|id": "", + "S1|name": "", + "S2|description": "", + "S2|hyperlink": "", + "S2|id": "", + "S2|name": "", + "S3|description": "", + "S3|hyperlink": "", + "S3|id": "", + "S3|name": "", + "S4|description": "", + "S4|hyperlink": "", + "S4|id": "", + "S4|name": "", + "S5|description": "", + "S5|hyperlink": "", + "S5|id": "", + "S5|name": "", + "SL2|hyperlink": "", + "SL2|id": "", + "SL2|name": "", + "SLL|hyperlink": "https://example.com/SLL", + "SLL|id": "SBESLL", + "SLL|name": "SSLL", + "SL|hyperlink": "", + "SL|id": "", + "SL|name": "", }, ] expected = { - "C1": defs.CRE( - id="111-111", - description="C1 description", - name="C1", - links=[ - defs.Link( - ltype=defs.LinkTypes.LinkedTo, - document=defs.Standard( - name="S1", - section="SE1", - subsection="SBE1", - hyperlink="https://example.com/S1", + defs.Credoctypes.CRE.value: [ + defs.CRE( + id="000-001", + name="C0", + links=[ + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.Standard( + name="S1", + section="SE1", + sectionID="id1", + hyperlink="https://example.com/S1", + description="SE1 description", + ), ), - ) - ], - ), - "C2": defs.CRE( - id="222-222", - description="C2 description", - name="C2", - links=[ - defs.Link( - ltype=defs.LinkTypes.Contains, - document=defs.CRE(id="333-333", name="C3"), - ) - ], - ), - "C3": defs.CRE( - id="333-333", - description="C3 description", - name="C3", - links=[ - defs.Link( - ltype=defs.LinkTypes.PartOf, - document=defs.CRE( - id="222-222", description="C2 description", name="C2" + defs.Link( + ltype=defs.LinkTypes.Contains, + document=defs.CRE( + id="222-222", + name="C2", + ), ), - ), - defs.Link( - ltype=defs.LinkTypes.LinkedTo, - document=defs.Code( - name="S3", - description="SE3", - hyperlink="https://example.com/S3", + ], + ), + defs.CRE( + id="222-222", + name="C2", + links=[ + defs.Link( + ltype=defs.LinkTypes.Contains, + document=defs.CRE(id="333-333", name="C3"), + ) + ], + ), + defs.CRE( + id="333-333", + name="C3", + links=[ + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.Standard( + name="S3", + section="SE3", + sectionID="5.3", + hyperlink="https://example.com/S3", + description="SE3", + ), ), - ), - ], - ), - "C5": defs.CRE( - id="555-555", - description="C5 description", - name="C5", - links=[ - defs.Link( - ltype=defs.LinkTypes.LinkedTo, - document=defs.Standard( - name="S1", - section="SE1", - subsection="SBE1", - hyperlink="https://example.com/S1", + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.Standard( + name="S3", + section="SE3.4", + sectionID="5.3.4", + hyperlink="https://example.com/S3.4", + description="SE3 double", + ), ), - ), - defs.Link( - ltype=defs.LinkTypes.LinkedTo, - document=defs.Standard( - name="S1", - section="SE1", - subsection="SBE11", - hyperlink="https://example.com/S1", + defs.Link( + ltype=defs.LinkTypes.Contains, + document=defs.CRE(id="444-444", name="C4"), ), - ), - ], - ), - "C6": defs.CRE( - id="666-666", - description="C6 description", - name="C6", - links=[ - defs.Link( - ltype=defs.LinkTypes.LinkedTo, - document=defs.Tool( - name="S2", - section="rule-0", - sectionID="0", - tooltype=defs.ToolTypes.Offensive, - description="SE2", - hyperlink="https://example.com/S2", + ], + ), + defs.CRE( + id="444-444", + name="C4", + links=[ + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.Standard( + name="S1", + section="SE1", + sectionID="id1", + description="SE1 description", + hyperlink="https://example.com/S1", + ), ), - ), - defs.Link( - ltype=defs.LinkTypes.LinkedTo, - document=defs.Standard( - name="S1", - section="SE1", - subsection="SBE11", - hyperlink="https://example.com/S1", + defs.Link( + ltype=defs.LinkTypes.Contains, + document=defs.CRE(id="555-555", name="C5"), ), - ), - defs.Link( - ltype=defs.LinkTypes.Contains, - document=defs.CRE(id="777-777", name="C7"), - ), - defs.Link( - ltype=defs.LinkTypes.Contains, - document=defs.CRE(id="888-888", name="C8"), - ), - ], - ), - "C7": defs.CRE( - id="777-777", - name="C7", - links=[ - defs.Link( - ltype=defs.LinkTypes.PartOf, - document=defs.CRE( - id="666-666", description="C6 description", name="C6" + ], + ), + defs.CRE( + id="555-555", + name="C5", + links=[ + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.Standard( + name="S1", + section="SE1", + sectionID="id1", + description="SE1 description", + hyperlink="https://example.com/S1", + ), ), - ) - ], - ), - "C8": defs.CRE( - id="888-888", - name="C8", - links=[ - defs.Link( - ltype=defs.LinkTypes.PartOf, - document=defs.CRE( - id="666-666", description="C6 description", name="C6" + ], + ), + defs.CRE( + id="666-666", + name="C6", + ), + defs.CRE( + id="777-777", + name="C7", + ), + defs.CRE( + id="888-888", + name="C8", + ), + ], + "S1": [ + defs.Standard( + name="S1", + section="SE1", + description="SE1 description", + sectionID="id1", + hyperlink="https://example.com/S1", + links=[ + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.CRE(id="000-001", name="C0"), + ), + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.CRE(id="444-444", name="C4"), + ), + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.CRE(id="555-555", name="C5"), + ), + ], + ) + ], + "S3": [ + defs.Standard( + name="S3", + section="SE3", + description="SE3", + sectionID="5.3", + hyperlink="https://example.com/S3", + links=[ + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.CRE(id="333-333", name="C3"), ), - ) - ], - ), - "SL2:SSL2": defs.Standard( - name="SL2", - section="SSL2", - subsection="SBESL2", - hyperlink="https://example.com/SL2", - ), - "SL:SSL": defs.Standard( - name="SL", - section="SSL", - subsection="SBESL", - hyperlink="https://example.com/SL", - ), - "SLL:SSLL": defs.Standard( - name="SLL", - section="SSLL", - subsection="SBESLL", - hyperlink="https://example.com/SLL", - ), + ], + ), + defs.Standard( + name="S3", + section="SE3.4", + description="SE3 double", + sectionID="5.3.4", + hyperlink="https://example.com/S3.4", + links=[ + defs.Link( + ltype=defs.LinkTypes.LinkedTo, + document=defs.CRE(id="333-333", name="C3"), + ), + ], + ), + ], + "SL": [ + defs.Standard( + name="SL", + section="SSL", + description="", + sectionID="slid", + hyperlink="https://example.com/SL", + ) + ], + "SL2": [ + defs.Standard( + name="SL2", + section="SSL2", + sectionID="sl2id", + hyperlink="https://example.com/SL2", + ) + ], + "SLL": [ + defs.Standard( + name="SLL", + description="", + section="SSLL", + hyperlink="https://example.com/SLL", + sectionID="SBESLL", + ) + ], } return input_data, expected diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index 2b54dfe24..0f79e68d9 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -1,3 +1,4 @@ +from pprint import pprint import io import csv import random @@ -5,17 +6,20 @@ import re import json import unittest +import tempfile from unittest.mock import patch + import redis import rq import os from application import create_app, sqla # type: ignore +from application.tests.utils import data_gen from application.database import db +from application.utils import spreadsheet from application.defs import cre_defs as defs from application.web import web_main from application.utils.gap_analysis import GAP_ANALYSIS_TIMEOUT -from application.utils import spreadsheet class MockJob: @@ -109,7 +113,6 @@ def test_extend_cre_with_tag_links(self) -> None: def test_find_by_id(self) -> None: collection = db.Node_collection().with_graph() - # collection.graph.graph = db.CRE_Graph.load_cre_graph(sqla.session) cres = { "ca": defs.CRE(id="111-111", description="CA", name="CA", tags=["ta"]), @@ -145,18 +148,6 @@ def test_find_by_id(self) -> None: self.assertEqual(json.loads(response.data.decode()), expected) self.assertEqual(200, response.status_code) - # osib_response = client.get( - # f"/rest/v1/id/{cres['cb'].id}?osib=true", - # headers={"Content-Type": "application/json"}, - # ) - # osib_expected = { - # "data": cres["cb"].todict(), - # "osib": osib_defs.cre2osib([cres["cb"]]).todict(), - # } - - # self.assertEqual(json.loads(osib_response.data.decode()), osib_expected) - # self.assertEqual(200, osib_response.status_code) - md_expected = "
CRE---[222-222CD](https://www.opencre.org/cre/222-222),[111-111CA](https://www.opencre.org/cre/111-111),[333-333CB](https://www.opencre.org/cre/333-333)
" md_response = client.get( f"/rest/v1/id/{cres['cd'].id}?format=md", @@ -208,17 +199,6 @@ def test_find_by_name(self) -> None: self.assertEqual(200, response.status_code) self.assertEqual(json.loads(response.data.decode()), expected) - # osib_response = client.get( - # f"/rest/v1/name/{cres['cb'].name}?osib=true", - # headers={"Content-Type": "application/json"}, - # ) - # osib_expected = { - # "data": cres["cb"].todict(), - # "osib": osib_defs.cre2osib([cres["cb"]]).todict(), - # } - # self.assertEqual(json.loads(osib_response.data.decode()), osib_expected) - # self.assertEqual(200, osib_response.status_code) - md_expected = "
CRE---[222-222CD](https://www.opencre.org/cre/222-222),[111-111CA](https://www.opencre.org/cre/111-111),[333-333CB](https://www.opencre.org/cre/333-333),[444-444CC](https://www.opencre.org/cre/444-444)
" md_response = client.get( f"/rest/v1/name/{cres['cd'].name}?format=md", @@ -378,16 +358,6 @@ def test_find_node_by_name(self) -> None: ) self.assertEqual(200, non_standards_response.status_code) - # osib_expected = { - # "total_pages": 1, - # "page": 1, - # "standards": [nodes["c0"].todict()], - # "osib": osib_defs.cre2osib([nodes["c0"]]).todict(), - # } - # osib_response = client.get(f"/rest/v1/code/{nodes['c0'].name}?osib=true") - # self.assertEqual(json.loads(osib_response.data.decode()), osib_expected) - # self.assertEqual(200, osib_response.status_code) - md_expected = "
C0--[C0](https://example.com/c0)
" md_response = client.get(f"/rest/v1/code/{nodes['c0'].name}?format=md") self.assertEqual(re.sub("\s", "", md_response.data.decode()), md_expected) @@ -427,17 +397,6 @@ def test_find_document_by_tag(self) -> None: self.assertEqual(200, response.status_code) self.assertCountEqual(json.loads(response.data.decode()), expected) - # osib_response = client.get( - # f"/rest/v1/tags?tag=tb&tag=tc&osib=true", - # headers={"Content-Type": "application/json"}, - # ) - # osib_expected = { - # "data": cres["cb"].todict(), - # "osib": osib_defs.cre2osib([cres["cb"]]).todict(), - # } - # self.assertCountEqual(osib_response.json, osib_expected) - # self.assertEqual(200, osib_response.status_code) - def test_test_search(self) -> None: collection = db.Node_collection() docs = { @@ -517,17 +476,6 @@ def test_find_root_cres(self) -> None: self.assertEqual(json.loads(response.data.decode()), expected) self.assertEqual(200, response.status_code) - # osib_response = client.get( - # "/rest/v1/root_cres?osib=true", - # headers={"Content-Type": "application/json"}, - # ) - # osib_expected = { - # "data": [cres["ca"].todict(), cres["cb"].todict()], - # "osib": osib_defs.cre2osib([cres["ca"], cres["cb"]]).todict(), - # } - # self.assertEqual(json.loads(osib_response.data.decode()), osib_expected) - # self.assertEqual(200, osib_response.status_code) - def test_smartlink(self) -> None: self.maxDiff = None collection = db.Node_collection().with_graph() @@ -886,6 +834,46 @@ def test_all_cres(self, db_mock) -> None: json.loads(response.data), ) + def test_import_from_cre_csv(self) -> None: + os.environ["CRE_ALLOW_IMPORT"] = "True" + + input_data, _ = data_gen.export_format_data() + workspace = tempfile.mkdtemp() + data = {} + response = None + with open(os.path.join(workspace, "cre.csv"), "w") as f: + cdw = csv.DictWriter(f, fieldnames=input_data[0].keys()) + cdw.writeheader() + cdw.writerows(input_data) + + data["cre_csv"] = open(os.path.join(workspace, "cre.csv"), "rb") + + with self.app.test_client() as client: + response = client.post( + "/rest/v1/cre_csv_import", + data=data, + buffered=True, + content_type="multipart/form-data", + ) + self.assertEqual(200, response.status_code) + self.assertEqual( + { + "status": "success", + "new_cres": [ + "000-001", + "222-222", + "333-333", + "444-444", + "555-555", + "666-666", + "777-777", + "888-888", + ], + "new_standards": 5, + }, + json.loads(response.data), + ) + def test_get_cre_csv(self) -> None: # empty string means temporary db collection = db.Node_collection().with_graph() @@ -899,8 +887,8 @@ def test_get_cre_csv(self) -> None: for i in range(4): c = defs.CRE( - description=f"CREdesc{i}-{j}", - name=f"CREname{i}-{j}", + description=f"CREdesc{j}-{i}", + name=f"CREname{j}-{i}", id=f"123-4{j}{i}", ) dbcre = collection.add_cre(c) diff --git a/application/utils/external_project_parsers/parsers/cloud_native_security_controls.py b/application/utils/external_project_parsers/parsers/cloud_native_security_controls.py index 1d492fa52..be216c27d 100644 --- a/application/utils/external_project_parsers/parsers/cloud_native_security_controls.py +++ b/application/utils/external_project_parsers/parsers/cloud_native_security_controls.py @@ -64,7 +64,7 @@ def parse(self, cache: db.Node_collection, ph: prompt_client.PromptHandler): ) standard_id = ph.get_id_of_most_similar_node(cnsc_embeddings) if standard_id: - dbstandard = cache.get_node_by_db_id(standard_id) + dbstandard = cache.get_nodes(db_id=standard_id) logger.info( f"found an appropriate standard for Cloud Native Security Control {cnsc.section}:{cnsc.subsection}, it is: {dbstandard.name}:{dbstandard.section}" ) diff --git a/application/utils/external_project_parsers/parsers/juiceshop.py b/application/utils/external_project_parsers/parsers/juiceshop.py index 432255690..2f8cd21c6 100644 --- a/application/utils/external_project_parsers/parsers/juiceshop.py +++ b/application/utils/external_project_parsers/parsers/juiceshop.py @@ -78,7 +78,7 @@ def parse( f"could not find an appropriate CRE for Juiceshop challenge {chal.section}, findings similarities with standards instead" ) standard_id = ph.get_id_of_most_similar_node(challenge_embeddings) - dbstandard = cache.get_node_by_db_id(standard_id) + dbstandard = cache.get_nodes(db_id=standard_id) logger.info( f"found an appropriate standard for Juiceshop challenge {chal.section}, it is: {dbstandard.section}" ) diff --git a/application/utils/external_project_parsers/parsers/pci_dss.py b/application/utils/external_project_parsers/parsers/pci_dss.py index bafc20914..8a451d41f 100644 --- a/application/utils/external_project_parsers/parsers/pci_dss.py +++ b/application/utils/external_project_parsers/parsers/pci_dss.py @@ -79,7 +79,7 @@ def __parse( f"could not find an appropriate CRE for pci {pci_control.section}, findings similarities with standards instead" ) standard_id = prompt.get_id_of_most_similar_node(control_embeddings) - dbstandard = cache.get_node_by_db_id(standard_id) + dbstandard = cache.get_nodes(db_id=standard_id) logger.info( f"found an appropriate standard for pci {pci_control.section}, it is: {dbstandard.section}" ) diff --git a/application/utils/spreadsheet.py b/application/utils/spreadsheet.py index 5a858625c..95abeef5c 100644 --- a/application/utils/spreadsheet.py +++ b/application/utils/spreadsheet.py @@ -1,3 +1,4 @@ +from pprint import pprint import csv import io import logging @@ -25,6 +26,10 @@ def findDups(x): return {val for val in x if (val in seen or seen.add(val))} +def load_csv(): + pass + + def read_spreadsheet( url: str, alias: str, validate: bool = True, parse_numbered_only=True ) -> Dict[str, Any]: @@ -77,151 +82,199 @@ def read_spreadsheet( return result -def __add_cre_to_spreadsheet( - document: defs.Document, - header: Dict[str, Optional[str]], - cresheet: List[Dict[str, Any]], - maxgroups: int, -) -> List[Dict[str, Any]]: - cresheet.append(header.copy()) - working_array = cresheet[-1] - conflicts = [] - if document.doctype == defs.Credoctypes.CRE: - working_array[defs.ExportFormat.cre_name_key()] = document.name - working_array[defs.ExportFormat.cre_id_key()] = document.id - working_array[defs.ExportFormat.cre_description_key()] = document.description - # case where a lone standard is displayed without any CRE links - elif document.doctype == defs.Credoctypes.Standard: - working_array[ - defs.ExportFormat.section_key(sname=document.name, doctype=document.doctype) - ] = document.section # type: ignore - working_array[ - defs.ExportFormat.subsection_key( - sname=document.name, doctype=document.doctype +class ExportSheet: + processed_ids = [] + body = [] + input_cres = {} + + def __init__(self): + self.processed_ids = [] + self.body = [] + self.input_cres = {} + + def write_cre_entry(self, cre: defs.CRE, depth: int) -> Dict[str, str]: + self.processed_ids.append(cre.id) + return {f"CRE {depth}": f"{cre.id}{defs.ExportFormat.separator}{cre.name}"} + + def write_standard_entry(self, node: defs.Document) -> Dict[str, str]: + standardEntry = {} + if node.section: + standardEntry[f"{defs.ExportFormat.section_key(node.name)}"] = node.section + + if node.sectionID: + standardEntry[f"{defs.ExportFormat.sectionID_key(node.name)}"] = ( + node.sectionID ) - ] = document.subsection # type: ignore - working_array[ - defs.ExportFormat.hyperlink_key( - sname=document.name, doctype=document.doctype + + if node.hyperlink: + standardEntry[f"{defs.ExportFormat.hyperlink_key(node.name)}"] = ( + node.hyperlink ) - ] = document.hyperlink # type: ignore - for link in document.links: - if ( - link.document.doctype == defs.Credoctypes.Standard - ): # linking to normal standard - # a single CRE can link to multiple subsections of the same - # standard hence we can have conflicts - if working_array[ - defs.ExportFormat.section_key( - sname=link.document.name, doctype=link.document.doctype - ) - ]: - conflicts.append(link) - else: - working_array[ - defs.ExportFormat.section_key( - sname=link.document.name, doctype=link.document.doctype - ) - ] = link.document.section - working_array[ - defs.ExportFormat.subsection_key( - sname=link.document.name, doctype=link.document.doctype - ) - ] = link.document.subsection - working_array[ - defs.ExportFormat.hyperlink_key( - sname=link.document.name, doctype=link.document.doctype - ) - ] = link.document.hyperlink - working_array[ - defs.ExportFormat.link_type_key( - sname=link.document.name, doctype=link.document.doctype - ) - ] = link.ltype.value - elif link.document.doctype == defs.Credoctypes.CRE: - # linking to another CRE - grp_added = False - for i in range(0, maxgroups): - if not working_array[defs.ExportFormat.linked_cre_id_key(str(i))]: - grp_added = True - working_array[defs.ExportFormat.linked_cre_id_key(str(i))] = ( - link.document.id - ) - working_array[defs.ExportFormat.linked_cre_name_key(str(i))] = ( - link.document.name - ) - working_array[ - defs.ExportFormat.linked_cre_link_type_key(str(i)) - ] = link.ltype.value - break + if node.description: + standardEntry[f"{defs.ExportFormat.description_key(node.name)}"] = ( + node.description + ) + return standardEntry + + def process_cre(self, cre: defs.CRE, depth: int): + if not self.input_cres.get(cre.id): + return None + + cre_entry = self.write_cre_entry(cre=self.input_cres.pop(cre.id), depth=depth) + if not cre.links: + self.body.append(cre_entry) + return None + hasStandard = False + for link in cre.links: + entry = deepcopy(cre_entry) # start from the base for every link + if link.document.doctype != defs.Credoctypes.CRE: + hasStandard = True + entry_standard = self.write_standard_entry(link.document) + entry.update(entry_standard) + self.body.append(entry) + elif ( + self.input_cres.get(link.document.id) + and link.ltype == defs.LinkTypes.Contains + ): + if ( + not hasStandard + ): # if we have not written the entry yet (because there have been no standards to write), we write it now as a cre without standards - if not grp_added: - logger.fatal( - "Tried to add Group %s but all of the %s group " - "slots are filled. This must be a bug" - % (link.document.name, maxgroups) + self.body.append(entry) + self.process_cre( + cre=self.input_cres.get(link.document.id), depth=depth + 1 ) - # conflicts handling - if len(conflicts): - new_cre = deepcopy(document) - new_cre.links = conflicts - cresheet = __add_cre_to_spreadsheet( - document=new_cre, header=header, cresheet=cresheet, maxgroups=maxgroups - ) - return cresheet - - -def prepare_spreadsheet( - collection: db.Node_collection, docs: List[defs.Document] -) -> List[Dict[str, Any]]: - """ - Given a list of cre_defs.Document will create a list - of key,value dict representing the mappings - """ - nodes = collection.get_node_names() # get header from db (cheap enough) - header: Dict[str, Optional[str]] = { - defs.ExportFormat.cre_name_key(): None, - defs.ExportFormat.cre_id_key(): None, - defs.ExportFormat.cre_description_key(): None, - } - if nodes: - for typ, name in nodes: - header[ - defs.ExportFormat.section_key(name, defs.Credoctypes.from_str(typ)) - ] = None - header[ - defs.ExportFormat.subsection_key(name, defs.Credoctypes.from_str(typ)) - ] = None - header[ - defs.ExportFormat.hyperlink_key(name, defs.Credoctypes.from_str(typ)) - ] = None - header[ - defs.ExportFormat.link_type_key(name, defs.Credoctypes.from_str(typ)) - ] = None - maxgroups = collection.get_max_internal_connections() - for i in range(0, maxgroups): - header[defs.ExportFormat.linked_cre_id_key(str(i))] = None - header[defs.ExportFormat.linked_cre_name_key(str(i))] = None - header[defs.ExportFormat.linked_cre_link_type_key(str(i))] = None - - logger.debug(header) - - flatdict = {} - result = [] - for cre in docs: - flatdict[cre.name] = __add_cre_to_spreadsheet( - document=cre, header=header, cresheet=[], maxgroups=maxgroups + def inject_cre_in_processed_body(self, cre: defs.CRE, higher_cres: List[defs.CRE]): + """ + Given a cre to inject and a list of higher_cres to look for in order to inject afterwards, will inject the cre in the body + """ + lineIndex = 0 + for line in self.body: + for key in line.keys(): + ancestor_id = line[key].split(defs.ExportFormat.separator)[0] + if key.startswith("CRE") and ancestor_id in [ + h.id for h in higher_cres + ]: # we found the parent + new_depth = len(higher_cres) + entry = self.write_cre_entry(cre=cre, depth=new_depth) + for link in cre.links: + if link.document.doctype != defs.Credoctypes.CRE: + new_entry = self.write_standard_entry(link.document) + if set(new_entry.keys()).intersection(entry.keys()): + # this cre links to the same standard twice so we need to add two lines + self.body.insert( + lineIndex + 1, entry + ) # write buffer first + lineIndex += 1 + entry = ( + entry.copy() + ) # we need to copy the entry as we are going to update it and we want to update the values not the reference + # update buffer with new standard so we can write in the next line + entry.update(new_entry) + self.body.insert(lineIndex + 1, entry) + return None + lineIndex += 1 + + def prepare_spreadsheet( + self, docs: List[defs.Document], storage: db.Node_collection + ) -> List[Dict[str, Any]]: + """ + Given a list of cre_defs.Document will create a list + of key,value dict representing the mappings + """ + + # TODO (northdpole): traverses the same docs list multiple times + # should be optimized + + if not docs: + return self.body + + non_cre_docs = [] + + for doc in docs: + if doc.doctype == defs.Credoctypes.CRE: + self.input_cres[doc.id] = doc + for link in doc.links: + if link.document.doctype == defs.Credoctypes.CRE: + if not self.input_cres.get(link.document.id): + self.input_cres[link.document.id] = link.document + else: # flip the link to be CRE -> Node + newCRE = None + for link in doc.links: + if link.document.doctype == defs.Credoctypes.CRE: + newCRE = self.input_cres.get(link.document.id) + if not newCRE: + newCRE = link.document + if not newCRE.link_exists(doc): + newCRE.add_link( + defs.Link(document=doc.shallow_copy(), ltype=link.ltype) + ) + self.input_cres[newCRE.id] = newCRE + if not newCRE: + non_cre_docs.append(doc) + cre_id_to_depth = {} + for cre in self.input_cres.values(): + cre_id_to_depth[cre.id] = storage.get_cre_hierarchy(cre=cre) + + sorted_cres = sorted( + self.input_cres.values(), key=lambda x: cre_id_to_depth[x.id] ) - result.extend(flatdict[cre.name]) - return result + + for cre in sorted_cres: + processed = False + depth = cre_id_to_depth[cre.id] + if depth == 0: + self.process_cre(cre=cre, depth=depth) + continue + + # if we have a depth > 0, we need to find the parent + if not self.input_cres.get(cre.id): + # happy path first, we already processed this cre + continue + + # not so happy path second, find if there is a processed cre that has a path to this cre + for id in self.processed_ids: + path = storage.get_cre_path(fromID=cre.id, toID=id) + if path: + # we skip the last element as it is the cre we are looking for + self.inject_cre_in_processed_body(cre=cre, higher_cres=path[:-1]) + processed = True + break + if processed: + continue + # if we still have the cre, it means it needs a path to a root cre and we need to append it to the body + # find the root this cre is linked to + root = storage.get_root_cres() + + for r in root: + path = storage.get_cre_path(fromID=r.id, toID=cre.id) + if path: + pathIndex = 0 + for element in path: + self.body.append( + self.write_cre_entry(cre=element, depth=pathIndex) + ) + pathIndex += 1 + entry = self.write_cre_entry(cre=cre, depth=pathIndex) + for link in cre.links: + if link.document.doctype != defs.Credoctypes.CRE: + entry.update(self.write_standard_entry(link.document)) + self.body.append(entry) + break + + for doc in non_cre_docs: + entry = self.write_standard_entry(doc) + self.body.append(entry) + return self.body def write_csv(docs: List[Dict[str, Any]]) -> io.StringIO: data = io.StringIO() - fieldnames: List[str] = list(docs[0].keys()) - writer: csv.DictWriter = csv.DictWriter(data, fieldnames=fieldnames) # type: ignore + fieldnames = {} + [fieldnames.update(d) for d in docs] + writer: csv.DictWriter = csv.DictWriter(data, fieldnames=fieldnames.keys()) # type: ignore writer.writeheader() writer.writerows(docs) return data @@ -253,7 +306,7 @@ def add_offset_cre( rows = [] rows.append( - {f"CRE {offset}": f"{cre.id}{defs.ExportFormat.separator.value}{cre.name}"} + {f"CRE {offset}": f"{cre.id}{defs.ExportFormat.separator}{cre.name}"} ) visited_cres.add(cre.id) dbcre = database.get_CREs(external_id=cre.id) diff --git a/application/utils/spreadsheet_parsers.py b/application/utils/spreadsheet_parsers.py index 3ce4b0e9e..ade144146 100644 --- a/application/utils/spreadsheet_parsers.py +++ b/application/utils/spreadsheet_parsers.py @@ -1,3 +1,4 @@ +from pprint import pprint import logging import re from copy import copy @@ -134,178 +135,123 @@ def is_empty(value: Optional[str]) -> bool: ) -def parse_export_format(lfile: List[Dict[str, Any]]) -> Dict[str, defs.Document]: +def parse_export_format(lfile: List[Dict[str, Any]]) -> Dict[str, List[defs.Document]]: """ Given: a spreadsheet written by prepare_spreadsheet() return a list of CRE docs - cases: - standard - standard -> standard - cre -> other documents - cre -> standards - cre -> standards, other documents """ + cres: Dict[str, defs.CRE] = {} + standards: Dict[str, Dict[str, defs.Standard]] = {} + documents: Dict[str, List[defs.Document]] = {} - def get_linked_nodes(mapping: Dict[str, str]) -> List[defs.Link]: - nodes = [] - names = set( - [ - k.split(defs.ExportFormat.separator.value)[1] - for k, v in mapping.items() - if not is_empty(v) - and "CRE" not in k.upper() - and len(k.split(defs.ExportFormat.separator.value)) >= 3 - ] - ) - for name in names: - type = defs.ExportFormat.get_doctype( - [m for m in mapping.keys() if name in m][0] - ) - if not type: - raise ValueError( - f"Mapping of {name} not in format of :{name}:" - ) - section = str(mapping.get(defs.ExportFormat.section_key(name, type))) - subsection = str(mapping.get(defs.ExportFormat.subsection_key(name, type))) - hyperlink = str(mapping.get(defs.ExportFormat.hyperlink_key(name, type))) - link_type = str(mapping.get(defs.ExportFormat.link_type_key(name, type))) - tooltype = defs.ToolTypes.from_str( - str(mapping.get(defs.ExportFormat.tooltype_key(name, type))) - ) - sectionID = str(mapping.get(defs.ExportFormat.sectionID_key(name, type))) - description = str( - mapping.get(defs.ExportFormat.description_key(name, type)) - ) - node = None - if type == defs.Credoctypes.Standard: - node = defs.Standard( - name=name, - section=section, - subsection=subsection, - hyperlink=hyperlink, - sectionID=sectionID, - ) - elif type == defs.Credoctypes.Code: - node = defs.Code( - description=description, hyperlink=hyperlink, name=name - ) - elif type == defs.Credoctypes.Tool: - node = defs.Tool( - tooltype=tooltype, - name=name, - description=description, - hyperlink=hyperlink, - section=section, - sectionID=sectionID, - ) + if not lfile: + return documents - lt: defs.LinkTypes - if not is_empty(link_type): - lt = defs.LinkTypes.from_str(link_type) - else: - lt = defs.LinkTypes.LinkedTo - nodes.append(defs.Link(document=node, ltype=lt)) - return nodes - - cre: defs.Document - internal_mapping: defs.Document - documents: Dict[str, defs.Document] = {} - lone_nodes: Dict[str, defs.Node] = {} - link_types_regexp = re.compile(defs.ExportFormat.linked_cre_name_key("(\d+)")) max_internal_cre_links = len( - set([k for k, v in lfile[0].items() if link_types_regexp.match(k)]) + set([k for k in lfile[0].keys() if k.startswith("CRE")]) ) - for mapping in lfile: - # if the line does not register a CRE - if not mapping.get(defs.ExportFormat.cre_name_key()): - # standard -> nothing | standard - for st in get_linked_nodes(mapping): - lone_nodes[ - f"{st.document.doctype}:{st.document.name}:{st.document.section}" - ] = st.document - logger.info( - f"adding node: {st.document.doctype}:{st.document.name}:{st.document.section}" - ) - else: # cre -> standards, other documents - name = str(mapping.pop(defs.ExportFormat.cre_name_key())) - id = str(mapping.pop(defs.ExportFormat.cre_id_key())) - description = "" - if defs.ExportFormat.cre_description_key() in mapping: - description = mapping.pop(defs.ExportFormat.cre_description_key()) - - if name not in documents.keys(): # register new cre - cre = defs.CRE(name=name, id=id, description=description) - else: # it's a conflict mapping so we've seen this before, - # just retrieve so we can add the new info - cre = documents[name] - if cre.id != id: - if is_empty(id): - id = cre.id - else: - logger.fatal( - "id from sheet %s does not match already parsed id %s for cre %s, this looks like a bug" - % (id, cre.id, name) + standard_names = set( + [k.split("|")[0] for k in lfile[0].keys() if not k.startswith("CRE")] + ) + logger.info(f"Found standards with names: {standard_names}") + + highest_cre = None + highest_index = max_internal_cre_links + 1 + + previous_cre = None + previous_index = max_internal_cre_links + 1 + + for mapping_line in lfile: + working_cre = None + working_standard = None + # get highest numbered CRE entry (lowest in hierarchy) + for i in range(max_internal_cre_links - 1, -1, -1): + if not is_empty(mapping_line.get(f"CRE {i}")): + entry = mapping_line.get(f"CRE {i}").split(defs.ExportFormat.separator) + if not entry or len(entry) < 2: + line = mapping_line.get(f"CRE {i}") + raise ValueError( + f"mapping line contents: {line}, key: CRE {i} is not formatted correctly, missing separator {defs.ExportFormat.separator}" + ) + working_cre = defs.CRE(name=entry[1], id=entry[0]) + if cres.get(working_cre.id): + working_cre = cres[working_cre.id] + + if previous_index < i: # we found a higher hierarchy CRE + previous_index = i + highest_cre = previous_cre + cres[highest_cre.id] = highest_cre.add_link( + defs.Link( + document=working_cre.shallow_copy(), + ltype=defs.LinkTypes.Contains, ) - continue - if is_empty(cre.description) and not is_empty(description): - # might have seen the particular name/id as an internal - # mapping, in which case just update the description and continue - cre.description = description - - # register the standards part - for standard in get_linked_nodes(mapping): - cre.add_link(standard) - - # add the CRE links - for i in range(0, max_internal_cre_links): - name = str(mapping.pop(defs.ExportFormat.linked_cre_name_key(str(i)))) - if not is_empty(name): - id = str(mapping.pop(defs.ExportFormat.linked_cre_id_key(str(i)))) - link_type = str( - mapping.pop(defs.ExportFormat.linked_cre_link_type_key(str(i))) ) - if name in documents: - internal_mapping = documents[name] - if internal_mapping.id != id: - if is_empty(id): - id = internal_mapping.id - else: - logger.fatal( - "id from sheet %s does not match already parsed id %s for cre/group %s, this looks like a bug" - % (id, internal_mapping.id, name) - ) - continue - else: - internal_mapping = defs.CRE(name=name, id=id) - lt = defs.LinkTypes.from_str(link_type) - sub_lt: defs.LinkTypes - if lt == defs.LinkTypes.Contains: - sub_lt = defs.LinkTypes.PartOf - internal_mapping.add_link( + elif highest_index < i: # we found a higher hierarchy CRE + if not highest_cre.link_exists(working_cre): + cres[highest_cre.id] = highest_cre.add_link( defs.Link( - document=defs.CRE( # add a link to the original without the links - name=cre.name, - id=cre.id, - description=cre.description, - ), - ltype=sub_lt, + document=working_cre.shallow_copy(), + ltype=defs.LinkTypes.Contains, ) ) - documents[name] = internal_mapping + else: + logger.warning( + f"Link between {highest_cre.name} and {working_cre.name} already exists" + ) + elif highest_cre == None: + highest_cre = working_cre + highest_index = i + + previous_index = i + previous_cre = working_cre + break + + for s in standard_names: + if not is_empty(mapping_line.get(f"{s}{defs.ExportFormat.separator}name")): + working_standard = defs.Standard( + name=s, + sectionID=mapping_line.get(f"{s}{defs.ExportFormat.separator}id"), + section=mapping_line.get(f"{s}{defs.ExportFormat.separator}name"), + hyperlink=mapping_line.get( + f"{s}{defs.ExportFormat.separator}hyperlink", "" + ), + description=mapping_line.get( + f"{s}{defs.ExportFormat.separator}description", "" + ), + ) + if standards.get(working_standard.name) and standards.get( + working_standard.name + ).get(working_standard.id): + working_standard = standards[working_standard.name][ + working_standard.id + ] - if name not in [l.document.name for l in cre.links]: - cre.add_link( - defs.Link( - document=defs.CRE( - name=internal_mapping.name, - id=internal_mapping.id, - description=internal_mapping.description, - ), - ltype=defs.LinkTypes.from_str(link_type), - ) + if working_cre: + working_cre.add_link( + defs.Link( + document=working_standard.shallow_copy(), + ltype=defs.LinkTypes.LinkedTo, ) - documents[cre.name] = cre - documents.update(lone_nodes) + ) + working_standard.add_link( + defs.Link( + document=working_cre.shallow_copy(), + ltype=defs.LinkTypes.LinkedTo, + ) + ) + + if working_standard.name not in standards: + standards[working_standard.name] = {} + + standards[working_standard.name][working_standard.id] = working_standard + + if working_cre: + cres[working_cre.id] = working_cre + documents[defs.Credoctypes.CRE] = list(cres.values()) + + for standard_name, standard_entries in standards.items(): + logger.info(f"Adding {len(standard_entries)} entries for {standard_name}") + documents[standard_name] = list(standard_entries.values()) return documents diff --git a/application/web/web_main.py b/application/web/web_main.py index 0b5cc4831..2ac44d052 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -1,5 +1,7 @@ # type: ignore + # silence mypy for the routes file +import csv from functools import wraps import json import logging @@ -7,12 +9,16 @@ import io import pathlib import urllib.parse +from alive_progress import alive_bar from typing import Any from application.utils import oscal_utils, redis from rq import job, exceptions +from application.utils import spreadsheet_parsers +from application.utils import oscal_utils, redis from application.database import db +from application.cmd import cre_main from application.defs import cre_defs as defs from application.defs import osib_defs as odefs from application.utils import spreadsheet as sheet_utils @@ -71,7 +77,8 @@ def extend_cre_with_tag_links( others = list(frozenset(others)) for o in others: o.links = [] - cre.add_link(defs.Link(ltype=defs.LinkTypes.Related, document=o)) + if not cre.link_exists(o) and o.id != cre.id: + cre.add_link(defs.Link(ltype=defs.LinkTypes.Related, document=o)) return cre @@ -110,7 +117,9 @@ def find_cre(creid: str = None, crename: str = None) -> Any: # refer return f"
{mdutils.cre_to_md([cre])}
" elif opt_format == SupportedFormats.CSV.value: - docs = sheet_utils.prepare_spreadsheet(collection=database, docs=[cre]) + docs = sheet_utils.ExportSheet().prepare_spreadsheet( + docs=[cre], storage=database + ) return write_csv(docs=docs).getvalue().encode("utf-8") elif opt_format == SupportedFormats.OSCAL.value: @@ -181,7 +190,9 @@ def find_node_by_name(name: str, ntype: str = defs.Credoctypes.Standard.value) - return f"
{mdutils.cre_to_md(nodes)}
" elif opt_format == SupportedFormats.CSV.value: - docs = sheet_utils.prepare_spreadsheet(collection=database, docs=nodes) + docs = sheet_utils.ExportSheet().prepare_spreadsheet( + docs=nodes, storage=database + ) return write_csv(docs=docs).getvalue().encode("utf-8") elif opt_format == SupportedFormats.OSCAL.value: @@ -214,7 +225,9 @@ def find_document_by_tag() -> Any: if opt_format == SupportedFormats.Markdown.value: return f"
{mdutils.cre_to_md(documents)}
" elif opt_format == SupportedFormats.CSV.value: - docs = sheet_utils.prepare_spreadsheet(collection=database, docs=documents) + docs = sheet_utils.ExportSheet().prepare_spreadsheet( + docs=documents, storage=database + ) return write_csv(docs=docs).getvalue().encode("utf-8") elif opt_format == SupportedFormats.OSCAL.value: return jsonify(json.loads(oscal_utils.list_to_oscal(documents))) @@ -344,7 +357,9 @@ def text_search() -> Any: if opt_format == SupportedFormats.Markdown.value: return f"
{mdutils.cre_to_md(documents)}
" elif opt_format == SupportedFormats.CSV.value: - docs = sheet_utils.prepare_spreadsheet(collection=database, docs=documents) + docs = sheet_utils.ExportSheet().prepare_spreadsheet( + docs=documents, storage=database + ) return write_csv(docs=docs).getvalue().encode("utf-8") elif opt_format == SupportedFormats.OSCAL.value: return jsonify(json.loads(oscal_utils.list_to_oscal(documents))) @@ -373,7 +388,9 @@ def find_root_cres() -> Any: if opt_format == SupportedFormats.Markdown.value: return f"
{mdutils.cre_to_md(documents)}
" elif opt_format == SupportedFormats.CSV.value: - docs = sheet_utils.prepare_spreadsheet(collection=database, docs=documents) + docs = sheet_utils.ExportSheet().prepare_spreadsheet( + docs=documents, storage=database + ) return write_csv(docs=docs).getvalue().encode("utf-8") elif opt_format == SupportedFormats.OSCAL.value: return jsonify(json.loads(oscal_utils.list_to_oscal(documents))) @@ -668,6 +685,9 @@ def all_cres() -> Any: abort(404) +# Importing Handlers + + @app.route("/rest/v1/cre_csv", methods=["GET"]) def get_cre_csv() -> Any: database = db.Node_collection() @@ -692,6 +712,57 @@ def get_cre_csv() -> Any: abort(404) +@app.route("/rest/v1/cre_csv_import", methods=["POST"]) +def import_from_cre_csv() -> Any: + if not os.environ.get("CRE_ALLOW_IMPORT"): + abort( + 403, + "Importing is disabled, set the environment variable CRE_ALLOW_IMPORT to allow this functionality", + ) + + # TODO: (spyros) add optional gap analysis and embeddings calculation + database = db.Node_collection().with_graph() + file = request.files.get("cre_csv") + calculate_embeddings = ( + False if not request.args.get("calculate_embeddings") else True + ) + calculate_gap_analysis = ( + False if not request.args.get("calculate_gap_analysis") else True + ) + + if file is None: + abort(400, "No file provided") + contents = file.read() + csv_read = csv.DictReader(contents.decode("utf-8").splitlines()) + documents = spreadsheet_parsers.parse_export_format(list(csv_read)) + cres = documents.pop(defs.Credoctypes.CRE.value) + + standards = documents + new_cres = [] + for cre in cres: + new_cre, exists = cre_main.register_cre(cre, database) + if not exists: + new_cres.append(new_cre) + + for _, entries in standards.items(): + cre_main.register_standard( + collection=database, + standard_entries=list(entries), + generate_embeddings=calculate_embeddings, + calculate_gap_analysis=calculate_gap_analysis, + ) + return jsonify( + { + "status": "success", + "new_cres": [c.external_id for c in new_cres], + "new_standards": len(standards), + } + ) + + +# /End Importing Handlers + + # @app.route("/rest/v1/all_nodes", methods=["GET"]) # def all_nodes() -> Any: # database = db.Node_collection()