HOD (Hadoop On Demand) 経由でクラスタに接続する処理。
pig-0.5.0 より抜粋。
- HOD がインストールされていないと動作しない。
- 必要なパラメータ (`hod.param`) は Pig のプロパティから読み、無ければ Java のシステムプロパティ (`System.getProperty`) から取得するので注意。
- 起動のトリガは `pig -o` のような指定（要確認）。
- [#PIG-682] Fix the ssh tunneling code - ASF JIRA
org.apache.pig.backend.hadoop.executionengine.HExecutionEngine
private String[] doHod(String server, Properties properties) throws ExecException { if (hodMapRed != null) { return new String[] {hodHDFS, hodMapRed}; } try { // first, create temp director to store the configuration hodConfDir = createTempDir(server); //jz: fallback to systemproperty cause this not handled in Main hodParams = new StringBuilder(properties.getProperty( "hod.param", System.getProperty("hod.param", ""))); // get the number of nodes out of the command or use default int nodes = getNumNodes(hodParams); // command format: hod allocate - d <cluster_dir> -n <number_of_nodes> <other params> String[] fixedCmdArray = new String[] { "hod", "allocate", "-d", hodConfDir, "-n", Integer.toString(nodes) }; String[] extraParams = hodParams.toString().split(" "); String[] cmdarray = new String[fixedCmdArray.length + extraParams.length]; System.arraycopy(fixedCmdArray, 0, cmdarray, 0, fixedCmdArray.length); System.arraycopy(extraParams, 0, cmdarray, fixedCmdArray.length, extraParams.length); log.info("Connecting to HOD..."); log.debug("sending HOD command " + cmdToString(cmdarray)); // setup shutdown hook to make sure we tear down hod connection Runtime.getRuntime().addShutdownHook(new ShutdownThread()); runCommand(server, cmdarray, true); // print all the information provided by HOD try { BufferedReader br = new BufferedReader(new InputStreamReader(hodProcess.getErrorStream())); String msg; while ((msg = br.readLine()) != null) log.info(msg); br.close(); } catch(IOException ioe) {} // for remote connection we need to bring the file locally if (!server.equals(LOCAL)) hodConfDir = copyHadoopConfLocally(server); String hdfs = null; String mapred = null; String hadoopConf = hodConfDir + "/hadoop-site.xml"; log.info ("Hadoop configuration file: " + hadoopConf); JobConf jobConf = new JobConf(hadoopConf); jobConf.addResource("pig-cluster-hadoop-site.xml"); //the method below alters the properties object by overriding the //hod properties with the values from properties and 
recomputing //the properties recomputeProperties(jobConf, properties); hdfs = properties.getProperty(FILE_SYSTEM_LOCATION); if (hdfs == null) { int errCode = 4007; String msg = "Missing fs.default.name from hadoop configuration."; throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT); } log.info("HDFS: " + hdfs); mapred = properties.getProperty(JOB_TRACKER_LOCATION); if (mapred == null) { int errCode = 4007; String msg = "Missing mapred.job.tracker from hadoop configuration"; throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT); } log.info("JobTracker: " + mapred); // this is not longer needed as hadoop-site.xml given to us by HOD // contains data in the correct format // hdfs = fixUpDomain(hdfs, properties); // mapred = fixUpDomain(mapred, properties); hodHDFS = hdfs; hodMapRed = mapred; return new String[] {hdfs, mapred}; } catch (Exception e) { int errCode = 6010; String msg = "Could not connect to HOD"; throw new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e); } }