Hadoop简明教程

关于Hadoop

Apache Hadoop是一款支持数据密集型分布式应用程序并以Apache 2.0许可协议发布的开源软件框架。它支持在商品硬件构建的大型集群上运行的应用程序。Hadoop是根据谷歌公司发表的MapReduce和Google文件系统的论文自行实现而成。所有的Hadoop模块都有一个基本假设,即硬件故障是常见情况,应该由框架自动处理。

Hadoop框架透明地为应用提供可靠性和数据移动。它实现了名为MapReduce的编程范式:应用程序被分割成许多小部分,而每个部分都能在集群中的任意节点上运行或重新运行。此外,Hadoop还提供了分布式文件系统,用以存储所有计算节点的数据,这为整个集群带来了非常高的带宽。MapReduce和分布式文件系统的设计,使得整个框架能够自动处理节点故障。它使应用程序与成千上万的独立计算的计算机和PB级的数据连接起来。现在普遍认为整个Apache Hadoop“平台”包括Hadoop内核、MapReduce、Hadoop分布式文件系统(HDFS)以及一些相关项目,有Apache Hive和Apache HBase等等。

Hadoop的基本概念

MapReduce

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念”Map(映射)”和”Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

简单来说,一个映射函数就是对一些独立元素组成的概念上的列表(例如,一个测试成绩的列表)的每一个元素进行指定的操作(比如,有人发现所有学生的成绩都被高估了一分,他可以定义一个“减一”的映射函数,用来修正这个错误。)。事实上,每个元素都是被独立操作的,而原始列表没有被更改,因为这里创建了一个新的列表来保存新的答案。这就是说,Map操作是可以高度并行的,这对高性能要求的应用以及并行计算领域的需求非常有用。

而归纳操作指的是对一个列表的元素进行适当的合并(继续看前面的例子,如果有人想知道班级的平均分该怎么做?他可以定义一个归纳函数,通过让列表中的奇数(odd)或偶数(even)元素跟自己的相邻的元素相加的方式把列表减半,如此递归运算直到列表只剩下一个元素,然后用这个元素除以人数,就得到了平均分)。虽然他不如映射函数那么并行,但是因为归纳总是有一个简单的答案,大规模的运算相对独立,所以归纳函数在高度并行环境下也很有用。

HDFS

HDFS是一个分布式文件系统。因为HDFS具有高容错性的特点,所以它可以设计部署在低廉的硬件上。她可以通过提供高吞吐率来访问应用程序的程序,适合那些有着超大数据集的应用程序。HDFS放宽了对可移植操作系统接口(POSIX)的要求,这样可以实现以流的形式访问文件系统中的数据。HDFS原本是开源的Apache项目Nutch的基础结构,最后它却成为了Hadoop基础架构之一。

HDFS通过三个重要的角色来进行文件系统的管理:NameNode、DataNode和Client。

NameNode可以看做是分布式文件系统中的管理者,主要负责管理文件系统的命名空间、集群配置信息和存储块的复制等。NameNode会将文件系统的Metadata存储在内存中,这些信息主要包括文件信息、每一个文件对应的文件块的信息和每一个文件块在DataNode中的信息等。

DataNode是文件存储的基本单元,它将文件块(Block)存储在本地文件系统中,保存了所有Block的Metadata,同时周期性地将所有存在的Block信息发送给NameNode。

YARN

Apache YARN(Yet Another Resource Negotiator)是Hadoop的集群资源管理系统。YARN被引入Hadoop 2,最初是为了改善MapReduce的实现,但它具有足够的通用性,同样可以支持其他的分布式计算模式。

YARN提供请求和使用集群资源的API,但这些API很少直接用于用户代码。相反,用户代码中用的是分布式计算框架提供的更高层API,这些API建立在YARN之上且向用户隐藏了资源管理细节。一些分布式计算框架(MapReduce,Spark等等)作为YARN应用运行在集群计算层(YARN)和集群存储层(HDFS和HBase)上。还有一层应用如Pig、Hive和Crunch都是运行在MapReduce,Spark或Tez之上的处理框架,它们不和YARN直接打交道。

