您的当前位置:首页正文

hadoop之HDFS学习笔记(一)

2023-11-21 来源:易榕旅网
hadoop之HDFS学习笔记(⼀)

主要内容:hdfs的整体运⾏机制,DATANODE存储⽂件块的观察,hdfs集群的搭建与配置,hdfs命令⾏客户端常见命令;业务系统中⽇志⽣成机制,HDFS的java客户端api基本使⽤。

1、什么是⼤数据基本概念《数据处理》

在互联⽹技术发展到现今阶段,⼤量⽇常、⼯作等事务产⽣的数据都已经信息化,⼈类产⽣的数据量相⽐以前有了爆炸式的增长,以前的传统的数据处理技术已经⽆法胜任,需求催⽣技术,⼀套⽤来处理海量数据的软件⼯具应运⽽⽣,这就是⼤数据!

处理海量数据的核⼼技术:海量数据存储:分布式海量数据运算:分布式

⼤数据的海量数据的存储和运算,核⼼技术就是分布式。

这些核⼼技术的实现是不需要⽤户从零开始造轮⼦的存储和运算,都已经有⼤量的成熟的框架来⽤

存储框架:

HDFS——分布式⽂件存储系统(HADOOP中的存储框架)HBASE——分布式数据库系统

KAFKA——分布式消息缓存系统(实时流式数据处理场景中应⽤⼴泛)

⽂件系统中的数据以⾮结构化居多,没有直观的结构,数据库中的信息多以表的形式存在,具有结构化,存在规律;查询的时候⽂本⽂件只能⼀⾏⼀⾏扫描,⽽数据库效率⾼很多,可以利⽤sql查询语法,数据库在存和取⽅便的多。

数据库和⽂件系统相⽐,数据库相当于在特定的⽂件系统上的软件封装。其实HBASE就是对HDFS的进⼀层封装,它的底层⽂件系统就是HDFS。

分布式消息缓存系统,既然是分布式,那就意味着横跨很多机器,意味着容量可以很⼤。和前两者相⽐它的数据存储形式是消息(不是表,也不是⽂件),消息可以看做有固定格式的⼀条数据,⽐如消息头,消息体等,消息体可以是json,数据库的⼀条记录,⼀个序列化对象等。消息最终存放在kafaka内部的特定的⽂件系统⾥。

运算框架:(要解决的核⼼问题就是帮⽤户将处理逻辑在很多机器上并⾏)MAPREDUCE—— 离线批处理/HADOOP中的运算框架SPARK —— 离线批处理/实时流式计算STORM —— 实时流式计算

离线批处理:数据是静态的,⼀次处理⼀⼤批数据。实时流式:数据在源源不断的⽣成,边⽣成,边计算

这些运算框架的思想都差不多,特别是mapreduce和spark,简单来看spark是对mapreduce的进⼀步封装;

运算框架和存储框架之间没有强耦合关系,spark可以读HDFS,HBASE,KAFKA⾥的数据,当然需要存储框架提供访问接⼝。

辅助类的⼯具(解放⼤数据⼯程师的⼀些繁琐⼯作):

HIVE —— 数据仓库⼯具:可以接收sql,翻译成mapreduce或者spark程序运⾏FLUME——数据采集SQOOP——数据迁移

ELASTIC SEARCH —— 分布式的搜索引擎

flume⽤于⾃动采集数据源机器上的数据到⼤数据集群中。

HIVE看起来像⼀个数据库,但其实不是,Hive中存了⼀些需要分析的数据,然后在直接写sql进⾏分析,hive接收sql,翻译成mapreduce或者spark程序运⾏;hive本质是mapreduce或spark,我们只需要写sql逻辑⽽不是mapreduce逻辑,Hive⾃动完成对sql的翻译,⽽且还是在海量数据集上。.......

换个⾓度说,⼤数据是:1、有海量的数据

2、有对海量数据进⾏挖掘的需求

3、有对海量数据进⾏挖掘的软件⼯具(hadoop、spark、storm、flink、tez、impala......)

⼤数据在现实⽣活中的具体应⽤数据处理的最典型应⽤:公司的产品运营情况分析

电商推荐系统:基于海量的浏览⾏为、购物⾏为数据,进⾏⼤量的算法模型的运算,得出各类推荐结论,以供电商⽹站页⾯来为⽤户进⾏商品推荐

精准⼴告推送系统:基于海量的互联⽹⽤户的各类数据,统计分析,进⾏⽤户画像(得到⽤户的各种属性标签),然后可以为⼴告主进⾏有针对性的精准的⼴告投放

2、什么是hadoophadoop中有3个核⼼组件:

分布式⽂件系统:HDFS —— 实现将⽂件分布式存储在很多的服务器上分布式运算编程框架:MAPREDUCE —— 实现在很多机器上分布式并⾏运算

分布式资源调度平台:YARN —— 帮⽤户调度⼤量的mapreduce程序,并合理分配运算资源

3、hdfs整体运⾏机制hdfs:分布式⽂件系统

hdfs有着⽂件系统共同的特征:1、有⽬录结构,顶层⽬录是: /2、系统中存放的就是⽂件

3、系统可以提供对⽂件的:创建、删除、修改、查看、移动等功能

hdfs跟普通的单机⽂件系统有区别:

