flume_kafka_hdfs_hive数据的处理
使用flume收集数据,将数据传递给kafka和hdfs,kafka上的数据可以使用storm构建实时计算,而hdfs上的数据,经过MR处理之后可以导入hive中进行处理。
环境:hadoop1.2.1,hive 0.13.1,maven 3.2.5,flume 1.4,kafka 0.7.2,eclipse luna,jdk 1.7_75;mysql-connector-java-5.1.26.bin.jar,flume-kafka-master.zip。
说明:所有服务都架设在一台机器上。
1:安装hadoop:这篇文章写得比较完整,可以看看:Ubuntu 12.10 安装JDK、Hadoop全过程
我在安装过程中出现:Does not contain a valid host:port authority: file:/// ,看了一遍自己的core-site.xml,hdfs-site.xml,mapred-site.xml没有发现错误,还特地看了些hosts配置,最后网上找到,fs.default.name中default写错了,启动hadoop。
2:安装hive:下载解压之后,设置HIVE_HOME,将HIVE_HOME/bin加入到PATH变量中,直接输入hive即可启动。默认hive是使用嵌入模式的Derby数据库,它的特点是小巧,而且老爹也是apache,但存在单session,无法多用户共享,这里参考网上的资料将元数据存储到mysql中去:Hive集成Mysql作为元数据,这里我出现了点问题,只能使用localhost进行连接,无法使用root@myggg,试着按照文章中查找my.conf,但没有找到相关配置,在实验环境下这样也可以用:
3:安装flume:
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
Flume是由Cloudera推出的一个高效收集,处理,移动大量日志数据的分布式,可靠地,高可用的服务。它有简单并且灵活的架构基于数据流之上。它使用可靠地协调性,容错转移,恢复机制使它强健性并且容错。它使用简单可伸缩的数据模型能实现在线数据分析。
Flume安agent划分,一个agent包括Source,Channel,Sink三个部分,Source从Web Server中取数据,push交给Channel,Sink将pull Channel得到数据,一个agent可以有多个Channel, Sink。
配置FLUME_HOME,将PATH中加入Flume下的执行路径,将conf下的flume-conf.properties.template重命名为flume-conf.properties,然后进行配置,在单机情况下:
agent.sources = r1 //agent中添加source,命名为r1 agent.sinks = s1 //agent中添加sink,命名为s1 agent.channels = c1 //agent中添加channel,命名为c1 agent.sinks.s1.channel = c1 //s1从c1中取数据 agent.sources.r1.channels = c1 //r1将数据交给c1 #describe the source agent.sources.r1.type = exec //定义r1的类型为exec agent.sources.r1.command = tail -F /root/input/loginfo //r1执行的命令 #use a channel which buffers events in memory agent.channels.c1.type = memory //定义c1的类型memory agent.channels.c1.capacity = 1000 //c1的容量 agent.channels.c1.transactionCapacity = 100 //channel获取或者sink获得一次最大的数据量 #sinks type agent.sinks.s1.type = logger //定义s1的类型为logger
完成后启动flume,测试:
bin/flume-ng agent --conf ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent
4:安装kafka:关于kafka的介绍:Kafka快速入门,简单来说,kafka集群中的一台服务器就是一个broker,消息按名字分类,叫做topic,消息的产生是producer,消息的获取方为customer。kafka的安装方法同上。由于使用默认配置,kafka中的config下不需要配置了,直接启动即可进行模拟:
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties bin/kafka-console-producer.sh --zookeeper localhost:2181 localhost:9092 --topic test bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
相关可以看apache-kafka。
5:进行整合:数据的处理过程包括数据收集,数据清理,数据存储,数据分析,数据展现。在这里数据的收集由flume负责,定期从web server中收集log相关信息,对于实时数据的处理,将数据直接发送到kafka,然后交给后面的storm处理(这个没有做),对于离线部分,经过简单的mr处理后存储到hdfs上,然后使用hive操作。
总的架构图:
Flume的设计:
在搭建之前,先安装maven:安装步骤同上Flume与Kafka
安装完后echo $PATH:
/usr/lib/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/Downloads/apache-maven-3.2.5/bin
Flume与Kafka之间整合需要一个插件:这里介绍个flume-kafka插件,flume1.4,kafka0.7.2的基础上,将代码下载下来,进入目录,使用maven打包成jar文件,将生成的jar包放到flume的lib或相关目录下,依次将hadoop.1.2.1-*.jar,kafka0.7.2.jar, scala-compiler.jar(2.8),scala-library.jar(2.8),zkclient-0.1.jar导入,mvn package过程中可能会报错,找不到kafka0.7.2.jar,你需要将额外的extra-dependencies下的包放到‘~/.m2/repository/com/linkedin/kafka/kafka/0.7.2/’下,再进行package。
对于myggg开启6个终端:
对于发送到kafka中的数据以后在处理,现在主要是针对hadoop中的数据,首先使用MR处理,格式化文本。
6:后续:解压eclipse,将之前准备的hadoop-eclipse-plugin-1.2.1.jar放到eclipse下的plugins目录下,使用vnc连接到机器,编写MR程序。
使用’hadoop fs -cat /myFlume/FlumeData.1426320728464’查看文件:
1,b 2,c 3,d 4,e 5,f 6,g 7,z 0,o
编写MR,将一行记录拆分为key,value:
public static class MyMapper extends Mapper<Object, Text, IntWritable, Text>{ private IntWritable hello = new IntWritable(); private Text world = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { String[] array = value.toString().split(","); if (array.length == 2) { hello.set(Integer.parseInt(array[0])); world.set(array[1]); context.write(hello, world); } } }
查看结果:
[root@myggg eclipse]# hadoop fs -cat /myoutput/part-r-00000 0 o 1 b 2 c 3 d 4 e 5 f 6 g 7 z
使用hive建立外部表查看数据:
create external table employee(id int, name string) row format delimited fields terminated by ‘\t‘ lines terminated by ‘\n‘ stored as textfile location ‘/myoutput‘;
然后就可以进行相关查询与处理了。