主要内容:hdfs的整体运⾏机制,DATANODE存储⽂件块的观察,hdfs集群的搭建与配置,hdfs命令⾏客户端常见命令;业务系统中⽇志⽣成机制,HDFS的java客户端api基本使⽤。
1、什么是⼤数据基本概念《数据处理》
在互联⽹技术发展到现今阶段,⼤量⽇常、⼯作等事务产⽣的数据都已经信息化,⼈类产⽣的数据量相⽐以前有了爆炸式的增长,以前的传统的数据处理技术已经⽆法胜任,需求催⽣技术,⼀套⽤来处理海量数据的软件⼯具应运⽽⽣,这就是⼤数据!
处理海量数据的核⼼技术:海量数据存储:分布式海量数据运算:分布式
⼤数据的海量数据的存储和运算,核⼼技术就是分布式。
这些核⼼技术的实现是不需要⽤户从零开始造轮⼦的存储和运算,都已经有⼤量的成熟的框架来⽤
存储框架:
HDFS——分布式⽂件存储系统(HADOOP中的存储框架)HBASE——分布式数据库系统
KAFKA——分布式消息缓存系统(实时流式数据处理场景中应⽤⼴泛)
⽂件系统中的数据以⾮结构化居多,没有直观的结构,数据库中的信息多以表的形式存在,具有结构化,存在规律;查询的时候⽂本⽂件只能⼀⾏⼀⾏扫描,⽽数据库效率⾼很多,可以利⽤sql查询语法,数据库在存和取⽅便的多。
数据库和⽂件系统相⽐,数据库相当于在特定的⽂件系统上的软件封装。其实HBASE就是对HDFS的进⼀层封装,它的底层⽂件系统就是HDFS。
分布式消息缓存系统,既然是分布式,那就意味着横跨很多机器,意味着容量可以很⼤。和前两者相⽐它的数据存储形式是消息(不是表,也不是⽂件),消息可以看做有固定格式的⼀条数据,⽐如消息头,消息体等,消息体可以是json,数据库的⼀条记录,⼀个序列化对象等。消息最终存放在kafaka内部的特定的⽂件系统⾥。
运算框架:(要解决的核⼼问题就是帮⽤户将处理逻辑在很多机器上并⾏)MAPREDUCE—— 离线批处理/HADOOP中的运算框架SPARK —— 离线批处理/实时流式计算STORM —— 实时流式计算
离线批处理:数据是静态的,⼀次处理⼀⼤批数据。实时流式:数据在源源不断的⽣成,边⽣成,边计算
这些运算框架的思想都差不多,特别是mapreduce和spark,简单来看spark是对mapreduce的进⼀步封装;
运算框架和存储框架之间没有强耦合关系,spark可以读HDFS,HBASE,KAFKA⾥的数据,当然需要存储框架提供访问接⼝。
辅助类的⼯具(解放⼤数据⼯程师的⼀些繁琐⼯作):
HIVE —— 数据仓库⼯具:可以接收sql,翻译成mapreduce或者spark程序运⾏FLUME——数据采集SQOOP——数据迁移
ELASTIC SEARCH —— 分布式的搜索引擎
flume⽤于⾃动采集数据源机器上的数据到⼤数据集群中。
HIVE看起来像⼀个数据库,但其实不是,Hive中存了⼀些需要分析的数据,然后在直接写sql进⾏分析,hive接收sql,翻译成mapreduce或者spark程序运⾏;hive本质是mapreduce或spark,我们只需要写sql逻辑⽽不是mapreduce逻辑,Hive⾃动完成对sql的翻译,⽽且还是在海量数据集上。.......
换个⾓度说,⼤数据是:1、有海量的数据
2、有对海量数据进⾏挖掘的需求
3、有对海量数据进⾏挖掘的软件⼯具(hadoop、spark、storm、flink、tez、impala......)
⼤数据在现实⽣活中的具体应⽤数据处理的最典型应⽤:公司的产品运营情况分析
电商推荐系统:基于海量的浏览⾏为、购物⾏为数据,进⾏⼤量的算法模型的运算,得出各类推荐结论,以供电商⽹站页⾯来为⽤户进⾏商品推荐
精准⼴告推送系统:基于海量的互联⽹⽤户的各类数据,统计分析,进⾏⽤户画像(得到⽤户的各种属性标签),然后可以为⼴告主进⾏有针对性的精准的⼴告投放
2、什么是hadoophadoop中有3个核⼼组件:
分布式⽂件系统:HDFS —— 实现将⽂件分布式存储在很多的服务器上分布式运算编程框架:MAPREDUCE —— 实现在很多机器上分布式并⾏运算
分布式资源调度平台:YARN —— 帮⽤户调度⼤量的mapreduce程序,并合理分配运算资源
3、hdfs整体运⾏机制hdfs:分布式⽂件系统
hdfs有着⽂件系统共同的特征:1、有⽬录结构,顶层⽬录是: /2、系统中存放的就是⽂件
3、系统可以提供对⽂件的:创建、删除、修改、查看、移动等功能
hdfs跟普通的单机⽂件系统有区别:
1、单机⽂件系统中存放的⽂件,是在⼀台机器的操作系统中2、hdfs的⽂件系统会横跨N多的机器
3、单机⽂件系统中存放的⽂件,是在⼀台机器的磁盘上
4、hdfs⽂件系统中存放的⽂件,是落在n多机器的本地单机⽂件系统中(hdfs是⼀个基于linux本地⽂件系统之上的⽂件系统)
hdfs的⼯作机制:
1、客户把⼀个⽂件存⼊hdfs,其实hdfs会把这个⽂件切块后,分散存储在N台linux机器系统中(负责存储⽂件块的⾓⾊:data node)<准确来说:切块的⾏为是由客户端决定的>2、⼀旦⽂件被切块存储,那么,hdfs中就必须有⼀个机制,来记录⽤户的每⼀个⽂件的切块信息,及每⼀块的具体存储机器(负责记录块信息的⾓⾊是:name node)3、为了保证数据的安全性,hdfs可以将每⼀个⽂件块在集群中存放多个副本(到底存⼏个副本,是由当时存⼊该⽂件的客户端指定的)
综述:⼀个hdfs系统,由⼀台运⾏了namenode的服务器,和N台运⾏了datanode的服务器组成!
4、搭建hdfs分布式集群4.1 hdfs集群组成结构:4.2 安装hdfs集群的具体步骤:4.2.1、⾸先需要准备N台linux服务器学习阶段,⽤虚拟机即可!
先准备4台虚拟机:1个namenode节点 + 3 个datanode 节点
4.2.2、修改各台机器的主机名和ip地址主机名:hdp-01 对应的ip地址:192.168.33.11主机名:hdp-02 对应的ip地址:192.168.33.12主机名:hdp-03 对应的ip地址:192.168.33.13主机名:hdp-04 对应的ip地址:192.168.33.14
4.2.3、从windows中⽤CRT软件进⾏远程连接
在windows中将各台linux机器的主机名配置到的windows的本地域名映射⽂件中:c:/windows/system32/drivers/etc/hosts
192.168.33.11 hdp-01192.168.33.12 hdp-02192.168.33.13 hdp-03192.168.33.14 hdp-04⽤crt连接上后,修改⼀下crt的显⽰配置(字号,编码集改为UTF-8):
4.2.3、配置linux服务器的基础软件环境
防⽕墙
关闭防⽕墙:service iptables stop 关闭防⽕墙⾃启: chkconfig iptables off
安装jdk:(hadoop体系中的各软件都是java开发的) 1) 利⽤alt+p 打开sftp窗⼝,然后将jdk压缩包拖⼊sftp窗⼝
2) 然后在linux中将jdk压缩包解压到/root/apps 下 3) 配置环境变量:JAVA_HOME PATH vi /etc/profile 在⽂件的最后,加⼊:export JAVA_HOME=/root/apps/jdk1.8.0_60export PATH=$PATH:$JAVA_HOME/bin
4) 修改完成后,记得 source /etc/profile使配置⽣效
5) 检验:在任意⽬录下输⼊命令: java -version 看是否成功执⾏ 6) 将安装好的jdk⽬录⽤scp命令拷贝到其他机器
7) 将/etc/profile配置⽂件也⽤scp命令拷贝到其他机器并分别执⾏source命令
集群内主机的域名映射配置在hdp-01上,vi /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4::1 localhost localhost.localdomain localhost6 localhost6.localdomain6192.168.33.11 hdp-01192.168.33.12 hdp-02192.168.33.13 hdp-03192.168.33.14 hdp-04然后,将hosts⽂件拷贝到集群中的所有其他机器上scp /etc/hosts hdp-02:/etc/scp /etc/hosts hdp-03:/etc/scp /etc/hosts hdp-04:/etc/
补如果在执⾏scp命令的时候,提⽰没有scp命令,则可以配置⼀个本地yum源
来安装充1、先在虚拟机中配置cdrom为⼀个centos的安装镜像iso⽂件提⽰:2、在linux系统中将光驱挂在到⽂件系统中(某个⽬录)
3、mkdir /mnt/cdrom
4、mount -t iso9660 -o loop /dev/cdrom /mnt/cdrom5、检验挂载是否成功: ls /mnt/cdrom6、3、配置yum的仓库地址配置⽂件
7、yum的仓库地址配置⽂件⽬录: /etc/yum.repos.d8、先将⾃带的仓库地址配置⽂件批量更名:
9、然后,拷贝⼀个出来进⾏修改
10、修改完配置⽂件后,再安装scp命令:11、yum install openssh-clients -y
4.2.4、安装hdfs集群
1、上传hadoop安装包到hdp-01bin⽂件为hadoop功能命令,sbin中为集群管理命令。
2、修改配置⽂件 要点提⽰核⼼配置参数:
1) 指定hadoop的默认⽂件系统为:hdfs2) 指定hdfs的namenode节点为哪台机器3) 指定namenode软件存储元数据的本地⽬录4) 指定datanode软件存放⽂件块的本地⽬录
hadoop的配置⽂件在:/root/apps/hadoop安装⽬录/etc/hadoop/
hadoop中的其他组件如mapreduce,yarn等,这些组将会去读数据,指定hadoop的默认⽂件系统为:hdfs,就是告诉这些组件去hdfs中读数据;该项配置意味dadoop中的组件可以访问各种⽂件系统。
若不指定数据的存放⽬录,hadoop默认将数据存放在/temp下。
可以参考官⽹的默认配置信息。1) 修改hadoop-env.shexport JAVA_HOME=/root/apps/jdk1.8.0_602) 修改core-site.xml
3) 修改hdfs-site.xml配置namenode和datanode的⼯作⽬录,添加secondary name node。
4) 拷贝整个hadoop安装⽬录到其他机器 scp -r /root/apps/hadoop-2.8.1 hdp-02:/root/apps/ scp -r /root/apps/hadoop-2.8.1 hdp-03:/root/apps/ scp -r /root/apps/hadoop-2.8.1 hdp-04:/root/apps/
5) 启动HDFS
所谓的启动HDFS,就是在对的机器上启动对的软件
要点要运⾏hadoop的命令,需要在linux环境中配置HADOOP_HOME和PATH环境变量vi /etc/profile
提⽰:
export JAVA_HOME=/root/apps/jdk1.8.0_60export HADOOP_HOME=/root/apps/hadoop-2.8.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
⾸先,初始化namenode的元数据⽬录
要在hdp-01上执⾏hadoop的⼀个命令来初始化namenode的元数据存储⽬录hadoop namenode -format
创建⼀个全新的元数据存储⽬录 ⽣成记录元数据的⽂件fsimage
⽣成集群的相关标识:如:集群id——clusterID
该步骤叫做namenode的初始化也叫格式化,本质是建⽴namenode运⾏所需要的⽬录以及⼀些必要的⽂件,所以该操作⼀般只在集群第⼀次启动之前执⾏。
然后,启动namenode进程(在hdp-01上)hadoop-daemon.sh start namenode
启动完后,⾸先⽤jps查看⼀下namenode的进程是否存在
namenode就是⼀个java软件,我们知道启动⼀个java软件需要主类的main⽅法 java xxx.java - 若⼲参数,处于⽅便的考虑,hadoop中提供了⼀个通⽤的软件启动脚本hadoop-daemon.sh,脚本可以接受参数,专门⽤来启动hadoop中的软件。
可以看到namenode在监听两个端⼝,9000⽤来和客户端通信(9000为RPC端⼝号,内部进程之间互相通信的端⼝,datanode和namenode的通信),接受hdfs客户端的请求,50070是web服务端⼝,也就是说namenode内置⼀个web服务器,http客户端可以通过次端⼝发送请求。然后,在windows中⽤浏览器访问namenode提供的web端⼝:50070然后,启动众datanode们(在任意地⽅)hadoop-daemon.sh start datanode
下图是datanode的⼀下信息展⽰,可以看到datanode内部通信的端⼝号是50010,⽽且datanode也提供了问访问端⼝50075.
6) ⽤⾃动批量启动脚本来启动HDFShdfs其实就是⼀堆java软件,我们可以⾃⼰⼿动hadoop-daemon.sh逐个启动,也可以使⽤hadoop提供的批量启动脚本。1) 先配置hdp-01到集群中所有机器(包含⾃⼰)的免密登陆
2) 配完免密后,可以执⾏⼀次 ssh 0.0.0.0
3) 修改hadoop安装⽬录中/etc/hadoop/slaves(把需要启动datanode进程的节点列⼊)
hdp-01hdp-02hdp-03hdp-04core-site.xml中配置过namenode,但是需要批量启动那些datanode呢,该⽂件/etc/hadoop/slaves的配置就是解决这个问题的,该⽂件就是给启动脚本看的。4) 在hdp-01上⽤脚本:start-dfs.sh 来⾃动启动整个集群5) 如果要停⽌,则⽤脚本:stop-dfs.sh
start-dfs.sh、stop-dfs.sh会启动、关闭namenode,datanode和secondnamenode当然你也可以⾃⼰写脚本来做上述的事情 ,如下所⽰。
5、hdfs的客户端操作hdfs装好之后,接下来的⼯作就是hdfs⾥传东西,去东西,由客户端来完成。
5.1、客户端的理解hdfs的客户端有多种形式:1、⽹页形式2、命令⾏形式
3、客户端在哪⾥运⾏,没有约束,只要运⾏客户端的机器能够跟hdfs集群联⽹们
对于客户端来讲,hdfs是⼀个整体,⽹页版的客户端主要是⽤来查看hdfs信息的,可以创建⽬录,但是需要权限
命令⾏客户端
bin命令中的 hadoop 和 hdfs 都可以启动 hdfs 客户端,hadoop和hdfs都是脚本,都会去启动⼀个hdfs的java客户端。java客户端在安装包的jar包中./hadoop fs -ls /
表⽰hadoop要访问hdfs,该脚本就会去启动hdfs客户端,客户端可以接收参数,⽐如查看hdfs根⽬录。
⽂件的切块⼤⼩和存储的副本数量,都是由客户端决定!所谓的由客户端决定,是通过配置参数来定的
hdfs的客户端会读以下两个参数,来决定切块⼤⼩(默认128M)、副本数量(默认3):切块⼤⼩的参数: dfs.blocksize副本数量的参数: dfs.replication
如果使⽤命令⾏客户端时,上⾯两个参数应该配置在客户端机器的hadoop⽬录中的hdfs-site.xml中配置,(命令⾏客户端本质就是启动了⼀个java客户端,这个客户端在启动的时候会将它依赖的所有jar包加⼊classpath中,客户端会从jar包中,加载xx-default.xml来获得默认的配置⽂件,也可以在hadoop/etc/xxx-site.xml中配置具体的参数来覆盖默认值。此时的/etc下的配置⽂件就是客户⾃定义的配置⽂件,也会被java客户端加载【客户端可以运⾏在任何地⽅】);当然也可以在具体代码中指定,见6节中的核⼼代码
5.1.1、上传过程下图为datanode中的数据,.meta是该block的校验和信息。我们可以通过linux cat命令将两个块合并,会发现与原来的⽂件是⼀样的。
5.1.2、下载过程客户端⾸先回去namenode上去查找,有没有请求的hdfs路径下的⽂件,有的话都有该⽂件被切割成⼏块,每块有⼏个副本,这些副本都存放在集群中的哪些机器上,然后去存放了第⼀块数据的某⼀台机器上去下载第⼀块数据,将数据追加到本地,然后去下载下⼀块数据,继续追加到本地⽂件,知道下载完所有的块。
5.2、hdfs客户端的常⽤操作命令1、上传⽂件到hdfs中hadoop fs -put /本地⽂件 /aaa
2、下载⽂件到客户端本地磁盘
hadoop fs -get /hdfs中的路径 /本地磁盘⽬录
3、在hdfs中创建⽂件夹hadoop fs -mkdir -p /aaa/xxx
4、移动hdfs中的⽂件(更名)
hadoop fs -mv /hdfs的路径1 /hdfs的另⼀个路径2
复制hdfs中的⽂件到hdfs的另⼀个⽬录hadoop fs -cp /hdfs路径_1 /hdfs路径_2
5、删除hdfs中的⽂件或⽂件夹hadoop fs -rm -r /aaa
6、查看hdfs中的⽂本⽂件内容hadoop fs -cat /demo.txthadoop fs -tail /demo.txthadoop fs -tail -f /demo.txthadoop fs -text /demo.txt
7、查看hdfs⽬录下有哪些⽂件hadoop fs –ls /
8、追加本地⽂件到hdfs中的⽂件
hadoop fs -appendToFile 本地路径 /hdfs路径
9、权限修改
hadoop fs -chmod username1:usergroup1 /hdfs路径
要说明的是,hdfs中的⽤户和⽤户组这是⼀个名字称呼,与linux不⼀样,linux中不能将选线分配给⼀个不存在的⽤户。
可以查看hadoop fs 不带任何参数,来查看hdfs所⽀持的命令
Usage: hadoop fs [generic options]
[-appendToFile [-chgrp [-R] GROUP PATH...] [-chmod [-R] [-copyFromLocal [-f] [-p] [-l] [-d] [-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] [-createSnapshot [-du [-s] [-h] [-x] [-find [-get [-f] [-p] [-ignoreCrc] [-crc] [-getfattr [-R] {-n name | -d} [-e en] [-getmerge [-nl] [-skip-empty-file] [-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [ [-moveFromLocal [-put [-f] [-p] [-l] [-d] [-renameSnapshot [-setfacl [-R] [{-b|-k} {-m|-x [-setfattr {-n name [-v value] | -x name} [-test -[defsz] [-text [-ignoreCrc] [-truncate [-w] 6、hdfs的java客户端编程HDFS客户端编程应⽤场景:数据采集业务系统中⽇志⽣成机制 数据采集程序其实就是通过对java客户端编程,将数据不断的上传到hdfs。在windows开发环境中做⼀些准备⼯作: 1、在windows的某个路径中解压⼀份windows版本的hadoop安装包 2、将解压出的hadoop⽬录配置到windows的环境变量中:HADOOP_HOME 原因:若不配置环境变量,会在下载hdfs⽂件是出错,是由于使⽤hadoop的FileSystem保存⽂件到本地的时候出于效率的考虑,会使⽤hadoop安装包中的c语⾔库,显然没有配置hadoop环境变量时是找不到该c语⾔类库中的⽂件的;然⽽上传⽂件到hdfs没有类似问题; 6.1、核⼼代码1、将hdfs客户端开发所需的jar导⼊⼯程(jar包可在hadoop安装包中找到common和hdfs)2、写代码 6.1.1、获取hdfs客户端要点:要对hdfs中的⽂件进⾏操作,代码中⾸先需要获得⼀个hdfs的客户端对象Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI(\"hdfs://hdp-01:9000\"),conf,\"root\"); 完整代码如下: /** * Configuration参数对象的机制: * 构造时,会加载jar包中的默认配置 xx-default.xml(core-default.xmlhdfs-default.xml) * 再加载 ⽤户配置xx-site.xml ,覆盖掉默认参数 * 构造完成之后,还可以conf.set(\"p\,会再次覆盖⽤户配置⽂件中的参数值 */ // new Configuration()会从项⽬的classpath中加载core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等⽂件 Configuration conf = new Configuration(); // 指定本客户端上传⽂件到hdfs时需要保存的副本数为:2 conf.set(\"dfs.replication\ // 指定本客户端上传⽂件到hdfs时切块的规格⼤⼩:64M conf.set(\"dfs.blocksize\ // 构造⼀个访问指定HDFS系统的客户端对象: 参数1:——HDFS系统的URI,参数2:——客户端要特别指定的参数,参数3:客户端的⾝份(⽤户名) FileSystem fs = FileSystem.get(new URI(\"hdfs://hdp-01:9000/\"), conf, \"root\"); // 上传⼀个⽂件到HDFS中 fs.copyFromLocalFile(new Path(\"D:/install-pkgs/hbase-1.2.1-bin.tar.gz\"), new Path(\"/aaa/\")); fs.close();View Code 6.1.2、对⽂件进⾏操作上传、下载⽂件;⽂件夹的创建和删除、⽂件的移动和复制、查看⽂件夹和⽂件等。3、利⽤fs对象的⽅法进⾏⽂件操作⽅法均与命令⾏⽅法对应,⽐如:上传⽂件 fs.copyFromLocalFile(new Path(\"本地路径\"),new Path(\"hdfs的路径\")); 下载⽂件 fs.copyToLocalFile(new Path(\"hdfs的路径\"),new Path(\"本地路径\")) 对⽂件的增删改查如下,对⽂件数据的操作后续介绍。 FileSystem fs = null; @Before public void init() throws Exception{ Configuration conf = new Configuration(); conf.set(\"dfs.replication\ conf.set(\"dfs.blocksize\ fs = FileSystem.get(new URI(\"hdfs://hdp-01:9000/\"), conf, \"root\"); } /** * 从HDFS中下载⽂件到客户端本地磁盘 * @throws IOException * @throws IllegalArgumentException */ @Test public void testGet() throws IllegalArgumentException, IOException{ fs.copyToLocalFile(new Path(\"/hdp20-05.txt\"), new Path(\"f:/\")); fs.close(); } /** * 在hdfs内部移动⽂件\\修改名称 */ @Test public void testRename() throws Exception{ fs.rename(new Path(\"/install.log\"), new Path(\"/aaa/in.log\")); fs.close(); } /** * 在hdfs中创建⽂件夹 */ @Test public void testMkdir() throws Exception{ fs.mkdirs(new Path(\"/xx/yy/zz\")); fs.close(); } /** * 在hdfs中删除⽂件或⽂件夹 */ @Test public void testRm() throws Exception{ fs.delete(new Path(\"/aaa\"), true); fs.close(); } /** * 查询hdfs指定⽬录下的⽂件信息 */ @Test public void testLs() throws Exception{ // 只查询⽂件的信息,不返回⽂件夹的信息 RemoteIterator while(iter.hasNext()){ LocatedFileStatus status = iter.next(); System.out.println(\"⽂件全路径:\"+status.getPath()); System.out.println(\"块⼤⼩:\"+status.getBlockSize()); System.out.println(\"⽂件长度:\"+status.getLen()); System.out.println(\"副本数量:\"+status.getReplication()); System.out.println(\"块信息:\"+Arrays.toString(status.getBlockLocations())); System.out.println(\"--------------------------------\"); } fs.close(); } /** * 查询hdfs指定⽬录下的⽂件和⽂件夹信息 */ @Test public void testLs2() throws Exception{ FileStatus[] listStatus = fs.listStatus(new Path(\"/\")); for(FileStatus status:listStatus){ System.out.println(\"⽂件全路径:\"+status.getPath()); System.out.println(status.isDirectory()?\"这是⽂件夹\":\"这是⽂件\"); System.out.println(\"块⼤⼩:\"+status.getBlockSize()); System.out.println(\"⽂件长度:\"+status.getLen()); System.out.println(\"副本数量:\"+status.getReplication()); System.out.println(\"--------------------------------\"); } fs.close(); }View Code 6.1.3、对⽂件数据进⾏操作 同过客户端使⽤open打开流对象来读取hdfs中⽂件的具体数据,包括指定偏移量来读取特定范围的数据;通过客户端向hdfs⽂件追加数据。 /** * 读取hdfs中的⽂件的内容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path(\"/test.txt\")); BufferedReader br = new BufferedReader(new InputStreamReader(in, \"utf-8\")); String line = null; while ((line = br.readLine()) != null) { System.out.println(line); } br.close(); in.close(); fs.close(); } /** * 读取hdfs中⽂件的指定偏移量范围的内容 * * * 作业题:⽤本例中的知识,实现读取⼀个⽂本⽂件中的指定BLOCK块中的所有数据 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testRandomReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path(\"/xx.dat\")); // 将读取的起始位置进⾏指定 in.seek(12); // 读16个字节 byte[] buf = new byte[16]; in.read(buf); System.out.println(new String(buf)); in.close(); fs.close(); }View Code 写数据,create提供了丰富的重载函数,轻松实现覆盖,追加,以及指定缓存⼤⼩,副本数量等等信息。 /** * 往hdfs中的⽂件写内容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testWriteData() throws IllegalArgumentException, IOException { FSDataOutputStream out = fs.create(new Path(\"/zz.jpg\"), false); // D:\\images\\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg FileInputStream in = new FileInputStream(\"D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg\"); byte[] buf = new byte[1024]; int read = 0; while ((read = in.read(buf)) != -1) { out.write(buf,0,read); } in.close(); out.close(); fs.close(); }View Code 7、HDFS实例hdfs版本wordcount程序。任务描述: 1、从hdfs⽂件中读取数据,每次读取⼀⾏数据; 2、将数据交给具体的单词统计业务去作业(使⽤⾯向接⼝编程,当业务逻辑改变时,⽆需修改主程序代码);3、并将该⾏数据产⽣的结果存⼊缓存中(可以⽤hashmap模拟)数据采集设计: 1、流程 启动⼀个定时任务:——定时探测⽇志源⽬录——获取需要采集的⽂件 ——移动这些⽂件到⼀个待上传临时⽬录 ——遍历待上传⽬录中各⽂件,逐⼀传输到HDFS的⽬标路径,同时将传输完成的⽂件移动到备份⽬录启动⼀个定时任务: ——探测备份⽬录中的备份数据,检查是否已超出最长备份时长,如果超出,则删除 2、规划各种路径 ⽇志源路径: d:/logs/accesslog/待上传临时⽬录: d:/logs/toupload/备份⽬录: d:/logs/backup/⽇期/HDFS存储路径: /logs/⽇期 HDFS中的⽂件的前缀:access_log_HDFS中的⽂件的后缀:.log将路径配置写⼊属性⽂件 MAPPER_CLASS=cn.edu360.hdfs.wordcount.CaseIgnorWcountMapperINPUT_PATH=/wordcount/input OUTPUT_PATH=/wordcount/output2 View Code 主程序代码⽰例: import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.URI; import java.util.HashMap;import java.util.Map.Entry;import java.util.Properties;import java.util.Set; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; public class HdfsWordcount { public static void main(String[] args) throws Exception{ /** * 初始化⼯作 */ Properties props = new Properties(); props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream(\"job.properties\")); Path input = new Path(props.getProperty(\"INPUT_PATH\")); Path output = new Path(props.getProperty(\"OUTPUT_PATH\")); Class> mapper_class = Class.forName(props.getProperty(\"MAPPER_CLASS\")); Mapper mapper = (Mapper) mapper_class.newInstance(); Context context = new Context(); /** * 处理数据 */ FileSystem fs = FileSystem.get(new URI(\"hdfs://hdp-01:9000\"), new Configuration(), \"root\"); RemoteIterator while(iter.hasNext()){ LocatedFileStatus file = iter.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = null; // 逐⾏读取 while ((line = br.readLine()) != null) { // 调⽤⼀个⽅法对每⼀⾏进⾏业务处理 mapper.map(line, context); } br.close(); in.close(); } /** * 输出结果 */ HashMap