Fork me on GitHub
Skye's Blog

Forever youthful,forever weeping


  • 首页

  • 分类

  • 归档

  • 标签

  • 搜索

JVM内存配置

发表于 2016-07-26 | 分类于 Java |

参数中-vmargs的意思是设置JVM参数,所以后面的其实都是JVM的参数了,我们首先了解一下JVM内存管理的机制,然后再解释每个参数代表的含义。

堆(Heap)和非堆(Non-heap)内存

按照官方的说法:“Java虚拟机具有一个堆,堆是运行时数据区域,所有类实例和数组的内存均从此处分配。堆是在Java虚拟机启动时创建的。”“在JVM中堆之外的内存称为非堆内存(Non-heapmemory)”。可以看出JVM主要管理两种类型的内存:堆和非堆。简单来说堆就是Java代码可及的内存,是留给开发人员使用的;非堆就是JVM留给自己用的,所以方法区、JVM内部处理或优化所需的内存(如JIT编译后的代码缓存)、每个类结构(如运行时常数池、字段和方法数据)以及方法和构造方法的代码都在非堆内存中。

堆内存分配

JVM初始分配的内存由-Xms指定,默认是物理内存的1/64;JVM最大分配的内存由-Xmx指定,默认是物理内存的1/4。默认空余堆内存小于40%时,JVM就会增大堆直到-Xmx的最大限制;空余堆内存大于70%时,JVM会减少堆直到-Xms的最小限制。因此服务器一般设置-Xms、-Xmx相等以避免在每次GC后调整堆的大小。

非堆内存分配

JVM使用-XX:PermSize设置非堆内存初始值,默认是物理内存的1/64;由XX:MaxPermSize设置最大非堆内存的大小,默认是物理内存的1/4。

JVM内存限制(最大值)

首先JVM内存限制于实际的最大物理内存(废话!呵呵),假设物理内存无限大的话,JVM内存的最大值跟操作系统有很大的关系。简单的说就32位处理器虽然可控内存空间有4GB,但是具体的操作系统会给一个限制,这个限制一般是2GB-3GB(一般来说Windows系统下为1.5G-2G,Linux系统下为2G-3G),而64bit以上的处理器就不会有限制了。

减少jvm内存回收引起的eclipse卡的问题

这个主要是jvm在client模式,进行内存回收时,会停下所有的其它工作,带回收完毕才去执行其它任务,在这期间eclipse就卡住了。所以适当的增加jvm申请的内存大小来减少其回收的次数甚至不回收,就会是卡的现象有明显改善。

主要通过以下的几个jvm参数来设置堆内存的:

  • -Xmx512m 最大总堆内存,一般设置为物理内存的1/4
  • -Xms512m 初始总堆内存,一般将它设置的和最大堆内存一样大,这样就不需要根据当前堆使用情况而调整堆的大小了
  • -Xmn192m 年轻带堆内存,sun官方推荐为整个堆的3/8
  • 堆内存的组成 总堆内存 = 年轻带堆内存 + 年老带堆内存 + 持久带堆内存
  • 年轻带堆内存 对象刚创建出来时放在这里
  • 年老带堆内存 对象在被真正会回收之前会先放在这里
  • 持久带堆内存 class文件,元数据等放在这里
  • -XX:PermSize=128m 持久带堆的初始大小
  • -XX:MaxPermSize=128m 持久带堆的最大大小,eclipse默认为256m。如果要编译jdk这种,一定要把这个设的很大,因为它的类太多了。

eclipse运行配置

Eclipse -> Run -> Run Configurations -> Arguments -> VM arguments
或者 Run as -> Run Configurations -> Arguments -> VM arguments
-Xms2048m -Xmx2048m

-Xms是设置内存初始化的大小(如上面的2048m)
-Xmx是设置最大能够使用内存的大小(如上面的2048m, 最好不要超过物理内存)
也可通过 eclipse.ini配置

参考文章:
Eclipse中进行JVM内存设置
JVM监控与调优
JVM调优总结(这个总结得比较全面)
JVM性能调优

Mahout Item-based推荐的分布式实现

发表于 2016-07-25 | 分类于 大数据 |

Mahout API地址:http://apache.github.io/mahout/0.10.1/docs/mahout-mr/overview-summary.html

Mahout推荐算法API详解

Mahout算法框架自带的推荐器有下面这些:

  • GenericUserBasedRecommender:基于用户的推荐器,用户数量少时速度快;
  • GenericItemBasedRecommender:基于商品推荐器,商品数量少时速度快,尤其当外部提供了商品相似度数据后效率更好;
  • SlopeOneRecommender:基于slope-one算法的推荐器,在线推荐或更新较快,需要事先大量预处理运算,物品数量少时较好;
  • SVDRecommender:奇异值分解,推荐效果较好,但之前需要大量预处理运算;
  • KnnRecommender:基于k近邻算法(KNN),适合于物品数量较小时;
  • TreeClusteringRecommender:基于聚类的推荐器,在线推荐较快,之前需要大量预处理运算,用户数量较少时效果好;
  • Mahout最常用的三个推荐器是上述的前三个,本文主要讨论前两种的使用。

接口相关介绍

基于用户或物品的推荐器主要包括以下几个接口:

