Experiment Objective

Become familiar with the architecture and basic operation of MapReduce. Using the examples shipped with Hadoop, run the grep example in Hadoop's pseudo-distributed mode to understand how a MapReduce job is executed and what the mapper and reducer functions do.

Experiment Principles

Hadoop MapReduce is an easy-to-use software framework. Applications written against it can run on large clusters of thousands of commodity machines and process terabyte-scale datasets in parallel, in a reliable and fault-tolerant manner.

A MapReduce job usually splits the input dataset into independent blocks, which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps and then feeds them as input to the reduce tasks. Typically both the input and the output of a job are stored in a file system. The framework takes care of scheduling and monitoring the tasks and re-executes any tasks that fail.

Typically the MapReduce framework and the distributed file system run on the same set of nodes; that is, the compute nodes and the storage nodes are co-located. This configuration allows the framework to schedule tasks efficiently on the nodes where the data already resides, so the aggregate network bandwidth of the cluster is used very efficiently.

The MapReduce framework consists of a single master and one slave per cluster node. The master is responsible for scheduling all the tasks that make up a job across the slaves, monitoring their execution, and re-executing failed tasks; the slaves simply execute the tasks assigned to them by the master.

At a minimum, an application specifies the input/output locations (paths) and supplies the map and reduce functions by implementing the appropriate interfaces or abstract classes. These, together with other job parameters, constitute the job configuration. The Hadoop job client then submits the job (a jar file, an executable, etc.) and its configuration to the JobTracker, which distributes the software and configuration to the slaves, schedules the tasks, monitors them, and provides status and diagnostic information back to the job client.
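
For illustration only, here is a minimal word-count style sketch of such an application: it sets the input/output paths, supplies the map and reduce functions (in this case through Hadoop's built-in TokenCounterMapper and IntSumReducer classes), builds the job configuration, and submits the job. The class name WordCountSketch, the package name sketch, and the command-line argument layout are assumptions made for this sketch; they are not part of this experiment's Grep example.

package sketch; // illustrative package name, not used elsewhere in this experiment

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word-count-sketch");
        job.setJarByClass(WordCountSketch.class);

        // The map and reduce functions, supplied here by built-in classes:
        // TokenCounterMapper emits (word, 1) for every token in a line,
        // IntSumReducer adds up the 1s emitted for the same word.
        job.setMapperClass(TokenCounterMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // The input/output locations, taken from the command line,
        // e.g. an HDFS file and a not-yet-existing output directory.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish; true means progress is printed.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Packaged into a jar, such a program would be submitted in much the same way as the examples jar used in the steps below, with an input path and an output directory as its two arguments.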

NOTE: In this experiment, code prefixed with # is shell code to be executed in a terminal, and the lines following such a command show its output; all other code is Java code to be run in Eclipse.

Experiment Steps

Step 1. Change into the Hadoop installation's bin directory and see which files it contains.

#cd /usr/local/hadoop/bin/
#ls
container-executor  hadoop  hadoop.cmd  hdfs  hdfs.cmd  mapred  mapred.cmd  rcc  test-container-executor  yarn  yarn.cmd

Correspondingly, the examples jar is normally located at /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.4.jar. Note that the version here is 2.7.4; if a different version is installed, the version number in the file name will differ.

Step 2. See which examples Hadoop ships with.

# ./hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar
An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
  dbcount: An example job that count the pageview counts from a database.
  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
  grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that effects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
  pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
  randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
  wordmean: A map/reduce program that counts the average length of the words in the input files.
  wordmedian: A map/reduce program that counts the median length of the words in the input files.
  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.

Step 3. Take the grep example as the subject of this experiment and check how it is used.

The grep example takes a regular expression and a set of files (or a single file) and counts how many times words matching the regular expression occur.

To find out how a particular example is used, append the example's name followed by --help to the command above:

# ./hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep --help
Grep <inDir> <outDir> <regex> [<group>]
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

As the usage shows, the grep example is followed by the input directory, the output directory, and then the regular expression (optionally followed by a capture group number).

Step 4. Create the data to be processed.

First, create a file under the Hadoop installation directory /usr/local/hadoop. It can be created quickly with the following commands:

#cd /usr/local/hadoop
# echo "hello world.the world is hadoop world">input.txt
# cat input.txt 
hello world.the world is hadoop world
#bin/hdfs dfs -put input.txt .

The echo command prints the string inside the double quotes, and the > operator redirects that output into the file input.txt. The final hdfs dfs -put command then uploads input.txt into the current user's home directory in HDFS.

Step 5. Submit and run the job with the hadoop command.

# bin/hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.4.jar grep input.txt output world

Step 6. View the result:

# bin/hdfs dfs -cat output/p*
3    world

As the result file shows, the regular expression world was matched 3 times.

Step 7. Write your own code to implement the same functionality as grep. In Eclipse, choose File->New->Other->Map/Reduce->Map/Reduce Project to create a project named GrepExample, then create a package grepExample and a class Grep in it.

In the code below, replace hadoop in String dir_in = "hdfs://localhost:9000/user/hadoop/input.txt"; with your own user name.

package grepExample;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.map.RegexMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;

public class Grep {

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        String dir_in = "hdfs://localhost:9000/user/hadoop/input.txt";
        String dir_out = "hdfs://localhost:9000/output_grep";
        String reg = "world"; // match the string "world"

        conf.set(RegexMapper.PATTERN, reg);
        conf.setInt(RegexMapper.GROUP, 0);

        Path in = new Path(dir_in);
        Path tmp = new Path("grep-temp-"
                + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
        Path out = new Path(dir_out);

        try {
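            // First job: run RegexMapper over the input, sum the matches per matched string,
            // and write (word, count) pairs to a temporary SequenceFile directory.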
            Job grepJob = new Job(conf, "grep-search");

            grepJob.setJarByClass(Grep.class);

            grepJob.setInputFormatClass(TextInputFormat.class);
            grepJob.setMapperClass(RegexMapper.class);
            grepJob.setCombinerClass(LongSumReducer.class);
            grepJob.setPartitionerClass(HashPartitioner.class);

            grepJob.setMapOutputKeyClass(Text.class);
            grepJob.setMapOutputValueClass(LongWritable.class);
            FileInputFormat.addInputPath(grepJob, in);

            grepJob.setReducerClass(LongSumReducer.class);
            // job.setNumReduceTasks(1);
            grepJob.setOutputFormatClass(SequenceFileOutputFormat.class);

            grepJob.setOutputKeyClass(Text.class);
            grepJob.setOutputValueClass(LongWritable.class);
            FileOutputFormat.setOutputPath(grepJob, tmp);

            grepJob.waitForCompletion(true);

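            // Second job: use InverseMapper to swap (word, count) into (count, word)
            // and sort by count in descending order into the final output directory.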
            Job sortJob = new Job(conf, "grep-sort");

            sortJob.setJarByClass(Grep.class);

            sortJob.setInputFormatClass(SequenceFileInputFormat.class);
            sortJob.setMapperClass(InverseMapper.class);
            FileInputFormat.addInputPath(sortJob, tmp);

            sortJob.setNumReduceTasks(1); // a single reduce task produces one globally sorted output file
            sortJob.setSortComparatorClass(LongWritable.DecreasingComparator.class); // sort the counts in descending order

            FileOutputFormat.setOutputPath(sortJob, out);

            sortJob.waitForCompletion(true);

        } finally {
            FileSystem.get(conf).delete(tmp, true);
        }
    }
}

Step 8. Select Grep.java->Run As->Run on Hadoop to run the program, then open output_grep under DFS Locations, click part-r-00000, and view the result.

Step 9. A brief analysis of the Grep code.

When programming with MapReduce, keys and values do not use Java's primitive types or their wrapper classes directly; Hadoop provides a full set of corresponding wrapper (Writable) classes instead. The correspondence is listed below (a short usage example follows the list):

Hadoop data type <------------> Java data type:

Boolean:

BooleanWritable <------------> boolean

Integer types:

ByteWritable <------------> byte

ShortWritable <------------> short

IntWritable <------------> int

LongWritable <------------> long

Floating-point types:

FloatWritable <------------> float

DoubleWritable <------------> double

String (text):

Text <------------> String

Array:

ArrayWritable <------------> Array

Map:

MapWritable <------------> map

There are also many less commonly used types such as ObjectWritable and NullWritable.
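
As a quick standalone illustration of these wrapper types (the class name WritableDemo is made up for this snippet and is not part of the Grep project), each Writable simply wraps a Java value that can be set and read back:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Each Writable wraps a plain Java value and can be reused via set().
        IntWritable count = new IntWritable(42);    // wraps an int
        LongWritable total = new LongWritable(0L);  // wraps a long
        Text word = new Text("world");              // wraps a String (UTF-8 text)

        total.set(total.get() + count.get());       // unwrap with get(), rewrap with set()
        word.set("hadoop");

        System.out.println(word + " -> " + total);  // prints: hadoop -> 42
    }
}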

During this process, the output of the map phase is:
key value
world 1
world 1
world 1
By the time these pairs are passed to the Reducer, its input has become:
key value
world [1,1,1]
Therefore, in the Reducer we only need to iterate over each value list and add the values together to obtain the total count, as in the sketch below.
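
In the Grep program above this summation is handled by Hadoop's built-in LongSumReducer, but written out by hand it would look roughly like the following sketch (the class name SumValuesReducer is illustrative and is not used by the Grep project):

package grepExample; // same package as Grep, chosen only for illustration

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Receives e.g. ("world", [1, 1, 1]) and emits ("world", 3).
public class SumValuesReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final LongWritable result = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get(); // add up every count emitted for this key
        }
        result.set(sum);
        context.write(key, result);
    }
}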

To get the program's inputs (the input file path, the output directory path, and the regular expression) configured correctly, the main function creates a Job object, passing in a Configuration object. The Job object's methods then make it easy to set which classes serve as the Mapper and the Reducer, what types the program's outputs have, and so on. The FileInputFormat and FileOutputFormat classes are used to set the input and output paths; multiple input paths may be added, although a single path is enough here. job.waitForCompletion(true) is the statement that actually runs the job; the argument true means the job's progress is printed while it runs, and the method returns a boolean, true if the job succeeded and false if it failed.
