# Copyright 2011 Revolution Analytics
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
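# HADOOP_STREAMING must point at the streaming jar of the local Hadoop
# installation; the path below is for CDH 4.7.0 and will differ on other distributions.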
Sys.setenv(HADOOP_STREAMING="/usr/lib/hadoop-mapreduce/hadoop-streaming-2.0.0-cdh4.7.0.jar")
library(rmr2)
library(rhdfs)
hdfs.init()
# P: matrix of points to cluster, one point per row
## @knitr kmeans-signature
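# Arguments:
#   P                  points to cluster, as a big data object on the DFS (one point per row)
#   num.clusters       number of centroids to fit
#   num.iter           number of Lloyd-style iterations to run
#   combine,
#   in.memory.combine  when TRUE, the map emits (count, point) rows so the reducer
#                      returns per-cluster counts and coordinate sums instead of means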
kmeans.mr = function(P, num.clusters, num.iter, combine, in.memory.combine) {
## @knitr kmeans-dist.fun
# C: matrix of centroids, one centroid per row
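# Returns an nrow(P) x nrow(C) matrix of squared Euclidean distances
# between each point and each centroid.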
dist.fun = function(C, P) {apply(C, 1, function(x) colSums((t(P) - x)^2))}
## @knitr kmeans.map
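# Map: on the first iteration C is NULL, so points get random cluster labels;
# afterwards each point is keyed by the index of its nearest centroid.
# With combining enabled, a count column of 1s is prepended to each point.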
kmeans.map = function(., P) {
nearest = {if(is.null(C)) sample(1:num.clusters, nrow(P), replace = TRUE)
else {D = dist.fun(C, P); max.col(-D)}}
if(!(combine || in.memory.combine))
keyval(nearest, P)
else {keyval(nearest, cbind(1, P))}}
## @knitr kmeans.reduce
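# Reduce: without combining, the new centroid is the column-wise mean of the
# points in the cluster; with combining, counts and coordinate sums are returned
# and the division happens in the driver loop below.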
kmeans.reduce = {
if (!(combine || in.memory.combine) )
function(., P)
t(as.matrix(apply(P, 2, mean)))
else
function(k, P)
keyval(
k,
t(as.matrix(apply(P, 2, sum))))}
## @knitr kmeans-main-1
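# Driver loop: C starts as NULL (random assignment on the first pass); each
# iteration runs one MapReduce job and reads the new centroids back with
# from.dfs()/values().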
C = NULL
for(i in 1:num.iter ) {
C =
values(
from.dfs(
mapreduce(
P,
map = kmeans.map,
reduce = kmeans.reduce)))
if(combine || in.memory.combine)
C = C[, -1]/C[, 1]
## @knitr end
# points(C, col = i + 1, pch = 19)
## @knitr kmeans-main-2
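# If some clusters ended up empty, top C up to num.clusters rows with random
# linear combinations of the surviving centroids.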
if(nrow(C) < num.clusters) {
C =
rbind(
C,
matrix(
rnorm(
(num.clusters -
nrow(C)) * nrow(C)),
ncol = nrow(C)) %*% C) }}
C}
## @knitr end
## sample runs
##
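# Run the same clustering on both the local and the Hadoop backend,
# keeping the results side by side in `out`.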
out = list()
for(be in c("local", "hadoop")) {
rmr.options(backend = be)
set.seed(0)
## @knitr kmeans-data
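# Synthetic data: 5 random centers (a 5 x 2 matrix) replicated 20 times,
# plus unit-variance noise, giving 100 points in the plane.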
P = do.call(rbind, rep(list(matrix(rnorm(10, sd = 10), ncol = 2)), 20)) + matrix(rnorm(200), ncol = 2)
## @knitr end
# x11()
# plot(P)
# points(P)
out[[be]] =
## @knitr kmeans-run
kmeans.mr(
  to.dfs(P),
  num.clusters = 12,  # cluster count chosen for this demo run; adjust to suit your data
  num.iter = 5,
  combine = FALSE,
  in.memory.combine = FALSE)
## @knitr end
}
# would love to take this step, but kmeans is randomized in a way that makes it hard to be completely reproducible
# stopifnot(rmr2:::cmp(out[['hadoop']], out[['local']]))