使用MapReduce实现k-means算法_"system.out.println(\"initial cluster finished\");-程序员宅基地

技术标签: mapreduce  k-means  hadoop  数据挖掘  

主要的算法流程就是:

(1)随机选择k个点,放到磁盘上供各个节点进行共享

(2)每一个map读取中心点,为每一条记录找到最近的Cluster,发出的记录是<(id),(cluster)>,Reduce的功能就是重新计算新的k均值,并写到hdfs中,供下一次的迭代使用

(3)当迭代停止,根据最终的中心点,分配所有的点,形成最终的聚类。

以下是具体的代码:

package kmeans;


import java.io.DataInput;


/*
 * k-means聚类算法簇信息
 */
/*
 * State of one k-means cluster: its id, the number of points assigned so
 * far, and the current center. Text form is "id,numOfPoints,<center csv>"
 * (see toString()/Cluster(String)); binary form follows the Writable
 * contract (int id, long count, then the center vector).
 */
public class Cluster implements Writable {

    private int clusterID;
    private long numOfPoints;
    private Instance center;

    /** Empty cluster: id -1, no points, zero-dimensional center. */
    public Cluster() {
        this(-1, new Instance());
    }

    /** Cluster with the given id and center and no observed points yet. */
    public Cluster(int clusterID, Instance center) {
        this.setClusterID(clusterID);
        this.setNumOfPoints(0);
        this.setCenter(center);
    }

    /**
     * Parses the text form "id,numOfPoints,v1,v2,...". Only the first two
     * commas delimit fields; everything after them is the center vector.
     */
    public Cluster(String line) {
        String[] parts = line.split(",", 3);
        clusterID = Integer.parseInt(parts[0]);
        numOfPoints = Long.parseLong(parts[1]);
        center = new Instance(parts[2]);
    }

    @Override
    public String toString() {
        return clusterID + "," + numOfPoints + "," + center.toString();
    }

    public int getClusterID() {
        return clusterID;
    }

    public void setClusterID(int clusterID) {
        this.clusterID = clusterID;
    }

    public long getNumOfPoints() {
        return numOfPoints;
    }

    public void setNumOfPoints(long numOfPoints) {
        this.numOfPoints = numOfPoints;
    }

    public Instance getCenter() {
        return center;
    }

    public void setCenter(Instance center) {
        this.center = center;
    }

    /**
     * Folds one instance into the running mean: center becomes
     * (center * n + instance) / (n + 1). Errors from the vector math are
     * reported via printStackTrace and otherwise ignored (legacy
     * best-effort behavior).
     */
    public void observeInstance(Instance instance) {
        try {
            Instance total = center.multiply(numOfPoints).add(instance);
            numOfPoints++;
            center = total.divide(numOfPoints);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(clusterID);
        out.writeLong(numOfPoints);
        center.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        clusterID = in.readInt();
        numOfPoints = in.readLong();
        center.readFields(in);
    }
}


package kmeans;


import java.io.DataInput;


/**
 * A dense numeric vector (one data point or a cluster center) with simple
 * element-wise arithmetic. Binary form is an int length followed by that
 * many doubles (Writable contract); text form is comma-separated values.
 */
public class Instance implements Writable {

    // Vector components; never null after construction.
    ArrayList<Double> value;

    /** Empty (zero-dimensional) vector. */
    public Instance() {
        value = new ArrayList<Double>();
    }

    /**
     * Parses a comma-separated list of doubles, e.g. "2,1,3,4,1,4".
     *
     * @throws NumberFormatException if any field is not a valid double
     */
    public Instance(String line) {
        String[] fields = line.split(",");
        value = new ArrayList<Double>(fields.length);
        for (String field : fields) {
            value.add(Double.parseDouble(field));
        }
    }

    /** Copy constructor (Double is immutable, so a shallow copy suffices;
     *  the original used deprecated new Double(...) boxing). */
    public Instance(Instance ins) {
        value = new ArrayList<Double>(ins.getValue());
    }

    /** Zero vector of dimension k. */
    public Instance(int k) {
        value = new ArrayList<Double>(k);
        for (int i = 0; i < k; i++) {
            value.add(0.0);
        }
    }

    public ArrayList<Double> getValue() {
        return value;
    }

