Copy Data with Hive and Spark / Copiar Datos con Hive y Spark


These are two examples of how to copy data from one S3 location to other S3 location. Same operation can be done from S3 to HDFS and vice-versa.

I’m considering that you are able to launch the Hive client or spark-shell client.

Hive:

Using Mapreduce engine or Tez engine:

set hive.execution.engine=mr; 

or

set hive.execution.engine=tez; 
CREATE EXTERNAL TABLE source_table(a_col string, b_col string, c_col string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://mybucket/hive/csv/';

CREATE TABLE destination_table(a_col string, b_col string, c_col string) LOCATION 's3://mybucket/output-hive/csv_1/';

INSERT OVERWRITE TABLE destination_table SELECT * FROM source_table;

Spark:

sc.textFile("s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1387346051826/warc/").saveAsTextFile("s3://mybucket/spark/bigfiles")

 

If you want to copy data to HDFS, you can also explore s3-dist-cp:

s3DistCP:

s3-dist-cp --src s3://mybucket/hive/csv/ --dest=hdfs:///output-hive/csv_10/

 

Hive: Extracting JSON fields


Handling JSON files with Hive is not always an easy task.

If you need to extract some specific fields from a structured JSON, we have some alternatives:

There are two UDF functions that are usually helpful on this cases: ‘get_json_object’ and ‘json_tuple’. These functions allows you to access json fields from Hive without installing additional libraries.

get_json_object:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-get_json_object

json_tuple:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-json_tuple

To navigate JSON structure with get_json_object, the entire JSON file has to be mapped as string.

As example:

1) Create the external table as string:

CREATE EXTERNAL TABLE json_table (str string) LOCATION 's3://mybucket/input/jsonserde' ;

2) select field[0] from store.fruit field:

select get_json_object(json_table.str, '$.store.fruit\[0]') as MyField from json_table;

Hive: dealing with Out of Memory and Garbage Collector errors.


hive_logo

This is the common error:

java.lang.OutOfMemoryError: GC overhead limit exceeded

This error will occur in several Java environments, but, in particular, with Hive, is pretty common when big structures or several thousands objects are stored in memory.

According to Sun, the error will raise if too much time is being spent in garbage collection:

If more than 98% of the total time is spent in garbage collection and less than 2% of the heap is recovered, an OutOfMemoryError will be thrown.

This feature is designed to prevent applications from running for an extended period of time while making little or no progress because the heap is too small.

If necessary, this feature can be disabled by adding the option -XX:-UseGCOverheadLimit.

Also, we can increase the Heap Size, via “-Xmx1024m” option.

Another interesting option is the Concurrent Collector “UseConcMarkSweepGC“: It performs most of its work concurrently (i.e., while the application is still running) to keep garbage collection pauses short.

It is designed for applications with medium to large-sized data sets for which response time is more important than overall throughput, since the techniques used to minimize pauses can reduce application performance.

In Hive, we can set thes parameters with a  command like this:

SET mapred.child.java.opts="-server -Xmx1g -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit";

 

An alternative to avoid storing all the structure in memory:

Write intermediate results to a temporary table in the database instead of hashmaps, a database table table is not memory bound, so use an indexed table is a solution in many cases.

When the intermediate table is complete, execute a sql statement(s) from it instead of from memory.

Hive logs to stdout


Muchas veces necesitamos debugear alguna consulta Hive que esta dando error. Una manera facil es habilitar el logger por consola:

hive.root.logger specifies the logging level as well as the log destination. Specifying console as the target sends the logs to the standard error (instead of the log file).

$HIVE_HOME/bin/hive -hiveconf hive.root.logger=INFO,console

Mas informacion:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli

 

Hive query with JOIN, GROUP BY and SUM does not return results


On Hive 0.11, and lower versions, if we set:

set hive.optimize.skewjoin=true; 
set hive.auto.convert.join=false;

A query with JOIN, GROUP BY and SUM does not return results.

But if we make the query a little more simple, using JOIN but not GROUP and SUM functions, We will GET RESULTS.

I have found that there are bugs reported recently:

https://issues.apache.org/jira/browse/HIVE-5888

and

https://issues.apache.org/jira/browse/HIVE-6041

This bug is related also with the previous one (https://issues.apache.org/jira/browse/HIVE-6520), already reported:

If we set:

set hive.optimize.skewjoin=true; 
set hive.auto.convert.join=true;

We will no have any output.

The reason is that the skew join in hive relies on a reduce phase to save skewed keys on local disk, but hive.auto.convert.join=true turns a mapreduce task into a mapjoin task in some scenarios.

As a result, there is no skewed keys generated by the mapjoin and the result is empty.

If you set hive.auto.convert.join=false to disable the auto conversion of a mapjoin, the performance is very bad because the reduce phase takes a very long time to process the skew keys.

 

This is expected to be resolved on version hive-0.13.0.

check system variables or environment variables on Hive


On Hive we can check values for system variables or environment variables with the command:

hive> set;

if we need to ask for a specific variable value, we can run:

hive> set hive.security.authorization.enabled; 
hive.security.authorization.enabled=false

More information:

https://cwiki.apache.org/confluence/display/Hive/AdminManual+Configuration