DataModel 是用户喜好信息的抽象接口,它的具体实现支持从任意类型的数据源抽取用户喜好信息。Taste 默认提供 JDBCDataModel 和 FileDataModel,分别支持从数据库和文件中读取用户的喜好信息。
UserSimilarity 和 ItemSimilarity。UserSimilarity 用于定义两个用户间的相似度,它是基于协同过滤的推荐引擎的核心部分,可以用来计算用户的“邻居”,这里我们将与当前用户口味相似的用户称为他的邻居。ItemSimilarity 类似的,计算内容之间的相似度。
UserNeighborhood 用于基于用户相似度的推荐方法中,推荐的内容是基于找到与当前用户喜好相似的邻居用户的方式产生的。UserNeighborhood 定义了确定邻居用户的方法,具体实现一般是基于 UserSimilarity 计算得到的。
Recommender 是推荐引擎的抽象接口,Taste 中的核心组件。程序中,为它提供一个 DataModel,它可以计算出对不同用户的推荐内容。实际应用中,主要使用它的实现类 GenericUserBasedRecommender 或者 GenericItemBasedRecommender,分别实现基于用户相似度的推荐引擎或者基于内容的推荐引擎。
RecommenderEvaluator:评分器。
RecommenderIRStatsEvaluator:搜集推荐性能相关的指标,包括准确率、召回率等等。
目前,Mahout为DataModel提供了以下几种实现:

  • org.apache.mahout.cf.taste.impl.model.GenericDataModel
  • org.apache.mahout.cf.taste.impl.model.GenericBooleanPrefDataModel
  • org.apache.mahout.cf.taste.impl.model.PlusAnonymousUserDataModel
  • org.apache.mahout.cf.taste.impl.model.file.FileDataModel
  • org.apache.mahout.cf.taste.impl.model.hbase.HBaseDataModel
  • org.apache.mahout.cf.taste.impl.model.cassandra.CassandraDataModel
  • org.apache.mahout.cf.taste.impl.model.mongodb.MongoDBDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.SQL92JDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.MySQLJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.PostgreSQLJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.GenericJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.SQL92BooleanPrefJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.MySQLBooleanPrefJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.PostgreBooleanPrefSQLJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.ReloadFromJDBCDataModel
    从类名上就可以大概猜出来每个DataModel的用途,奇怪的是竟然没有HDFS的DataModel,有人实现了一个,请参考MAHOUT-1579。

UserSimilarity 和 ItemSimilarity 相似度实现有以下几种:

  • CityBlockSimilarity:基于Manhattan距离相似度
  • EuclideanDistanceSimilarity:基于欧几里德距离计算相似度
  • LogLikelihoodSimilarity:基于对数似然比的相似度
  • PearsonCorrelationSimilarity:基于皮尔逊相关系数计算相似度
  • SpearmanCorrelationSimilarity:基于皮尔斯曼相关系数相似度
  • TanimotoCoefficientSimilarity:基于谷本系数计算相似度
  • UncenteredCosineSimilarity:计算 Cosine 相似度
    以上相似度的说明,请参考Mahout推荐引擎介绍。

UserNeighborhood 主要实现有两种:

  • NearestNUserNeighborhood:对每个用户取固定数量N个最近邻居
  • ThresholdUserNeighborhood:对每个用户基于一定的限制,取落在相似度限制以内的所有用户为邻居

Recommender分为以下几种实现:

  • GenericUserBasedRecommender:基于用户的推荐引擎
  • GenericBooleanPrefUserBasedRecommender:基于用户的无偏好值推荐引擎
  • GenericItemBasedRecommender:基于物品的推荐引擎
  • GenericBooleanPrefItemBasedRecommender:基于物品的无偏好值推荐引擎

RecommenderEvaluator有以下几种实现:

  • AverageAbsoluteDifferenceRecommenderEvaluator:计算平均差值
  • RMSRecommenderEvaluator:计算均方根差
  • RecommenderIRStatsEvaluator的实现类是GenericRecommenderIRStatsEvaluator。

单机运行

首先,需要在maven中加入对mahout的依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-integration</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-examples</artifactId>
<version>0.9</version>
</dependency>

基于用户的推荐,以FileDataModel为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
File modelFile modelFile = new File("intro.csv");
DataModel model = new FileDataModel(modelFile);
//用户相似度,使用基于皮尔逊相关系数计算相似度
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
//选择邻居用户,使用NearestNUserNeighborhood实现UserNeighborhood接口,选择邻近的4个用户
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);
//给用户1推荐4个物品
List<RecommendedItem> recommendations = recommender.recommend(1, 4);
for (RecommendedItem recommendation : recommendations) {
System.out.println(recommendation);
}

注意: FileDataModel要求输入文件中的字段分隔符为逗号或者制表符,如果你想使用其他分隔符,你可以扩展一个FileDataModel的实现,例如,mahout中已经提供了一个解析MoiveLens的数据集(分隔符为::)的实现GroupLensDataModel。

