In the big-data ecosystem, it is often necessary to move data from the Hadoop file system to external storage containers like S3 or to a data warehouse for further analytics. In this article, I will quickly show you the steps needed to move data from HDFS to S3, with some tips and gotchas. In a later article, I will write about moving the same data from S3 to Redshift, which is mostly straightforward as long as the data is prepped correctly for data warehouse ingestion.
- HDFS source directory: hdfs://hadoopcluster.com:9000/data/hive/warehouse/testdb.db/mapping_analytics_data
- HDFS source table (optional): testdb.mapping_analytics_data
- HDFS staging directory: hdfs://hadoopcluster.com:9000/data/test/mapping_analytics_historical.db
- S3 bucket location: s3://hdfs_bucket/mapping-data/
Step 1: Data preparation in HDFS
Data preparation at the source is required to make sure there are no issues when the data is eventually loaded into Redshift tables. This step is not crucial if you plan to keep the data only in S3 storage with no goal of copying it to a data warehouse. The reason is that Redshift (or any RDBMS, for that matter) can be very picky about the format of the data, so this script should get the data into a state that Redshift (or any RDBMS) is happy with. Also, once the data is in a storage container, it is almost always an uphill battle to make changes (especially schema-related ones) there rather than while the data is still on HDFS. This is also the time to architect and design the data warehouse tables that will receive the data.
Most of the issues I faced during the S3 to Redshift load were related to NULL values and, occasionally, data type mismatches caused by special characters. To transform the data, I created a new directory in HDFS and used the INSERT OVERWRITE DIRECTORY script in Hive to copy data from the existing location (or table) to the new location. If you would rather move the data to a Hive table instead of a directory, you can either use INSERT OVERWRITE TABLE or simply create an external table over the new data directory. See Writing data into the filesystem from queries.
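For reference, here is a minimal sketch of the external-table alternative, run through the Hive CLI in the same way as the batch command further below. The table name, column names, and types are assumptions for illustration only; adjust them to match your actual schema and staging path.
##
# Sketch only: expose the cleansed staging directory as a partitioned
# external Hive table (table/column names below are illustrative assumptions).
/usr/bin/hive -e "
CREATE EXTERNAL TABLE IF NOT EXISTS testdb.mapping_analytics_staged (
  rec_date  STRING,
  source    STRING,
  dimension STRING,
  value     BIGINT
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION 'hdfs://hadoopcluster.com:9000/data/TEST/mapping_analytics_historical.db';

-- register the dt= sub-directories written by the insert script
MSCK REPAIR TABLE testdb.mapping_analytics_staged;
"
##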
Here are some of the configurations that I have used to make the process easier.
- Used Hive on Spark to make Apache Spark the execution engine for Hive, for faster execution. You must have Spark installed on your cluster to make this work, but you do not have to use it. More info here.
- Used Gzip compression to speed up the network copy and save space in the S3 bucket.
- Replaced NULL values with blank strings or other literals using the nvl function.
- Removed hyphens in the date column using the regexp_replace function.
-- ## Transformation and Insert Script within HDFS ## --
-- enable compression and set the execution engine to Spark
--
--
set hive.execution.engine=spark;
set mapred.reduce.tasks=1;
set mapred.output.compress=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;
set hive.exec.orc.default.compress=gzip;
set hive.msck.path.validation=ignore;
MSCK REPAIR TABLE unid_mapping_analytics_pyspark;
-- write to directory
INSERT OVERWRITE DIRECTORY "hdfs://hadoopcluster.com:9000/data/TEST/mapping_analytics_historical.db/dt=${hiveconf:DATE_PARTITION}"
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
STORED AS TEXTFILE
--
--
SELECT
NVL(regexp_replace(date,'-',''), ""),
NVL(source, ""),
NVL(dimension,""),
CAST(NVL(value, "0") as bigint)
FROM
testdb.mapping_analytics_data
WHERE
rec_date = "${hiveconf:DATE_PARTITION}";
--
--
Here is the command line used to invoke the above script with Hive. Note the -f option, which takes the insert script above, and the -hiveconf option, which I used to pass the date parameter. This has to be run from the HDFS cluster, which can access both the old and new HDFS locations. See Hive Batch Mode Commands.
/usr/bin/hive -hiveconf "DATE_PARTITION=2017-11-02" \
-f $HIVE_SCRIPTS/stage_HDFS_Insert.sql 2>&1 | \
tee ${LOG_FILE_PREFIX}-stage_hdfstoS3.log
Step 2: HDFS to S3 Migration
Finally, we will move the cleansed data to S3 using the DistCp command, which is often used in data-movement workflows in the Hadoop ecosystem. It provides a distributed copy capability built on top of the MapReduce framework. The code below copies the data from the HDFS location to the S3 bucket.
##
/opt/hadoop/bin/hadoop distcp \
hdfs://hadoopcluster.com:9000/data/TEST/mapping_analytics_historical.db/dt=2017-11-02/* \
s3a://$AWS_ACCESS_KEY:$AWS_SECRET_KEY@hdfs_bucket/mapping-data/dt=2017-11-02 \
> $LOG_DIR/mapping-log-$DATE_PARTITION.log 2>&1
##
Note: S3DistCp is an extension to DistCp that is optimized to work with S3 and that adds several useful features in addition to moving data between HDFS and S3.
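If you are running on Amazon EMR, the equivalent copy could look like the sketch below. It assumes the s3-dist-cp command is available on the node and that S3 credentials come from the instance role rather than the URI; the paths simply mirror the DistCp example above.
##
# Sketch only: S3DistCp equivalent of the DistCp copy above, assuming an
# EMR node with s3-dist-cp installed and an instance role for S3 access.
s3-dist-cp \
--src hdfs://hadoopcluster.com:9000/data/TEST/mapping_analytics_historical.db/dt=2017-11-02/ \
--dest s3://hdfs_bucket/mapping-data/dt=2017-11-02/ \
--srcPattern '.*\.gz'
##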

After the copy completes, note that there are multiple files in the S3 bucket. Although it is not a requirement, it is usually a best practice to have multiple files in distributed systems. In my case, the Spark execution engine automatically splits the output into multiple files because of Spark's distributed way of computation.
If you use Hive with MapReduce only and want to move the data to Redshift, it is a best practice to split the files before loading them into Redshift tables, because the Redshift COPY command loads data in parallel from multiple files using the massively parallel processing (MPP) architecture. If you load data from a single large file, Amazon Redshift is forced to perform a serialized load, which is much slower. See more on this in Loading data from Amazon S3.
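As a rough illustration, here is one way to split a single large gzipped extract into several parts before uploading, so that COPY can load them in parallel. The file name, line count per part, and part prefix are arbitrary assumptions, and the AWS CLI is assumed to be configured on the machine doing the upload.
##
# Sketch only: split one large gzipped extract into several parts so that
# Redshift's COPY can load them in parallel (names and sizes are illustrative).
hdfs dfs -cat /data/TEST/mapping_analytics_historical.db/dt=2017-11-02/000000_0.gz \
| gunzip -c \
| split -l 1000000 - part_
gzip part_*
aws s3 cp . s3://hdfs_bucket/mapping-data/dt=2017-11-02/ \
--recursive --exclude '*' --include 'part_*.gz'
##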