如何启动zookeeper-3.3.6

发布网友 发布时间:2022-04-22 03:37

我来回答

3个回答

懂视网 时间:2022-04-14 23:28

说明: (1) 实验环境. 三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker. 测试例子:官网自带的SSSP程序,数据是自己模拟生成。 运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar o

说明:

(1) 实验环境.

三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker.

测试例子:官网自带的SSSP程序,数据是自己模拟生成。

运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5

(2). 为节约空间,下文中所有代码均为核心代码片段。

(3). core-site.xml中hadoop.tmp.dir的路径设为:/home/hadoop/hadooptmp

(4).写本文是多次调试完成的,故文中的JobID不一样,读者可理解为同一JobID.

(5). 后续文章也遵循上述规则。

1. org.apache.giraph.graph.GraphMapper类

Giraph中自定义org.apache.giraph.graph.GraphMapper类来继承Hadoop中的 org.apache.hadoop.mapreduce.Mapper类,覆写了setup()、map()、cleanup()和run()方法。GraphMapper类的说明如下:

This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.

BSP的运算逻辑被封装在GraphMapper类中,其拥有一GraphTaskManager对象,用来管理Job的tasks。每个GraphMapper对象都相当于BSP中的一个计算节点(compute node)。

在GraphMapper类中的setup()方法中,创建GraphTaskManager对象并调用其setup()方法进行一些初始化工作。如下:

 @Override
 public void setup(Context context)
 throws IOException, InterruptedException {
 // Execute all Giraph-related role(s) assigned to this compute node.
 // Roles can include "master," "worker," "zookeeper," or . . . ?
 graphTaskManager = new GraphTaskManager(context);
 graphTaskManager.setup(
 DistributedCache.getLocalCacheArchives(context.getConfiguration()));
 }
map()方法为空,因为所有操作都被封装在了GraphTaskManager类中。在run()方法中调用GraphTaskManager对象的execute()方法进行BSP迭代计算。
@Override
 public void run(Context context) throws IOException, InterruptedException {
 // Notify the master quicker if there is worker failure rather than
 // waiting for ZooKeeper to timeout and delete the ephemeral znodes
 try {
 setup(context);
 while (context.nextKeyValue()) {
 graphTaskManager.execute();
 }
 cleanup(context);
 // Checkstyle exception due to needing to dump ZooKeeper failure
 } catch (RuntimeException e) {
 graphTaskManager.zooKeeperCleanup();
 graphTaskManager.workerFailureCleanup();
 }
 }

2. org.apache.giraph.graph.GraphTaskManager 类

功能:The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.

下面讲述setup()方法,代码如下。

 /**
 * Called by owner of this GraphTaskManager on each compute node
 * @param zkPathList the path to the ZK jars we need to run the job
 */
 public void setup(Path[] zkPathList) throws IOException, InterruptedException {
 context.setStatus("setup: Initializing Zookeeper services.");
 locateZookeeperClasspath(zkPathList);
 serverPortList = conf.getZookeeperList();
 if (serverPortList == null && startZooKeeperManager()) {
 return; // ZK connect/startup failed
 }
 if (zkManager != null && zkManager.runsZooKeeper()) {
 LOG.info("setup: Chosen to run ZooKeeper...");
 }
 context.setStatus("setup: Connected to Zookeeper service " +serverPortList);
 this.graphFunctions = determineGraphFunctions(conf, zkManager);
 instantiateBspService(serverPortList, sessionMsecTimeout);
 }
依次介绍每个方法的功能:

1) locateZookeeperClasspath(zkPathList):找到ZK jar的本地副本,其路径为:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar ,用于启动ZooKeeper服务。
2) startZooKeeperManager(),初始化和配置ZooKeeperManager。定义如下,

 /**
 * Instantiate and configure ZooKeeperManager for this job. This will
 * result in a Giraph-owned Zookeeper instance, a connection to an
 * existing quorum as specified in the job configuration, or task failure
 * @return true if this task should terminate
 */
 private boolean startZooKeeperManager()
 throws IOException, InterruptedException {
 zkManager = new ZooKeeperManager(context, conf);
 context.setStatus("setup: Setting up Zookeeper manager.");
 zkManager.setup();
 if (zkManager.computationDone()) {
 done = true;
 return true;
 }
 zkManager.onlineZooKeeperServers();
 serverPortList = zkManager.getZooKeeperServerPortString();
 return false;
 }

org.apache.giraph.zk.ZooKeeperManager 类,功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.

ZooKeeperManager类的setup()定义如下:

/**
 * Create the candidate stamps and decide on the servers to start if
 * you are partition 0.
 */
 public void setup() throws IOException, InterruptedException {
 createCandidateStamp();
 getZooKeeperServerList();
 }
createCandidateStamp()方法在 HDFS上 的_bsp/_defaultZkManagerDir/job_201403301409_0006/_task 目录下为每个task创建一个文件,文件内容为空。文件名为本机的Hostname+taskPartition,如下截图:

运行时指定了5个workers(-w 5),再加上一个master,所有上面有6个task。

getZooKeeperServerList()方法中,taskPartition为0的task会调用createZooKeeperServerList()方法创建ZooKeeper server List,也是创建一个空文件,通过文件名来描述Zookeeper servers。

createZooKeeperServerList核心代码如下:

