diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..7fe70d7
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1 @@
+*.json filter=lfs diff=lfs merge=lfs -text
diff --git a/.github/workflows/add_dunder_methods.yaml b/.github/workflows/add_dunder_methods.yaml
new file mode 100644
index 0000000..c56c931
--- /dev/null
+++ b/.github/workflows/add_dunder_methods.yaml
@@ -0,0 +1,37 @@
+name: add dunder methods to genome_annotation model
+
+on:
+ push:
+ paths:
+ - 'bkbit/models/genome_annotation.py'
+
+permissions:
+ contents: write
+
+jobs:
+ run-script:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout this repository
+ uses: actions/checkout@v3
+
+ - name: Set up Python
+ uses: actions/setup-python@v4
+ with:
+        python-version: '3.9'
+
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+
+
+ - name: Run add_dunderMethods_genomeAnnotation
+ run: python bkbit/model_editors/add_dunderMethods_genomeAnnotation.py
+
+ - name: Commit changes
+ run: |
+ git config --global user.name 'github-actions'
+ git config --global user.email 'github-actions@github.com'
+ git add bkbit/models/genome_annotation.py
+        git commit -m 'Update genome_annotation.py with dunder methods' || echo "No changes to commit"
+ git push
\ No newline at end of file
diff --git a/bkbit/cli.py b/bkbit/cli.py
index 8201ab7..3c8eff2 100644
--- a/bkbit/cli.py
+++ b/bkbit/cli.py
@@ -3,6 +3,8 @@
from bkbit.data_translators.library_generation_translator import specimen2jsonld
from bkbit.model_converters.yaml2sheet_converter import yaml2cvs
from bkbit.data_translators.file_manifest_translator import filemanifest2jsonld
+from bkbit.data_translators.genome_annotation_translator import gff2jsonld
+from bkbit.utils.get_ncbi_taxonomy import download_ncbi_taxonomy
@click.group()
def cli():
@@ -14,6 +16,8 @@ def cli():
cli.add_command(specimen2jsonld)
cli.add_command(yaml2cvs)
cli.add_command(filemanifest2jsonld)
+cli.add_command(gff2jsonld)
+cli.add_command(download_ncbi_taxonomy)
if __name__ == '__main__':
cli()
diff --git a/bkbit/data_translators/README.md b/bkbit/data_translators/README.md
index a3178fb..a4f1987 100644
--- a/bkbit/data_translators/README.md
+++ b/bkbit/data_translators/README.md
@@ -95,4 +95,67 @@ ls .
DO-XIQQ6047.jsonld
DO-WFFF3774.jsonld
DO-RMRL6873.jsonld
+# genome_annotation_translator.py
+
+## Overview
+genome_annotation_translator parses annotated genome data in GFF3 format and generates data objects representing genes, genome assemblies, and organisms. All data objects are defined in the [Genome Annotation Schema](https://brain-bican.github.io/models/index_genome_annotation/).
+Each jsonld file will contain:
+- GeneAnnotation objects
+- 1 GenomeAnnotation object
+- 1 GenomeAssembly object
+- 1 OrganismTaxon object
+- 1 Checksum object
+
+
+
+## Command Line
+### gen-geneannotation
+```shell
+gen-geneannotation [OPTIONS] GFF3_URL
+```
+
+#### Options
+-a, --assembly_accession
+ ID assigned to the genomic assembly used in the GFF3 file.
+ *Note*: Must be provided when using ENSEMBL GFF3 files
+
+-s, --assembly_strain
+ Specific strain of the organism associated with the GFF3 file.
+
+-l, --log_level
+ Logging level.
+ DEFAULT:
+ 'WARNING'
+ OPTIONS:
+    DEBUG | INFO | WARNING | ERROR | CRITICAL
+
+-f, --log_to_file
+ Log to a file instead of the console.
+ DEFAULT:
+ False
+
+## Examples
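+*Note*: `gen-geneannotation` depends on locally cached NCBI taxonomy lookup tables. If these have not been downloaded yet, the translator exits with an error asking you to run the taxonomy download command first:
+
+```shell
+bkbit download_ncbi_taxonomy
+```
+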
+#### Example 1: NCBI GFF3 File
+
+```shell
+pip install bkbit
+
+gen-geneannotation 'https://ftp.ncbi.nlm.nih.gov/genomes/all/annotation_releases/9823/106/GCF_000003025.6_Sscrofa11.1/GCF_000003025.6_Sscrofa11.1_genomic.gff.gz' > output.jsonld
+```
+
+#### Example 2: ENSEMBL GFF3 File
+
+```shell
+pip install bkbit
+
+gen-geneannotation -a 'GCF_003339765.1' 'https://ftp.ensembl.org/pub/release-104/gff3/macaca_mulatta/Macaca_mulatta.Mmul_10.104.gff3.gz' > output.jsonld
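+
+# Optional flags documented above: raise log verbosity and write logs to a file
+gen-geneannotation -l DEBUG -f -a 'GCF_003339765.1' 'https://ftp.ensembl.org/pub/release-104/gff3/macaca_mulatta/Macaca_mulatta.Mmul_10.104.gff3.gz' > output.jsonld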
```
\ No newline at end of file
diff --git a/bkbit/data_translators/genome_annotation_translator.py b/bkbit/data_translators/genome_annotation_translator.py
index 625c237..0615853 100644
--- a/bkbit/data_translators/genome_annotation_translator.py
+++ b/bkbit/data_translators/genome_annotation_translator.py
@@ -1,71 +1,77 @@
+"""
+Module for downloading, parsing, and processing GFF3 files from NCBI and Ensembl repositories. This module provides functionality to:
+
+1. Download a GFF3 file from a specified URL and calculate its checksums.
+2. Parse the GFF3 file to extract gene annotations.
+3. Generate various metadata objects such as organism taxon, genome assembly, and genome annotation.
+4. Serialize the extracted information into JSON-LD format for further use.
+
+Classes:
+ Gff3: A class to handle the entire process of downloading, parsing, and processing GFF3 files.
+
+Functions:
+ cli: Command line interface function to execute the module as a script.
+
+Usage:
+ The module can be run as a standalone script by executing it with appropriate arguments and options:
+
+ ```
+    python genome_annotation_translator.py <gff3_url> -a <assembly_accession> -s <assembly_strain> -l <log_level> -f
+ ```
+ The script will download the GFF3 file from the specified URL, parse it, and serialize the extracted information into JSON-LD format.
+
+Example:
+ ```
+ python genome_annotation_translator.py "https://example.com/path/to/gff3.gz" -a "GCF_000001405.39" -s "strain_name" -l "INFO" -f True
+ ```
+
+Dependencies:
+ - re
+ - hashlib
+ - tempfile
+ - uuid
+ - urllib
+ - urllib.request
+ - urllib.parse
+ - os
+ - json
+ - datetime
+ - collections.defaultdict
+ - subprocess
+ - gzip
+ - tqdm
+ - click
+ - pkg_resources
+ - bkbit.models.genome_annotation as ga
+ - bkbit.utils.setup_logger as setup_logger
+ - bkbit.utils.load_json as load_json
+"""
+
import re
import hashlib
import tempfile
import uuid
-import logging
import urllib
import urllib.request
+from urllib.parse import urlparse
import os
import json
from datetime import datetime
from collections import defaultdict
import subprocess
import gzip
+import sys
from tqdm import tqdm
+import click
+import pkg_resources
from bkbit.models import genome_annotation as ga
+from bkbit.utils.setup_logger import setup_logger
+from bkbit.utils.load_json import load_json
-logging.basicConfig(
- filename="gff3_translator_" + datetime.now().strftime("%Y-%m-%d_%H:%M:%S") + ".log",
- format="%(levelname)s: %(message)s (%(asctime)s)",
- datefmt="%m/%d/%Y %I:%M:%S %p",
- level=logging.INFO,
-)
-logger = logging.getLogger(__name__)
-
## CONSTANTS ##
-TAXON_SCIENTIFIC_NAME = {
- "9606": "Homo sapiens",
- "10090": "Mus musculus",
- "9544": "Macaca mulatta",
- "9483": "Callithrix jacchus",
- "60711": "Chlorocebus sabaeus",
- "9361": "Dasypus novemcinctus",
- "9685": "Felis catus",
- "9669": "Mustela putorius furo",
- "30611": "Otolemur garnettii",
- "9593": "Gorilla gorilla",
- "13616": "Monodelphis domestica",
- "9823": "Sus scrofa",
- "9986": "Oryctolagus cuniculus",
- "10116": "Rattus norvegicus",
- "27679": "Saimiri boliviensis",
- "246437": "Tupaia belangeri chinensis",
- "9407": "Rousettus aegyptiacus",
- "9598": "Pan troglodytes"
-}
-TAXON_COMMON_NAME = {
- "9606": "human",
- "10090": "mouse",
- "9544": "rhesus macaque",
- "9483": "common marmoset",
- "60711": "green monkey",
- "9361": "nine-banded armadillo",
- "9685": "cat",
- "9669": "ferret",
- "30611": "galago",
- "9593": "gorilla",
- "13616":"gray short-tailed opossum",
- "9823": "pig",
- "9986": "rabbit",
- "10116": "rat",
- "27679": "squirrel monkey",
- "246437": "Chinese tree shrew",
- "9407": "egyptian fruit bat",
- "9598": "chimpanzee"
-}
PREFIX_MAP = {
"NCBITaxon": "http://purl.obolibrary.org/obo/NCBITaxon_",
"NCBIGene": "http://identifiers.org/ncbigene/",
@@ -81,72 +87,277 @@
"{authority} {taxon_scientific_name} Annotation Release {genome_version}"
)
DEFAULT_FEATURE_FILTER = ("gene", "pseudogene", "ncRNA_gene")
-DEFAULT_HASH = ("SHA256", "MD5")
-
+DEFAULT_HASH = ("MD5",)
+LOG_FILE_NAME = (
+ "gff3_translator_" + datetime.now().strftime("%Y-%m-%d_%H:%M:%S") + ".log"
+)
+TAXON_DIR_PATH = "../utils/ncbi_taxonomy/"
+SCIENTIFIC_NAME_TO_TAXONID_PATH = pkg_resources.resource_filename(__name__, TAXON_DIR_PATH + "scientific_name_to_taxid.json")
+TAXON_SCIENTIFIC_NAME_PATH = pkg_resources.resource_filename(__name__, TAXON_DIR_PATH + "taxid_to_scientific_name.json")
+TAXON_COMMON_NAME_PATH = pkg_resources.resource_filename(__name__, TAXON_DIR_PATH + "taxid_to_common_name.json")
class Gff3:
+ """
+ A class to handle the downloading, parsing, and processing of GFF3 files from NCBI and Ensembl repositories.
+
+ Attributes:
+ content_url (str): The URL of the GFF file.
+ assembly_accession (str): The ID of the genome assembly.
+ assembly_strain (str, optional): The strain of the genome assembly. Defaults to None.
+ log_level (str): The logging level. Defaults to 'WARNING'.
+ log_to_file (bool): Flag to log messages to a file. Defaults to False.
+
+ Methods:
+ __init__(content_url, assembly_accession=None, assembly_strain=None, log_level="WARNING", log_to_file=False):
+ Initializes the Gff3 class with the provided parameters.
+
+ parse_url():
+ Parses the content URL and extracts information about the genome annotation.
+
+ __download_gff_file():
+ Downloads a GFF file from a given URL and calculates the MD5, SHA256, and SHA1 hashes.
+
+ generate_organism_taxon(taxon_id):
+ Generates an organism taxon object based on the provided taxon ID.
+
+ assign_authority_type(authority):
+ Assigns the authority type based on the given authority string.
+
+ generate_genome_assembly(assembly_id, assembly_version, assembly_label, assembly_strain=None):
+ Generates a genome assembly object based on the provided parameters.
+
+ generate_genome_annotation(genome_label, genome_version):
+ Generates a genome annotation object based on the provided parameters.
+
+ generate_digest(hash_values, hash_functions=DEFAULT_HASH):
+ Generates checksum digests for the GFF file using the specified hash functions.
+
+ __get_line_count(file_path):
+ Returns the line count of a file.
+
+ parse(feature_filter=DEFAULT_FEATURE_FILTER):
+ Parses the GFF file and extracts gene annotations based on the provided feature filter.
+
+ generate_ensembl_gene_annotation(attributes, curr_line_num):
+ Generates a GeneAnnotation object for Ensembl based on the provided attributes.
+
+ generate_ncbi_gene_annotation(attributes, curr_line_num):
+ Generates a GeneAnnotation object for NCBI based on the provided attributes.
+
+ __get_attribute(attributes, attribute_name, curr_line_num):
+ Retrieves the value of a specific attribute from the given attributes dictionary.
+
+ __resolve_ncbi_gene_annotation(new_gene_annotation, curr_line_num):
+ Resolves conflicts between existing and new gene annotations based on certain conditions.
+
+ __merge_values(t):
+ Merges values from a list of lists into a dictionary of sets.
+
+ serialize_to_jsonld(exclude_none=True, exclude_unset=False):
+ Serializes the object and either writes it to the specified output file or prints it to the CLI.
+ """
+
def __init__(
self,
content_url,
- taxon_id,
- assembly_id,
- assembly_version,
- assembly_label,
- genome_label: str,
- genome_version: str,
- genome_authority: str,
- hash_functions: tuple[str] = DEFAULT_HASH,
+ assembly_accession=None,
assembly_strain=None,
- gff_file=None,
+ log_level="WARNING",
+ log_to_file=False,
):
"""
-        Initializes an instance of the GFFTranslator class.
+        Initializes an instance of the Gff3 class.
Parameters:
- content_url (str): The URL of the GFF file.
- - taxon_id (int): The taxon ID of the organism.
-        - assembly_id (str): The ID of the genome assembly.
+        - assembly_accession (str, optional): The accession ID of the genome assembly. Required for Ensembl URLs. Defaults to None.
- - assembly_version (str): The version of the genome assembly.
- - assembly_label (str): The label of the genome assembly.
- - genome_label (str): The label of the genome.
- - genome_version (str): The version of the genome.
- - genome_authority (str): The authority responsible for the genome.
- - hash_functions (tuple[str]): A list of hash functions to use for generating checksums. Defaults to ("SHA256", "MD5").
- assembly_strain (str, optional): The strain of the genome assembly. Defaults to None.
- - gff_file (str, optional): The local path to the GFF file if file is already downloaded. Defaults to None.
+        - log_level (str): The logging level. Defaults to 'WARNING'.
+        - log_to_file (bool): Whether to log messages to a file. Defaults to False.
"""
- self.logger = logger
+ self.logger = setup_logger(LOG_FILE_NAME, log_level, log_to_file)
+ try:
+ self.scientific_name_to_taxonid = load_json(SCIENTIFIC_NAME_TO_TAXONID_PATH)
+ self.taxon_scientific_name = load_json(TAXON_SCIENTIFIC_NAME_PATH)
+ self.taxon_common_name = load_json(TAXON_COMMON_NAME_PATH)
+ except FileNotFoundError as e:
+ self.logger.critical("NCBI Taxonomy not downloaded. Run 'bkbit download_ncbi_taxonomy' command first." )
+ print(e)
+ sys.exit(2)
+
self.content_url = content_url
- if gff_file is None:
- self.gff_file = self.__download_gff_file()
- else:
- self.gff_file = gff_file
- self.authority = self.assign_authority_type(genome_authority)
+
+ ## STEP 1: Parse the content URL to get metadata
+ # Parse content_url to get metadata
+ url_metadata = self.parse_url()
+ if url_metadata is None:
+ self.logger.critical(
+ "The provided content URL is not supported. Please provide a valid URL."
+ )
+ raise ValueError(
+ "The provided content URL is not supported. Please provide a valid URL."
+ )
+
+ # Define variables to store metadata
+ (
+ taxon_id,
+ assembly_id,
+ assembly_version,
+ assembly_label,
+ genome_label,
+ genome_version,
+ ) = (None, None, None, None, None, None)
+
+ # Assign the authority type
+ self.authority = url_metadata.get("authority")
+
+ # Assign the taxon_id and assembly_id based on the authority
+ if self.authority.value == ga.AuthorityType.NCBI.value:
+ taxon_id = url_metadata.get("taxonid")
+ assembly_id = url_metadata.get("assembly_accession")
+ elif self.authority.value == ga.AuthorityType.ENSEMBL.value:
+ taxon_id = self.scientific_name_to_taxonid.get(
+ url_metadata.get("scientific_name").replace("_", " ")
+ )
+ if assembly_accession is None:
+ self.logger.critical(
+ "The assembly ID is required for Ensembl URLs. Please provide the assembly ID."
+ )
+ raise ValueError(
+ "The assembly ID is required for Ensembl URLs. Please provide the assembly ID."
+ )
+ assembly_id = assembly_accession
+
+ # Assign assembly_version, assembly_label, genome_version, and genome_label
+        assembly_version = (
+            assembly_id.split(".")[1] if len(assembly_id.split(".")) > 1 else None
+        )
+ assembly_label = url_metadata.get("assembly_name")
+ genome_version = url_metadata.get("release_version")
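+        # e.g. "NCBI-9823-106": authority, taxon ID, and annotation release version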
+ genome_label = self.authority.value + "-" + taxon_id + "-" + genome_version
+
+ ## STEP 2: Download the GFF file
+ # Download the GFF file
+ self.gff_file, hash_values = self.__download_gff_file()
+
+ ## STEP 3: Generate the organism taxon, genome assembly, checksums, and genome annotation objects
+ # Generate the organism taxon object
self.organism_taxon = self.generate_organism_taxon(taxon_id)
self.genome_assembly = self.generate_genome_assembly(
assembly_id, assembly_version, assembly_label, assembly_strain
)
- self.checksums = self.generate_digest(hash_functions)
+ self.checksums = self.generate_digest(hash_values, DEFAULT_HASH)
self.genome_annotation = self.generate_genome_annotation(
genome_label, genome_version
)
+
self.gene_annotations = {}
+ def parse_url(self):
+ """
+ Parses the content URL and extracts information about the genome annotation.
+
+ Returns:
+ A dictionary containing the following information:
+ - 'authority': The authority type (NCBI or ENSEMBL).
+ - 'taxonid': The taxon ID of the genome.
+ - 'release_version': The release version of the genome annotation.
+ - 'assembly_accession': The assembly accession of the genome.
+ - 'assembly_name': The name of the assembly.
+ - 'species': The species name (only for ENSEMBL URLs).
+ """
+ # Define regex patterns for NCBI and Ensembl URLs
+        # NCBI : [assembly accession.version]_[assembly name]_[content type].[optional format]
+        # ENSEMBL : <organism_full_name>.<assembly_name>.<genome_version>.gff3.gz
+ ncbi_pattern = r"/genomes/all/annotation_releases/(\d+)(?:/(\d+))?/(GCF_\d+\.\d+)[_-]([^/]+)/(GCF_\d+\.\d+)[_-]([^/]+)_genomic\.gff\.gz"
+ ensembl_pattern = (
+ r"/pub/release-(\d+)/gff3/([^/]+)/([^/.]+)\.([^/.]+)\.([^/.]+)\.gff3\.gz"
+ )
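+        # Capture groups used below -- NCBI: (1) taxon ID, (2) optional release version,
+        # (3) assembly accession, (6) assembly name; ENSEMBL: (1) release version, (3) scientific name, (4) assembly name.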
+
+ # Parse the URL to get the path
+ parsed_url = urlparse(self.content_url)
+ path = parsed_url.path
+
+ # Determine if the URL is from NCBI or Ensembl and extract information
+ if "ncbi" in parsed_url.netloc:
+ ncbi_match = re.search(ncbi_pattern, path)
+ if ncbi_match:
+ return {
+ "authority": ga.AuthorityType.NCBI,
+ "taxonid": ncbi_match.group(1),
+ "release_version": (
+ ncbi_match.group(2)
+ if ncbi_match.group(2)
+ else ncbi_match.group(4)
+ ),
+ "assembly_accession": ncbi_match.group(3),
+ "assembly_name": ncbi_match.group(6),
+ }
+
+ elif "ensembl" in parsed_url.netloc:
+ ensembl_match = re.search(ensembl_pattern, path)
+ if ensembl_match:
+ return {
+ "authority": ga.AuthorityType.ENSEMBL,
+ "release_version": ensembl_match.group(1),
+ "scientific_name": ensembl_match.group(3),
+ "assembly_name": ensembl_match.group(4),
+ }
+
+ # If no match is found, return None
+ return None
+
def __download_gff_file(self):
"""
- Downloads a GFF file from the specified content URL, decompresses it, and returns the path to the temporary file.
+ Downloads a GFF file from a given URL and calculates the MD5, SHA256, and SHA1 hashes.
Returns:
- str: The path to the temporary file containing the decompressed GFF data.
+ tuple: A tuple containing the path to the downloaded gzip file and a dictionary
+ with the MD5, SHA256, and SHA1 hashes of the file.
"""
- with urllib.request.urlopen(self.content_url) as response:
- gzip_data = response.read()
+ response = urllib.request.urlopen(self.content_url)
+ total_size = int(response.headers.get("content-length", 0))
+ block_size = 1024 # 1 Kilobyte
+
+ # Create hash objects
+ md5_hash = hashlib.md5()
+ sha256_hash = hashlib.sha256()
+ sha1_hash = hashlib.sha1()
# Create a temporary file for the gzip data
with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f_gzip:
- f_gzip.write(gzip_data)
gzip_file_path = f_gzip.name
- return gzip_file_path
+
+ # Create a progress bar
+ progress_bar = tqdm(
+ total=total_size,
+ unit="iB",
+ unit_scale=True,
+ desc="Downloading GFF file",
+ )
+
+ # Read the file in chunks, write to the temporary file, and update the hash
+ while True:
+ data = response.read(block_size)
+ if not data:
+ break
+ f_gzip.write(data)
+ md5_hash.update(data)
+ sha256_hash.update(data)
+ sha1_hash.update(data)
+ progress_bar.update(len(data))
+
+ progress_bar.close()
+
+ # Return the path to the temporary file and the md5 hash
+ return gzip_file_path, {
+ "MD5": md5_hash.hexdigest(),
+ "SHA256": sha256_hash.hexdigest(),
+ "SHA1": sha1_hash.hexdigest(),
+ }
def generate_organism_taxon(self, taxon_id: str):
"""
@@ -158,11 +365,10 @@ def generate_organism_taxon(self, taxon_id: str):
Returns:
ga.OrganismTaxon: The generated organism taxon object.
"""
- self.logger.debug("Generating organism taxon")
return ga.OrganismTaxon(
id=TAXON_PREFIX + ":" + taxon_id,
- full_name=TAXON_SCIENTIFIC_NAME[taxon_id],
- name=TAXON_COMMON_NAME[taxon_id],
+ full_name=self.taxon_scientific_name[taxon_id],
+ name=self.taxon_common_name[taxon_id],
iri=PREFIX_MAP[TAXON_PREFIX] + taxon_id,
)
@@ -179,12 +385,11 @@ def assign_authority_type(self, authority: str):
Raises:
Exception: If the authority is not supported. Only NCBI and Ensembl authorities are supported.
"""
- self.logger.debug("Assigning authority type")
if authority.upper() == ga.AuthorityType.NCBI.value:
return ga.AuthorityType.NCBI
if authority.upper() == ga.AuthorityType.ENSEMBL.value:
return ga.AuthorityType.ENSEMBL
- logger.critical(
+ self.logger.critical(
"Authority %s is not supported. Please use NCBI or Ensembl.", authority
)
raise ValueError(
@@ -210,7 +415,6 @@ def generate_genome_assembly(
Returns:
ga.GenomeAssembly: The generated genome assembly object.
"""
- self.logger.debug("Generating genome assembly")
return ga.GenomeAssembly(
id=ASSEMBLY_PREFIX + ":" + assembly_id,
in_taxon=[self.organism_taxon.id],
@@ -231,7 +435,6 @@ def generate_genome_annotation(self, genome_label: str, genome_version: str):
Returns:
ga.GenomeAnnotation: The generated genome annotation.
"""
- self.logger.debug("Generating genome annotation")
return ga.GenomeAnnotation(
id=BICAN_ANNOTATION_PREFIX + genome_label.upper(),
digest=[checksum.id for checksum in self.checksums],
@@ -250,6 +453,7 @@ def generate_genome_annotation(self, genome_label: str, genome_version: str):
def generate_digest(
self,
+ hash_values: dict,
hash_functions: tuple[str] = DEFAULT_HASH,
) -> list[ga.Checksum]:
"""
@@ -265,11 +469,7 @@ def generate_digest(
ValueError: If an unsupported hash algorithm is provided.
"""
- gff_data = open(
- self.gff_file, "rb"
- ).read() # TODO: Modify this to read the file in chunks
checksums = []
-
for hash_type in hash_functions:
# Generate a UUID version 4
uuid_value = uuid.uuid4()
@@ -278,31 +478,32 @@ def generate_digest(
urn = f"urn:uuid:{uuid_value}"
hash_type = hash_type.strip().upper()
# Create a Checksum object
- if hash_type == "SHA256":
- digest = hashlib.sha256(gff_data).hexdigest()
+ if hash_type == ga.DigestType.SHA256.name:
checksums.append(
ga.Checksum(
id=urn,
checksum_algorithm=ga.DigestType.SHA256,
- value=digest,
+ value=hash_values.get("SHA256"),
)
)
- elif hash_type == "MD5":
- digest = hashlib.md5(gff_data).hexdigest()
+ elif hash_type == ga.DigestType.MD5.name:
checksums.append(
ga.Checksum(
- id=urn, checksum_algorithm=ga.DigestType.MD5, value=digest
+ id=urn,
+ checksum_algorithm=ga.DigestType.MD5,
+ value=hash_values.get("MD5"),
)
)
- elif hash_type == "SHA1":
- digest = hashlib.sha1(gff_data).hexdigest()
+ elif hash_type == ga.DigestType.SHA1.name:
checksums.append(
ga.Checksum(
- id=urn, checksum_algorithm=ga.DigestType.SHA1, value=digest
+ id=urn,
+ checksum_algorithm=ga.DigestType.SHA1,
+ value=hash_values.get("SHA1"),
)
)
else:
- logger.error(
+ self.logger.error(
"Hash algorithm %s is not supported. Please use SHA256, MD5, or SHA1.",
hash_type,
)
@@ -321,9 +522,9 @@ def __get_line_count(self, file_path):
result = subprocess.run(
["wc", "-l", file_path], stdout=subprocess.PIPE, check=True
- ) # If check is True and the exit code was non-zero, it raises a CalledProcessError.
- # The CalledProcessError object will have the return code in the returncode attribute,
- # and output & stderr attributes if those streams were captured.
+ ) # If check is True and the exit code was non-zero, it raises a CalledProcessError.
+ # The CalledProcessError object will have the return code in the returncode attribute,
+ # and output & stderr attributes if those streams were captured.
output = result.stdout.decode().strip()
line_count = int(output.split()[0]) # Extract the line count from the output
return line_count
@@ -362,9 +563,8 @@ def parse(self, feature_filter: tuple[str] = DEFAULT_FEATURE_FILTER):
for line_raw in file:
line_strip = line_raw.strip()
if curr_line_num == 1 and not line_strip.startswith("##gff-version 3"):
- logger.critical(
- 'Line %s: ##gff-version 3" missing from the first line.',
- curr_line_num,
+ self.logger.warning(
+ '"##gff-version 3" missing from the first line of the file. The given file may not be a valid GFF3 file.'
)
elif len(line_strip) == 0: # blank line
continue
@@ -375,7 +575,7 @@ def parse(self, feature_filter: tuple[str] = DEFAULT_FEATURE_FILTER):
else: # line may be a feature or unknown
tokens = list(map(str.strip, line_raw.split("\t")))
if len(tokens) != 9:
- logger.warning(
+ self.logger.warning(
"Line %s: Features are expected 9 columns, found %s.",
curr_line_num,
len(tokens),
@@ -387,19 +587,13 @@ def parse(self, feature_filter: tuple[str] = DEFAULT_FEATURE_FILTER):
tuple(a.split("=") for a in tokens[8].split(";"))
)
# TODO: Write cleaner code that calls respective generate function based on the authority automatically
- if (
- self.genome_annotation.authority
- == ga.AuthorityType.ENSEMBL
- ):
+ if self.genome_annotation.authority == ga.AuthorityType.ENSEMBL:
gene_annotation = self.generate_ensembl_gene_annotation(
attributes, curr_line_num
)
if gene_annotation is not None:
self.gene_annotations[gene_annotation] = gene_annotation
- elif (
- self.genome_annotation.authority
- == ga.AuthorityType.NCBI
- ):
+ elif self.genome_annotation.authority == ga.AuthorityType.NCBI:
gene_annotation = self.generate_ncbi_gene_annotation(
attributes, curr_line_num
)
@@ -483,14 +677,14 @@ def generate_ncbi_gene_annotation(self, attributes, curr_line_num):
if len(geneid_values) == 1:
stable_id = geneid_values.pop()
else:
- logger.error(
+ self.logger.error(
"Line %s: No GeneAnnotation object created for this row due to missing dbxref attribute.",
curr_line_num,
)
return None
if not stable_id:
- logger.error(
+ self.logger.error(
"Line %s: No GeneAnnotation object created for this row due to number of GeneIDs provided in dbxref attribute is not equal to one.",
curr_line_num,
)
@@ -513,7 +707,7 @@ def generate_ncbi_gene_annotation(self, attributes, curr_line_num):
)
synonyms.sort() # note: this is not required, but it makes the output more predictable therefore easier to test
else:
- logger.warning(
+ self.logger.debug(
"Line %s: synonym is not set for this row's GeneAnnotation object due to missing gene_synonym attribute.",
curr_line_num,
)
@@ -536,12 +730,12 @@ def generate_ncbi_gene_annotation(self, attributes, curr_line_num):
gene_annotation, curr_line_num
)
if name != self.gene_annotations[gene_annotation.id].name:
- logger.warning(
+ self.logger.debug(
"Line %s: GeneAnnotation object with id %s already exists with a different name. Current name: %s, Existing name: %s",
curr_line_num,
stable_id,
name,
- self.gene_annotations[gene_annotation.id].name
+ self.gene_annotations[gene_annotation.id].name,
)
return None
return gene_annotation
@@ -561,7 +755,7 @@ def __get_attribute(self, attributes, attribute_name, curr_line_num):
value = None
if attribute_name in attributes:
if len(attributes[attribute_name]) != 1:
- logger.warning(
+ self.logger.debug(
"Line %s: %s not set for this row's GeneAnnotation object due to more than one %s provided.",
curr_line_num,
attribute_name,
@@ -576,7 +770,7 @@ def __get_attribute(self, attributes, attribute_name, curr_line_num):
else:
value = attributes[attribute_name].pop()
if value.find(",") != -1:
- logger.warning(
+ self.logger.debug(
'Line %s: %s not set for this row\'s GeneAnnotation object due to value of %s attribute containing ",".',
curr_line_num,
attribute_name,
@@ -584,7 +778,7 @@ def __get_attribute(self, attributes, attribute_name, curr_line_num):
)
value = None
else:
- logger.warning(
+ self.logger.debug(
"Line %s: %s not set for this row's GeneAnnotation object due to missing %s attribute.",
curr_line_num,
attribute_name,
@@ -610,19 +804,15 @@ def __resolve_ncbi_gene_annotation(self, new_gene_annotation, curr_line_num):
"""
existing_gene_annotation = self.gene_annotations[new_gene_annotation.id]
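+        # Precedence: prefer the annotation that has a description; then the one with a molecular_type, favoring protein_coding.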
- if (
- existing_gene_annotation.description is None
- and new_gene_annotation.description is not None
- ):
- return new_gene_annotation
if (
existing_gene_annotation.description is not None
and new_gene_annotation.description is None
):
return None
if (
- existing_gene_annotation.molecular_type is None
- and new_gene_annotation.molecular_type is not None
+ existing_gene_annotation.description is None
+ and new_gene_annotation.description is not None
):
return new_gene_annotation
if (
@@ -631,17 +820,17 @@ def __resolve_ncbi_gene_annotation(self, new_gene_annotation, curr_line_num):
):
return None
if (
- existing_gene_annotation.molecular_type == ga.BioType.noncoding.value
- and new_gene_annotation.molecular_type != ga.BioType.noncoding.value
+ existing_gene_annotation.molecular_type is None
+ and new_gene_annotation.molecular_type is not None
):
return new_gene_annotation
- if (
- existing_gene_annotation.molecular_type != ga.BioType.noncoding.value
- and new_gene_annotation.molecular_type == ga.BioType.noncoding.value
- ):
+ if existing_gene_annotation.molecular_type == ga.BioType.protein_coding.value:
return None
- logger.critical(
- "Line %s: Unable to resolve duplicates for GeneID: %s.\nexisting gene: %s\nnew gene: %s",
+ if new_gene_annotation.molecular_type == ga.BioType.protein_coding.value:
+ return new_gene_annotation
+
+ self.logger.error(
+ "Line %s: Unable to resolve duplicates for GeneID: %s.\nexisting gene: %s\nnew gene: %s",
curr_line_num,
new_gene_annotation.id,
existing_gene_annotation,
@@ -660,7 +849,6 @@ def __merge_values(self, t):
dict: A dictionary where each key maps to a set of values.
"""
- self.logger.debug("Merging values")
result = defaultdict(set)
for lst in t:
key = lst[0].strip()
@@ -670,45 +858,85 @@ def __merge_values(self, t):
return result
def serialize_to_jsonld(
- self, output_file: str, exclude_none: bool = True, exclude_unset: bool = False
+ self, exclude_none: bool = True, exclude_unset: bool = False
):
"""
- Serialize the object and write it to the specified output file.
+ Serialize the object and either write it to the specified output file or print it to the CLI.
Parameters:
- output_file (str): The path of the output file.
+ exclude_none (bool): Whether to exclude None values in the output.
+ exclude_unset (bool): Whether to exclude unset values in the output.
Returns:
None
"""
- logger.debug("Serializing to JSON-LD")
- with open(output_file, "w", encoding="utf-8") as f:
- data = [
- self.organism_taxon.dict(
- exclude_none=exclude_none, exclude_unset=exclude_unset
- ),
- self.genome_assembly.dict(
- exclude_none=exclude_none, exclude_unset=exclude_unset
- ),
- self.genome_annotation.dict(
- exclude_none=exclude_none, exclude_unset=exclude_unset
- ),
- ]
- for ck in self.checksums:
- data.append(
- ck.dict(exclude_none=exclude_none, exclude_unset=exclude_unset)
- )
- for ga in self.gene_annotations.values():
- data.append(
- ga.dict(exclude_none=exclude_none, exclude_unset=exclude_unset)
- )
- output_data = {
- "@context": "https://raw.githubusercontent.com/brain-bican/models/main/jsonld-context-autogen/genome_annotation.context.jsonld",
- "@graph": data,
- }
- f.write(json.dumps(output_data, indent=2))
+ data = [
+ self.organism_taxon.dict(
+ exclude_none=exclude_none, exclude_unset=exclude_unset
+ ),
+ self.genome_assembly.dict(
+ exclude_none=exclude_none, exclude_unset=exclude_unset
+ ),
+ self.genome_annotation.dict(
+ exclude_none=exclude_none, exclude_unset=exclude_unset
+ ),
+ ]
+ for ck in self.checksums:
+ data.append(ck.dict(exclude_none=exclude_none, exclude_unset=exclude_unset))
+        for annotation in self.gene_annotations.values():
+            data.append(annotation.dict(exclude_none=exclude_none, exclude_unset=exclude_unset))
+
+ output_data = {
+ "@context": "https://raw.githubusercontent.com/brain-bican/models/main/jsonld-context-autogen/genome_annotation.context.jsonld",
+ "@graph": data,
+ }
+
+ print(json.dumps(output_data, indent=2))
+
+
+@click.command()
+##ARGUMENTS##
+# Argument #1: The URL of the GFF file
+@click.argument("content_url", type=str)
+
+##OPTIONS##
+# Option #1: The ID of the genome assembly
+@click.option("assembly_accession", "-a", required=False, default=None, type=str)
+# Option #2: The strain of the genome assembly
+@click.option(
+ "--assembly_strain",
+ "-s",
+ required=False,
+ default=None,
+ type=str,
+ help="The strain of the genome assembly. Defaults to None.",
+)
+# Option #3: The log level
+@click.option(
+ "--log_level",
+ "-l",
+ required=False,
+ default="WARNING",
+ help="The log level. Defaults to WARNING.",
+)
+# Option #4: Log to file
+@click.option(
+ "--log_to_file",
+ "-f",
+ is_flag=True,
+ help="Log to a file instead of the console.",
+)
+def gff2jsonld(content_url, assembly_accession, assembly_strain, log_level, log_to_file):
+ '''
+ Creates GeneAnnotation objects from a GFF3 file and serializes them to JSON-LD format.
+ '''
+ gff3 = Gff3(
+ content_url, assembly_accession, assembly_strain, log_level, log_to_file
+ )
+ gff3.parse()
+ gff3.serialize_to_jsonld()
if __name__ == "__main__":
- pass
\ No newline at end of file
+ gff2jsonld()
diff --git a/bkbit/model_editors/add_dunderMethods_genomeAnnotation.py b/bkbit/model_editors/add_dunderMethods_genomeAnnotation.py
index 31cb1cf..507e988 100644
--- a/bkbit/model_editors/add_dunderMethods_genomeAnnotation.py
+++ b/bkbit/model_editors/add_dunderMethods_genomeAnnotation.py
@@ -1,23 +1,29 @@
import re
# Read the file
-file_path = "../models/genome_annotation.py"
+file_path = "bkbit/models/genome_annotation.py"
with open(file_path, "r") as file:
content = file.read()
+# Define the patterns to check if the functions already exist
+hash_pattern = r"def __hash__\(self\):"
+
# Find the GeneAnnotation class
-pattern = r"class GeneAnnotation\(Gene\):\s+\"\"\"\n An annotation describing the location, boundaries, and functions of individual genes within a genome annotation.\n \"\"\""
-match = re.search(pattern, content)
+class_pattern = r"class GeneAnnotation\(Gene\):\s+\"\"\"\n An annotation describing the location, boundaries, and functions of individual genes within a genome annotation.\n \"\"\""
+class_match = re.search(class_pattern, content)
+
+if class_match:
+ class_start = class_match.end()
-if match:
- # Add the function to the class
- updated_content = content.replace(match.group(), match.group() + "\n def __ne__(self, other):\n return (self.description != other.description) or (self.molecular_type != other.molecular_type)\n ")
- updated_content = updated_content.replace(match.group(), match.group() + "\n def __eq__(self, other):\n return (self.description == other.description) and (self.molecular_type == other.molecular_type)\n ")
- updated_content = updated_content.replace(match.group(), match.group() + "\n\n def __hash__(self):\n return hash(tuple([self.id, self.name, self.molecular_type, self.description]))\n ")
+ # Check if the functions already exist
+ has_hash = re.search(hash_pattern, content[class_start:])
+
+ # Add the functions only if they do not exist
+ if not has_hash:
+ content = content.replace(class_match.group(), class_match.group() + "\n\n def __hash__(self):\n return hash(tuple([self.id, self.name, self.molecular_type, self.description]))\n ")
# Write the updated content back to the file
with open(file_path, "w") as file:
- file.write(updated_content)
+ file.write(content)
else:
print("GeneAnnotation class not found in the file.")
-
diff --git a/bkbit/models/genome_annotation.py b/bkbit/models/genome_annotation.py
index a10acbc..d83ba6b 100644
--- a/bkbit/models/genome_annotation.py
+++ b/bkbit/models/genome_annotation.py
@@ -412,6 +412,11 @@ class GeneAnnotation(Gene):
"""
An annotation describing the location, boundaries, and functions of individual genes within a genome annotation.
"""
+
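+    # Defined so GeneAnnotation instances are hashable and can be deduplicated when used as dict/set keys during translation.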
+ def __hash__(self):
+ return hash(tuple([self.id, self.name, self.molecular_type, self.description]))
+
molecular_type: Optional[Union[BioType, str]] = Field(None)
source_id: Optional[str] = Field(None, description="""The authority specific identifier.""")
referenced_in: Union[GenomeAnnotation, str] = Field(..., description="""The genome annotation that this gene annotation was referenced from.""")
diff --git a/bkbit/utils/get_ncbi_taxonomy.py b/bkbit/utils/get_ncbi_taxonomy.py
new file mode 100644
index 0000000..2b57023
--- /dev/null
+++ b/bkbit/utils/get_ncbi_taxonomy.py
@@ -0,0 +1,170 @@
+"""
+This script downloads a zip file containing taxonomic data from a given URL, extracts and processes
+the content of the 'names.dmp' file in memory, and saves the parsed data into JSON files. The script
+includes three main functions:
+
+1. download_and_extract_zip_in_memory(url):
+ Downloads a zip file from the given URL and extracts the content of the 'names.dmp' file in memory.
+
+2. parse_dmp_content(dmp_content):
+ Parses the content of a DMP file and extracts taxonomic information into dictionaries.
+
+3. process_and_save_taxdmp_in_memory(url, output_dir):
+ Downloads and processes the taxdump file from the given URL, and saves the parsed data into
+ separate JSON files in the specified output directory.
+
+Usage:
+ The script can be executed as a standalone program. Modify the URL and output directory as needed.
+"""
+
+import json
+import zipfile
+import io
+import os
+import requests
+import pkg_resources
+import click
+
+NCBI_TAXON_URL = "https://ftp.ncbi.nih.gov/pub/taxonomy/taxdmp.zip"
+OUTPUT_DIR_NAME = "ncbi_taxonomy"
+OUTPUT_DIR_PATH = pkg_resources.resource_filename(__name__, OUTPUT_DIR_NAME)
+SCIENTIFIC_NAME_TO_TAXONID_PATH = pkg_resources.resource_filename(__name__, "ncbi_taxonomy/scientific_name_to_taxid.json")
+TAXON_SCIENTIFIC_NAME_PATH = pkg_resources.resource_filename(__name__, "ncbi_taxonomy/taxid_to_scientific_name.json")
+TAXON_COMMON_NAME_PATH = pkg_resources.resource_filename(__name__, "ncbi_taxonomy/taxid_to_common_name.json")
+
+
+
+def download_and_extract_zip_in_memory(url):
+ """
+ Downloads a zip file from the given URL and extracts the content of the 'names.dmp' file in memory.
+
+ Args:
+ url (str): The URL of the zip file to download.
+
+ Returns:
+ str: The content of the 'names.dmp' file as a string.
+
+ Raises:
+ requests.exceptions.HTTPError: If the file download fails with a non-200 status code.
+ """
+ # Download the file
+ response = requests.get(url, timeout=30)
+ if response.status_code == 200:
+ # Unzip the file in memory
+ with zipfile.ZipFile(io.BytesIO(response.content)) as z:
+ # Extract names.dmp file content into memory
+ with z.open("names.dmp") as names_dmp_file:
+ names_dmp_content = names_dmp_file.read().decode("utf-8")
+ return names_dmp_content
+ else:
+ raise requests.exceptions.HTTPError(
+ f"Failed to download file, status code: {response.status_code}"
+ )
+
+
+def parse_dmp_content(dmp_content):
+ """
+ Parses the content of a DMP file and extracts taxonomic information.
+
+ Args:
+ dmp_content (str): The content of the DMP file.
+
+ Returns:
+ tuple: A tuple containing three dictionaries:
+ - taxid_to_scientific_name: A dictionary mapping taxonomic IDs to scientific names.
+ - taxid_to_common_name: A dictionary mapping taxonomic IDs to common names.
+ - scientific_name_to_taxid: A dictionary mapping scientific names to taxonomic IDs.
+ """
+ taxid_to_scientific_name = {}
+ taxid_to_common_name = {}
+ scientific_name_to_taxid = {}
+
+ for line in dmp_content.strip().split("\n"):
+ # Split the line by the delimiter '|'
+ parts = line.strip().split("|")
+
+ # Remove leading and trailing whitespace from each part
+ parts = [part.strip() for part in parts]
+ # Taxonomy names file (names.dmp):
+ # tax_id-- the id of node associated with this name
+ # name_txt-- name itself
+ # unique name-- the unique variant of this name if name not unique
+ # name class-- (synonym, common name, ...)
+ taxid = parts[0]
+ name = parts[1]
+ unique_name = parts[2]
+ name_class = parts[3]
+
+ # Create a dictionary with the parsed data
+ if name_class == "scientific name" and taxid not in taxid_to_scientific_name:
+ if unique_name:
+ taxid_to_scientific_name[taxid] = unique_name
+ scientific_name_to_taxid[unique_name] = taxid
+ else:
+ taxid_to_scientific_name[taxid] = name
+ scientific_name_to_taxid[name] = taxid
+ elif name_class == "genbank common name" and taxid not in taxid_to_common_name:
+ taxid_to_common_name[taxid] = name
+ return taxid_to_scientific_name, taxid_to_common_name, scientific_name_to_taxid
+
+
+def process_and_save_taxdmp_in_memory(url, output_dir):
+ """
+ Downloads and processes the taxdump file from the given URL,
+ and saves the parsed data into separate JSON files in the specified output directory.
+
+ Args:
+ url (str): The URL of the taxdump file to download and process.
+ output_dir (str): The directory where the parsed data will be saved.
+
+ Returns:
+ None
+ """
+ # Ensure the output directory exists
+ if not os.path.exists(output_dir):
+ os.makedirs(output_dir)
+
+ # Step 1: Download and unzip the folder in memory
+ names_dmp_content = download_and_extract_zip_in_memory(url)
+
+ # Step 2: Parse the names.dmp content
+ taxid_to_scientific_name, taxid_to_common_name, scientific_name_to_taxid = (
+ parse_dmp_content(names_dmp_content)
+ )
+
+ # Step 3: Save the dictionaries to files
+ with open(
+ os.path.join(output_dir, "taxid_to_common_name.json"), "w", encoding="utf-8"
+ ) as f:
+ json.dump(taxid_to_common_name, f, indent=4)
+
+ with open(
+ os.path.join(output_dir, "taxid_to_scientific_name.json"), "w", encoding="utf-8"
+ ) as f:
+ json.dump(taxid_to_scientific_name, f, indent=4)
+
+ with open(
+ os.path.join(output_dir, "scientific_name_to_taxid.json"), "w", encoding="utf-8"
+ ) as f:
+ json.dump(scientific_name_to_taxid, f, indent=4)
+
+
+@click.command()
+@click.option("--reload", '-r', is_flag=True, help="Reload NCBI taxonomy data")
+def download_ncbi_taxonomy(reload=False):
+    """
+    Download the NCBI taxonomy data and save the parsed name mappings as JSON files.
+
+    Skips the download if the JSON files already exist, unless --reload is given.
+
+    Args:
+        reload (bool): If True, re-download and re-parse the taxonomy data
+            even if the JSON files already exist. Defaults to False.
+    """
+ if reload or not os.path.exists(SCIENTIFIC_NAME_TO_TAXONID_PATH) or not os.path.exists(TAXON_SCIENTIFIC_NAME_PATH) or not os.path.exists(TAXON_COMMON_NAME_PATH):
+ process_and_save_taxdmp_in_memory(NCBI_TAXON_URL, OUTPUT_DIR_PATH)
+ else:
+ print("PRINT already downloaded")
+
+if __name__ == "__main__":
+ download_ncbi_taxonomy()
diff --git a/bkbit/utils/load_json.py b/bkbit/utils/load_json.py
new file mode 100644
index 0000000..d16b6bf
--- /dev/null
+++ b/bkbit/utils/load_json.py
@@ -0,0 +1,19 @@
+import json
+
+def load_json(file_path):
+ """
+ Load a JSON file from the given file path.
+
+ Args:
+ file_path (str): The path to the JSON file.
+
+ Returns:
+ dict: The contents of the JSON file as a dictionary.
+
+ Raises:
+ FileNotFoundError: If the file does not exist.
+ JSONDecodeError: If the file is not a valid JSON.
+
+ """
+ with open(file_path, "r", encoding="utf-8") as f:
+ return json.load(f)
\ No newline at end of file
diff --git a/bkbit/utils/setup_logger.py b/bkbit/utils/setup_logger.py
new file mode 100644
index 0000000..9d4f772
--- /dev/null
+++ b/bkbit/utils/setup_logger.py
@@ -0,0 +1,83 @@
+"""
+Logger Setup Module
+
+This module provides a utility function to configure and set up logging for an application.
+The `setup_logger` function allows for customizable logging levels and output destinations,
+either to a file or to the console.
+
+Available log levels:
+- DEBUG
+- INFO
+- WARNING
+- ERROR
+- CRITICAL
+
+Example usage:
+    from bkbit.utils.setup_logger import setup_logger
+ import logging
+
+ # Set up the logger to log to a file with INFO level
+    logger = setup_logger("app.log", log_level="INFO", log_to_file=True)
+
+ # Log some messages
+ logger.info("This is an info message")
+ logger.error("This is an error message")
+
+Functions:
+ setup_logger(log_level="WARNING", log_to_file=False):
+ Configures and returns a logger with the specified log level and output destination.
+
+Attributes:
+ LOG_LEVELS (dict): A dictionary mapping log level names to their corresponding logging constants.
+"""
+
+import logging
+
+LOG_LEVELS = {
+ "DEBUG": logging.DEBUG,
+ "INFO": logging.INFO,
+ "WARNING": logging.WARNING,
+ "ERROR": logging.ERROR,
+ "CRITICAL": logging.CRITICAL,
+}
+
+
+def setup_logger(
+ file_name,
+ log_level="WARNING",
+ log_to_file=False,
+):
+ """
+ Set up a logger with the specified log level and log destination.
+
+ Args:
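+        file_name (str): The name of the log file to write to when log_to_file is True.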
+ log_level (str, optional): The desired log level. Defaults to "WARNING".
+ log_to_file (bool, optional): Whether to log to a file. Defaults to False.
+
+ Returns:
+ logger: The configured logger object.
+
+ Raises:
+ ValueError: If an invalid log level is provided.
+ """
+ if log_level.upper() not in LOG_LEVELS:
+ raise ValueError(f"Invalid log level: {log_level}")
+ if log_to_file:
+ logging.basicConfig(
+ filename=file_name,
+ format="%(levelname)s: %(message)s (%(asctime)s)",
+ datefmt="%m/%d/%Y %I:%M:%S %p",
+ level=LOG_LEVELS[log_level.upper()],
+ )
+ else:
+ logging.basicConfig(
+ format="%(levelname)s: %(message)s (%(asctime)s)",
+ datefmt="%m/%d/%Y %I:%M:%S %p",
+ level=LOG_LEVELS[log_level.upper()],
+ )
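+    # Note: logging.basicConfig is a no-op if the root logger already has handlers,
+    # so setup_logger should be called before any other logging configuration.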
+
+ logger = logging.getLogger(__name__)
+ return logger
diff --git a/pyproject.toml b/pyproject.toml
index e8d96ef..db7caa2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,9 +6,9 @@ build-backend = 'setuptools.build_meta'
[project]
name = "bkbit"
authors = [
- { name="bkbit developers"},
+ { name="brain-bican-bkbit developers"},
]
-description = "A library for atlaskb data models"
+description = "A library for using brain-bican data models"
readme = "README.md"
requires-python = ">=3.7"
classifiers = [
@@ -36,7 +36,7 @@ docs = [
]
[project.urls]
-"Homepage" = "https://github.com/atlaskb/bkbit"
+"Homepage" = "https://github.com/brain-bican/bkbit"
[tool.setuptools_scm]
write_to = "bkbit/_version.py"
\ No newline at end of file