Task
1. Set up a single-node Hadoop
2. Use Flume to read data from the test-environment Kafka and write it to HDFS
Environment preparation
1. jdk1.8.0_102
2. hadoop-2.7.3
3. apache-flume-1.7.0-bin
All three directories are placed under /data on the host:
# pwd -P; ls $(pwd -P)
/data
apache-flume-1.7.0-bin hadoop-2.7.3 jdk1.8.0_102
Setting up Hadoop
The Hadoop version used is the latest stable release, 2.7.3, downloaded from https://mirrors.cnnic.cn/apache/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz
# install (run from /data/hadoop-2.7.3; see the config sketch after these steps)
1. export JAVA_HOME=/data/jdk1.8.0_102
2. bin/hdfs namenode -format
3. sbin/hadoop-daemon.sh start namenode
4. sbin/hadoop-daemon.sh start datanode
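Before step 2 (formatting the NameNode), fs.defaultFS should already point at the address that the Flume HDFS sink uses later in this article (hdfs://172.17.0.21:9000). A minimal single-node sketch of the two config files, assuming 172.17.0.21 is this host's IP:

etc/hadoop/core-site.xml:
<configuration>
  <property>
    <!-- NameNode RPC address; must match the hdfs.path used in flume.conf -->
    <name>fs.defaultFS</name>
    <value>hdfs://172.17.0.21:9000</value>
  </property>
</configuration>

etc/hadoop/hdfs-site.xml:
<configuration>
  <property>
    <!-- single-node setup: keep only one replica per block -->
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>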
# test
1. bin/hdfs dfs -mkdir /user
2. bin/hdfs dfs -ls /
3. bin/hdfs dfs -put etc/hadoop/hdfs-site.xml /user
4. bin/hdfs dfs -cat /user/hdfs-site.xml
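Besides the shell checks above, the NameNode web UI (port 50070 by default in Hadoop 2.x) offers another quick sanity check; its /jmx endpoint returns metrics as JSON:

curl -s http://localhost:50070/jmx | head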
Configuring Flume
The Flume version used is the latest stable release, 1.7.0, downloaded from https://mirrors.cnnic.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Configure Flume to read data from the Kafka in the 日志易 test environment and write it to HDFS.
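Before wiring Flume up, it can be worth confirming that the topic ucloud on broker 192.168.1.18:9092 (as configured in flume.conf below) actually carries data. A sketch using the console consumer shipped with Kafka; the installation path is an assumption, and the flags assume Kafka 0.10+ tools (older versions use --zookeeper instead of --bootstrap-server):

/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.18:9092 --topic ucloud --max-messages 5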
Note that writing to HDFS requires the Hadoop HDFS jars on Flume's classpath, and the duplicate snappy-java jar in flume/lib has to be removed afterwards (see the sketch after the copy commands below).
find /data/hadoop-2.7.3/share/hadoop/hdfs -name '*.jar' -exec cp -t /data/apache-flume-1.7.0-bin/lib {} \;
find /data/hadoop-2.7.3/share/hadoop/common -name '*.jar' -exec cp -t /data/apache-flume-1.7.0-bin/lib {} \;
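After the copy, flume/lib ends up with two snappy-java versions (Flume's own and the one bundled with Hadoop), so keep only one. A sketch; the exact file name to delete is an assumption, check what is actually present:

ls /data/apache-flume-1.7.0-bin/lib/snappy-java-*.jar
rm /data/apache-flume-1.7.0-bin/lib/snappy-java-1.0.4.1.jar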
The configuration files are listed at the end of this article.
Final result
Flume is configured to roll over to a new file every 10000 events; listing the output files on HDFS shows the result matches this expectation.
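For example, the rolled files under the sink path can be listed with:

bin/hdfs dfs -ls /flume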
flume-ng configuration files
conf/flume.conf:
# http://flume.apache.org/FlumeUserGuide.html#file-channel
# declare the channel
flume.channels = channel1
# channel type: file-backed
flume.channels.channel1.type = file
# directory where the checkpoint file is stored; keep it separate from the data directories
flume.channels.channel1.checkpointDir = logs/collector/flume/editlog/checkpoint
# comma-separated list of directories for storing log files; using multiple directories on separate disks can improve file channel performance
flume.channels.channel1.dataDirs = logs/collector/flume/editlog/data
# maximum number of events held in the channel
flume.channels.channel1.capacity = 1000000
# seconds to wait for free space in the channel before a put times out
flume.channels.channel1.keep-alive = 1
# maximum number of events per transaction
flume.channels.channel1.transactionCapacity = 10000
# time between checkpoints, in milliseconds
flume.channels.channel1.checkpointInterval = 5000
# declare the source
flume.sources = sourceKafka
# source type: Kafka
flume.sources.sourceKafka.type = org.apache.flume.source.kafka.KafkaSource
# channel the source writes to
flume.sources.sourceKafka.channels = channel1
# maximum number of messages written to the channel in one batch
flume.sources.sourceKafka.batchSize = 1000
# maximum time (ms) before a batch is written to the channel
flume.sources.sourceKafka.batchDurationMillis = 2000
# Kafka broker address
flume.sources.sourceKafka.kafka.bootstrap.servers = 192.168.1.18:9092
# Kafka topic to read messages from
flume.sources.sourceKafka.kafka.topics = ucloud
# with multiple sources, the same group.id makes them one consumer group
flume.sources.sourceKafka.kafka.consumer.group.id = flume
# interceptors: i1 overrides the topic header, i2 adds a hostname header
# (both must be listed in a single interceptors line, otherwise the later definition overrides the earlier one)
flume.sources.sourceKafka.interceptors = i1 i2
# override topic name
flume.sources.sourceKafka.interceptors.i1.type = static
flume.sources.sourceKafka.interceptors.i1.preserveExisting = false
flume.sources.sourceKafka.interceptors.i1.key = topic
flume.sources.sourceKafka.interceptors.i1.value = ucloud
# add hostname header
flume.sources.sourceKafka.interceptors.i2.type = host
flume.sources.sourceKafka.interceptors.i2.preserveExisting = false
flume.sources.sourceKafka.interceptors.i2.hostHeader = hostname
# declare the HDFS sink
flume.sinks = sinkHdfs
flume.sinks.sinkHdfs.type = hdfs
flume.sinks.sinkHdfs.channel = channel1
# target directory on HDFS; the NameNode address must match fs.defaultFS
flume.sinks.sinkHdfs.hdfs.path = hdfs://172.17.0.21:9000/flume/
# prefix output files with the hostname header added by interceptor i2
flume.sinks.sinkHdfs.hdfs.filePrefix = %{hostname}
# roll files by event count only: every 10000 events, never by time or size
flume.sinks.sinkHdfs.hdfs.rollInterval = 0
flume.sinks.sinkHdfs.hdfs.rollCount = 10000
flume.sinks.sinkHdfs.hdfs.rollSize = 0
flume.sinks.sinkHdfs.hdfs.minBlockReplicas = 1
# write events as plain text
flume.sinks.sinkHdfs.hdfs.fileType = DataStream
flume.sinks.sinkHdfs.hdfs.writeFormat = Text
conf/flume-env.sh:
# set environment variables
export JAVA_HOME=/data/jdk1.8.0_102
export JAVA_OPTS="-Djava.net.preferIPv4Stack=true -Xms1g -Xmx1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSParallelRemarkEnabled -Dflume.monitoring.type=HTTP -Dflume.monitoring.port=54211"
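With HTTP monitoring enabled as above, channel and sink counters can be checked while the agent is running, for example:

curl http://localhost:54211/metrics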
Command to start the agent:
bin/flume-ng agent -n flume -c conf --no-reload-conf -f conf/flume.conf
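While debugging the configuration it can help to also log to the console:

bin/flume-ng agent -n flume -c conf --no-reload-conf -f conf/flume.conf -Dflume.root.logger=INFO,console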
Command to restart Flume:
[root@i-xb2d2l03 ~]# /data/apache-flume-1.7.0-bin/ctl.sh
Usage: /data/apache-flume-1.7.0-bin/ctl.sh {start|stop|restart|status|force-stop}