=Overview=

This adaptor, external to the adaptors offered by the [[WorkflowEngine]], constructs an [[ExecutionEngine#Execution_Plan | Execution Plan]] based on a query defined in HiveQL. The query can vary from a simple transformation job to a complex distributed MapReduce job. The query is parsed and the adaptor then processes the parsed information to create the [[ExecutionEngine#Execution_Plan | Execution Plan]]. During plan creation the [https://gcube.wiki.gcube-system.org/gcube/index.php/Search_Operators Search Operators] are exploited.

=HiveQL=

While based on SQL, HiveQL offers extensions, including loading/exporting data from external sources and CREATE TABLE AS SELECT, but only basic support for indexes. HiveQL also lacks support for transactions and materialized views, and offers only limited subquery support. Nevertheless, its main strength is the ease of writing queries that can be translated into a DAG of jobs that comply with the MapReduce programming model.
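For instance, the CREATE TABLE AS SELECT extension mentioned above lets a table be derived directly from the result of a query. The following minimal sketch reuses the u_data table defined in the example below; the derived table name and the filter are purely illustrative.
<source lang=sql>
-- Illustrative sketch only: derive a new table directly from a query
-- (CREATE TABLE AS SELECT). u_data is the table defined in the example below;
-- the name u_data_top and the rating filter are hypothetical.
CREATE TABLE u_data_top AS
SELECT userid, movieid, rating
FROM u_data
WHERE rating >= 4;
</source>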

===Query Example===

Here follows a simple example of a job expressed in HiveQL, composed of multiple statements. In brief, data stored locally at a given path are retrieved and distributed by a particular field before each record is transformed by the provided script. Finally, the merged output is directed to the provided local location. All import/export paths can also be URIs such as ftp, jdbc, etc.

<source lang=sql>
CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
 
LOAD DATA LOCAL INPATH '/home/user/u.data'
OVERWRITE INTO TABLE u_data;
 
ADD FILE /home/user/weekday_mapper.py;
 
CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
 
FROM u_data
INSERT OVERWRITE TABLE u_data_new
SELECT
 TRANSFORM(userid, movieid, rating, unixtime)
 USING 'python weekday_mapper.py'
 AS userid, movieid, rating, weekday
CLUSTER BY movieid;
 
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/u_data_new'
SELECT u_data_new.*
FROM u_data_new;
</source>

===Operator Plan===

<source lang=xml>
<Operation>
  <Functionality>DATASINK</Functionality>
  <Indications>
  	schema-[userid, movieid, rating, weekday], 
  	tableName-u_data_new, 
  	sink-file:/tmp/u_data_new
  </Indications>
  <Children>
    <Operation>
      <Functionality>SELECT</Functionality>
      <Indications>
      	schema-[userid, movieid, rating, weekday], 
      	filterMask-[0, 1, 2, 3]
      </Indications>
      <Children>
        <Operation>
          <Functionality>MERGE</Functionality>
          <Indications>schema-[_col0, _col1, _col2, _col3]</Indications>
          <Children>
            <Operation>
              <Functionality>SCRIPT</Functionality>
              <Indications>
              	schema-[_col0, _col1, _col2, _col3], 
              	source-/home/user/weekday_mapper.py, 
              	scriptCmd-python weekday_mapper.py
              </Indications>
              <Children>
                <Operation>
                  <Functionality>PARTITION</Functionality>
                  <Indications>
                    schema-[userid, movieid, rating, unixtime], 
                    clusterBy-[1], 
                    order-+
                  </Indications>
                  <Children>
                    <DataSource>
                      <Source>u_data</Source>
                      <Indications>
                        schema-[userid, movieid, rating, unixtime], 
                        input-file:/home/user/u.data, 
                        filterMask-[0, 1, 2, 3]
                      </Indications>
                    </DataSource>
                  </Children>
                </Operation>
              </Children>
            </Operation>
          </Children>
        </Operation>
      </Children>
    </Operation>
  </Children>
</Operation>
</source>

The constructed plan can be seen in the following figure.


=Highlights=

==Parallelization factor==

The adaptor will create the [[ExecutionEngine#ExecutionPlan | ExecutionPlan]] that orchestrates the execution of a DAG of jobs as a series of [[ExecutionPlan_Elements#Sequence | Sequence]] elements. Depending on the transformation query, the adaptor may create anything from a single sequential plan to multiple chains. For example, if the query specifies a “DISTRIBUTE BY” clause with an integer value, the query will be executed with a parallelization factor equal to that value. Otherwise, a table field may be specified, in which case parallelization is determined by that field's cardinality: for each unique value of that field a new sequential plan is created to handle the corresponding records.
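A minimal sketch of the two cases described above, reusing the transformation from the query example (the integer value and the choice of field are illustrative):
<source lang=sql>
-- Illustrative sketch only, based on the behaviour described above.
-- Case 1: DISTRIBUTE BY an integer value; the adaptor uses it as the
-- parallelization factor, here four parallel chains.
FROM u_data
INSERT OVERWRITE TABLE u_data_new
SELECT
 TRANSFORM(userid, movieid, rating, unixtime)
 USING 'python weekday_mapper.py'
 AS userid, movieid, rating, weekday
DISTRIBUTE BY 4;

-- Case 2: DISTRIBUTE BY a table field; one sequential chain is created
-- for each distinct movieid value.
FROM u_data
INSERT OVERWRITE TABLE u_data_new
SELECT
 TRANSFORM(userid, movieid, rating, unixtime)
 USING 'python weekday_mapper.py'
 AS userid, movieid, rating, weekday
DISTRIBUTE BY movieid;
</source>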

==User Defined Function==

The user can provide a script of their choice that will be used during the transformation. The script must consume data provided on standard input and produce output on standard output. Each record occupies a separate line, while record fields are delimited by the tab ('\t') character.

<source lang=python>
import sys
import datetime
 
for line in sys.stdin:
  line = line.strip()
  # each record arrives as one tab-delimited line: userid, movieid, rating, unixtime
  userid, movieid, rating, unixtime = line.split('\t')
  # map the unix timestamp to the ISO day of the week (1 = Monday ... 7 = Sunday)
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  print '\t'.join([userid, movieid, rating, str(weekday)])
</source>

=Usage=

The following snippet demonstrates the usage of the adaptor as a library:

<source lang=java>
// The HiveQL query to execute, here taken from the command line arguments
String query = args[0];
WorkflowHiveQLAdaptor adaptor = new WorkflowHiveQLAdaptor();
 
// Parse the query and optimize the resulting operator plan
HiveQLPlanner planner = new HiveQLPlanner();
planner.processLine(query);
PlanNode plan = OperatorAnalyzer.optimizePlan(planner.getCreatedPlan());
 
// Hand the operator plan to the adaptor, construct the Execution Plan and execute it
adaptor.setPlanDesc(plan);
adaptor.CreatePlan();
String output = adaptor.ExecutePlan();
</source>