    /**
     * Returns a new vector that is the element-wise sum of this and
     * {@code instance}. An empty operand acts as the identity so sums can
     * be accumulated starting from new Instance().
     *
     * @throws IllegalArgumentException if both vectors are non-empty and
     *         their dimensions differ. (The original printed a stack trace
     *         and returned null here, which produced an NPE at the next
     *         use of the result instead of a clear error.)
     */
    public Instance add(Instance instance) {
        if (value.isEmpty()) {
            return new Instance(instance);
        }
        if (instance.getValue().isEmpty()) {
            return new Instance(this);
        }
        if (value.size() != instance.getValue().size()) {
            throw new IllegalArgumentException(
                    "can not add! dimension not compatible!"
                            + value.size() + "," + instance.getValue().size());
        }
        Instance result = new Instance();
        for (int i = 0; i < value.size(); i++) {
            result.getValue().add(value.get(i) + instance.getValue().get(i));
        }
        return result;
    }

    /** Returns a new vector with every component multiplied by num. */
    public Instance multiply(double num) {
        Instance result = new Instance();
        for (int i = 0; i < value.size(); i++) {
            result.getValue().add(value.get(i) * num);
        }
        return result;
    }

    /** Returns a new vector with every component divided by num. */
    public Instance divide(double num) {
        Instance result = new Instance();
        for (int i = 0; i < value.size(); i++) {
            result.getValue().add(value.get(i) / num);
        }
        return result;
    }

    /**
     * Comma-separated text form; returns "" for a zero-dimensional vector.
     * (The original indexed value.get(value.size() - 1) unconditionally
     * and threw IndexOutOfBoundsException on an empty vector.)
     */
    @Override
    public String toString() {
        StringBuilder s = new StringBuilder();
        for (int i = 0; i < value.size(); i++) {
            if (i > 0) {
                s.append(',');
            }
            s.append(value.get(i));
        }
        return s.toString();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(value.size());
        for (int i = 0; i < value.size(); i++) {
            out.writeDouble(value.get(i));
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        int size = in.readInt();
        value = new ArrayList<Double>(size);
        for (int i = 0; i < size; i++) {
            value.add(in.readDouble());
        }
    }
}


package kmeans;


import java.io.BufferedReader;


/**
 * KMeans聚类算法
 * 
 */
/**
 * One k-means iteration as a MapReduce job: the mapper assigns each input
 * instance to its nearest current center, the combiner pre-aggregates
 * partial sums per cluster, and the reducer recomputes each center as the
 * weighted mean of its assigned instances.
 */
public class KMeans {
    public static class KMeansMapper extends
            Mapper<LongWritable, Text, IntWritable, Cluster> {
        // Current centers, loaded from HDFS in setup().
        private ArrayList<Cluster> kClusters = new ArrayList<Cluster>();

        /**
         * Loads the current cluster centers from every regular file under
         * the directory named by the "clusterPath" configuration key.
         */
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FileStatus[] fileList = fs.listStatus(new Path(context
                    .getConfiguration().get("clusterPath")));
            for (int i = 0; i < fileList.length; i++) {
                if (fileList[i].isDir()) {
                    continue;
                }
                // Close each file's reader before opening the next one:
                // the original closed only the last stream (leaking the
                // earlier ones) and threw NPE on in.close() when the
                // directory contained no regular files at all.
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        fs.open(fileList[i].getPath()), "UTF-8"));
                try {
                    String line;
                    while ((line = in.readLine()) != null) {
                        System.out.println("read a line:" + line);
                        Cluster cluster = new Cluster(line);
                        cluster.setNumOfPoints(0);
                        kClusters.add(cluster);
                    }
                } finally {
                    in.close();
                }
            }
        }

        /**
         * Emits (nearest cluster id, single-point cluster) for one input
         * line so downstream stages can average points per cluster.
         * Assignment failures are logged and the record is skipped.
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Instance instance = new Instance(value.toString());
            try {
                int id = getNearest(instance);
                if (id == -1) {
                    // Thrown and caught below: preserves the original
                    // best-effort behavior of skipping unassignable records.
                    throw new InterruptedException("id == -1");
                }
                Cluster cluster = new Cluster(id, instance);
                cluster.setNumOfPoints(1);
                System.out.println("cluster that i emit is:"
                        + cluster.toString());
                context.write(new IntWritable(id), cluster);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Returns the id of the loaded center closest to {@code instance}
         * under the Euclidean distance, or -1 when no centers are loaded.
         */
        public int getNearest(Instance instance) throws Exception {
            int id = -1;
            double distance = Double.MAX_VALUE;
            Distance<Double> distanceMeasure = new EuclideanDistance<Double>();
            for (Cluster cluster : kClusters) {
                double newDis = distanceMeasure.getDistance(cluster
                        .getCenter().getValue(), instance.getValue());
                if (newDis < distance) {
                    id = cluster.getClusterID();
                    distance = newDis;
                }
            }
            return id;
        }