/**
 * Task 0 will call this to create the ZooKeeper server list. The result is
 * a file that describes the ZooKeeper servers through the filename.
 */
 private void createZooKeeperServerList() throws IOException,
 InterruptedException {
 Map hostnameTaskMap = Maps.newTreeMap();
 while (true) {
 FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
 hostnameTaskMap.clear();
 if (fileStatusArray.length > 0) {
 for (FileStatus fileStatus : fileStatusArray) { 
  String[] hostnameTaskArray =
  fileStatus.getPath().getName().split(HOSTNAME_TASK_SEPARATOR);
 
  if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
  hostnameTaskMap.put(hostnameTaskArray[0],
  new Integer(hostnameTaskArray[1]));
  }
 }
 if (hostnameTaskMap.size() >= serverCount) {
  break;
 }
 Thread.sleep(pollMsecs);
 }
 }
 }
首先获取taskDirectory(_bsp/_defaultZkManagerDir/job_201403301409_0006/_task)目录下文件,如果当前目录下有文件,则把文件名(Hostname+taskPartition)中的Hostname和taskPartition存入到hostNameTaskMap中。扫描taskDirectory目录后,若hostNameTaskMap的size大于serverCount(等于GiraphConstants.java中的ZOOKEEPER_SERVER_COUNT变量,定义为1),就停止外层的循环。外层循环的目的是:因为taskDirectory下的文件每个task文件时多个task在分布式条件下创建的,有可能task 0在此创建server List时,别的task还没有生成后task文件。Giraph默认为每个Job启动一个ZooKeeper服务,也就是说只有一个task会启动ZooKeeper服务。

经过多次测试,task 0总是被选为ZooKeeper Server ,因为在同一进程中,扫描taskDirectory时,只有它对应的task 文件(其他task的文件还没有生成好),然后退出for循环,发现hostNameTaskMap的size等于1,直接退出while循环。那么此处就选了test162 0。

最后,创建了文件:_bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0

onlineZooKeeperServers(),根据zkServerList_test162 0文件,Task 0 先生成zoo.cfg配置文件,使用ProcessBuilder来创建ZooKeeper服务进程,然后Task 0 再通过socket连接到ZooKeeper服务进程上,最后创建文件 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 来标记master任务已完成。worker一直在进行循环检测master是否生成好 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0,即worker等待直到master上的ZooKeeper服务已经启动完成。

启动ZooKeeper服务的命令如下:

3) determineGraphFunctions()。

GraphTaskManager类中有CentralizedServiceMaster对象和CentralizedServiceWorker 对象,分别对应于master和worker。每个BSP compute node扮演的角色判定逻辑如下:

a) If not split master, everyone does the everything and/or running ZooKeeper.

b) If split master/worker, masters also run ZooKeeper

c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.

该判定在GraphTaskManager 类中的静态方法determineGraphFunctions()中定义,片段代码如下:

 private static GraphFunctions determineGraphFunctions(
 ImmutableClassesGiraphConfiguration conf,
 ZooKeeperManager zkManager) {
 // What functions should this mapper do?
 if (!splitMasterWorker) {
 if ((zkManager != null) && zkManager.runsZooKeeper()) {
 functions = GraphFunctions.ALL;
 } else {
 functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
 }
 } else {
 if (zkAlreadyProvided) {
 int masterCount = conf.getZooKeeperServerCount();
 if (taskPartition < masterCount) {
  functions = GraphFunctions.MASTER_ONLY;
 } else {
  functions = GraphFunctions.WORKER_ONLY;
 }
 } else {
 if ((zkManager != null) && zkManager.runsZooKeeper()) {
  functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
 } else {
  functions = GraphFunctions.WORKER_ONLY;
 }
 }
 }
 return functions;
 }

默认的,Giraph会区分master和worker。会在master上面启动zookeeper服务,不会在worker上启动ZooKeeper服务。那么Task 0 就是master+ZooKeeper,其他Tasks就是workers。

热心网友 时间:2022-04-14 20:36

如何启动zookeeper-3.3.6?

启动zookeeper-3.3.6的方法:下载安装配置zookeeper的服务器环境-创建文件-设置权限-编辑-重启即可。

具体步骤:

一、登陆linux服务器用cd 命令切换到/etc/rc.d/init.d/目录下。

二、touch zookeeper创建一个文件。

三、为文件添加可执行权限chmod +x zookeeper。

四、用vi zookeeper来编辑这个文件。

五、在zookeeper里面输入如下内容。

六、保存退出。

七、用service zookeeper start/stop来启动停止zookeeper服务。

八、使用chkconfig --add zookeeper命令吧zookeeper添加到开机启动里面。

九、使用chkconfig --list 来看看添加的zookeeper是否在里面。

十、重启即可。

热心网友 时间:2022-04-14 21:54

1、配置java环境
2、下载zookeeper3.4.6(http://zookeeper.apache.org/releases.html)
3、解压 zookeeper-3.4.6.tar.gz
4、进入conf目录,cp zoo_sample.cfg to zoo.cfg,根据你的要求进行修改

[plain] view plain copy
tickTime=2000
dataDir=/data/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888

如果是single模式下,只需要修改dataDir即可。
tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。

clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒

syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒
server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

除了修改 zoo.cfg 配置文件,集群模式下还要配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面就有一个数据就是 A 的值,Zookeeper 启动时会读取这个文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是那个 server。(在/data/zookeeper/目录下touch myid,vi myid 插入对应的server.id)

5、将整个 zookeeper-3.4.6 scp到其他机器上
6、启动zookeeper
在每台机器上运行 bin/zkServer.sh start
查看运行状态:bin/zkServer.sh status
Mode: leader

Mode: follower

可以看出哪台为leader了

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com