Overview

This adaptor, one of the adaptors offered by the WorkflowEngine, constructs an Execution Plan that mediates the submission of a job written under the MapReduce design pattern against the utilities offered by the Hadoop infrastructure. After submission the job is monitored for its status, and once it completes the output files are retrieved and stored in the StorageSystem. The resources that are provided and need to be moved to the Hadoop infrastructure are all transferred through the StorageSystem: they are stored when the plan is constructed and retrieved when the execution is started. The Hadoop infrastructure utilized resides in a cloud infrastructure with which the ExecutionEngine negotiates resource availability.

Plan Template

The entire execution process takes place on the Hadoop UI node. This node is picked from the InformationSystem and is currently chosen randomly from all the available ones. Once the node has been picked, the execution cannot be moved to a different one, even if there is a problem communicating with that node. The execution is a series of steps performed sequentially; a rough sketch of the equivalent command sequence is shown after this list. These steps include the following:

  • Contact the remote node
  • Retrieve the data stored in the StorageSystem; this includes the resources marked as Archive, Configuration, File, Lib, Jar and Input resources
    • If some of these resources are declared as stored in the Hadoop HDFS instead of the StorageSystem, calls are made to the HDFS to retrieve the marked content
  • Move the Input resources to the HDFS so that they are available to the MapReduce processes
  • Submit the job using the provided Jar file, specifying the Class containing the main method to execute and optionally any resources provided such as configuration, properties, files, library jars, archives and arguments
  • Once the job has run, even if some error occurred, retrieve its standard output and standard error and store them in the StorageSystem
  • If the job terminated successfully and output retrieval was requested, retrieve the directory containing the output
    • A call to the HDFS is made to retrieve the directory; a tar.gz archive of it is created and stored in the StorageSystem
  • Finally, remove the input and output directories if this has been requested
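
These steps roughly correspond to running standard hadoop commands on the UI node. The following is only a rough sketch under that assumption, not the engine's actual plan: it drives the commands with plain ProcessBuilder calls, and the jar name, class name and HDFS paths are examples borrowed from the Word Count job used later in this page.

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

public class PlanSketch {
	// Runs a command, keeping its standard error in a file so that it can later be
	// persisted (and filtered), similarly to how the engine stores it in the StorageSystem
	private static int run(File stderrFile, String... command) throws IOException, InterruptedException {
		ProcessBuilder pb = new ProcessBuilder(Arrays.asList(command));
		pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
		pb.redirectError(ProcessBuilder.Redirect.to(stderrFile));
		return pb.start().waitFor();
	}

	public static void main(String[] args) throws Exception {
		// Move the Input resources to the HDFS so the mappers can read them
		run(new File("put.err"), "hadoop", "fs", "-put", "localInput", "wordcount/input");
		// Submit the job with the provided Jar, main class and arguments
		int exit = run(new File("job.err"), "hadoop", "jar", "WordCount.jar", "WordCount",
				"wordcount/input", "wordcount/output");
		// On success, fetch the output directory and archive it as tar.gz
		if (exit == 0) {
			run(new File("get.err"), "hadoop", "fs", "-get", "wordcount/output", "localOutput");
			run(new File("tar.err"), "tar", "czf", "output.tar.gz", "localOutput");
		}
		// Optionally remove the input and output directories from the HDFS
		run(new File("rm.err"), "hadoop", "fs", "-rmr", "wordcount/input", "wordcount/output");
	}
}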

Highlights

Processing filters

Verbose progress filter

Taking advantage of the extensible filtering capabilities of the Filters, the WorkflowEngine defines an external filter that can process the verbose output of the hadoop jar command. This way, output of the following format can be parsed, and each line read from it is emitted back to the caller as a status report. This output is printed to the standard error of the command execution. The filter is applied to the output read from the command's standard error while that output is being persisted for transfer to the StorageSystem. A sketch of such a filter follows the sample output below.

10/02/24 09:46:17 INFO input.FileInputFormat: Total input paths to process : 7
10/02/24 09:46:18 INFO mapred.JobClient: Running job: job_201002181242_0052
10/02/24 09:46:19 INFO mapred.JobClient:  map 0% reduce 0%
10/02/24 09:46:29 INFO mapred.JobClient:  map 28% reduce 0%
10/02/24 09:46:30 INFO mapred.JobClient:  map 57% reduce 0%
10/02/24 09:46:33 INFO mapred.JobClient:  map 71% reduce 0%
10/02/24 09:46:35 INFO mapred.JobClient:  map 100% reduce 0%
10/02/24 09:46:38 INFO mapred.JobClient:  map 100% reduce 33%
10/02/24 09:46:44 INFO mapred.JobClient:  map 100% reduce 100%
10/02/24 09:46:46 INFO mapred.JobClient: Job complete: job_201002181242_0052
10/02/24 09:46:46 INFO mapred.JobClient: Counters: 18
10/02/24 09:46:46 INFO mapred.JobClient:   Job Counters
10/02/24 09:46:46 INFO mapred.JobClient:     Launched reduce tasks=1
10/02/24 09:46:46 INFO mapred.JobClient:     Rack-local map tasks=1
10/02/24 09:46:46 INFO mapred.JobClient:     Launched map tasks=7
10/02/24 09:46:46 INFO mapred.JobClient:     Data-local map tasks=6
10/02/24 09:46:46 INFO mapred.JobClient:   FileSystemCounters
10/02/24 09:46:46 INFO mapred.JobClient:     FILE_BYTES_READ=3538615
10/02/24 09:46:46 INFO mapred.JobClient:     HDFS_BYTES_READ=6866273
10/02/24 09:46:46 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=5927736
10/02/24 09:46:46 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=1165259
10/02/24 09:46:46 INFO mapred.JobClient:   Map-Reduce Framework
10/02/24 09:46:46 INFO mapred.JobClient:     Reduce input groups=0
10/02/24 09:46:46 INFO mapred.JobClient:     Combine output records=165570
10/02/24 09:46:46 INFO mapred.JobClient:     Map input records=146199
10/02/24 09:46:46 INFO mapred.JobClient:     Reduce shuffle bytes=2388897
10/02/24 09:46:46 INFO mapred.JobClient:     Reduce output records=0
10/02/24 09:46:46 INFO mapred.JobClient:     Spilled Records=409994
10/02/24 09:46:46 INFO mapred.JobClient:     Map output bytes=10780405
10/02/24 09:46:46 INFO mapred.JobClient:     Combine input records=1094930
10/02/24 09:46:46 INFO mapred.JobClient:     Map output records=1094930
10/02/24 09:46:46 INFO mapred.JobClient:     Reduce input records=165570
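
The following is a minimal sketch of the kind of parsing such a filter performs on each standard error line. The filter interface and the reportProgress callback are assumptions made for illustration, not the WorkflowEngine's actual API.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VerboseProgressFilterSketch {
	// Matches lines such as "... mapred.JobClient:  map 28% reduce 0%"
	private static final Pattern PROGRESS = Pattern.compile("map\\s+(\\d+)%\\s+reduce\\s+(\\d+)%");

	// Called for every line read from the command's standard error
	public void onLine(String line) {
		Matcher m = PROGRESS.matcher(line);
		if (m.find()) {
			reportProgress("map " + m.group(1) + "% reduce " + m.group(2) + "%");
		}
	}

	// Placeholder for emitting a status report back to the caller
	protected void reportProgress(String status) {
		System.out.println(status);
	}

	public static void main(String[] args) {
		new VerboseProgressFilterSketch().onLine("10/02/24 09:46:29 INFO mapred.JobClient:  map 28% reduce 0%");
	}
}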

Usage

The following snippets demonstrate the usage of the adaptor.

Let's assume we want to run the following Word Count example:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
	// Mapper: emits (word, 1) for every token of each input line
	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}
 
	// Reducer (also used as the combiner): sums the counts emitted for each word
	public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
		private IntWritable result = new IntWritable();
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) sum += val.get();
			result.set(sum);
			context.write(key, result);
		}
	}
 
	// Configures and submits the job; expects the input and output HDFS paths as arguments
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		if (args.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

To submit the job, we first need to define the resources that it requires. These resources can be of the following types:

  • Jar - The jar file that contains the main method to execute (mandatory, exactly one resource of this type must be present)
  • MainClass - The class in the above jar that contains the main method (mandatory, exactly one resource of this type must be present)
  • Argument - Argument that must be provided to the main method (optional, one or more resources of this type can be present)
  • Configuration - Configuration to be provided to the application (optional, exactly one resource of this type can be present)
  • Property - Properties that should be made available to the application (optional, one or more resources of this type can be present)
  • File - Files that should be present in every node in the working directory of the executed mappers and reducers (optional, one or more resources of this type can be present)
  • Lib - Jars that should be in the classpath of the executed mappers and reducers (optional, one or more resources of this type can be present)
  • Archive - Hadoop Archives (.har) files that should be extracted in every node in the working directory of the executed mappers and reducers (optional, one or more resources of this type can be present)
  • Input - Input files that should be placed in the HDFS and serve as input to the mappers (optional, one or more resources of this type can be present)
  • Output - Output location of the reducers' products (optional, exactly one resource of this type can be present)

For our case, the following snippet declares the resources we need. For resources of type JDL, UserProxy, Config and InData, the first argument of the AttachedGridResource is the name that we want the resource to have once it has been moved to the grid UI node. For the OutData resources, it is the name of the output file that will be restored when we call glite-wms-job-output, so that the engine retrieves only the output files we want. The second argument is the path, on the machine running the adaptor code, of the resource we want to attach.
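
As a purely hypothetical sketch (the Resource class below is not the adaptor's API; it only illustrates the type, name and location information that such declarations carry), the Word Count job could map to declarations along these lines, with all paths chosen for illustration:

import java.util.Arrays;
import java.util.List;

public class ResourceSketch {
	enum Type { JAR, MAIN_CLASS, ARGUMENT, CONFIGURATION, PROPERTY, FILE, LIB, ARCHIVE, INPUT, OUTPUT }

	// Not the adaptor's actual class; a stand-in for a declared resource
	static class Resource {
		final Type type;
		final String name;   // name the resource should have on the Hadoop side
		final String value;  // local path, argument value or class name
		Resource(Type type, String name, String value) {
			this.type = type;
			this.name = name;
			this.value = value;
		}
	}

	public static void main(String[] args) {
		// Hypothetical declarations for the Word Count job; all paths are examples
		List<Resource> resources = Arrays.asList(
				new Resource(Type.JAR, "WordCount.jar", "/home/user/WordCount.jar"),
				new Resource(Type.MAIN_CLASS, "WordCount", "WordCount"),
				new Resource(Type.INPUT, "wordcount/input", "/home/user/books"),
				new Resource(Type.ARGUMENT, "arg1", "wordcount/input"),
				new Resource(Type.ARGUMENT, "arg2", "wordcount/output"),
				new Resource(Type.OUTPUT, "wordcount/output", null));
		for (Resource r : resources) System.out.println(r.type + " " + r.name + " " + r.value);
	}
}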