1、单机⽂件系统中存放的⽂件,是在⼀台机器的操作系统中2、hdfs的⽂件系统会横跨N多的机器

3、单机⽂件系统中存放的⽂件,是在⼀台机器的磁盘上

4、hdfs⽂件系统中存放的⽂件,是落在n多机器的本地单机⽂件系统中(hdfs是⼀个基于linux本地⽂件系统之上的⽂件系统)

hdfs的⼯作机制:

1、客户把⼀个⽂件存⼊hdfs,其实hdfs会把这个⽂件切块后,分散存储在N台linux机器系统中(负责存储⽂件块的⾓⾊:data node)<准确来说:切块的⾏为是由客户端决定的>2、⼀旦⽂件被切块存储,那么,hdfs中就必须有⼀个机制,来记录⽤户的每⼀个⽂件的切块信息,及每⼀块的具体存储机器(负责记录块信息的⾓⾊是:name node)3、为了保证数据的安全性,hdfs可以将每⼀个⽂件块在集群中存放多个副本(到底存⼏个副本,是由当时存⼊该⽂件的客户端指定的)

综述:⼀个hdfs系统,由⼀台运⾏了namenode的服务器,和N台运⾏了datanode的服务器组成!

4、搭建hdfs分布式集群4.1 hdfs集群组成结构:4.2 安装hdfs集群的具体步骤:4.2.1、⾸先需要准备N台linux服务器学习阶段,⽤虚拟机即可!

先准备4台虚拟机:1个namenode节点 + 3 个datanode 节点

4.2.2、修改各台机器的主机名和ip地址主机名:hdp-01 对应的ip地址:192.168.33.11主机名:hdp-02 对应的ip地址:192.168.33.12主机名:hdp-03 对应的ip地址:192.168.33.13主机名:hdp-04 对应的ip地址:192.168.33.14

4.2.3、从windows中⽤CRT软件进⾏远程连接

在windows中将各台linux机器的主机名配置到的windows的本地域名映射⽂件中:c:/windows/system32/drivers/etc/hosts

192.168.33.11 hdp-01192.168.33.12 hdp-02192.168.33.13 hdp-03192.168.33.14 hdp-04⽤crt连接上后,修改⼀下crt的显⽰配置(字号,编码集改为UTF-8):

4.2.3、配置linux服务器的基础软件环境

防⽕墙

关闭防⽕墙:service iptables stop 关闭防⽕墙⾃启: chkconfig iptables off

安装jdk:(hadoop体系中的各软件都是java开发的)  1) 利⽤alt+p 打开sftp窗⼝,然后将jdk压缩包拖⼊sftp窗⼝

  2) 然后在linux中将jdk压缩包解压到/root/apps 下  3) 配置环境变量:JAVA_HOME PATH  vi /etc/profile 在⽂件的最后,加⼊:export JAVA_HOME=/root/apps/jdk1.8.0_60export PATH=$PATH:$JAVA_HOME/bin

  4) 修改完成后,记得 source /etc/profile使配置⽣效

  5) 检验:在任意⽬录下输⼊命令: java -version 看是否成功执⾏  6) 将安装好的jdk⽬录⽤scp命令拷贝到其他机器

  7) 将/etc/profile配置⽂件也⽤scp命令拷贝到其他机器并分别执⾏source命令

集群内主机的域名映射配置在hdp-01上,vi /etc/hosts

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4::1 localhost localhost.localdomain localhost6 localhost6.localdomain6192.168.33.11 hdp-01192.168.33.12 hdp-02192.168.33.13 hdp-03192.168.33.14 hdp-04然后,将hosts⽂件拷贝到集群中的所有其他机器上scp /etc/hosts hdp-02:/etc/scp /etc/hosts hdp-03:/etc/scp /etc/hosts hdp-04:/etc/

补如果在执⾏scp命令的时候,提⽰没有scp命令,则可以配置⼀个本地yum源

来安装充1、先在虚拟机中配置cdrom为⼀个centos的安装镜像iso⽂件提⽰:2、在linux系统中将光驱挂在到⽂件系统中(某个⽬录)

3、mkdir /mnt/cdrom

4、mount -t iso9660 -o loop /dev/cdrom /mnt/cdrom5、检验挂载是否成功: ls /mnt/cdrom6、3、配置yum的仓库地址配置⽂件

7、yum的仓库地址配置⽂件⽬录: /etc/yum.repos.d8、先将⾃带的仓库地址配置⽂件批量更名:

9、然后,拷贝⼀个出来进⾏修改

10、修改完配置⽂件后,再安装scp命令:11、yum install openssh-clients -y

4.2.4、安装hdfs集群

1、上传hadoop安装包到hdp-01bin⽂件为hadoop功能命令,sbin中为集群管理命令。

2、修改配置⽂件 要点提⽰核⼼配置参数:

1) 指定hadoop的默认⽂件系统为:hdfs2) 指定hdfs的namenode节点为哪台机器3) 指定namenode软件存储元数据的本地⽬录4) 指定datanode软件存放⽂件块的本地⽬录

hadoop的配置⽂件在:/root/apps/hadoop安装⽬录/etc/hadoop/

