paimon学习笔记

官方文档:https://paimon.apache.org/docs/1.0/concepts/overview/

1752237205404.png

paimon catalog目前支持三种元数据存储,fs catalog:元数据和表数据存储在文件系统、hive catalog:元数据额外保存在hive metastore,支持hive访问表、jdbc catalog:元数据额外存储在mysql、postgresql等关系型数据库,前两者比较常用。paimon底层存储支持多种文件系统,例如本地文件系统,hadoop hdfs,阿里云oss,亚马逊s3等。

使用S3存储paimon表数据的话,需要下载依赖jar包放到flink下的lib目录,下载地址可以参考官方文档:https://paimon.apache.org/docs/1.0/maintenance/filesystems

测试中,我使用minio docker镜像搭建了S3对象存储服务,minio 与 Amazon S3 云存储服务 API 兼容。这里遇到2个问题,第一个是容器内部的时区问题,需要把时区调整到上海,可以通过添加环境变量TZ: Asia/Shanghai解决,第二个是s3.endpoint必须写ip,写hostname会超时连接不上。

-- 存储在hdfs
create catalog paimon_hdfs with (
  'type' = 'paimon',
  'warehouse' = 'hdfs://cluster1:8020/data/paimon/fs'
);

-- 存储在s3对象存储
create catalog paimon_s3 with (
  'type' = 'paimon',
  'warehouse' = 's3://data/paimon',
  's3.endpoint' = 'http://192.168.8.200:9000',
  's3.access-key' = 'xxx',
  's3.secret-key' = 'xxx'
);

使用hive catalog,需要下载依赖jar包放到flink下的lib目录,可以通过下面的链接下载,下面链接对应的hive版本是3.1.3,flink版本是1.20.2。

https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-3.1.3_2.12/1.20.2

遇到的问题:hive paimon的zstd-jni依赖冲突问题,导致使用hive查询paimon表时报错。paimon使用的zstd-jni版本为:1.5.5-11,hive3.1.3使用的zstd-jni版本为:1.3.2-2。使用下面第二个链接中的jar替换一下,查询报错问题就解决了。

https://github.com/apache/paimon/blob/release-1.0/pom.xml

https://mvnrepository.com/artifact/com.github.luben/zstd-jni/1.5.5-11

create catalog paimon_hive with (
  'type' = 'paimon',
  -- 指定metastore为hive
  'metastore' = 'hive',
  -- 不设置的话,默认使用hive配置: hive.metastore.warehouse.dir目录
  'warehouse' = 'hdfs://cluster1:8020/data/paimon/hive'
);
create catalog paimon_jdbc with (
  'type' = 'paimon',
  'metastore' = 'jdbc',
  'uri' = 'jdbc:postgresql://<host>:<port>/<databaseName>',
  'jdbc.user' = '...',
  'jdbc.password' = '...',
  'catalog-key' = 'jdbc',
  'warehouse' = 'hdfs://cluster1:8020/data/paimon/jdbc'
);

paimon支持下面的表类型:

  • 主键表
  • 非主键表/追加表
  • 视图表
  • 格式表:与hive兼容
  • 对象表:(暂时不知道啥样的)
  • 物化表
use catalog paimon_hdfs;
create database if not exists paimon_tutorial;
use paimon_tutorial;

create table user_table (
  user_id string primary key not enforced,
  name string,
  age int
) with (
  'bucket' = '1'
);

在batch模式下支持:DELETE、UPDATE、MERGE-INTO操作。

create table my_table (
  a int,
  b string
);

需要catalog metastore支持。测试hive catalog支持,fs catalog和jdbc catalog不支持。不支持的话,只能使用临时视图,只在当前session有效。

create view if not exists my_table_view
  (a, b)
as select a, b from my_table;

drop view if exists my_table_view;

show views;

show create view my_table_view;

