配置flume读日志易kafka写hdfs写数据

Task 1.     搭建一个单机的hadoop 2.     用flume读测试环境Kafka数据写到hdfs中 环境准备 1.     jdk1.8.0_102 2.     hadoop-2.7.3 3.     apache-flume-1.7.0-bin 4.     三个子...

Task

1.     搭建一个机的hadoop

2.     flume测试环Kafka数据写到hdfs

环境准备

1.     jdk1.8.0_102

2.     hadoop-2.7.3

3.     apache-flume-1.7.0-bin

4.     三个子目都放置在宿主机的/data
# pwd -P; ls $(pwd -P)
/data
apache-flume-1.7.0-bin  hadoop-2.7.3  jdk1.8.0_102

搭建hadoop

采用的hadoop版本为最新稳定版——2.7.3,下载地址为https://mirrors.cnnic.cn/apache/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz

# install

1.     export JAVA_HOME=/data/jdk1.8.0_102

2.     bin/hdfs namenode -format

3.     sbin/hadoop-daemon.sh start namenode

4.     sbin/hadoop-daemon.sh start datanode

# test

1.     bin/hdfs dfs -mkdir /user

2.     bin/hdfs dfs -ls /

3.     bin/hdfs dfs -put etc/hadoop/hdfs-site.xml /user

4.     bin/hdfs dfs -cat /user/hdfs-site.xml

配置flume

采用的flume版本为最新稳定版——1.7.0,下载地址为https://mirrors.cnnic.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

配置flume从日志易测试环境的Kafka上读数据,写到hdfs

注意写hdfs需要hdfs jar包,并需要删除flume/lib目录中多余的一个snappy-java

find /data/hadoop-2.7.3/share/hadoop/hdfs -name *.jar  -exec cp -t /data/apache-flume-1.7.0-bin/lib {} \;

find /data/hadoop-2.7.3/share/hadoop/common -name *.jar  -exec cp -t /data/apache-flume-1.7.0-bin/lib {} \;

配置文件见文章末尾。

终结

flume中配置了每10000event就切换一个新文件,hdfs上用命令可以看到符合预期

attachments-2018-08-HdMLsHP45b7a750d46911.png

flume-ng配置文件

conf/flume.conf 配置文件

http://flume.apache.org/FlumeUserGuide.html#file-channel

 

#定义存储方式

flume.channels = channel1

 

#定义存储类型

flume.channels.channel1.type = file

 

#检查点文件的备份目录,该检查点备份目录不能与数据目录一致

flume.channels.channel1.checkpointDir = logs/collector/flume/editlog/checkpoint

 


用于存日志文件,可以逗号分隔多个目列表。在不同的磁上使用多个目可以提高文件通道的性能

 

flume.channels.channel1.dataDirs = logs/collector/flume/editlog/data

 

#频道的最大容量

flume.channels.channel1.capacity = 1000000

 

#等待重连时间

flume.channels.channel1.keep-alive = 1


#
通道支持事物的最大大小

flume.channels.channel1.transactionCapacity = 10000

#检查点之间的时间(毫秒)

flume.channels.channel1.checkpointInterval = 5000

 

#使用源

flume.sources = sourceKafka


#
使用远类型

flume.sources.sourceKafka.type = org.apache.flume.source.kafka.KafkaSource

 

#通道模式

flume.sources.sourceKafka.channels = channel1

 

#写入通道中最大消息数

flume.sources.sourceKafka.batchSize = 1000

 

#写入通道之前的最长时间,毫秒单位

flume.sources.sourceKafka.batchDurationMillis = 2000

 

#使用kafka的源地址

flume.sources.sourceKafka.kafka.bootstrap.servers = 192.168.1.18:9092

 

#kafka从中读取消息生成的topic

flume.sources.sourceKafka.kafka.topics = ucloud

 

#在多个源的情况喜爱,同一个id为一个消费组

flume.sources.sourceKafka.kafka.consumer.group.id = flume

 

#覆盖topic的配置

# override topic name

 

flume.sources.sourceKafka.interceptors = i1

flume.sources.sourceKafka.interceptors.i1.type = static

flume.sources.sourceKafka.interceptors.i1.preserveExisting = false

flume.sources.sourceKafka.interceptors.i1.key = topic

flume.sources.sourceKafka.interceptors.i1.value = ucloud


#
增加主机的配置

# add hostname header

flume.sources.sourceKafka.interceptors = i2

flume.sources.sourceKafka.interceptors.i2.type = host

flume.sources.sourceKafka.interceptors.i2.preserveExisting = false

flume.sources.sourceKafka.interceptors.i2.hostHeader = hostname

 

flume.sinks = sinkHdfs

flume.sinks.sinkHdfs.type = hdfs

flume.sinks.sinkHdfs.channel = channel1

flume.sinks.sinkHdfs.hdfs.path = hdfs://172.17.0.21:9000/flume/

flume.sinks.sinkHdfs.hdfs.filePrefix = %{hostname}

flume.sinks.sinkHdfs.hdfs.rollInterval = 0 

flume.sinks.sinkHdfs.hdfs.rollCount = 10000

flume.sinks.sinkHdfs.hdfs.rollSize = 0

flume.sinks.sinkHdfs.hdfs.minBlockReplicas = 1

flume.sinks.sinkHdfs.hdfs.fileType = DataStream

flume.sinks.sinkHdfs.hdfs.writeFormat = Text

 

conf/flume-env.sh配置文件

#添加环境变量

export JAVA_HOME=/data/jdk1.8.0_102

export JAVA_OPTS="-Djava.net.preferIPv4Stack=true -Xms1g -Xmx1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSParallelRemarkEnabled -Dflume.monitoring.type=HTTP -Dflume.monitoring.port=54211

 

 

运行例命令:

bin/flume-ng agent -n flume -c conf --no-reload-conf -f conf/flume.conf

 

 

 

重启flume命令

 

[root@i-xb2d2l03 ~]# /data/apache-flume-1.7.0-bin/ctl.sh

Usage: /data/apache-flume-1.7.0-bin/ctl.sh {start|stop|restart|status|force-stop}

  • 发表于 2018-08-20 16:01
  • 阅读 ( 4073 )
  • 分类:技术分享

你可能感兴趣的文章

相关问题

1 条评论

请先 登录 后评论
不写代码的码农
admin

15 篇文章

作家榜 »

  1. 日志易 24 文章
  2. admin 15 文章
  3. 日志易小A 2 文章
  4. 疯狂的馒头 2 文章
  5. 腾龙国际娱乐 1 文章
  6. rizhiyi509 1 文章
  7. Xiaoyu 1 文章
  8. 陈晨 0 文章