HOD (Hadoop On Demand) 経由で Hadoop クラスタを確保し、HDFS と JobTracker の接続先を取得する処理。

pig-0.5.0 より抜粋。


org.apache.pig.backend.hadoop.executionengine.HExecutionEngine

    /**
     * Allocates a Hadoop-On-Demand (HOD) cluster for this session and returns
     * the locations to connect to as a two-element array: {HDFS, JobTracker}.
     *
     * The allocation is cached in the {@code hodHDFS}/{@code hodMapRed} fields,
     * so repeated calls reuse the first successful allocation. A shutdown hook
     * is registered to tear the HOD connection down on JVM exit.
     *
     * @param server     target server; when not equal to {@code LOCAL}, the
     *                   HOD-generated hadoop configuration is copied locally
     * @param properties session properties; updated in place by
     *                   {@code recomputeProperties} with values from the
     *                   HOD-provided hadoop-site.xml
     * @return {HDFS location, JobTracker location}
     * @throws ExecException if the configuration lacks fs.default.name or
     *                       mapred.job.tracker, or if any step of the HOD
     *                       allocation fails (original cause is attached)
     */
    private String[] doHod(String server, Properties properties) throws ExecException {
        // Reuse an existing allocation if we already connected once.
        if (hodMapRed != null) {
            return new String[] {hodHDFS, hodMapRed};
        }
        
        try {
            // first, create temp directory to store the configuration
            hodConfDir = createTempDir(server);
			
            // fall back to the system property because "hod.param" is not
            // handled in Main
            hodParams = new StringBuilder(properties.getProperty(
                    "hod.param", System.getProperty("hod.param", "")));
            // get the number of nodes out of the command or use default
            int nodes = getNumNodes(hodParams);

            // command format: hod allocate -d <cluster_dir> -n <number_of_nodes> <other params>
            String[] fixedCmdArray = new String[] { "hod", "allocate", "-d",
                                       hodConfDir, "-n", Integer.toString(nodes) };
            String[] extraParams = hodParams.toString().split(" ");
    
            String[] cmdarray = new String[fixedCmdArray.length + extraParams.length];
            System.arraycopy(fixedCmdArray, 0, cmdarray, 0, fixedCmdArray.length);
            System.arraycopy(extraParams, 0, cmdarray, fixedCmdArray.length, extraParams.length);

            log.info("Connecting to HOD...");
            log.debug("sending HOD command " + cmdToString(cmdarray));

            // setup shutdown hook to make sure we tear down hod connection
            Runtime.getRuntime().addShutdownHook(new ShutdownThread());

            runCommand(server, cmdarray, true);

            // Print all the information provided by HOD on its stderr.
            // Failure to read it only loses diagnostics, so it is logged
            // rather than propagated; the reader is always closed.
            BufferedReader br = null;
            try {
                br = new BufferedReader(new InputStreamReader(hodProcess.getErrorStream()));
                String msg;
                while ((msg = br.readLine()) != null)
                    log.info(msg);
            } catch (IOException ioe) {
                log.warn("Unable to read HOD output: " + ioe.getMessage());
            } finally {
                if (br != null) {
                    try {
                        br.close();
                    } catch (IOException ignored) {
                        // nothing useful to do when close fails
                    }
                }
            }

            // for remote connection we need to bring the file locally  
            if (!server.equals(LOCAL))
                hodConfDir = copyHadoopConfLocally(server);

            String hdfs = null;
            String mapred = null;
            String hadoopConf = hodConfDir + "/hadoop-site.xml";

            log.info ("Hadoop configuration file: " + hadoopConf);

            JobConf jobConf = new JobConf(hadoopConf);
            jobConf.addResource("pig-cluster-hadoop-site.xml");

            //the method below alters the properties object by overriding the
            //hod properties with the values from properties and recomputing
            //the properties
            recomputeProperties(jobConf, properties);
            
            hdfs = properties.getProperty(FILE_SYSTEM_LOCATION);
            if (hdfs == null) {
                int errCode = 4007;
                String msg = "Missing fs.default.name from hadoop configuration.";
                throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
            }
            log.info("HDFS: " + hdfs);

            mapred = properties.getProperty(JOB_TRACKER_LOCATION);
            if (mapred == null) {
                int errCode = 4007;
                String msg = "Missing mapred.job.tracker from hadoop configuration";
                throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
            }
            log.info("JobTracker: " + mapred);

            // this is no longer needed as hadoop-site.xml given to us by HOD
            // contains data in the correct format
            // hdfs = fixUpDomain(hdfs, properties);
            // mapred = fixUpDomain(mapred, properties);

            // cache the allocation so subsequent calls short-circuit above
            hodHDFS = hdfs;
            hodMapRed = mapred;

            return new String[] {hdfs, mapred};
        } 
        catch (Exception e) {
            // boundary catch: wrap any failure with the HOD error code,
            // preserving the original exception as the cause
            int errCode = 6010;
            String msg = "Could not connect to HOD";
            throw new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
        }
    }