GenericUserBasedRecommender是基于用户的简单推荐器实现类,推荐主要参照传入的DataModel和UserNeighborhood,总体是三个步骤:

  1. 从UserNeighborhood获取当前用户Ui最相似的K个用户集合{U1, U2, …Uk};
  2. 从这K个用户集合排除Ui的偏好商品,剩下的Item集合为{Item0, Item1, …Itemm};
  3. 对Item集合里每个Itemj计算Ui可能偏好程度值pref(Ui, Itemj),并把Item按此数值从高到低排序,前N个item推荐给用户Ui。
  4. 对相同用户重复获得推荐结果,我们可以改用CachingRecommender来包装GenericUserBasedRecommender对象,将推荐结果缓存起来:

Recommender cachingRecommender = new CachingRecommender(recommender);

上面代码可以在main方法中直接运行,然后,我们可以获取推荐模型的评分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//使用平均绝对差值获得评分
RecommenderEvaluator evaluator = new AverageAbsoluteDifferenceRecommenderEvaluator();
// 用RecommenderBuilder构建推荐引擎
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
@Override
public Recommender buildRecommender(DataModel model) throws TasteException {
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
return new GenericUserBasedRecommender(model, neighborhood, similarity);
}
};
// Use 70% of the data to train; test using the other 30%.
double score = evaluator.evaluate(recommenderBuilder, null, model, 0.7, 1.0);
System.out.println(score);

接下来,可以获取推荐结果的查准率和召回率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
RecommenderIRStatsEvaluator statsEvaluator = new GenericRecommenderIRStatsEvaluator();
// Build the same recommender for testing that we did last time:
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
@Override
public Recommender buildRecommender(DataModel model) throws TasteException {
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
return new GenericUserBasedRecommender(model, neighborhood, similarity);
}
};
// 计算推荐4个结果时的查准率和召回率
IRStatistics stats = statsEvaluator.evaluate(recommenderBuilder,null, model, null, 4,
GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD,1.0);
System.out.println(stats.getPrecision());
System.out.println(stats.getRecall());

如果是基于物品的推荐,代码大体相似,只是没有了UserNeighborhood,然后将上面代码中的User换成Item即可,完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
File modelFile modelFile = new File("intro.csv");
DataModel model = new FileDataModel(new File(file));
// Build the same recommender for testing that we did last time:
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
@Override
public Recommender buildRecommender(DataModel model) throws TasteException {
ItemSimilarity similarity = new PearsonCorrelationSimilarity(model);
return new GenericItemBasedRecommender(model, similarity);
}
};
//获取推荐结果
List<RecommendedItem> recommendations = recommenderBuilder.buildRecommender(model).recommend(1, 4);
for (RecommendedItem recommendation : recommendations) {
System.out.println(recommendation);
}
//计算评分
RecommenderEvaluator evaluator =
new AverageAbsoluteDifferenceRecommenderEvaluator();
// Use 70% of the data to train; test using the other 30%.
double score = evaluator.evaluate(recommenderBuilder, null, model, 0.7, 1.0);
System.out.println(score);
//计算查全率和查准率
RecommenderIRStatsEvaluator statsEvaluator = new GenericRecommenderIRStatsEvaluator();
// Evaluate precision and recall "at 2":
IRStatistics stats = statsEvaluator.evaluate(recommenderBuilder,
null, model, null, 4,
GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD,
1.0);
System.out.println(stats.getPrecision());
System.out.println(stats.getRecall());

在Spark中运行

在Spark中运行,需要将Mahout相关的jar添加到Spark的classpath中,修改/etc/spark/conf/spark-env.sh,添加下面两行代码:

