Merge pull request #376 from ZJUEarthData/dev/Crt
feat: Add MeanShift algorithm and special function to clustering mode
SanyHe authored Aug 26, 2024
2 parents b851a2e + 9f5f2b5 commit 5a78ac9
Showing 6 changed files with 219 additions and 13 deletions.
2 changes: 1 addition & 1 deletion geochemistrypi/data_mining/constants.py
@@ -66,7 +66,7 @@
# "Decision Tree",
# Histogram-based Gradient Boosting,
]
CLUSTERING_MODELS = ["KMeans", "DBSCAN", "Agglomerative", "AffinityPropagation"]
CLUSTERING_MODELS = ["KMeans", "DBSCAN", "Agglomerative", "AffinityPropagation", "MeanShift"]
DECOMPOSITION_MODELS = ["PCA", "T-SNE", "MDS"]
ANOMALYDETECTION_MODELS = ["Isolation Forest", "Local Outlier Factor"]

16 changes: 16 additions & 0 deletions geochemistrypi/data_mining/model/_base.py
@@ -371,3 +371,19 @@ def _plot_3d_surface_diagram(feature_data: pd.DataFrame, target_data: pd.DataFra
save_fig(f"3D Surface Diagram - {algorithm_name}", local_path, mlflow_path)
data = pd.concat([feature_data, target_data, y_test_predict], axis=1)
save_data(data, f"3D Surface Diagram - {algorithm_name}", local_path, mlflow_path)


class ClusteringMetricsMixin:
"""Mixin class for clustering metrics."""

@staticmethod
def _get_num_clusters(func_name: str, algorithm_name: str, trained_model: object, store_path: str) -> None:
"""Get and log the number of clusters."""
labels = trained_model.labels_
num_clusters = len(np.unique(labels))
print(f"-----* {func_name} *-----")
print(f"{func_name}: {num_clusters}")
num_clusters_dict = {f"{func_name}": num_clusters}
mlflow.log_metrics(num_clusters_dict)
num_clusters_str = json.dumps(num_clusters_dict, indent=4)
save_text(num_clusters_str, f"{func_name} - {algorithm_name}", store_path)
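
For reference, a minimal sketch of what _get_num_clusters computes, using plain NumPy outside the MLflow workflow. Note that np.unique counts every distinct label, so for DBSCAN-style labelings the noise label -1 is counted as one of the "clusters" (the label arrays below are hypothetical):

import numpy as np

kmeans_like_labels = np.array([0, 0, 1, 2, 1])    # 3 clusters
dbscan_like_labels = np.array([0, 0, 1, -1, -1])  # 2 clusters plus noise (-1)

print(len(np.unique(kmeans_like_labels)))  # 3
print(len(np.unique(dbscan_like_labels)))  # 3 -- the noise label is counted as a cluster here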
145 changes: 134 additions & 11 deletions geochemistrypi/data_mining/model/clustering.py
@@ -8,17 +8,18 @@
import pandas as pd
from numpy.typing import ArrayLike
from rich import print
from sklearn.cluster import DBSCAN, AffinityPropagation, AgglomerativeClustering, KMeans
from sklearn.cluster import DBSCAN, AffinityPropagation, AgglomerativeClustering, KMeans, MeanShift

from ..constants import MLFLOW_ARTIFACT_DATA_PATH, MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH
from ..utils.base import clear_output, save_data, save_fig, save_text
from ._base import WorkflowBase
from ._base import ClusteringMetricsMixin, WorkflowBase
from .func.algo_clustering._affinitypropagation import affinitypropagation_manual_hyper_parameters
from .func.algo_clustering._agglomerative import agglomerative_manual_hyper_parameters
from .func.algo_clustering._common import plot_silhouette_diagram, plot_silhouette_value_diagram, scatter2d, scatter3d, score
from .func.algo_clustering._dbscan import dbscan_manual_hyper_parameters
from .func.algo_clustering._enum import ClusteringCommonFunction, KMeansSpecialFunction
from .func.algo_clustering._enum import ClusteringCommonFunction, KMeansSpecialFunction, MeanShiftSpecialFunction
from .func.algo_clustering._kmeans import kmeans_manual_hyper_parameters
from .func.algo_clustering._meanshift import meanshift_manual_hyper_parameters


class ClusteringWorkflowBase(WorkflowBase):
@@ -273,7 +274,7 @@ def __init__(
might change in the future for a better heuristic.
References
----------------------------------------
----------
Scikit-learn API: sklearn.cluster.KMeans
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html
"""
@@ -335,11 +336,11 @@ def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
)


class DBSCANClustering(ClusteringWorkflowBase):
class DBSCANClustering(ClusteringMetricsMixin, ClusteringWorkflowBase):
"""The automation workflow of using DBSCAN algorithm to make insightful products."""

name = "DBSCAN"
special_function = ["Virtualization of Result in 2D Graph"]
special_function = ["Num of Clusters"]

def __init__(
self,
@@ -388,7 +389,7 @@ def __init__(
The number of parallel jobs to run. None means 1 unless in a joblib.parallel_backend context. -1 means using all processors. See Glossary for more details.
References
----------------------------------------
----------
Scikit-learn API: sklearn.cluster.DBSCAN
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html
"""
@@ -425,7 +426,7 @@ def manual_hyper_parameters(cls) -> Dict:
return hyper_parameters

def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
"""Invoke all special application functions for this algorithms by Scikit-learn framework."""
"""Invoke all special application functions for this algorithm by Scikit-learn framework."""
GEOPI_OUTPUT_METRICS_PATH = os.getenv("GEOPI_OUTPUT_METRICS_PATH")
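# Reuses MeanShiftSpecialFunction.NUM_CLUSTERS ("Num of Clusters") as the label, since DBSCAN logs the same metric.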
self._get_num_clusters(
func_name=MeanShiftSpecialFunction.NUM_CLUSTERS.value,
algorithm_name=self.naming,
trained_model=self.model,
store_path=GEOPI_OUTPUT_METRICS_PATH,
)


class Agglomerative(ClusteringWorkflowBase):
@@ -616,7 +624,7 @@ def __init__(
this parameter was previously hardcoded as 0.
References
----------------------------------------
----------
Scikit-learn API: sklearn.cluster.AffinityPropagation
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AffinityPropagation.html
"""
@@ -658,9 +666,124 @@ def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
"""Invoke all special application functions for this algorithms by Scikit-learn framework."""


class MeanShiftClustering(ClusteringWorkflowBase):
class MeanShiftClustering(ClusteringMetricsMixin, ClusteringWorkflowBase):
name = "MeanShift"
pass

special_function = ["Num of Clusters"]

def __init__(
self,
*,
bandwidth: Optional[float] = None,
seeds: Optional[Union[np.ndarray, list]] = None,
bin_seeding: bool = False,
min_bin_freq: int = 1,
cluster_all: bool = True,
n_jobs: Optional[int] = None,
max_iter: int = 300,
) -> None:
"""
Parameters
----------
bandwidth : float, default=None
Bandwidth used in the flat kernel.
If not given, the bandwidth is estimated using
sklearn.cluster.estimate_bandwidth; see the documentation for that
function for hints on scalability (see also the Notes, below).
seeds : array-like of shape (n_samples, n_features), default=None
Seeds used to initialize kernels. If not set,
the seeds are calculated by clustering.get_bin_seeds
with bandwidth as the grid size and default values for
other parameters.
bin_seeding : bool, default=False
If true, initial kernel locations are not locations of all
points, but rather the location of the discretized version of
points, where points are binned onto a grid whose coarseness
corresponds to the bandwidth. Setting this option to True will speed
up the algorithm because fewer seeds will be initialized.
The default value is False.
Ignored if seeds argument is not None.
min_bin_freq : int, default=1
To speed up the algorithm, accept only those bins with at least
min_bin_freq points as seeds.
cluster_all : bool, default=True
If true, then all points are clustered, even those orphans that are
not within any kernel. Orphans are assigned to the nearest kernel.
If false, then orphans are given cluster label -1.
n_jobs : int, default=None
The number of jobs to use for the computation. The following tasks benefit
from the parallelization:
- The search of nearest neighbors for bandwidth estimation and label
assignments. See the details in the docstring of the
``NearestNeighbors`` class.
- Hill-climbing optimization for all seeds.
See :term:`Glossary <n_jobs>` for more details.
``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` means using all processors. See :term:`Glossary <n_jobs>`
for more details.
max_iter : int, default=300
Maximum number of iterations, per seed point, before the clustering
operation terminates (for that seed point) if it has not converged yet.
.. versionadded:: 0.22
References
----------
Scikit-learn API: sklearn.cluster.MeanShift
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.MeanShift.html
"""
super().__init__()
self.bandwidth = bandwidth
self.seeds = seeds
self.bin_seeding = bin_seeding
self.min_bin_freq = min_bin_freq
self.cluster_all = cluster_all
self.n_jobs = n_jobs
self.max_iter = max_iter

self.model = MeanShift(
bandwidth=self.bandwidth, seeds=self.seeds, bin_seeding=self.bin_seeding, min_bin_freq=self.min_bin_freq, cluster_all=self.cluster_all, n_jobs=self.n_jobs, max_iter=self.max_iter
)
self.naming = MeanShiftClustering.name

@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
print(f"-*-*- {cls.name} - Hyper-parameters Specification -*-*-")
hyper_parameters = meanshift_manual_hyper_parameters()
clear_output()
return hyper_parameters

def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
"""Invoke all special application functions for this algorithm by Scikit-learn framework."""
GEOPI_OUTPUT_METRICS_PATH = os.getenv("GEOPI_OUTPUT_METRICS_PATH")
self._get_num_clusters(
func_name=MeanShiftSpecialFunction.NUM_CLUSTERS.value,
algorithm_name=self.naming,
trained_model=self.model,
store_path=GEOPI_OUTPUT_METRICS_PATH,
)

@staticmethod
def _get_num_clusters(func_name: str, algorithm_name: str, trained_model: object, store_path: str) -> None:
"""Get and log the number of clusters."""
labels = trained_model.labels_
num_clusters = len(np.unique(labels))
print(f"-----* {func_name} *-----")
print(f"{func_name}: {num_clusters}")
num_clusters_dict = {f"{func_name}": num_clusters}
mlflow.log_metrics(num_clusters_dict)
num_clusters_str = json.dumps(num_clusters_dict, indent=4)
save_text(num_clusters_str, f"{func_name} - {algorithm_name}", store_path)
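
As a sanity check of the wrapped estimator, here is a minimal standalone sketch of scikit-learn's MeanShift with the same defaults as the constructor above; the synthetic data and quantile value are illustrative assumptions, not part of the geochemistrypi workflow:

import numpy as np
from sklearn.cluster import MeanShift, estimate_bandwidth

rng = np.random.default_rng(42)
X = np.vstack([rng.normal(0.0, 0.3, (50, 2)), rng.normal(3.0, 0.3, (50, 2))])  # two synthetic blobs

# bandwidth=None would let MeanShift estimate it internally; calling
# estimate_bandwidth explicitly shows the value that gets used.
bandwidth = estimate_bandwidth(X, quantile=0.3)
model = MeanShift(bandwidth=bandwidth, bin_seeding=False, min_bin_freq=1, cluster_all=True, max_iter=300)
model.fit(X)
print(len(np.unique(model.labels_)))  # expected: 2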


class SpectralClustering(ClusteringWorkflowBase):
4 changes: 4 additions & 0 deletions geochemistrypi/data_mining/model/func/algo_clustering/_enum.py
@@ -10,3 +10,7 @@ class ClusteringCommonFunction(Enum):

class KMeansSpecialFunction(Enum):
INERTIA_SCORE = "Inertia Score"


class MeanShiftSpecialFunction(Enum):
NUM_CLUSTERS = "Num of Clusters"
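
A quick sketch of how these enum members are consumed: the .value string becomes both the printed heading and the MLflow metric key inside _get_num_clusters.

from geochemistrypi.data_mining.model.func.algo_clustering._enum import MeanShiftSpecialFunction

print(MeanShiftSpecialFunction.NUM_CLUSTERS.value)  # "Num of Clusters"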
53 changes: 53 additions & 0 deletions geochemistrypi/data_mining/model/func/algo_clustering/_meanshift.py
@@ -0,0 +1,53 @@
from typing import Dict

from rich import print

from ....constants import SECTION
from ....data.data_readiness import num_input, str_input


def meanshift_manual_hyper_parameters() -> Dict:
"""Manually set hyperparameters for MeanShift algorithm.
Returns
-------
hyper_parameters : dict
Dictionary containing the manually set hyperparameters.
"""
print("Bandwidth: The bandwidth of the kernel used in the algorithm. This parameter can greatly influence the results.")
print("If you do not have a specific value in mind, you can leave this as 0, and the algorithm will estimate it automatically.")
print("A good starting point could be around 0.5 to 1.5, depending on your data's scale.")
bandwidth_input = num_input(SECTION[2], "Enter Bandwidth (or 0 for automatic estimation): ")
bandwidth = None if bandwidth_input == 0 else bandwidth_input

print("Cluster All: By default, only points at least as close to a cluster center as the given bandwidth are assigned to that cluster.")
print("Setting this to False will prevent points from being assigned to any cluster if they are too far away. Leave it True if you want all data points to be part of some cluster.")
print("For most use cases, 'True' is recommended to ensure all points are clustered.")
cluster_all = str_input(["True", "False"], SECTION[2])

print("Bin Seeding: If true, initial kernel locations are binned points, speeding up the algorithm with fewer seeds. Default is False.")
print("Setting this to True can be useful for large datasets to speed up computation. Consider using True if your dataset is large.")
bin_seeding = str_input(["True", "False"], SECTION[2])

print("Min Bin Frequency: To speed up the algorithm, accept only those bins with at least min_bin_freq points as seeds.")
print("A typical value is 1, but you might increase this for very large datasets to reduce the number of seeds.")
min_bin_freq = num_input(SECTION[2], "Enter Min Bin Frequency (default is 1): ")

print("Number of Jobs: The number of jobs to use for the computation. 1 means using all processors.")
print("If you are unsure, use 1 to utilize all available processors.")
n_jobs = num_input(SECTION[2], "Enter Number of Jobs (or None): ")
n_jobs = -1 if n_jobs == 1 else int(n_jobs)

print("Max Iterations: Maximum number of iterations, per seed point before the clustering operation terminates (for that seed point), if has not converged yet.")
print("The default value is 300, which is sufficient for most use cases. You might increase this for very complex data.")
max_iter = num_input(SECTION[2], "Enter Max Iterations (default is 300): ")

hyper_parameters = {
"bandwidth": bandwidth,
"cluster_all": cluster_all == "True",
"bin_seeding": bin_seeding == "True",
"min_bin_freq": int(min_bin_freq),
"n_jobs": n_jobs,
"max_iter": int(max_iter),
}
return hyper_parameters
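
For illustration, a hypothetical return value of this function, in the shape that the activate() dispatch in process/cluster.py unpacks into MeanShiftClustering (the values below are example user inputs, not defaults the function produces):

hyper_parameters = {
    "bandwidth": None,   # user entered 0 -> automatic estimation
    "cluster_all": True,
    "bin_seeding": False,
    "min_bin_freq": 1,
    "n_jobs": -1,
    "max_iter": 300,
}
clt_workflow = MeanShiftClustering(**hyper_parameters)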
12 changes: 11 additions & 1 deletion geochemistrypi/data_mining/process/cluster.py
@@ -4,7 +4,7 @@

import pandas as pd

from ..model.clustering import AffinityPropagationClustering, Agglomerative, ClusteringWorkflowBase, DBSCANClustering, KMeansClustering
from ..model.clustering import AffinityPropagationClustering, Agglomerative, ClusteringWorkflowBase, DBSCANClustering, KMeansClustering, MeanShiftClustering
from ._base import ModelSelectionBase


@@ -62,6 +62,16 @@ def activate(
convergence_iter=hyper_parameters["convergence_iter"],
affinity=hyper_parameters["affinity"],
)
elif self.model_name == "MeanShift":
hyper_parameters = MeanShiftClustering.manual_hyper_parameters()
self.clt_workflow = MeanShiftClustering(
bandwidth=hyper_parameters["bandwidth"],
cluster_all=hyper_parameters["cluster_all"],
bin_seeding=hyper_parameters["bin_seeding"],
min_bin_freq=hyper_parameters["min_bin_freq"],
n_jobs=hyper_parameters["n_jobs"],
max_iter=hyper_parameters["max_iter"],
)
elif self.model_name == "":
pass

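A condensed sketch of the dispatch pattern this branch extends: each workflow class exposes manual_hyper_parameters(), whose keys line up with its constructor's keyword arguments. The registry and helper below are hypothetical shorthand for illustration, not the project's actual activate() implementation:

from typing import Dict, Type

from geochemistrypi.data_mining.model.clustering import ClusteringWorkflowBase, DBSCANClustering, MeanShiftClustering

WORKFLOWS: Dict[str, Type[ClusteringWorkflowBase]] = {
    "DBSCAN": DBSCANClustering,
    "MeanShift": MeanShiftClustering,
}

def build_workflow(model_name: str) -> ClusteringWorkflowBase:
    cls = WORKFLOWS[model_name]
    hyper_parameters = cls.manual_hyper_parameters()  # interactive prompts
    return cls(**hyper_parameters)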