YARN通过两类长期运行的守护进程提供自己的核心服务:管理集群上资源使用的资源管理器(Resource Manager)、运行在集群中所有节点上且能够启动和监控容器(Container)的节点管理器(Node Manager)。容器用于执行特定应用程序的进程,每个容器都有资源限制(内存、CPU等)。一个容器可以使一个Unix进程,也可以是一个Linux cgroup,取决于YARN的配置。

Hive

Hive最早是由Facebook设计的,是一个建立在Hadoop基础之上的数据仓库,它提供了一些用于对Hadoop文件中的数据集进行数据整理、特殊查询和分析存储的工具。Hive提供的是一种结构化数据的机制,她支持类似于传统RDBMS中的SQL语言的查询语言,来帮助那些熟悉SQL的用户查询Hadoop中的数据,该查询语言称为Hive QL。与此同时,传统的MapReduce编程人员也可以在Mapper或Reducer中通过Hive QL查询数据。Hive编译器会把Hive QL编译成一组MapReduce任务,从而方便MapReduce编程人员进行Hadoop系统开发。

HBase

HBase是一个分布式的、面向列的开源数据库,该技术来源于Google论文《Bigtable:一个结构化数据的分布式存储系统》。如果Bigtable利用了Google文件系统(Google File System)提供的分布式数据存储方式一样,HBase在Hadoop之上提供了类似于Bigtable的能力。HBase不同于一般的关系数据库,原因有两个:其一,HBase是一个适合于非结构化数据存储的数据库;其二,HBase是基于列而不是基于行的模式。HBase和Bigtable使用相同的数据模型。用户将数据存储在一个表里,一个数据行拥有一个可选择的键和任意数量的列。由于HBase表是疏松的,用户可以为行定义各种不同的列。HBase主要用于需要随机访问、实时读写的大数据。

安装Hadoop

假设我们有3台服务器,分别是hadoop-ahadoop-bhadoop-c,3台操作系统均为CentOS 7,用来组成Hadoop集群,Hadoop集群规划:

HDFS:

hadoop-a:NameNode、SecondaryNameNode

hadoop-b:DataNode

hadoop-c:DataNode

YARN:

hadoop-a:ResouceManager

hadoop-b:NodeManager

hadoop-c:NodeManager

安装ssh和rsync

检查和安装ssh和rsync,ssh以后台服务运行,以便Hadoop脚本管理远程Hadoop守护进程。

安装Java

安装JAVA 8,这里安装的是OpenJDK 1.8,并配置好JAVA_HOMEPATH等环境变量。

配置/etc/hosts

编辑/etc/hosts文件,加入3台服务器的IP和主机名映射:

192.168.232.161 hadoop-a
192.168.232.162 hadoop-b
192.168.232.163 hadoop-c

配置SSH免密码登录

生成密钥:

$ ssh-keygen -t rsa -f ~/.ssh/id_rsa

将公钥追加到本机的authorized_keys中:

$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

然后将另外两台服务器生成的id_rsa.pub内容也追加到authorized_keys中,最终如下:

ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDUHjkpc8XZirKJTwac7bydFVIBbHMxUStGhd62G1s9Afi+6VMrtJniLsq4KWON9yUB2ivf0BS9kwt/Aema6v8yi14rt2Xelub8HocMomW90Qe36JZZbIdFFjs1LclUXEesim/X9dsKD9X6RmB+WBsQZVUh0wNa9n4JglBXHPhrGe6e8MBQWinHPJPjto5KbuXGH1ZFc52hGPCbvkKz1l5tfwjn9XoaEpPEdAHoYpmty9j/zUG38yEXOjgMIdwW4ilZgvJ6/+zkNnMcDTgHYY51lge+R3PBPW7Tx6tZasbXE0wA96adBWCwwYfJ7Yd+ugmEwOEwcVORKk5PV8RgiWEP kpali@hadoop-a
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCggtXsdW9p4F1fWyr+N14S43DZEE9MIATzl5cIao5pvA+rVxMz/dTvosDDwiTIJOlwTfX6qg0MVGMTXiiB/6R45v2gB75uSidFNdoblWf/wMf0svrSu6oZ7PcbgiiPyokBBhEQ39s+tU+CG3yfafpDNgWqLe0UE2V2eXIQNmv+Y2Xjl3Gj9PdwBdkQJYBC+9fZ+GUmcQVdLxZUI4D7gEAZ6Y6OeHDABaH+RHXFNk+yNdlklYKJ5QpBmMJmKEAEkA36mkXRXjbldlCpR/KZEarqRhez4gzsjiB1oM1Za4e7JJQoR5t5t62VytXJvn9z0g7DOSxnlhQ5N3Xp8PD+9BY9 kpali@hadoop-b
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCxnGMtmh/YFCtubKBy7k3cZQ4yzOWAlsS9KwaOz0EsL/djRKeREi50yxgFxm/Ht/rXlQzwdV3suv3eFjWbqHDXHyJ5Q1wItoUNEc7SpDAUuqdmTznh2DbpMFUTcO3czLFtwumsqINzwGlQdd/6hS2QdCHrT/w8jV1lH4TjdvgiSy5CKvgVRJzN2RlvaO9dgl+NOQBhUbcKm77bHjfBXizIqYWoPZtNh6z6cy6Iodg8qaZpM1eVBkeHN2vcF7cszOhZDEu5fuRqm1oBo7J4DZXuVHXXFmYy+FYrIvBoU4hqiTB1M9uZskkxdAsx6pes3S8PrpJMvOmqyQJ0OQ6u+oQz kpali@hadoop-c