1
2
SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/usr/lib/mahout/lib/*"
SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/usr/lib/mahout/*"

然后,以本地模式在spark-shell中运行下面代码交互测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//注意:这里是本地目录
val model = new FileDataModel(new File("intro.csv"))
val evaluator = new RMSRecommenderEvaluator()
val recommenderBuilder = new RecommenderBuilder {
override def buildRecommender(dataModel: DataModel): Recommender = {
val similarity = new LogLikelihoodSimilarity(dataModel)
new GenericItemBasedRecommender(dataModel, similarity)
}
}
val score = evaluator.evaluate(recommenderBuilder, null, model, 0.95, 0.05)
println(s"Score=$score")
val recommender=recommenderBuilder.buildRecommender(model)
val users=trainingRatings.map(_.user).distinct().take(20)
import scala.collection.JavaConversions._
val result=users.par.map{user=>
user+","+recommender.recommend(user,40).map(_.getItemID).mkString(",")
}

https://github.com/sujitpal/mia-scala-examples上面有一个评估基于物品或是用户的各种相似度下的评分的类,叫做 RecommenderEvaluator,供大家学习参考。

分布式运行

Mahout提供了org.apache.mahout.cf.taste.hadoop.item.RecommenderJob类以MapReduce的方式来实现基于物品的协同过滤,查看该类的使用说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ubuntu@Master:~/data$ mahout org.apache.mahout.cf.taste.hadoop.item.RecommenderJob
MAHOUT_LOCAL is not set; adding HADOOP_CONF_DIR to classpath.
Running on hadoop, using /home/ubuntu/hadoop/bin/hadoop and HADOOP_CONF_DIR=/home/ubuntu/hadoop/etc/hadoop
MAHOUT-JOB: /home/ubuntu/apache-mahout-distribution-0.10.1/mahout-examples-0.10.1-job.jar
16/07/25 07:41:32 WARN driver.MahoutDriver: No org.apache.mahout.cf.taste.hadoop.item.RecommenderJob.props found on classpath, will use command-line arguments only
16/07/25 07:41:33 ERROR common.AbstractJob: Missing required option --similarityClassname
Missing required option --similarityClassname
Usage:
[--input <input> --output <output> --numRecommendations <numRecommendations>
--usersFile <usersFile> --itemsFile <itemsFile> --filterFile <filterFile>
--userItemFile <userItemFile> --booleanData <booleanData> --maxPrefsPerUser
<maxPrefsPerUser> --minPrefsPerUser <minPrefsPerUser> --maxSimilaritiesPerItem
<maxSimilaritiesPerItem> --maxPrefsInItemSimilarity <maxPrefsInItemSimilarity>
--similarityClassname <similarityClassname> --threshold <threshold>
--outputPathForSimilarityMatrix <outputPathForSimilarityMatrix> --randomSeed
<randomSeed> --sequencefileOutput --help --tempDir <tempDir> --startPhase
<startPhase> --endPhase <endPhase>]
--similarityClassname (-s) similarityClassname Name of distributed
similarity measures class to
instantiate, alternatively
use one of the predefined
similarities
([SIMILARITY_COOCCURRENCE,
SIMILARITY_LOGLIKELIHOOD,
SIMILARITY_TANIMOTO_COEFFICIEN
T, SIMILARITY_CITY_BLOCK,
SIMILARITY_COSINE,
SIMILARITY_PEARSON_CORRELATION
,
SIMILARITY_EUCLIDEAN_DISTANCE]
)

也可输入mahout org.apache.mahout.cf.taste.hadoop.item.RecommenderJob --help查看详细说明

可见,该类可以接收的命令行参数如下:

--input(path)(-i): 存储用户偏好数据的目录,该目录下可以包含一个或多个存储用户偏好数据的文本文件;
--output(path)(-o): 结算结果的输出目录
--numRecommendations (integer): 为每个用户推荐的item数量,默认为10
--usersFile (path): 指定一个包含了一个或多个存储userID的文件路径,仅为该路径下所有文件包含的userID做推荐计算 (该选项可选)
--itemsFile (path): 指定一个包含了一个或多个存储itemID的文件路径,仅为该路径下所有文件包含的itemID做推荐计算 (该选项可选)
--filterFile (path): 指定一个路径,该路径下的文件包含了[userID,itemID]值对,userID和itemID用逗号分隔。计算结果将不会为user推荐[userID,itemID]值对中包含的item (该选项可选)
--booleanData (boolean): 如果输入数据不包含偏好数值,则将该参数设置为true,默认为false
--maxPrefsPerUser (integer): 在最后计算推荐结果的阶段,针对每一个user使用的偏好数据的最大数量,默认为10
--minPrefsPerUser (integer): 在相似度计算中,忽略所有偏好数据量少于该值的用户,默认为1
--maxSimilaritiesPerItem (integer): 针对每个item的相似度最大值,默认为100
--maxPrefsPerUserInItemSimilarity (integer): 在item相似度计算阶段,针对每个用户考虑的偏好数据最大数量,默认为1000
--similarityClassname (classname): 向量相似度计算类
outputPathForSimilarityMatrix:SimilarityMatrix输出目录
--randomSeed:随机种子 –sequencefileOutput:序列文件输出路径
--tempDir (path): 存储临时文件的目录,默认为当前用户的home目录下的temp目录
--startPhase
--endPhase
--threshold (double): 忽略相似度低于该阀值的item对

一个例子如下,使用SIMILARITY_LOGLIKELIHOOD相似度推荐物品:

1
$ hadoop jar /usr/lib/mahout/mahout-examples-0.9-cdh5.4.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob --input /tmp/mahout/part-00000 --output /tmp/mahout-out -s SIMILARITY_LOGLIKELIHOOD

自己运行的例子如下:

部分实验数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
1 25 0.0136316222
1 116 0.0090877481
1 5 0.0045438741
1 23 0.1862988368
1 17 0.0272632444
1 3 1.4122360602
1 11 0.0363509925
1 12 0.4543874068
1 120 0.0027263244
1 93 0.0136316222
1 21 0.0036350993
1 6 0.7688234922
1 47 0.0018175496
1 66 0.0454387407
1 27 0.0254456948
1 44 0.0245369200
1 315 0.0045438741
1 28 0.0545264888
1 138 0.0636142369
1 108 0.0045438741
1 1 7.2695320732
1 85 12.5188577359
1 168 0.0545264888
10 4 0.6772009029
10 6 0.6772009029
10 217 0.0112866817
10 2 1.7607223476
10 1 1.8735891648
100 25 0.4788867023
100 5 0.0047793084
100 17 0.2915378128
100 26 0.0047793084
100 3 0.1194827101
100 11 0.6987348890
100 4 0.6652797301
100 12 0.0047793084
100 30 0.0736013495
100 32 0.5257239247
100 31 0.0076468934
100 37 0.0430137757
100 29 0.0592634242
100 44 0.0009558617
100 13 4.4313747540
100 1 9.5461906101
100 10 0.0439696373
1000 8 0.2902055623
1000 14 0.0483675937
1000 5 0.0725513906
1000 9 0.0725513906
1000 26 0.3869407497
1000 3 0.1451027811
1000 436 0.0120918984
1000 2 3.9177750907
1000 15 2.4304715840

1
ubuntu@Master:~/data$ mahout org.apache.mahout.cf.taste.hadoop.item.RecommenderJob -i /test/item/ckm_pre_result1000000.txt -o /test/item/outputPersonCorr --similarityClassname org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.PearsonCorrelationSimilarity

部分结果:

1
2
3
4
5
1 [196:10.482155,14:9.271373,145:8.875873,177:7.779633,360:7.537198,114:7.1917353,635:7.085346,75:6.8879533,235:6.796164,210:6.586777]
2 [386:4.339762,631:4.2806735,194:4.274664,153:4.1018524,362:3.7975848,934:3.422003,195:3.0110214,188:2.7676048,30:2.6990044,746:2.693153]
3 [45:4.2422075,212:4.1731844,270:3.9618893,309:3.960001,204:3.933118,275:3.6498196,321:3.6286862,179:3.487534,240:3.3450491,170:3.28568]
4 [293:4.3900704,746:3.9469879,51:3.3795352,52:3.3444872,312:2.818981,24:2.719058,649:2.2690945,28:2.1947412,196:2.170363,145:2.008545]
5 [590:1.1531421,332:1.1508745,336:1.134177,852:1.1335075,561:1.121143,36:1.1099223,535:1.0878772,129:1.0850264,236:1.0413511,83:1.0349866]

默认情况下,mahout使用的reduce数目为1,这样造成大数据处理时效率较低,可以通过参数mahout执行脚本中的MAHOUT_OPTS中的-Dmapred.reduce.tasks参数指定reduce数目。

上面命令运行完成之后,会在当前用户的hdfs主目录生成temp目录,该目录可由–tempDir (path)参数设置:

1
2
3
4
5
6
7
8
9
10
11
12
$ hadoop fs -ls temp
Found 10 items
-rw-r--r-- 3 root hadoop 7 2015-06-10 14:42 temp/maxValues.bin
-rw-r--r-- 3 root hadoop 5522717 2015-06-10 14:42 temp/norms.bin
drwxr-xr-x - root hadoop 0 2015-06-10 14:41 temp/notUsed
-rw-r--r-- 3 root hadoop 7 2015-06-10 14:42 temp/numNonZeroEntries.bin
-rw-r--r-- 3 root hadoop 3452222 2015-06-10 14:41 temp/observationsPerColumn.bin
drwxr-xr-x - root hadoop 0 2015-06-10 14:47 temp/pairwiseSimilarity
drwxr-xr-x - root hadoop 0 2015-06-10 14:52 temp/partialMultiply
drwxr-xr-x - root hadoop 0 2015-06-10 14:39 temp/preparePreferenceMatrix
drwxr-xr-x - root hadoop 0 2015-06-10 14:50 temp/similarityMatrix
drwxr-xr-x - root hadoop 0 2015-06-10 14:42 temp/weights

观察yarn的管理界面,该命令会生成9个任务,任务名称依次是:

  • PreparePreferenceMatrixJob-ItemIDIndexMapper-Reducer
  • PreparePreferenceMatrixJob-ToItemPrefsMapper-Reducer
  • PreparePreferenceMatrixJob-ToItemVectorsMapper-Reducer
  • RowSimilarityJob-CountObservationsMapper-Reducer
  • RowSimilarityJob-VectorNormMapper-Reducer
  • RowSimilarityJob-CooccurrencesMapper-Reducer
  • RowSimilarityJob-UnsymmetrifyMapper-Reducer
  • partialMultiply
  • RecommenderJob-PartialMultiplyMapper-Reducer

从任务名称,大概可以知道每个任务在做什么,如果你的输入参数不一样,生成的任务数可能不一样,这个需要测试一下才能确认。

在hdfs上查看输出的结果,用户和推荐结果用\t分隔,推荐结果中物品之间用逗号分隔,物品后面通过冒号连接评分:

1
2
843 [10709679:4.8334665,8389878:4.833426,9133835:4.7503786,10366169:4.7503185,9007487:4.750272,8149253:4.7501993,10366165:4.750115,9780049:4.750108,8581254:4.750071,10456307:4.7500467]
6253 [10117445:3.0375953,10340299:3.0340924,8321090:3.0340924,10086615:3.032164,10436801:3.0187714,9668385:3.0141575,8502110:3.013954,10476325:3.0074399,10318667:3.0004222,8320987:3.0003839]

使用Java API方式执行,请参考Mahout分步式程序开发 基于物品的协同过滤ItemCF。

在Scala或者Spark中,可以以Java API或者命令方式运行,最后还可以通过Spark来处理推荐的结果,例如:过滤、去重、补足数据,这部分内容不做介绍。

本文基本转载自:转载自JavaChen Blog,作者:JavaChen,文章地址http://blog.javachen.com/2015/06/10/collaborative-filtering-using-mahout.html

其他参考资料:

  • 用Hadoop构建电影推荐系统(自己实现分布式)
  • 用Mahout构建职位推荐引擎(单机)
  • Mahout构建图书推荐系统(单机)
  • Mahout分步式程序开发 基于物品的协同过滤ItemCF(调用接口)
  • mahout分布式:Item-based推荐
  • Introduction to Item-Based Recommendations with Hadoop
  • 使用Mahout搭建推荐系统之入门篇4-Mahout实战
  • 基于MapReduce的ItemBase推荐算法的共现矩阵实现

VMWare虚拟机:“锁定文件失败”的解决方法

发表于 2016-07-19 | 分类于 计算机技巧 |

问题描述

如果使用VMWare虚拟机的时候突然系统崩溃蓝屏或者强行关机,有一定几率会导致无法启动,会提示:锁定文件失败,打不开磁盘或快照所依赖的磁盘。

虚拟磁盘(.vmdk)本身有一个磁盘保护机制,为了防止多台虚拟机同时访问同一个虚拟磁盘(.vmdk)带来的数据丢失和性能削减方面的隐患,每次启动虚拟机的时候虚拟机会使用扩展名为.lck(磁盘锁)文件对虚拟磁盘(.vmdk)进行锁定保护。当虚拟机关闭时.lck(磁盘锁)文件自动删除。但是可能由于您非正常关闭虚拟机,这时虚拟机还没来得及删除您系统上的.lck(磁盘锁)文件,所以当下次您启动虚拟机的时候出现了上述错误。

解决方法

打开你存放虚拟机系统文件的文件夹,注意,是系统文件,不是虚拟机的安装目录。搜索关键字*.lck,删除搜索到的文件即可。

亚马逊EC2搭建Shadowsocks

发表于 2016-07-19 | 分类于 计算机技巧 |

在网上寻找优秀的科学上网方式中发现,自己搭建Shadowscoks是非常不错的方式,并且AWS有一年的免费使用期限,到期后再买别的VPS就行了。
https://segmentfault.com/a/1190000003101075
上面连接的教程从注册申请AWS到搭建服务已经非常详细,自己再做一些补充。

服务器端配置

在EC2上创建好Linux服务器后(本人为Ubuntu)需要对其安装环境
参考官方文档使用 PuTTY 从 Windows 连接到 Linux 实例连接服务器

安装shadowsocks依赖

  1. sudo -s // 获取超级管理员权限
  2. apt-get update // 更新apt-get
  3. apt-get install python-pip // 安装python包管理工具pip
  4. pip install shadowsocks // 安装shadowsocks

配置shadowsocks

vim /etc/shadowsocks.json

单一端口配置

1
2
3
4
5
6
7
8
9
10
{
"server":"server_ip", #EC2实例的IP,注意这里我们不能填写公有IP,需要填写私有IP或者0.0.0.0 填0.0.0.0即可
"server_port":8388, #server端监听的端口,需要在EC2实例中开放此端口
"local_address": "127.0.0.1",
"local_port":1080,
"password":"password", #密码
"timeout":300,
"method":"aes-256-cfb", #加密方式
"fast_open": false #是否开启fast open
}

如果想要把VPN分享给其它人而不泄露自己的密码,也可以在配置文件中设置多端口+多密码的模式,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"server":"server_ip", #EC2实例的IP,注意这里我们不能填写公有IP,需要填写私有IP或者0.0.0.0
"local_address": "127.0.0.1",
"local_port":1080,
"port_password":
{
"8088”: “password8088”,
"8089”: "password8089”
}
"timeout":300,
"method":"aes-256-cfb", #加密方式
"fast_open": false #是否开启fast open
}

配置完成后启动Shawdowsocks

1
2
3
4
启动:ssserver -c /etc/shadowsocks.json -d start
停止:ssserver -c /etc/shadowsocks.json -d stop
重启:ssserver -c /etc/shadowsocks.json -d restart
查看状态:ssserver -c /etc/shadowsocks.json -d status

关闭服务器防火墙

sudo ufw disable

开启AWS入站端口

配置好shaodowsocks后,还需要将配置中的端口打开,这样客户端的服务才能链接得上EC2中的shadowsocks服务。
在EC2网页中编辑入站规则将配置文件中的端口号(如8388)加入入站规则。
服务器端配置完毕。

客户端下载

shadowsocks下载地址1
shadowsocks下载地址2
windows10尽量使用2.3版本,否则可能出现500或者502错误
http://pan.baidu.com/s/1hqIk4mS

500或者502错误

  • 使用2.3版本
  • 或者尝试以下命令
    1
    2
    3
    netsh interface ipv4 reset
    netsh interface ipv6 reset
    netsh winsock reset

window下搭建eclipse运行MapReduce环境

发表于 2016-07-13 | 分类于 大数据 |

系统环境及所需文件

  • eclipse-jee-mars-2
  • hadoop2.7.2
  • hadoop-eclipse-plugin
  • hadoop.dll & winutils.exe

修改Master节点的hdfs-site.xml

<property>      
    <name>dfs.permissions</name>      
    <value>false</value>  
</property> 

旨在取消权限检查

1
2
3
4
<property>
<name>dfs.web.ugi</name>
<value>Skye,supergroup</value>
</property>

配置Hadoop插件

  1. windows下载hadoop-2.7.2解压到某目录下,如:E:\hadoop\hadoop-2.7.2
  2. 下载hadoop-eclipse-plugin插件hadoop-eclipse-plugin,将release目录下的hadoop-eclipse-plugin-2.6.0.jar拷贝到eclipse/plugins,重启eclipse。
  3. 插件配置windows->show view->other 显示mapreduce视图
  4. window->preferences->hadoop map/reduce 指定windows上的hadoop根目录(即:E:\hadoop\hadoop-2.7.2)
  5. 在Map/Reduce Locations 面板中,点击小象图标定义hadoop

    解释:
    MapReduce Master
    Host:虚拟机hadoop master对应ip
    Port:hdfs-site.xml中dfs.datanode.ipc.address指定的的端口号。此处填9001
    DFS Master中Port:core-site.xml中fs.defaultFS指定的端口。应填9000
    User name:linux中运行hadoop的用户。

配置完毕查看结果

windows下运行环境配置

  1. 在系统环境变量中增加HADOOP_HOME,并在Path中加入%HADOOP_HOME%\bin
  2. 将下载下来的hadoop.dll,winutils.exe拷贝到HADOOP_HOME/bin目录下

创建 MapReduce工程并运行

需要拷贝 服务器hadoop中的log4j.properties文件到工程的src目录

run on hadoop

运行时报如下错误,弄了好长一段时间,发现原因是服务器通过内网ip访问,外网无法解析。用虚拟机连接成功.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
16/07/13 10:42:38 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
16/07/13 10:42:39 INFO mapreduce.Job: Job job_local510776960_0001 running in uber mode : false
16/07/13 10:42:39 INFO mapreduce.Job: map 0% reduce 0%
16/07/13 10:42:39 INFO mapred.Task: Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@3bfe5dd7
16/07/13 10:42:39 INFO mapred.MapTask: Processing split: hdfs://Master:9000/test/test3.txt:0+259
16/07/13 10:42:39 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/07/13 10:42:39 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/07/13 10:42:39 INFO mapred.MapTask: soft limit at 83886080
16/07/13 10:42:39 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/07/13 10:42:39 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/07/13 10:42:39 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/07/13 10:43:00 WARN hdfs.BlockReaderFactory: I/O error constructing remote block reader.
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:656)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:59)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:91)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
16/07/13 10:43:00 WARN hdfs.DFSClient: Failed to connect to /10.0.0.14:50010 for block, add to deadNodes and continue. java.net.ConnectException: Connection timed out: no further information
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:656)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:59)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:91)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

mapreduce任务运行时shuffle Error

发表于 2016-07-11 | 分类于 大数据 |

本文引用参考:MapReduce任务Shuffle Error错误
相关参考连接: yarn & mapreduce 配置参数总结

错误描述

在运行MapReduce任务的时候,出现如下错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#1
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)

解决方案

根据《Hadoop:The Definitive Guide 4th Edition》所述(P203-219),map任务和reduce任务之间要经过一个shuffle过程,该过程复制map任务的输出作为reduce任务的输入。
具体的来说,shuffle过程的输入是:map任务的输出文件,它的输出接收者是:运行reduce任务的机子上的内存buffer,并且shuffle过程以并行方式运行。
参数mapreduce.reduce.shuffle.input.buffer.percent控制运行reduce任务的机子上多少比例的内存用作上述buffer(默认值为0.70),参数mapreduce.reduce.shuffle.parallelcopies控制shuffle过程的并行度(默认值为5)。那么”mapreduce.reduce.shuffle.input.buffer.percent” * “mapreduce.reduce.shuffle.parallelcopies” 必须小于等于1,否则就会出现如上错误
因此,我将mapreduce.reduce.shuffle.input.buffer.percent设置成值为0.1,就可以正常运行了(设置成0.2,还是会抛同样的错)

job.getConfiguration().setStrings("mapreduce.reduce.shuffle.input.buffer.percent", "0.1");
或者在maperd-site.xml中修改

1
2
3
4
<property>
<name>mapreduce.reduce.input.buffer.percent</name>
<value>0.0</value>
</property>

另外,可以发现如果使用两个参数的默认值,那么两者乘积为3.5,大大大于1了,为什么没有经常抛出以上的错误呢?
1)首先,把默认值设为比较大,主要是基于性能考虑,将它们设为比较大,可以大大加快从map复制数据的速度
2)其次,要抛出如上异常,还需满足另外一个条件,就是map任务的数据一下子准备好了等待shuffle去复制,在这种情况下,就会导致shuffle过程的“线程数量”和“内存buffer使用量”都是满负荷的值,自然就造成了内存不足的错误;而如果map任务的数据是断断续续完成的,那么没有一个时刻shuffle过程的“线程数量”和“内存buffer使用量”是满负荷值的,自然也就不会抛出如上错误

另外,如果在设置以上参数后,还是出现错误,那么有可能是运行Reduce任务的进程的内存总量不足,可以通过mapred.child.java.opts参数来调节,比如设置mapred.child.java.opts=-Xmx2024m

MapReduce 多文件输入

发表于 2016-06-16 | 分类于 大数据 |

多路径输入

  1. FileInputFormat.addInputPath 多次调用加载不同路径

    1
    2
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileInputFormat.addInputPath(job, new Path(args[1]));
  2. FileInputFormat.addInputPaths一次调用加载 多路径字符串用逗号隔开

    1
    FileInputFormat.addInputPaths(job, "hdfs://master:9000/cs/path1,hdfs://RS5-112:9000/cs/path2");
  3. 多种输入**MultipleInputs可以加载不同路径的输入文件,并且每个路径可用不同的

    1
    2
    maperMultipleInputs.addInputPath(job, new Path("hdfs://master:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);
    MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);

网上例子:

package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 多类型文件输入
* @author lijl
*
*/

