Mahout clustering Canopy+K-means 源码分析

2023-12-01 05:18

本文主要是介绍Mahout clustering Canopy+K-means 源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

聚类分析

 

     聚类(Clustering)可以简单的理解为将数据对象分为多个簇(Cluster),每个簇里的所有数据对象具有一定的相似性,这样一个簇可以看作一个整体,以此可以提高计算质量或减少计算量。而数据对象间相似性的衡量通常是通过坐标系中空间距离的大小来判断;常见的有 欧几里得距离算法、余弦距离算法、皮尔逊相关系数算法等,Mahout对此都提供了实现,并且你可以在实现自己的聚类时,通过接口切换不同的距离算法。

 

 

数据模型

 

     在Mahout的聚类分析的计算过程中,数据对象会转化成向量(Vector)参与运算,在Mahout中的接口是org.apache.mahout.math.Vector  它里面每个域用一个浮点数(double)表示,你可以通过继承Mahout里的基类如:AbstractVector来实现自己的向量模型,也可以直接使用一些它提供的已有实现如下:

 

    1. DenseVector,它的实现就是一个浮点数数组,对向量里所有域都进行存储,适合用于存储密集向量。

 

    2. RandomAccessSparseVector 基于浮点数的 HashMap 实现的,key 是整形 (int) 类型,value 是浮点数(double) 类型,它只存储向量中不为空的值,并提供随机访问。

 

    3. SequentialAccessVector 实现为整形 (int) 类型和浮点数 (double) 类型的并行数组,它也只存储向量中不 为空的值,但只提供顺序访问。

 

 

聚类算法K-means与Canopy

 

       首先介绍先K-means算法:所有做聚类分析的数据对象,会被描述成n维空间中的一个点,用向量(Vector)表示;算法开始会随机选择K个点,作为一个簇的中心,然后其余的点会根据它与每个簇心的距离,被分配到最近簇中去;接着以迭代的方式,先重新计算每个簇的中心(通过其包含的所有向量的平均值),计算完成后对所有点属于哪个簇进行重新划分;一直如此迭代直到过程收敛;可证明迭代次数是有限的。

 

       虽然K-means简单且高效,但它存在一定问题,首先K值(即簇的数量)是人为确定的,在对数据不了解的情况下,很难给出合理的K值;其次初始簇心的选择是随机的,若选择到了较孤立的点,会对聚类的效果产生非常大的影响。因此通常会用Canopy算法配合,进行初始化,确定簇数以及初始簇心。

 

       Canopy算法首先会要求输入两个阀值 T1和T2,T1>T2;算法有一个集群这里叫Canopy的集合(Set),当然一开始它是空的;然后会将读取到的第一个点作为集合中的一个Canopy,接着读取下一个点,若该点与集合中的每个Canopy计算距离,若这个距离小于T1,则这个点会分配给这个Canopy(一个点可以分配给多个Canopy),而当这个距离小于T2时这个点不能作为一个新的Canopy而放到集合中。也就是说当一个点只要与集合中任意一个Canopy的距离小于T2了,即表示它里那个Canopy太近不能作为新的Canopy。若都没有则生成一个新的Canopy放入集合中。以此循环,直到没有点了。

 

       所以这里用到的聚类分析算法的思路是:首先通过Canopy算法进行聚类,以确定簇数以及初始簇心的,接着通过K-means算法进行迭代运算,收敛出最后的聚类结果。接下来我们看看实现。

 

 

 

 

代码示例

在 mahout-examples 中的 org.apache.mahout.clustering.syntheticcontrol.kmeans.Job类,对上述算法提供了较完整的实现,它是一个Hadoop的job,我们从源代码入手,看如何将实际的数据跑起来。下面是该类的核心逻辑代码:

