Install and Configure WPS-Hadoop

From Gcube Wiki
Jump to: navigation, search


1. Configure the VM structure

The vm includes mainly:

  • The hadoopApplications folder, it will contain your jar applications. From this folder you can maintain/develope/test your applications (usually in R); when an application is ready you can compress (jar) it and  copy into hadoop hdfs folder.
  • The hadoop hdfs application folder (/algorithmRepository), a folder stored inside the hdfs, which contains all ready applications jars.
  • The wps-hadoop source project (wps-hadoop-source), it contains the whole project source files, including the wps-hadoop process classes. From this project you can add new processes, compile and package a new processes java library to add to the wps-hadoop web-app.
  • The wps-hadoop web-app folder (wps-hadoop-0.1-SNAPSHOT-tomcat-embedded): it contains the Tomcat, the 52nWPS, the wps-hadoop project classes and the java processes classes. From this folder you can update the java wps-hadoop processes, configure the processes list (wps_config.xml), start/stop/restart the tomcat servlet and see the runtime logs for debugging.

Let’s see in practise the vm content.

$ ssh imarine-wp10@10.15.18.10
<><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><>
This system is for the exclusive use of authorized users only. All
activity may be logged and/or monitored.
Unauthorized or improper use of this system may result in administrative
disciplinary action and/or civil and criminal penalties. By
continuing to use this system you indicate your awareness of and
consent to these terms and conditions of use.
LOG OFF IMMEDIATELY if you do not agree to the conditions stated in this warning.
<><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><><>
 
[imarine-wp10@sb-10-15-18-10 ~]$ pwd
/home/imarine-wp10
 
[imarine-wp10@sb-10-15-18-10 ~]$ ll
total 28
drwxr-xr-x 4 imarine-wp10 ciop  4096 Apr 28 16:45 hadoopApplications
drwx------ 2 imarine-wp10 ciop 16384 Jun  3  2013 lost+found
drwxr-xr-x 9 imarine-wp10 ciop  4096 Apr 28 16:17 wps-hadoop-0.1-SNAPSHOT-tomcat-embedded
drwxr-xr-x 6 imarine-wp10 ciop  4096 Apr 28 18:11 wps-hadoop-source

You can see the hadoopApplications folder, the wps-hadoop source folder and the wps-hadoop web-app folder.


1.1. hadoopApplications folder

[imarine-wp10@sb-10-15-18-10 ~]$ tree hadoopApplications/
hadoopApplications/
|-- terradue
    `-- helloWorld
        `-- application
            |-- application.xml
            `-- helloWorld
                |-- bin
                |   `-- helloWorld.r
                |-- lib
                `-- run

In this directory you can structure the applications as you want. in this example we have one example application called “helloWorld”, inside the group “terradue”. Each application folder is always structured in this way, in particular it always must contains an application folder which contains a run bash script. The run script is called by hadoop to parallel iterate the streaming inputs and and (possibly) call your R script for each input.


1.2. algorithmRepository HDFS folder

This folder is not on the classic file system, it’s on the HDFS. You can access to this fs by the hadoop fs command.

[imarine-wp10@sb-10-15-18-10 ~]$ hadoop fs -ls /
Found 5 items
drwxr-xr-x   - imarine-wp10 supergroup          0 2014-04-28 17:26 /algorithmRepository
drwxr-xr-x   - root         supergroup          0 2014-04-16 16:23 /ciop
drwxr-xr-x   - root         supergroup          0 2014-04-16 16:24 /storage
drwxr-xr-x   - imarine-wp10 supergroup          0 2014-04-28 17:26 /store
drwxr-xr-x   - mapred       supergroup          0 2014-04-16 16:23 /var
[imarine-wp10@sb-10-15-18-10 ~]$ hadoop fs -ls /algorithmRepository
Found 1 items
-rw-r--r--   1 imarine-wp10 supergroup       2577 2014-04-28 17:26 /algorithmRepository/helloWorld.jar
 
Note: you can copy files from the local file system to the hdfs by the hadoop fs -copyFromLocal <source> <dest> command.
Note: you can remove files from the hdfs by the hadoop fs -rm <source> command (necessary to replace files).

1.3. wps-hadoop-source folder

Let’s see where are the mainly files:


Processes Java Classes (we have some examples, not all are added as wps-hadoop process)

[imarine-wp10@sb-10-15-18-10 ~]$ tree wps-hadoop-source/src/main/java/com/terradue/wps_hadoop/processes/
wps-hadoop-source/src/main/java/com/terradue/wps_hadoop/processes/
|-- examples
|   |-- async
|   |   `-- Async.java
|   |-- hello_world
|   |   `-- HelloWorld.java
|-- fao
|   `-- spread
|       `-- Spread.java
|-- ird
|   |-- kernel_density
|   |   `-- KernelDensity.java
|   `-- occurrences_generator
|       `-- OccurrencesGenerator.java
`-- terradue
    `-- envi_enrich
        `— EnviEnrich.java

Processes descriptions

[imarine-wp10@sb-10-15-18-10 ~]$ tree wps-hadoop-source/src/main/resources/com/terradue/wps_hadoop/processes/
wps-hadoop-source/src/main/resources/com/terradue/wps_hadoop/processes/
|-- examples
|   |-- async
|   |   `-- Async.xml
|   `-- hello_world
|       `-- HelloWorld.xml
|-- fao
|   `-- spread
|       `-- Spread.xml
|-- ird
|   |-- kernel_density
|   |   `-- KernelDensity.xml
|   `-- occurrences_generator
|       `-- OccurrencesGenerator.xml
`-- terradue
    `-- envi_enrich
        `-- EnviEnrich.xml