hadoop中的其他组件如mapreduce,yarn等,这些组将会去读数据,指定hadoop的默认⽂件系统为:hdfs,就是告诉这些组件去hdfs中读数据;该项配置意味dadoop中的组件可以访问各种⽂件系统。

若不指定数据的存放⽬录,hadoop默认将数据存放在/temp下。

可以参考官⽹的默认配置信息。1) 修改hadoop-env.shexport JAVA_HOME=/root/apps/jdk1.8.0_602) 修改core-site.xml

fs.defaultFShdfs://hdp-01:9000/

hdfs://hdp-01:9000包含两层意思:  1、指定默认的⽂件系统。  2、指明了namenode是谁。value中的值是URI风格

3) 修改hdfs-site.xml配置namenode和datanode的⼯作⽬录,添加secondary name node。

dfs.namenode.name.dir/root/hdpdata/name/

dfs.datanode.data.dir/root/hdpdata/data

dfs.namenode.secondary.http-addresshdp-02:50090

4) 拷贝整个hadoop安装⽬录到其他机器   scp -r /root/apps/hadoop-2.8.1 hdp-02:/root/apps/  scp -r /root/apps/hadoop-2.8.1 hdp-03:/root/apps/  scp -r /root/apps/hadoop-2.8.1 hdp-04:/root/apps/

5) 启动HDFS

所谓的启动HDFS,就是在对的机器上启动对的软件

要点要运⾏hadoop的命令,需要在linux环境中配置HADOOP_HOME和PATH环境变量vi /etc/profile

提⽰:

export JAVA_HOME=/root/apps/jdk1.8.0_60export HADOOP_HOME=/root/apps/hadoop-2.8.1

export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

⾸先,初始化namenode的元数据⽬录

要在hdp-01上执⾏hadoop的⼀个命令来初始化namenode的元数据存储⽬录hadoop namenode -format

  创建⼀个全新的元数据存储⽬录  ⽣成记录元数据的⽂件fsimage

  ⽣成集群的相关标识:如:集群id——clusterID

该步骤叫做namenode的初始化也叫格式化,本质是建⽴namenode运⾏所需要的⽬录以及⼀些必要的⽂件,所以该操作⼀般只在集群第⼀次启动之前执⾏。

然后,启动namenode进程(在hdp-01上)hadoop-daemon.sh start namenode

启动完后,⾸先⽤jps查看⼀下namenode的进程是否存在

namenode就是⼀个java软件,我们知道启动⼀个java软件需要主类的main⽅法 java xxx.java - 若⼲参数,处于⽅便的考虑,hadoop中提供了⼀个通⽤的软件启动脚本hadoop-daemon.sh,脚本可以接受参数,专门⽤来启动hadoop中的软件。

可以看到namenode在监听两个端⼝,9000⽤来和客户端通信(9000为RPC端⼝号,内部进程之间互相通信的端⼝,datanode和namenode的通信),接受hdfs客户端的请求,50070是web服务端⼝,也就是说namenode内置⼀个web服务器,http客户端可以通过次端⼝发送请求。然后,在windows中⽤浏览器访问namenode提供的web端⼝:50070然后,启动众datanode们(在任意地⽅)hadoop-daemon.sh start datanode

下图是datanode的⼀下信息展⽰,可以看到datanode内部通信的端⼝号是50010,⽽且datanode也提供了问访问端⼝50075.

6) ⽤⾃动批量启动脚本来启动HDFShdfs其实就是⼀堆java软件,我们可以⾃⼰⼿动hadoop-daemon.sh逐个启动,也可以使⽤hadoop提供的批量启动脚本。1) 先配置hdp-01到集群中所有机器(包含⾃⼰)的免密登陆

2) 配完免密后,可以执⾏⼀次 ssh 0.0.0.0

3) 修改hadoop安装⽬录中/etc/hadoop/slaves(把需要启动datanode进程的节点列⼊)

hdp-01hdp-02hdp-03hdp-04core-site.xml中配置过namenode,但是需要批量启动那些datanode呢,该⽂件/etc/hadoop/slaves的配置就是解决这个问题的,该⽂件就是给启动脚本看的。4) 在hdp-01上⽤脚本:start-dfs.sh 来⾃动启动整个集群5) 如果要停⽌,则⽤脚本:stop-dfs.sh

start-dfs.sh、stop-dfs.sh会启动、关闭namenode,datanode和secondnamenode当然你也可以⾃⼰写脚本来做上述的事情 ,如下所⽰。

5、hdfs的客户端操作hdfs装好之后,接下来的⼯作就是hdfs⾥传东西,去东西,由客户端来完成。

5.1、客户端的理解hdfs的客户端有多种形式:1、⽹页形式2、命令⾏形式

3、客户端在哪⾥运⾏,没有约束,只要运⾏客户端的机器能够跟hdfs集群联⽹们

  对于客户端来讲,hdfs是⼀个整体,⽹页版的客户端主要是⽤来查看hdfs信息的,可以创建⽬录,但是需要权限

命令⾏客户端

bin命令中的 hadoop 和 hdfs 都可以启动 hdfs 客户端,hadoop和hdfs都是脚本,都会去启动⼀个hdfs的java客户端。java客户端在安装包的jar包中./hadoop fs -ls /

表⽰hadoop要访问hdfs,该脚本就会去启动hdfs客户端,客户端可以接收参数,⽐如查看hdfs根⽬录。

