Base environment:
- Ubuntu 22.04
- JDK 1.8
- MinIO RELEASE.2024-11-07T00-52-20Z
- Flink 1.20.0
- Paimon 0.9
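What is Paimon
Apache Paimon is a streaming data lake storage technology developed under the Apache Software Foundation. It entered the ASF through the Incubator and graduated to a top-level project in 2024. Paimon provides high-throughput, low-latency data ingestion, streaming subscription, and real-time queries. It uses open data formats and integrates with mainstream compute engines such as Apache Flink, Spark, and Trino, helping to advance the Streaming Lakehouse architecture.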
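Environment preparation 1: installing Flink 1.20.0 in Standalone mode
See the previous article.
Installing Paimon and basic usage
Download the Paimon jars for these versions, plus the other dependency jars: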
flink-connector-jdbc-3.2.0-1.19.jar
flink-s3-fs-hadoop-1.20.0.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
paimon-flink-1.20-0.9.0.jar
paimon-s3-0.9.0.jar
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-s3/0.9.0/paimon-s3-0.9.0.jar
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.20/0.9.0/paimon-flink-1.20-0.9.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.0/flink-s3-fs-hadoop-1.20.0.jar
# Copy the jars Paimon needs into Flink's lib directory
cp ./*.jar /mnt/flink/flink-1.20.0/lib/
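Before starting the cluster it can help to confirm that the copy step actually put every jar in place. The following is a minimal POSIX shell sketch; the function name check_paimon_jars is hypothetical, and the jar list is copied from above (it does not cover the Kafka connector or MySQL driver jars mentioned later).

```shell
# Jar names copied from the download list above; adjust if your versions differ.
REQUIRED_JARS="flink-connector-jdbc-3.2.0-1.19.jar flink-s3-fs-hadoop-1.20.0.jar flink-shaded-hadoop-2-uber-2.8.3-10.0.jar paimon-flink-1.20-0.9.0.jar paimon-s3-0.9.0.jar"

# check_paimon_jars LIB_DIR  (hypothetical helper)
# Prints "missing: <jar>" for each absent jar.
# Returns 1 if anything is missing, 0 if all jars are present.
check_paimon_jars() {
  lib_dir="$1"
  rc=0
  for jar in $REQUIRED_JARS; do
    if [ ! -f "$lib_dir/$jar" ]; then
      echo "missing: $jar"
      rc=1
    fi
  done
  return "$rc"
}
```

For example, check_paimon_jars /mnt/flink/flink-1.20.0/lib prints nothing and returns 0 when the copy succeeded.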
Start Flink
./flink-1.20.0/bin/start-cluster.sh
Using Paimon
# Start the Flink SQL client
./flink-1.20.0/bin/sql-client.sh embedded
-- Create the catalog and the table (MinIO is used as the file system)
CREATE CATALOG my_catalog WITH (
'type' = 'paimon',
'warehouse' = 's3://warehouse/winter',
's3.endpoint'='http://MinIOIP:port',
's3.access-key' = 'xxxxx',
's3.secret-key' = 'xxxxxxxxxx'
);
USE CATALOG my_catalog;
-- Create the Paimon table
CREATE TABLE sink_orders (
id BIGINT,
num BIGINT,
price BIGINT,
status STRING,
dt STRING, -- partition column, also part of the primary key
PRIMARY KEY (dt, id, num) NOT ENFORCED
) PARTITIONED BY (dt)
WITH (
'bucket' = '1'
);
-- Create a temporary Kafka table (the Flink Kafka SQL connector jar must also be in lib/; it is not in the list above)
CREATE TEMPORARY TABLE kafka_orders (
id BIGINT,
num BIGINT,
price BIGINT,
status STRING
) WITH (
'connector' = 'kafka',
'topic' = 'paimon',
'properties.bootstrap.servers' = 'ip:port',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
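-- In streaming mode Paimon requires a checkpoint interval: the sink commits data on each checkpoint
SET 'execution.checkpointing.interval' = '10 s';
-- Read from the temporary Kafka table and write into sink_orders
INSERT INTO sink_orders
SELECT id,
num,
price,
status,
-- assumption: kafka_orders has no dt field, so dt is derived from the processing date
DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd') AS dt
FROM kafka_orders
WHERE status = 'normal';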
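The kafka_orders table above uses 'format' = 'json', so each message on the paimon topic should be a JSON object whose keys match the column names. A small sketch for producing test records; make_order_json and gen_orders are hypothetical helpers, and the status value canceled is an invented example alongside normal.

```shell
# make_order_json ID NUM PRICE STATUS  (hypothetical helper)
# Prints one JSON object whose keys match the columns of kafka_orders.
# STATUS is inserted verbatim, so it must not contain quotes or backslashes.
make_order_json() {
  printf '{"id":%d,"num":%d,"price":%d,"status":"%s"}\n' "$1" "$2" "$3" "$4"
}

# gen_orders COUNT  (hypothetical test-data generator)
# Even ids get status "normal", odd ids get "canceled" (an invented value),
# so the WHERE status = 'normal' filter drops about half of the records.
gen_orders() {
  i=1
  while [ "$i" -le "$1" ]; do
    if [ $((i % 2)) -eq 0 ]; then
      st=normal
    else
      st=canceled
    fi
    make_order_json "$i" "$i" $((i * 10)) "$st"
    i=$((i + 1))
  done
}
```

For example, from a Kafka installation, gen_orders 10 | bin/kafka-console-producer.sh --bootstrap-server ip:port --topic paimon sends ten records, of which the INSERT above keeps the five with status = 'normal'.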
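Creating a JDBC catalog
A JDBC catalog stores Paimon's metadata in a relational database (MySQL here) while the table data stays in the MinIO warehouse. The MySQL JDBC driver jar also has to be in Flink's lib directory.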
CREATE CATALOG my_jdbc WITH (
'type' = 'paimon',
'metastore' = 'jdbc',
'uri' = 'jdbc:mysql://ip:port/paimon',
'jdbc.user' = 'xxxx',
'jdbc.password' = 'xxxxxxxxx',
'catalog-key'='jdbc',
'warehouse' = 's3://paimonwarehouse/paimon',
's3.endpoint'='http://xxxxxx:xxxx',
's3.access-key' = 'xxxx',
's3.secret-key' = 'xxxxxxxxxxxx',
's3.region' = 'us-east-1'
);
USE CATALOG my_jdbc;
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
insert into my_table values(1,1,'laugh','1','2');
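Initializing the catalog automatically
To create the catalog automatically every time the SQL client starts, write an initialization script with vim sql-client-init.sql and pass it to the client with -i, for example ./flink-1.20.0/bin/sql-client.sh embedded -i sql-client-init.sql. The script:
SET 'sql-client.execution.result-mode' = 'tableau';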
CREATE CATALOG my_jdbc WITH (
'type' = 'paimon',
'metastore' = 'jdbc',
'uri' = 'jdbc:mysql://ip:port/paimon',
'jdbc.user' = 'xxxx',
'jdbc.password' = 'xxxxxxxxx',
'catalog-key'='jdbc',
'warehouse' = 's3://paimonwarehouse/paimon',
's3.endpoint'='http://xxxxxx:xxxx',
's3.access-key' = 'xxxx',
's3.secret-key' = 'xxxxxxxxxxxx',
's3.region' = 'us-east-1'
);
use catalog my_jdbc;
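When the same setup is needed on several machines, the init script can also be generated from environment variables instead of being edited by hand. A minimal sketch: write_init_sql and the variable names (PAIMON_JDBC_URI, S3_ENDPOINT, and so on) are hypothetical, and the generated file is equivalent to the script above with the placeholders filled in.

```shell
# write_init_sql OUTPUT_FILE  (hypothetical helper)
# Renders a sql-client init script from environment variables.
# Returns 1 and writes nothing if any required variable is empty or unset.
write_init_sql() {
  for var in PAIMON_JDBC_URI PAIMON_JDBC_USER PAIMON_JDBC_PASSWORD \
             PAIMON_WAREHOUSE S3_ENDPOINT S3_ACCESS_KEY S3_SECRET_KEY; do
    # Indirect lookup of the variable named in $var (POSIX sh has no ${!var}).
    eval "val=\${$var:-}"
    if [ -z "$val" ]; then
      echo "required variable $var is empty" >&2
      return 1
    fi
  done

  # Unquoted EOF: the ${...} references below are expanded by the shell.
  cat > "$1" <<EOF
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE CATALOG my_jdbc WITH (
  'type' = 'paimon',
  'metastore' = 'jdbc',
  'uri' = '${PAIMON_JDBC_URI}',
  'jdbc.user' = '${PAIMON_JDBC_USER}',
  'jdbc.password' = '${PAIMON_JDBC_PASSWORD}',
  'catalog-key' = 'jdbc',
  'warehouse' = '${PAIMON_WAREHOUSE}',
  's3.endpoint' = '${S3_ENDPOINT}',
  's3.access-key' = '${S3_ACCESS_KEY}',
  's3.secret-key' = '${S3_SECRET_KEY}',
  's3.region' = 'us-east-1'
);
USE CATALOG my_jdbc;
EOF
}
```

After exporting the variables, write_init_sql sql-client-init.sql && ./flink-1.20.0/bin/sql-client.sh embedded -i sql-client-init.sql starts the client with the catalog already selected.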
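The three result display modes of the Flink SQL Client
1- Table mode (materializes results in memory and shows them in a paginated table view): SET 'sql-client.execution.result-mode' = 'table';
2- Changelog mode (shows the stream of changes, each row prefixed with its change type such as +I or -U): SET 'sql-client.execution.result-mode' = 'changelog';
3- Tableau mode (prints results directly in the terminal, like a traditional database CLI): SET 'sql-client.execution.result-mode' = 'tableau';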