Exercise 1
Lab 1 covered the official WordCount example, which counts how many times given words appear in a given text. In this exercise, write your own MapReduce program, LineCount, that counts the number of lines in a given text. Define your own test text.
Exercise 2
Define a custom counter and use it to count the number of occurrences of a sensitive word in a given text.
For example, given the text:
hello world
hello hadoop
hello MapReduce
hello hive
If the sensitive word is hello, the output is 4.
Exercise 3
Implement a map-side join; define your own test case.
Exercise 4
Write code that sorts the data in Text 1 below into the form shown in Text 2:
Text 1:
2
21
345
67
3456
3
45
78
567
35654
6
10
278
Text 2:
2
3
6
10
21
45
67
78
278
345
567
3456
35654
Answers
Exercise 1
Code:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class LineCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable obj = new IntWritable(1);
        private Text words = new Text("Total Lines");

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            // every input line contributes one count under the shared key "Total Lines"
            output.collect(words, obj);
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // sum all the 1s emitted by the mappers
            int sum1 = 0;
            while (values.hasNext()) {
                sum1 += values.next().get();
            }
            output.collect(key, new IntWritable(sum1));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf config = new JobConf(LineCount.class);
        config.setJobName("LineCount");
        config.setOutputKeyClass(Text.class);
        config.setOutputValueClass(IntWritable.class);
        config.setMapperClass(Map.class);
        config.setCombinerClass(Reduce.class); // summation is associative, so the reducer doubles as the combiner
        config.setReducerClass(Reduce.class);
        config.setInputFormat(TextInputFormat.class);
        config.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(config, new Path(args[0]));
        FileOutputFormat.setOutputPath(config, new Path(args[1]));
        JobClient.runJob(config);
    }
}
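For instance, if this job were run over the four-line sample text from Exercise 2, each mapper would emit ("Total Lines", 1) once per line and the single output file would contain:

Total Lines	4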
Exercise 2
Code:
package mr.mrcounter;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static void main(String[] args) throws IOException,
            InterruptedException, URISyntaxException, ClassNotFoundException {
        Path inPath = new Path("/MR_Counter/");      // input directory
        Path outPath = new Path("/MR_Counter/out");  // output directory
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://master:9000");
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf,
                "hadoop");
        if (fs.exists(outPath)) { // delete the output directory if it already exists
            fs.delete(outPath, true);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);
    }

    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        private static Text k = new Text();
        private static LongWritable v = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // create (or fetch) a counter in group "Sensitive Words:" named "mapreduce"
            Counter sensitiveCounter = context.getCounter("Sensitive Words:",
                    "mapreduce");
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                if (word.equalsIgnoreCase("mapreduce")) { // sensitive word found: increment the counter by 1
                    sensitiveCounter.increment(1L);
                }
                k.set(word);
                context.write(k, v);
            }
        }
    }

    public static class MyReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        private static LongWritable v = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // standard word count: sum the 1s for each word
            int sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            v.set(sum);
            context.write(key, v);
        }
    }
}
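The mapper increments the counter, but the driver never reads it back, so the required count only shows up in the job's console counter listing. A minimal sketch of how the end of main() could print it explicitly, assuming the same group and counter names used above (this readback is not part of the original answer):

boolean ok = job.waitForCompletion(true);
// look up the custom counter by its group and name after the job has finished
long hits = job.getCounters()
        .findCounter("Sensitive Words:", "mapreduce").getValue();
System.out.println("Sensitive word occurrences: " + hits);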
Exercise 3
Test case:
order.txt
1001 pd001 300
1002 pd002 20
1003 pd003 40
1004 pd002 50
pdts.txt
pd001 apple
pd002 xiaomi
pd003 cuizi
Code:
package com.tianjie.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // in-memory product table: key = product id, value = product name
        Map<String, String> pdInfoMap = new HashMap<String, String>();
        Text ktext = new Text();

        /*
         * setup() runs once per map task and loads the product file into memory
         */
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // open the product file and wrap it in a reader
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    new FileInputStream("C:/Users/admin/Desktop/join/cache/pdts.txt")));
            String line;
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                // build the product table: field 0 is the product id, field 1 the product name
                String[] split = line.split("\t");
                pdInfoMap.put(split[0], split[1]);
            }
            br.close();
        }

        /*
         * map() input key/value: with the default TextInputFormat, the key is the byte
         * offset of the line and the value is the line itself; Text/NullWritable are the
         * map output types. The join against the cached product table happens here.
         */
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // one line of the order file
            String orderline = value.toString();
            // split the line on tabs
            String[] fields = orderline.split("\t");
            // look up the product name by product id
            String pdName = pdInfoMap.get(fields[1]);
            // append the product name to the order line
            ktext.set(orderline + "\t" + pdName);
            // emit the joined record
            context.write(ktext, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // Hadoop configuration
        Configuration conf = new Configuration();
        // get a Job instance
        Job job = Job.getInstance(conf);
        // set the jar by the driver class
        job.setJarByClass(MapSideJoin.class);
        // set the mapper class
        job.setMapperClass(MapSideJoinMapper.class);
        // set the job's output key/value types (this is a map-only job)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // set the input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // delete the output path if it already exists, then set it
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(args[1]);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Cache a file so it is available in every map task's working directory.
        // job.addArchiveToClassPath(archive); // add a jar to the task classpath
        // job.addFileToClassPath(file);       // add an ordinary file to the task classpath
        // job.addCacheArchive(uri);           // cache an archive into the task working directory
        // 1: cache an ordinary file into the task working directory
        job.addCacheFile(new URI("file:///C:/Users/admin/Desktop/join/cache/pdts.txt"));
        // 2: the join is done on the map side, so no reduce phase is needed
        job.setNumReduceTasks(0);
        // submit the job and wait for it to finish
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
Output:
1001 pd001 300 apple
1002 pd002 20 xiaomi
1003 pd003 40 cuizi
1004 pd002 50 xiaomi
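Note that setup() reads the product table from an absolute local path, which only works where that path exists (for example, a local run on the developer's machine). When the job runs on a cluster, the file registered with job.addCacheFile(...) is localized into each map task's working directory, so a cluster-friendly setup() could open the cached copy by its base name instead. A sketch under that assumption:

// pdts.txt is the localized copy of the cached file in the task's working directory
BufferedReader br = new BufferedReader(
        new InputStreamReader(new FileInputStream("pdts.txt"), "UTF-8"));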
Exercise 4
Code:
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyMapReduce {

    public static class MyMapper extends
            Mapper<Object, Text, IntWritable, IntWritable> {
        // The map output key and value types (the 3rd and 4th generic parameters)
        // are IntWritable rather than Text, so the shuffle sorts the keys numerically.
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // parse each input line into an IntWritable key
            IntWritable data = new IntWritable(Integer.parseInt(value.toString().trim()));
            // the value is irrelevant; attach a random number just so there is something to ship
            IntWritable random = new IntWritable(new Random().nextInt());
            context.write(data, random);
        }
    }

    public static class MyReducer extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        // Both the reducer input types (1st and 2nd generic parameters) and its
        // output types (3rd and 4th) are IntWritable rather than Text.
        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // emit the key once per value, so duplicate numbers are preserved;
            // the value is null, so TextOutputFormat writes only the key
            for (IntWritable ignored : values) {
                context.write(key, null);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MyMapReduce <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "");
        job.setJarByClass(MyMapReduce.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);   // the keys written to the output file are IntWritable, not Text
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
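The sorting itself is done by the framework: map output keys are sorted during the shuffle, so the reducer receives the numbers in ascending IntWritable order, and because the job runs with a single reduce task the one output file is globally sorted. To make that single-reducer assumption explicit, main() could add the following line (not in the original answer); with more reducers a total-order partitioner would be needed instead:

job.setNumReduceTasks(1); // one reducer => one globally sorted output file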