Getting a New Application ID from the ResourceManager

Prerequisites: a running Hadoop cluster, with core-site.xml, hdfs-site.xml, mapred-site.xml and yarn-site.xml copied onto the classpath (for example, into src/main/resources).


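1. Create a GetNewApplicationRequest. The concrete instance is the protobuf-backed class GetNewApplicationRequestPBImpl; other serialization formats may be supported in the future. The serialization format determines which RPC factory is used, and therefore which serializable classes that factory produces.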

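Records.newRecord simply instantiates the protobuf-backed implementation class. Note that if you write your own records, the implementation must follow a naming convention: the class name must end with "PBImpl", and the class must live in a package ending in "impl.pb" (the interface's own package plus ".impl.pb"). Otherwise the corresponding protobuf class cannot be found.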
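The lookup convention can be illustrated with a small, self-contained sketch. It only reproduces the naming rule described above (interface package + ".impl.pb", simple name + "PBImpl"); the class and method names are hypothetical, and this is not Hadoop's own factory code, which additionally loads the derived class by name via reflection.

```java
/**
 * Hypothetical illustration of the naming rule used to locate a
 * protobuf-backed record: for an interface a.b.c.Foo, the implementation
 * is expected at a.b.c.impl.pb.FooPBImpl. Not Hadoop's actual code.
 */
final class PbImplNames {
    private static final String PACKAGE_SUFFIX = "impl.pb";
    private static final String CLASS_SUFFIX = "PBImpl";

    private PbImplNames() {}

    /** Maps a fully qualified record interface name to the expected PBImpl class name. */
    static String pbImplClassName(String interfaceName) {
        int lastDot = interfaceName.lastIndexOf('.');
        if (lastDot < 0) {
            throw new IllegalArgumentException("expected a fully qualified name: " + interfaceName);
        }
        String pkg = interfaceName.substring(0, lastDot);
        String simpleName = interfaceName.substring(lastDot + 1);
        // Implementation lives in <pkg>.impl.pb and is named <SimpleName>PBImpl.
        return pkg + "." + PACKAGE_SUFFIX + "." + simpleName + CLASS_SUFFIX;
    }

    public static void main(String[] args) {
        // prints org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl
        System.out.println(pbImplClassName(
                "org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest"));
    }
}
```

A record class placed anywhere else (for example directly in the interface's package) would produce a name the factory cannot load, which is why the convention matters for hand-written records.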

GetNewApplicationRequest request = Records
                .newRecord(GetNewApplicationRequest.class);
Configuration conf = new YarnConfiguration(); // YarnConfiguration also loads yarn-default.xml and yarn-site.xml


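2. Get the IP address and port of the ResourceManager. They are read from yarn.resourcemanager.address; if that key is not set, the default 0.0.0.0:8032 is used.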

InetSocketAddress rmAddress = conf.getSocketAddr(
                YarnConfiguration.RM_ADDRESS,
                YarnConfiguration.DEFAULT_RM_ADDRESS,
                YarnConfiguration.DEFAULT_RM_PORT);
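As a rough sketch of what conf.getSocketAddr does with these three arguments: take the configured value for the key (or the default address when it is unset), split it into host and port, and use the default port when the value carries no port. A plain Map stands in for Hadoop's Configuration, the class name RmAddressSketch is hypothetical, and the real method additionally performs host resolution and validation, which this sketch deliberately skips.

```java
import java.net.InetSocketAddress;
import java.util.Map;

/**
 * Hypothetical, simplified model of resolving a "host:port" setting such as
 * yarn.resourcemanager.address. A Map replaces Hadoop's Configuration.
 */
final class RmAddressSketch {
    private RmAddressSketch() {}

    static InetSocketAddress resolve(Map<String, String> conf, String key,
                                     String defaultAddress, int defaultPort) {
        // Use the configured value if present and non-blank, otherwise the default address.
        String value = conf.get(key);
        String address = (value == null || value.trim().isEmpty()) ? defaultAddress : value.trim();

        // Split "host:port"; fall back to defaultPort when no port is given.
        // (lastIndexOf(':') is good enough for host names and IPv4, not IPv6.)
        int colon = address.lastIndexOf(':');
        String host = colon >= 0 ? address.substring(0, colon) : address;
        int port = colon >= 0 ? Integer.parseInt(address.substring(colon + 1)) : defaultPort;

        // Unresolved on purpose: this sketch performs no DNS lookup.
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

With an empty map and the defaults "0.0.0.0:8032" / 8032 this yields 0.0.0.0 port 8032, mirroring what the client falls back to when yarn-site.xml is not picked up.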


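3. Create a YarnRPC instance (by default the class org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC) and obtain an ApplicationClientProtocol proxy from it (by default ApplicationClientProtocolPBClientImpl).

Then call getNewApplication on that proxy to obtain a new application ID.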

// Create the YarnRPC implementation (HadoopYarnProtoRPC by default)
YarnRPC rpc = YarnRPC.create(conf);
// YarnRPC acts as a factory: ask it for a client proxy of ApplicationClientProtocol
ApplicationClientProtocol rmClientProtocol = (ApplicationClientProtocol) rpc.getProxy(
        ApplicationClientProtocol.class, rmAddress, conf);
// Send the GetNewApplicationRequest (a protobuf-backed record) through the proxy
GetNewApplicationResponse newApp = rmClientProtocol
        .getNewApplication(request);

System.err.println("get a new application id:"
        + newApp.getApplicationId());
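The "default class" behavior of YarnRPC.create follows a common pluggable-factory pattern: read an implementation class name from configuration (the yarn.ipc.rpc.class key), fall back to org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC when it is unset, and instantiate the class by reflection. The sketch below shows that pattern with hypothetical RpcEngine, ProtoRpcEngine and RpcEngineFactory types and a Map in place of Configuration; it illustrates the idea and is not the Hadoop source.

```java
import java.util.Map;

/** Hypothetical stand-in for the YarnRPC abstraction. */
interface RpcEngine {
    String name();
}

/** Hypothetical default implementation, playing the role of HadoopYarnProtoRPC. */
final class ProtoRpcEngine implements RpcEngine {
    public ProtoRpcEngine() {}

    @Override
    public String name() {
        return "protobuf";
    }
}

/** Picks an RpcEngine implementation by class name from configuration. */
final class RpcEngineFactory {
    static final String RPC_IMPL_KEY = "yarn.ipc.rpc.class";
    static final String DEFAULT_RPC_IMPL = ProtoRpcEngine.class.getName();

    private RpcEngineFactory() {}

    static RpcEngine create(Map<String, String> conf) {
        // Configured class name wins; otherwise use the protobuf-based default.
        String className = conf.getOrDefault(RPC_IMPL_KEY, DEFAULT_RPC_IMPL);
        try {
            Class<?> clazz = Class.forName(className);
            // Requires a public no-argument constructor.
            return (RpcEngine) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("cannot create RPC engine " + className, e);
        }
    }
}
```

Swapping the serialization framework then only means putting a different class name under that key, which matches the point in step 1 that the serialization format decides which RPC factory is used.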


Exactly which classes the proxy initializes internally, and how, still needs further investigation.

In summary: Hadoop currently uses protobuf as its default serialization framework, and YARN's RPC layer uses the protobuf factories throughout.


The complete code is below:

package com.jarvis.yarnapp;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;

public class ZJXClient {
    private ApplicationClientProtocol rmClientProtocol;

    public void run() throws YarnException, IOException {

        // Create the request record; Records.newRecord returns the
        // protobuf-backed GetNewApplicationRequestPBImpl
        GetNewApplicationRequest request = Records
                .newRecord(GetNewApplicationRequest.class);
        // YarnConfiguration also loads yarn-default.xml and yarn-site.xml from the classpath
        Configuration conf = new YarnConfiguration();
        // Read the ResourceManager address (yarn.resourcemanager.address)
        InetSocketAddress rmAddress = conf.getSocketAddr(
                YarnConfiguration.RM_ADDRESS,
                YarnConfiguration.DEFAULT_RM_ADDRESS,
                YarnConfiguration.DEFAULT_RM_PORT);

        // Create the YarnRPC implementation (HadoopYarnProtoRPC by default)
        YarnRPC rpc = YarnRPC.create(conf);
        // YarnRPC acts as a factory: ask it for a client proxy of ApplicationClientProtocol
        rmClientProtocol = (ApplicationClientProtocol) (rpc.getProxy(
                ApplicationClientProtocol.class, rmAddress, conf));
        try {
            // Send the protobuf-backed request to the ResourceManager
            GetNewApplicationResponse newApp = rmClientProtocol
                    .getNewApplication(request);

            System.err.println("get a new application id:"
                    + newApp.getApplicationId());
        } finally {
            // Release the RPC connection held by the proxy
            rpc.stopProxy(rmClientProtocol, conf);
        }
    }

    public static void main(String[] args) throws YarnException, IOException {
        ZJXClient zjxClient = new ZJXClient();
        zjxClient.run();
    }
}