        /** Returns the loaded cluster with the given id, or null if absent. */
        public Cluster getClusterByID(int id) {
            for (Cluster cluster : kClusters) {
                if (cluster.getClusterID() == id) {
                    return cluster;
                }
            }
            return null;
        }
    }

    /**
     * Map-side pre-aggregation: merges all partial clusters sharing an id
     * into a single weighted partial cluster, cutting shuffle volume. Safe
     * as a combiner because the weighted mean is associative.
     */
    public static class KMeansCombiner extends
            Reducer<IntWritable, Cluster, IntWritable, Cluster> {
        @Override
        public void reduce(IntWritable key, Iterable<Cluster> value,
                Context context) throws IOException, InterruptedException {
            Instance sum = new Instance();
            // long, not int: per-cluster counts are longs, and the original
            // int accumulator silently truncated on large inputs.
            long numOfPoints = 0;
            for (Cluster cluster : value) {
                numOfPoints += cluster.getNumOfPoints();
                System.out.println("cluster is:" + cluster.toString());
                sum = sum.add(cluster.getCenter().multiply(
                        cluster.getNumOfPoints()));
            }
            Cluster cluster = new Cluster(key.get(), sum.divide(numOfPoints));
            cluster.setNumOfPoints(numOfPoints);
            System.out.println("combiner emit cluster:" + cluster.toString());
            context.write(key, cluster);
        }
    }

    /**
     * Recomputes each cluster's center as the weighted mean of the partial
     * sums and writes "id,count,center" lines consumed by the next
     * iteration's mappers.
     */
    public static class KMeansReducer extends
            Reducer<IntWritable, Cluster, NullWritable, Cluster> {
        @Override
        public void reduce(IntWritable key, Iterable<Cluster> value,
                Context context) throws IOException, InterruptedException {
            Instance sum = new Instance();
            long numOfPoints = 0; // long for the same reason as the combiner
            for (Cluster cluster : value) {
                numOfPoints += cluster.getNumOfPoints();
                sum = sum.add(cluster.getCenter().multiply(
                        cluster.getNumOfPoints()));
            }
            Cluster cluster = new Cluster(key.get(), sum.divide(numOfPoints));
            cluster.setNumOfPoints(numOfPoints);
            context.write(NullWritable.get(), cluster);
        }
    }
}


package kmeans;


import java.io.BufferedReader;


/**
 * 在收敛条件满足且所有簇中心的文件最后产生后,再对输入文件 中的所有实例进行划分簇的工作,最后把所有实例按照(实例,簇id) 的方式写进结果文件
 * 
 * @author KING
 * 
 */
/**
 * Final assignment pass: after the centers have converged (or the iteration
 * budget is spent), maps every input instance to the id of its nearest
 * final center and writes (instance, clusterID) pairs. This is a map-only
 * job (the driver sets zero reducers).
 *
 * @author KING
 */
public class KMeansCluster {
    public static class KMeansClusterMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        // Final centers, loaded from HDFS in setup().
        private ArrayList<Cluster> kClusters = new ArrayList<Cluster>();

