Kafka与Flume整合完成数据收集配置

246次阅读
没有评论

一、Flume和Kafka组件整合配置

1.新建一个syslog_mem_kafka.conf文件

在/usr/local/src/flume/conf /目录下创建syslog_mem_kafka.conf文件

[hadoop@master ~]$ vim /usr/local/src/flume/conf/syslog_mem_kafka.conf

2.配置syslog_mem_kafka.conf文件

添加如下内容:

agent1.sources = src     

agent1.channels = ch1  

agent1.sinks = des1     
 
# Describe/configure the source

agent1.sources.src.type =syslogtcp   

agent1.sources.src.port = 6868   

agent1.sources.src.host = master  
 
# Use a channel which buffers events in memory

agent1.channels.ch1.type = memory   
 
# Describe the sink
 
agent1.sinks.des1.type = org.apache.flume.sink.kafka.KafkaSink  

agent1.sinks.des1.brokerList = master:9092,slave1:9092,slave2:9092 

agent1.sinks.des1.topic = flumekafka   

agent1.sources.src.channels = ch1     

agent1.sinks.des1.channel = ch1       

3.创建名为flumekafka的topic

[hadoop@master ~]$ /usr/local/src/kafka/bin/kafka-topics.sh –create –zookeeper master:2181,slave1:2181,slave2:2181 –replication-factor 2 –topic flumekafka –partitions 1

Create参数代表创建, zookeeper参数为zookeeper集群的主机名 ,replication-factor代表生成多少个副本文件,topic 为topic的名称,partitions指定多少个分区

4.启动flume进程

[hadoop@master ~]$ /usr/local/src/flume/bin/flume-ng agent -c /usr/local/src/flume/conf/ -f /usr/local/src/flume/conf/syslog_mem_kafka.conf -n agent1 -Dflume.root.logger=DEBUG,console

运行后不要关闭终端

-c 的意思是在conf目录使用配置文件。指定配置文件放在上面目录

-f 指定一个配置文件

-n agent的名称(必填)

-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。

5.在slave1中创建消费者

[hadoop@slave1~]$/usr/local/src/kafka/bin/kafka-console-consumer.sh –zookeeper master:2181,slave1:2181,slave2:2181 –topic flumekafka –from-beginning

运行后不要关闭终端

zookeeper参数为zookeeper集群的主机名

Topic指定在hello上创建消费者

from-beginning  读取历史未消费的数据

6.使用nc命令 向master:6868 发送信息

如果连接成功,这时候客户端输入文本信息回车就可以发送到服务端,一旦有人连接,第二个会话就连接不上。

打开新的master终端

[hadoop@master ~]$ nc master 6868

Hello flumekafka

7.查看slave1中的消费者

在前面slave1创建消费者的终端中可以看出输出信息

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

hello flumekafka

到点睡觉了
版权声明:本站原创文章,由 到点睡觉了2022-01-12发表,共计1934字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)