/**

Java代码   收藏代码
  1.  * Run the kmeans clustering job on an input dataset using the given  
  2.  * distance measure, t1, t2 and iteration parameters. All output data will  
  3.  * be written to the output directory, which will be initially deleted if it  
  4.  * exists. The clustered points will reside in the path  
  5.  * <output>/clustered-points. By default, the job expects the a file  
  6.  * containing synthetic_control.data as obtained from  
  7.  * http://archive.ics.uci.  
  8.  * edu/ml/datasets/Synthetic+Control+Chart+Time+Series resides in a  
  9.  * directory named "testdata", and writes output to a directory named  
  10.  * "output".  
  11.  *   
  12.  * @param conf  
  13.  *            the Configuration to use  
  14.  * @param input  
  15.  *            the String denoting the input directory path  
  16.  * @param output  
  17.  *            the String denoting the output directory path  
  18.  * @param measure  
  19.  *            the DistanceMeasure to use  
  20.  * @param t1  
  21.  *            the canopy T1 threshold  
  22.  * @param t2  
  23.  *            the canopy T2 threshold  
  24.  * @param convergenceDelta  
  25.  *            the double convergence criteria for iterations  
  26.  * @param maxIterations  
  27.  *            the int maximum number of iterations  
  28.  */  
  29. public static void run(Configuration conf, Path input, Path output,  
  30.         DistanceMeasure measure, double t1, double t2,  
  31.         double convergenceDelta, int maxIterations) throws Exception {  
  32.   
  33.     System.out.println("run canopy output: " + output);  
  34.     Path directoryContainingConvertedInput = new Path(output,  
  35.             DIRECTORY_CONTAINING_CONVERTED_INPUT);  
  36.     log.info("Preparing Input");  
  37.     InputDriver.runJob(input, directoryContainingConvertedInput,  
  38.             "org.apache.mahout.math.RandomAccessSparseVector");  
  39.     log.info("Running Canopy to get initial clusters");  
  40.     CanopyDriver.run(conf, directoryContainingConvertedInput, output,  
  41.             measure, t1, t2, falsefalse);  
  42.     log.info("Running KMeans");  
  43.     System.out.println("kmeans cluster starting...");  
  44.     KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(  
  45.             output, Cluster.INITIAL_CLUSTERS_DIR+"-final"), output, measure,  
  46.             convergenceDelta, maxIterations, truefalse);  
  47.     // run ClusterDumper  
  48.     ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,  
  49.             output, maxIterations), new Path(output, "clusteredPoints"));  
  50.     clusterDumper.printClusters(null);  
  51. }  

       这个Job中调用了3个Map/Reduce 任务以及一个转换,它们如下:


1. 第8行: InputDriver.runJob ( ) ,它用于将原始数据文件转换成 Mahout进行计算所需格式的文件 SequenceFile,它是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将<key, value>对序列化到文件中。

2. 第11行:CanopyDriver.run( ) , 即用Canopy算法确定初始簇的个数和簇的中心。

3.  第14行:KMeansDriver.run( ) , 这显然是K-means算法进行聚类。

4. 第18~20行,ClusterDumper类将聚类的结果装换并写出来,若你了解了源代码,你也可以自己实现这个类的功能,因为聚类后的数据存储格式,往往跟自身业务有关。 

这里细讲下第一个Map/Reduce: InputDriver.runJob ( )因为我们需要了解,初始数据的格式,其他的任务CanopyDriver.run( )和KMeansDriver.run( )任务就不细讲了,主要就是Canopy和K-means算法,原理已经介绍了,实现也不难,需要你了解hadoop编程。
 InputDriver.runJob( )实现也非常简单,它只有Map,其代码如下:
