
Hortonworks Hadoop tuning

Tez

tez.task.resource.memory.mb
tez.am.resource.memory.mb

MapReduce2

MR Map Java Heap Size
MR Reduce Java Heap Size
MR AppMaster Java Heap Size

Yarn

yarn.scheduler.capacity.maximum-am-resource-percent=80 (this is a MUST)

  • Memory:
    • Node
  • Container:
    • Minimum container size
    • Maximum container size

Hive

  • Tez:
    • Tez Container Size
    • Hold containers to reduce latency = true
    • Number of containers held = 10
    • Memory (For Map Join)
  • hive-site:
    • set hive.execution.engine=tez;
      set hive.vectorized.execution.reduce.enabled = true;
      set hive.vectorized.execution.enabled = true;
      set hive.cbo.enable=true;
      set hive.compute.query.using.stats=true;
      set hive.stats.fetch.column.stats=true;
      set hive.stats.fetch.partition.stats=true;

Sqoop (Use ORC to improve performance)

# Import every table in the MySQL database into Hive as ORC, one sqoop command per table
# (-N skips the mysql column-name header so it is not treated as a table)
mysql -N -h $myhost -u $myuser -p$mypass $mydb -e 'show tables' \
  | awk -v myuser="$myuser" -v mypass="$mypass" -v mydb="$mydb" -v myhost="$myhost" \
      '{ print "sqoop import --connect jdbc:mysql://"myhost"/"mydb" --username "myuser" --password "mypass" -m 1 --table "$1" --hcatalog-database "mydb" --hcatalog-table "$1" --create-hcatalog-table --hcatalog-storage-stanza \"stored as orcfile\""}' \
  | bash
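
For a single table, the command that the pipeline above generates looks like this (a sketch using the same shell variables; mytable is an illustrative table name):

sqoop import --connect jdbc:mysql://$myhost/$mydb \
  --username $myuser --password $mypass -m 1 \
  --table mytable \
  --hcatalog-database $mydb --hcatalog-table mytable \
  --create-hcatalog-table \
  --hcatalog-storage-stanza "stored as orcfile"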

 

How to use Sqoop to import and append to Hive

# Import from MySQL to Hive

sqoop import --connect jdbc:mysql://<HOST>/<DB> \
  --username <MYUSER> \
  --password <MYPASS> \
  --table <MYTABLE> \
  --hive-import --hive-table <DBNAMEONHIVE>.<TABLE> \
  --fields-terminated-by ','
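
To quickly verify the import, you can compare row counts on both sides (a sketch; the placeholders are the same ones used above):

# Rows in MySQL
mysql -h <HOST> -u <MYUSER> -p<MYPASS> <DB> -e 'select count(*) from <MYTABLE>'

# Rows in Hive
hive -e 'select count(*) from <DBNAMEONHIVE>.<TABLE>'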

# Change the Hive table to external (the data stays on HDFS and Hive no longer manages it)

alter table <TABLE NAME> SET TBLPROPERTIES('EXTERNAL'='TRUE')

# Verify where the table is stored on HDFS by looking at the Location field.

DESCRIBE FORMATTED <TABLE NAME>

# Now you can import and append with sqoop directly to that HDFS location, and the new rows will show up in the external table.

sqoop import --connect jdbc:mysql://<HOST>/<DB> \
  --username <MYUSER> \
  --password <MYPASS> \
  --table <MYTABLE> \
  --target-dir '<HDFS_LOCATION_OUTPUT>' \
  --incremental append \
  --check-column '<PRIMARY_KEY_COLUMN>' \
  --last-value <LAST_VALUE_IMPORTED>
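
Instead of passing --last-value by hand on every run, a saved Sqoop job can track it for you in the Sqoop metastore (a sketch; the job name append_mytable is illustrative):

# Create the job once
sqoop job --create append_mytable -- import --connect jdbc:mysql://<HOST>/<DB> \
  --username <MYUSER> \
  --password <MYPASS> \
  --table <MYTABLE> \
  --target-dir '<HDFS_LOCATION_OUTPUT>' \
  --incremental append \
  --check-column '<PRIMARY_KEY_COLUMN>' \
  --last-value 0

# Run it whenever new rows need to be appended; the stored last-value is updated automatically
sqoop job --exec append_mytable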

How to debug Yarn / Hive Jobs

The easy way is to access the Hadoop ResourceManager UI, which normally runs on port 8088, for example:

http://yourambari:8088/cluster/apps
[Screenshot: ResourceManager UI]
Click the link to the application details and you will see very detailed information about your job.

Useful commands

To list running jobs use:

yarn application -list

                Application-Id	    Application-Name	    Application-Type	      User	     Queue	             State	       Final-State	       Progress	                       Tracking-URL
application_1480129450160_0003	HIVE-5cb06afa-2102-418c-8716-4726c1d13f35	                 TEZ	   testq	   default	          ACCEPTED	         UNDEFINED	             0%	                                N/A
application_1480129450160_0002	HIVE-c4a9098b-530e-4737-9c29-d3f9cc8e45ba	                 TEZ	   testq	   default	           RUNNING	         UNDEFINED	             0%	    http://testserver:32917/ui/

To debug a job, after running the application -list command you can check the “Tracking-URL” column; in the example output above, the tracking URL of Application-Id application_1480129450160_0002 is http://testserver:32917/ui/

To kill a job use:

yarn application -kill <Application-Id>
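
Two other commands that help when digging into a job (the logs command assumes YARN log aggregation is enabled on the cluster):

# Show status, queue, progress and tracking URL of a single application
yarn application -status <Application-Id>

# Dump the aggregated container logs of a finished application
yarn logs -applicationId <Application-Id>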

HDFS Snapshots

HDFS Snapshots are read-only point-in-time copies of the file system. Snapshots can be taken on a subtree of the file system or the entire file system and are:

  • Performant and Reliable: Snapshot creation is atomic and instantaneous, no matter the size or depth of the directory subtree
  • Scalable: Snapshots do not create extra copies of blocks on the file system. Snapshots are highly optimized in memory and stored along with the NameNode’s file system namespace

Examples

Enable snapshots

$ hdfs dfsadmin -allowSnapshot /data/myfolder

Create a snapshot

$ hdfs dfs -createSnapshot /data/myfolder

This will create a snapshot and give it a default name based on the timestamp; the folder will be something similar to this:

/data/myfolder/.snapshot/s20150803-000922.092
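
To check which directories have snapshots enabled, and which snapshots exist under one of them (using the example path above):

# List all snapshottable directories visible to the current user
hdfs lsSnapshottableDir

# List the snapshots taken under a directory
hdfs dfs -ls /data/myfolder/.snapshot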

Recovering from data loss

Now imagine someone deleted a folder, for example /data/myfolder/test, and we need to recover it.

The folder will still be present in the snapshot directory, so to recover it we just need to copy it from the snapshot back to the original location.
Locate the specific directory in the snapshot:

hdfs dfs -ls /data/myfolder/.snapshot/s20130903-000941.091/test

Restore to the specific dir:

hdfs dfs -cp /data/myfolder/.snapshot/s20130903-000941.091/test /data/myfolder/
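
Old snapshots can be removed once they are no longer needed (the snapshot name is the one from the listing above):

hdfs dfs -deleteSnapshot /data/myfolder s20130903-000941.091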

More details:
http://hortonworks.com/products/hortonworks-sandbox/#tutorial_gallery
http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.3.2/bk_user-guide/content/user-guide-hdfs-snapshots.html

Kafka Basics – Topics, Producers and Consumers

Kafka Basics

Workdir on HortonWorks

cd /usr/hdp/2.3.4.0-3485/kafka/bin

Create a new topic “test”:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test

List topics:

cd /usr/hdp/2.3.4.0-3485/kafka/bin
./kafka-topics.sh --list --zookeeper localhost:2181

Details about a specific topic:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
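
To grow an existing topic, partitions can be added later (a sketch; note that the partition count can only be increased, never decreased):

./kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 3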

Send some messages

Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default each line will be sent as a separate message.

Run the producer and then type a few messages into the console to send to the server.

./kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

Start a consumer

Kafka also has a command line consumer that will dump out messages to standard output.

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

More info: http://kafka.apache.org/081/quickstart.html

Install Hue on Hortonworks – Centos 6.7

 

https://developer.ibm.com/hadoop/blog/2015/10/27/how-to-install-hue-3-9-on-top-of-biginsights-4-1/

Hadoop – Hive / Impala

Hive / Impala

Apache Hive is a data warehouse infrastructure built on top of Hadoop for providing data summarization, query, and analysis.
Cloudera Impala is a query engine that runs on Apache Hadoop.

Examples

Queries

# Normal
SELECT name, age FROM mytable LIMIT 10;

# Regex query (selects the columns whose names match the pattern)
SELECT `(ds|hr)?+.+` FROM sales;

# Subqueries
SELECT total FROM 
  (SELECT c1 + c2 AS total FROM mytable) my_query;

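A basic join example (a sketch; the customers and orders tables and their key columns are assumed for illustration):

# Join
SELECT c.name, o.total
FROM customers c
JOIN orders o ON (c.id = o.customer_id)
LIMIT 10;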

Create table

CREATE TABLE mytable (name string, age int)
  ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ';'
  STORED AS TEXTFILE;

Load Data

LOAD DATA LOCAL INPATH 'path/mydata/data.txt'
INTO TABLE mytable;

Load from an existing table

INSERT INTO birthdays SELECT firstname, lastname, birthday FROM customers WHERE birthday IS NOT NULL;
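
A related pattern is creating and populating a new table in one statement with CREATE TABLE AS SELECT (a sketch; the birthdays_orc table name and the ORC format are illustrative):

CREATE TABLE birthdays_orc STORED AS ORC AS
  SELECT firstname, lastname, birthday FROM customers WHERE birthday IS NOT NULL;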

Apache access log to hive

First copy an Apache access log to HDFS:

sudo -u hdfs hadoop fs -mkdir /user/hive/warehouse/original_access_logs
sudo -u hdfs hadoop fs -copyFromLocal /opt/examples/log_files/access.log.2 /user/hive/warehouse/original_access_logs

After the copy ends, create a table using Hive with that HDFS file as source:

CREATE EXTERNAL TABLE intermediate_access_logs (
    ip STRING,
    date STRING,
    method STRING,
    url STRING,
    http_version STRING,
    code1 STRING,
    code2 STRING,
    dash STRING,
    user_agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    'input.regex' = '([^ ]*) - - \\[([^\\]]*)\\] "([^\ ]*) ([^\ ]*) ([^\ ]*)" (\\d*) (\\d*) "([^"]*)" "([^"]*)"',
    'output.format.string' = "%1$$s %2$$s %3$$s %4$$s %5$$s %6$$s %7$$s %8$$s %9$$s")
LOCATION '/user/hive/warehouse/original_access_logs';
 
CREATE EXTERNAL TABLE tokenized_access_logs (
    ip STRING,
    date STRING,
    method STRING,
    url STRING,
    http_version STRING,
    code1 STRING,
    code2 STRING,
    dash STRING,
    user_agent STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/hive/warehouse/tokenized_access_logs';
 
ADD JAR /usr/lib/hive/lib/hive-contrib.jar;
 
INSERT OVERWRITE TABLE tokenized_access_logs SELECT * FROM intermediate_access_logs;
Now you can work with Impala to query this data:

invalidate metadata; -- reload the metadata so Impala can see the new table
select count(*), url from tokenized_access_logs
where url like '%\/product\/%'
group by url order by count(*) desc;
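
If the new table is going to be queried repeatedly, computing statistics helps Impala's planner (run from the same Impala session):

compute stats tokenized_access_logs;
show table stats tokenized_access_logs;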

Hadoop – HCatalog

HCatalog presents a relational view of data. Data is stored in tables and these tables can be placed into databases. Tables can also be partitioned on one or more keys. For a given value of a key (or set of keys) there will be one partition that contains all rows with that value (or set of values).

[Diagram: HCatalog example]

 

Command-line

hcat.py -g mygroup ...
# Tells HCatalog that the table to be created must have "mygroup" as its group

hcat.py -p rwxr-xr-x ...
# Tells HCatalog to set the table permissions to "rwxr-xr-x"

hcat.py -f myscript.hcatalog ...
# Tells HCatalog that myscript.hcatalog is a file containing DDL commands to execute

hcat.py -e "create table mytable (a int) ...
# Tells HCatalog to treat the following string as a DDL command and execute it
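
For example, the -f form can be driven from a small DDL file (a sketch; the file name and the table definition are illustrative):

# Write the DDL to a file
echo "create table mytable (a int) stored as orcfile;" > myscript.hcatalog

# Ask HCatalog to execute it
hcat.py -f myscript.hcatalog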

Hadoop – Secondary namenode

Namenodes are where the file system metadata is stored, and that is done using two files: fsimage and edits. The fsimage is a complete snapshot of the file system metadata, whereas edits contains only the incremental modifications made since that snapshot.

Over time the edits file grows and grows (it is a log-based system), and replaying it would take a long time in the event of a server failure, so the edits file needs to be periodically applied to the fsimage. The problem is that the namenode may not have the available resources (RAM/CPU) to do this while it is running. This is where the secondary namenode comes in:
* The secondary namenode asks the namenode to roll its edits file and begin writing to edits.new.
* The secondary namenode copies the namenode's fsimage and edits to its local checkpoint directory.
* The secondary namenode merges fsimage and edits and writes a new, compacted fsimage to disk.
* The secondary namenode sends the new fsimage file to the namenode, which adopts it (and renames edits.new to edits).
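
How often this checkpoint happens is driven by configuration; a quick way to check the current values on a node (assuming Hadoop 2.x property names) is:

# Seconds between two periodic checkpoints
hdfs getconf -confKey dfs.namenode.checkpoint.period

# Or checkpoint earlier once this many uncheckpointed transactions accumulate
hdfs getconf -confKey dfs.namenode.checkpoint.txns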

Hadoop

HDFS, YARN and MAPREDUCE

  • HDFS is a highly-available, distributed file system.
    • A NameNode manages the file system.
    • A DataNode provides storage resources to the file system.
  • YARN is the data operating system for Hadoop that supports different types of distributed applications.
    • A ResourceManager manages cluster CPU and memory resources and features a pluggable job scheduler.
    • A NodeManager provides CPU and memory resources to the cluster.
    • An ApplicationMaster starts, monitors, restarts, and stops applications.
  • MapReduce is a framework, and not a program, used to process data in parallel across many machines.
    • It works in cooperation with HDFS to achieve scalability and performance.
  • MapReduce is so named because there is a map phase and a reduce phase.
    • The main purpose of the map phase is to read all of the input data and transform or filter it.
    • The main purpose of the reduce phase is to employ business logic to answer a question or solve a problem.
  • Use cases for MapReduce include data analytics, data mining, and full-text indexing.
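
A quick way to see a map phase and a reduce phase in action is the word-count job that ships with Hadoop (a sketch; the examples jar path assumes an HDP layout and the input/output paths are illustrative):

yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar \
  wordcount /tmp/wordcount/input /tmp/wordcount/output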

[Diagram: HDP 2.1 HDFS and YARN]

Videos

https://www.youtube.com/playlist?list=PL2y_WpKCCNQeLC4reyP-RaBqfH5QML000

Command-Line Tools (hdfs)

Hadoop comes with a command-line tool called hadoop.

Its usage is similar to Unix commands, for example:

# Create dir
hadoop fs -mkdir /user/hive/warehouse/original_access_logs

# List
hadoop fs -ls /user/hive/warehouse/original_access_logs

# Set the replication factor of a directory tree recursively
hadoop fs -setrep -R 5 /user/hive/warehouse

# Inspect and show file health
hadoop fsck /user/hive/warehouse -files -blocks -locations

* You can also access HDFS via REST using WebHDFS.
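
For example, listing a directory over WebHDFS only needs curl (a sketch; <NAMENODE_HOST> is a placeholder and 50070 is the default NameNode HTTP port on HDP 2.x):

# List a directory
curl -i "http://<NAMENODE_HOST>:50070/webhdfs/v1/user/hive/warehouse?op=LISTSTATUS"

# Read a file, following the redirect to the DataNode that serves it
curl -i -L "http://<NAMENODE_HOST>:50070/webhdfs/v1/user/hive/warehouse/original_access_logs/access.log.2?op=OPEN"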

Sqoop

Sqoop is a tool that can be used to import relational data, for example from MySQL, into HDFS.

Usage example:

sqoop import-all-tables \
  -m 1 \
  --connect jdbc:mysql://quickstart:3306/retail_db \
  --username=retail_dba \
  --password=cloudera \
  --compression-codec=snappy \
  --as-parquetfile \
  --warehouse-dir=/user/hive/warehouse \
  --hive-import