        /**
         * Loads the final cluster centers from every regular file under
         * the directory named by the "clusterPath" configuration key.
         */
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FileStatus[] fileList = fs.listStatus(new Path(context
                    .getConfiguration().get("clusterPath")));
            for (int i = 0; i < fileList.length; i++) {
                if (fileList[i].isDir()) {
                    continue;
                }
                // Per-file close: the original closed only the last stream
                // (leaking earlier ones) and NPE'd when the directory held
                // no regular files.
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        fs.open(fileList[i].getPath()), "UTF-8"));
                try {
                    String line;
                    while ((line = in.readLine()) != null) {
                        System.out.println("read a line:" + line);
                        Cluster cluster = new Cluster(line);
                        cluster.setNumOfPoints(0);
                        kClusters.add(cluster);
                    }
                } finally {
                    in.close();
                }
            }
        }

        /**
         * Emits (input line, nearest cluster id) for one instance.
         * Assignment failures are logged and the record is skipped.
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Instance instance = new Instance(value.toString());
            try {
                int id = getNearest(instance);
                if (id == -1) {
                    throw new InterruptedException("id == -1");
                }
                context.write(value, new IntWritable(id));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Returns the id of the loaded center closest to {@code instance}
         * under the Euclidean distance, or -1 when no centers are loaded.
         */
        public int getNearest(Instance instance) throws Exception {
            int id = -1;
            double distance = Double.MAX_VALUE;
            Distance<Double> distanceMeasure = new EuclideanDistance<Double>();
            for (Cluster cluster : kClusters) {
                double newDis = distanceMeasure.getDistance(cluster
                        .getCenter().getValue(), instance.getValue());
                if (newDis < distance) {
                    id = cluster.getClusterID();
                    distance = newDis;
                }
            }
            return id;
        }
    }
}


package kmeans;


import java.io.IOException;


/**
 * 调度整个KMeans运行的过程
 * 
 */
/**
 * Drives the whole k-means run: generates k random initial centers, runs
 * iterationNum center-update MapReduce jobs, then runs a final map-only
 * job that labels every instance with its cluster id.
 */
public class KMeansDriver {
    private int k;
    private int iterationNum;
    private String sourcePath;
    private String outputPath;
    private Configuration conf;

    /**
     * @param k            number of clusters
     * @param iterationNum number of center-update iterations to run
     * @param sourcePath   HDFS path holding the input instances
     * @param outputPath   HDFS directory for cluster files and final output
     * @param conf         Hadoop configuration shared by all jobs
     */
    public KMeansDriver(int k, int iterationNum, String sourcePath,
            String outputPath, Configuration conf) {
        this.k = k;
        this.iterationNum = iterationNum;
        this.sourcePath = sourcePath;
        this.outputPath = outputPath;
        this.conf = conf;
    }

    /**
     * Runs iterationNum center-update jobs. Iteration i reads centers from
     * outputPath/cluster-i and writes the new centers to
     * outputPath/cluster-(i+1), so the final centers land in
     * outputPath/cluster-iterationNum.
     *
     * @throws IOException if an iteration fails. (The original ignored
     *         waitForCompletion's return value, so a failed iteration was
     *         only discovered when a later one crashed on the missing
     *         cluster directory.)
     */
    public void clusterCenterJob() throws IOException, InterruptedException,
            ClassNotFoundException {
        for (int i = 0; i < iterationNum; i++) {
            Job clusterCenterJob = new Job();
            clusterCenterJob.setJobName("clusterCenterJob" + i);
            clusterCenterJob.setJarByClass(KMeans.class);

            clusterCenterJob.getConfiguration().set("clusterPath",
                    outputPath + "/cluster-" + i + "/");

            clusterCenterJob.setMapperClass(KMeans.KMeansMapper.class);
            clusterCenterJob.setMapOutputKeyClass(IntWritable.class);
            clusterCenterJob.setMapOutputValueClass(Cluster.class);

            clusterCenterJob.setCombinerClass(KMeans.KMeansCombiner.class);
            clusterCenterJob.setReducerClass(KMeans.KMeansReducer.class);
            clusterCenterJob.setOutputKeyClass(NullWritable.class);
            clusterCenterJob.setOutputValueClass(Cluster.class);

            FileInputFormat
                    .addInputPath(clusterCenterJob, new Path(sourcePath));
            FileOutputFormat.setOutputPath(clusterCenterJob, new Path(
                    outputPath + "/cluster-" + (i + 1) + "/"));

            if (!clusterCenterJob.waitForCompletion(true)) {
                throw new IOException("clusterCenterJob" + i + " failed");
            }
            System.out.println("finished!");
        }
    }