public class MultiTypeFileInputMR {
static class MultiTypeFileInput1Mapper extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key,Text value,Context context){
try {
String[] str = value.toString().split("\\|");
context.write(new Text(str[0]), new Text(str[1]));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class MultiTypeFileInput3Mapper extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key,Text value,Context context){
try {
String[] str = value.toString().split("");
context.write(new Text(str[0]), new Text(str[1]));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class MultiTypeFileInputReducer extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key,Iterable<Text> values,Context context){
try {
for(Text value:values){
context.write(key,value);
}

} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.set("mapred.textoutputformat.separator", ",");
Job job = new Job(conf,"MultiPathFileInput");
job.setJarByClass(MultiTypeFileInputMR.class);
FileOutputFormat.setOutputPath(job, new Path("hdfs://RS5-112:9000/cs/path6"));

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setReducerClass(MultiTypeFileInputReducer.class);
job.setNumReduceTasks(1);
MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);
MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);
System.exit(job.waitForCompletion(true)?0:1);
}

}

自己例子

QLMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.hdu.mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class QLMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
String[] mbUnlike = { "盒子", "助手", "输入法", "平台" };
String mbdylxLike = "游戏";
String mbdylxUnlike = "网页游戏";
String delWeb = "访问网站";
Text outputValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
// 接收数据v1
String line = value.toString();
// 切分数据
String[] words = line.split("");
// String[] words = line.split("\t");
boolean flag = true;
for (int i = 0; i < 4; i++) {
if (words.length < 5) { // 过滤 长度小于4的信息 即访问网站等
flag = false;
break;
}
if (words[3].indexOf(mbUnlike[i]) != -1) { // 有其中一个则为false
flag = false;
break;
}
}
if (flag == true) {
if (words[4].indexOf(mbdylxUnlike) != -1) { // 有网页游戏则为false
flag = false;
} else if (words[4].indexOf(mbdylxLike) == -1) { // 没有游戏则为false
flag = false;
}
}
if (flag == true) {
outputValue.set(line);
context.write(outputValue, new LongWritable(1L));
}
}
}

QLReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.hdu.mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class QLReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 接收数据
// 输出
context.write(key, NullWritable.get());
}
}

DataClean.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.hdu.mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class QLReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 接收数据
// 输出
context.write(key, NullWritable.get());
}
}

HBase集群搭建

发表于 2016-06-16 | 分类于 大数据 |

1. 上传hbase安装包

2. 解压

3. 配置hbase集群,要修改3个文件(首先zk集群已经安装好了)

注意:要把hadoop的hdfs-site.xml和core-site.xml 放到hbase/conf下

vim hbase-env.sh

export JAVA_HOME=/usr/java/jdk1.7.0_55
//告诉hbase使用外部的zk 
export HBASE_MANAGES_ZK=false

vim hbase-site.xml

 <configuration>
    <!-- 指定hbase在HDFS上存储的路径 -->
    <property>
            <name>hbase.rootdir</name>
            <value>hdfs://ns1/hbase</value>
    </property>
    <!-- 指定hbase是分布式的 -->
    <property>
            <name>hbase.cluster.distributed</name>
            <value>true</value>
    </property>
    <!-- 指定zk的地址,多个用“,”分割 -->
    <property>
            <name>hbase.zookeeper.quorum</name>
            <value>itcast04:2181,itcast05:2181,itcast06:2181</value>
    </property>
</configuration>

vim regionservers

itcast03
itcast04
itcast05
itcast06

4. 将配置好的HBase拷贝到每一个节点并同步时间

scp -r /itcast/hbase-0.96.2-hadoop2/ itcast02:/itcast/
scp -r /itcast/hbase-0.96.2-hadoop2/ itcast03:/itcast/
scp -r /itcast/hbase-0.96.2-hadoop2/ itcast04:/itcast/
scp -r /itcast/hbase-0.96.2-hadoop2/ itcast05:/itcast/
scp -r /itcast/hbase-0.96.2-hadoop2/ itcast06:/itcast/

5. 启动所有的hbase

分别启动zk
    ./zkServer.sh start
启动hbase集群
    start-dfs.sh
启动hbase,在主节点上运行:
    start-hbase.sh

6.通过浏览器访问hbase管理页面

192.168.1.201:60010

7.为保证集群的可靠性,要启动多个HMaster,在itcast02上启动

hbase-daemon.sh start master
1…78
Skye

Skye

学习总结 思想感悟

78 日志
14 分类
37 标签
Weibo GitHub 简书 Email
Links
  • Huanqiang
© 2016 - 2019 Skye