Installing and Using Paimon

Base environment:

  • Ubuntu 22.04
  • JDK 1.8
  • MinIO RELEASE.2024-11-07T00-52-20Z
  • Flink 1.20.0
  • Paimon 0.9
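
The catalog examples below write to buckets on MinIO. If they do not exist yet, one way to create them is with the MinIO client mc; the alias name, endpoint, and credentials here are placeholders for your own values:

# Create the buckets used as Paimon warehouses later in this article
mc alias set myminio http://MinIOIP:port ACCESS_KEY SECRET_KEY
mc mb myminio/warehouse
mc mb myminio/paimonwarehouse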

What is Paimon

Paimon is a streaming data lake storage technology developed under the Apache Software Foundation. It offers high-throughput, low-latency data ingestion, streaming subscription, and real-time query capabilities. Paimon adopts open data formats and an open design, and integrates with mainstream compute engines such as Apache Flink, Spark, and Trino to advance the Streaming Lakehouse architecture.

Environment preparation 1: Installing Flink 1.20.0 in Standalone mode

See the earlier article for the full walkthrough; a condensed sketch follows.
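
A minimal sketch of the standalone install, assuming the /mnt/flink install path used later in this article; adjust the mirror URL and path to your environment:

# Download and unpack the Flink 1.20.0 binary distribution
wget https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
tar -xzf flink-1.20.0-bin-scala_2.12.tgz -C /mnt/flink/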

Installing Paimon and basic usage

Download the matching Paimon jars and the other required dependencies:

flink-connector-jdbc-3.2.0-1.19.jar
flink-s3-fs-hadoop-1.20.0.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
paimon-flink-1.20-0.9.0.jar
paimon-s3-0.9.0.jar

wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-s3/0.9.0/paimon-s3-0.9.0.jar
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.20/0.9.0/paimon-flink-1.20-0.9.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.0/flink-s3-fs-hadoop-1.20.0.jar

# Copy the jars required by Paimon into Flink's lib directory
cp ./*.jar /mnt/flink/flink-1.20.0/lib/

Start Flink

./flink-1.20.0/bin/start-cluster.sh
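
To confirm the cluster came up, check for the two standalone processes and the web UI, which listens on port 8081 by default:

# A JobManager and a TaskManager process should both be running
jps | grep -E 'StandaloneSessionClusterEntrypoint|TaskManagerRunner'
# The REST API answers on the default web UI port
curl -s http://localhost:8081/overview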

Using Paimon

# Start the Flink SQL client
./flink-1.20.0/bin/sql-client.sh embedded

-- Create the catalog and tables (using MinIO as the file system)
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 's3://warehouse/winter',
    's3.endpoint' = 'http://MinIOIP:port',
    's3.access-key' = 'xxxxx',
    's3.secret-key' = 'xxxxxxxxxx'
);


USE CATALOG my_catalog;
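
As a quick sanity check that the catalog is usable:

-- A freshly created Paimon catalog starts with an empty 'default' database
SHOW DATABASES;
SHOW TABLES;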

-- Create a Paimon table; dt is declared explicitly since the primary key and partitioning reference it
CREATE TABLE sink_orders (
  id BIGINT,
  num BIGINT,
  price BIGINT,
  status STRING,
  dt STRING,
  PRIMARY KEY (dt, id, num) NOT ENFORCED
) PARTITIONED BY (dt)
WITH (
  'bucket' = '1'
);

-- Create a temporary Kafka source table
CREATE TEMPORARY TABLE kafka_orders (
  id BIGINT,
  num BIGINT,
  price BIGINT,
  status STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'paimon',
  'properties.bootstrap.servers' = 'ip:port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
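
To have something to read, test records matching the kafka_orders schema can be produced with the console producer that ships with Kafka; the install path and broker address below are placeholders:

# Send two JSON records to the paimon topic
printf '%s\n' \
  '{"id": 1, "num": 2, "price": 100, "status": "normal"}' \
  '{"id": 2, "num": 1, "price": 50, "status": "cancelled"}' \
  | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server ip:port --topic paimon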

-- In streaming mode Paimon requires a checkpoint interval, since writes are committed on checkpoint
SET 'execution.checkpointing.interval' = '10 s';
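
The same interval can also be set cluster-wide in Flink's configuration file instead of per session, for example:

# In conf/config.yaml (Flink 1.20) or conf/flink-conf.yaml on older setups
execution.checkpointing.interval: 10s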

-- Read from the Kafka temporary table and write into sink_orders
INSERT INTO sink_orders
SELECT id,
  num,
  price,
  status,
  -- the source has no dt field, so derive the partition value from the current date
  DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
FROM kafka_orders
WHERE status = 'normal';
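
Since Paimon commits a snapshot on every successful checkpoint, inserted rows become visible roughly every 10 seconds with the interval above. From another SQL client session, a streaming query shows the table filling up:

-- New rows appear as snapshots are committed at each checkpoint
SELECT * FROM sink_orders;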

Creating a JdbcCatalog

CREATE CATALOG my_jdbc WITH (
    'type' = 'paimon',
    'metastore' = 'jdbc',
    'uri' = 'jdbc:mysql://ip:port/paimon',
    'jdbc.user' = 'xxxx',
    'jdbc.password' = 'xxxxxxxxx',
    'catalog-key' = 'jdbc',
    'warehouse' = 's3://paimonwarehouse/paimon',
    's3.endpoint' = 'http://xxxxxx:xxxx',
    's3.access-key' = 'xxxx',
    's3.secret-key' = 'xxxxxxxxxxxx',
    's3.region' = 'us-east-1'
);
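
The JDBC metastore assumes two prerequisites the snippet above does not show: the MySQL JDBC driver must be on Flink's classpath, and the database named in the URI must exist. A sketch, with the driver version as an example; restart the Flink cluster afterwards so the new jar is picked up:

# Put the MySQL JDBC driver into Flink's lib directory (version is an example)
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
cp mysql-connector-j-8.0.33.jar /mnt/flink/flink-1.20.0/lib/

# Create the metadata database referenced by the catalog URI
mysql -h ip -P port -u xxxx -p -e "CREATE DATABASE IF NOT EXISTS paimon;"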

CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

INSERT INTO my_table VALUES (1, 1, 'laugh', '1', '2');
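
Reading the row back confirms that the table, the JDBC metastore, and the S3 warehouse are wired together correctly:

SELECT * FROM my_table;
-- Expect the single row just inserted: 1, 1, laugh, 1, 2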

Automatically initializing the catalog

To have the catalog initialized automatically when entering the SQL client, create an initialization script with vim sql-client-init.sql:

SET 'sql-client.execution.result-mode' = 'tableau';

CREATE CATALOG my_jdbc WITH (
    'type' = 'paimon',
    'metastore' = 'jdbc',
    'uri' = 'jdbc:mysql://ip:port/paimon',
    'jdbc.user' = 'xxxx',
    'jdbc.password' = 'xxxxxxxxx',
    'catalog-key' = 'jdbc',
    'warehouse' = 's3://paimonwarehouse/paimon',
    's3.endpoint' = 'http://xxxxxx:xxxx',
    's3.access-key' = 'xxxx',
    's3.secret-key' = 'xxxxxxxxxxxx',
    's3.region' = 'us-east-1'
);

USE CATALOG my_jdbc;
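
The SQL client picks the script up through its -i option, so every new session starts with the result mode set and the catalog selected:

# Start the SQL client with the initialization script
./flink-1.20.0/bin/sql-client.sh embedded -i sql-client-init.sql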

Three display modes of the Flink SQL Client

1- Table mode: SET 'sql-client.execution.result-mode' = 'table';

2- Changelog mode: SET 'sql-client.execution.result-mode' = 'changelog';

3- Tableau mode: SET 'sql-client.execution.result-mode' = 'tableau';