    /**
     * Runs the final map-only assignment job using the centers produced by
     * the LAST iteration, i.e. outputPath/cluster-iterationNum. (The
     * original read cluster-(iterationNum - 1), the second-to-last centers
     * — an off-by-one, since iteration i writes to cluster-(i + 1).)
     *
     * @throws IOException if the assignment job fails
     */
    public void KMeansClusterJod() throws IOException, InterruptedException,
            ClassNotFoundException {
        Job kMeansClusterJob = new Job();
        kMeansClusterJob.setJobName("KMeansClusterJob");
        kMeansClusterJob.setJarByClass(KMeansCluster.class);

        kMeansClusterJob.getConfiguration().set("clusterPath",
                outputPath + "/cluster-" + iterationNum + "/");

        kMeansClusterJob
                .setMapperClass(KMeansCluster.KMeansClusterMapper.class);
        kMeansClusterJob.setMapOutputKeyClass(Text.class);
        kMeansClusterJob.setMapOutputValueClass(IntWritable.class);

        // Map-only job: the mapper's (instance, id) pairs are the output.
        kMeansClusterJob.setNumReduceTasks(0);

        FileInputFormat.addInputPath(kMeansClusterJob, new Path(sourcePath));
        FileOutputFormat.setOutputPath(kMeansClusterJob, new Path(outputPath
                + "/clusteredInstances" + "/"));

        if (!kMeansClusterJob.waitForCompletion(true)) {
            throw new IOException("KMeansClusterJob failed");
        }
        System.out.println("finished!");
    }

    /** Samples k random instances and writes them to outputPath/cluster-0. */
    public void generateInitialCluster() {
        RandomClusterGenerator generator = new RandomClusterGenerator(conf,
                sourcePath, k);
        generator.generateInitialCluster(outputPath + "/");
    }

    /**
     * Entry point. Usage: KMeansDriver k iterationNum sourcePath outputPath
     */
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        System.out.println("start");
        Configuration conf = new Configuration();
        int k = Integer.parseInt(args[0]);
        int iterationNum = Integer.parseInt(args[1]);
        String sourcePath = args[2];
        String outputPath = args[3];
        KMeansDriver driver = new KMeansDriver(k, iterationNum, sourcePath,
                outputPath, conf);
        driver.generateInitialCluster();
        System.out.println("initial cluster finished");
        driver.clusterCenterJob();
        driver.KMeansClusterJod();
    }
}


package kmeans;


import java.io.IOException;


/**
 * This class generates the initial Cluster centers as the input of successive
 * process. it randomly chooses k instances as the initial k centers and store
 * it as a sequenceFile.Specificly,we scan all the instances and each time when
 * we scan a new instance.we first check if the number of clusters no less than
 * k. we simply add current instance to our cluster if the condition is
 * satisfied or we will replace the first cluster with it with probability
 * 1/(currentNumber + 1).
 * 
 */
/**
 * Generates the k initial cluster centers by uniform reservoir sampling
 * (Algorithm R): the first k instances seed the reservoir, and the i-th
 * instance thereafter replaces a uniformly random current center with
 * probability k / i, so every scanned instance is equally likely to be
 * chosen. The chosen centers are written as text lines to
 * destinationPath/cluster-0.
 */
public final class RandomClusterGenerator {
    private int k;
    private FileStatus[] fileList;
    private FileSystem fs;
    private ArrayList<Cluster> kClusters;
    private Configuration conf;
    // Number of instances scanned so far; drives the reservoir odds.
    private long numOfInstances = 0;
    // Single shared RNG (the original allocated new Random objects per
    // decision, which is wasteful and can correlate values when seeded by
    // the clock).
    private final Random random = new Random();

