hadoop

Hadoop Installation and Configuration

~ tar -zxvf hadoop-x.y.z.tar.gz

~ vim ~/.zshrc
export HADOOP_HOME=/Users/xxx/usr/hadoop
export HADOOP_CONF_DIR=/Users/xxx/workspace/service/hadoop/conf
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

~ hadoop version
Hadoop 3.2.1

Pseudo-Distributed Mode

http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html

  1. Enable SSH; on macOS: System Preferences > Sharing > Remote Login
~ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
~ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
~ chmod 0600 ~/.ssh/authorized_keys
  2. Copy the configuration files under etc/hadoop/ to a dedicated location so the configuration stays separate from the installation files
    (this is the $HADOOP_CONF_DIR exported above)

  3. Edit the configuration files

etc/hadoop/core-site.xml:

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

etc/hadoop/hdfs-site.xml:

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

etc/hadoop/mapred-site.xml:

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
  </property>
</configuration>

etc/hadoop/yarn-site.xml:

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  </property>
</configuration>
  4. Format the filesystem
~ bin/hdfs namenode -format
  5. Start the daemons
~ sbin/start-dfs.sh
NameNode - http://localhost:9870/

~ sbin/start-yarn.sh
ResourceManager - http://localhost:8088/
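
To verify the setup, something like the following can be run (a quick sanity check; the examples jar path and version are assumptions based on a stock Hadoop 3.2.1 binary distribution):

~ jps                                   # should list NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager
~ hdfs dfs -mkdir -p /user/$(whoami)    # create an HDFS home directory for the current user
~ hdfs dfs -ls /
~ yarn jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar pi 2 5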

HDFS

Data Flow

Coherency Model (visibility of file data to readers while it is being written)

  • After a file is created, it is immediately visible in the filesystem namespace
  • Content written to the file is not guaranteed to be visible immediately, even if the stream is flushed
  • Once more than a block's worth of data has been written, the first block becomes visible to new readers; in general, the block currently being written is not visible to other readers
  • hflush() forces all buffers to be flushed to the datanodes; once hflush() returns successfully, new readers can see all data written up to that point. The data is only guaranteed to be in the datanodes' memory, though, so a power failure can still lose it
  • hsync() is analogous to fsync(): it ensures the data is actually written to disk on the datanodes
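
A minimal Java sketch of these guarantees, using the standard org.apache.hadoop.fs API (the class name and the path /tmp/coherency-demo are made up for illustration):

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CoherencyDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up core-site.xml via HADOOP_CONF_DIR
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/tmp/coherency-demo");

        FSDataOutputStream out = fs.create(p);
        System.out.println("exists after create: " + fs.exists(p));   // true: the new file is visible immediately

        out.write("content".getBytes(StandardCharsets.UTF_8));
        out.flush();
        // the written bytes are not guaranteed to be visible yet: other readers may still see length 0 here
        System.out.println("length after flush: " + fs.getFileStatus(p).getLen());

        out.hflush();
        // after hflush() returns, new readers see everything written so far,
        // but the data may only be in datanode memory

        out.hsync();
        // hsync() additionally forces the data to disk on the datanodes (like fsync)

        out.close();
    }
}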

distcp: Parallel Copying
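
distcp is implemented as a MapReduce job in which the copying is done by map tasks running in parallel; there are no reducers. Typical invocations (paths and host names are hypothetical):

~ hadoop distcp dir1 dir2
~ hadoop distcp -update dir1 dir2                          # only copy files that changed
~ hadoop distcp hdfs://namenode1/foo hdfs://namenode2/foo  # copy between clusters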

How a MapReduce Job Runs

Job Submission
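
For reference, a job is submitted through the org.apache.hadoop.mapreduce.Job API. A minimal driver sketch (class and job names are placeholders; it relies on the default identity mapper/reducer):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "minimal job");
        job.setJarByClass(MinimalDriver.class);
        // a real job would set its own Mapper/Reducer here, e.g.
        // job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);   // matches the identity mapper over TextInputFormat
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // waitForCompletion(true) submits the job, polls its progress, and returns true on success
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}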

HIVE

Hive Installation and Configuration

  • Unpack the tarball and set the environment variables

http://hive.apache.org/downloads.html

2019-10-06: The Hive 3.1.2 download has a problem: the bundled guava jar is the wrong version (19.0) and has to be replaced manually with 28.1-jre
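
A sketch of the manual swap (the source path of the replacement jar is hypothetical; Hive keeps its jars under $HIVE_HOME/lib):

~ rm $HIVE_HOME/lib/guava-19.0.jar
~ cp /path/to/guava-28.1-jre.jar $HIVE_HOME/lib/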

export HIVE_HOME=/Users/xxx/usr/apache-hive
export PATH=$HIVE_HOME/bin:$PATH
  • Start MySQL (install it first if it is not present) and set up a user and database for the metastore
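For example (a sketch; the database name, user and password match the hive-site.xml below, and the host part may need to be '127.0.0.1' or '%' depending on the MySQL setup):

mysql> CREATE DATABASE hive DEFAULT CHARACTER SET utf8;
mysql> CREATE USER 'hive'@'localhost' IDENTIFIED BY '123456';
mysql> GRANT ALL PRIVILEGES ON hive.* TO 'hive'@'localhost';
mysql> FLUSH PRIVILEGES;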
  • Configure apache-hive/conf/hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://127.0.0.1:3306/hive?createDatabaseIfNotExist=true&amp;useUnicode=true&amp;characterEncoding=UTF-8&amp;useSSL=false</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123456</value>
  </property>
  <property>
    <name>datanucleus.readOnlyDatastore</name>
    <value>false</value>
  </property>
  <property>
    <name>datanucleus.fixedDatastore</name>
    <value>false</value>
  </property>
  <property>
    <name>datanucleus.autoCreateSchema</name>
    <value>true</value>
  </property>
  <property>
    <name>datanucleus.schema.autoCreateAll</name>
    <value>true</value>
  </property>
  <property>
    <name>datanucleus.autoCreateTables</name>
    <value>true</value>
  </property>
  <property>
    <name>datanucleus.autoCreateColumns</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.local</name>
    <value>true</value>
  </property>
  <!-- print column headers in query results -->
  <property>
    <name>hive.cli.print.header</name>
    <value>true</value>
  </property>
  <!-- show the current database name in the prompt -->
  <property>
    <name>hive.cli.print.current.db</name>
    <value>true</value>
  </property>
</configuration>
  • Initialize the metastore database
apache-hive/bin/schematool -dbType mysql -initSchema
  • Connect to Hive
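The simplest way in is the interactive CLI (the prompt shows the current database because hive.cli.print.current.db is enabled above):

~ hive
hive (default)> show databases;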

  • Simple commands
% hive -f scripts.q
% hive -e "select * from dummy"

Hive Example

  • Create a table
    Fields are separated by tab characters
create table records (year string, temperature int, quality int) row format delimited fields terminated by '\t';
  • Create the warehouse directory in HDFS
hadoop fs -mkdir -p /user/hive/warehouse/records
  • Load the data
hive> load data local inpath '/Users/yuzhiyuan/workspace/code/test/hive/sample.txt' overwrite into table records;

HiveQL

  • Built-in functions
show functions;
describe function length;
  • Type conversions (explicit casts are sketched below)
    • Any numeric type can be implicitly converted to a wider numeric type or to a text type (string/char/varchar)
    • Any text type can be implicitly converted to another text type
    • Text types can be implicitly converted to double/decimal
    • boolean cannot be converted to any other type
    • timestamp and date can be implicitly converted to a text type
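
Anything not covered by the implicit rules can be converted explicitly with cast, for example:

hive> select cast('1' as int), cast(2 as string), cast('2019-10-06' as date);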

Managed Tables and External Tables

Dropping a managed table deletes both the metadata and the data

create table managed_table (dummy string);
load data inpath '/user/tom/sample.txt' into table managed_table;
-- drop the table
drop table managed_table;

Dropping an external table deletes only the metadata; the data itself is left in place

create external table external_table (dummy string);
load data inpath '/user/tom/sample.txt' into table external_table;
-- drop the table (the data in HDFS is not removed)
drop table external_table;
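
In practice an external table usually also declares where its data lives, so Hive never manages that directory itself (the table name and location here are hypothetical):

create external table ext_table (dummy string)
location '/user/tom/ext_table';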

Partitions and Buckets

A partition corresponds to a subdirectory under the table's directory. The columns listed in partitioned by are partition columns; they are not stored in the data files, because their values come from the directory names

create table logs (ts bigint, line string) partitioned by (dt string, country string);
-- when loading data, the partition values must be specified explicitly
load data inpath '/user/tom/sample.txt' into table logs
partition (dt='2011-02-23', country='ZH');
  • List the partitions a table has
hive> show partitions logs;

create table bucketed_users (id bigint, name string) 
clustered by (id) sorted by (id asc) into 4 buckets;
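
A sketch of filling and then sampling the bucketed table (the source table users is hypothetical; on older Hive versions hive.enforce.bucketing must be set so that inserts respect the bucket definition):

hive> set hive.enforce.bucketing = true;
hive> insert overwrite table bucketed_users select id, name from users;
hive> select * from bucketed_users tablesample(bucket 1 out of 4 on id);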

Importing Data

insert overwrite table target
select col1,col2 from source
  • Multi-table insert

    The source table is scanned only once while producing several outputs, which is more efficient

FROM records2

insert overwrite table stations_by_year
select year, count(distinct station)
group by year

-- for a partitioned target, the partition column values (ptdate, ptchannel)
-- must also be produced as the last columns of the select (dynamic partitioning)
insert overwrite table records_by_year partition(ptdate,ptchannel)
select year, count(1)
group by year

insert overwrite table good_records_by_year
select year, count(1)
where temperature != 9999
group by year;

Querying Data

Sorting and Aggregation

  • order by
    produces a totally ordered result (single reducer)
  • sort by
    a nonstandard Hive extension; sorts the output of each reducer
  • distribute by
    controls which reducer a particular row goes to
select year,temperature from record distribute by year sort by year asc, temperature desc;

When distribute by and sort by use the same columns, they can be abbreviated as cluster by.
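
For example, distributing and sorting by year alone can be written as:

select year, temperature from record cluster by year;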

MapReduce Scripts

  • The transform script reads its input from stdin and prints tab-separated fields to stdout

    import sys
    import datetime

    # convert the unix timestamp in each tab-separated input row to a weekday
    for line in sys.stdin:
        line = line.strip()
        userid, movieid, rating, unixtime = line.split('\t')
        weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
        print('\t'.join([userid, movieid, rating, str(weekday)]))
  • The HiveQL side: transform (*) using '*.py' as (*)

    CREATE TABLE u_data_new (
    userid INT,
    movieid INT,
    rating INT,
    weekday INT)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t';

    add FILE weekday_mapper.py;

    INSERT OVERWRITE TABLE u_data_new
    SELECT
    TRANSFORM (userid, movieid, rating, unixtime)
    USING 'python weekday_mapper.py'
    AS (userid, movieid, rating, weekday)
    FROM u_data;

    SELECT weekday, COUNT(*)
    FROM u_data_new
    GROUP BY weekday;

Joins

join & outer join & left join & right join

hive> INSERT OVERWRITE TABLE events SELECT t1.bar, t1.foo, t2.foo FROM pokes t1 JOIN invites t2 ON (t1.bar = t2.bar);
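
An outer join also keeps rows that have no match on the other side, for example (same pokes/invites tables as above):

hive> SELECT t1.bar, t1.foo, t2.foo FROM pokes t1 LEFT OUTER JOIN invites t2 ON (t1.bar = t2.bar);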

HIVE UDF with python

https://florianwilhelm.info/2016/10/python_udf_in_hive/