需要catalog metastore支持,hive catalog支持,hdfs catalog不支持,jdbc catalog没去测试。hdfs上表中的数据和hive表一致。没有paimon表中的schema、snapshot等数据。支持csv、orc、parquet格式数据。

use catalog paimon_hive;
create database if not exists paimon_test;
use paimon_test;

create table my_csv_table (
  a int,
  b string
) with (
  'type' = 'format-table',
  'file.format' = 'csv',
  'field-delimiter' = ','
);

create table my_orc_table (
  a int,
  b string
) with (
  'type' = 'format-table',
  'file.format' = 'orc'
);

create table my_parquet_table (
  a int,
  b string
) with (
  'type' = 'format-table',
  'file.format' = 'parquet'
);

用于存储非结构化数据,例如,图片、视频、音乐等文件。

create table `my_object_table` with (
  'type' = 'object-table',
  'object-location' = 'hdfs://cluster1:8020/data/object'
);

-- 刷新对象表
call sys.refresh_object_table('default.my_object_table');

-- 查询对象表
select * from my_object_table;
select * from my_object_table /*+ OPTIONS('scan.snapshot-id' = '1') */;

对象表格式如下:

name type null key extras watermark
path STRING FALSE
parent_path STRING FALSE
name STRING FALSE
length BIGINT FALSE
mtime TIMESTAMP_LTZ(3) TRUE
atime TIMESTAMP_LTZ(3) TRUE
owner STRING TRUE
generation INT TRUE
content_type STRING TRUE
storage_class STRING TRUE
md5_hash STRING TRUE
metadata_mtime TIMESTAMP_LTZ(3) TRUE
metadata MAP<STRING, STRING> TRUE
create materialized table continuous_users_shops
partitioned by (ds)
freshness = interval '30' second
as select
  user_id,
  ds,
  sum (payment_amount_cents) as payed_buy_fee_sum,
  sum (1) as pv
from (
  select user_id, order_created_at as ds, payment_amount_cents
  from json_source
) as tmp
group by user_id, ds;

系统表包含两种,数据系统表和全局系统表。数据系统表包含每个Paimon数据表的元数据和信息,例如创建的快照和使用的选项。

通过快照表,可以看出表的数据发生变化的次数。

select * from my_table$snapshots;

通过查询schemas table可以知道,表的schema历史变更。

select * from my_table$schemas;

-- 查询snapshot_id为3的数据schema信息
select
  s.snapshot_id,
  t.schema_id,
  t.fields
from my_table$snapshots s
join my_table$schemas t
on s.schema_id = t.schema_id
where snapshot_id = 3;

表的配置信息

select * from my_table$options;

+I、-U、+U、-D

select * from my_table$audit_log;

有点类似上面的表,官方文档说,更新前更新后会合并到一行。

select * from pk_table$binlog;

读写方式类似追加表

select * from file_table$files /* OPTIONS('scan.snapshot-id'='3') */;
select * from my_table$tags;
select * from my_table$branches;

消费者id以及要消费的下一个snapshot id

select * from my_table$consumers;
select * from my_table$manifests;

后面学到聚合表,再看看这个。

select * from my_table$aggregation_fields;
select * from my_table$partitions;
select * from my_table$buckets;
select * from my_table$statistics;
select * from my_table$table_indexes;

hive catalog查询报错,另外两个查询正常。

use sys;
show tables;
select * from sys.all_table_options;
select * from sys.catalog_options;

定义表的时候可以使用主键,主键可以包含多个列,paimon强制在bucket中使用主键进行排序。

无分区表和分区表分区下的数据,会再分成桶进行保存。每个bucket目录都包含一个LSM Tree和其变更日志。

bucket分桶由数据中一个或多个列的hash值决定,可以使用bucket-key选项定义决定bucket分桶的列。如果没有定义,主键表默认使用主键,非主键表使用整条数据。

