10分钟教你如何用flume抽取mysql数据至hdfs
小标 2018-06-07 来源 : 阅读 3502 评论 0

摘要:本文主要向大家介绍了如何用flume抽取mysql数据至hdfs,通过具体的实例让大家了解,希望对大家学习mysql有所帮助。

本文主要向大家介绍了如何用flume抽取mysql数据至hdfs,通过具体的实例让大家了解,希望对大家学习mysql有所帮助。

场景分析:

一般情况下关系型数据库(mysql、oracle、sqlserver)数据抽取至hdfs、hive、hbase使用sqoop工具。
但sqoop数据抽取的底层依靠mapreduce,处理的实时性得不到保证。如果能将数据抽取和Sparkstreaming+Sparksql结合将大大提高了处理效率。因而想到了flume抽取关系型数据库数据至kafka中,有Sparkstreaming读取。本文介绍如何通过flume抽取mysql数据至hdfs,后面会介绍kafka+sparkStreaming的流程。

sql数据库表">1.建立mysql数据库表

控制台登录mysql后运行下命令:

use test;

create table wlslog
(id int not null,
time_stamp varchar(40),
category varchar(40),
type varchar(40),
servername varchar(40),
code varchar(40),
msg varchar(40),
primary key ( id )
);

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,’apr-8-2014-7:06:16-pm-pdt’,’notice’,’weblogicserver’,’adminserver’,’bea-000365’,’server state changed to standby’);
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,’apr-8-2014-7:06:17-pm-pdt’,’notice’,’weblogicserver’,’adminserver’,’bea-000365’,’server state changed to starting’);
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,’apr-8-2014-7:06:18-pm-pdt’,’notice’,’weblogicserver’,’adminserver’,’bea-000365’,’server state changed to admin’);
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,’apr-8-2014-7:06:19-pm-pdt’,’notice’,’weblogicserver’,’adminserver’,’bea-000365’,’server state changed to resuming’);
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,’apr-8-2014-7:06:20-pm-pdt’,’notice’,’weblogicserver’,’adminserver’,’bea-000361’,’started weblogic adminserver’);
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,’apr-8-2014-7:06:21-pm-pdt’,’notice’,’weblogicserver’,’adminserver’,’bea-000365’,’server state changed to running’);
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,’apr-8-2014-7:06:22-pm-pdt’,’notice’,’weblogicserver’,’adminserver’,’bea-000360’,’server started in running mode’);
commit;

2. 建立相关目录与文件

(1)创建本地状态文件
mkdir -p /var/lib/flume
cd /var/lib/flume
touch sql-source.status
chmod -R 777 /var/lib/flume
(2)建立HDFS目标目录

hdfs dfs -mkdir -p /flume/mysql
hdfs dfs -chmod -R 777 /flume/mysql

3. 准备JAR包

从https://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下载flume-ng-sql-source-1.3.7.jar文件,并复制到Flume库目录。我用的是ambari搭建的集群因此命令如下:

cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/

将MySQL JDBC驱动JAR包也复制到Flume库目录。

cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar

4. 配置Flume

ambari主页面如下操作:Ambari -> Flume -> Configs -> flume.conf中配置如下属性:

agent.channels.ch1.type = memory 
agent.sources.sql-source.channels = ch1 
agent.channels = ch1 
agent.sinks = HDFS 
 
agent.sources = sql-source 
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource 
 
agent.sources.sql-source.connection.url = jdbc:mysql://你的ip:3306/test 
agent.sources.sql-source.user = root 
agent.sources.sql-source.password = 你的密码 
agent.sources.sql-source.table = wlslog 
agent.sources.sql-source.columns.to.select = * 
 
agent.sources.sql-source.incremental.column.name = id 
agent.sources.sql-source.incremental.value = 0 
 
agent.sources.sql-source.run.query.delay=5000 
 
agent.sources.sql-source.status.file.path = /var/lib/flume 
agent.sources.sql-source.status.file.name = sql-source.status 
 
agent.sinks.HDFS.channel = ch1 
agent.sinks.HDFS.type = hdfs 
agent.sinks.HDFS.hdfs.path = hdfs://你的namenode主机名/flume/mysql 
agent.sinks.HDFS.hdfs.fileType = DataStream 
agent.sinks.HDFS.hdfs.writeFormat = Text 
agent.sinks.HDFS.hdfs.rollSize = 268435456 
agent.sinks.HDFS.hdfs.rollInterval = 0 
agent.sinks.HDFS.hdfs.rollCount = 0

重启flume服务。hdfs对应目录下将会查看到数据

本文由职坐标整理并发布,了解更多内容,请关注职坐标数据库MySQL数据库频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 2 不喜欢 | 2
看完这篇文章有何感觉?已经有4人表态,50%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程