Wrappers for River Cluster Analyzers

[Figure: MLPro-Int-River cluster analyzers (MLPro-Int-River-Cluster_Analyzers.drawio.png)]

Ver. 1.7.1 (2025-07-21)

This module provides the root wrapper classes that integrate River cluster analyzers into MLPro.

Learn more: https://www.riverml.xyz/

class mlpro_int_river.wrappers.clusteranalyzers.basics.WrClusterAnalyzerRiver2MLPro(p_cls_cluster: type, p_river_algo: Clusterer, p_name: str = None, p_range_max=1, p_ada: bool = True, p_visualize: bool = True, p_logging=True, **p_kwargs)

Bases: WrapperRiver, ClusterAnalyzer

This is the base class for all wrappers that bring River-based cluster analyzers to MLPro.

Parameters:
  • p_cls_cluster – Cluster class (Class Cluster or a child class).

  • p_river_algo (river.base.Clusterer) – Instantiated river-based clusterer.

  • p_name (str) – Name of the clusterer. Default: None.

  • p_range_max – Maximum range of asynchronicity for the MLPro task (e.g. thread or process). Default: MLTask.C_RANGE_THREAD.

  • p_ada (bool) – Turn on adaptivity. Default: True.

  • p_visualize (bool) – Turn on visualization. Default: True.

  • p_logging – Set up type of logging. Default: Log.C_LOG_ALL.

  • p_kwargs (dict) – Further optional named parameters.

C_TYPE = 'River Cluster Analyzer'
C_CLUSTER_PROPERTIES: PropertyDefinitions = [('centroid', 0, False, <class 'mlpro.oa.streams.tasks.clusteranalyzers.clusters.properties.centroid.Centroid'>), ('size', 0, False, <class 'mlpro.bf.math.properties.Property'>)]
_adapt(p_instance_new: Instance) bool

This method adapts the current clusters to the incoming instance.

Parameters:

p_instance_new (Instance) – New stream instance to be processed.

Returns:

True, if something has been adapted. False otherwise.

Return type:

bool

_update_clusters(input_data)

This method updates the centroids of all clusters introduced so far.

get_algorithm() Clusterer

This method returns the River algorithm wrapped by the clusterer.

Returns:

The River algorithm wrapped by the clusterer.

Return type:

base.Clusterer

get_cluster_memberships(p_instance: Instance, p_scope: int = 2) List[Tuple[str, float, Cluster]]

Public custom method to determine the membership of the given instance to each cluster as a value in percent.

Parameters:
  • p_instance (Instance) – Instance to be evaluated.

  • p_scope (int) – Scope of the result list. See class attributes C_MS_SCOPE_* for possible values. Default value is C_MS_SCOPE_MAX.

Returns:

membership – List of membership tuples for each cluster. A tuple consists of a cluster id, a relative membership value in percent and a reference to the cluster.

Return type:

List[Tuple[str, float, Cluster]]
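The docstring does not state how the percentages are computed. As an illustration only, the sketch below uses one plausible scheme: inverse-distance weighting to the cluster centroids, normalized so that all memberships add up to 100 %. The helper `cluster_memberships` is hypothetical and not MLPro's implementation; it also omits the cluster reference in each tuple and the `p_scope` handling.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cluster_memberships(point: Sequence[float],
                        centroids: Dict[str, Sequence[float]]) -> List[Tuple[str, float]]:
    """Hypothetical helper: relative membership of a point to each cluster in percent,
    weighting each cluster by the inverse Euclidean distance to its centroid."""
    distances = {cid: math.dist(point, c) for cid, c in centroids.items()}

    # A point lying exactly on one or more centroids is shared among those clusters only.
    exact = [cid for cid, d in distances.items() if d == 0.0]
    if exact:
        share = 100.0 / len(exact)
        return [(cid, share if cid in exact else 0.0) for cid in centroids]

    # Otherwise: closer clusters get proportionally larger shares of 100 %.
    weights = {cid: 1.0 / d for cid, d in distances.items()}
    total = sum(weights.values())
    return [(cid, 100.0 * w / total) for cid, w in weights.items()]
```

For a point at (0, 0) with centroids at distance 1 and 3, this scheme assigns roughly 75 % to the nearer and 25 % to the farther cluster.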

Ver. 1.5.1 (2025-07-21)

This module provides a wrapper class for the CluStream algorithm provided by River.

Learn more: https://www.riverml.xyz/ https://riverml.xyz/latest/api/cluster/CluStream/

class mlpro_int_river.wrappers.clusteranalyzers.clustream.WrRiverCluStream2MLPro(p_name: str = None, p_range_max=1, p_ada: bool = True, p_visualize: bool = False, p_logging=True, p_n_macro_clusters: int = 5, p_max_micro_clusters: int = 100, p_micro_cluster_r_factor: int = 2, p_time_window: int = 1000, p_time_gap: int = 100, p_seed: int = None, p_halflife: float = 0.5, p_mu: float = 1, p_sigma: float = 1, p_p: int = 2, **p_kwargs)

Bases: WrClusterAnalyzerRiver2MLPro

This is the wrapper class for the CluStream clusterer.

According to https://riverml.xyz/latest/api/cluster/CluStream/ : The CluStream algorithm maintains statistical information about the data using micro-clusters. These micro-clusters are temporal extensions of cluster feature vectors. The micro-clusters are stored at snapshots in time following a pyramidal pattern. This pattern allows to recall summary statistics from different time horizons.
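The pyramidal pattern can be illustrated with a small sketch, assuming the scheme from the original CluStream paper: a snapshot taken at clock time t belongs to the highest order i for which t is divisible by alpha**i, and only the most recent few snapshots of each order are kept. `snapshot_order` and `PyramidalSnapshots` are hypothetical names; this is a simplified illustration of the idea, not River's code.

```python
from collections import defaultdict, deque
from typing import Any, Deque, Dict, Tuple


def snapshot_order(t: int, alpha: int = 2) -> int:
    """Highest order i such that t is divisible by alpha ** i (requires t >= 1)."""
    if t < 1 or alpha < 2:
        raise ValueError("t must be >= 1 and alpha must be >= 2")
    order = 0
    while t % (alpha ** (order + 1)) == 0:
        order += 1
    return order


class PyramidalSnapshots:
    """Keeps at most `max_per_order` recent snapshots per order (oldest are dropped)."""

    def __init__(self, alpha: int = 2, max_per_order: int = 3):
        self.alpha = alpha
        self.orders: Dict[int, Deque[Tuple[int, Any]]] = defaultdict(
            lambda: deque(maxlen=max_per_order))

    def store(self, t: int, summary: Any) -> None:
        # Low orders are refreshed often (fine recent detail); high orders rarely
        # (coarse but long-reaching history).
        self.orders[snapshot_order(t, self.alpha)].append((t, summary))
```

Because higher orders are written exponentially less often, a bounded number of snapshots still covers both recent and distant time horizons.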

Parameters:
  • p_name (str) – Name of the clusterer. Default: None.

  • p_range_max – Maximum range of asynchronicity for the MLPro task (e.g. thread or process). Default: MLTask.C_RANGE_THREAD.

  • p_ada (bool) – Turn on adaptivity. Default: True.

  • p_visualize (bool) – Turn on visualization. Default: False.

  • p_logging – Set up type of logging. Default: Log.C_LOG_ALL.

  • p_n_macro_clusters (int) – The number of clusters (k) for the k-means algorithm. Default: 5.

  • p_max_micro_clusters (int) – The maximum number of micro-clusters to use. Default: 100.

  • p_micro_cluster_r_factor (int) – Multiplier for the micro-cluster radius. When deciding whether to add a new data point to a micro-cluster, the maximum boundary is defined as micro_cluster_r_factor times the RMS deviation of the data points in the micro-cluster from the centroid. Default: 2.

  • p_time_window (int) – If the current time is T and the time window is h, we only consider the data that arrived within the period (T-h,T). Default: 1000.

  • p_time_gap (int) – An incremental k-means is applied on the current set of micro-clusters after each time_gap to form the final macro-cluster solution. Default: 100.

  • p_seed (int) – Random seed used for generating initial centroid positions. Default: None.

  • p_halflife (float) – Amount by which to move the cluster centers; a reasonable value is between 0 and 1. Default: 0.5.

  • p_mu (float) – Mean of the normal distribution used to instantiate cluster positions. Default: 1.

  • p_sigma (float) – Standard deviation of the normal distribution used to instantiate cluster positions. Default: 1.

  • p_p (int) – Power parameter for the Minkowski metric. When p=1, this corresponds to the Manhattan distance, while p=2 corresponds to the Euclidean distance. Default: 2.

  • p_kwargs (dict) – Further optional named parameters.
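The boundary test described for `p_micro_cluster_r_factor` can be sketched as follows, assuming the cluster feature vector (N, LS, SS) of the CluStream paper. Timestamps, the time window, and the single-point case are omitted; a micro-cluster with one point has an RMS deviation of zero, and the paper then uses the distance to the closest other micro-cluster instead. `MicroCluster` and `fits` are hypothetical names, not River's API.

```python
import math
from typing import List, Sequence


class MicroCluster:
    """Cluster feature vector: point count N, linear sum LS, squared sum SS."""

    def __init__(self, point: Sequence[float]):
        self.n = 1
        self.ls = list(point)
        self.ss = [x * x for x in point]

    def insert(self, point: Sequence[float]) -> None:
        # Absorbing a point only updates the sums; raw points are not stored.
        self.n += 1
        for i, x in enumerate(point):
            self.ls[i] += x
            self.ss[i] += x * x

    def centroid(self) -> List[float]:
        return [s / self.n for s in self.ls]

    def rms_deviation(self) -> float:
        # Square root of the summed per-dimension variances; max(..., 0) guards rounding.
        return math.sqrt(sum(max(ss / self.n - (ls / self.n) ** 2, 0.0)
                             for ls, ss in zip(self.ls, self.ss)))


def fits(mc: MicroCluster, point: Sequence[float], r_factor: float = 2.0) -> bool:
    """True if the point lies within r_factor * RMS deviation of the centroid."""
    return math.dist(point, mc.centroid()) <= r_factor * mc.rms_deviation()
```

With points (0, 0) and (2, 0), the centroid is (1, 0) and the RMS deviation is 1, so with the default factor 2 a point at (2.5, 0) is absorbed while one at (4, 0) is not.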

C_TYPE = 'River Cluster Analyzer CluStream'
_update_clusters(input_data)

This method updates the centroids of all clusters introduced so far.

_get_clusters()

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[Cluster]

_renormalize(p_normalizer: Normalizer)

Internal renormalization is hard to realize here without creating strong dependencies on internal implementation details.

property clusters

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[Cluster]

Ver. 1.5.1 (2025-07-21)

This module provides a wrapper class for the DBStream algorithm provided by River.

Learn more: https://www.riverml.xyz/ https://riverml.xyz/latest/api/cluster/DBSTREAM/

class mlpro_int_river.wrappers.clusteranalyzers.dbstream.WrRiverDBStream2MLPro(p_name: str = None, p_range_max=1, p_ada: bool = True, p_visualize: bool = False, p_logging=True, p_clustering_threshold: float = 1.0, p_fading_factor: float = 0.01, p_cleanup_interval: float = 2, p_intersection_factor: float = 0.3, p_minimum_weight: float = 1.0, **p_kwargs)

Bases: WrClusterAnalyzerRiver2MLPro

This is the wrapper class for the DBSTREAM clusterer.

According to https://riverml.xyz/latest/api/cluster/DBSTREAM/ : DBSTREAM is a clustering algorithm for evolving data streams. It is the first micro-cluster-based online clustering component that explicitly captures the density between micro-clusters via a shared density graph. The density information in the graph is then exploited for reclustering based on actual density between adjacent micro clusters.

The algorithm is divided into two parts:
  1. Online micro-cluster maintenance (learning)

  2. Offline generation of macro clusters (clustering)

Parameters:
  • p_name (str) – Name of the clusterer. Default: None.

  • p_range_max – Maximum range of asynchronicity for the MLPro task (e.g. thread or process). Default: MLTask.C_RANGE_THREAD.

  • p_ada (bool) – Turn on adaptivity. Default: True.

  • p_visualize (bool) – Turn on visualization. Default: False.

  • p_logging – Set up type of logging. Default: Log.C_LOG_ALL.

  • p_clustering_threshold (float) – DBStream represents each micro cluster by a leader (a data point defining the micro cluster’s center) and the density in an area of a user-specified radius (clustering_threshold) around the center. Default: 1.0.

  • p_fading_factor (float) – Parameter that controls the importance of historical data for the current clusters. Note that fading_factor must be non-zero. Default: 0.01.

  • p_cleanup_interval (float) – The time interval between two consecutive time points when the cleanup process is conducted. Default: 2.

  • p_intersection_factor (float) – The intersection factor related to the area of the overlap of the micro clusters relative to the area covered by micro clusters. This parameter is used to determine whether a micro cluster or a shared density is weak. Default: 0.3.

  • p_minimum_weight (float) – The minimum weight for a cluster not to be considered “noisy”. Default: 1.0.

  • p_kwargs (dict) – Further optional named parameters.
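A minimal sketch of the online part governed by `p_clustering_threshold` and `p_fading_factor`, assuming the scheme of the DBSTREAM paper: each micro-cluster keeps a leader and a weight that fades exponentially with base 2 over elapsed time and grows by 1 for every absorbed point. `MicroCluster`, `faded`, and `learn_point` are hypothetical names. Leader movement, the shared density graph, cleanup, and offline reclustering are omitted, so this is not River's implementation.

```python
import math
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass
class MicroCluster:
    leader: Tuple[float, ...]  # point defining the micro-cluster's center
    weight: float              # faded count of absorbed points
    last_update: int           # time stamp of the last weight update


def faded(weight: float, fading_factor: float, elapsed: float) -> float:
    """Exponential fading: older contributions count less."""
    return weight * 2 ** (-fading_factor * elapsed)


def learn_point(mcs: List[MicroCluster], point: Sequence[float], t: int,
                clustering_threshold: float = 1.0, fading_factor: float = 0.01) -> None:
    # All micro-clusters whose leader lies within the clustering radius absorb the point.
    hits = [mc for mc in mcs if math.dist(point, mc.leader) <= clustering_threshold]
    if not hits:
        # No leader close enough: the point becomes the leader of a new micro-cluster.
        mcs.append(MicroCluster(tuple(point), 1.0, t))
        return
    for mc in hits:
        mc.weight = faded(mc.weight, fading_factor, t - mc.last_update) + 1.0
        mc.last_update = t
```

With the default fading factor 0.01, a weight halves every 100 time units; micro-clusters whose weight drops below `p_minimum_weight` would then be candidates for cleanup.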

C_TYPE = 'River Cluster Analyzer DBSTREAM'
_update_clusters(input_data)

This method updates the centroids of all clusters introduced so far.

_get_clusters()

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[Cluster]

_renormalize(p_normalizer: Normalizer)

Internal renormalization is hard to realize here without creating strong dependencies on internal implementation details.

property clusters

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[Cluster]

Ver. 1.5.1 (2025-07-21)

This module provides a wrapper class for the DenStream algorithm provided by River.

Learn more: https://www.riverml.xyz/ https://riverml.xyz/latest/api/cluster/DenStream/

class mlpro_int_river.wrappers.clusteranalyzers.denstream.WrRiverDenStream2MLPro(p_name: str = None, p_range_max=1, p_ada: bool = True, p_visualize: bool = False, p_logging=True, p_decaying_factor: float = 0.25, p_beta: float = 0.75, p_mu: float = 2, p_epsilon: float = 0.02, p_n_samples_init: int = 1000, p_stream_speed: int = 100, **p_kwargs)

Bases: WrClusterAnalyzerRiver2MLPro

This is the wrapper class for the DenStream clusterer.

According to https://riverml.xyz/latest/api/cluster/DenStream/ : DenStream is a clustering algorithm for evolving data streams. DenStream can discover clusters with arbitrary shape and is robust against noise (outliers).

“Dense” micro-clusters (named core-micro-clusters) summarise the clusters of arbitrary shape. A pruning strategy based on the concepts of potential and outlier micro-clusters guarantees the precision of the weights of the micro-clusters with limited memory.

The algorithm is divided into two parts:
  1. Online micro-cluster maintenance (learning)

  2. Offline generation of macro clusters (clustering)

Parameters:
  • p_name (str) – Name of the clusterer. Default: None.

  • p_range_max – Maximum range of asynchronicity for the MLPro task (e.g. thread or process). Default: MLTask.C_RANGE_THREAD.

  • p_ada (bool) – Turn on adaptivity. Default: True.

  • p_visualize (bool) – Turn on visualization. Default: False.

  • p_logging – Set up type of logging. Default: Log.C_LOG_ALL.

  • p_decaying_factor (float) – Parameter that controls the importance of historical data for the current clusters. Note that decaying_factor must be non-zero. Default: 0.25.

  • p_beta (float) – Parameter to determine the threshold of outlier relative to core micro-clusters. The value of beta must be within the range (0,1). Default: 0.75.

  • p_mu (float) – Parameter to determine the threshold of outliers relative to core micro-cluster. As beta * mu must be greater than 1, mu must be within the range (1/beta, inf). Default: 2.

  • p_epsilon (float) – Defines the epsilon neighborhood. Default: 0.02.

  • p_n_samples_init (int) – Number of points used to initialize the online process. Default: 1000.

  • p_stream_speed (int) – Number of points arriving per unit of time. Default: 100.

  • p_kwargs (dict) – Further optional named parameters.
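The roles of `p_beta`, `p_mu`, and `p_decaying_factor` can be sketched as follows, assuming the definitions of the DenStream paper: weights fade as 2 ** (-decaying_factor * elapsed), a micro-cluster with weight of at least mu is a core micro-cluster, one with weight of at least beta * mu is a potential micro-cluster, and anything lighter is an outlier micro-cluster. The radius condition (radius at most epsilon) is omitted. `check_parameters`, `decayed`, and `classify` are hypothetical helpers, not River's API.

```python
def check_parameters(beta: float, mu: float) -> None:
    """Enforce the documented constraints: 0 < beta < 1 and beta * mu > 1."""
    if not 0.0 < beta < 1.0:
        raise ValueError("beta must lie in (0, 1)")
    if beta * mu <= 1.0:
        raise ValueError("beta * mu must be greater than 1")


def decayed(weight: float, decaying_factor: float, elapsed: float) -> float:
    """Fade a micro-cluster weight exponentially with base 2."""
    return weight * 2 ** (-decaying_factor * elapsed)


def classify(weight: float, beta: float, mu: float) -> str:
    """Label a micro-cluster by its (faded) weight."""
    if weight >= mu:
        return "core"
    if weight >= beta * mu:
        return "potential"
    return "outlier"
```

With the wrapper defaults (beta = 0.75, mu = 2), the threshold between potential and outlier micro-clusters is beta * mu = 1.5, which satisfies the constraint beta * mu > 1.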

C_TYPE = 'River Cluster Analyzer DenStream'
_update_clusters(input_data)

This method updates the centroids of all clusters introduced so far.

_get_clusters() dict[Cluster]

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[ClusterCentroid]

_renormalize(p_normalizer: Normalizer)

Internal renormalization of all clusters. See method OATask.renormalize_on_event() for further information.

Parameters:

p_normalizer (Normalizer) – Normalizer object to be applied on task-specific

property clusters: dict[Cluster]

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[ClusterCentroid]

Ver. 1.5.2 (2025-07-21)

This module provides a wrapper class for the KMeans algorithm provided by River.

Learn more: https://www.riverml.xyz/ https://riverml.xyz/latest/api/cluster/KMeans/

class mlpro_int_river.wrappers.clusteranalyzers.kmeans.WrRiverKMeans2MLPro(p_name: str = None, p_range_max=1, p_ada: bool = True, p_visualize: bool = False, p_logging=True, p_n_clusters: int = 5, p_halflife: float = 0.5, p_mu: float = 0, p_sigma: float = 1, p_p: int = 2, p_seed: int = None, **p_kwargs)

Bases: WrClusterAnalyzerRiver2MLPro

This is the wrapper class for the KMeans clusterer.

According to https://riverml.xyz/latest/api/cluster/KMeans/ : Incremental k-means. The most common way to implement batch k-means is to use Lloyd’s algorithm, which consists of assigning all the data points to a set of cluster centers and then moving the centers accordingly.

Parameters:
  • p_name (str) – Name of the clusterer. Default: None.

  • p_range_max – Maximum range of asynchronicity for the MLPro task (e.g. thread or process). Default: MLTask.C_RANGE_THREAD.

  • p_ada (bool) – Turn on adaptivity. Default: True.

  • p_visualize (bool) – Turn on visualization. Default: False.

  • p_logging – Set up type of logging. Default: Log.C_LOG_ALL.

  • p_n_clusters (int) – Maximum number of clusters to assign. Default: 5.

  • p_seed (int) – Random seed used for generating initial centroid positions. Default: None.

  • p_halflife (float) – Amount by which to move the cluster centers; a reasonable value is between 0 and 1. Default: 0.5.

  • p_mu (float) – Mean of the normal distribution used to instantiate cluster positions. Default: 0.

  • p_sigma (float) – Standard deviation of the normal distribution used to instantiate cluster positions. Default: 1.

  • p_p (int) – Power parameter for the Minkowski metric. When p=1, this corresponds to the Manhattan distance, while p=2 corresponds to the Euclidean distance. Default: 2.

  • p_kwargs (dict) – Further optional named parameters.
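How `p_n_clusters`, `p_halflife`, `p_mu`, `p_sigma`, `p_p`, and `p_seed` interact in incremental k-means can be sketched as follows: centers start at random positions drawn from N(mu, sigma), each incoming point is assigned to the closest center under the Minkowski metric, and only that center moves a fraction `halflife` of the way towards the point. This mirrors our understanding of River's KMeans, but the class `IncrementalKMeans` is hypothetical and uses fixed-length lists instead of River's dict-based features.

```python
import random
from typing import List, Optional, Sequence


def minkowski(a: Sequence[float], b: Sequence[float], p: int = 2) -> float:
    """Minkowski distance: p=1 is Manhattan, p=2 is Euclidean."""
    return sum(abs(x - y) ** p for x, y in zip(a, b)) ** (1 / p)


class IncrementalKMeans:
    """Hypothetical sketch of incremental k-means with halflife-based center updates."""

    def __init__(self, n_clusters: int = 5, halflife: float = 0.5, mu: float = 0.0,
                 sigma: float = 1.0, p: int = 2, seed: Optional[int] = None):
        self.n_clusters = n_clusters
        self.halflife = halflife
        self.mu = mu
        self.sigma = sigma
        self.p = p
        self._rng = random.Random(seed)
        self.centers: Optional[List[List[float]]] = None

    def _init_centers(self, dim: int) -> None:
        # Initial centers are drawn from a normal distribution N(mu, sigma).
        self.centers = [[self._rng.gauss(self.mu, self.sigma) for _ in range(dim)]
                        for _ in range(self.n_clusters)]

    def predict_one(self, x: Sequence[float]) -> int:
        if self.centers is None:
            self._init_centers(len(x))
        # Index of the closest center under the Minkowski metric.
        return min(range(self.n_clusters),
                   key=lambda k: minkowski(x, self.centers[k], self.p))

    def learn_one(self, x: Sequence[float]) -> int:
        k = self.predict_one(x)
        center = self.centers[k]
        # Move only the closest center a fraction `halflife` of the way towards x.
        for i, xi in enumerate(x):
            center[i] += self.halflife * (xi - center[i])
        return k
```

Unlike Lloyd's batch algorithm, no point is ever revisited, so memory stays constant regardless of stream length.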

C_TYPE = 'River Cluster Analyzer KMeans'
C_CLUSTER_PROPERTIES: PropertyDefinitions = [('centroid', 1, False, <class 'mlpro.oa.streams.tasks.clusteranalyzers.clusters.properties.centroid.Centroid'>), ('size', 1, False, <class 'mlpro.bf.math.properties.Property'>), ('size_geo', 1, False, <class 'mlpro.bf.math.properties.Property'>)]
_update_clusters(input_data)

This method updates the centroids of all clusters introduced so far.

_get_clusters()

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[Cluster]

_renormalize(p_normalizer: Normalizer)

Internal renormalization of all clusters. See method OATask.renormalize_on_event() for further information.

Parameters:

p_normalizer (Normalizer) – Normalizer object to be applied on task-specific

property clusters

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[Cluster]

Ver. 1.5.1 (2025-07-21)

This module provides a wrapper class for the STREAMKMeans algorithm provided by River.

Learn more: https://www.riverml.xyz/ https://riverml.xyz/latest/api/cluster/STREAMKMeans/

class mlpro_int_river.wrappers.clusteranalyzers.streamkmeans.WrRiverStreamKMeans2MLPro(p_name: str = None, p_range_max=1, p_ada: bool = True, p_visualize: bool = False, p_logging=True, p_chunk_size: int = 10, p_n_clusters: int = 5, p_halflife: float = 0.5, p_mu: float = 0, p_sigma: float = 1, p_p: int = 2, p_seed: int = None, **p_kwargs)

Bases: WrClusterAnalyzerRiver2MLPro

This is the wrapper class for the STREAMKMeans clusterer.

According to https://riverml.xyz/latest/api/cluster/STREAMKMeans/ : STREAMKMeans is an alternative version of the original algorithm STREAMLSEARCH proposed by O’Callaghan et al., by replacing the k-medians using LSEARCH by the k-means algorithm.

However, instead of using the traditional k-means, which requires a total reclustering each time the temporary chunk of data points is full, the implementation of this algorithm uses an incremental k-means.

Parameters:
  • p_name (str) – Name of the clusterer. Default: None.

  • p_range_max – Maximum range of asynchronicity for the MLPro task (e.g. thread or process). Default: MLTask.C_RANGE_THREAD.

  • p_ada (bool) – Turn on adaptivity. Default: True.

  • p_visualize (bool) – Turn on visualization. Default: False.

  • p_logging – Set up type of logging. Default: Log.C_LOG_ALL.

  • p_chunk_size (int) – Maximum size allowed for the temporary data chunk. Default: 10.

  • p_n_clusters (int) – Number of clusters generated by the algorithm. Default: 5.

  • p_seed (int) – Random seed used for generating initial centroid positions. Default: None.

  • p_halflife (float) – Amount by which to move the cluster centers; a reasonable value is between 0 and 1. Default: 0.5.

  • p_mu (float) – Mean of the normal distribution used to instantiate cluster positions. Default: 0.

  • p_sigma (float) – Standard deviation of the normal distribution used to instantiate cluster positions. Default: 1.

  • p_p (int) – Power parameter for the Minkowski metric. When p=1, this corresponds to the Manhattan distance, while p=2 corresponds to the Euclidean distance. Default: 2.

  • p_kwargs (dict) – Further optional named parameters.
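The role of `p_chunk_size` can be sketched as a temporary buffer that hands its points to an incremental k-means pass once it is full, then starts over. `ChunkBuffer` and `recluster` are hypothetical names, and the pass shown (move each point's closest center by `halflife`) is a simplified stand-in for the incremental k-means used by River, not its actual code.

```python
import math
from typing import Callable, List, Sequence


class ChunkBuffer:
    """Hypothetical temporary data chunk that is handed over once it is full."""

    def __init__(self, chunk_size: int, on_full: Callable[[List[Sequence[float]]], None]):
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")
        self.chunk_size = chunk_size
        self.on_full = on_full
        self._chunk: List[Sequence[float]] = []

    @property
    def pending(self) -> int:
        """Number of points waiting in the current, not yet full chunk."""
        return len(self._chunk)

    def add(self, point: Sequence[float]) -> None:
        self._chunk.append(point)
        if len(self._chunk) == self.chunk_size:
            self.on_full(self._chunk)
            self._chunk = []  # start a fresh chunk


def recluster(centers: List[List[float]], chunk: List[Sequence[float]],
              halflife: float = 0.5) -> None:
    """Incremental pass over a full chunk: move each point's closest center towards it."""
    for x in chunk:
        k = min(range(len(centers)), key=lambda i: math.dist(x, centers[i]))
        centers[k] = [c + halflife * (xi - c) for c, xi in zip(centers[k], x)]
```

For example, `ChunkBuffer(10, lambda chunk: recluster(centers, chunk))` updates `centers` only after every 10th point, so the cost of reclustering is paid once per chunk rather than once per point.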

C_NAME = 'River Cluster Analyzer STREAMKMeans'
_update_clusters(input_data)

This method updates the centroids of all clusters introduced so far.

_get_clusters()

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[Cluster]

_renormalize(p_normalizer: Normalizer)

Internal renormalization of all clusters. See method OATask.renormalize_on_event() for further information.

Parameters:

p_normalizer (Normalizer) – Normalizer object to be applied on task-specific

property clusters

This method returns the current list of clusters.

Returns:

dict_of_clusters – Current list of clusters.

Return type:

dict[Cluster]