⽂件的切块⼤⼩和存储的副本数量,都是由客户端决定!所谓的由客户端决定,是通过配置参数来定的

hdfs的客户端会读以下两个参数,来决定切块⼤⼩(默认128M)、副本数量(默认3):切块⼤⼩的参数: dfs.blocksize副本数量的参数: dfs.replication

如果使⽤命令⾏客户端时,上⾯两个参数应该配置在客户端机器的hadoop⽬录中的hdfs-site.xml中配置,(命令⾏客户端本质就是启动了⼀个java客户端,这个客户端在启动的时候会将它依赖的所有jar包加⼊classpath中,客户端会从jar包中,加载xx-default.xml来获得默认的配置⽂件,也可以在hadoop/etc/xxx-site.xml中配置具体的参数来覆盖默认值。此时的/etc下的配置⽂件就是客户⾃定义的配置⽂件,也会被java客户端加载【客户端可以运⾏在任何地⽅】);当然也可以在具体代码中指定,见6节中的核⼼代码

dfs.blocksize64m

dfs.replication2

5.1.1、上传过程下图为datanode中的数据,.meta是该block的校验和信息。我们可以通过linux cat命令将两个块合并,会发现与原来的⽂件是⼀样的。

5.1.2、下载过程客户端⾸先回去namenode上去查找,有没有请求的hdfs路径下的⽂件,有的话都有该⽂件被切割成⼏块,每块有⼏个副本,这些副本都存放在集群中的哪些机器上,然后去存放了第⼀块数据的某⼀台机器上去下载第⼀块数据,将数据追加到本地,然后去下载下⼀块数据,继续追加到本地⽂件,知道下载完所有的块。

5.2、hdfs客户端的常⽤操作命令1、上传⽂件到hdfs中hadoop fs -put /本地⽂件 /aaa

2、下载⽂件到客户端本地磁盘

hadoop fs -get /hdfs中的路径 /本地磁盘⽬录

3、在hdfs中创建⽂件夹hadoop fs -mkdir -p /aaa/xxx

4、移动hdfs中的⽂件(更名)

hadoop fs -mv /hdfs的路径1 /hdfs的另⼀个路径2

复制hdfs中的⽂件到hdfs的另⼀个⽬录hadoop fs -cp /hdfs路径_1 /hdfs路径_2

5、删除hdfs中的⽂件或⽂件夹hadoop fs -rm -r /aaa

6、查看hdfs中的⽂本⽂件内容hadoop fs -cat /demo.txthadoop fs -tail /demo.txthadoop fs -tail -f /demo.txthadoop fs -text /demo.txt

7、查看hdfs⽬录下有哪些⽂件hadoop fs –ls /

8、追加本地⽂件到hdfs中的⽂件

hadoop fs -appendToFile 本地路径 /hdfs路径

9、权限修改

hadoop fs -chmod username1:usergroup1 /hdfs路径

要说明的是,hdfs中的⽤户和⽤户组这是⼀个名字称呼,与linux不⼀样,linux中不能将选线分配给⼀个不存在的⽤户。

可以查看hadoop fs 不带任何参数,来查看hdfs所⽀持的命令

Usage: hadoop fs [generic options]

[-appendToFile ... ] [-cat [-ignoreCrc] ...] [-checksum ...]

[-chgrp [-R] GROUP PATH...]

[-chmod [-R] PATH...] [-chown [-R] [OWNER][:[GROUP]] PATH...]

[-copyFromLocal [-f] [-p] [-l] [-d] ... ]

[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] ... ] [-count [-q] [-h] [-v] [-t []] [-u] [-x] ...] [-cp [-f] [-p | -p[topax]] [-d] ... ]

[-createSnapshot []] [-deleteSnapshot ] [-df [-h] [ ...]]

[-du [-s] [-h] [-x] ...] [-expunge]

[-find ... ...]

[-get [-f] [-p] [-ignoreCrc] [-crc] ... ] [-getfacl [-R] ]

[-getfattr [-R] {-n name | -d} [-e en] ]

[-getmerge [-nl] [-skip-empty-file] ] [-help [cmd ...]]

[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [ ...]] [-mkdir [-p] ...]

[-moveFromLocal ... ] [-moveToLocal ] [-mv ... ]

[-put [-f] [-p] [-l] [-d] ... ]

[-renameSnapshot ] [-rm [-f] [-r|-R] [-skipTrash] [-safely] ...] [-rmdir [--ignore-fail-on-non-empty]

...]

[-setfacl [-R] [{-b|-k} {-m|-x } ]|[--set ]]

[-setfattr {-n name [-v value] | -x name} ] [-setrep [-R] [-w] ...] [-stat [format] ...] [-tail [-f] ]

[-test -[defsz] ]

[-text [-ignoreCrc] ...] [-touchz ...]

[-truncate [-w] ...] [-usage [cmd ...]]

6、hdfs的java客户端编程HDFS客户端编程应⽤场景:数据采集业务系统中⽇志⽣成机制

数据采集程序其实就是通过对java客户端编程,将数据不断的上传到hdfs。在windows开发环境中做⼀些准备⼯作:

1、在windows的某个路径中解压⼀份windows版本的hadoop安装包

