50030不能监测到eclipse提交作业问题解决
问题描述:
当按照Hadoop实战上讲述的用eclipse提交作业,其实作业是运行在eclipse虚拟的一个云环境中,而不是真正提交到Hadoop云端运行。在50030上也看不到job的运行记录,此时的代码如下:
package com.spork.hadoop.jobutil.test; import java.io.File; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import com.spork.hadoop.jobutil.EJob; public class WordCountTest { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] inAndOut={"hdfs://localhost:9000/bin/in/1","hdfs://localhost:9000/out"}; Job job = new Job(conf, "word count"); job.setJarByClass(WordCountTest.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(inAndOut[0])); FileOutputFormat.setOutputPath(job, new Path(inAndOut[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
如果要将作业真正提交到云端,一种是用命令行打jar包,然后运行、提交作业,这方式比较笨,而程序员都是些懒人啊,能让机器做干嘛自己动手。具体实现可以参考这个博客:http://weixiaolu.iteye.com/blog/1402919
这里介绍一个更加智能的方式,其主要思路是将jar打包的工作放在了java代码中来完成。其中使用了一个工具类ejob.java。
示例代码如下:
package com.spork.hadoop.jobutil.test; import java.io.File; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import com.spork.hadoop.jobutil.EJob; public class WordCountTest { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { // Add these statements. XXX File jarFile = EJob.createTempJar("bin"); EJob.addClasspath("/usr/hadoop-1.2.1/conf"); ClassLoader classLoader = EJob.getClassLoader(); Thread.currentThread().setContextClassLoader(classLoader); Configuration conf = new Configuration(); String[] inAndOut={"hdfs://localhost:9000/bin/in/1","hdfs://localhost:9000/out"}; Job job = new Job(conf, "word count"); // And add this statement. XXX ((JobConf) job.getConfiguration()).setJar(jarFile.toString()); job.setJarByClass(WordCountTest.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(inAndOut[0])); FileOutputFormat.setOutputPath(job, new Path(inAndOut[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
ejob类的代码如下:
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.spork.hadoop.jobutil; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Array; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Arrays; import java.util.Enumeration; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.jar.JarOutputStream; import java.util.jar.Manifest; public class EJob { private static ArrayList<URL> classPath = new ArrayList<URL>(); /** Unpack a jar file into a directory. */ public static void unJar(File jarFile, File toDir) throws IOException { JarFile jar = new JarFile(jarFile); try { Enumeration entries = jar.entries(); while (entries.hasMoreElements()) { JarEntry entry = (JarEntry) entries.nextElement(); if (!entry.isDirectory()) { InputStream in = jar.getInputStream(entry); try { File file = new File(toDir, entry.getName()); if (!file.getParentFile().mkdirs()) { if (!file.getParentFile().isDirectory()) { throw new IOException("Mkdirs failed to create " + file.getParentFile().toString()); } } OutputStream out = new FileOutputStream(file); try { byte[] buffer = new byte[8192]; int i; while ((i = in.read(buffer)) != -1) { out.write(buffer, 0, i); } } finally { out.close(); } } finally { in.close(); } } } } finally { jar.close(); } } /** * Run a Hadoop job jar. If the main class is not in the jar‘s manifest, then * it must be provided on the command line. */ public static void runJar(String[] args) throws Throwable { String usage = "jarFile [mainClass] args..."; if (args.length < 1) { System.err.println(usage); System.exit(-1); } int firstArg = 0; String fileName = args[firstArg++]; File file = new File(fileName); String mainClassName = null; JarFile jarFile; try { jarFile = new JarFile(fileName); } catch (IOException io) { throw new IOException("Error opening job jar: " + fileName).initCause(io); } Manifest manifest = jarFile.getManifest(); if (manifest != null) { mainClassName = manifest.getMainAttributes().getValue("Main-Class"); } jarFile.close(); if (mainClassName == null) { if (args.length < 2) { System.err.println(usage); System.exit(-1); } mainClassName = args[firstArg++]; } mainClassName = mainClassName.replaceAll("/", "."); File tmpDir = new File(System.getProperty("java.io.tmpdir")); tmpDir.mkdirs(); if (!tmpDir.isDirectory()) { System.err.println("Mkdirs failed to create " + tmpDir); System.exit(-1); } final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir); workDir.delete(); workDir.mkdirs(); if (!workDir.isDirectory()) { System.err.println("Mkdirs failed to create " + workDir); System.exit(-1); } Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { fullyDelete(workDir); } catch (IOException e) { } } }); unJar(file, workDir); classPath.add(new File(workDir + "/").toURL()); classPath.add(file.toURL()); classPath.add(new File(workDir, "classes/").toURL()); File[] libs = new File(workDir, "lib").listFiles(); if (libs != null) { for (int i = 0; i < libs.length; i++) { classPath.add(libs[i].toURL()); } } ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0])); Thread.currentThread().setContextClassLoader(loader); Class<?> mainClass = Class.forName(mainClassName, true, loader); Method main = mainClass.getMethod("main", new Class[] { Array.newInstance( String.class, 0).getClass() }); String[] newArgs = Arrays.asList(args).subList(firstArg, args.length) .toArray(new String[0]); try { main.invoke(null, new Object[] { newArgs }); } catch (InvocationTargetException e) { throw e.getTargetException(); } } /** * Delete a directory and all its contents. If we return false, the directory * may be partially-deleted. */ public static boolean fullyDelete(File dir) throws IOException { File contents[] = dir.listFiles(); if (contents != null) { for (int i = 0; i < contents.length; i++) { if (contents[i].isFile()) { if (!contents[i].delete()) { return false; } } else { // try deleting the directory // this might be a symlink boolean b = false; b = contents[i].delete(); if (b) { // this was indeed a symlink or an empty directory continue; } // if not an empty directory or symlink let // fullydelete handle it. if (!fullyDelete(contents[i])) { return false; } } } } return dir.delete(); } /** * Add a directory or file to classpath. * * @param component */ public static void addClasspath(String component) { if ((component != null) && (component.length() > 0)) { try { File f = new File(component); if (f.exists()) { URL key = f.getCanonicalFile().toURL(); if (!classPath.contains(key)) { classPath.add(key); } } } catch (IOException e) { } } } /** * Add default classpath listed in bin/hadoop bash. * * @param hadoopHome */ public static void addDefaultClasspath(String hadoopHome) { // Classpath initially contains conf dir. addClasspath(hadoopHome + "/conf"); // For developers, add Hadoop classes to classpath. addClasspath(hadoopHome + "/build/classes"); if (new File(hadoopHome + "/build/webapps").exists()) { addClasspath(hadoopHome + "/build"); } addClasspath(hadoopHome + "/build/test/classes"); addClasspath(hadoopHome + "/build/tools"); // For releases, add core hadoop jar & webapps to classpath. if (new File(hadoopHome + "/webapps").exists()) { addClasspath(hadoopHome); } addJarsInDir(hadoopHome); addJarsInDir(hadoopHome + "/build"); // Add libs to classpath. addJarsInDir(hadoopHome + "/lib"); addJarsInDir(hadoopHome + "/lib/jsp-2.1"); addJarsInDir(hadoopHome + "/build/ivy/lib/Hadoop/common"); } /** * Add all jars in directory to classpath, sub-directory is excluded. * * @param dirPath */ public static void addJarsInDir(String dirPath) { File dir = new File(dirPath); if (!dir.exists()) { return; } File[] files = dir.listFiles(); if (files == null) { return; } for (int i = 0; i < files.length; i++) { if (files[i].isDirectory()) { continue; } else { addClasspath(files[i].getAbsolutePath()); } } } /** * Create a temp jar file in "java.io.tmpdir". * * @param root * @return * @throws IOException */ public static File createTempJar(String root) throws IOException { if (!new File(root).exists()) { return null; } Manifest manifest = new Manifest(); manifest.getMainAttributes().putValue("Manifest-Version", "1.0"); final File jarFile = File.createTempFile("EJob-", ".jar", new File(System .getProperty("java.io.tmpdir"))); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { jarFile.delete(); } }); JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest); createTempJarInner(out, new File(root), ""); out.flush(); out.close(); return jarFile; } private static void createTempJarInner(JarOutputStream out, File f, String base) throws IOException { if (f.isDirectory()) { File[] fl = f.listFiles(); if (base.length() > 0) { base = base + "/"; } for (int i = 0; i < fl.length; i++) { createTempJarInner(out, fl[i], base + fl[i].getName()); } } else { out.putNextEntry(new JarEntry(base)); FileInputStream in = new FileInputStream(f); byte[] buffer = new byte[1024]; int n = in.read(buffer); while (n != -1) { out.write(buffer, 0, n); n = in.read(buffer); } in.close(); } } /** * Return a classloader based on user-specified classpath and parent * classloader. * * @return */ public static ClassLoader getClassLoader() { ClassLoader parent = Thread.currentThread().getContextClassLoader(); if (parent == null) { parent = EJob.class.getClassLoader(); } if (parent == null) { parent = ClassLoader.getSystemClassLoader(); } return new URLClassLoader(classPath.toArray(new URL[0]), parent); } }
修改之后,50030就可以监测到job的运行状况了,而且看cpu的使用状态,各个cpu的占用率也上去了。
若想更加详细了解ejob的实现机理,可以参考下面的博客。
http://www.cnblogs.com/spork/archive/2010/04/21/1717592.html
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。