1.4. wps-hadoop-0.1-SNAPSHOT-tomcat-embedded folder

wps_config xml file

[imarine-wp10@sb-10-15-18-10 ~]$ ls wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/webapps/wps/config/wps_config.xml 
wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/webapps/wps/config/wps_config.xml
 
Note: after edited the wps_config.xml you must reload the wps server.
You ca do this simply by touch the web.xml:
  [imarine-wp10@sb-10-15-18-10 ~]$ touch wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/webapps/wps/WEB-INF/web.xml
or by restarting the tomcat (hard reload):
  [imarine-wp10@sb-10-15-18-10 ~]$ wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/shutdown.sh 
  Using LD_LIBRARY_PATH: /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/lib/natives
  Using CATALINA_BASE:   /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded
  Using CATALINA_HOME:   /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded
  Using CATALINA_TMPDIR: /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/temp
  Using JRE_HOME:        /usr
  Using CLASSPATH:       /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/bootstrap.jar:/home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/tomcat-juli.jar
  [imarine-wp10@sb-10-15-18-10 ~]$ wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/startup.sh 
  Using LD_LIBRARY_PATH: /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/lib/natives
  Using CATALINA_BASE:   /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded
  Using CATALINA_HOME:   /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded
  Using CATALINA_TMPDIR: /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/temp
  Using JRE_HOME:        /usr
  Using CLASSPATH:       /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/bootstrap.jar:/home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/tomcat-juli.jar

catalina.out log file

[imarine-wp10@sb-10-15-18-10 ~]$ ll wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/logs/catalina.out 
-rw-r--r-- 1 imarine-wp10 ciop 92500 Apr 29 11:07 wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/logs/catalina.out

Note: you can tail the log to debug in real time:

[imarine-wp10@sb-10-15-18-10 ~]$ tail -f wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/logs/catalina.out

wps-hadoop java processes library

[imarine-wp10@sb-10-15-18-10 ~]$ ll wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/webapps/wps/WEB-INF/lib/wps-hadoop-0.1-SNAPSHOT.jar 
-rw-r--r-- 1 imarine-wp10 ciop 196478 Apr 28 16:40 wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/webapps/wps/WEB-INF/lib/wps-hadoop-0.1-SNAPSHOT.jar

Note: this is the file to replace when you update the java processes classes. The jar is obtained after a source build (by maven - see after in the example tutorial), copied from ~/wps-hadoop-source/target/ folder


2. Tutorial: create the indicator_i1 process

2.1. Create the application

We suppose to already have created, tested and configured (including install R libs) the R script which performs the indicator_i1 algorithm. Let’s create the application folder inside the hadoopApplications.


2.1.1. Create the directory structure

[imarine-wp10@sb-10-15-18-10 ~]$ mkdir -p hadoopApplications/ird/indicator_i1      # we choose to put it inside an ird folder group
[imarine-wp10@sb-10-15-18-10 ~]$ cd hadoopApplications/ird/indicator_i1
[imarine-wp10@sb-10-15-18-10 indicator_i1]$ mkdir -p application/indicator_i1/bin/ application/indicator_i1/lib

so we have:

