How-to Implement Algorithms for DataMiner
Contents
- 1 Prerequisites
- 2 Step by Step
- 3 Customize input visualization
- 4 Case of algorithms using databases
- 5 Customize output
- 6 Test the algorithm
- 7 Properties File and Deploy
- 8 Complete Example with multiple outputs
- 9 Integrating R Scripts
- 10 Enabling Cloud Computing for R Scripts
- 11 Video
- 12 Related Links
Prerequisites
IDE: Eclipse Java EE IDE for Web Developers. Version: 3.7+
We advise you also to follow this video:
http://i-marine.eu/Content/eTraining.aspx?id=e1777006-a08c-49ad-b2e6-c13e094f27d4
Maven[1] repository configuration: File:Settings.xml
Hello World Algorithm: File:Hello-world-algorithm.zip
Step by Step
Let's start by creating a project in the Eclipse IDE and mavenizing it according to our indications. After the project has been mavenized in Eclipse, you have to add the dependencies.
Maven coordinates
The maven artifact coordinates are:
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.gcube.distribution</groupId>
      <artifactId>gcube-bom</artifactId>
      <version>LATEST</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.gcube.dataanalysis</groupId>
    <artifactId>ecological-engine</artifactId>
    <version>[1.6.0-SNAPSHOT,2.0.0-SNAPSHOT)</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>[4.12,)</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
    <scope>test</scope>
  </dependency>
  .....
</dependencies>
Add BasicConfigurator.configure(); at the beginning of your test methods (AND ONLY IN THE TEST METHODS) to activate the logs.
Let's start by creating a new class that implements a basic algorithm; it will be executed by the DataMiner.
The next step is to extend the base class StandardLocalExternalAlgorithm. The following snippet shows the unimplemented methods that we are going to fill in.
public class SimpleAlgorithm extends StandardLocalExternalAlgorithm {

    @Override
    public void init() throws Exception {
        // TODO Auto-generated method stub
    }

    @Override
    public String getDescription() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    protected void process() throws Exception {
        // TODO Auto-generated method stub
    }

    @Override
    protected void setInputParameters() {
        // TODO Auto-generated method stub
    }

    @Override
    public void shutdown() {
        // TODO Auto-generated method stub
    }

    @Override
    public StatisticalType getOutput() {
        return null;
    }
}
The init() method performs initialization. In this simple example we need to initialize the logging facility, and we use the logger from the ecological-engine library. If the algorithm uses a database, its connection must be opened in this method.
The shutdown() method closes the database connection.
In the getDescription() method we add a simple description for the algorithm.
Customize input visualization
String input parameters
The user's input is obtained by calling, from setInputParameters(), the method addStringInput with the following parameters:
- the name of the variable;
- a description of the variable;
- a default value.
The user input is then retrieved with getInputParameter(), passing the same name used in setInputParameters().
protected void setInputParameters() {
    addStringInput(NameOfVariable, "Description", "DefaultInput");
}
The input parameter is automatically passed by DataMiner to the procedure. In particular, in the process() method we can retrieve the parameter by the name we set in the addStringInput method.
@Override
protected void process() throws Exception {
    ....
    String userInputValue = getInputParameter(NameOfVariable);
}
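Since every string input reaches the algorithm as a plain Java string, numeric parameters have to be parsed and validated inside process(). The following is a minimal sketch of such a check; the helper class and method names are hypothetical, not part of the ecological-engine API:

```java
// Hypothetical helper: DataMiner string inputs arrive as Java strings,
// so numeric parameters must be parsed (and validated) in process().
public class InputParsing {

    /** Parses a user-supplied string input, falling back to a default on bad input. */
    public static int parseIntOrDefault(String raw, int defaultValue) {
        if (raw == null || raw.trim().isEmpty())
            return defaultValue;
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }
}
```

A fallback to the declared default value is one reasonable policy; throwing an exception with a clear message is another, and makes bad input visible to the user.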
Combo box input parameter
In order to obtain a combo box, define an enum containing the possible choices to be selected in the combo box, and pass it to the method addEnumerateInput as follows:
public enum Enum {
    FIRST_ENUM,
    SECOND_ENUM
}

protected void setInputParameters() {
    addEnumerateInput(Enum.values(), variableName, "Description", Enum.FIRST_ENUM.name());
}
The addEnumerateInput parameters are, respectively:
- the values of the declared enum;
- the name of the variable used to extract the value inserted by the user;
- a description of the value;
- the default value shown in the combo box.
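The combo-box selection comes back from getInputParameter() as a string (the enum constant's name, as produced by Enum.name() in the snippet above), so it can be mapped back to the enum with Enum.valueOf. A small self-contained sketch, using an illustrative enum:

```java
// Sketch: the user's combo-box choice is returned as a string and can be
// mapped back to the declared enum with valueOf(). The enum here is illustrative.
public class ComboSelection {

    public enum Method { FIRST_ENUM, SECOND_ENUM }

    /** Maps the string returned by getInputParameter() back to the enum constant. */
    public static Method fromUserInput(String raw) {
        return Method.valueOf(raw.trim());
    }
}
```

Note that valueOf() throws IllegalArgumentException for unknown names, which is acceptable here because the combo box restricts the user to the declared constants.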
File input parameter
Users can upload their data to DataMiner as files. After a file has been uploaded, its data can be used as input for an algorithm.
@Override
protected void setInputParameters() {
    inputs.add(new PrimitiveType(File.class.getName(), null, PrimitiveTypes.FILE,
            "inputFileParameterName", "Input File Description", "Input File Name"));
}

@Override
protected void process() throws Exception {
    String fileParameter = getInputParameter("inputFileParameterName");
    FileInputStream fileStream = new FileInputStream(fileParameter);
}
Import input from the DataMiner database
Users can upload their data in the DataMiner "Access to the Data Space" section. After a file has been uploaded (for example a CSV file), its data can be used as input for an algorithm. In order to select the column values of a table extracted from the CSV, the algorithm developer fills in the methods as follows:
@Override
protected void setInputParameters() {
    List<TableTemplates> templates = new ArrayList<TableTemplates>();
    templates.add(TableTemplates.GENERIC);
    InputTable tinput = new InputTable(templates, "Table", "Table Description");
    ColumnTypesList columns = new ColumnTypesList("Table", "Columns", "Selected Columns Description", false);
    inputs.add(tinput);
    inputs.add(columns);
    DatabaseType.addDefaultDBPars(inputs);
}

@Override
protected void process() throws Exception {
    config.setParam("DatabaseDriver", "org.postgresql.Driver");
    SessionFactory dbconnection = DatabaseUtils.initDBSession(config);
    String[] columnlist = columnnames.split(AlgorithmConfiguration.getListSeparator());
    List<Object> speciesList = DatabaseFactory.executeSQLQuery(
            "select " + columnlist[0] + " from " + tablename, dbconnection);
}
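The key step above is that the user's column selection arrives as a single separator-delimited string, which must be split before building the SQL query. A self-contained sketch of that string handling follows; the separator is passed explicitly here, whereas real code would use AlgorithmConfiguration.getListSeparator():

```java
import java.util.regex.Pattern;

// Illustrative sketch: split the separator-delimited column selection and build
// the SELECT statement. The separator value is an assumption; in DataMiner code
// it comes from AlgorithmConfiguration.getListSeparator().
public class ColumnQueryBuilder {

    public static String buildSelect(String columnNames, String separator, String tableName) {
        String[] columnList = columnNames.split(Pattern.quote(separator));
        // as in the example above, only the first selected column is queried
        return "select " + columnList[0].trim() + " from " + tableName;
    }
}
```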
Advanced Input
It is possible to indicate spatial inputs or time/date inputs. The details for the definition of these inputs are reported in the Advanced Input page.
Case of algorithms using databases
In order to use a database, it is required to call, inside setInputParameters(), the method addRemoteDatabaseInput().
An important step is to pass as first parameter the name of the Runtime Resource addressing the database.
DataMiner automatically retrieves the following parameters from the runtime resource: url, user and password. Inside the process() method, before connecting to the database, url, user and password are retrieved with getInputParameter(), passing the names given to addRemoteDatabaseInput().
@Override
protected void setInputParameters() {
    ...
    addRemoteDatabaseInput("Obis2Repository", urlParameterName, userParameterName,
            passwordParameterName, "driver", "dialect");
}

@Override
protected void process() throws Exception {
    ...
    String databaseJdbc = getInputParameter(urlParameterName);
    String databaseUser = getInputParameter(userParameterName);
    String databasePwd = getInputParameter(passwordParameterName);
    connection = DriverManager.getConnection(databaseJdbc, databaseUser, databasePwd);
    ...
}
Customize output
The last step is to set and specify the output of the procedure. For this purpose we override the method getOutput(), which returns a StatisticalType.
The first output parameter we instantiate is a PrimitiveType object that wraps a string, so we set its type to string and associate a name and a description with the output value. We can instantiate a second output as another PrimitiveType, and collect both outputs in a map, which preserves the order in which the parameters were stored.
DataMiner invokes getOutput() to understand the type of the output object; at this point the algorithm is indexed in the ecological-engine library with the name set in the properties file.
String Output
In order to have a string as output you have to create a PrimitiveType as follows:
@Override
public StatisticalType getOutput() {
    ....
    PrimitiveType val = new PrimitiveType(String.class.getName(), myString,
            PrimitiveTypes.STRING, stringName, defaultValue);
    return val;
}
Bar Chart Output
In order to create a histogram chart you have to fill a DefaultCategoryDataset object and use it to create the chart:
DefaultCategoryDataset dataset;
...
dataset.addValue(...);
....

@Override
public StatisticalType getOutput() {
    ....
    HashMap<String, Image> producedImages = new HashMap<String, Image>();
    JFreeChart chart = HistogramGraph.createStaticChart(dataset);
    Image image = ImageTools.toImage(chart.createBufferedImage(680, 420));
    producedImages.put("Species Observations", image);
    ...
}
Timeseries Chart Output
In order to create a time-series chart you have to fill a DefaultCategoryDataset object and use it to create the chart. The second parameter of the createStaticChart method is the time format.
DefaultCategoryDataset dataset;
...
dataset.addValue(...);
....

@Override
public StatisticalType getOutput() {
    ...
    HashMap<String, Image> producedImages = new HashMap<String, Image>();
    JFreeChart chart = TimeSeriesGraph.createStaticChart(dataset, "yyyy");
    Image image = ImageTools.toImage(chart.createBufferedImage(680, 420));
    producedImages.put("TimeSeries chart", image);
    ...
}
File Output
In order to create a results file that the user can download, algorithm developers have to add the following code:
protected String fileName;
protected BufferedWriter out;

@Override
protected void process() throws Exception {
    // Note: you must add a timestamp to the file name
    fileName = super.config.getPersistencePath() + "results.csv";
    out = new BufferedWriter(new FileWriter(fileName));
    out.write(results);
    out.newLine();
}

@Override
public StatisticalType getOutput() {
    ...
    PrimitiveType file = new PrimitiveType(File.class.getName(), new File(fileName),
            PrimitiveTypes.FILE, "Description", "Default value");
    map.put("Output", file);
    ...
}
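The comment in the snippet above warns that a timestamp must be added to the file name, so that concurrent executions do not overwrite each other's results. A minimal stdlib sketch of such a name (the helper class is hypothetical):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical helper: builds the timestamped result file name the comment
// above recommends, under the algorithm's persistence path.
public class ResultFileNames {

    public static String timestamped(String persistencePath, String baseName, Date when) {
        String stamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(when);
        return persistencePath + baseName + "_" + stamp + ".csv";
    }
}
```

In process() this would be called as fileName = ResultFileNames.timestamped(super.config.getPersistencePath(), "results", new Date());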
Test the algorithm
This is a template example to test an algorithm from Eclipse. Download the following folder https://goo.gl/r16rfF and place it alongside the code.
package org.gcube.dataanalysis.ecoengine.test.regression;

import java.util.List;

import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.evaluation.bioclimate.InterpolateTables.INTERPOLATIONFUNCTIONS;
import org.gcube.dataanalysis.ecoengine.interfaces.ComputationalAgent;
import org.gcube.dataanalysis.ecoengine.interfaces.Transducerer;

public class TestTransducers {

    public static void main(String[] args) throws Exception {
        System.out.println("TEST 1");
        ComputationalAgent computationalAgent = new yourClassName();
        computationalAgent.setConfiguration(testConfigLocal());
        computationalAgent.init();
        Regressor.process(computationalAgent);
        computationalAgent.shutdown();
    }

    private static AlgorithmConfiguration testConfigLocal() {
        AlgorithmConfiguration config = Regressor.getConfig();
        config.setAgent("OCCURRENCES_DUPLICATES_DELETER");
        config.setParam("longitudeColumn", "decimallongitude");
        config.setParam("latitudeColumn", "decimallatitude");
        config.setParam("recordedByColumn", "recordedby");
        config.setParam("scientificNameColumn", "scientificname");
        config.setParam("eventDateColumn", "eventdate");
        config.setParam("lastModificationColumn", "modified");
        config.setParam("OccurrencePointsTableName", "whitesharkoccurrences2");
        config.setParam("finalTableName", "whitesharkoccurrencesnoduplicates");
        config.setParam("spatialTolerance", "0.5");
        config.setParam("confidence", "80");
        return config;
    }
}
Properties File and Deploy
In order to deploy an algorithm we must create:
- the jar corresponding to the eclipse Java project containing the algorithm;
- a properties file containing the name under which the algorithm will be displayed in the GUI and the fully qualified name of the algorithm class, e.g. MY_ALGORITHM=org.gcube.cnr.Myalgorithm
You must provide these two files to the i-Marine team. They will move the algorithm onto a DataMiner instance and the interface will be automatically generated.
In the following example, inside the src/main/java folder, the package org.gcube.dataanalysis.myAlgorithms contains the class SimpleAlgorithm implementing an algorithm.
SIMPLE_ALGORITHM=org.gcube.dataanalysis.myAlgorithms.SimpleAlgorithm
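The deploy descriptor above is a standard Java properties file mapping the GUI display name to the algorithm's fully qualified class name, so it can be inspected with java.util.Properties. A minimal sketch (the helper class is illustrative, not part of the DataMiner codebase):

```java
import java.io.StringReader;
import java.util.Properties;

// Illustrative sketch: the deploy descriptor is an ordinary Java properties
// file, so java.util.Properties can resolve an algorithm name to its class.
public class AlgorithmRegistry {

    public static String resolveClassName(String propertiesText, String algorithmName) throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        return props.getProperty(algorithmName);
    }
}
```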
Complete Example with multiple outputs
public class AbsoluteSpeciesBarChartsAlgorithm extends StandardLocalExternalAlgorithm {

    LinkedHashMap<String, StatisticalType> map = new LinkedHashMap<String, StatisticalType>();
    static String databaseName = "DatabaseName";
    static String userParameterName = "DatabaseUserName";
    static String passwordParameterName = "DatabasePassword";
    static String urlParameterName = "DatabaseURL";
    private String firstSpeciesNumber = "Species";
    private String yearStart = "Starting_year";
    private String yearEnd = "Ending_year";
    private int speciesNumber;
    private DefaultCategoryDataset defaultcategorydataset;

    @Override
    public void init() throws Exception {
        AnalysisLogger.getLogger().debug("Initialization");
    }

    @Override
    public String getDescription() {
        return "Algorithm returning bar chart of most observed species in a specific years range (with respect to the OBIS database)";
    }

    @Override
    protected void process() throws Exception {
        defaultcategorydataset = new DefaultCategoryDataset();
        String driverName = "org.postgresql.Driver";
        String tmp = getInputParameter(firstSpeciesNumber);
        speciesNumber = Integer.parseInt(tmp);
        Class driverClass = Class.forName(driverName);
        Driver driver = (Driver) driverClass.newInstance();
        String databaseJdbc = getInputParameter(urlParameterName);
        String year_start = getInputParameter(yearStart);
        String year_end = getInputParameter(yearEnd);
        String databaseUser = getInputParameter(userParameterName);
        String databasePwd = getInputParameter(passwordParameterName);
        Connection connection = null;
        connection = DriverManager.getConnection(databaseJdbc, databaseUser, databasePwd);
        Statement stmt = connection.createStatement();
        String query = "SELECT tname, sum(count) AS count FROM public.count_species_per_year WHERE year::integer >= "
                + year_start + " AND year::integer <= " + year_end
                + " GROUP BY tname ORDER BY count desc;";
        ResultSet rs = stmt.executeQuery(query);
        int i = 0;
        String s = "Species";
        while (rs.next() && i < speciesNumber) {
            String tname = rs.getString("tname");
            String count = rs.getString("count");
            int countOcc = Integer.parseInt(count);
            // First output (list of strings)
            PrimitiveType val = new PrimitiveType(String.class.getName(), count,
                    PrimitiveTypes.STRING, tname, tname);
            map.put(tname, val);
            if (i < 16)
                defaultcategorydataset.addValue(countOcc, s, tname);
            else
                break;
            i++;
        }
        connection.close();
    }

    @Override
    protected void setInputParameters() {
        addStringInput(firstSpeciesNumber, "Number of shown species", "10");
        addStringInput(yearStart, "Starting year of observations", "1800");
        addStringInput(yearEnd, "Ending year of observations", "2020");
        addRemoteDatabaseInput("Obis2Repository", urlParameterName, userParameterName,
                passwordParameterName, "driver", "dialect");
    }

    @Override
    public void shutdown() {
        AnalysisLogger.getLogger().debug("Shutdown");
    }

    @Override
    public StatisticalType getOutput() {
        PrimitiveType p = new PrimitiveType(Map.class.getName(),
                PrimitiveType.stringMap2StatisticalMap(outputParameters),
                PrimitiveTypes.MAP, "Discrepancy Analysis", "");
        AnalysisLogger.getLogger().debug("MapsComparator: Producing Gaussian Distribution for the errors");
        // build image:
        HashMap<String, Image> producedImages = new HashMap<String, Image>();
        JFreeChart chart = HistogramGraph.createStaticChart(defaultcategorydataset);
        Image image = ImageTools.toImage(chart.createBufferedImage(680, 420));
        producedImages.put("Species Observations", image);
        PrimitiveType images = new PrimitiveType(HashMap.class.getName(), producedImages,
                PrimitiveTypes.IMAGES, "ErrorRepresentation", "Graphical representation of the error spread");
        // end build image
        AnalysisLogger.getLogger().debug("Bar Charts Species Occurrences Produced");
        // collect all the outputs
        map.put("Result", p);
        map.put("Images", images);
        // generate a primitive type for the collection
        PrimitiveType output = new PrimitiveType(HashMap.class.getName(), map,
                PrimitiveTypes.MAP, "ResultsMap", "Results Map");
        return output;
    }
}
Integrating R Scripts
DataMiner (DM) supports R script integration. This section explains how to integrate R scripts that will be executed by one single powerful machine in sequential mode. The calculation will be distributed on one of the machines that make up the DataMiner system, and the DM will automatically take care of multi-user request management. This section does not deal with parallel processing enabled for the script, which will be discussed later.
In the Eclipse project, download the following configuration folder: http://goo.gl/bNKrZK Then add the following maven dependency:
<dependency>
  <groupId>org.gcube.dataanalysis</groupId>
  <artifactId>ecological-engine-smart-executor</artifactId>
  <version>[1.0.0-SNAPSHOT,2.0.0)</version>
</dependency>
Then copy an R script inside the cfg folder. The DM framework assumes that the R file (i) accepts an input file whose name is hard-coded in the script, (ii) produces an output file whose name is hard-coded in the script, (iii) requires an R context made up of user's variables, (iv) possibly requires custom adjustment to the code.
The DM framework facilitates the call to the script by adding context variables "on the fly" and managing multi-user synchronous calls. This mechanism is performed by generating new on-the-fly temporary R scripts for each user. The DM will be also responsible for distributing the script on one powerful machine. Required packages are assumed to be preinstalled on the backend system.
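The "on the fly" generation described above can be pictured as prepending the user's context variables, as R assignments, to the unmodified script body. The following is an illustrative sketch of that idea only, NOT the actual RScriptsManager implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch (NOT the actual RScriptsManager code): a temporary R
// script is generated by prepending the context variables as R assignments
// to the original script body, one script per user request.
public class RScriptContextInjector {

    public static String injectContext(String scriptBody, LinkedHashMap<String, String> context) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : context.entrySet())
            sb.append(e.getKey()).append(" <- ").append(e.getValue()).append("\n");
        sb.append(scriptBody); // the script itself is left untouched
        return sb.toString();
    }
}
```

A LinkedHashMap is used so the assignments appear in insertion order, matching how the inputParameters map is filled in the example below.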
One example of an algorithm calling a complex interpolation model is the following:
package org.gcube.dataanalysis.executor.rscripts;

import java.io.File;
import java.util.HashMap;
import java.util.LinkedHashMap;

import org.gcube.contentmanagement.lexicalmatcher.utils.AnalysisLogger;
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
import org.gcube.dataanalysis.ecoengine.datatypes.enumtypes.PrimitiveTypes;
import org.gcube.dataanalysis.ecoengine.interfaces.StandardLocalExternalAlgorithm;
import org.gcube.dataanalysis.executor.util.RScriptsManager;

public class SGVMS_Interpolation extends StandardLocalExternalAlgorithm {

    private static int maxPoints = 10000;

    public enum methodEnum { cHs, SL };

    RScriptsManager scriptmanager;
    String outputFile;

    @Override
    public void init() throws Exception {
        AnalysisLogger.getLogger().debug("Initializing SGVMS_Interpolation");
    }

    @Override
    public String getDescription() {
        return "An interpolation method relying on the implementation by the Study Group on VMS (SGVMS). "
                + "The method uses two interpolation approaches to simulate vessels points at a certain temporal resolution. "
                + "The input is a file in TACSAT format uploaded on the DataMiner. "
                + "The output is another TACSAT file containing interpolated points. "
                + "The underlying R code has been extracted from the SGVM VMSTools framework. "
                + "This algorithm comes after a feasibility study (http://goo.gl/risQre) which clarifies the features an e-Infrastructure adds to the original scripts. "
                + "Limitation: the input will be processed up to " + maxPoints + " vessels trajectory points.";
    }

    @Override
    protected void process() throws Exception {
        status = 0;
        // instantiate the R Script executor
        scriptmanager = new RScriptsManager();
        // this is the script name
        String scriptName = "interpolateTacsat.r";
        // absolute path to the input, provided by the DM
        String inputFile = config.getParam("InputFile");
        AnalysisLogger.getLogger().debug("Starting SGVM Interpolation-> Config path " + config.getConfigPath()
                + " Persistence path: " + config.getPersistencePath());
        // default input and outputs
        String defaultInputFileInTheScript = "tacsat.csv";
        String defaultOutputFileInTheScript = "tacsat_interpolated.csv";
        // input parameters: represent the context of the script. Values will be assigned in the R environment.
        LinkedHashMap<String, String> inputParameters = new LinkedHashMap<String, String>();
        inputParameters.put("npoints", config.getParam("npoints"));
        inputParameters.put("interval", config.getParam("interval"));
        inputParameters.put("margin", config.getParam("margin"));
        inputParameters.put("res", config.getParam("res"));
        inputParameters.put("fm", config.getParam("fm"));
        inputParameters.put("distscale", config.getParam("distscale"));
        inputParameters.put("sigline", config.getParam("sigline"));
        inputParameters.put("minspeedThr", config.getParam("minspeedThr"));
        inputParameters.put("maxspeedThr", config.getParam("maxspeedThr"));
        inputParameters.put("headingAdjustment", config.getParam("headingAdjustment"));
        inputParameters.put("equalDist", config.getParam("equalDist").toUpperCase());
        // add static context variables
        inputParameters.put("st", "c(minspeedThr,maxspeedThr)");
        inputParameters.put("fast", "TRUE");
        inputParameters.put("method", "\"" + config.getParam("method") + "\"");
        AnalysisLogger.getLogger().debug("Starting SGVM Interpolation-> Input Parameters: " + inputParameters);
        // if other code injection is required, put the strings to substitute as keys and the substituting ones as values
        HashMap<String, String> codeInjection = null;
        // force the script to produce an output file, otherwise generate an exception
        boolean scriptMustReturnAFile = true;
        boolean uploadScriptOnTheInfrastructureWorkspace = false; // the DataMiner service will manage the upload
        AnalysisLogger.getLogger().debug("SGVM Interpolation-> Executing the script ");
        status = 10;
        // execute the script in multi-user mode
        scriptmanager.executeRScript(config, scriptName, inputFile, inputParameters,
                defaultInputFileInTheScript, defaultOutputFileInTheScript, codeInjection,
                scriptMustReturnAFile, uploadScriptOnTheInfrastructureWorkspace, config.getConfigPath());
        // assign the file path to an output variable for the DM
        outputFile = scriptmanager.currentOutputFileName;
        AnalysisLogger.getLogger().debug("SGVM Interpolation-> Output File is " + outputFile);
        status = 100;
    }

    @Override
    protected void setInputParameters() {
        // declare the input parameters the user will set: they will basically correspond to the R context
        inputs.add(new PrimitiveType(File.class.getName(), null, PrimitiveTypes.FILE, "InputFile",
                "Input file in TACSAT format. E.g. http://goo.gl/i16kPw"));
        addIntegerInput("npoints", "The number of pings or positions required between each real or actual vessel position or ping", "10");
        addIntegerInput("interval", "Average time in minutes between two adjacent datapoints", "120");
        addIntegerInput("margin", "Maximum deviation from specified interval to find adjacent datapoints (tolerance)", "10");
        addIntegerInput("res", "Number of points to use to create interpolation (including start and end point)", "100");
        addEnumerateInput(methodEnum.values(), "method", "Set to cHs for cubic Hermite spline or SL for Straight Line interpolation", "cHs");
        addDoubleInput("fm", "The FM parameter in cubic interpolation", "0.5");
        addIntegerInput("distscale", "The DistScale parameter for cubic interpolation", "20");
        addDoubleInput("sigline", "The Sigline parameter in cubic interpolation", "0.2");
        addDoubleInput("minspeedThr", "A filter on the minimum speed to take into account for interpolation", "2");
        addDoubleInput("maxspeedThr", "A filter on the maximum speed to take into account for interpolation", "6");
        addIntegerInput("headingAdjustment", "Parameter to adjust the choice of heading depending on its own or previous point (0 or 1). Set 1 in case the heading at the endpoint does not represent the heading of the arriving vessel to that point but the departing vessel.", "0");
        inputs.add(new PrimitiveType(Boolean.class.getName(), null, PrimitiveTypes.BOOLEAN, "equalDist",
                "Whether the number of positions returned should be equally spaced or not", "true"));
    }

    @Override
    public StatisticalType getOutput() {
        // return the output file produced by the procedure to the DM
        PrimitiveType o = new PrimitiveType(File.class.getName(), new File(outputFile),
                PrimitiveTypes.FILE, "OutputFile", "Output file in TACSAT format.");
        return o;
    }

    @Override
    public void shutdown() {
        // in the case of forced shutdown, stop the R process
        if (scriptmanager != null)
            scriptmanager.stop();
        System.gc();
    }
}
In order to test the above algorithm, just modify the "transducerers.properties" file inside the cfg folder by adding the following string:
SGVM_INTERPOLATION=org.gcube.dataanalysis.executor.rscripts.SGVMS_Interpolation
which will assign a name to the algorithm. Then a test class for this algorithm will be the following:
package org.gcube.dataanalysis.executor.tests;

import java.util.List;

import org.gcube.dataanalysis.ecoengine.configuration.AlgorithmConfiguration;
import org.gcube.dataanalysis.ecoengine.datatypes.PrimitiveType;
import org.gcube.dataanalysis.ecoengine.datatypes.StatisticalType;
import org.gcube.dataanalysis.ecoengine.interfaces.ComputationalAgent;
import org.gcube.dataanalysis.ecoengine.test.regression.Regressor;

public class TestSGVMInterpolation {

    public static void main(String[] args) throws Exception {
        // setup the configuration
        AlgorithmConfiguration config = new AlgorithmConfiguration();
        // set the path to the cfg folder and to the PARALLEL_PROCESSING folder
        config.setConfigPath("./cfg/");
        config.setPersistencePath("./PARALLEL_PROCESSING");
        // set the user's inputs. They will be passed by the DM to the script in the following way:
        config.setParam("InputFile", "<absolute path to the file>/tacsatmini.csv"); // put the absolute path to the input file
        config.setParam("npoints", "10");
        config.setParam("interval", "120");
        config.setParam("margin", "10");
        config.setParam("res", "100");
        config.setParam("method", "SL");
        config.setParam("fm", "0.5");
        config.setParam("distscale", "20");
        config.setParam("sigline", "0.2");
        config.setParam("minspeedThr", "2");
        config.setParam("maxspeedThr", "6");
        config.setParam("headingAdjustment", "0");
        config.setParam("equalDist", "true");
        // set the scope and the user (optional for this test)
        config.setGcubeScope("/gcube/devsec/devVRE");
        config.setParam("ServiceUserName", "test.user");
        // set the name of the algorithm to call, as it is in the transducerers.properties file
        config.setAgent("SGVM_INTERPOLATION");
        // recall the transducerer with the above name
        ComputationalAgent transducer = new SGVMS_Interpolation();
        transducer.setConfiguration(config);
        // init the transducer
        transducer.init();
        // start the process
        Regressor.process(transducer);
        // retrieve the output
        StatisticalType st = transducer.getOutput();
        System.out.println("st:" + ((PrimitiveType) st).getContent());
    }
}
Enabling Cloud Computing for R Scripts
In the case of a process running in the Infrastructure and using Cloud computing, you have to extend the ActorNode class and define how to set up the process, chunk the input space, run the script, and perform the reduce phase. These steps rely on the following methods:
- setup(AlgorithmConfiguration config)
- getNumberOfRightElements()
- getNumberOfLeftElements()
- postProcess(boolean manageDuplicates, boolean manageFault)
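Chunking the input space means that each worker invocation of executeNode() receives a start index and a number of elements to process. A hypothetical sketch of how N elements could be split evenly across W workers, yielding such (startIndex, numberOfElements) pairs; this illustrates the idea only and is not the actual DataMiner scheduler:

```java
// Hypothetical sketch of the chunking step: splitting N input elements across
// W worker invocations yields the (startIndex, numberOfElements) pairs that
// executeNode() receives. Not the actual DataMiner scheduling code.
public class InputChunker {

    /** Returns {startIndex, numberOfElements} for worker 'node' out of 'workers'. */
    public static int[] chunk(int totalElements, int workers, int node) {
        int base = totalElements / workers;
        int remainder = totalElements % workers;
        // the first 'remainder' workers take one extra element
        int size = base + (node < remainder ? 1 : 0);
        int start = node * base + Math.min(node, remainder);
        return new int[] { start, size };
    }
}
```

For example, 10 families split across 3 workers produces the ranges [0,4), [4,7) and [7,10), matching the rightStartIndex/numberOfRightElementsToProcess pattern used in the example below.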
package org.gcube.dataanalysis.executor.nodes.algorithms;

public class LWR extends ActorNode {

    public String destinationTable;
    public String destinationTableLabel;
    public String originTable;
    public String familyColumn;
    public int count;
    public float status = 0;

    // specify the kind of parallel process: the following performs a matrix-to-matrix comparison
    @Override
    public ALG_PROPS[] getProperties() {
        ALG_PROPS[] p = { ALG_PROPS.PHENOMENON_VS_PARALLEL_PHENOMENON };
        return p;
    }

    @Override
    public String getName() {
        return "LWR";
    }

    @Override
    public String getDescription() {
        return "An algorithm to estimate Length-Weight relationship parameters for marine species, using Bayesian methods. Runs an R procedure. Based on the Cube-law theory.";
    }

    @Override
    public List<StatisticalType> getInputParameters() {
        List<TableTemplates> templateLWRInput = new ArrayList<TableTemplates>();
        templateLWRInput.add(TableTemplates.GENERIC);
        InputTable p1 = new InputTable(templateLWRInput, "LWR_Input", "Input table containing taxa and species information", "lwr");
        ColumnType p3 = new ColumnType("LWR_Input", "FamilyColumn", "The column containing Family information", "Family", false);
        ServiceType p4 = new ServiceType(ServiceParameters.RANDOMSTRING, "RealOutputTable", "name of the resulting table", "lwr_");
        PrimitiveType p2 = new PrimitiveType(String.class.getName(), null, PrimitiveTypes.STRING, "TableLabel", "Name of the table which will contain the model output", "lwrout");
        List<StatisticalType> parameters = new ArrayList<StatisticalType>();
        parameters.add(p1);
        parameters.add(p3);
        parameters.add(p2);
        parameters.add(p4);
        DatabaseType.addDefaultDBPars(parameters);
        return parameters;
    }

    @Override
    public StatisticalType getOutput() {
        List<TableTemplates> template = new ArrayList<TableTemplates>();
        template.add(TableTemplates.GENERIC);
        OutputTable p = new OutputTable(template, destinationTableLabel, destinationTable, "Output lwr table");
        return p;
    }

    @Override
    public void initSingleNode(AlgorithmConfiguration config) {
    }

    @Override
    public float getInternalStatus() {
        return status;
    }

    private static String scriptName = "UpdateLWR_4.R";

    // the inputs delivered by the DM are: the index and number of elements to take from the left and right tables,
    // an indication of whether the same request was already assigned to another worker node (in the case of errors),
    // the sandbox folder in which the script will be executed, and the configuration of the algorithm
    @Override
    public int executeNode(int leftStartIndex, int numberOfLeftElementsToProcess, int rightStartIndex,
            int numberOfRightElementsToProcess, boolean duplicate, String sandboxFolder,
            String nodeConfigurationFileObject, String logfileNameToProduce) {
        String insertQuery = null;
        try {
            status = 0;
            // reconstruct the configuration
            AlgorithmConfiguration config = Transformations.restoreConfig(nodeConfigurationFileObject);
            config.setConfigPath(sandboxFolder);
            System.out.println("Initializing DB");
            // take the parameters and possibly initialize the connection to the DB
            dbconnection = DatabaseUtils.initDBSession(config);
            destinationTableLabel = config.getParam("TableLabel");
            destinationTable = config.getParam("RealOutputTable");
            System.out.println("Destination Table: " + destinationTable);
            System.out.println("Destination Table Label: " + destinationTableLabel);
            originTable = config.getParam("LWR_Input");
            familyColumn = config.getParam("FamilyColumn");
            System.out.println("Origin Table: " + originTable);
            // take the families to process
            List<Object> families = DatabaseFactory.executeSQLQuery(
                    DatabaseUtils.getDinstictElements(originTable, familyColumn, ""), dbconnection);
            // transform the families into a string
            StringBuffer familiesFilter = new StringBuffer();
            familiesFilter.append("Families <- Fam.All[");
            int end = rightStartIndex + numberOfRightElementsToProcess;
            // build the substitution string
            for (int i = rightStartIndex; i < end; i++) {
                familiesFilter.append("Fam.All == \"" + families.get(i) + "\"");
                if (i < end - 1)
                    familiesFilter.append(" | ");
            }
            familiesFilter.append("]");
            // substitution to perform in the script
            String substitutioncommand = "sed -i 's/Families <- Fam.All[Fam.All== \"Acanthuridae\" | Fam.All == \"Achiridae\"]/"
                    + familiesFilter + "/g' " + "UpdateLWR_Test2.R";
            System.out.println("Preparing for processing the families names: " + familiesFilter.toString());
            substituteInScript(sandboxFolder + scriptName, sandboxFolder + "UpdateLWR_Tester.R",
                    "Families <- Fam.All[Fam.All== \"Acanthuridae\" | Fam.All == \"Achiridae\"]",
                    familiesFilter.toString()); // for test only
            System.out.println("Creating local file from remote table");
            // download the table in csv format to feed the procedure
            DatabaseUtils.createLocalFileFromRemoteTable(sandboxFolder + "RF_LWR.csv", originTable, ",",
                    config.getDatabaseUserName(), config.getDatabasePassword(), config.getDatabaseURL());
            String headers = "Subfamily,Family,Genus,Species,FBname,SpecCode,AutoCtr,Type,a,b,CoeffDetermination,Number,LengthMin,Score,BodyShapeI";
            System.out.println("Adding headers to the file");
            String headerscommand = "sed -i '1s/^/" + headers + "\\n/g' " + "RF_LWR2.csv";
            // substitute the string in the R code
            addheader(sandboxFolder + "RF_LWR.csv", sandboxFolder + "RF_LWR2.csv", headers);
            System.out.println("Headers added");
            System.out.println("Executing R script " + "R --no-save < UpdateLWR_Tester.R");
            // run the R code: it can alternatively be done with the methods of the previous example
            Process process = Runtime.getRuntime().exec("R --no-save");
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
            bw.write("source('UpdateLWR_Tester.R')\n");
            bw.write("q()\n");
            bw.close();
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line = br.readLine();
            System.out.println(line);
            while (line != null) {
                line = br.readLine();
                System.out.println(line);
            }
            process.destroy();
            System.out.println("Appending csv to table");
            // transform the output into a table
            StringBuffer lines = readFromCSV("LWR_Test1.csv");
            insertQuery = DatabaseUtils.insertFromBuffer(destinationTable, columnNames, lines);
            DatabaseFactory.executeSQLUpdate(insertQuery, dbconnection);
            System.out.println("The procedure was successful");
            status = 1f;
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("warning: error in node execution " + e.getLocalizedMessage());
            System.out.println("Insertion Query: " + insertQuery);
            System.err.println("Error in node execution " + e.getLocalizedMessage());
            return -1;
        } finally {
            if (dbconnection != null)
                try {
                    dbconnection.close();
                } catch (Exception e) {
                }
        }
        return 0;
    }

    // setup phase of the algorithm
    @Override
    public void setup(AlgorithmConfiguration config) throws Exception {
        destinationTableLabel = config.getParam("TableLabel");
        AnalysisLogger.getLogger().info("Table Label: " + destinationTableLabel);
        destinationTable = config.getParam("RealOutputTable");
        AnalysisLogger.getLogger().info("Underlying Table Name: " + destinationTable);
        originTable = config.getParam("LWR_Input");
        AnalysisLogger.getLogger().info("Original Table: " + originTable);
        familyColumn = config.getParam("FamilyColumn");
        AnalysisLogger.getLogger().info("Family Column: " + familyColumn);
        haspostprocessed = false;
        AnalysisLogger.getLogger().info("Initializing DB Connection");
        dbconnection = DatabaseUtils.initDBSession(config);
        List<Object> families = DatabaseFactory.executeSQLQuery(
                DatabaseUtils.getDinstictElements(originTable, familyColumn, ""), dbconnection);
        count = families.size();
        // create the table where the script will write the output
        DatabaseFactory.executeSQLUpdate(String.format(createOutputTable, destinationTable), dbconnection);
        AnalysisLogger.getLogger().info("Destination Table Created! Addressing " + count + " species");
    }

    @Override
    public int getNumberOfRightElements() {
        return count; // each worker node has to get all the elements in the right table
    }

    @Override
    public int getNumberOfLeftElements() {
        return 1; // each worker node has to get only one element in the left table
    }

    @Override
    public void stop() {
        // if it has not postprocessed, abort the computation by removing the database table
        if (!haspostprocessed) {
            try {
                AnalysisLogger.getLogger().info("The procedure did NOT correctly postprocess ... Removing Table "
                        + destinationTable + " because of computation stop!");
                DatabaseFactory.executeSQLUpdate(DatabaseUtils.dropTableStatement(destinationTable), dbconnection);
            } catch (Exception e) {
                AnalysisLogger.getLogger().info("Table " + destinationTable + " did not exist");
            }
        } else
            AnalysisLogger.getLogger().info("The procedure has correctly postprocessed: shutting down the connection!");
        if (dbconnection != null)
            try {
                dbconnection.close();
            } catch (Exception e) {
            }
    }

    boolean haspostprocessed = false;

    @Override
    public void postProcess(boolean manageDuplicates, boolean manageFault) {
        haspostprocessed = true;
    }
}
Video
We advise you to also follow this video, which practically shows how to build an algorithm:
http://i-marine.eu/Content/eTraining.aspx?id=e1777006-a08c-49ad-b2e6-c13e094f27d4