
Notes on MapReduce multi-directory output

Using MultipleOutputs for multi-directory/multi-file output

org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

Add the following to your map or reduce class:

private MultipleOutputs<Text, NullWritable> mos;

@Override
protected void setup(Reducer<Text, NullWritable, Text, NullWritable>.Context context)
        throws IOException, InterruptedException {
    super.setup(context);
    mos = new MultipleOutputs<Text, NullWritable>(context); // initialize mos
}

@Override
protected void cleanup(Reducer<Text, NullWritable, Text, NullWritable>.Context context)
        throws IOException, InterruptedException {
    super.cleanup(context);
    mos.close(); // flush and close all underlying record writers
}

Wherever the job needs to emit data, write through the mos handle defined above:

mos.write("outputName", key, value);
mos.write("outputName", key, value, "filePrefix");
mos.write("outputName", key, value, "path/filePrefix");//到文件夹

In the job driver, register the named outputs:

MultipleOutputs.addNamedOutput(job, "outputXName",
        XXXOutputFormat.class, OutputXKey.class, OutputXValue.class);
MultipleOutputs.addNamedOutput(job, "outputYName",
        YYYOutputFormat.class, OutputYKey.class, OutputYValue.class);
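
Note that a named output name may contain only letters and digits; MultipleOutputs rejects names containing characters such as '-' or '_' with an IllegalArgumentException. Optionally, the driver can also enable a counter per named output to track how many records each one receives:

// optional: creates one counter per named output in the MultipleOutputs group
MultipleOutputs.setCountersEnabled(job, true);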

To suppress empty default files like part-r-00000, set the output format through LazyOutputFormat, which only creates an output file once the first record is actually written:

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

Example

package com.hdu.recommend.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Skye
 */
public class DataCleanIconAndWeb {

    public static class QLMapper extends
            Mapper<LongWritable, Text, Text, NullWritable> {

        private String webGame = "网页游戏"; // "web game" category marker
        Text outputValue = new Text();
        // handle for multi-file output
        private MultipleOutputs<Text, NullWritable> mos;

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            mos = new MultipleOutputs<Text, NullWritable>(context); // initialize mos
        }

        @Override
        protected void cleanup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            super.cleanup(context);
            mos.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // receive a line of input
            String line = value.toString();
            // split into fields (tab-separated input assumed)
            String[] words = line.split("\t");
            boolean isWeb = false;
            boolean flag = true;
            // ... a series of processing steps omitted ...

            String iconAction = words[1] + "\t" + words[0] + "\t" + words[2]
                    + "\t" + words[3] + "\t" + words[5];
            outputValue.set(iconAction);
            mos.write("iconRecord", outputValue, NullWritable.get(), "iconRecord/icon");

            String webAction = words[1] + "\t" + words[0] + "\t"
                    + words[2] + "\t" + words[3] + "\t" + words[4]
                    + "\t" + words[5];
            outputValue.set(webAction);
            mos.write("webRecord", outputValue, NullWritable.get(), "webRecord/web");
        }
    }

    public static void run(String originalDataPath, String dataCleanOutputFile)
            throws Exception {
        // build the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // note: the class containing the main method
        job.setJarByClass(DataCleanIconAndWeb.class);
        job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false);
        job.getConfiguration().setStrings(
                "mapreduce.reduce.shuffle.input.buffer.percent", "0.1");
        job.setNumReduceTasks(3);

        // Mapper settings
        job.setMapperClass(QLMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(originalDataPath));

        // output settings
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(dataCleanOutputFile));

        MultipleOutputs.addNamedOutput(job, "iconRecord",
                TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.addNamedOutput(job, "webRecord",
                TextOutputFormat.class, Text.class, NullWritable.class);

        // input format
        job.setInputFormatClass(TextInputFormat.class);
        // suppress the empty part-r-00000 style files
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        // job.setOutputFormatClass(TextOutputFormat.class);

        // submit the job and wait for completion
        job.waitForCompletion(true);
    }
}
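
To launch the job, call run() with input and output paths. A minimal sketch with hypothetical HDFS paths; note how the two record streams land in separate subdirectories of the output path, named by the baseOutputPath prefixes passed to mos.write():

public static void main(String[] args) throws Exception {
    // hypothetical paths, for illustration only
    DataCleanIconAndWeb.run("/data/recommend/raw", "/data/recommend/clean");
    // Expected layout under /data/recommend/clean (task numbers will vary):
    //   iconRecord/icon-m-00000
    //   webRecord/web-m-00000
    // No part-* files appear: nothing is written through context.write(),
    // and LazyOutputFormat suppresses the empty defaults.
}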

References
http://gnailuy.com/dataplatform/2015/11/22/common-techniques-for-mapreduce/
http://blog.csdn.net/zgc625238677/article/details/51524786
https://www.iteblog.com/archives/848
