前言:用最简单最少的语言,分享我的坑,理不理解需求不重要,问题都在shell代码中,看shell极度枯燥,希望能有帮助!
也可以看看,shell中hive,mysql等是如何处理较大数据的- -!

一. 起因

需求,分析hive表中两年内用户XX的所有数据,按照每天早,中,晚三个时间段统计,按照两年内的节假日统计,按照月份的上旬,中旬,下旬统计,按照周末,工作日统计等等。
假设现数据形式,手机号代表每一个用户,用户不同时间发送的短信数量作为统计目的!
最后,按照类似

1
mobile , am_count , noon_count , pm_count , springday_count , nationalday_count,weekend_count,weekday_count

形式统计为一张表!
说的太抽象,但是,你可以了解的有:

  1. 由于hive表中存在脏数据,需要将数据导出,清洗。
  2. hive to mysql的dump方法,行不通。(如果可以麻烦告知方法,这里字段中含特殊字符导致无法正确导入)
  3. 对sql使用不深,导致错误思路,导致踩坑。

二. 解决方案

方案一

按照需求,将每一个字段对应一条sql的方式求出mobile , count的值,然后将这些字段统计起来(利用mysql的唯一键unique indexduplicate on update方式)。

具体步骤:

  1. hive脚本导出每一列数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    #!/usr/bin/env bash
    echo '-----------开始从hive查数----------------'
    HIVE_SETTING="
    SET mapred.child.java.opts=-Xmx8192m;
    SET mapreduce.reduce.memory.mb=8192;
    SET mapreduce.reduce.java.opts='-Xmx8192M';
    SET mapreduce.map.memory.mb=8192;
    SET mapreduce.map.java.opts='-Xmx8192M';
    SET mapred.child.map.java.opts='-Xmx8192M';
    SET mapred.job.priority=HIGH;
    SET mapred.map.tasks.speculative.execution=false;
    SET mapred.reduce.tasks.speculative.execution=false;
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.exec.dynamic.partition=true;
    SET hive.exec.max.dynamic.partitions=100000;
    SET hive.exec.max.dynamic.partitions.pernode=100000;
    USE xxxdb;
    set mapred.job.queue.name=xxx;
    set hive.exec.compress.output=true;
    set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
    set hive.exec.parallel=true;
    set mapred.job.name = ${0}_xxx;
    "
    #工作日数据查询
    HIVE_SQL="
    select mobile,count(mobile) from xxx_table where pmod(datediff(optime, '2012-01-01'), 7) in (1,2,3,4,5) group by mobile;
    "
    #1. 将hive执行结果赋值给变量
    DATA=$(hive -e "
    ${HIVE_SETTING}
    ${HIVE_SQL};
    ")
    #2. 将hive结果输出到文件中
    hive -e "
    ${HIVE_SETTING}
    ${HIVE_SQL};
    " >/home/q/hive_data.txt
    echo '-----------结束从hive查数----------------'

查询出来数据22亿, 约占45G磁盘空间.

  1. mysql脚本导入数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/usr/bin/env bash
echo "进程Pid: $$"
#将文本文件里面(mobile,count)字段插入到mysql中
insertIntoMysql(){
#获取参数
path=${1}
col=${2}
TIMESTAMP=$(date +%Y%m%d%H%M%S)
echo "path:${path},column:${col},time:${TIMESTAMP}"
#遍历文件每一行
cat ${path} | while read line
do
#获取每一行中的每一列
mobile=$(echo -e "${line}" | cut -f 1)
count=$(echo -e "${line}" | cut -f 2)
#写入myusql
cmd="INSERT INTO db.xxx_table (mobile,${col}) VALUES ('${mobile}',${count}) ON DUPLICATE KEY UPDATE ${col}=${count};"
eval $(mysql -uroot -pxxx --default-character-set=utf8 -e "${cmd}")
echo "mobile:${mobile},count:${count}"
done
TIMESTAMP=$(date +%Y%m%d%H%M%S)
echo "end time:${TIMESTAMP}"
}
#保存每个字段(mobile,count)的文件目录
path="/home/q/part1"
eval cd ${path}
line=$(find ${path} -type f)
for s in ${line[@]}
do
#截取文件名,即mysql table中对应的列名!
col=$(echo ${s} |cut -d "/" -f5)
insertIntoMysql ${s} ${col}
done
exit;

到此,似乎是完了,多开几个脚本一起往mysql中导数就行了. 但是,这只是开始!

问题

  1. 为什么不用mysql的批量导入?
  2. 一行一行的插入22亿数据,要插入多久?
    答: 批量导入的原子操作整行数据 , 无法做到聚合列! 22亿数据多个脚本,24小时插入量在2000W左右!

    改进1: ok单表插入太慢,我分表插入会快一些吧! 改进脚本!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    #!/usr/bin/env bash
    #多表插入,根据mobile确定表名
    # (x_financial,xxx_financial11,xxx_financial0,...,xxx_financial9)
    getTableName(){
    mobile=${1}
    table="xxx_financial"
    if [ ${mobile} -a -n ${mobile} ]
    then
    prefix=$(echo "${mobile:0:5}")
    #861开头的手机号太多,所以又分十张表
    if [ ${prefix:0:3} == "861" ]; then
    model=`expr ${prefix} % 10`
    table=${table}"${model}"
    fi
    #11位手机号的分一张xxx_financial11
    if [ ${#mobile} == 11 -a ${mobile:0:1} == "1" ]; then
    table=${table}"11"
    fi
    echo "${table}"
    else
    #国际,其他的分一张xxx_financial
    echo "${table}"
    fi
    }
    #将文本文件里面(mobile,count)字段插入到mysql中
    insertIntoMysql(){
    path=${1}
    col=${2}
    echo "path:${path},column:${col}"
    cat ${path} | while read line
    do
    mobile=$(echo -e "${line}" | cut -f 1)
    count=$(echo -e "${line}" | cut -f 2)
    table=`getTableName ${mobile}`
    cmd="INSERT INTO db_name.${table}(mobile,${col}) VALUES ('${mobile}',${count}) ON DUPLICATE KEY UPDATE ${col}=${count};"
    eval $(mysql -h127.0.0.1 -P3306 -uroot -p'xxx' --default-character-set=utf8 -e "${cmd}")
    echo "table:${table},mobile:${mobile},count:${count}"
    done
    }
    path="/home/q/data_hive/hive1"
    eval cd ${path}
    line=$(find ${path} -type f)
    for s in ${line[@]}
    do
    col=$(echo ${s} |cut -d "/" -f6)
    insertIntoMysql ${s} ${col}
    done
    exit;

问题

  1. 的确横向分表后插入数据的确快很多,但是会出现数据集中同时插入同一张表的情况,依旧不能容忍!

    改进2: ok一条一条的插入不可以,我批量插入!

    但是,上面横向分表逻辑不能使用了!因为每一个手机号对应的表不一样,sql语句拼接很困难!既然,横切表不行,为了简单,我选择纵切表(将表的列切开mobile, count1,mobile,count2的形式).
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    #!/usr/bin/env bash
    echo "进程Pid: $$"
    #将文本文件里面(mobile,count)字段插入到mysql中
    insertIntoMysql(){
    path=${1}
    col=${2}
    TIMESTAMP=$(date +%Y%m%d%H%M%S)
    echo "path:${path},column:${col},time:${TIMESTAMP}"
    str1="INSERT INTO db_name.xxx_financial99(mobile,${col}) VALUES "
    str2=" ON DUPLICATE KEY UPDATE ${col}=VALUES(${col});"
    n=0
    cat ${path} | while read line
    do
    mobile=$(echo -e "${line}" | cut -f 1)
    count=$(echo -e "${line}" | cut -f 2)
    let n++
    if [ `expr ${n} % 5000` == 0 ];
    then
    cmd=${cmd}"('${mobile}',${count})"
    cmd=${str1}${cmd}${str2}
    #echo ${cmd}
    eval $(mysql -h127.0.0.1 -P3306 -uroot -p'xxx' --default-character-set=utf8 -e "${cmd}")
    cmd=" "
    else
    cmd=${cmd}"('${mobile}',${count}),"
    fi
    #echo "mobile:${mobile},count:${count}"
    done
    TIMESTAMP=$(date +%Y%m%d%H%M%S)
    echo "end ${col} time:${TIMESTAMP}"
    }
    path="/home/q/xxx/xxx"
    eval cd ${path}
    line=$(find ${path} -type f)
    for s in ${line[@]}
    do
    col=$(echo ${s} |cut -d "/" -f6)
    insertIntoMysql ${s} ${col}
    done
    exit;

其中, 一次批量插入5000条, 考虑到shell中会限制参数的长度(报错: /usr/bin/mysql: Argument list too long)!
还有mysql提交sql长度默认为4M,我们可以通过show VARIABLES like '%max_allowed_packet%'; set global max_allowed_packet=33554432;查看和修改!


方案二

上面纵切,批量插入虽然基本满足需求,但是会存在两个问题,1. 如果mysql开启了bin-log很可能会导致磁盘报警! 2. 批量插入可能会出现死锁(期间出现过一次,调整批插文件顺序(减少在同一列上操作的机会))!
其实,整个问题一个hive-sql可以搞定将多列进行聚合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env bash
echo '-----------开始从hive查数----------------'
TIMESTAMP=$(date +%Y%m%d%H%M%S)
echo "PID: $$,start time:${TIMESTAMP}"
HIVE_SETTING="
xxx
"
HIVE_SQL="
select a.mobile,
if(b1.midnight_msg_no_receive_count>0, b1.midnight_msg_no_receive_count,0) as midnight_msg_no_receive_count,
if(b2.am_msg_no_receive_count>0, b2.am_msg_no_receive_count,0) as am_msg_no_receive_count,
if(b3.noon_msg_no_receive_count>0, b3.noon_msg_no_receive_count,0) as noon_msg_no_receive_count,
...
if(e8.last_one_year_normal_msg_no_receive_count>0,e8.last_one_year_normal_msg_no_receive_count,0) as last_one_year_normal_msg_no_receive_count
from
(select mobile from hive_table where delivrd='UNDELIVRD' group by mobile) a left outer join
(select mobile,count(mobile) as midnight_msg_no_receive_count from hive_table where delivrd='UNDELIVRD' and hour(optime) in (0,1,2,3,4,5,23) group by mobile) b1 on a.mobile=b1.mobile left outer join
(select mobile,count(mobile) as am_msg_no_receive_count from hive_table where delivrd='UNDELIVRD' and hour(optime) in (6,7,8,9,10) group by mobile) b2 on a.mobile=b2.mobile left outer join
...
(select mobile,count(mobile) as last_one_year_normal_msg_no_receive_count from hive_table where delivrd='UNDELIVRD' and ivr=0 and to_date(optime)>='2016-04-01' and to_date(optime)<='2017-03-31' group by mobile) e8 on a.mobile=e8.mobile;
"
hive -e "
${HIVE_SETTING}
${HIVE_SQL}
" >/home/q/data_to_hive/data_hive/data_hive_undelivrd
TIMESTAMP=$(date +%Y%m%d%H%M%S)
echo "end time:${TIMESTAMP}"
echo '-----------结束从hive查数----------------'
exit;

三. 总结

这里,我认为价值在于我走的弯路上!为了解决mysql插入性能问题,实施的一系列探索上, 同时积累了用脚本对mysql这些操作的熟练性.
过程中遇到的问题都轻描淡写了(有google!),从本文你将可以了解以下知识:

  1. hive脚本相关操作
  2. mysql数据插入,批量插入脚本的使用,及其中我遇到的一些坑.
  3. 脚本处理数据的一些操作(遍历目录下的每一个文件, 遍历文件的每一行,获取每一行中的每一列,记录shell线程,执行时间,函数传参和返回值)
  4. 理解做事情的思路是多么的重要.
  5. 这是一次xxx的经历.