WorkflowHiveQLAdaptor
Revision as of 16:28, 19 November 2013
=Overview=
This adaptor, external to the adaptors offered by the [[WorkflowEngine]], constructs an [[ExecutionEngine#Execution_Plan | Execution Plan]] based on a query defined in HiveQL. The query can vary from a simple transformation job to a complex distributed MapReduce job. The adaptor parses the query and processes the parsed information to create the [[ExecutionEngine#Execution_Plan | Execution Plan]]. During plan creation the [https://gcube.wiki.gcube-system.org/gcube/index.php/Search_Operators Search Operators] are exploited.
=HiveQL=
While based on SQL, HiveQL offers extensions, including loading/exporting data from external sources and create-table-as-select, but provides only basic support for indexes. HiveQL also lacks support for transactions and materialized views, and offers only limited subquery support. Nevertheless, its main strength is the ease of writing queries that can be translated into a DAG of jobs complying with the MapReduce programming model.
===Query Example===
Here follows a simple example of a job expressed in HiveQL, composed of multiple statements. In brief, data stored locally at a given path are retrieved and distributed by a particular field before each record is transformed by the provided script. Finally, the output is merged and directed to the provided local location. Import/export paths can also be URIs (e.g. ftp, jdbc).
<source lang="sql">
CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/home/user/u.data' OVERWRITE INTO TABLE u_data;

ADD FILE /home/USER/weekday_mapper.py;

CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

FROM u_data
INSERT OVERWRITE TABLE u_data_new
SELECT
  TRANSFORM (userid, movieid, rating, unixtime)
  USING 'python weekday_mapper.py'
  AS userid, movieid, rating, weekday
CLUSTER BY movieid;

INSERT OVERWRITE LOCAL DIRECTORY '/tmp/u_data_new'
SELECT u_data_new.* FROM u_data_new;
</source>
===Operator Plan===
<source lang="xml">
<Operation>
  <Functionality>DATASINK</Functionality>
  <Indications>
    schema-[userid, movieid, rating, weekday],
    tableName-u_data_new,
    sink-file:/tmp/u_data_new
  </Indications>
  <Children>
    <Operation>
      <Functionality>SELECT</Functionality>
      <Indications>
        schema-[userid, movieid, rating, weekday],
        filterMask-[0, 1, 2, 3]
      </Indications>
      <Children>
        <Operation>
          <Functionality>MERGE</Functionality>
          <Indications>schema-[_col0, _col1, _col2, _col3]</Indications>
          <Children>
            <Operation>
              <Functionality>SCRIPT</Functionality>
              <Indications>
                schema-[_col0, _col1, _col2, _col3],
                source-/home/user/weekday_mapper.py,
                scriptCmd-python weekday_mapper.py
              </Indications>
              <Children>
                <Operation>
                  <Functionality>PARTITION</Functionality>
                  <Indications>
                    schema-[userid, movieid, rating, unixtime],
                    clusterBy-[1],
                    order-+
                  </Indications>
                  <Children>
                    <DataSource>
                      <Source>u_data</Source>
                      <Indications>
                        schema-[userid, movieid, rating, unixtime],
                        input-file:/home/user/u.data,
                        filterMask-[0, 1, 2, 3]
                      </Indications>
                    </DataSource>
                  </Children>
                </Operation>
              </Children>
            </Operation>
          </Children>
        </Operation>
      </Children>
    </Operation>
  </Children>
</Operation>
</source>
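As a rough, illustrative sketch (not part of the adaptor's API), a plan document of this shape can be traversed with any XML parser to read off the operator chain from sink down to source. The element names <code>Operation</code>, <code>Functionality</code>, <code>Children</code>, <code>DataSource</code>, and <code>Source</code> come from the example; the helper itself is hypothetical:

```python
import xml.etree.ElementTree as ET

# Abbreviated version of the plan above (Indications omitted for brevity)
PLAN = """
<Operation>
  <Functionality>DATASINK</Functionality>
  <Children>
    <Operation>
      <Functionality>SELECT</Functionality>
      <Children>
        <Operation>
          <Functionality>MERGE</Functionality>
          <Children>
            <DataSource><Source>u_data</Source></DataSource>
          </Children>
        </Operation>
      </Children>
    </Operation>
  </Children>
</Operation>
"""

def chain(node):
    """Collect functionalities from this operation down to its data source."""
    if node.tag == "DataSource":
        return ["SOURCE:" + node.findtext("Source")]
    names = [node.findtext("Functionality")]
    children = node.find("Children")
    if children is not None:
        for child in children:
            names.extend(chain(child))
    return names

plan_chain = chain(ET.fromstring(PLAN))
print(plan_chain)  # sink-first ordering, data source last
```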
The constructed plan can be seen in the following figure.
=Highlights=
==Parallelization factor==
The adaptor creates the [[ExecutionEngine#ExecutionPlan | ExecutionPlan]] that orchestrates the execution of a DAG of jobs as a series of [[ExecutionPlan_Elements#Sequence | Sequence]] elements. Depending on the transformation query, the adaptor may create anything from a single sequential plan to multiple parallel chains. For example, if the query contains a “Distribute by” clause with an integer value, the query is executed with a parallelization factor equal to that value. Alternatively, a table field may be specified, in which case parallelization follows that field's cardinality: for each unique value of the field a new sequential plan is created to handle the corresponding records.
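That rule can be sketched as follows; this is a hypothetical illustration of how the factor could be derived, not the adaptor's actual code:

```python
def parallelization_factor(distribute_by, records, schema):
    """An integer argument is used directly as the parallelization factor;
    a field name yields one sequential chain per distinct value of that field."""
    if isinstance(distribute_by, int):
        return distribute_by
    idx = schema.index(distribute_by)
    return len({rec[idx] for rec in records})

schema = ["userid", "movieid", "rating", "unixtime"]
records = [("196", "242", "3", "881250949"),
           ("186", "302", "3", "891717742"),
           ("22",  "242", "1", "878887116")]

print(parallelization_factor(4, records, schema))          # literal integer
print(parallelization_factor("movieid", records, schema))  # two distinct movieids
```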
==User Defined Function==
The user can provide a script of choice to be used during the transformation. The script must consume records from standard input and produce its output on standard output. Each record occupies a separate line, and record fields are delimited by the tab ('\t') character. The example below is the weekday_mapper.py script referenced by the query above:
<source lang="python">
import sys
import datetime

for line in sys.stdin:
  line = line.strip()
  userid, movieid, rating, unixtime = line.split('\t')
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  print '\t'.join([userid, movieid, rating, str(weekday)])
</source>
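For illustration, the stdin/stdout contract can be mimicked in-process with a small harness. <code>run_mapper</code> and <code>weekday_mapper</code> below are hypothetical names; the framing (newline-separated records, tab-separated fields) follows the description above:

```python
import datetime

def run_mapper(lines, mapper):
    """Feed tab-delimited records line by line through a mapper function,
    mimicking how the TRANSFORM script reads stdin and writes stdout."""
    out = []
    for line in lines:
        fields = line.strip().split('\t')
        out.append('\t'.join(mapper(fields)))
    return out

def weekday_mapper(fields):
    # Same logic as the script above: replace unixtime with the ISO weekday
    userid, movieid, rating, unixtime = fields
    weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
    return [userid, movieid, rating, str(weekday)]

result = run_mapper(["196\t242\t3\t881250949"], weekday_mapper)
print(result)
```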
=Usage=
The following snippet demonstrates the usage of the adaptor as a library:
<source lang="java">
String query = args[0];

// Parse the HiveQL query into an operator plan
WorkflowHiveQLAdaptor adaptor = new WorkflowHiveQLAdaptor();
HiveQLPlanner planner = new HiveQLPlanner();
planner.processLine(query);

// Optimize the plan and hand it to the adaptor for execution
PlanNode plan = OperatorAnalyzer.optimizePlan(planner.getCreatedPlan());
adaptor.setPlanDesc(plan);
adaptor.CreatePlan();
String output = adaptor.ExecutePlan();
</source>