Hadoop

Table of Contents

1 Hadoop简介

Hadoop 起源于开源的网络搜索引擎Apache Nutch。利用Hadoop,用户可以在不了解分布式底层细节的情况下,开发分布式程序,充分利用集群的威力进行高速运算和存储。

Hadoop不是缩写,是它一个生造出来的词。

本文主要摘自:《Hadoop权威指南(第3版) 修订版》或者其英文原版Hadoop - The Definitive Guide, 4th

1.1 数据存储和分析(Hadoop产生背景)

我们生活在数据大爆炸时代,每天产生的数据越来越多。我们面临数据存储和分析的困境: 硬盘存储容量在不断提升,但硬盘数据的读取速度却没有与时俱进。 1990年,一个普通硬盘可以存储1370MB数据,传输速度为4.4MB/s(这些规格对应的是希捷的ST-41600n硬盘),因此只需要5分钟就可以读完整个硬盘中的数据。20年过去了,1TB的硬盘已然成为主流,但其数据传输速度约为100MB/s,读完整个硬盘中的数据至少得花2.5个小时。

一个很简单的减少读取时间的办法是同时从多个硬盘上读数据。试想,如果我们有100个硬盘,每个硬盘存储1%的数据,并行读取,那么不到两分钟就可以读完所有数据。 仅使用硬盘容量的1%似乎很浪费。但是我们可以存储100个数据集,每个数据集1 TB,并实现共享硬盘的读取。可以想象,用户肯定很乐于通过硬盘共享来缩短数据分析时间;并且,从统计角度来看,用户的分析工作都是在不同时间点进行的,所以彼此之间的干扰并不太大。

虽然如此,但要对多个硬盘中的数据并行进行读写数据,还有更多问题要解决。第一个需要解决的是硬件故障问题。一旦开始使用多个硬件,其中个别硬件就很有可能发生故障。为了避免数据丢失,最常见的做法是复制(replication):系统保存数据的复本(replica),一旦有系统发生故障,就可以使用另外保存的复本。例如,冗余硬盘阵列(RAID)就是按这个原理实现的,另外,Hadoop的文件系统(HDFS,Hadoop Distributed FileSystem)也是一类,不过它采取的方法稍有不同,详见后文的描述。

第二个问题是大多数分析任务需要以某种方式结合大部分数据来共同完成分析,即从一个硬盘读取的数据可能需要与从另外99个硬盘中读取的数据结合使用。各种分布式系统允许结合不同来源的数据进行分析,但保证其正确性是一个非常大的挑战。 MapReduce提出一个编程模型,该模型抽象出这些硬盘读写问题并将其转换为对一个数据集(由键值对组成)的计算。 后文将详细讨论这个模型,这样的计算由map和reduce两部分组成,而且只有这两部分提供对外的接口。

简而言之, Hadoop为我们提供了一个可靠的共享存储和分析系统。HDFS实现数据的存储,MapReduce实现数据的分析和处理。虽然Hadoop还有其他功能,但HDFS和MapReduce是它的核心价值。

1.2 相较于其他系统的优势

1.2.1 和关系型数据库的比较

为什么不能用数据库来对大量硬盘上的大规模数据进行批量分析呢?我们为什么需要MapReduce?关系型数据库和MapReduce的比较参考下表:

  Traditional RDBMS MapReduce
Data size Gigabytes Petabytes
Access Interactive and batch Batch
Updates Read and write many times Write once, read many times
Transactions ACID None
Structure Schema-on-write Schema-on-read
Integrity High Low
Scaling Nonlinear Linear

1.2.2 和网格计算的比较

高性能计算(High Performance Computing,HPC)和网格计算(Grid Computing)组织多年以来一直在研究大规模数据处理,主要使用类似于消息传递接口(Message Passing Interface,MPI)的API。从广义上讲, 高性能计算采用的方法是将作业分散到集群的各台机器上,这些机器访问存储区域网络(SAN)所组成的共享文件系统。 这比较适用于计算密集型的作业,但如果节点需要访问的数据量更庞大(高达几百GB,MapReduce开始施展它的魔法),很多计算节点就会因为网络带宽的瓶颈问题不得不闲下来等数据。

