Exercise 1

Lab 1 covered the official WordCount example, which counts the occurrences of given words in a text. In this exercise, write your own MapReduce program, LineCount, that counts the number of lines in a given text. Prepare your own test input.

Exercise 2

Define a custom counter and use it to count the number of occurrences of a sensitive word in a given text.

For example, given the text:

hello world
hello hadoop
hello MapReduce
hello hive

If the sensitive word is hello, the output is 4.

Exercise 3

Implement a map-side join, with your own test case.

Exercise 4

Write code that sorts the data in Text 1 below into the form of Text 2:

Text 1:

2
21
345
67
3456
3
45
78
567
35654
6
10
278

Text 2:

2
3
6
10
21
45
67
78
278
345
567
3456
35654

Answers

Exercise 1

Code:

import java.io.IOException;  
import java.util.Iterator;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapred.FileInputFormat;  
import org.apache.hadoop.mapred.FileOutputFormat;  
import org.apache.hadoop.mapred.JobClient;  
import org.apache.hadoop.mapred.JobConf;  
import org.apache.hadoop.mapred.MapReduceBase;  
import org.apache.hadoop.mapred.Mapper;  
import org.apache.hadoop.mapred.OutputCollector;  
import org.apache.hadoop.mapred.Reducer;  
import org.apache.hadoop.mapred.Reporter;  
import org.apache.hadoop.mapred.TextInputFormat;  
import org.apache.hadoop.mapred.TextOutputFormat;  
public class LineCount {  
    // Mapper: emit the constant key "Total Lines" with value 1 for every input line.
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text words = new Text("Total Lines");

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            output.collect(words, one);
        }
    }

    // Reducer (also used as the combiner): sum the 1s to get the total number of lines.
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    // Driver: configure and submit the job using the old mapred API.
    public static void main(String[] args) throws Exception {
        JobConf config = new JobConf(LineCount.class);
        config.setJobName("LineCount");
        config.setOutputKeyClass(Text.class);
        config.setOutputValueClass(IntWritable.class);
        config.setMapperClass(Map.class);
        config.setCombinerClass(Reduce.class);
        config.setReducerClass(Reduce.class);
        config.setInputFormat(TextInputFormat.class);
        config.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(config, new Path(args[0]));
        FileOutputFormat.setOutputPath(config, new Path(args[1]));
        JobClient.runJob(config);
    }
}
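
The answer above uses the older org.apache.hadoop.mapred API. As a point of comparison, the same job can also be written against the newer org.apache.hadoop.mapreduce API; the sketch below is only illustrative (the class name LineCountV2 is an assumption, not part of the original answer):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LineCountV2 {

    public static class LineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private static final Text KEY = new Text("Total Lines");
        private static final LongWritable ONE = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // One map() call per input line, so emit a single constant key with value 1.
            context.write(KEY, ONE);
        }
    }

    public static class LineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v : values) {
                sum += v.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "LineCount");
        job.setJarByClass(LineCountV2.class);
        job.setMapperClass(LineMapper.class);
        job.setCombinerClass(LineReducer.class);
        job.setReducerClass(LineReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}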

Exercise 2

Code:

package mr.mrcounter;  

import java.io.IOException;  
import java.net.URI;  
import java.net.URISyntaxException;  

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Counter;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  

public class WordCount {  

    public static void main(String[] args) throws IOException,  
            InterruptedException, URISyntaxException, ClassNotFoundException {  
        Path inPath = new Path("/MR_Counter/");      // input directory
        Path outPath = new Path("/MR_Counter/out");  // output directory

        Configuration conf = new Configuration();  
        // conf.set("fs.defaultFS", "hdfs://master:9000");
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf,  
                "hadoop");  
        if (fs.exists(outPath)) { // delete the output directory if it already exists
            fs.delete(outPath, true);  
        }  

        Job job = Job.getInstance(conf);  

        job.setJarByClass(WordCount.class);  

        job.setMapperClass(MyMapper.class);  
        job.setReducerClass(MyReducer.class);  

        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(LongWritable.class);  

        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(LongWritable.class);  

        job.setInputFormatClass(TextInputFormat.class);  
        job.setOutputFormatClass(TextOutputFormat.class);  

        FileInputFormat.setInputPaths(job, inPath);  
        FileOutputFormat.setOutputPath(job, outPath);  

        job.waitForCompletion(true);  

    }  

    public static class MyMapper extends  
            Mapper<LongWritable, Text, Text, LongWritable> {  

        private static Text k = new Text();  
        private static LongWritable v = new LongWritable(1);  

        @Override  
        protected void map(LongWritable key, Text value, Context context)  
                throws IOException, InterruptedException {  

            // Create a counter in group "Sensitive Words:" with the name "mapreduce".
            Counter sensitiveCounter = context.getCounter("Sensitive Words:",
                    "mapreduce");

            String line = value.toString();
            String[] words = line.split(" ");

            for (String word : words) {
                // If the sensitive word "mapreduce" appears, increment the counter by 1.
                if (word.equalsIgnoreCase("mapreduce")) {
                    sensitiveCounter.increment(1L);
                }
                k.set(word);
                context.write(k, v);
            }
        }  
    }  

    public static class MyReducer extends  
            Reducer<Text, LongWritable, Text, LongWritable> {  

        private static LongWritable v = new LongWritable();  

        @Override  
        protected void reduce(Text key, Iterable<LongWritable> values,  
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)  
                throws IOException, InterruptedException {  
            int sum = 0;  
            for (LongWritable value : values) {  
                sum += value.get();  
            }  

            v.set(sum);  

            context.write(key, v);  
        }  
    }  

}
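
The counter is aggregated by the framework and its value appears in the job's console output and web UI once the job finishes. It can also be read programmatically in the driver. The snippet below is a hedged sketch that could be appended to main() after job.waitForCompletion(true), assuming the same group and counter names used in the mapper:

        // Read the aggregated counter value from the finished job.
        long sensitiveCount = job.getCounters()
                .findCounter("Sensitive Words:", "mapreduce")
                .getValue();
        System.out.println("Sensitive word count: " + sensitiveCount);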

Exercise 3

Test case:

order.txt
1001    pd001    300
1002    pd002    20
1003    pd003    40
1004    pd002    50

pdts.txt
pd001    apple
pd002    xiaomi
pd003    cuizi

Code:

package com.tianjie.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    static class MapSideJoinMappe extends Mapper<LongWritable, Text, Text, NullWritable>{


        // In-memory map of product info: key is the product ID, value is the product name.
        Map<String, String> pdInfoMap = new HashMap<String, String>();
        Text ktext = new Text();


        /*
         * setup() runs once per map task, before any map() calls, and loads the
         * product table into memory.
         */
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {

            // Open the product table and read it line by line.
            // (Here it is read from a fixed local path, which only works for local runs;
            // see the note after the output below for reading it from the distributed cache.)
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    new FileInputStream("C:/Users/admin/Desktop/join/cache/pdts.txt")));
            String line;
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                // key: product ID, value: product name
                String[] split = line.split("\t");
                pdInfoMap.put(split[0], split[1]);
            }
            br.close();
        }


        /*
         * map() is called once per order line. With the default TextInputFormat,
         * the key is the byte offset of the line and the value is the line itself.
         * The output types are Text (the joined record) and NullWritable.
         */
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {

            // Read one order record (a line of order.txt).
            String orderline = value.toString();
            // Split the line on tabs.
            String[] fields = orderline.split("\t");
            // Look up the product name by product ID.
            String pdName = pdInfoMap.get(fields[1]);
            // Append the product name to the order record.
            ktext.set(orderline + "\t" + pdName);
            // Emit the joined record; no value is needed.
            context.write(ktext, NullWritable.get());
        }

    }


    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        // Hadoop configuration
        Configuration conf = new Configuration();
        // Create a job instance
        Job job = Job.getInstance(conf);
        // Set the class that contains the job's code
        job.setJarByClass(MapSideJoin.class);

        // Set the mapper class
        job.setMapperClass(MapSideJoinMappe.class);
        // Set the job's output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Output path; delete it first if it already exists
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(args[1]);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Distribute a file to the working directory of every map task node.
        // job.addArchiveToClassPath(archive);  cache a jar on the task nodes' classpath
        // job.addFileToClassPath(file);        cache a plain file on the task nodes' classpath
        // job.addCacheArchive(uri);            cache an archive in the task working directory

        // 1: cache a plain file in the task working directory
        job.addCacheFile(new URI("file:///C:/Users/admin/Desktop/join/cache/pdts.txt"));

        // 2: the join is done entirely on the map side, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        // Submit the job and wait for it to finish
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Output:

1001 pd001 300 apple
1002 pd002 20 xiaomi
1003 pd003 40 cuizi
1004 pd002 50 xiaomi
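
Note that setup() in the answer reads the product table from a fixed local path, which only works for local runs. When the job runs on a cluster, the file registered with job.addCacheFile() is localized for every map task and, assuming the default symlink behavior of Hadoop 2.x, can be opened from the task working directory under its base name. A hedged sketch of a setup() variant for that case:

        // Variant of setup() for cluster runs: the file added via addCacheFile() is
        // expected to be available in the task working directory as "pdts.txt".
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(new FileInputStream("pdts.txt")));
            String line;
            while ((line = br.readLine()) != null) {
                String[] split = line.split("\t");
                pdInfoMap.put(split[0], split[1]);
            }
            br.close();
        }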

Exercise 4

Code:

import java.io.IOException;  
import java.util.Random;  

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;  

public class MyMapReduce {  

    public static class MyMapper extends  
            Mapper<Object, Text, IntWritable, IntWritable> {  
        // The map output key/value types (type parameters 3 and 4) are IntWritable, not Text.
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Parse each input line into an IntWritable so the shuffle sorts numerically.
            IntWritable data = new IntWritable(Integer.parseInt(value.toString()));
            // The value does not matter here; a random number is used as a placeholder.
            IntWritable random = new IntWritable(new Random().nextInt());
            context.write(data, random);
        }
    }  

    public static class MyReducer extends  
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {  
        // The reduce input types (type parameters 1 and 2) are IntWritable, not Text.
        // The types written to the output file (type parameters 3 and 4) are also IntWritable, not Text.
        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // Write the key once per value, so duplicate numbers are preserved.
            while (values.iterator().hasNext()) {
                context.write(key, null);
                values.iterator().next(); // advance the iterator
            }  
        }  
    }  

    public static void main(String[] args) throws Exception {  
        Configuration conf = new Configuration();  

        String[] otherArgs = new GenericOptionsParser(conf, args)  
                .getRemainingArgs();  
        if (otherArgs.length != 2) {  
            System.err.println("Usage: MyMapReduce <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "sort");
        job.setJarByClass(MyMapReduce.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The key and value written to the output file are both IntWritable, not Text.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  

}
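
This answer relies on the shuffle's natural ordering of IntWritable keys and on the default single reduce task; with several reducers, each output file would only be sorted within itself. A slightly simpler variant of the same idea uses NullWritable as the placeholder value instead of a random number. The complete class below is a hedged sketch (the name SortNumbers is an assumption), not part of the original answer:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortNumbers {

    public static class SortMapper extends Mapper<Object, Text, IntWritable, NullWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Parse each line as an int; the shuffle then sorts the keys numerically.
            context.write(new IntWritable(Integer.parseInt(value.toString().trim())),
                    NullWritable.get());
        }
    }

    public static class SortReducer
            extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Emit the key once per occurrence so duplicate numbers survive.
            for (NullWritable ignored : values) {
                context.write(key, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SortNumbers");
        job.setJarByClass(SortNumbers.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}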