修改authorized_keys 文件权限:

$ chmod 0600 ~/.ssh/authorized_keys

authorized_keys 文件复制到另外两台主机的同一目录下:

$ scp ~/.ssh/authorized_keys hadoop-b:~/.ssh/
$ scp ~/.ssh/authorized_keys hadoop-c:~/.ssh/

这样三台服务器分别就可以免密码SSH登录本机以及另外两台服务器。

安装Hadoop

官网下载安装包hadoop-2.9.2.tar.gz,分别安装到3个节点。

解压到当前目录下:

$ tar -xvf hadoop-2.9.2.tar.gz

使用root用户移动刚解压的hadoop-2.9.2目录到/opt目录下:

# mv hadoop-2.9.2 /opt/

修改.bash_profile,主要是新增HADOOP_HOME环境变量,参考如下:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64/jre
export HADOOP_HOME=/opt/hadoop-2.9.2
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

使环境变量生效:

$ source ~/.bash_profile

配置Hadoop

三个节点分别创建必要的目录:

$ cd /opt/hadoop-2.9.2
$ mkdir -p hdfs/name hdfs/data hdfs/tmp

然后在hadoop-a节点,修改/opt/hadoop-2.9.2/etc/hadoop目录下的配置文件,主要修改以下配置文件:

hadoop-env.sh
yarn-env.sh
slaves
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml

xml配置文件的说明和默认值可以参考以下链接:

core-site.xml

hdfs-site.xml

mapred-site.xml

yarn-site.xml

修改hadoop-env.sh中的JAVA_HOME

# The java implementation to use.
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64/jre

修改yarn-env.sh中的JAVA_HOME

# some Java parameters
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64/jre

修改slaves文件,修改为以下内容:

hadoop-b
hadoop-c

修改core-site.xml文件,配置为:

<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://hadoop-a:9000</value>
    </property>
    <property>
        <name>io.file.buffer.size</name>
        <value>131072</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop-2.9.2/hdfs/tmp</value>
    </property>
</configuration>

修改hdfs-site.xml文件,配置为:

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/opt/hadoop-2.9.2/hdfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/opt/hadoop-2.9.2/hdfs/data</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
</configuration>

拷贝mapred-site.xml.templatemapred-site.xml

$ cp /opt/hadoop-2.9.2/etc/hadoop/mapred-site.xml.template /opt/hadoop-2.9.2/etc/hadoop/mapred-site.xml

修改mapred-site.xml文件,配置为:

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

修改yarn-site.xml文件,配置为:

<configuration>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop-a</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>

拷贝配置文件到另外两个节点上:

$ scp -r /opt/hadoop-2.9.2/etc/hadoop hadoop-b:/opt/hadoop-2.9.2/etc/
$ scp -r /opt/hadoop-2.9.2/etc/hadoop hadoop-c:/opt/hadoop-2.9.2/etc/

启动Hadoop