MapReduc尽量在计算节点上存储数据,以实现数据的本地快速访问。数据本地化(data locality)特性是MapReduce的核心特征,并因此而获得良好的性能。

虽然MPI赋予程序员很大的控制权,但需要程序员显式控制数据流机制,包括用C语言构造底层的功能模块(例如套接字)和高层的数据分析算法。而MapReduce则在更高层次上执行任务,即程序员仅从键值对函数的角度考虑任务的执行,而且数据流是隐含的。

2 MapReduce

MapReduce是一种可用于数据处理的编程模型。

2.1 NCDC气象数据集

下面是美国国家气候数据中心(National Climatic Data Center,简称NCDC)的数据实例(数据可以从 http://hadoopbook.com/code.html 下载)。我们的任务是从中找出每年全球气温的最高记录是多少?

这些数据以行为单位,使用ASCII格式存储,每行就是一条记录。数据放在目录all中,每年的数据在一个以年份命名的gz压缩包中,比如1901年的数据保存在1901.gz中,其部分实例(前20条)如下:

# 下面是1901.gz中的前20条记录,(可通过 `gunzip -c all/1901.gz | head -n 20` 得到输出)
0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
0029029070999991901010113004+64333+023450FM-12+000599999V0202901N008219999999N0000001N9-00721+99999102001ADDGF104991999999999999999999
0029029070999991901010120004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00941+99999102001ADDGF108991999999999999999999
0029029070999991901010206004+64333+023450FM-12+000599999V0201801N008219999999N0000001N9-00611+99999101831ADDGF108991999999999999999999
0029029070999991901010213004+64333+023450FM-12+000599999V0201801N009819999999N0000001N9-00561+99999101761ADDGF108991999999999999999999
0029029070999991901010220004+64333+023450FM-12+000599999V0201801N009819999999N0000001N9-00281+99999101751ADDGF108991999999999999999999
0029029070999991901010306004+64333+023450FM-12+000599999V0202001N009819999999N0000001N9-00671+99999101701ADDGF106991999999999999999999
0029029070999991901010313004+64333+023450FM-12+000599999V0202301N011819999999N0000001N9-00331+99999101741ADDGF108991999999999999999999
0029029070999991901010320004+64333+023450FM-12+000599999V0202301N011819999999N0000001N9-00281+99999101741ADDGF108991999999999999999999
0029029070999991901010406004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00331+99999102311ADDGF108991999999999999999999
0029029070999991901010413004+64333+023450FM-12+000599999V0202301N008219999999N0000001N9-00441+99999102261ADDGF108991999999999999999999
0029029070999991901010420004+64333+023450FM-12+000599999V0202001N011819999999N0000001N9-00391+99999102231ADDGF108991999999999999999999
0029029070999991901010506004+64333+023450FM-12+000599999V0202701N004119999999N0000001N9+00001+99999101821ADDGF104991999999999999999999
0029029070999991901010513004+64333+023450FM-12+000599999V0202701N002119999999N0000001N9+00061+99999102591ADDGF104991999999999999999999
0029029070999991901010520004+64333+023450FM-12+000599999V0202301N004119999999N0000001N9+00001+99999102671ADDGF104991999999999999999999
0029029070999991901010606004+64333+023450FM-12+000599999V0202701N006219999999N0000001N9+00061+99999102751ADDGF103991999999999999999999
0029029070999991901010613004+64333+023450FM-12+000599999V0202701N006219999999N0000001N9+00061+99999102981ADDGF100991999999999999999999
0029029070999991901010620004+64333+023450FM-12+000599999V0203201N002119999999N0000001N9-00111+99999103191ADDGF100991999999999999999999
0029029070999991901010706004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00331+99999103341ADDGF100991999999999999999999
0029029070999991901010713004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00501+99999103321ADDGF100991999999999999999999
---------------------------------------------------------------------------------------||||||-----------------------------------------
                                                                                  我们只关心这6列(88到93列)

每行记录中第88列到第92列是气温值(如-0078表示零下7.8度,如果数据缺失则用+9999表示),第93列是quality code(仅当该列为0/1/4/5/9时认为气温值是有效的)。由于我们只需要统计最高气温,所以仅关心第88列到第93列即可,其它列(如记录日期、经伟度、风向等等)都不用关心。

下面我们将分别使用Unix传统工具和Hadoop来计算每年全球气温的最高记录。

2.2 方法一:使用Unix工具来分析数据

找出前面数据中,每年全球气温的最高记录是多少?用awk容易实现:

#!/usr/bin/env bash
for year in all/*
do
    echo -ne `basename $year .gz`"\t"
    gunzip -c $year | \
        awk '{ temp = substr($0, 88, 5) + 0;
               q = substr($0, 93, 1);
               if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
             END { print max }'
done

假设上面程序名为max_temperature.sh,则运行它可以得到:

% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...

由于源文件中的气温值被放大了10倍,所以1901年的最高气温是31.7度。在亚马逊的EC2 High-CPU Extra Large Instance运行这个程序,处理一个世纪的气象数据,找到每年最高气温,需要42分钟。

2.3 方法二:使用Hadoop来分析数据

2.3.1 map和reduce处理过程的原理性介绍

MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段。每个阶段都将“键/值对”作为输入,键值对的类型可以由程序员选择。

以前面求最高气温的实例为例子。map阶段的输入是美国国家气候数据中心的原始数据。为了简单全面地描述处理过程,仅考虑输入数据的下面5条样本记录:

$ cat input/ncdc/sample.txt
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

把这5行记录以“键/值对”的方式作为 map函数的输入:

(0,   0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999)
(106, 0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999)
(212, 0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999)
(318, 0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999)
(424, 0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999)
---------------------||||--------------------------------------------------------------------||||||------------
                     年份                                                               气温信息(第88-92列是气温值,第93列是quality code)

其中,key是记录在文件中的行号(从0开始),我们并不关心,将其忽略即可。map函数的功能是提取年份和气温信息, map函数的输出为:

(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)

map函数输出经由MapReduce框架中进行进一步的处理后,主要需要根据键对键/值对进行排序和分组。经过这一番处理之后, reduce函数看到的输入为:

(1949, [111, 78])
(1950, [0, 22, −11])

reduce函数所需要做的工作是遍历这些数据,找出最大值,产生最终的输出结果。所以 reduce函数的输出为:

(1949, 111)
(1950, 22)

这个结果就是从5行样本数据中得到的每一个全球最高气温,即:1949年的最高气温是11.1度,1950年最高气温是2.2度。

整个数据流如图 1 所示。

hadoop_mapreduce_logical_data_flow.jpg

Figure 1: MapReduce的逻辑数据流

在图 1 的底部是Unix管道,用于模拟整个MapReduce的流程,这部分内容将在后面讨论Hadoop Straming时再次涉及。

2.3.2 Java MapReduce

前面介绍了MapReduce的工作原理,现在我们用代码实现它。我们需要三样东西:map函数、reduce函数、用来运行MapReduce作业的main函数。

2.3.2.1 map函数

map函数是由一个Mapper接口来实现的,其中声明了一个map()虚方法。在求最高气温的例子中,map函数的实现如下:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);

        if (airTemperature != MISSING && quality.matches("[01459]")) {       // 其逻辑和前面的awk脚本类似
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

这个Mapper接口是一个泛型类型,它有4个形参类型,由它们来指定map函数的“输入键”、“输入值”、“输出键”和“输出值的类型”。 就前面这个实例来说,输入键是一个长整数偏移量,输入的值是一行文本,输出键是年份,输出值是气温(整数)。Hadoop规定了自己的一套可用于网络序列优化的基本类型,而不直接使用内置的Java类型。这些类型都可以在org.apache.hadoop.io包中找到。这里使用LongWritable类型(相当于Java的Long类型)、Text类型(相当于Java的String类型)和IntWritable类型(相当于Java的Integer类型)。

map()方法有3个参数。前两个参数为map()方法需要传入一个键和一个值。我们将一个包含Java字符串输入行的Text值转换成Java的String类型,然后利用其substring()方法提取我们感兴趣的列。map()方法还提供了一个Context实例用于写入输出内容。在这个例子中,我们将年份数据是“输出键”,所以把它按Text对象进行写入,将气温值封装在IntWritable类型中。

2.3.2.2 reduce函数

类似地,reduce函数是由一个Reducer接口来实现的,其中声明了一个reduce()虚方法。在求最高气温的例子中,reduce函数的实现如下:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

Reducer接口是一个泛型类型,它有4个形参类型用于指定reduce过程的输入和输出类型。 reduce过程的输入类型必须和map过程的输出类型(即Text类型和IntWritable类型)匹配。在求最高气温的例子中,reduce过程的输出类型也正好为Text类型和IntWritable类型,分别表示年份及最高气温。

2.3.2.3 运行MapReduce作业的main函数

最后一部分代码是用来运行MapReduce作业。如下所示:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Job对象指定作业执行规范。我们可以用它来控制整个作业的运行。我们在Hadoop集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop在集群上发布这个文件)。不必明确指定JAR文件的名称,在Job对象的setJarByClass()方法中传递一个类即可,Hadoop利用这个类来查找包含它的JAR文件,进而找到相关的JAR文件。

构造Job对象之后,需要指定输入和输出数据的路径。调用FileInputFormat类的静态方法addInputPath()来定义输入数据的路径,这个路径可以是单个的文件、一个目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一系列文件。由函数名可知,可以多次调用addInputPath()来实现多路径的输入。

调用FileOutputFormat类中的静态方法setOutputPath()来指定输出路径(只能有一个输出路径)。这个方法指定的是reduce函数输出文件的写入目录。在运行作业前该目录是不应该存在的,否则Hadoop会报错并拒绝运行作业。这种预防措施的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是非常恼人的)。

接着,通过setMapperClass()和setReducerClass()指定map类型和reduce类型。

setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,正如本例所示,这两个输出类型一般都是相同的。如果不同,则通过setMapOutputKeyClass()和setMapOutputValueClass()来设置map函数的输出类型。

输入的类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。

在设置定义map和reduce函数的类之后,可以开始运行作业。Job中的waitForCompletion()方法提交作业并等待执行完成。该方法中的布尔参数是个详细标识,所以作业会把进度写到控制台。

waitForCompletion()方法返回一个布尔值,表示执行的成(true)败(false),这个布尔值被转换成程序的退出代码0或者1。

程序的运行测试参见附录部分。

2.4 Hadoop Streaming

Hadoop提供了MapReduce的API,允许你使用非Java的其他语言来写自己的map和reduce函数。 Hadoop Streaming使用Unix标准流作为Hadoop和应用程序之间的接口,所以我们可以使用任何编程语言(如Ruby,Python等等)通过标准输入/输出来写MapReduce程序。

Streaming天生适合用于文本处理。map的输入数据通过标准输入流传递给map函数,并且是一行一行地传输,最后将结果行写到标准输出。map输出的键/值对是以一个制表符分隔的行,并且写入标准输出reduce 函数的输入格式与之相同(通过制表符来分隔的键/值对)并通过标准输入流进行传输。reduce函数从标准输入流中读取输入行,该输入已由Hadoop框架根据键排过序,最后将结果写入标准输出。

下面使用Streaming来重写按年份查找最高气温的MapReduce程序。

2.4.1 Ruby版本

每年全球气温的最高记录的例子。用Ruby编写的map函数:

#!/usr/bin/env ruby
STDIN.each_line do |line|
   val = line
   year, temp, q = val[15,4], val[87,5], val[92,1]
   puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

用Ruby编写的map函数:

#!/usr/bin/env ruby

last_key, max_val = nil, -1000000
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
puts "#{last_key}\t#{max_val}" if last_key

可以直接在Unix管道上进行测试,过程如下:

$ cat input/ncdc/sample.txt | ./max_temperature_map.rb
1950	+0000
1950	+0022
1950	-0011
1949	+0111
1949	+0078
$ cat input/ncdc/sample.txt | ./max_temperature_map.rb  | sort | ./max_temperature_reduce.rb
1949	111
1950	22

在Hadoop环境中的测试如下:

$ rm -rf output
$ hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
  -input input/ncdc/sample.txt \
  -output output \
  -mapper ./max_temperature_map.rb \
  -reducer ./max_temperature_reduce.rb

运行结束后,在output目录中会生成结果文件:

$ cat output/part-00000
1949	111
1950	22

3 附录:安装Hadoop

这里仅介绍在单机上安装Hadoop(以版本2.7.1为例)的过程。
首先,从官网 http://hadoop.apache.org/releases.html#Download 下载稳定版的发布包(hadoop-2.7.1.tar.gz,大小约201MB),然后解压到本地文件系统中。

$ tar xzf hadoop-2.7.1.tar.gz
$ ls hadoop-2.7.1/
LICENSE.txt README.txt  etc         lib         sbin
NOTICE.txt  bin         include     libexec     share

然后,配置JAVA_HOME/HADOOP_INSTALL/PATH三个环境变量,通常在shell启动文件(如~/.bash_profile)中设置。

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/
export HADOOP_INSTALL=/Users/cig01/hadoop-2.7.1/
export PATH=$PATH:$HADOOP_INSTALL/bin

最后,执行命令 hadoop version 来测试是否安装成功。如:

$ hadoop version
Hadoop 2.7.1
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 15ecc87ccf4a0228f35af08fc56de536e6ce657a
Compiled by jenkins on 2015-06-29T06:04Z
Compiled with protoc 2.5.0
From source with checksum fc0a1a23fc1868e4d5ee7fa2b28a58a
This command was run using /Users/cig01/hadoop-2.7.1/share/hadoop/common/hadoop-common-2.7.1.jar

4 附录:Maven搭建Hadoop工程(从NCDC数据集中求每年的最高气温)

下面介绍如何用Maven搭建Hadoop工程,从NCDC数据集中求每年的最高气温。

第一步,生成一个简单的Hello World工程。

$ mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.mycompany.app -DartifactId=my-hadoop-app -DinteractiveMode=false

第二步,添加Hadoop依赖包。
一个简单的Hadoop工程,可能需要下面几个包:

hadoop-common
hadoop-hdfs
hadoop-mapreduce-client-core
hadoop-mapreduce-client-jobclient
hadoop-mapreduce-client-common

对应地,把下面依赖增加到pom.xml中。

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.7.1</version>
    </dependency>

第三步,实现map函数、reduce函数、用来运行MapReduce作业的main函数。
删除第一步自动生成的文件App.java和AppTest.java。把之前介绍的MaxTemperatureMapper/MaxTemperatureReducer/MaxTemperature三个类,放入到包com.mycompany.app中。

$ cd my-hadoop-app
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/com
./src/main/java/com/mycompany
./src/main/java/com/mycompany/app
./src/main/java/com/mycompany/app/MaxTemperature.java
./src/main/java/com/mycompany/app/MaxTemperatureMapper.java
./src/main/java/com/mycompany/app/MaxTemperatureReducer.java

第四步,测试运行。

$ mvn clean package
$ rm -rf out/
$ hadoop jar target/my-hadoop-app-1.0-SNAPSHOT.jar com.mycompany.app.MaxTemperature input/ncdc/sample.txt  out

运行结束后,在out目录中会有名为part-xxx的结果文件,如:

$ cat out/part-r-00000
1949	111
1950	22

即在样本数据集中,1949年的最高气温是11.1度,1950年最高气温是2.2度。


Author: cig01

Created: <2015-11-28 Sat 00:00>

Last updated: <2017-03-26 Sun 21:13>

Creator: Emacs 25.1.1 (Org mode 9.0.7)