Java代码   收藏代码
  1. @Override  
  2. protected void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {  
  3.   
  4.   String[] numbers = SPACE.split(values.toString());  
  5.   // sometimes there are multiple separator spaces  
  6.   Collection<Double> doubles = Lists.newArrayList();  
  7.   for (String value : numbers) {  
  8.     if (!value.isEmpty()) {  
  9.       doubles.add(Double.valueOf(value));  
  10.     }  
  11.   }  
  12.   // ignore empty lines in data file  
  13.   if (!doubles.isEmpty()) {  
  14.     try {  
  15.       Vector result = (Vector) constructor.newInstance(doubles.size());  
  16.       int index = 0;  
  17.       for (Double d : doubles) {  
  18.         result.set(index++, d);  
  19.       }  
  20.       VectorWritable vectorWritable = new VectorWritable(result);  
  21.       context.write(new Text(String.valueOf(index)), vectorWritable);  
  22.   
  23.     } catch (InstantiationException e) {  
  24.       throw new IllegalStateException(e);  
  25.     } catch (IllegalAccessException e) {  
  26.       throw new IllegalStateException(e);  
  27.     } catch (InvocationTargetException e) {  
  28.       throw new IllegalStateException(e);  
  29.     }  
  30.   }  
  31. }  
 由代码可以看出,它将你初始数据文件的每一行用空格切开成个 String[] numbers ,然后再将 numbers中的每个String转换成Double类型,并以此生成一个向量 Vector ,然后通过 SequenceFileOutputFormat的方式输出成SequenceFile,以作下一步计算的输入。由此我们可以了解到我们的初始数据的格式需要 以一行为一个单位,用空格分隔,每一列为一个Double数即可(当然你也可以反过来修改例子中的实现)。


参考资料:

https://cwiki.apache.org/confluence/display/MAHOUT/K-Means+Clustering

https://cwiki.apache.org/confluence/display/MAHOUT/Canopy+Clustering

http://www.ibm.com/developerworks/cn/java/j-mahout-scaling/

http://www.ibm.com/developerworks/cn/web/1103_zhaoct_recommstudy3/

《Mahout in action》

https://cwiki.apache.org/MAHOUT/cluster-dumper.html
转自: Mahout clustering Canopy+K-means 源码分析,保存在此以学习。

这篇关于Mahout clustering Canopy+K-means 源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/439946

相关文章

Olingo分析和实践之EDM 辅助序列化器详解(最佳实践)

《Olingo分析和实践之EDM辅助序列化器详解(最佳实践)》EDM辅助序列化器是ApacheOlingoOData框架中无需完整EDM模型的智能序列化工具,通过运行时类型推断实现灵活数据转换,适用... 目录概念与定义什么是 EDM 辅助序列化器?核心概念设计目标核心特点1. EDM 信息可选2. 智能类

Olingo分析和实践之OData框架核心组件初始化(关键步骤)

《Olingo分析和实践之OData框架核心组件初始化(关键步骤)》ODataSpringBootService通过初始化OData实例和服务元数据,构建框架核心能力与数据模型结构,实现序列化、URI... 目录概述第一步:OData实例创建1.1 OData.newInstance() 详细分析1.1.1

Olingo分析和实践之ODataImpl详细分析(重要方法详解)

《Olingo分析和实践之ODataImpl详细分析(重要方法详解)》ODataImpl.java是ApacheOlingoOData框架的核心工厂类,负责创建序列化器、反序列化器和处理器等组件,... 目录概述主要职责类结构与继承关系核心功能分析1. 序列化器管理2. 反序列化器管理3. 处理器管理重要方

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种

解决1093 - You can‘t specify target table报错问题及原因分析

《解决1093-Youcan‘tspecifytargettable报错问题及原因分析》MySQL1093错误因UPDATE/DELETE语句的FROM子句直接引用目标表或嵌套子查询导致,... 目录报js错原因分析具体原因解决办法方法一:使用临时表方法二:使用JOIN方法三:使用EXISTS示例总结报错原

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

Android kotlin中 Channel 和 Flow 的区别和选择使用场景分析

《Androidkotlin中Channel和Flow的区别和选择使用场景分析》Kotlin协程中,Flow是冷数据流,按需触发,适合响应式数据处理;Channel是热数据流,持续发送,支持... 目录一、基本概念界定FlowChannel二、核心特性对比数据生产触发条件生产与消费的关系背压处理机制生命周期

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,