启动Hadoop之前要先格式化,在NameNode也就是hadoop-a上执行:

$ hdfs namenode -format

hadoop-a上启动HDFS:

$ start-dfs.sh

hadoop-a上启动YARN:

$ start-yarn.sh

hadoop-a上执行jps命令,查看Hadoop相关进程:

$ jps | grep -v Jps
5387 ResourceManager
5672 SecondaryNameNode
5281 NameNode

hadoop-b上执行jps命令,查看Hadoop相关进程:

$ jps | grep -v Jps
21072 DataNode
21206 NodeManager

hadoop-c上执行jps命令,查看Hadoop相关进程:

$ jps | grep -v Jps
20685 DataNode
20810 NodeManager

使用Hadoop

可以访问Web页面http://hadoop-a:50070查看或管理HDFS,访问http://hadoop-a:8088查看或管理YARN。

Hello World

在HDFS上创建一个目录:

$ hadoop fs -mkdir -p /test/input

查看创建的目录:

$ hadoop fs -ls /test
Found 1 items
drwxr-xr-x   - kpali supergroup          0 2019-01-23 10:22 /test/input

创建两个文件:

$ echo "Hello World Bye World" > file01
$ echo "Hello Hadoop Goodbye Hadoop" > file02

上传文件到HDFS:

$ hadoop fs -put file0* /test/input

查看刚刚上传的文件:

$ hadoop fs -ls /test/input
Found 2 items
-rw-r--r--   1 kpali supergroup         22 2019-01-23 10:22 /test/input/file01
-rw-r--r--   1 kpali supergroup         28 2019-01-23 10:22 /test/input/file02

新建一个Maven项目,名称为hadoop-demopom.xml引入以下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.9.2</version>
    </dependency>
</dependencies>

新建一个类WordCount,这里我们直接使用Hadoop自带的示例程序wordcount,这是一个统计单词的MapReduce程序,代码如下:

package com.example;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  // 创建一个Mapper类
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    // 实现map函数
    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // 提取单词,以键值对<单词,1次>写入context
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  // 创建一个Reducer类
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    // 实现reduce函数
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      // 统计相同键(即同一个单词)出现的次数,并以键值对<单词,出现次数>写入context
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    // 创建一个Job,名称为word count
    Job job = Job.getInstance(conf, "word count");
    // 设置Job关联的一些类
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    // 设置输出的键值对类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // 读取并设置输入目录
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // 读取并设置输出目录
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    // 执行并等待Job完成
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

然后打包成jar包,上传到Hadoop服务器上,执行这个wordcount程序,将结果输出到/test/output目录下:

$ hadoop jar hadoop-demo.jar com.example.WordCount /test/input /test/output

等待执行结束,也可以在YARN Web界面查看执行结果。

查看输出文件:

$ hadoop fs -ls /test/output
Found 2 items
-rw-r--r--   1 kpali supergroup          0 2019-01-23 10:33 /test/output/_SUCCESS
-rw-r--r--   1 kpali supergroup         41 2019-01-23 10:33 /test/output/part-r-00000

查看输出结果:

$ hadoop fs -cat /test/output/part-r-00000
Bye     1
Goodbye 1
Hadoop  2
Hello   2
World   2

Hadoop流

Hadoop流提供了一个API,允许用户使用任何脚本语言写Map函数或者Reduce函数,Hadoop流的关键是,它使用UNIX标准流作为程序与Hadoop之间的接口,因此任何程序只要可以从标准输入流中读取数据并且可以写入数据到标准输出流,那么就可以通过Hadoop流使用其他语言编写MapReduce程序的Map函数或Reduce函数。

我们使用相同的输入数据,但是使用Linux的cat命令来做Map函数,使用wc命令来做Reduce函数,测试一下Hadoop流:

$ hadoop jar /opt/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar -input  /test/input -output /test/output1 -mapper /bin/cat -reducer /usr/bin/wc

同样,查看一下输出结果:

$ hadoop fs -cat /test/output1/part-00000
      2       8      52

参考文档

http://hadoop.apache.org/docs/stable/

《Hadoop实战 第2版》

《Hadoop权威指南》

https://www.cnblogs.com/qingyunzong/p/8496127.html

Table of Contents