[imarine-wp10@sb-10-15-18-10 indicator_i1]$ tree
.
|-- application
    `-- indicator_i1
        |-- bin
        |-- lib


2.1.2. Copy and integrate the .R script

Note: with the header #!/usr/bin/Rscript you allow to run the script as executable (for smart call by the run bash script) Note: in the pre-processing statements some parameters are set, something from args, others by constants (year_attribute_name) Note: in the post-processing statements the results file are copied to the current directory (Sys.getenv("PWD"))


#!/usr/bin/Rscript --vanilla --slaveargs <- commandArgs(TRUE)
# Francesco Cerasuolo - Terradue
# pre-processing 
 
args <- commandArgs(TRUE)
wfsUrl <- args[1]
typeName <- args[2]
species <- args[3]
 
connection_type <- "remote"
data_type <- "WFS"
url <- wfsUrl
layer <- typeName
ogc_filter <- paste('<ogc:Filter xmlns:ogc="http://www.opengis.net/ogc" xmlns:gml="http://www.opengis.net/gml"><ogc:PropertyIsEqualTo><ogc:PropertyName>species</ogc:PropertyName><ogc:Literal>',species,'</ogc:Literal></ogc:PropertyIsEqualTo></ogc:Filter>',
        sep="")
 
year_attribute_name <- "year"
ocean_attribute_name <- "ocean"
species_attribute_name <- "species"
value_attribute_name <- "value"
 
 
#Norbert Billet - IRD
#2014/01/27: Norbert - Multi sources edit
#2013/08/30: Norbert - Initial edit
#Atlas_i1_SpeciesByOcean : build a graph of catches by ocean and by year
 
#52North WPS annotations
# wps.des: id = Atlas_i1_SpeciesByOcean, title = IRD tuna atlas indicator i1, abstract = Graph of species catches by ocean;
 
# wps.in: id = data_type, type = string, title = Data type (csv or WFS or MDSTServer), value = "WFS";
# wps.in: id = url, type = string, title = Data URL, value = "http://mdst-macroes.ird.fr:8080/constellation/WS/wfs/tuna_atlas";
# wps.in: id = layer, type = string, title = Data layer name, minOccurs = 0, maxOccurs = 1, value = "ns11:i1i2_mv";
# wps.in: id = mdst_query, type = string, title = MDSTServer query. Only used with MDSTServer data type, minOccurs = 0, maxOccurs = 1;
# wps.in: id = ogc_filter, type = string, title = OGC filter to apply on a WFS datasource. Only used with WFS data type, minOccurs = 0, maxOccurs = 1;
# wps.in: id = year_attribute_name, type = string, title = Year attribute name in the input dataset, value = "year";
# wps.in: id = ocean_attribute_name, type = string, title = Ocean attribute name in the input dataset, value = "ocean";
# wps.in: id = species_attribute_name, type = string, title = Species attribute name in the input dataset, value = "species";
# wps.in: id = value_attribute_name, type = string, title = Value attribute name in the input dataset, value = "value";
# wps.in: id = connection_type, type = string, title = Data connection type (local or remote), value = "remote";
 
# wps.out: id = result, type = string, title = List of result files path;
 
 
if(! require(IRDTunaAtlas)) {
  stop("Missing IRDTunaAtlas library")
}
 
df <- readData(connectionType=connection_type,
        dataType=data_type,
        url=url,
        layer=layer,
        MDSTQuery=mdst_query,
        ogcFilter=ogc_filter)
 
result <- Atlas_i1_SpeciesByOcean(df=df,
                        yearAttributeName=year_attribute_name,
                        oceanAttributeName=ocean_attribute_name,
                        speciesAttributeName=species_attribute_name,
                        valueAttributeName=value_attribute_name)
 
 
# Francesco Cerasuolo - Terradue
# post-processing                        
apply(result, 1, function(x) file.copy(x, paste(Sys.getenv("PWD"), basename(x), sep="/")))


2.1.3. Create the run bash script

imarine-wp10@sb-10-15-18-10 indicator_i1]$ vi application/indicator_i1/run 


#!/bin/bash
# INDICATOR I1
 
SUCCESS=0
ERR_NOINPUT=18
ERR_NOOUTPUT=19
ERR_CURL=30
DEBUG_EXIT=66
 
function cleanExit ()
{
    local retval=$?
       local msg=""
       case "$retval" in
        $SUCCESS)
              msg="Processing successfully concluded";;
        $ERR_NOINPUT)
              msg="Unable to retrieve an input file";;
        $ERR_NOOUTPUT)
              msg="No output results";;
                $ERR_CURL)
              msg="curl failed to download the GML from $wfsUrl";;
        $DEBUG_EXIT)
              msg="Breaking at debug exit";;
        *)
                  msg="Unknown error";;
       esac
       [ "$retval" != 0 ] && echo "Error $retval - $msg, processing aborted" || echo "INFO - $msg" 
       exit "$retval"
}
# trap an exit signal to exit properly
trap cleanExit EXIT
 
# evaluating the applicationPath, to resolve environments variable
eval "appPath=\"$applicationPath\""
# evaluating the outputFilesPath (hdfs path)
eval "outFilesPath=\"$outputFilesPath\""
 
# R library path 
export R_LIBS_USER=/application/share/rlibrary/
 
export PATH=$appPath/bin:$PATH
chmod 755 $appPath/bin/*
 
# data input file info
inputDatafileName="inputData.txt"
 
# create and entering work directory 
mkdir -p ./work
cd work
 
#counter (used as key)
count=0
 
type_name=ns11:i1i2_mv
 
# iterate each input (each input is a row and it’s a species identifier)
while read species
do
	# for debug
        echo "INPUT: species=$species"
 
	# call the .R script
        Atlas_i1_SpeciesByOcean.R $wfsUrl $type_name $species
 
        # saving produced output files on the hdfs (subfolder: exec<count>)
        keyDir="exec$count"
        path="$outFilesPath$keyDir"
 
        # hdfs output directory
        hadoop fs -mkdir $path
 
        # create an input info file
        echo "species=$species" > $inputDatafileName
 
        # copy all files to the hdfs path
        hadoop fs -copyFromLocal ./* $path/
 
        # cleanup
        rm -f $inputDatafileName
 
        let "count += 1"
done

2.2. Create the application jar and put it into the HDFS

2.2.1. Compress the application folder in a .jar file

Note: you must maintain the jar name with the same name of the application folder name. Simply, from the indicator_i1 folder:


[imarine-wp10@sb-10-15-18-10 indicator_i1]$ jar cvf indicator_i1.jar application
added manifest
adding: application/(in = 0) (out= 0)(stored 0%)
adding: application/indicator_i1/(in = 0) (out= 0)(stored 0%)
adding: application/indicator_i1/run(in = 1871) (out= 918)(deflated 50%)
adding: application/indicator_i1/bin/(in = 0) (out= 0)(stored 0%)
adding: application/indicator_i1/bin/Atlas_i1_SpeciesByOcean.R(in = 3020) (out= 1095)(deflated 63%)
adding: application/indicator_i1/lib/(in = 0) (out= 0)(stored 0%)
 
 
[imarine-wp10@sb-10-15-18-10 indicator_i1]$ ll
total 16
drwxr-xr-x 3 imarine-wp10 ciop 4096 Apr 29 12:25 application
-rw-r--r-- 1 imarine-wp10 ciop 6322 Apr 29 14:06 indicator_i1.jar
[imarine-wp10@sb-10-15-18-10 indicator_i1]

2.2.2. Copy the jar into the HDFS

From the indicator_i1 folder:

# remove previous jar if present
hadoop fs -rm /algorithmRepository/indicator_i1.jar
# copy the jar
hadoop fs -copyFromLocal ./indicator_i1.jar /algorithmRepository/
# check if the jar is added
hadoop fs -ls /algorithmRepository
Found 2 items
-rw-r--r--   1 imarine-wp10 supergroup       2577 2014-04-28 17:26 /algorithmRepository/helloWorld.jar
-rw-r--r--   1 imarine-wp10 supergroup       6322 2014-04-29 14:21 /algorithmRepository/indicator_i1.jar

Now the application is stored into the hdfs repository and it can be called from the wps-hadoop web-app.


2.3. Create the process description xml file

As we saw at par.1.3., we must create the process xml process description inside the ~/wps-hadoop-source/src/main/resources/com/terradue/wps_hadoop/processes/ directory. We choose to organise the xml into the subpath ird/indicator/. Note: It’s important to have the path aligned to the class package path (see in 2.4). Note: The path+processName will be the process identifier.

[imarine-wp10@sb-10-15-18-10 ~]$ vi wps-hadoop-source/src/main/resources/com/terradue/wps_hadoop/processes/ird/indicator/IndicatorI1.xml
 
 
<?xml version="1.0" encoding="UTF-8"?>
<wps:ProcessDescriptions xmlns:wps="http://www.opengis.net/wps/1.0.0"
        xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xlink="http://www.w3.org/1999/xlink"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.opengis.net/wps/1.0.0 http://geoserver.itc.nl:8080/wps/schemas/wps/1.0.0/wpsDescribeProcess_response.xsd"
        xml:lang="en-US" service="WPS" version="1.0.0">
        <ProcessDescription wps:processVersion="1.0.0"
                storeSupported="true" statusSupported="false">
                <ows:Identifier>IndicatorI1</ows:Identifier>
                <ows:Title>IRD Tuna Atlas Indicator i1</ows:Title>
                <ows:Abstract>Graph of catches of a given species.</ows:Abstract>
                <ows:Metadata xlink:title="Biodiversity"/>
                <DataInputs>
                        <Input minOccurs="1" maxOccurs="2147483647">
                                <ows:Identifier>species</ows:Identifier>
                                <ows:Title>Species Names</ows:Title>
                                <ows:Abstract>Species Names</ows:Abstract>
                                <LiteralData>
                                        <ows:DataType ows:reference="xs:string"></ows:DataType>
                                        <ows:AllowedValues>
                                                <ows:Value>YFT</ows:Value>
                                                <ows:Value>SKJ</ows:Value>
                                                <ows:Value>BET</ows:Value>
                                                <ows:Value>ALB</ows:Value>
                                                <ows:Value>BFT</ows:Value>
                                                <ows:Value>SBF</ows:Value>
                                                <ows:Value>SFA</ows:Value>
                                                <ows:Value>BLM</ows:Value>
                                                <ows:Value>MLS</ows:Value>
                                                <ows:Value>BIL</ows:Value>
                                                <ows:Value>SWO</ows:Value>
                                                <ows:Value>SSP</ows:Value>
                                        </ows:AllowedValues>
                                </LiteralData>
                        </Input>
                        <Input minOccurs="0" maxOccurs="1">
                                <ows:Identifier>wfsUrl</ows:Identifier>
                                <ows:Title>WFS Url</ows:Title>
                                <ows:Abstract>WFS Url</ows:Abstract>
                                <LiteralData>
                                        <ows:DataType ows:reference="xs:string"></ows:DataType>
                                        <ows:AnyValue/>
                                </LiteralData>
                        </Input>
                </DataInputs>
                <ProcessOutputs>
                        <Output>
                                <ows:Identifier>result</ows:Identifier>
                                <ows:Title>result</ows:Title>
                                <ows:Abstract>result</ows:Abstract>
                                <ComplexOutput>
                                        <Default>
                                                <Format>
                                                        <MimeType>application/xml</MimeType>
                                                </Format>
                                        </Default>
                                        <Supported>
                                                <Format>
                                                        <MimeType>application/xml</MimeType>
                                                </Format>
                                        </Supported>
                                </ComplexOutput>
                        </Output>
                </ProcessOutputs>
        </ProcessDescription>
</wps:ProcessDescriptions>


2.4. Create the wps-hadoop process java class(es)

In this section we create the java class (or the classes, if need) to implement a wps process which can easy act on hadoop. It’s important to define well: 1. which parameters are taken from the wps execute request (specified in the process description) 2. how these parameters are processed (if we need to transform them) 3. which parameters are passed to the hadoop streaming 4. which parameters (from parameters taken in 3th) are set as fixed parameters 5. which parameter (from parameters taken in 3th) is set as inputResource (determining the parallelism) 6. 7. For this indicator_i1 example, we have: • speciesCodes, as inputResource • wfsUrl, as fixed parameter

The wfsUrl are taken as is from wps execute request, while the speciesCodes are created starting from species names list (from wps execute request too), by searching into a default list of all species and managing case-free characters.

The mainly class created is, in this case, IndicatorI1.java, and it must extends StreamingAbstractAlgorithm. At least one class like this must be created. However, in this case two simple classes are created: Constants.java and SpeciesCodes.java (species codes simple db with search engine). Note: We choose to create the classes inside the package com.terradue.wps_hadoop.processes.ird.indicator, the same path structure of the process description created.

Here’s the list of java classes:


[imarine-wp10@sb-10-15-18-10 ~]$ ll wps-hadoop-source/src/main/java/com/terradue/wps_hadoop/processes/ird/indicator/
total 24
-rw-r--r-- 1 imarine-wp10 ciop  377 Apr 28 17:35 Constants.java
-rw-r--r-- 1 imarine-wp10 ciop 3026 Apr 28 17:35 IndicatorI1.java
-rw-r--r-- 1 imarine-wp10 ciop 1545 Apr 28 17:35 SpeciesCodes.java


2.4.1. IndicatorI1.java

[imarine-wp10@sb-10-15-18-10 ~]$ vi wps-hadoop-source/src/main/java/com/terradue/wps_hadoop/processes/ird/indicator/IndicatorI1.java
 
 
/**
 * 
 */
package com.terradue.wps_hadoop.processes.ird.indicator;
 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
import org.apache.log4j.Logger;
import org.n52.wps.io.data.IData;
import org.n52.wps.io.data.binding.complex.GenericFileDataBinding;
import org.n52.wps.io.data.binding.literal.LiteralStringBinding;
 
import com.terradue.wps_hadoop.common.input.InputUtils;
import com.terradue.wps_hadoop.common.input.ListInputResource;
import com.terradue.wps_hadoop.streaming.ResultsInfo;
import com.terradue.wps_hadoop.streaming.StreamingAbstractAlgorithm;
import com.terradue.wps_hadoop.streaming.StreamingPackagedAlgorithm;
import com.terradue.wps_hadoop.streaming.WpsHadoopConfiguration;
 
/**
 * @author fcerasuolo
 *
 */
public class IndicatorI1 extends StreamingAbstractAlgorithm {
 
    protected final Logger logger = Logger.getLogger(getClass());
        private List<String> errors = new ArrayList<String>();
 
        @Override
        public Map<String, IData> run(Map<String, List<IData>> inputData) {
                List<String> names = InputUtils.getListStringInputParameter(inputData, "species", true);
                List<String> speciesCodes = getSpeciesCodesFromSpeciesNames(names);
                String wfsUrl = InputUtils.getStringInputParameter(inputData, "wfsUrl");
 
                logger.info("Running Job TUNA ATLAS INDICATOR I1...");
 
                // get the configuration with default values
                WpsHadoopConfiguration conf = new WpsHadoopConfiguration();
 
                // create a new hadoop streaming algorithm
                StreamingPackagedAlgorithm streaming = new StreamingPackagedAlgorithm(conf);
 
                // set algorithm name
                streaming.setAlgorithmName("indicator_i1");
 
		// used if you want to copy the jar at runtime into the hdfs, for quick tests
//              streaming.setAlgorithmPackage(new File("/home/imarine-wp10/hadoopApplications/ird/indicator_i1/indicator_i1.jar"), true);
 
		// set the input resource
                streaming.setInputResource(new ListInputResource(speciesCodes));
 
                // adding parameters
                streaming.addFixedParameter("wfsUrl", wfsUrl==null ? Constants.defaultWfsUrl : wfsUrl);
 
		// set verbose debug mode (default false)
                streaming.setDebugMode(true);
 
                try {
                        // let's run!
                        ResultsInfo result = streaming.runAsync(this);
 
                        Map<String, IData> wpsResultMap = new HashMap<String, IData>();
                        wpsResultMap.put("result", result.getXmlFileDataBinding());
                        return wpsResultMap;
 
                } catch (Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException("Execution job failed! " + e.getMessage());
                }
        }
 
        @Override
        public List<String> getErrors() {
                return errors;
        }
 
        @Override
        public Class<?> getInputDataType(String id) {
                return LiteralStringBinding.class;
        }
 
        @Override
        public Class<?> getOutputDataType(String id) {
                if (id.contentEquals("result"))
                        return GenericFileDataBinding.class;
                else
                        return null;
        }
 
        /**
         * @param names
         * @return
         */
        private List<String> getSpeciesCodesFromSpeciesNames(List<String> names) {
                List<String> speciesCodes = new ArrayList<String>();
                for (String name: names)
                        speciesCodes.add(SpeciesCodes.getSpeciesCode(name));
                return speciesCodes;
        }
}


2.4.2. SpeciesCodes.java

/**
 * 
 */
package com.terradue.wps_hadoop.processes.ird.indicator;
 
/**
 * @author "Francesco Cerasuolo (francesco.cerasuolo@terradue.com)"
 *
 */
public class SpeciesCodes {
 
        private static String csv[] = {
                "YFT,Thunnus albacares,Albacore,Rabil,Yellowfin tuna",
                "SKJ,Katsuwonus pelamis,Listao,Listado,Ocean skipjack",
                "BET,Thunnus obesus,Thon obese,Patudo,Bigeye tuna",
                "ALB,Thunnus alalunga,Germon,Atun blanco,Albacore",
                "BFT,Thunnus thynnus thynnus,Thon rouge,Atun rojo,Bluefin tuna",
                "SBF,Thunnus maccoyii,Thon rouge du sud,Atun rojo del sur,Southern bluefin tuna",
                "SFA,Istiophorus platypterus,Voilier Indo-Pacifique,Pez vela del Indo-Pacifico,Indo-Pacific sailfish",
                "BLM,Makaira indica,Makaire noir,Aguja negra,Black marlin",
                "BUM,Makaira nigricans,Makaire bleu Atlantique,Aguja azul,Atlantic blue marlin",
                "MLS,Tetrapturus audax,Marlin raye,Marlin rayado,Striped marlin",
                "BIL,Istiophoridae spp.,Poissons a rostre non classes,,Unclassified marlin",
                "SWO,Xiphias gladius,Espadon,Pez espada,Broadbill swordfish",
                "SSP,Tetrapturus angustirostris,Makaire a rostre court,Marlin trompa corta,short-billed spearfish",
        };
 
        protected static String getSpeciesCode(String speciesName) {
 
                speciesName = speciesName.toUpperCase();
 
                for (String csvRow: csv) {
                        csvRow = csvRow.toUpperCase();
                        String[] words = csvRow.split(",");
                        String code = words[0];
                        for (String word: words)
                                if (word.contentEquals(speciesName))
                                        return code;
                }
 
                throw new RuntimeException("Species not found.");
        }
 
}


2.4.3. Constants.java

/**
 * 
 */
package com.terradue.wps_hadoop.processes.ird.indicator;
 
/**
 * @author "Francesco Cerasuolo (francesco.cerasuolo@terradue.com)"
 *
 */
public class Constants {
        protected static final String defaultWfsUrl =   "http://mdst-macroes.ird.fr:8080/constellation/WS/wfs/tuna_atlas";
}


2.5. Build and package the sources

Now you can compile, build and package all. From ~/wps-hadoop-source directory, by using maven:


[imarine-wp10@sb-10-15-18-10 ~]$ cd wps-hadoop-source/
[imarine-wp10@sb-10-15-18-10 wps-hadoop-source]$ mvn clean package -P wps,linux-x86_64
 
[INFO] Scanning for projects...
....
....
....
....
[INFO] Building tar : /home/imarine-wp10/wps-hadoop-source/target/wps-hadoop-1.2.0-SNAPSHOT-bin.tar.gz
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 13.225s
[INFO] Finished at: Tue Apr 29 15:34:45 CEST 2014
[INFO] Final Memory: 24M/184M
[INFO] ------------------------------------------------------------------------

After the mvn execution, you can see what’s generated into the target folder:


[imarine-wp10@sb-10-15-18-10 wps-hadoop-source]$ ll target/
total 22072
drwxr-xr-x 2 imarine-wp10 ciop     4096 Apr 29 15:36 archive-tmp
drwxr-xr-x 4 imarine-wp10 ciop     4096 Apr 29 15:36 classes
drwxr-xr-x 2 imarine-wp10 ciop     4096 Apr 29 15:36 maven-archiver
drwxr-xr-x 2 imarine-wp10 ciop     4096 Apr 29 15:36 surefire
-rw-r--r-- 1 imarine-wp10 ciop 22394573 Apr 29 15:36 wps-hadoop-1.2.0-SNAPSHOT-bin.tar.gz
-rw-r--r-- 1 imarine-wp10 ciop   153429 Apr 29 15:36 wps-hadoop-1.2.0-SNAPSHOT.jar

The wps-hadoop-1.2.0-SNAPSHOT.jar  library is all you need.


2.6. Configure the wps-hadoop web-app

Few more steps: copy the jar library obtained in 2.5, update the wps_config.xml including this new process, and restart the web application.


2.6.1. Copy the jar library

You simply copy the jar from target folder inside wps-hadoop-source to lib directory of wps-hadoop web-app:

[imarine-wp10@sb-10-15-18-10 ~]$ cp wps-hadoop-source/target/wps-hadoop-1.2.0-SNAPSHOT.jar wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/webapps/wps/WEB-INF/lib/


2.6.2. Update the wps_config.xml

[imarine-wp10@sb-10-15-18-10 ~]$ vi wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/webapps/wps/config/wps_config.xml 

You’ve to simply add a property

<Property name="Algorithm" active="true">com.terradue.wps_hadoop.processes.ird.indicator.IndicatorI1</Property>
inside //AlgorithmRepositoryList/Repository[@name=“UploadedAlgorithmRepository]
 
 
...
        <AlgorithmRepositoryList>
                <Repository name="UploadedAlgorithmRepository" className="org.n52.wps.server.UploadedAlgorithmRepository" active="true">
                        <Property name="Algorithm" active="true">com.terradue.wps_hadoop.processes.ird.indicator.IndicatorI1</Property>
                </Repository>
 
        </AlgorithmRepositoryList>
        <RemoteRepositoryList/>
        <Server hostname="sb-10-15-18-10.ird.terradue.int" hostport="8888" includeDataInputsInResponse="false" computationTimeoutMilliSeconds="5"
                cacheCapabilites="false" webappPath="wps" repoReloadInterval="0">
                <Database/>
        </Server>
</WPSConfiguration>

Notice that the property inner text is com.terradue.wps_hadoop.processes.ird.indicator.IndicatorI1, exactly the package+className(without .java)


2.6.3. Restart the wps-hadoop web application

You ca do this simply by touch the web.xml:

  [imarine-wp10@sb-10-15-18-10 ~]$ touch wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/webapps/wps/WEB-INF/web.xml

or by restarting the tomcat (hard reload):

  [imarine-wp10@sb-10-15-18-10 ~]$ wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/shutdown.sh 
  Using LD_LIBRARY_PATH: /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/lib/natives
  Using CATALINA_BASE:   /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded
  Using CATALINA_HOME:   /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded
  Using CATALINA_TMPDIR: /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/temp
  Using JRE_HOME:        /usr
  Using CLASSPATH:       /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/bootstrap.jar:/home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/tomcat-juli.jar
  [imarine-wp10@sb-10-15-18-10 ~]$ wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/startup.sh 
  Using LD_LIBRARY_PATH: /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/lib/natives
  Using CATALINA_BASE:   /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded
  Using CATALINA_HOME:   /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded
  Using CATALINA_TMPDIR: /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/temp
  Using JRE_HOME:        /usr
  Using CLASSPATH:       /home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/bootstrap.jar:/home/imarine-wp10/wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/bin/tomcat-juli.jar

Now the process is plugged inside the wps-hadoop system. Note: the restart can take some seconds; you see the progress by see in realtime the log (tail -f  wps-hadoop-0.1-SNAPSHOT-tomcat-embedded/logs/catalina.out)


2.7. Run, Test and Debug

2.7.1. Run & Test

getCapabilities: http://sb-10-15-18-10.ird.terradue.int:8888/wps/WebProcessingService?Request=GetCapabilities&Service=WPS You should see the new process created:


<wps:Process wps:processVersion="1.0.0">
<ows:Identifier>
com.terradue.wps_hadoop.processes.ird.indicator.IndicatorI1
</ows:Identifier>
<ows:Title>IRD Tuna Atlas Indicator i1</ows:Title>
</wps:Process>


describeProcess: http://sb-10-15-18-10.ird.terradue.int:8888/wps/WebProcessingService?Service=WPS&Version=1.0.0&Request=DescribeProcess&Identifier=com.terradue.wps_hadoop.processes.ird.indicator.IndicatorI1 You should see the process description xml.

executeProcess (example):   async: http://sb-10-15-18-10.ird.terradue.int:8888/wps/WebProcessingService?service=wps&version=1.0.0&request=Execute&identifier=com.terradue.wps_hadoop.processes.ird.indicator.IndicatorI1&dataInputs=species=BFT;species=BFT;&ResponseDocument=result sync: http://sb-10-15-18-10.ird.terradue.int:8888/wps/WebProcessingService?service=wps&version=1.0.0&request=Execute&identifier=com.terradue.wps_hadoop.processes.ird.indicator.IndicatorI1&dataInputs=species=BFT;species=BFT;&ResponseDocument=result&storeExecuteResponse=true&status=true


The wps-hadoop web-app include a simple wps-webclient, you can access it by http://sb-10-15-18-10.ird.terradue.int:8888/client.html


2.7.2. Debug

During wps-hadoop process execution, you can check the catalina.out log: inside it it’s displayed the hadoop tracking url and the status. If you follow the url, you can see all map/reduce attempts with logs, in real time.


2014-04-29 16:19:24,008 [pool-21-thread-6] INFO  org.apache.hadoop.streaming.StreamJob: (by wps-hadoop) JobId: job_201404161623_0007
2014-04-29 16:19:24,008 [pool-21-thread-6] INFO  org.apache.hadoop.streaming.StreamJob: To kill this job, run:
2014-04-29 16:19:24,009 [pool-21-thread-6] INFO  org.apache.hadoop.streaming.StreamJob: UNDEF/bin/hadoop job  -Dmapred.job.tracker=sb-10-15-18-10.ird.terradue.int:8021 -kill job_201404161623_0007
2014-04-29 16:19:24,037 [pool-21-thread-6] INFO  org.apache.hadoop.streaming.StreamJob: Tracking URL: http://sb-10-15-18-10.ird.terradue.int:50030/jobdetails.jsp?jobid=job_201404161623_0007
2014-04-29 16:19:24,129 [pool-21-thread-4] INFO  org.n52.wps.server.request.ExecuteRequest: Update received from Subject, state changed to : 73
2014-04-29 16:19:24,131 [pool-21-thread-4] INFO  org.apache.hadoop.streaming.StreamJob:  map 100%  reduce 33%
2014-04-29 16:19:25,039 [pool-21-thread-6] INFO  org.n52.wps.server.request.ExecuteRequest: Update received from Subject, state changed to : 0
2014-04-29 16:19:25,039 [pool-21-thread-6] INFO  org.apache.hadoop.streaming.StreamJob:  map 0%  reduce 0%
2014-04-29 16:19:25,137 [pool-21-thread-4] INFO  org.n52.wps.server.request.ExecuteRequest: Update received from Subject, state changed to : 100
2014-04-29 16:19:25,139 [pool-21-thread-4] INFO  org.apache.hadoop.streaming.StreamJob:  map 100%  reduce 100%
2014-04-29 16:19:28,146 [pool-21-thread-4] INFO  org.apache.hadoop.streaming.StreamJob: Job complete: job_201404161623_0005
2014-04-29 16:19:28,428 [pool-21-thread-4] INFO  org.apache.hadoop.streaming.StreamJob: Output: /store/a5cc3063-b8d8-4422-ad32-0c7f387e07dd/output/

Note: it’s convenient to set streaming.setDebugMode(true); inside your java process class. In this way you can see the bash run file execution in debug mode, and into the tracking url you can see each bash statement execution.