# Hudi Installation
# What You Will Get
- Integrating Flink with Kafka, MySQL CDC, and Hudi
- Consuming Kafka messages with Flink SQL
- Consuming MySQL CDC changes with Flink SQL
- Creating a Hudi table with Flink SQL and mapping it to a Hive table
# Software Installation
# Notes
- Hudi depends on the Java, Scala, and Flink environments
- Writing Hudi data to HDFS depends on the Hadoop environment
- Mapping Hudi tables to Hive depends on the Hive environment
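A quick way to confirm the environment before starting (a sketch, not a required step; the paths follow the /root/tools layout used throughout this guide, and the expected versions are the ones used here):
```bash
# Verify the toolchain this guide assumes
java -version                                 # a JDK compatible with Flink 1.14 (typically JDK 8)
hadoop version | head -n 1                    # expect Hadoop 3.3.4
hive --version | head -n 1                    # expect Hive 3.1.3
/root/tools/flink-1.14.5/bin/flink --version  # expect Flink 1.14.5 (Scala 2.12 build)
```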
# Integration Steps
# Building the JARs
# Building the Hudi JARs
Download the source from the Hudi code repository and switch to the 0.12.0 branch.
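For example (a sketch; assumes the upstream Apache repository and that the release is reachable as release-0.12.0):
```bash
# Fetch the Hudi source and switch to the 0.12.0 release
git clone https://github.com/apache/hudi.git
cd hudi
git checkout release-0.12.0   # use `git tag` / `git branch -r` to confirm the exact name
```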
In Hudi\pom.xml, update the versions of kafka, hadoop, hive, spark2, scala12, and scala.binary
# keep these versions consistent with your installed environment
<kafka.version>3.2.0</kafka.version> // old: <kafka.version>2.0.0</kafka.version>
<hadoop.version>3.3.4</hadoop.version> // old: <hadoop.version>2.10.1</hadoop.version>
<hive.version>3.1.3</hive.version> // old: <hive.version>2.3.1</hive.version>
<spark2.version>2.4.8</spark2.version> // old: <spark2.version>2.4.4</spark2.version>
<scala12.version>2.12.15</scala12.version> // old: <scala12.version>2.12.10</scala12.version>
<scala.binary.version>2.12</scala.binary.version> // old: <scala.binary.version>2.11</scala.binary.version>
In Hudi\packaging\hudi-flink-bundle\pom.xml, update the hive version
<hive.version>3.1.3</hive.version> // old: <hive.version>2.3.1</hive.version>
<id>flink-bundle-shade-hive3</id>
<properties>
<hive.version>3.1.3</hive.version> // old: <hive.version>3.1.2</hive.version>
<flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
</properties>
Modify the constructor call in Hudi\hudi-common\src\main\java\org\apache\hudi\common\table\log\block\HoodieParquetDataBlock.java
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) // old: try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) // avoids a compilation error
# Building hudi-flink1.14-bundle_2.12-0.12.0.jar
Go to the Hudi\packaging\hudi-flink-bundle directory and run mvn clean install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 to build
Open hudi-flink1.14-bundle_2.12-0.12.0.jar with an archive tool (without extracting it) and change the HBase version and the skip flag in hbase-default.xml and hbase-site.xml
<name>hbase.defaults.for.version</name>
<value>2.0.0-alpha4</value> // old: <value>2.4.9</value>
<name>hbase.defaults.for.version.skip</name>
<value>true</value> // old: <value>false</value>
Upload hudi-flink1.14-bundle_2.12-0.12.0.jar to the /root/tools/flink-1.14.5/lib directory
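If you prefer to script the patch instead of using an archive tool, something like the following works (a sketch; it assumes the bundle was just built under packaging/hudi-flink-bundle/target and that hbase-default.xml sits at the root of the jar; repeat for hbase-site.xml if it contains the same properties):
```bash
cd packaging/hudi-flink-bundle/target
# pull hbase-default.xml out of the bundle, adjust it, then write it back in place
jar xf hudi-flink1.14-bundle_2.12-0.12.0.jar hbase-default.xml
sed -i 's#<value>2.4.9</value>#<value>2.0.0-alpha4</value>#' hbase-default.xml   # hbase.defaults.for.version
sed -i '/hbase.defaults.for.version.skip/{n;s#<value>false</value>#<value>true</value>#}' hbase-default.xml
jar uf hudi-flink1.14-bundle_2.12-0.12.0.jar hbase-default.xml
# verify both edits took effect before uploading the jar
unzip -p hudi-flink1.14-bundle_2.12-0.12.0.jar hbase-default.xml | grep -A1 'hbase.defaults.for.version'
```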
# Building hudi-hadoop-mr-bundle-0.12.0.jar
Go to the Hudi\packaging\hudi-hadoop-mr-bundle directory and run mvn clean install -DskipTests to build
Open hudi-hadoop-mr-bundle-0.12.0.jar with an archive tool (without extracting it) and change the HBase version and the skip flag in hbase-default.xml and hbase-site.xml
<name>hbase.defaults.for.version</name>
<value>2.0.0-alpha4</value> // old: <value>2.4.9</value>
<name>hbase.defaults.for.version.skip</name>
<value>true</value> // old: <value>false</value>
Upload hudi-hadoop-mr-bundle-0.12.0.jar to the /root/tools/hive-3.1.3/auxlib and /root/tools/hive-3.1.3/lib directories
# Building hudi-hive-sync-bundle-0.12.0.jar
Go to the Hudi\packaging\hudi-hive-sync-bundle directory and run mvn clean install -DskipTests to build
Open hudi-hive-sync-bundle-0.12.0.jar with an archive tool (without extracting it) and change the HBase version and the skip flag in hbase-default.xml and hbase-site.xml
<name>hbase.defaults.for.version</name>
<value>2.0.0-alpha4</value> // old: <value>2.4.9</value>
<name>hbase.defaults.for.version.skip</name>
<value>true</value> // old: <value>false</value>
Upload hudi-hive-sync-bundle-0.12.0.jar to the /root/tools/hive-3.1.3/auxlib directory
# Building the flink-shaded-hadoop JAR
Download the source from the flink-shaded code repository and switch to the release-10.0 branch
Modify flink-shaded\pom.xml and add the following profile under <profiles>, which points the build at a faster mirror
<profile>
<id>vendor-repos</id>
<activation>
<property>
<name>vendor-repos</name>
</property>
</activation> <!-- Add vendor maven repositories -->
<repositories> <!-- Cloudera -->
<repository>
<id>cloudera-releases</id>
<url>https://maven.aliyun.com/repository/central</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</profile>
Modify flink-shaded\flink-shaded-hadoop-2-parent\pom.xml
<hadoop.version>3.3.4</hadoop.version> // old: <hadoop.version>2.4.1</hadoop.version> <!-- matches Hadoop 3.3.4 -->
Modify flink-shaded\flink-shaded-hadoop-2-parent\flink-shaded-hadoop-2\pom.xml
<packaging>jar</packaging>
<version>${hadoop.version}-14.0</version> // old: <version>${hadoop.version}-10.0</version> <!-- matches Flink 1.14.5 -->
Modify flink-shaded\flink-shaded-hadoop-2-parent\flink-shaded-hadoop-2-uber\pom.xml
<packaging>jar</packaging>
<version>${hadoop.version}-14.0</version> // old: <version>${hadoop.version}-10.0</version> <!-- matches Flink 1.14.5 -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>${hadoop.version}-14.0</version> // old: <version>${hadoop.version}-10.0</version> <!-- matches Flink 1.14.5 -->
</dependency>
<dependency> // added dependency
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
In the flink-shaded directory, run mvn clean install -DskipTests -Dhadoop.version=3.3.4 -Drat.skip=true to build
Upload flink-shaded-hadoop-2-uber-3.3.4-14.0.jar to the /root/tools/flink-1.14.5/lib directory
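If the build succeeds, the uber jar can be copied straight out of the module's target directory (a sketch; the path follows the standard Maven layout, adjust if your build differs):
```bash
# the uber jar produced above, versioned <hadoop.version>-14.0 per the pom changes
ls flink-shaded-hadoop-2-parent/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-3.3.4-14.0.jar
cp flink-shaded-hadoop-2-parent/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-3.3.4-14.0.jar \
   /root/tools/flink-1.14.5/lib/
```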
# Downloading JARs
Open the official Maven repository website
Search for flink connector kafka and download flink-connector-kafka_2.12-1.14.5.jar, used to integrate Flink with Kafka
Search for flink sql connector kafka and download flink-sql-connector-kafka_2.12-1.14.5.jar, used to integrate Flink SQL with Kafka
Search for kafka clients and download kafka-clients-3.2.0.jar
Search for flink sql connector mysql cdc and download flink-sql-connector-mysql-cdc-2.2.1.jar, used to integrate Flink SQL with MySQL CDC
Upload the four JARs to the /root/tools/flink-1.14.5/lib directory
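At this point Flink's lib directory should contain every jar added above; a quick check (a sketch):
```bash
ls /root/tools/flink-1.14.5/lib | grep -E 'hudi-flink|flink-shaded-hadoop|connector-kafka|kafka-clients|mysql-cdc'
# expected, in addition to Flink's own jars:
#   hudi-flink1.14-bundle_2.12-0.12.0.jar
#   flink-shaded-hadoop-2-uber-3.3.4-14.0.jar
#   flink-connector-kafka_2.12-1.14.5.jar
#   flink-sql-connector-kafka_2.12-1.14.5.jar
#   kafka-clients-3.2.0.jar
#   flink-sql-connector-mysql-cdc-2.2.1.jar
```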
# Consuming Kafka Messages with Flink SQL
# Start Flink
cd /root/tools/flink-1.14.5/bin
# set the environment variable
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# start the cluster
./start-cluster.sh
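To confirm the standalone cluster came up (a sketch; assumes the default REST port 8081):
```bash
jps | grep -E 'StandaloneSessionClusterEntrypoint|TaskManagerRunner'   # JobManager and TaskManager processes
curl -s http://localhost:8081/overview                                 # Flink REST API; returns a cluster overview JSON
```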
# Producing Kafka Messages
# Start Kafka
cd /root/tools/kafka_2.12-3.2.0/bin
nohup sh kafka-server-start.sh ../config/server.properties & # non-daemon start; the process tends to die unexpectedly
nohup sh kafka-server-start.sh -daemon ../config/server.properties 1>/dev/null 2>&1 & # daemon start; keeps the process from dying unexpectedly
# Create a Topic
cd /root/tools/kafka_2.12-3.2.0/bin
./kafka-topics.sh --create --bootstrap-server 192.168.17.149:9092 --replication-factor 1 --partitions 1 --topic flinktest
# List Topics
cd /root/tools/kafka_2.12-3.2.0/bin
./kafka-topics.sh --list --bootstrap-server 192.168.17.149:9092
# Produce Messages
cd /root/tools/kafka_2.12-3.2.0/bin
sh kafka-console-producer.sh --bootstrap-server 192.168.17.149:9092 --topic flinktest
# type a message at the producer prompt, e.g.:
{"tinyint0": 6, "smallint1": 223, "int2": 42999, "bigint3": 429450, "float4": 95.47324181659323, "double5": 340.5755392968011,"decimal6": 111.1111, "boolean7": true, "char8": "dddddd", "varchar9": "buy0", "string10": "buy1", "timestamp11": "2021-09-13 03:08:50.810"}
# Consume Messages
cd /root/tools/kafka_2.12-3.2.0/bin
sh kafka-console-consumer.sh --bootstrap-server 192.168.17.149:9092 --topic flinktest --from-beginning
# Start the Flink SQL Client
cd /root/tools/flink-1.14.5/bin
# set the environment variable
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./sql-client.sh embedded
# Create a Table Backed by Kafka
Flink SQL> CREATE TABLE kafkatest (
> tinyint0 TINYINT
> ,smallint1 SMALLINT
> ,int2 INT
> ,bigint3 BIGINT
> ,float4 FLOAT
> ,double5 DOUBLE
> ,decimal6 DECIMAL(38,8)
> ,boolean7 BOOLEAN
> ,char8 STRING
> ,varchar9 STRING
> ,string10 STRING
> ,timestamp11 STRING
> ) WITH (
> 'connector' = 'kafka', -- use the kafka connector
> 'topic' = 'flinktest', -- kafka topic name
> 'scan.startup.mode' = 'earliest-offset', -- read from the earliest offset
> 'properties.bootstrap.servers' = '192.168.17.149:9092', -- kafka broker address
> 'properties.group.id' = 'testgroup1',
> 'value.format' = 'json',
> 'value.json.fail-on-missing-field' = 'true',
> 'value.fields-include' = 'ALL'
> );
[INFO] Execute statement succeed.
# Query the Table
After sending a message from the Kafka producer, the table should show the new row
Flink SQL> select * from kafkatest;
# Consuming MySQL CDC Changes with Flink SQL
# Check Whether binlog Is Enabled on MySQL
If it is not enabled, add the following to /etc/my.cnf and restart the database
log-bin=mysql-bin
binlog-format=ROW
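You can check the current settings from the MySQL client before touching /etc/my.cnf (a sketch; the credentials match those used for the CDC table below):
```bash
mysql -h 192.168.17.149 -u root -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"
# log_bin should be ON and binlog_format should be ROW for mysql-cdc to work
```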
# Start the Flink SQL Client and Create a Table Backed by MySQL
Flink SQL> CREATE TABLE users_source_mysql (
> id BIGINT PRIMARY KEY NOT ENFORCED
> ,name STRING
> ,birthday TIMESTAMP(3)
> ,ts TIMESTAMP(3)
> ) WITH(
> 'connector' = 'mysql-cdc' ,
> 'hostname' = '192.168.17.149' ,
> 'port' = '3306' ,
> 'username' = 'root' ,
> 'password' = 'qaz123689' ,
> 'server-time-zone' = 'Asia/Shanghai' ,
> 'debezium.snapshot.mode' = 'initial' ,
> 'database-name' = 'test' ,
> 'table-name' = 'tbl_users'
> );
[INFO] Execute statement succeed.
Flink SQL> select * from users_source_mysql;
# In MySQL, update, delete, and insert rows in tbl_users; the changes should be visible from the Flink SQL query
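For example, changes like the following should each show up as changelog rows (+I/-U/+U/-D) in the running query (a sketch; tbl_users is assumed to roughly match the Flink schema above, so adjust column names and types to your actual table):
```bash
# you will be prompted for the MySQL root password
mysql -h 192.168.17.149 -u root -p test <<'SQL'
INSERT INTO tbl_users (id, name, birthday, ts) VALUES (1001, 'cdc_test', NOW(3), NOW(3));
UPDATE tbl_users SET name = 'cdc_test_updated', ts = NOW(3) WHERE id = 1001;
DELETE FROM tbl_users WHERE id = 1001;
SQL
```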
# Creating a Hudi Table with Flink SQL and Mapping It to a Hive Table
# Create the public Database in Hive
hive> create database public ;
# Start the Flink SQL Client, Create a Hudi Table Mapped to Hive, and Insert Data
[root@hadoopmaster bin]# ./sql-client.sh embedded
Flink SQL> CREATE TABLE nation_info(
> numeric_code int
> ,national_name string
> ,roman_spelling string
> ,alphabetic_code string
> ,primary key(numeric_code) not enforced
> )WITH (
> 'connector' = 'hudi',
> 'path' = 'hdfs:///user/flink/hudi/public/nation_info', -- HDFS storage path of the hudi table
> 'table.type' = 'COPY_ON_WRITE', -- copy-on-write table type
> 'write.bucket_assign.tasks' = '1',
> 'write.tasks' = '1',
> 'hive_sync.enable'= 'true', -- enable automatic sync to hive
> 'hive_sync.mode'= 'hms', -- hive sync mode; the default is jdbc
> 'hive_sync.metastore.uris'= 'thrift://192.168.17.149:9083', -- hive metastore address
> 'hive_sync.jdbc_url' = 'jdbc:hive2://192.168.17.149:10000', -- required, hiveserver2 address
> 'hive_sync.table'= 'nation_info', -- hive table to create
> 'hive_sync.db'= 'public', -- hive database to create the table in
> 'hive_sync.username'= 'hive', -- hive username
> 'hive_sync.password'= 'qaz123689', -- hive password
> 'hive_sync.support_timestamp'= 'true' -- compatibility with the hive timestamp type
> );
Flink SQL> insert into nation_info VALUES
> (101,'汉族测试','Han','HA'),
> (102,'蒙古族测试','Mongol','MG');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: ed3e34be09a0c71984d7dc26b33fdf54
Flink SQL>
# Verify the Mapped Table in Hive
hive> select * from public.nation_info where numeric_code in (101,102); -- check whether the table has data
OK
20220915171415714 20220915171415714_0_2 101 5f23eff2-16ba-499f-806d-6a0dd59777d5_0-1-0_20220915171415714.parquet 101 汉族测试 Han HA
20220915171415714 20220915171415714_0_3 102 5f23eff2-16ba-499f-806d-6a0dd59777d5_0-1-0_20220915171415714.parquet 102 蒙古族测试 Mongol MG
Time taken: 1.354 seconds, Fetched: 2 row(s)
hive> show create table public.nation_info; -- inspect the table DDL
2022-09-14 15:59:49,151 INFO [55c9a8f2-d526-4057-aa73-0ebb073a1440 main] exec.ListSinkOperator (Operator.java:logStats(1038)) - RECORDS_OUT_INTERMEDIATE:0, RECORDS_OUT_OPERATOR_LIST_SINK_0:27,
CREATE EXTERNAL TABLE `public.nation_info`(
`_hoodie_commit_time` string COMMENT '',
`_hoodie_commit_seqno` string COMMENT '',
`_hoodie_record_key` string COMMENT '',
`_hoodie_partition_path` string COMMENT '',
`_hoodie_file_name` string COMMENT '',
`numeric_code` int COMMENT '',
`national_name` string COMMENT '',
`roman_spelling` string COMMENT '',
`alphabetic_code` string COMMENT '')
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'hoodie.query.as.ro.table'='false',
'path'='hdfs:///user/flink/hudi/public/nation_info')
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://192.168.17.149:9000/user/flink/hudi/public/nation_info'
TBLPROPERTIES (
'last_commit_time_sync'='20220913210502766',
'spark.sql.sources.provider'='hudi',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"numeric_code","type":"integer","nullable":false,"metadata":{}},{"name":"national_name","type":"string","nullable":true,"metadata":{}},{"name":"roman_spelling","type":"string","nullable":true,"metadata":{}},{"name":"alphabetic_code","type":"string","nullable":true,"metadata":{}}]}',
'transient_lastDdlTime'='1662317806')
Time taken: 1.366 seconds, Fetched: 27 row(s)
# Inspect the Hudi Table Files on HDFS
[root@hadoopmaster etc]# hadoop fs -ls hdfs:///user/flink/hudi/public/nation_info
Found 3 items
drwxr-xr-x - root supergroup 0 2022-09-05 02:56 hdfs:///user/flink/hudi/public/nation_info/.hoodie
-rw-r--r-- 3 root supergroup 96 2022-09-05 02:56 hdfs:///user/flink/hudi/public/nation_info/.hoodie_partition_metadata
-rw-r--r-- 3 root supergroup 436145 2022-09-05 02:56 hdfs:///user/flink/hudi/public/nation_info/67a6d28e-2804-4039-8e75-007e54903a2a_0-1-0_20220905025642928.parquet
[root@hadoopmaster sbin]# hadoop fs -ls hdfs:///user/flink/hudi/public/nation_info
Found 6 items
drwxr-xr-x - root supergroup 0 2022-09-15 17:14 hdfs:///user/flink/hudi/public/nation_info/.hoodie
-rw-r--r-- 3 root supergroup 96 2022-09-15 14:52 hdfs:///user/flink/hudi/public/nation_info/.hoodie_partition_metadata
-rw-r--r-- 3 root supergroup 434972 2022-09-15 14:52 hdfs:///user/flink/hudi/public/nation_info/5f23eff2-16ba-499f-806d-6a0dd59777d5_0-1-0_20220915145209326.parquet
-rw-r--r-- 3 root supergroup 435119 2022-09-15 17:10 hdfs:///user/flink/hudi/public/nation_info/5f23eff2-16ba-499f-806d-6a0dd59777d5_0-1-0_20220915170959370.parquet
-rw-r--r-- 3 root supergroup 435117 2022-09-15 17:11 hdfs:///user/flink/hudi/public/nation_info/5f23eff2-16ba-499f-806d-6a0dd59777d5_0-1-0_20220915171157486.parquet
-rw-r--r-- 3 root supergroup 435117 2022-09-15 17:14 hdfs:///user/flink/hudi/public/nation_info/5f23eff2-16ba-499f-806d-6a0dd59777d5_0-1-0_20220915171415714.parquet
[root@hadoopmaster sbin]#
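The .hoodie directory holds the table's timeline; each successful write shows up as a completed commit instant, which is also what the Hive sync reads (a quick look, with exact file names varying by Hudi version):
```bash
hadoop fs -ls hdfs:///user/flink/hudi/public/nation_info/.hoodie
# expect hoodie.properties plus <instant>.commit / .inflight / .requested files,
# one completed commit per successful insert/checkpoint
```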
# Other
- None