2、将解压出的hadoop⽬录配置到windows的环境变量中:HADOOP_HOME

原因:若不配置环境变量,会在下载hdfs⽂件是出错,是由于使⽤hadoop的FileSystem保存⽂件到本地的时候出于效率的考虑,会使⽤hadoop安装包中的c语⾔库,显然没有配置hadoop环境变量时是找不到该c语⾔类库中的⽂件的;然⽽上传⽂件到hdfs没有类似问题;

6.1、核⼼代码1、将hdfs客户端开发所需的jar导⼊⼯程(jar包可在hadoop安装包中找到common和hdfs)2、写代码

6.1.1、获取hdfs客户端要点:要对hdfs中的⽂件进⾏操作,代码中⾸先需要获得⼀个hdfs的客户端对象Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(new URI(\"hdfs://hdp-01:9000\"),conf,\"root\");

完整代码如下:

/**

* Configuration参数对象的机制:

* 构造时,会加载jar包中的默认配置 xx-default.xml(core-default.xmlhdfs-default.xml) * 再加载 ⽤户配置xx-site.xml ,覆盖掉默认参数

* 构造完成之后,还可以conf.set(\"p\,会再次覆盖⽤户配置⽂件中的参数值 */

// new Configuration()会从项⽬的classpath中加载core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等⽂件 Configuration conf = new Configuration();

// 指定本客户端上传⽂件到hdfs时需要保存的副本数为:2 conf.set(\"dfs.replication\

// 指定本客户端上传⽂件到hdfs时切块的规格⼤⼩:64M conf.set(\"dfs.blocksize\

// 构造⼀个访问指定HDFS系统的客户端对象: 参数1:——HDFS系统的URI,参数2:——客户端要特别指定的参数,参数3:客户端的⾝份(⽤户名) FileSystem fs = FileSystem.get(new URI(\"hdfs://hdp-01:9000/\"), conf, \"root\");

// 上传⼀个⽂件到HDFS中

fs.copyFromLocalFile(new Path(\"D:/install-pkgs/hbase-1.2.1-bin.tar.gz\"), new Path(\"/aaa/\"));

fs.close();View Code

6.1.2、对⽂件进⾏操作上传、下载⽂件;⽂件夹的创建和删除、⽂件的移动和复制、查看⽂件夹和⽂件等。3、利⽤fs对象的⽅法进⾏⽂件操作⽅法均与命令⾏⽅法对应,⽐如:上传⽂件

fs.copyFromLocalFile(new Path(\"本地路径\"),new Path(\"hdfs的路径\")); 下载⽂件

fs.copyToLocalFile(new Path(\"hdfs的路径\"),new Path(\"本地路径\")) 对⽂件的增删改查如下,对⽂件数据的操作后续介绍。

FileSystem fs = null;

@Before

public void init() throws Exception{

Configuration conf = new Configuration(); conf.set(\"dfs.replication\ conf.set(\"dfs.blocksize\

fs = FileSystem.get(new URI(\"hdfs://hdp-01:9000/\"), conf, \"root\"); } /**

* 从HDFS中下载⽂件到客户端本地磁盘 * @throws IOException

* @throws IllegalArgumentException */

@Test

public void testGet() throws IllegalArgumentException, IOException{

fs.copyToLocalFile(new Path(\"/hdp20-05.txt\"), new Path(\"f:/\")); fs.close(); } /**

* 在hdfs内部移动⽂件\\修改名称 */

@Test

public void testRename() throws Exception{

fs.rename(new Path(\"/install.log\"), new Path(\"/aaa/in.log\"));

fs.close(); } /**

* 在hdfs中创建⽂件夹 */

@Test

public void testMkdir() throws Exception{

fs.mkdirs(new Path(\"/xx/yy/zz\"));

fs.close(); } /**

* 在hdfs中删除⽂件或⽂件夹 */

@Test

public void testRm() throws Exception{

fs.delete(new Path(\"/aaa\"), true);

fs.close(); } /**

* 查询hdfs指定⽬录下的⽂件信息 */

@Test

public void testLs() throws Exception{

// 只查询⽂件的信息,不返回⽂件夹的信息

RemoteIterator iter = fs.listFiles(new Path(\"/\"), true);

while(iter.hasNext()){

LocatedFileStatus status = iter.next();

System.out.println(\"⽂件全路径:\"+status.getPath()); System.out.println(\"块⼤⼩:\"+status.getBlockSize()); System.out.println(\"⽂件长度:\"+status.getLen());

System.out.println(\"副本数量:\"+status.getReplication());

System.out.println(\"块信息:\"+Arrays.toString(status.getBlockLocations()));

System.out.println(\"--------------------------------\"); }

fs.close(); } /**

* 查询hdfs指定⽬录下的⽂件和⽂件夹信息 */

@Test

public void testLs2() throws Exception{

FileStatus[] listStatus = fs.listStatus(new Path(\"/\"));

for(FileStatus status:listStatus){

System.out.println(\"⽂件全路径:\"+status.getPath());

System.out.println(status.isDirectory()?\"这是⽂件夹\":\"这是⽂件\"); System.out.println(\"块⼤⼩:\"+status.getBlockSize()); System.out.println(\"⽂件长度:\"+status.getLen());

System.out.println(\"副本数量:\"+status.getReplication());

System.out.println(\"--------------------------------\"); }

fs.close(); }View Code

6.1.3、对⽂件数据进⾏操作 同过客户端使⽤open打开流对象来读取hdfs中⽂件的具体数据,包括指定偏移量来读取特定范围的数据;通过客户端向hdfs⽂件追加数据。

/**

* 读取hdfs中的⽂件的内容 *

* @throws IOException

* @throws IllegalArgumentException */

@Test

public void testReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path(\"/test.txt\"));

BufferedReader br = new BufferedReader(new InputStreamReader(in, \"utf-8\")); String line = null;

while ((line = br.readLine()) != null) { System.out.println(line); }

br.close(); in.close(); fs.close(); }

/**

* 读取hdfs中⽂件的指定偏移量范围的内容 * *

* 作业题:⽤本例中的知识,实现读取⼀个⽂本⽂件中的指定BLOCK块中的所有数据 *

* @throws IOException

* @throws IllegalArgumentException */

@Test

public void testRandomReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path(\"/xx.dat\")); // 将读取的起始位置进⾏指定 in.seek(12); // 读16个字节

byte[] buf = new byte[16]; in.read(buf);

System.out.println(new String(buf)); in.close(); fs.close(); }View Code

写数据,create提供了丰富的重载函数,轻松实现覆盖,追加,以及指定缓存⼤⼩,副本数量等等信息。

/**

* 往hdfs中的⽂件写内容 *

* @throws IOException

* @throws IllegalArgumentException */

@Test

public void testWriteData() throws IllegalArgumentException, IOException { FSDataOutputStream out = fs.create(new Path(\"/zz.jpg\"), false); // D:\\images\\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg

FileInputStream in = new FileInputStream(\"D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg\"); byte[] buf = new byte[1024]; int read = 0;

while ((read = in.read(buf)) != -1) { out.write(buf,0,read); }

in.close(); out.close(); fs.close(); }View Code

7、HDFS实例hdfs版本wordcount程序。任务描述:

1、从hdfs⽂件中读取数据,每次读取⼀⾏数据;

2、将数据交给具体的单词统计业务去作业(使⽤⾯向接⼝编程,当业务逻辑改变时,⽆需修改主程序代码);3、并将该⾏数据产⽣的结果存⼊缓存中(可以⽤hashmap模拟)数据采集设计:

1、流程

启动⼀个定时任务:——定时探测⽇志源⽬录——获取需要采集的⽂件

——移动这些⽂件到⼀个待上传临时⽬录

——遍历待上传⽬录中各⽂件,逐⼀传输到HDFS的⽬标路径,同时将传输完成的⽂件移动到备份⽬录启动⼀个定时任务:

——探测备份⽬录中的备份数据,检查是否已超出最长备份时长,如果超出,则删除

2、规划各种路径

⽇志源路径: d:/logs/accesslog/待上传临时⽬录: d:/logs/toupload/备份⽬录: d:/logs/backup/⽇期/HDFS存储路径: /logs/⽇期

HDFS中的⽂件的前缀:access_log_HDFS中的⽂件的后缀:.log将路径配置写⼊属性⽂件

MAPPER_CLASS=cn.edu360.hdfs.wordcount.CaseIgnorWcountMapperINPUT_PATH=/wordcount/input

OUTPUT_PATH=/wordcount/output2

View Code

主程序代码⽰例:

import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.URI;

import java.util.HashMap;import java.util.Map.Entry;import java.util.Properties;import java.util.Set;

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path;

import org.apache.hadoop.fs.RemoteIterator;

public class HdfsWordcount {

public static void main(String[] args) throws Exception{ /**

* 初始化⼯作 */

Properties props = new Properties();

props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream(\"job.properties\"));

Path input = new Path(props.getProperty(\"INPUT_PATH\")); Path output = new Path(props.getProperty(\"OUTPUT_PATH\"));

Class mapper_class = Class.forName(props.getProperty(\"MAPPER_CLASS\")); Mapper mapper = (Mapper) mapper_class.newInstance();

Context context = new Context(); /**

* 处理数据 */

FileSystem fs = FileSystem.get(new URI(\"hdfs://hdp-01:9000\"), new Configuration(), \"root\"); RemoteIterator iter = fs.listFiles(input, false);

while(iter.hasNext()){

LocatedFileStatus file = iter.next();

FSDataInputStream in = fs.open(file.getPath());

BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = null; // 逐⾏读取

while ((line = br.readLine()) != null) {

// 调⽤⼀个⽅法对每⼀⾏进⾏业务处理 mapper.map(line, context); }

br.close(); in.close(); } /**

* 输出结果 */

HashMap contextMap = context.getContextMap();

if(fs.exists(output)){

throw new RuntimeException(\"指定的输出⽬录已存在,请更换......!\"); }

FSDataOutputStream out = fs.create(new Path(output,new Path(\"res.dat\")));

Set> entrySet = contextMap.entrySet(); for (Entry entry : entrySet) {

out.write((entry.getKey().toString()+\"\\"+entry.getValue()+\"\\n\").getBytes()); }

out.close();

fs.close();

System.out.println(\"恭喜!数据统计完成.....\"); } }

View Code

⾃定义的业务接⼝

public interface Mapper {

public void map(String line,Context context); }

View Code

业务实现类1

public class WordCountMapper implements Mapper{ @Override

public void map(String line, Context context) {

String[] words = line.split(\" \");

for (String word : words) {

Object value = context.get(word); if(null==value){

context.write(word, 1); }else{

int v = (int)value;

context.write(word, v+1); } } }}

View Code

业务实现类2

public class CaseIgnorWcountMapper implements Mapper { @Override

public void map(String line, Context context) { String[] words = line.toUpperCase().split(\" \"); for (String word : words) {

Object value = context.get(word); if (null == value) {

context.write(word, 1); } else {

int v = (int) value;

context.write(word, v + 1); } } }}

View Code

缓存模拟

import java.util.HashMap;

public class Context {

private HashMap contextMap = new HashMap<>();

public void write(Object key,Object value){

contextMap.put(key, value); }

public Object get(Object key){

return contextMap.get(key); }

public HashMap getContextMap(){ return contextMap; }}

View Code

8、实战描述 需求描述:

在业务系统的服务器上,业务程序会不断⽣成业务⽇志(⽐如⽹站的页⾯访问⽇志)业务⽇志是⽤log4j⽣成的,会不断地切出⽇志⽂件

需要定期(⽐如每⼩时)从业务服务器上的⽇志⽬录中,探测需要采集的⽇志⽂件(access.log,不是直接采集数据),发往HDFS

注意点:业务服务器可能有多台(hdfs上的⽂件名不能直接⽤⽇志服务器上的⽂件名)当天采集到的⽇志要放在hdfs的当天⽬录中

采集完成的⽇志⽂件,需要移动到到⽇志服务器的⼀个备份⽬录中

定期检查(⼀⼩时检查⼀次)备份⽬录,将备份时长超出24⼩时的⽇志⽂件清除Timer timer = new Timer()timer.schedual() 简易版⽇志采集主程序

import java.util.Timer;

public class DataCollectMain {

public static void main(String[] args) {

Timer timer = new Timer();

timer.schedule(new CollectTask(), 0, 60*60*1000L);

timer.schedule(new BackupCleanTask(), 0, 60*60*1000L); }}

View Code

⽇志收集定时任务类

import java.io.File;

import java.io.FilenameFilter;import java.net.URI;

import java.text.SimpleDateFormat;import java.util.Arrays;import java.util.Date;

import java.util.Properties;import java.util.TimerTask;import java.util.UUID;

import org.apache.commons.io.FileUtils;

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.log4j.Logger;

public class CollectTask extends TimerTask { @Override

public void run() {

/**

* ——定时探测⽇志源⽬录 ——获取需要采集的⽂件 ——移动这些⽂件到⼀个待上传临时⽬录

* ——遍历待上传⽬录中各⽂件,逐⼀传输到HDFS的⽬标路径,同时将传输完成的⽂件移动到备份⽬录 * */ try {

// 获取配置参数

Properties props = PropertyHolderLazy.getProps(); // 构造⼀个log4j⽇志对象

Logger logger = Logger.getLogger(\"logRollingFile\");

// 获取本次采集时的⽇期

SimpleDateFormat sdf = new SimpleDateFormat(\"yyyy-MM-dd-HH\"); String day = sdf.format(new Date());

File srcDir = new File(props.getProperty(Constants.LOG_SOURCE_DIR)); // 列出⽇志源⽬录中需要采集的⽂件

File[] listFiles = srcDir.listFiles(new FilenameFilter() {

@Override

public boolean accept(File dir, String name) {

if (name.startsWith(props.getProperty(Constants.LOG_LEGAL_PREFIX))) { return true; }

return false; } });

// 记录⽇志

logger.info(\"探测到如下⽂件需要采集:\" + Arrays.toString(listFiles));

// 将要采集的⽂件移动到待上传临时⽬录

File toUploadDir = new File(props.getProperty(Constants.LOG_TOUPLOAD_DIR)); for (File file : listFiles) {

FileUtils.moveFileToDirectory(file, toUploadDir, true); }

// 记录⽇志

logger.info(\"上述⽂件移动到了待上传⽬录\" + toUploadDir.getAbsolutePath()); // 构造⼀个HDFS的客户端对象

FileSystem fs = FileSystem.get(new URI(props.getProperty(Constants.HDFS_URI)), new Configuration(), \"root\"); File[] toUploadFiles = toUploadDir.listFiles();

// 检查HDFS中的⽇期⽬录是否存在,如果不存在,则创建

Path hdfsDestPath = new Path(props.getProperty(Constants.HDFS_DEST_BASE_DIR) + day); if (!fs.exists(hdfsDestPath)) { fs.mkdirs(hdfsDestPath); }

// 检查本地的备份⽬录是否存在,如果不存在,则创建

File backupDir = new File(props.getProperty(Constants.LOG_BACKUP_BASE_DIR) + day + \"/\"); if (!backupDir.exists()) { backupDir.mkdirs(); }

for (File file : toUploadFiles) {

// 传输⽂件到HDFS并改名access_log_

Path destPath = new Path(hdfsDestPath + \"/\" + UUID.randomUUID() + props.getProperty(Constants.HDFS_FILE_SUFFIX)); fs.copyFromLocalFile(new Path(file.getAbsolutePath()), destPath);

// 记录⽇志

logger.info(\"⽂件传输到HDFS完成:\" + file.getAbsolutePath() + \"-->\" + destPath); // 将传输完成的⽂件移动到备份⽬录

FileUtils.moveFileToDirectory(file, backupDir, true);

// 记录⽇志

logger.info(\"⽂件备份完成:\" + file.getAbsolutePath() + \"-->\" + backupDir); }

} catch (Exception e) { e.printStackTrace(); } }}

View Code

定期清理过时备份⽇志

import java.io.File;

import java.text.SimpleDateFormat;import java.util.Date;

import java.util.TimerTask;

import org.apache.commons.io.FileUtils;

public class BackupCleanTask extends TimerTask { @Override

public void run() {

SimpleDateFormat sdf = new SimpleDateFormat(\"yyyy-MM-dd-HH\"); long now = new Date().getTime(); try {

// 探测本地备份⽬录

File backupBaseDir = new File(\"d:/logs/backup/\"); File[] dayBackDir = backupBaseDir.listFiles(); // 判断备份⽇期⼦⽬录是否已超24⼩时 for (File dir : dayBackDir) {

long time = sdf.parse(dir.getName()).getTime(); if(now-time>24*60*60*1000L){ FileUtils.deleteDirectory(dir); } }

} catch (Exception e) { e.printStackTrace(); } }}

View Code

配置信息提取到属性配置⽂件中,并写成常量,以单例设计模式去加载配置信息。

LOG_SOURCE_DIR=d:/logs/accesslog/LOG_TOUPLOAD_DIR=d:/logs/toupload/LOG_BACKUP_BASE_DIR=d:/logs/backup/LOG_BACKUP_TIMEOUT=24

LOG_LEGAL_PREFIX=access.log.HDFS_URI=hdfs://hdp-01:9000/HDFS_DEST_BASE_DIR=/logs/HDFS_FILE_PREFIX=access_log_HDFS_FILE_SUFFIX=.logView Code

public class Constants {

/**

* ⽇志源⽬录参数key */

public static final String LOG_SOURCE_DIR = \"LOG_SOURCE_DIR\"; /**

* ⽇志待上传⽬录参数key */

public static final String LOG_TOUPLOAD_DIR = \"LOG_TOUPLOAD_DIR\";

public static final String LOG_BACKUP_BASE_DIR = \"LOG_BACKUP_BASE_DIR\";

public static final String LOG_BACKUP_TIMEOUT = \"LOG_BACKUP_TIMEOUT\";

public static final String LOG_LEGAL_PREFIX = \"LOG_LEGAL_PREFIX\";

public static final String HDFS_URI = \"HDFS_URI\";

public static final String HDFS_DEST_BASE_DIR = \"HDFS_DEST_BASE_DIR\";

public static final String HDFS_FILE_PREFIX = \"HDFS_FILE_PREFIX\";

public static final String HDFS_FILE_SUFFIX = \"HDFS_FILE_SUFFIX\";}

View Code

import java.util.Properties;

/**

* 单例模式:懒汉式——考虑了线程安全 * @author ThinkPad * */

public class PropertyHolderLazy { private static Properties prop = null;

public static Properties getProps() throws Exception { if (prop == null) {

synchronized (PropertyHolderLazy.class) { if (prop == null) {

prop = new Properties();

prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream(\"collect.properties\")); } } }

return prop; }}

View Code

import java.util.Properties;

/**

* 单例设计模式,⽅式⼀: 饿汉式单例 * @author ThinkPad * */

public class PropertyHolderHungery {

private static Properties prop = new Properties();

static { try {

prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream(\"collect.properties\")); } catch (Exception e) { } }

public static Properties getProps() throws Exception { return prop; }}

View Code

9、总结hdfs有服务端和客户端;服务端:

  成员:namenode 管理元数据,datanode存储数据

  配置:需要指定使⽤的⽂件系统(默认的配置在core-default.xml,为本地⽂件系统,需要修改服务器core-site.xml修改为hdfs⽂件系统,并指定namenode),namenode和datanode的

⼯作⽬录(服务器的默认配置在hdfs-default.xml中,默认是在/temp下,需要就该hdfs-site.xml来覆盖默认值。);  细节:第⼀次启动集群时需要格式化namenode客户端:

  形式:⽹页端,命令⾏,java客户端api;客户端可以运⾏在任何地⽅。

  功能:指定上传⽂件的信息报括切块⼤⼩(hdfs-default.xml中默认值128m,可以在hdfs-site.xml中修改,也可以咋java api 中创建客户端对象的时候指定,总之由客户端来指定),副本数量(hdfs-default.xml中默认值3,同样可以修改覆盖);完成hdfs中⽂件的系列操作,如上传,下载

虽然服务端和客户端的共⽤配置 core-default.xml core-site.xml;hdfs-default.xml hdfs-site.xml,但是不同的程序所需要的参数不同,只不过为了⽅便,所有参数都写在⼀个⽂件中了。即是在服务器的hdfs-site.xml中配置了切块⼤⼩和副本数量,服务器的namenode和datanode根本不关⼼也不使⽤这些参数,只有启动服务器上的命令⾏客户端时,该参数才可能起作⽤。

因篇幅问题不能全部显示,请点此查看更多更全内容