项目三 flume 采集数据至hbase

简介

flume采集数据至hbase有四个实例,本文章一一列举,各实例流程均差不多,区别基本上就是配置文件的编写。其中实例一流程较为详细,后面几个实例参考实例一流程

实例一

编写配置文件

  • 先在/opt/module/flume/conf/job目录下创建一个flume采集数据至hbase的配置文件
代码语言:shell复制
cd /opt/module/flume/conf/job
vim test-flume-into-hbase-1.conf

agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

agent.sources.logfile-source.type = exec
agent.sources.logfile-sourcemand = tail -f /data/flume-hbase-test/mkhbasetable/data/test.log
agent.sources.logfile-source.checkerpiodic = 50
agent.sources.logfile-source.channels = file-channel

agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table-test1
agent.sinks.hbase-sink.columnFamily = familycloml
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

agent.sinks.hbase-sink.serializer.payloadcolumn = column-1
agent.sinks.hbase-sink.channel = file-channel
配置文件详解
  • 源(Source)
代码语言:txt复制
agent.sources: 定义 Flume 代理的源,这里设置为 `logfile-source`。

agent.sources.logfile-source.type: 源的类型为 `exec`,表示它将执行一个命令来获取数据。

agent.sources.logfile-sourcemand: 指定要执行的命令,`tail -f /data/flume-hbase-test/mkhbasetable/data/test.log` 用来实时跟踪 `test.log` 文件,输出文件中新增的内容。

agent.sources.logfile-source.checkerpiodic: 这是一个拼写错误,应为 `checker.period`,它的作用是设定源的检查时长(单位为毫秒),决定多长时间检查源是否有新数据,这里设置为每 50 毫秒检查一次。

agent.sources.logfile-source.channels: 指定源与通道之间的连接,这里指向 `file-channel`,表示从这个源读取的数据会被发送到这个通道。
  • 通道(Channel)
代码语言:txt复制
agent.channels: 定义 Flume 代理的通道,这里设置为 `file-channel`。

agent.channels.file-channel.type: 通道的类型为 `file`,表示使用文件系统存储数据。

agent.channels.file-channel.checkpointDir: 这是通道的检查点目录,用于存储通道的状态,以便在失败后恢复。

agent.channels.file-channel.dataDirs: 这是通道的数据存储目录,通道会把数据暂时存储在这里,直到 Sink 提取数据。
  • 接收器(Sink)
代码语言:txt复制
agent.sinks: 定义 Flume 代理的接收器,这里设置为 `hbase-sink`。

agent.sinks.hbase-sink.type: 接收器的类型是 HBaseSink,意味着这个接收器将数据写入 HBase 数据库。

agent.sinks.hbase-sink.table: 指定要写入的 HBase 表名,这里是 `mikeal-hbase-table-test1`。

agent.sinks.hbase-sink.columnFamily: 指定 HBase 表的列族名,这里为 `familycloml`。需要确认该列族是否已经存在于 HBase 表中。

agent.sinks.hbase-sink.serializer: 指定如何序列化数据的类,这里使用了简单的 HBase 事件序列化器 `SimpleHbaseEventSerializer`。

agent.sinks.hbase-sink.serializer.payloadcolumn: 指定数据中应保存到 HBase 的列名,这里设置为 `column-1`,这个值会对应于 HBase 表的某一列。

agent.sinks.hbase-sink.channel: 指定这个接收器使用的通道,这里指向 `file-channel`,表示它从这个通道中读取数据。

创建相关路径和文件

代码语言:shell复制
# 创建日志和数据存放路径
mkdir -p /data/flume-hbase-test/mkhbasetable/data/

# 创建通道检查点路径和通道数据暂存路径
mkdir -p /data/flume-hbase-test/data
mkdir -p /data/flume-hbase-test/checkpoint

# 创建空日志文件
cd /data/flume-hbase-test/mkhbasetable/data/
touch test.log

# 编辑模拟数据文件
vim data-test1.txt