    public RandomClusterGenerator(Configuration conf, String filePath, int k) {
        this.k = k;
        try {
            fs = FileSystem.get(URI.create(filePath), conf);
            fileList = fs.listStatus(new Path(filePath));
            kClusters = new ArrayList<Cluster>(k);
            this.conf = conf;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans every input file, feeds each line through makeDecision(), and
     * finally writes the k sampled centers to destinationPath/cluster-0.
     *
     * @param destinationPath the destination path the cluster file is
     *        stored in; the initial file is named cluster-0
     */
    public void generateInitialCluster(String destinationPath) {
        Text line = new Text();
        try {
            for (int i = 0; i < fileList.length; i++) {
                // Per-file close: the original closed only the last stream
                // in a finally block and NPE'd when the input directory
                // contained no files.
                FSDataInputStream fsi = fs.open(fileList[i].getPath());
                try {
                    LineReader lineReader = new LineReader(fsi, conf);
                    while (lineReader.readLine(line) > 0) {
                        // Decide whether this instance joins the centers.
                        System.out.println("read a line:" + line);
                        makeDecision(new Instance(line.toString()));
                    }
                } finally {
                    fsi.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        writeBackToFile(destinationPath);
    }

    /**
     * Reservoir-sampling step for one instance: fill the reservoir until it
     * holds k centers, then replace a uniformly random center with
     * probability k / numOfInstances. (The original replaced with a fixed
     * probability 1/(k + 1) regardless of how many instances had been
     * seen, biasing the initial centers toward later records.)
     */
    public void makeDecision(Instance instance) {
        numOfInstances++;
        if (kClusters.size() < k) {
            kClusters.add(new Cluster(kClusters.size() + 1, instance));
        } else {
            // Uniform slot in [0, numOfInstances); accept iff it falls in
            // the first k slots — probability exactly k / numOfInstances.
            long slot = (long) (random.nextDouble() * numOfInstances);
            if (slot < k) {
                int id = kClusters.get((int) slot).getClusterID();
                kClusters.set((int) slot, new Cluster(id, instance));
            }
        }
    }

    /**
     * Legacy helper kept for compatibility: returns a uniform value in
     * [0, k-1] with probability 1/(k + 1) and -1 otherwise. No longer used
     * by makeDecision(), which now applies unbiased reservoir sampling.
     */
    public int randomChoose(int k) {
        Random r = new Random();
        if (r.nextInt(k + 1) == 0) {
            return r.nextInt(k);
        }
        return -1;
    }

    /**
     * Writes the sampled centers, one "id,count,center" line each, to
     * destinationPath + "cluster-0".
     */
    public void writeBackToFile(String destinationPath) {
        Path path = new Path(destinationPath + "cluster-0");
        FSDataOutputStream out = null;
        try {
            out = fs.create(path);
            for (Cluster cluster : kClusters) {
                // Explicit UTF-8: the mappers decode these files as UTF-8,
                // while the original getBytes() used the platform charset.
                out.write((cluster.toString() + "\n").getBytes("UTF-8"));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Null-check: the original called fsi.close() unconditionally
            // and NPE'd if fs.create() had failed.
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


数据:

2,1,3,4,1,4
3,2,5,2,3,5
4,4,4,3,1,5
2,3,1,2,0,3
4,0,1,1,1,5
1,2,3,5,0,1
5,3,2,2,1,3
3,4,1,1,2,1
0,2,3,3,1,4
0,2,5,0,2,2
2,1,4,5,4,3
4,1,4,3,3,2
0,3,2,2,0,1
1,3,1,0,3,0
3,3,4,2,1,3
3,5,3,5,3,2
2,3,2,3,0,1
4,3,3,2,2,3
1,4,3,4,3,1
3,2,3,0,2,5
1,0,2,1,0,4
4,4,3,5,5,4
5,1,4,3,5,2
3,4,4,4,1,1
2,2,4,4,5,5
5,2,0,3,1,3
1,1,3,1,1,3
2,4,2,0,3,5
1,1,1,1,0,4
1,1,4,1,3,0

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u011955252/article/details/50553789

智能推荐

while循环&CPU占用率高问题深入分析与解决方案_main函数使用while(1)循环cpu占用99-程序员宅基地

文章浏览阅读3.8k次,点赞9次,收藏28次。直接上一个工作中碰到的问题,另外一个系统开启多线程调用我这边的接口,然后我这边会开启多线程批量查询第三方接口并且返回给调用方。使用的是两三年前别人遗留下来的方法,放到线上后发现确实是可以正常取到结果,但是一旦调用,CPU占用就直接100%(部署环境是win server服务器)。因此查看了下相关的老代码并使用JProfiler查看发现是在某个while循环的时候有问题。具体项目代码就不贴了,类似于下面这段代码。​​​​​​while(flag) {//your code;}这里的flag._main函数使用while(1)循环cpu占用99

【无标题】jetbrains idea shift f6不生效_idea shift +f6快捷键不生效-程序员宅基地

文章浏览阅读347次。idea shift f6 快捷键无效_idea shift +f6快捷键不生效

node.js学习笔记之Node中的核心模块_node模块中有很多核心模块,以下不属于核心模块,使用时需下载的是-程序员宅基地

文章浏览阅读135次。Ecmacript 中没有DOM 和 BOM核心模块Node为JavaScript提供了很多服务器级别,这些API绝大多数都被包装到了一个具名和核心模块中了,例如文件操作的 fs 核心模块 ,http服务构建的http 模块 path 路径操作模块 os 操作系统信息模块// 用来获取机器信息的var os = require('os')// 用来操作路径的var path = require('path')// 获取当前机器的 CPU 信息console.log(os.cpus._node模块中有很多核心模块,以下不属于核心模块,使用时需下载的是

数学建模【SPSS 下载-安装、方差分析与回归分析的SPSS实现(软件概述、方差分析、回归分析)】_化工数学模型数据回归软件-程序员宅基地

文章浏览阅读10w+次,点赞435次,收藏3.4k次。SPSS 22 下载安装过程7.6 方差分析与回归分析的SPSS实现7.6.1 SPSS软件概述1 SPSS版本与安装2 SPSS界面3 SPSS特点4 SPSS数据7.6.2 SPSS与方差分析1 单因素方差分析2 双因素方差分析7.6.3 SPSS与回归分析SPSS回归分析过程牙膏价格问题的回归分析_化工数学模型数据回归软件

利用hutool实现邮件发送功能_hutool发送邮件-程序员宅基地

文章浏览阅读7.5k次。如何利用hutool工具包实现邮件发送功能呢?1、首先引入hutool依赖<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.19</version></dependency>2、编写邮件发送工具类package com.pc.c..._hutool发送邮件

docker安装elasticsearch,elasticsearch-head,kibana,ik分词器_docker安装kibana连接elasticsearch并且elasticsearch有密码-程序员宅基地

文章浏览阅读867次,点赞2次,收藏2次。docker安装elasticsearch,elasticsearch-head,kibana,ik分词器安装方式基本有两种,一种是pull的方式,一种是Dockerfile的方式,由于pull的方式pull下来后还需配置许多东西且不便于复用,个人比较喜欢使用Dockerfile的方式所有docker支持的镜像基本都在https://hub.docker.com/docker的官网上能找到合..._docker安装kibana连接elasticsearch并且elasticsearch有密码

随便推点

Python 攻克移动开发失败!_beeware-程序员宅基地

文章浏览阅读1.3w次,点赞57次,收藏92次。整理 | 郑丽媛出品 | CSDN(ID:CSDNnews)近年来,随着机器学习的兴起,有一门编程语言逐渐变得火热——Python。得益于其针对机器学习提供了大量开源框架和第三方模块,内置..._beeware

Swift4.0_Timer 的基本使用_swift timer 暂停-程序员宅基地

文章浏览阅读7.9k次。//// ViewController.swift// Day_10_Timer//// Created by dongqiangfei on 2018/10/15.// Copyright 2018年 飞飞. All rights reserved.//import UIKitclass ViewController: UIViewController { ..._swift timer 暂停

元素三大等待-程序员宅基地

文章浏览阅读986次,点赞2次,收藏2次。1.硬性等待让当前线程暂停执行,应用场景:代码执行速度太快了,但是UI元素没有立马加载出来,造成两者不同步,这时候就可以让代码等待一下,再去执行找元素的动作线程休眠,强制等待 Thread.sleep(long mills)package com.example.demo;import org.junit.jupiter.api.Test;import org.openqa.selenium.By;import org.openqa.selenium.firefox.Firefox.._元素三大等待

Java软件工程师职位分析_java岗位分析-程序员宅基地

文章浏览阅读3k次,点赞4次,收藏14次。Java软件工程师职位分析_java岗位分析

Java:Unreachable code的解决方法_java unreachable code-程序员宅基地

文章浏览阅读2k次。Java:Unreachable code的解决方法_java unreachable code

标签data-*自定义属性值和根据data属性值查找对应标签_如何根据data-*属性获取对应的标签对象-程序员宅基地

文章浏览阅读1w次。1、html中设置标签data-*的值 标题 11111 222222、点击获取当前标签的data-url的值$('dd').on('click', function() { var urlVal = $(this).data('ur_如何根据data-*属性获取对应的标签对象

推荐文章

热门文章

相关标签