bucket是用于读写的最小存储单元,因此bucket的数量限制了最大的处理并行度。bucket数量不应该太多,因为它会导致大量小文件和低读取性能。每个bucket中的推荐数据大小约为200MB-1GB。

通过’bucket’ = ’n’设置分桶数(n > 0),根据Math.abs(key_hashcode % numBuckets)计算数据属于哪个桶。

重新调整桶数,只能通过离线进程完成。桶的数量过多会导致小文件过多,桶的数量过少会导致写入性能问题。

主键表默认使用动态分桶,或者通过’bucket’ = ‘-1’设置。

  • 选项1: ‘dynamic-bucket.target-row-num’: 一个bucket的数据条数
  • 选项2: ‘dynamic-bucket.initial-buckets’: 初始bucket数量

仅支持单个写入任务。

  • 普通动态分桶模式
    • 当表没有分区,或者主键包含所有的分区字段。动态分桶模式使用HASH索引来维护从键到桶的映射,它比固定分桶模式需要更多的内存。1亿条数据多占用1G内存。普通动态桶模式支持排序压缩以加快查询速度。
  • 跨分区动态分桶模式
    • 主键没有包含所有的分区键字段。
      1. Deduplicate: 删除旧分区数据,新数据插入到新分区。
      2. PartialUpdate & Aggregation: 新数据插入到旧分区。
      3. FirstRow: 存在旧数据,则忽略新数据。

Paimon采用LSM Tree (log-structured merge-tree)作为文件存储的数据结构。

MOR是主键表的默认模式。

优点:

  • 写入性能很好。 缺点:
  • 读取性能一般。
  • 一个LSM Tree只能由一个线程读取,读取并行度受限。
  • 当分桶数据量很大时,读取性能会非常低。bucket大小推荐200M-1G。
  • 由于合并过程,不能对非主键列执行基于过滤器的数据跳过。

可以通过下面配置表模式为COW。COW模式每次写入都会进行合并操作,所有数据都将被合并到最新。读取时,不需要合并,读取性能最高。但每次写入都需要完全合并,写入放大非常严重。

ALTER TABLE orders SET ('full-compaction.delta-commits' = '1');

优点:

  • 读取性能很好。 缺点:
  • 写入性能非常差。

可以通过下面配置表模式为MOW。由于Paimon的LSM结构,它能够通过主键进行查询。我们可以在写入时生成删除向量文件,表示文件中的哪些数据已被删除。这可以在读取过程中直接过滤掉不必要的行,这相当于合并,不会影响读取性能。

ALTER TABLE orders SET ('deletion-vectors.enabled' = 'true');

优点:

  • 读写性能都很好 缺点:
  • 数据延迟问题(没看懂,后续探索一下)

后续用到再探索

当paimon主键表,接收到多条相同主键的数据时。paimon会将多条数据合并成一条,以保证主键的唯一性。合并引擎就是合并数据的方式。

deduplicate是默认的合并引擎,paimon会保存主键最新的数据,丢弃相同主键的旧数据。

partial-update: 部分列更新,为null的列,会被后面到来的非null数据更新,非null的列,会被后面到来非null的值更新。可以设计序列组,实现部分列一起更新(全非null才更新)

create table normal_merge_pu_tbl (
  k int,
  a int,
  b int,
  c int,
primary key (k) not enforced
) with (
  'merge-engine' = 'partial-update'
);

insert into normal_merge_pu_tbl values (1, 23, 10, cast(null as int));
-- 查询时设置: set 'execution.runtime-mode' = 'batch';
select * from normal_merge_pu_tbl;
-- 1 | 23 | 10 | <NULL>
insert into normal_merge_pu_tbl values (1, cast(null as int), cast(null as int), 99);
select * from normal_merge_pu_tbl;
-- 1 | 23 | 10 | 99
insert into normal_merge_pu_tbl values (1, 25, 13, cast(null as int));
select * from normal_merge_pu_tbl;
-- 1 | 25 | 13 | 99