134.3
726.9
262.0
902.8
665.8
153.2
618.3
333.4
985.7
201.2
970.3
234.8

hbase创建相关表

代码语言:shell复制
# 依次启动Hadoop,zookeeper,hbase所有进程
allstart.sh

# 启动hbase shell
hbase shell

# 创建mikeal-hbase-table-test1表
create 'mikeal-hbase-table-test1', 'familycloml'

编写启动脚本

  • 该脚本方便地启动 Flume 任务而不需要手动输入所有命令,也可以确保 Flume 进程在后台持续运行,适合在生产环境中使用。
代码语言:shell复制
# 切换到脚本路径
cd /opt/module/flume/job-shell

# 编辑启动脚本
vim test-flume-into-hbase-1

#!/bin/bash

echo " --------启动 master 采集日志数据至Hbase 测试1--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-1.conf >/dev/null 2>&1 &


# 添加权限
chmod 777 ./*

启动流程

代码语言:shell复制
# 切换到脚本启动路径下
cd /opt/module/flume/job-shell

# 启动flume采集脚本
test-flume-into-hbase-1

# 向日志文件添加数据
/data/flume-hbase-test/mkhbasetable/data
cat data-test1.txt >> test.log 

检测结果

实例二

编写配置文件

代码语言:shell复制
cd /opt/module/flume/conf/job
vim test-flume-into-hbase-2.conf

agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

agent.sources.logfile-source.type = exec
agent.sources.logfile-sourcemand = tail -f /data/flume-hbase-test/mkhbasetable/data/test.log
agent.sources.logfile-source.checkPeriodic = 50

agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table-test2
agent.sinks.hbase-sink.columnFamily = familycloml
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agent.sinks.hbase-sink.serializer.payloadColumn = column-1

agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel

配置文件详解

代码语言:txt复制
实例二使用SimpleAsyncHbaseEventSerializer序列化模式来采集数据,其他模式基本和实例一一致

hbase创建相关表

代码语言:shell复制
create 'mikeal-hbase-table-test2', 'familycloml'

编写启动脚本

代码语言:shell复制
vim test-flume-into-hbase-2

#!/bin/bash

echo " --------启动 master 采集日志数据至Hbase 测试2--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-2.conf >/dev/null 2>&1 &


# 添加权限
chmod 777 ./*

启动流程

代码语言:shell复制
# 切换到脚本启动路径下
cd /opt/module/flume/job-shell

# 启动flume采集脚本
test-flume-into-hbase-2

# 向日志文件添加数据
cd /data/flume-hbase-test/mkhbasetable/data
cat data-test1.txt >> test.log 

检测结果

实例三

编写配置文件

代码语言:shell复制
cd /opt/module/flume/conf/job
vim test-flume-into-hbase-3.conf

agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

agent.sources.logfile-source.type = exec
agent.sources.logfile-sourcemand = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source.checkperiodic = 50

agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table-test3
agent.sinks.hbase-sink.columnFamily = familycloml
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

agent.sinks.hbase-sink.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+)
agent.sinks.hbase-sink.serializer.colNames = ip, time, url

agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel

配置文件详解

代码语言:txt复制
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer: 指定使用正则表达式序列化器将事件序列化为 HBase 可接受的格式。

agent.sinks.hbase-sink.serializer.regex =(\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+): 定义正则表达式,将相关数据提取

agent.sinks.hbase-sink.serializer.colNames = ip, time, url: 指定要提取的列名,即将从日志中解析出的数据对应到 HBase 表的列。这里是 ip、time 和 url。

hbase创建相关表

代码语言:shell复制
create 'mikeal-hbase-table-test3', 'familycloml'

编辑日志文件所需要测试的数据

代码语言:txt复制
vim nginx-data.txt

192.168.1.1 [27/Sep/2024:10:28:00 -0400] GET /path/to/resource?param=value HTTP/1.1
10.0.0.2 [27/Sep/2024:10:32:31 -0400] POST /api/v1/data HTTP/1.1
172.16.0.3 [27/Sep/2024:10:34:45 -0400] DELETE /api/v2/resource HTTP/1.1

编写启动脚本

代码语言:shell复制
cd /data/flume-hbase-test/mkhbasetable/data

vim test-flume-into-hbase-3

#!/bin/bash

echo " --------启动 master 采集日志数据至Hbase 测试3--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-3.conf >/dev/null 2>&1 &

# 添加权限
chmod 777 ./*

启动流程

代码语言:shell复制
# 切换到脚本启动路径下
cd /opt/module/flume/job-shell

# 启动flume采集脚本
test-flume-into-hbase-3

# 向日志文件添加数据
cd /data/flume-hbase-test/mkhbasetable/data
cat nginx-data.txt >> nginx.log 

检测结果

实例四

编写配置文件

代码语言:shell复制
cd /opt/module/flume/conf/job
vim test-flume-into-hbase-multi-position.conf

agent.sources = logfile-source-1, logfile-source-2
agent.channels = file-channel-1, file-channel-2
agent.sinks = hbase-sink-1, hbase-sink-2

agent.sources.logfile-source-1.type = exec
agent.sources.logfile-source-1mand = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source-1.checkperiodic = 50

agent.sources.logfile-source-2.type = exec
agent.sources.logfile-source-2mand = tail -f /data/flume-hbase-test/mkhbasetable/data/tomcat.log
agent.sources.logfile-source-2.checkperiodic = 50

agent.channels.file-channel-1.type = file
agent.channels.file-channel-1.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel-1.dataDirs = /data/flume-hbase-test/data

agent.channels.file-channel-2.type = file
agent.channels.file-channel-2.checkpointDir = /data/flume-hbase-test/checkpoint2
agent.channels.file-channel-2.dataDirs = /data/flume-hbase-test/data2

agent.sinks.hbase-sink-1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-1.table = mikeal-hbase-table-test-multi-position
agent.sinks.hbase-sink-1.columnFamily = familycloml1
agent.sinks.hbase-sink-1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

agent.sinks.hbase-sink-1.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+) 
agent.sinks.hbase-sink-1.serializer.colNames = ip, time, url

agent.sinks.hbase-sink-2.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-2.table = mikeal-hbase-table-test-multi-position
agent.sinks.hbase-sink-2.columnFamily = familycloml2
agent.sinks.hbase-sink-2.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

agent.sinks.hbase-sink-2.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+)
agent.sinks.hbase-sink-2.serializer.colNames = ip, time, url

agent.sources.logfile-source-1.channels = file-channel-1
agent.sinks.hbase-sink-1.channel = file-channel-1

agent.sources.logfile-source-2.channels = file-channel-2
agent.sinks.hbase-sink-2.channel = file-channel-2

配置文件详解

代码语言:txt复制
配置在前面三个实例的基础上加上双通道模式

创建相关路径

代码语言:shell复制
# 创建通道检查点路径和通道数据暂存路径
mkdir -p /data/flume-hbase-test/checkpoint2
mkdir -p /data/flume-hbase-test/data2

hbase创建相关表

代码语言:shell复制
create 'mikeal-hbase-table-test-multi-position', 'familycloml1', 'familycloml2'

编写启动脚本

代码语言:shell复制
cd /data/flume-hbase-test/mkhbasetable/data

vim test-flume-into-hbase-multi-position 


#!/bin/bash

echo " --------启动 master 采集日志数据至Hbase 测试4--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-multi-position.conf >/dev/null 2>&1 &


# 添加权限
chmod 777 ./*

启动流程

代码语言:shell复制
# 切换到脚本启动路径下
cd /opt/module/flume/job-shell

# 启动flume采集脚本
test-flume-into-hbase-multi-position 

# 向日志文件添加数据
cd /data/flume-hbase-test/mkhbasetable/data
cat nginx-data.txt >> nginx.log && cat nginx-data.txt >> tomcat.log

检测结果