环保公司起名简介 一文晓畅基于Flink1.12构建流批一体数仓的技术点
环保公司起名简介 一文晓畅基于Flink1.12构建流批一体数仓的技术点

本文转载自微信公多号「大数据技术与数仓」,作者西贝。转载本文请有关大数据技术与数仓公多号。

基于Flink构建流批一体的实时数仓是现在数据仓库周围比较火的实践方案。随着Flink的赓续迭代,其挑供的一系列技术特性使得用户构建流批一体的行使变得越来越方便。本文将以Flink1.12为例,逐一介绍这些特性的基本操纵手段,主要包括以下内容:

Flink集成Hive Hive Catalog与Hive Dialect Flink读写Hive Flink upsert-kafka连接器 Flink CDC的connector Flink集成Hive

操纵Hive构建数据仓库已经成为了比较远大的一栽解决方案。现在,一些比较常见的大数据处理引擎,都无一破例兼容Hive。Flink从1.9最先声援集成Hive,不过1.9版本为beta版,不选举在生产环境中操纵。在Flink1.10版本中,标志着对 Blink的整相符宣告完善,对 Hive 的集成也达到了生产级别的请求。值得仔细的是,分歧版本的Flink对于Hive的集成有所迥异,本文将以最新的Flink1.12版本为例,阐述Flink集成Hive的浅易步骤,以下是全文,期待对你有所协助。

Flink集成Hive的基本手段

Flink 与 Hive 的集成主要表现在以下两个方面:

持久化元数据

Flink行使 Hive 的 MetaStore 行为持久化的 Catalog,吾们可始末HiveCatalog将分歧会话中的 Flink 元数据存储到 Hive Metastore 中。例如,吾们能够操纵HiveCatalog将其 Kafka的数据源外存储在 Hive Metastore 中,云云该外的元数据新闻会被持久化到Hive的MetaStore对答的元数据库中,在后续的 SQL 查询中,吾们能够重复操纵它们。

行使 Flink 来读写 Hive 的外。

Flink打通了与Hive的集成,如同操纵SparkSQL或者Impala操作Hive中的数据相通,吾们能够操纵Flink直接读写Hive中的外。

HiveCatalog的设计挑供了与 Hive 良益的兼容性,用户能够”开箱即用”的访问其已有的 Hive外。不必要修改现有的 Hive Metastore,也不必要更改外的数据位置或分区。

Flink集成Hive的步骤

Flink声援的Hive版本

大版本 V1 V2 V3 V4 V5 V6 V7 1.0 1.0.0 1.0.1           1.1 1.1.0 1.1.1           1.2 1.2.0 1.2.1 1.2.2         2.0 2.0.0 2.0.1           2.1 2.1.0 2.1.1           2.2 2.2.0             2.3 2.3.0 2.3.1 2.3.2 2.3.3 2.3.4 2.3.5 2.3.6 3.1 3.1.0 3.1.1 3.1.2    

值得仔细的是,对于分歧的Hive版本,能够在功能方面有所迥异,这些迥异取决于你操纵的Hive版本,而不取决于Flink,一些版本的功能迥异如下:

Hive 内置函数在操纵 Hive-1.2.0 及更高版本时声援。 列收敛,也就是 PRIMARY KEY 和 NOT NULL,在操纵 Hive-3.1.0 及更高版本时声援。 更改外的统计新闻,在操纵 Hive-1.2.0 及更高版本时声援。 DATE列统计新闻,在操纵 Hive-1.2.0 及更高版时声援。 操纵 Hive-2.0.x 版本时不声援写入 ORC 外。

倚赖项

本文以Flink1.12为例,集成的Hive版本为Hive2.3.4。集成Hive必要额外增补一些倚赖jar包,并将其安放在Flink装配现在录下的lib文件夹下,云云吾们才能始末 Table API 或 SQL Client 与 Hive 进走交互。

另外,Apache Hive 是基于 Hadoop 之上构建的, 因而还必要 Hadoop 的倚赖,配置益HADOOP_CLASSPATH即可。这一点专门主要,否则在操纵FlinkSQL Cli查询Hive中的外时,会报如下舛讹:

java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf 

配置HADOOP_CLASSPATH,必要在/etc/profile文件中配置如下的环境变量:

export HADOOP_CLASSPATH=`hadoop classpath` 

Flink官网挑供了两栽手段增补Hive的倚赖项。第一栽是操纵 Flink 挑供的 Hive Jar包(根据操纵的 Metastore 的版正本选择对答的 Hive jar),提出优先操纵Flink挑供的Hive jar包,这栽手段比较简片面便。本文操纵的就是此栽手段。自然,倘若你操纵的Hive版本与Flink挑供的Hive jar包兼容的版本纷歧致,你能够选择第二栽手段,即别增补每个所需的 jar 包。

下面列举了可用的jar包及其适用的Hive版本,吾们能够根据操纵的Hive版本,下载对答的jar包即可。比如本文操纵的Hive版本为Hive2.3.4,因而只必要下载flink-sql-connector-hive-2.3.6即可,并将其安放在Flink装配现在录的lib文件夹下。

Metastore version Maven dependency SQL Client JAR 1.0.0 ~ 1.2.2 flink-sql-connector-hive-1.2.2 Download 2.0.0 ~2.2.0 flink-sql-connector-hive-2.2.0 Download 2.3.0 ~2.3.6 flink-sql-connector-hive-2.3.6 Download 3.0.0 ~ 3.1.2 flink-sql-connector-hive-3.1.2 Download

上面列举的jar包,是吾们在操纵Flink SQL Cli所必要的jar包,除此之外,根据分歧的Hive版本,还必要增补如下jar包。以Hive2.3.4为例,除了上面的一个jar包之外,还必要增补下面两个jar包:

flink-connector-hive_2.11-1.12.0.jar和hive-exec-2.3.4.jar。其中hive-exec-2.3.4.jar包存在于Hive装配路径下的lib文件夹。flink-connector-hive_2.11-1.12.0.jar的下载地址为:

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.12.0/ 

NOTE:black_nib::Flink1.12集成Hive只必要增补如下三个jar包,以Hive2.3.4为例,别离为:

flink-sql-connector-hive-2.3.6

flink-connector-hive_2.11-1.12.0.jar

hive-exec-2.3.4.jar

Flink SQL Cli集成Hive

将上面的三个jar包增补至Flink的lib现在录下之后,就能够操纵Flink操作Hive的数据外了。以FlinkSQL Cli为例:

配置sql-client-defaults.yaml

该文件时Flink SQL Cli启动时操纵的配置文件,该文件位于Flink装配现在录的conf/文件夹下,详细的配置如下,主要是配置catalog:

除了上面的一些配置参数,Flink还挑供了下面的一些其他配置参数:

参数 必选 默认值 类型 描述 type 是 (无) String Catalog 的类型。创建 HiveCatalog 时,该参数必须竖立为'hive'。 name 是 (无) String Catalog 的名字。仅在操纵 YAML file 时必要指定。 hive-conf-dir 否 (无) String 指向包含 hive-site.xml 现在录的 URI。该 URI 必须是 Hadoop 文件编制所声援的类型。倘若指定一个相对 URI,即不包含 scheme,则默认为本地文件编制。倘若该参数异国指定,吾们会在 class path 下查找hive-site.xml。 default-database 否 default String 当一个catalog被设为现在catalog时,所操纵的默认现在database。 hive-version 否 (无) String HiveCatalog 能够自动检测操纵的 Hive 版本。吾们提出不要手动竖立 Hive 版本,除非自动检测机制战败。 hadoop-conf-dir 否 (无) String Hadoop 配置文件现在录的路径。现在仅声援本地文件编制路径。吾们选举操纵 HADOOP_CONF_DIR 环境变量来指定 Hadoop 配置。因此仅在环境变量不悦足您的需求时再考虑操纵该参数,例如当您期待为每个 HiveCatalog 单独竖立 Hadoop 配置时。

操作Hive中的外

最先启动FlinkSQL Cli,命令如下:

./bin/sql-client.sh embedded 

接下来,吾们能够查望注册的catalog

Flink SQL> show catalogs; default_catalog myhive 

操纵注册的myhive catalog

Flink SQL> use catalog myhive; 

倘若Hive中有一张users外,在Hive中查询该外:

hive (default)> select * from users; OK users.id        users.mame 1       jack 2       tom 3       robin 4       haha 5       haha 

查望对答的数据库外,吾们能够望到Hive中已经存在的外,云云就能够操纵FlinkSQL操作Hive中的外,比如查询,写入数据。

Flink SQL> show tables; Flink SQL> select * from users; 

向Hive外users中插入一条数据:

Flink SQL> insert into users select 6,'bob'; 

再次操纵Hive客户端往查询该外的数据,会发现写入了一条数据。

接下来,吾们再在FlinkSQL Cli中创建一张kafka的数据源外:

CREATE TABLE user_behavior (      `user_id` BIGINT, -- 用户id     `item_id` BIGINT, -- 商品id     `cat_id` BIGINT, -- 品类id     `action` STRING, -- 用户走为     `province` INT, -- 用户所在的省份     `ts` BIGINT, -- 用户走为发生的时间戳     `proctime` AS PROCTIME(), -- 始末计算列产生一个处理时间列     `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间      WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark  ) WITH (      'connector' = 'kafka', -- 操纵 kafka connector     'topic' = 'user_behavior', -- kafka主题     'scan.startup.mode' = 'earliest-offset', -- 偏移量     'properties.group.id' = 'group1', -- 消耗者组     'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',      'format' = 'json', -- 数据源格式为json     'json.fail-on-missing-field' = 'true',     'json.ignore-parse-errors' = 'false' ); 

查望外组织

Flink SQL> DESCRIBE user_behavior; 

吾们能够在Hive的客户端中实走下面命令查望刚刚在Flink SQLCli中创建的外

hive (default)> desc formatted  user_behavior; # Detailed Table Information              Database:               default                   Owner:                  null                      CreateTime:             Sun Dec 20 16:04:59 CST 2020      LastAccessTime:         UNKNOWN                   Retention:              0                         Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/user_behavior    Table Type:             MANAGED_TABLE             Table Parameters:                         flink.connector         kafka                        flink.format            json                         flink.json.fail-on-missing-field        true                         flink.json.ignore-parse-errors  false                        flink.properties.bootstrap.servers      kms-2:9092,kms-3:9092,kms-4:9092         flink.properties.group.id       group1                       flink.scan.startup.mode earliest-offset              flink.schema.0.data-type        BIGINT                       flink.schema.0.name     user_id                      flink.schema.1.data-type        BIGINT                       flink.schema.1.name     item_id                      flink.schema.2.data-type        BIGINT                       flink.schema.2.name     cat_id                       flink.schema.3.data-type        VARCHAR(2147483647)          flink.schema.3.name     action                       flink.schema.4.data-type        INT                          flink.schema.4.name     province                     flink.schema.5.data-type        BIGINT                       flink.schema.5.name     ts                           flink.schema.6.data-type        TIMESTAMP(3) NOT NULL         flink.schema.6.expr     PROCTIME()                   flink.schema.6.name     proctime                     flink.schema.7.data-type        TIMESTAMP(3)                 flink.schema.7.expr     TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))         flink.schema.7.name     eventTime                    flink.schema.watermark.0.rowtime        eventTime                    flink.schema.watermark.0.strategy.data-type     TIMESTAMP(3)                 flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '5' SECOND         flink.topic             user_behavior                is_generic              true                         transient_lastDdlTime   1608451499                             # Storage Information             SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe        InputFormat:            org.apache.hadoop.mapred.TextInputFormat          OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat    Compressed:             No                        Num Buckets:            -1                        Bucket Columns:         []                        Sort Columns:           []                        Storage Desc Params:                      serialization.format    1              

NOTE:black_flag::在Flink中创建一张外,会把该外的元数据新闻持久化到Hive的metastore中,吾们能够在Hive的metastore中查望该外的元数据新闻

进入Hive的元数据新闻库环保公司起名简介,本文操纵的是MySQL。实走下面的命令:

SELECT      a.tbl_id, -- 外id     from_unixtime(create_time) AS create_time, -- 创建时间     a.db_id, -- 数据库id     b.name AS db_name, -- 数据库名称     a.tbl_name -- 外名称 FROM TBLS AS a LEFT JOIN DBS AS b ON a.db_id =b.db_id WHERE a.tbl_name = "user_behavior"; 

操纵代码连接到 Hive

maven倚赖

<!-- Flink Dependency --> <dependency>   <groupId>org.apache.flink</groupId>   <artifactId>flink-connector-hive_2.11</artifactId>   <version>1.12.0</version> </dependency> <dependency>   <groupId>org.apache.flink</groupId>   <artifactId>flink-table-api-java-bridge_2.11</artifactId>   <version>1.12.0</version> </dependency> <!-- Hive Dependency --> <dependency>     <groupId>org.apache.hive</groupId>     <artifactId>hive-exec</artifactId>     <version>2.3.4</version> </dependency

代码

public class HiveIntegrationDemo {     public static void main(String[] args) {         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();         TableEnvironment tableEnv = TableEnvironment.create(settings);          String name            = "myhive";         String defaultDatabase = "default";         String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";                  HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);         tableEnv.registerCatalog("myhive", hive);         // 操纵注册的catalog         tableEnv.useCatalog("myhive");         // 向Hive外中写入一条数据          String insertSQL = "insert into users select 10,'lihua'";          TableResult result2 = tableEnv.executeSql(insertSQL);         System.out.println(result2.getJobClient().get().getJobStatus());      } } 

挑交程序,不悦目察Hive外的转折:

bin/flink run -m kms-1:8081 \ -c com.flink.sql.hiveintegration.HiveIntegrationDemo \ ./original-study-flink-sql-1.0-SNAPSHOT.jar 
Hive Catalog与Hive Dialect 什么是Hive Catalog

吾们清新,Hive操纵Hive Metastore(HMS)存储元数据新闻,操纵有关型数据库来持久化存储这些新闻。因而,Flink集成Hive必要打通Hive的metastore,往管理Flink的元数据,这就是Hive Catalog的功能。

Hive Catalog的主要作用是操纵Hive MetaStore往管理Flink的元数据。Hive Catalog能够将元数据进走持久化,云云后续的操作就能够逆复操纵这些外的元数据,而不必每次操纵时都要重新注册。倘若不往持久化catalog,那么在每个session中取处理数据,都要往重复地创建元数据对象,云云是专门耗时的。

如何操纵Hive Catalog

HiveCatalog是开箱即用的,因而,一旦配置益Flink与Hive集成,就能够操纵HiveCatalog。比如,吾们始末FlinkSQL 的DDL语句创建一张kafka的数据源外,立刻就能查望该外的元数据新闻。

HiveCatalog能够处理两栽类型的外:一栽是Hive兼容的外,另一栽是清淡外(generic table)。其中Hive兼容外是以兼容Hive的手段来存储的,因而,对于Hive兼容外而言,吾们既能够操纵Flink往操作该外,又能够操纵Hive往操作该外。

清淡外是对Flink而言的,当操纵HiveCatalog创建一张清淡外,仅仅是操纵Hive MetaStore将其元数据进走了持久化,因而能够始末Hive查望这些外的元数据新闻(始末DESCRIBE FORMATTED命令),但是不及始末Hive往处理这些外,由于语法不兼容。

对于是否是清淡外,Flink操纵is_generic属性进走标识。默认情况下,创建的外是清淡外,即is_generic=true,倘若要创建Hive兼容外,必要在建外属性中指定is_generic=false。

尖叫挑示:

由于倚赖Hive Metastore,因而必须开启Hive MetaStore服务

代码中操纵Hive Catalog
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();        TableEnvironment tableEnv = TableEnvironment.create(settings);         String name            = "myhive";        String defaultDatabase = "default";        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";         HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);        tableEnv.registerCatalog("myhive", hive);        // 操纵注册的catalog        tableEnv.useCatalog("myhive"); 
Flink SQLCli中操纵Hive Catalog

在FlinkSQL Cli中操纵Hive Catalog很浅易,只必要配置一下sql-cli-defaults.yaml文件即可。配置内容如下:

catalogs:    - name: myhive      type: hive      default-database: default      hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf 

在FlinkSQL Cli中创建一张kafka外,该外默认为清淡外,即is_generic=true

CREATE TABLE user_behavior (      `user_id` BIGINT, -- 用户id     `item_id` BIGINT, -- 商品id     `cat_id` BIGINT, -- 品类id     `action` STRING, -- 用户走为     `province` INT, -- 用户所在的省份     `ts` BIGINT, -- 用户走为发生的时间戳     `proctime` AS PROCTIME(), -- 始末计算列产生一个处理时间列     `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间      WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark  ) WITH (      'connector' = 'kafka', -- 操纵 kafka connector     'topic' = 'user_behavior', -- kafka主题     'scan.startup.mode' = 'earliest-offset', -- 偏移量     'properties.group.id' = 'group1', -- 消耗者组     'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',      'format' = 'json', -- 数据源格式为json     'json.fail-on-missing-field' = 'true',     'json.ignore-parse-errors' = 'false' ); 

吾们能够在Hive客户端中查望该外的元数据新闻

hive (default)> desc formatted  user_behavior; Table Parameters:                        ...         is_generic              true                       ...          

从上面的元数据新闻能够望出,is_generic=true,表明该外是一张清淡外,倘若在Hive中往查望该外,则会报错。

上面创建的外是清淡外,该外不及操纵Hive往查询。那么,该如何创建一张Hive兼容外呢?吾们只必要在建外的属性中表现指定is_generic=false即可,详细如下:

CREATE TABLE hive_compatible_tbl (      `user_id` BIGINT, -- 用户id     `item_id` BIGINT, -- 商品id     `cat_id` BIGINT, -- 品类id     `action` STRING, -- 用户走为     `province` INT, -- 用户所在的省份     `ts` BIGINT -- 用户走为发生的时间戳  ) WITH (      'connector' = 'kafka', -- 操纵 kafka connector     'topic' = 'user_behavior', -- kafka主题     'scan.startup.mode' = 'earliest-offset', -- 偏移量     'properties.group.id' = 'group1', -- 消耗者组     'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',      'format' = 'json', -- 数据源格式为json     'json.fail-on-missing-field' = 'true',     'json.ignore-parse-errors' = 'false',     'is_generic' = 'false' ); 

当吾们在Hive中查望该外的元数据新闻时,能够望出:is_generic =false

hive (default)> desc formatted hive_compatible_tbl; Table Parameters:                         ...                    is_generic              false                        ... 

吾们能够操纵FlinkSQL Cli或者HiveCli向该外中写入数据,然后别离始末FlinkSQL Cli和Hive Cli往查望该外数据的转折

hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486; hive (default)> select * from hive_compatible_tbl; 

再在FlinkSQL Cli中查望该外,

Flink SQL> select user_id,item_id,action from hive_compatible_tbl;                    user_id                   item_id                    action                       2020                      1221                       buy      

同样,吾们能够在FlinkSQL Cli中往向该外中写入数据:

Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486; Flink SQL> select user_id,item_id,action from hive_compatible_tbl;                     user_id                   item_id                    action                       2020                      1221                       buy                       2020                      1222                       fav 

尖叫挑示:

对于Hive兼容的外,必要仔细数据类型,详细的数据类型对答有关以及仔细点如下

Flink 数据类型 Hive 数据类型 CHAR(p) CHAR(p) VARCHAR(p) VARCHAR(p) STRING STRING BOOLEAN BOOLEAN TINYINT TINYINT SMALLINT SMALLINT INT INT BIGINT LONG FLOAT FLOAT DOUBLE DOUBLE DECIMAL(p, s) DECIMAL(p, s) DATE DATE TIMESTAMP(9) TIMESTAMP BYTES BINARY ARRAY LIST MAP<K, V> MAP<K, V> ROW STRUCT

仔细:

Hive CHAR(p) 类型的最大长度为255 Hive VARCHAR(p)类型的最大长度为65535 Hive MAP类型的key仅声援基本类型,而Flink’s MAP 类型的key实走肆意类型 Hive不声援说相符数据类型,比如STRUCT Hive’s TIMESTAMP 的精度是 9 , Hive UDFs函数只能处理 precision <= 9的 TIMESTAMP 值 Hive 不声援 Flink挑供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET类型 FlinkINTERVAL 类型与 Hive INTERVAL 类型纷歧样 上面介绍了清淡外和Hive兼容外,那么吾们该如何操纵Hive的语法进走建外呢?这个时候就必要操纵Hive Dialect。 什么是Hive Dialect

从Flink1.11.0最先,只要开启了Hive dialect配置,用户就能够操纵HiveQL语法,云云吾们就能够在Flink中操纵Hive的语法操纵一些DDL和DML操作。

Flink现在声援两栽SQL方言(SQL dialects),别离为:default和hive。默认的SQL方言是default,倘若要操纵Hive的语法,必要将SQL方言切换到hive。

如何操纵Hive Dialect

在SQL Cli中操纵Hive dialect

操纵hive dialect只必要配置一个参数即可,该参数名称为:table.sql-dialect。吾们就能够在sql-client-defaults.yaml配置文件中进走配置,也能够在详细的会话窗口中进走设定,对于SQL dialect的切换,不必要进走重启session。

execution:   planner: blink   type: batch   result-mode: table  configuration:   table.sql-dialect: hive 

倘若吾们必要在SQL Cli中进走切换hive dialect,能够操纵如下命令:

Flink SQL> set table.sql-dialect=hive; -- 操纵hive dialect Flink SQL> set table.sql-dialect=default; -- 操纵default dialect 

尖叫挑示:

一旦切换到了hive dialect,就只能操纵Hive的语法建外,倘若尝试操纵Flink的语法建外,则会报错

在Table API中协调dialect

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build(); TableEnvironment tableEnv = TableEnvironment.create(settings); // 操纵hive dialect tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); // 操纵 default dialect tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); 

操作示例

Flink SQL> set table.sql-dialect=hive; -- 操纵Hive语法创建一张外 CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (   `id` int COMMENT 'id',   `name` string COMMENT '名称',   `age` int COMMENT '年龄'  ) COMMENT 'hive dialect外测试' ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; 

进入Hive客户端往查望该外的元数据新闻

desc formatted hive_dialect_tbl; col_name        data_type       comment # col_name              data_type               comment                                id                      int                                          name                    string                                       age                     int                                                            # Detailed Table Information              Database:               default                   Owner:                  null                      CreateTime:             Mon Dec 21 17:23:48 CST 2020      LastAccessTime:         UNKNOWN                   Retention:              0                         Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl         Table Type:             MANAGED_TABLE             Table Parameters:                         comment                 hive dialect外测试              is_generic              false                        transient_lastDdlTime   1608542628                             # Storage Information             SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe        InputFormat:            org.apache.hadoop.mapred.TextInputFormat          OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat    Compressed:             No                        Num Buckets:            -1                        Bucket Columns:         []                        Sort Columns:           []                        Storage Desc Params:                      field.delim             ,                            serialization.format    ,                    

很清晰,该外是一张Hive兼容外,即is_generic=false。

操纵FlinkSQLCli向该外中写入一条数据:

Flink SQL> insert into hive_dialect_tbl select 1,'tom',20; 

吾们也能够在Hive的Cli中往操作该外

hive (default)> select * from hive_dialect_tbl; hive (default)> insert into hive_dialect_tbl select 2,'jack',22; 

以下是操纵Hive方言的一些仔细事项。

Hive dialect只能用于操作Hive外,不及用于清淡外。Hive方言答与HiveCatalog一首操纵。 固然一切Hive版本都声援相通的语法,但是是否有特定功能照样取决于操纵的Hive版本。例如,仅在Hive-2.4.0或更高版本中声援更新数据库位置。 Hive和Calcite具有分歧的保留关键字。例如,default在Calcite中是保留关键字,在Hive中是非保留关键字。因而,在操纵Hive dialect时,必须操纵逆引号(`)引用此类关键字,才能将其用作标识符。 在Hive中不及查询在Flink中创建的视图。

自然,一旦开启了Hive dialect,吾们就能够遵命Hive的操作手段在Flink中往处理Hive的数据了,详细的操作与Hive相反,本文不再赘述。

Flink读写Hive Flink写入Hive外

Flink声援以**批处理(Batch)和流处理(Streaming)**的手段写入Hive外。当以批处理的手段写入Hive外时,只有当写入作业终结时,才能够望到写入的数据。批处理的手段写入声援append模式和overwrite模式。

批处理模式写入环保公司起名简介

向非分区外写入数据

Flink SQL> use catalog myhive; -- 操纵catalog Flink SQL> INSERT INTO users SELECT 2,'tom'; Flink SQL> set execution.type=batch; -- 操纵批处理模式 Flink SQL> INSERT OVERWRITE users SELECT 2,'tom'; 

向分区外写入数据

-- 向静态分区外写入数据 Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25; -- 向动态分区外写入数据 Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08'; 

流处理模式写入

流式写入Hive外,不声援**Insert overwrite **手段,否则报如下舛讹:

[ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: Streaming mode not support overwrite. 

下面的示例是将kafka的数据流式写入Hive的分区外

-- 操纵流处理模式 Flink SQL> set execution.type=streaming; -- 操纵Hive方言 Flink SQL> SET table.sql-dialect=hive;  -- 创建一张Hive分区外 CREATE TABLE user_behavior_hive_tbl (    `user_id` BIGINT, -- 用户id     `item_id` BIGINT, -- 商品id     `cat_id` BIGINT, -- 品类id     `action` STRING, -- 用户走为     `province` INT, -- 用户所在的省份     `ts` BIGINT -- 用户走为发生的时间戳 ) PARTITIONED BY (dt STRING,hr STRING,mi STRING) STORED AS parquet  TBLPROPERTIES (   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',   'sink.partition-commit.trigger'='partition-time',   'sink.partition-commit.delay'='0S',   'sink.partition-commit.policy.kind'='metastore,success-file' );  -- 操纵默认SQL方言 Flink SQL> SET table.sql-dialect=default;  -- 创建一张kafka数据源外 CREATE TABLE user_behavior (      `user_id` BIGINT, -- 用户id     `item_id` BIGINT, -- 商品id     `cat_id` BIGINT, -- 品类id     `action` STRING, -- 用户走为     `province` INT, -- 用户所在的省份     `ts` BIGINT, -- 用户走为发生的时间戳     `proctime` AS PROCTIME(), -- 始末计算列产生一个处理时间列     `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间      WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark  ) WITH (      'connector' = 'kafka', -- 操纵 kafka connector     'topic' = 'user_behaviors', -- kafka主题     'scan.startup.mode' = 'earliest-offset', -- 偏移量     'properties.group.id' = 'group1', -- 消耗者组     'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',      'format' = 'json', -- 数据源格式为json     'json.fail-on-missing-field' = 'true',     'json.ignore-parse-errors' = 'false' ); 

关于Hive外的一些属性注释:

partition.time-extractor.timestamp-pattern 默认值:(none) 注释:分区时间抽取器,与 DDL 中的分区字段保持相反,倘若是按先天区,则能够是**year-day

倘若是按天时进走分区,则该属性值为:dt $hour:00:00`;

sink.partition-commit.trigger process-time:不必要时间挑取器和水位线,当现在时间大于分区创建时间 + sink.partition-commit.delay 中定义的时间,挑交分区; partition-time:必要 Source 外中定义 watermark,当 watermark > 挑取到的分区时间 +sink.partition-commit.delay 中定义的时间,挑交分区; 默认值:process-time 注释:分区触发器类型,可选 process-time 或partition-time。

sink.partition-commit.delay

默认值:0S 注释:分区挑交的延往往间,倘若是按先天区,则该属性的值为:1d,倘若是按幼时分区,则该属性值为1h;

sink.partition-commit.policy.kind

metastore:增补分区的元数据新闻,仅Hive外声援该值配置 success-file:在外的存储路径下增补一个_SUCCESS文件 默认值:(none) 注释:挑交分区的策略,用于告诉下游的行使该分区已经完善了写入,也就是说该分区的数据能够被访问读取。可选的值如下: 能够同时配置上面的两个值,比如metastore,success-file

实走流式写入Hive外

-- streaming sql,将数据写入Hive外 INSERT INTO user_behavior_hive_tbl  SELECT      user_id,     item_id,     cat_id,     action,     province,     ts,     FROM_UNIXTIME(ts, 'yyyy-MM-dd'),     FROM_UNIXTIME(ts, 'HH'),     FROM_UNIXTIME(ts, 'mm') FROM user_behavior;  -- batch sql,查询Hive外的分区数据 SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04' AND  hr='16' AND mi = '46'; 

同时查望Hive外的分区数据:

尖叫挑示:

1.Flink读取Hive外默认操纵的是batch模式,倘若要操纵流式读取Hive外,必要而外指定一些参数,见下文。

2.只有在完善 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,同时生成_SUCCESS文件,因而,Flink流式写入Hive外必要开启并配置 Checkpoint。对于Flink SQL Client而言,必要在flink-conf.yaml中开启CheckPoint,配置内容为:

state.backend: filesystem execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION execution.checkpointing.interval: 60s execution.checkpointing.mode: EXACTLY_ONCE state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints

Flink读取Hive外

Flink声援以**批处理(Batch)和流处理(Streaming)**的手段读取Hive中的外。批处理的手段与Hive的自己查询相通,即只在挑交查询的时刻查询一次Hive外。流处理的手段将会赓续地监控Hive外,并且会添量地挑取新的数据。默认情况下,Flink是以批处理的手段读取Hive外。

关于流式读取Hive外,Flink既声援分区外又声援非分区外。对于分区外而言,Flink将会监控新产生的分区数据,并以添量的手段读取这些数据。对于非分区外,Flink会监控Hive外存储路径文件夹内里的新文件,并以添量的手段读取新的数据。

Flink读取Hive外能够配置一下参数:

streaming-source.enable 默认值:false 注释:是否开启流式读取 Hive 外,默认不开启。 streaming-source.partition.include 默认值:all 注释:配置读取Hive的分区,包括两栽手段:all和latest。all意味着读取一切分区的数据,latest外示只读取最新的分区数据。值得仔细的是,latest手段只能用于开启了流式读取Hive外,并用于维外JOIN的场景。 streaming-source.monitor-interval 默认值:None 注释:赓续监控Hive外分区或者文件的时间阻隔。值得仔细的是,当以流的手段读取Hive外时,该参数的默认值是1m,即1分钟。当temporal join时,默认的值是60m,即1幼时。另外,该参数配置不宜过短 ,最短是1 个幼时,由于现在的实现是每个 task 都会查询 metastore,高频的查能够会对metastore 产生过大的压力。 streaming-source.partition-order 默认值:partition-name 注释:streaming source的分区挨次。默认的是partition-name,外示操纵默认分区名称挨次添载最新分区,也是选举操纵的手段。除此之外还有两栽手段,别离为:create-time和partition-time。其中create-time外示操纵分区文件创建时间挨次。partition-time外示操纵分区时间挨次。指的仔细的是,对于非分区外,该参数的默认值为:create-time。 streaming-source.consume-start-offset 默认值:None 注释:流式读取Hive外的首首偏移量。 partition.time-extractor.kind 默认值:default 分区时间挑取器类型。用于从分区中挑取时间,声援default和自定义。倘若操纵default,则必要始末参数partition.time-extractor.timestamp-pattern配置时间戳挑取的正则外达式。

在 SQL Client 中必要表现地开启 SQL Hint 功能

Flink SQL> set table.dynamic-table-options.enabled= true;   

操纵SQLHint流式查询Hive外

SELECT * FROM user_behavior_hive_tbl /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2021-01-03') */; 
Hive维外JOIN

Flink 1.12 声援了 Hive 最新的分区行为时态外的功能,能够始末 SQL 的手段直接有关 Hive 分区外的最新分区,并且会自动监听最新的 Hive 分区,当监控到新的分区后,会自动地做维外数据的全量替换。

Flink声援的是processing-time的temporal join,也就是说总是与最新版本的时态外进走JOIN。另外,Flink既声援非分区外的temporal join,又声援分区外的temporal join。对于分区外而言,Flink会监听Hive外的最新分区数据。值得仔细的是,Flink尚不声援 event-time temporal join。

Temporal Join最新分区

对于一张随着时间转折的Hive分区外,Flink能够读取该外的数据行为一个无界流。倘若Hive分区外的每个分区都包含全量的数据,那么每个分区将做为一个时态外的版本数据,即将最新的分区数据行为一个全量维外数据。值得仔细的是,该功能特点仅声援Flink的STREAMING模式。

操纵 Hive 最新分区行为 Tempmoral table 之前,必要竖立必要的两个参数:

'streaming-source.enable' = 'true',   'streaming-source.partition.include' = 'latest' 

除此之外还有一些其他的参数,关于参数的注释见上面的分析。吾们在操纵Hive维外的时候,既能够在创建Hive外时指定详细的参数,也能够操纵SQL Hint的手段动态指定参数。一个Hive维外的创建模板如下:

-- 操纵Hive的sql方言 SET table.sql-dialect=hive; CREATE TABLE dimension_table (   product_id STRING,   product_name STRING,   unit_price DECIMAL(10, 4),   pv_count BIGINT,   like_count BIGINT,   comment_count BIGINT,   update_time TIMESTAMP(3),   update_user STRING,   ... ) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (   -- 手段1:遵命分区名排序来识别最新分区(选举操纵该栽手段)   'streaming-source.enable' = 'true', -- 开启Streaming source   'streaming-source.partition.include' = 'latest',-- 选择最新分区   'streaming-source.monitor-interval' = '12 h',-- 每12幼时添载一次最新分区数据   'streaming-source.partition-order' = 'partition-name',  -- 遵命分区名排序    -- 手段2:分区文件的创建时间排序来识别最新分区   'streaming-source.enable' = 'true',   'streaming-source.partition.include' = 'latest',   'streaming-source.partition-order' = 'create-time',-- 分区文件的创建时间排序   'streaming-source.monitor-interval' = '12 h'    -- 手段3:遵命分区时间排序来识别最新分区   'streaming-source.enable' = 'true',   'streaming-source.partition.include' = 'latest',   'streaming-source.monitor-interval' = '12 h',   'streaming-source.partition-order' = 'partition-time', -- 遵命分区时间排序   'partition.time-extractor.kind' = 'default',   'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'  ); 

有了上面的Hive维外,吾们就能够操纵该维外与Kafka的实时流数据进走JOIN,得到响答的宽外数据。

-- 操纵default sql方言 SET table.sql-dialect=default; -- kafka实时流数据外 CREATE TABLE orders_table (   order_id STRING,   order_amount DOUBLE,   product_id STRING,   log_ts TIMESTAMP(3),   proctime as PROCTIME() ) WITH (...);  -- 将流外与hive最新分区数占有关  SELECT * FROM orders_table AS orders JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim  ON orders.product_id = dim.product_id; 

除了在定义Hive维外时指定有关的参数,吾们还能够始末SQL Hint的手段动态指定有关的参数,详细手段如下:

SELECT * FROM orders_table AS orders JOIN dimension_table /*+ OPTIONS('streaming-source.enable'='true',                  'streaming-source.partition.include' = 'latest',     'streaming-source.monitor-interval' = '1 h',     'streaming-source.partition-order' = 'partition-name') */ FOR SYSTEM_TIME AS OF orders.proctime AS dim -- 时态外(维外) ON orders.product_id = dim.product_id; 

Temporal Join最新外

对于Hive的非分区外,当操纵temporal join时,整个Hive外会被缓存到Slot内存中,然后根据流中的数据对答的key与其进走匹配。操纵最新的Hive外进走temporal join不必要进走额外的配置,吾们只必要配置一个Hive外缓存的TTL时间,该时间的作用是:当缓存过期时,就会重新扫描Hive外并添载最新的数据。

lookup.join.cache.ttl

尖叫挑示:

当操纵此栽手段时,Hive外必须是有界的lookup外,即非Streaming Source的时态外,换句话说,该外的属性streaming-source.enable = false。

倘若要操纵Streaming Source的时态外,记得配置streaming-source.monitor-interval的值,即数据更新的时间阻隔。

默认值:60min 注释:外示缓存时间。由于 Hive 维外会把维外一切数据缓存在 TM 的内存中,当维外数据量很大时,很容易造成 OOM。自然TTL的时间也不及太短,由于会屡次地添载数据,从而影响性能。
-- Hive维外数据操纵批处理的手段按天装载 SET table.sql-dialect=hive; CREATE TABLE dimension_table (   product_id STRING,   product_name STRING,   unit_price DECIMAL(10, 4),   pv_count BIGINT,   like_count BIGINT,   comment_count BIGINT,   update_time TIMESTAMP(3),   update_user STRING,   ... ) TBLPROPERTIES (   'streaming-source.enable' = 'false', -- 关闭streaming source   'streaming-source.partition.include' = 'all',  -- 读取一切数据   'lookup.join.cache.ttl' = '12 h' ); -- kafka原形外 SET table.sql-dialect=default; CREATE TABLE orders_table (   order_id STRING,   order_amount DOUBLE,   product_id STRING,   log_ts TIMESTAMP(3),   proctime as PROCTIME() ) WITH (...);  -- Hive维外join,Flink会添载该维外的一切数据到内存中 SELECT * FROM orders_table AS orders JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim ON orders.product_id = dim.product_id; 

尖叫挑示:

1.每一个子义务都必要缓存一份维外的全量数据,必定要确保TM的task Slot 大幼能够原谅维外的数据量;

2.选举将streaming-source.monitor-interval和lookup.join.cache.ttl的值设为一个较大的数,由于屡次的更新和添载数据会影响性能。

3.当缓存的维外数据必要重新刷新时,现在的做法是将整个外进走添载,因此不能够将新数据与旧数据区睁开来。

Hive维外JOIN示例

倘若维外的数据是始末批处理的手段(比如每天)装载至Hive中,而Kafka中的原形流数据必要与该维外进走JOIN,从而构建一个宽外数据,这个时候就能够操纵Hive的维外JOIN。

创建一张kafka数据源外,实时流
SET table.sql-dialect=default; CREATE TABLE fact_user_behavior (      `user_id` BIGINT, -- 用户id     `item_id` BIGINT, -- 商品id     `action` STRING, -- 用户走为     `province` INT, -- 用户所在的省份     `ts` BIGINT, -- 用户走为发生的时间戳     `proctime` AS PROCTIME(), -- 始末计算列产生一个处理时间列     `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间      WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark  ) WITH (      'connector' = 'kafka', -- 操纵 kafka connector     'topic' = 'user_behaviors', -- kafka主题     'scan.startup.mode' = 'earliest-offset', -- 偏移量     'properties.group.id' = 'group1', -- 消耗者组     'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',      'format' = 'json', -- 数据源格式为json     'json.fail-on-missing-field' = 'true',     'json.ignore-parse-errors' = 'false' ); 
创建一张Hive维外
SET table.sql-dialect=hive; CREATE TABLE dim_item (   item_id BIGINT,   item_name STRING,   unit_price DECIMAL(10, 4) ) PARTITIONED BY (dt STRING) TBLPROPERTIES (   'streaming-source.enable' = 'true',   'streaming-source.partition.include' = 'latest',   'streaming-source.monitor-interval' = '12 h',   'streaming-source.partition-order' = 'partition-name' ); 
有关Hive维外的最新数据
SELECT      fact.item_id,     dim.item_name,     count(*) AS buy_cnt FROM fact_user_behavior AS fact LEFT JOIN dim_item FOR SYSTEM_TIME AS OF fact.proctime AS dim ON fact.item_id = dim.item_id WHERE fact.action = 'buy' GROUP BY fact.item_id,dim.item_name; 

操纵SQL Hint手段环保公司起名简介,有关非分区的Hive维外:

set table.dynamic-table-options.enabled= true;  SELECT      fact.item_id,     dim.item_name,     count(*) AS buy_cnt FROM fact_user_behavior AS fact LEFT JOIN dim_item1 /*+ OPTIONS('streaming-source.enable'='false',                  'streaming-source.partition.include' = 'all',     'lookup.join.cache.ttl' = '12 h') */ FOR SYSTEM_TIME AS OF fact.proctime AS dim ON fact.item_id = dim.item_id WHERE fact.action = 'buy' GROUP BY fact.item_id,dim.item_name; 
Flink upsert-kafka连接器 Upsert Kafka connector简介

Upsert Kafka Connector批准用户以upsert的手段从Kafka主题读取数据或将数据写入Kafka主题。

当行为数据源时,upsert-kafka Connector会生产一个changelog流,其中每条数据记录都外示一个更新或删除事件。更实在地说,倘若不存在对答的key,则视为INSERT操作。倘若已经存在了相对答的key,则该key对答的value值为末了一次更新的值。

用外来类比,changelog 流中的数据记录被注释为 UPSERT,也称为 INSERT/UPDATE,由于任何具有相通 key 的现有走都被遮盖。另外,value 为空的新闻将会被视行为 DELETE 新闻。

当行为数据汇时,upsert-kafka Connector会消耗一个changelog流。它将INSERT / UPDATE_AFTER数据行为平常的Kafka新闻值写入(即INSERT和UPDATE操作,都会进走平常写入,倘若是更新,则联相符个key会存储多条数据,但在读取该外数据时,只保留末了一次更新的值),并将 DELETE 数据以 value 为空的 Kafka 新闻写入(key被打上墓碑标记,外示对答 key 的新闻被删除)。Flink 将根据主键列的值对数据进走分区,从而保证主键上的新闻有序,因此联相符主键上的更新/删除新闻将落在联相符分区中

倚赖

为了操纵Upsert Kafka连接器,必要增补下面的倚赖

<dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-connector-kafka_2.12</artifactId>     <version>1.12.0</version> </dependency

倘若操纵SQL Client,必要下载flink-sql-connector-kafka_2.11-1.12.0.jar,并将其安放在Flink装配现在录的lib文件夹下。

操纵手段

操纵样例

-- 创建一张kafka外,用户存储sink的数据 CREATE TABLE pageviews_per_region (   user_region STRING,   pv BIGINT,   uv BIGINT,   PRIMARY KEY (user_region) NOT ENFORCED ) WITH (   'connector' = 'upsert-kafka',   'topic' = 'pageviews_per_region',   'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',   'key.format' = 'avro',   'value.format' = 'avro' ); 

尖叫挑示:

要操纵 upsert-kafka connector,必须在创建外时操纵PRIMARY KEY定义主键,并为键(key.format)和值(value.format)指定序列化逆序列化格式。

upsert-kafka connector参数

connector

必选。指定要操纵的连接器,Upsert Kafka 连接器操纵:'upsert-kafka'。

topic

必选。用于读取和写入的 Kafka topic 名称。

properties.bootstrap.servers

必选。以逗号分隔的 Kafka brokers 列外。

key.format

必选。用于对 Kafka 新闻中 key 片面序列化和逆序列化的格式。key 字段由 PRIMARY KEY 语法指定。声援的格式包括 'csv'、'json'、'avro'。

value.format

必选。用于对 Kafka 新闻中 value 片面序列化和逆序列化的格式。声援的格式包括 'csv'、'json'、'avro'。

*properties. **

可选。该选项能够传递肆意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。Flink 会自动移除 选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。例如,你能够始末 'properties.allow.auto.create.topics' = 'false' 来不准自动创建 topic。但是,某些选项,例如'key.deserializer' 和 'value.deserializer' 是不批准始末该手段传递参数,由于 Flink 会重写这些参数的值。

value.fields-include

可选,默认为ALL。限制key字段是否出现在 value 中。当取ALL时,外示新闻的 value 片面将包含 schema 中一切的字段,包括定义为主键的字段。当取EXCEPT_KEY时,外示记录的 value 片面包含 schema 的一切字段,定义为主键的字段除外。

key.fields-prefix

可选。为了避免与value字段命名冲突,为key字段增补一个自定义前缀。默认前缀为空。一旦指定了key字段的前缀,必须在DDL中指明前缀的名称,但是在构建key的序列化数据类型时,将移除该前缀。见下面的示例。在必要仔细的是:操纵该配置属性,value.fields-include的值必须为EXCEPT_KEY。

-- 创建一张upsert外,当指定了qwe前缀,涉及的key必须指定qwe前缀 CREATE TABLE result_total_pvuv_min_prefix (     qwedo_date     STRING,     -- 统计日期,必须包含qwe前缀     qwedo_min      STRING,      -- 统计分钟,必须包含qwe前缀     pv          BIGINT,     -- 点击量     uv          BIGINT,     -- 镇日内同个访客多次访问仅计算一个UV     currenttime TIMESTAMP,  -- 现在时间     PRIMARY KEY (qwedo_date, qwedo_min) NOT ENFORCED -- 必须包含qwe前缀 ) WITH (   'connector' = 'upsert-kafka',   'topic' = 'result_total_pvuv_min_prefix',   'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',   'key.json.ignore-parse-errors' = 'true',   'value.json.fail-on-missing-field' = 'false',   'key.format' = 'json',   'value.format' = 'json',   'key.fields-prefix'='qwe', -- 指定前缀qwe   'value.fields-include' = 'EXCEPT_KEY' -- key不展现kafka新闻的value中 ); -- 向该外中写入数据 INSERT INTO result_total_pvuv_min_prefix SELECT   do_date,    --  时间分区   cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分钟级别的时间   pv,   uv,   CURRENT_TIMESTAMP AS currenttime -- 现在时间 from   view_total_pvuv_min; 

尖叫挑示:

倘若指定了key字段前缀,但在DDL中并异国增补该前缀字符串,那么在向该外写入数时,会抛出下面变态:

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: All fields in 'key.fields' must be prefixed with 'qwe' when option 'key.fields-prefix' is set but field 'do_date' is not prefixed.

sink.parallelism

可选。定义 upsert-kafka sink 算子的并走度。默认情况下,由框架确定并走度,与上游链接算子的并走度保持相反。

其他仔细事项

Key和Value的序列化格式

关于Key、value的序列化能够参考Kafka connector。值得仔细的是,必须指定Key和Value的序列化格式,其中Key是始末PRIMARY KEY指定的。

Primary Key收敛

Upsert Kafka 做事在 upsert 模式(FLIP-149)下。当吾们创建外时,必要在 DDL 中定义主键。具有相通key的数据,会存在相通的分区中。在 changlog source 上定义主键意味着在死亡后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在 Kafka 新闻的 key 中。

相反性保障

默认情况下,倘若启用 checkpoint,Upsert Kafka sink 会保证起码一次将数据插入 Kafka topic。

这意味着,Flink 能够将具有相通 key 的重复记录写入 Kafka topic。但由于该连接器以 upsert 的模式做事,该连接器行为 source 读时兴,能够确保具有相通主键值下仅末了一条新闻会奏效。因此,upsert-kafka 连接器能够像 HBase sink 相通实现幂等写入。

分区水位线

Flink 声援根据 Upsert Kafka 的 每个分区的数据特性发送响答的 watermark。当操纵这个特性的时候,watermark 是在 Kafka consumer 内部生成的。相符并每个分区生成的 watermark 的手段和 streaming shuffle 的手段是相反的(单个分区的输入取最大值,多个分区的输入取最幼值)。数据源产生的 watermark 是取决于该 consumer 负责的一切分区中现在最幼的 watermark。倘若该 consumer 负责的片面门区是余暇的,那么集体的 watermark 并不会进展。在这栽情况下,能够始末竖立正当的 table.exec.source.idle-timeout 来缓解这个题目。

数据类型

Upsert Kafka 用字节bytes存储新闻的 key 和 value,因此异国 schema 或数据类型。新闻按格式进走序列化和逆序列化,例如:csv、json、avro。分歧的序列化格式所挑供的数据类型有所分歧,因此必要根据操纵的序列化格式进走确定外字段的数据类型是否与该序列化类型挑供的数据类型兼容。

操纵案例

本文以实时地统计网页PV和UV的总量为例,介绍upsert-kafka基本操纵手段:

Kafka 数据源

用户的ippv新闻,一个用户在镇日内能够有许多次pv

CREATE TABLE source_ods_fact_user_ippv (     user_id      STRING,       -- 用户ID     client_ip    STRING,       -- 客户端IP     client_info  STRING,       -- 设备机型新闻     pagecode     STRING,       -- 页面代码     access_time  TIMESTAMP,    -- 乞求时间     dt           STRING,       -- 时间分区天     WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND  -- 定义watermark ) WITH (    'connector' = 'kafka', -- 操纵 kafka connector     'topic' = 'user_ippv', -- kafka主题     'scan.startup.mode' = 'earliest-offset', -- 偏移量     'properties.group.id' = 'group1', -- 消耗者组     'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',      'format' = 'json', -- 数据源格式为json     'json.fail-on-missing-field' = 'false',     'json.ignore-parse-errors' = 'true' ); 
Kafka Sink外

统计每分钟的PV、UV,并将效果存储在Kafka中

CREATE TABLE result_total_pvuv_min (     do_date     STRING,     -- 统计日期     do_min      STRING,      -- 统计分钟     pv          BIGINT,     -- 点击量     uv          BIGINT,     -- 镇日内同个访客多次访问仅计算一个UV     currenttime TIMESTAMP,  -- 现在时间     PRIMARY KEY (do_date, do_min) NOT ENFORCED ) WITH (   'connector' = 'upsert-kafka',   'topic' = 'result_total_pvuv_min',   'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',   'key.json.ignore-parse-errors' = 'true',   'value.json.fail-on-missing-field' = 'false',   'key.format' = 'json',   'value.format' = 'json',   'value.fields-include' = 'EXCEPT_KEY' -- key不展现kafka新闻的value中 ); 
计算逻辑
-- 创建视图 CREATE VIEW view_total_pvuv_min AS SELECT      dt AS do_date,                    -- 时间分区      count (client_ip) AS pv,          -- 客户端的IP      count (DISTINCT client_ip) AS uv, -- 客户端往重      max(access_time) AS access_time   -- 乞求的时间 FROM     source_ods_fact_user_ippv GROUP BY dt;  -- 写入数据 INSERT INTO result_total_pvuv_min SELECT   do_date,    --  时间分区   cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分钟级别的时间   pv,   uv,   CURRENT_TIMESTAMP AS currenttime -- 现在时间 from   view_total_pvuv_min; 
生产用户访问数据到kafka,向kafka中的user_ippv插入数据:
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:32:24","dt":"2021-01-08"}  {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-08 11:32:55","dt":"2021-01-08"}  {"user_id":"2","client_ip":"192.165.12.1","client_info":"pc","pagecode":"1031","access_time":"2021-01-08 11:32:59","dt":"2021-01-08"}  {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-08 11:33:24","dt":"2021-01-08"}  {"user_id":"3","client_ip":"192.168.10.3","client_info":"pc","pagecode":"1001","access_time":"2021-01-08 11:33:30","dt":"2021-01-08"}  {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:34:24","dt":"2021-01-08"} 
查询效果外:
select * from result_total_pvuv_min; 

能够望出:每分钟的pv、uv只表现一条数据,即代外着截止到现在时间点的pv和uv

查望Kafka中result_total_pvuv_min主题的数据,如下:

能够望出:针对每一条访问数据,触发计算了一次PV、UV,每一条数据都是截止到现在时间的累计PV和UV。

尖叫挑示:

默认情况下,倘若在启用了检查点的情况下实走查询,Upsert Kafka授与器会将具有起码一次保证的数据挑取到Kafka主题中。

这意味着,Flink能够会将具有相通键的重复记录写入Kafka主题。但是,由于连接器在upsert模式下做事,因此行为源读回时,联相符键上的末了一条记录将奏效。因此,upsert-kafka连接器就像HBase授与器相通实现幂等写入。

Flink CDC的connector 简介

Flink CDC Connector 是ApacheFlink的一组数据源连接器,操纵转折数据捕获change data capture (CDC)从分歧的数据库中挑取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它能够足够行使Debezium的功能。

特点

声援读取数据库快照,并且能够赓续读取数据库的变更日志,即使发生故障,也声援exactly-once 的处理语义 对于DataStream API的CDC connector,用户无需安放Debezium和Kafka,即可在单个作业中操纵多个数据库和外上的变更数据。 对于Table/SQL API 的CDC connector,用户能够操纵SQL DDL创建CDC数据源,来监视单个外上的数据变更。

操纵场景

数据库之间的添量数据同步 审计日志 数据库之上的实时死亡视图 基于CDC的维外join … Flink挑供的 table format

Flink挑供了一系列能够用于table connector的table format,详细如下:

Formats Supported Connectors CSV Apache Kafka, Filesystem JSON Apache Kafka, Filesystem, Elasticsearch Apache Avro Apache Kafka, Filesystem Debezium CDC Apache Kafka Canal CDC Apache Kafka Apache Parquet Filesystem Apache ORC Filesystem 操纵过程中的仔细点

操纵MySQL CDC的仔细点

倘若要操纵MySQL CDC connector,对于程序而言,必要增补如下倚赖:

<dependency>   <groupId>com.alibaba.ververica</groupId>   <artifactId>flink-connector-mysql-cdc</artifactId>   <version>1.0.0</version> </dependency

倘若要操纵Flink SQL Client,必要增补如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink装配现在录的lib文件夹下即可。

操纵canal-json的仔细点

倘若要操纵Kafka的canal-json,对于程序而言,必要增补如下倚赖:

<!-- universal --> <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-connector-kafka_2.11</artifactId>     <version>1.11.0</version> </dependency

倘若要操纵Flink SQL Client,必要增补如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该jar包放在Flink装配现在录的lib文件夹下即可。由于Flink1.11的装配包 的lib现在录下并异国挑供该jar包,因而必须要手动增补倚赖包,否则会报如下舛讹:

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.  Available factory identifiers are:  datagen mysql-cdc 

操纵changelog-json的仔细点

倘若要操纵Kafka的changelog-json Format,对于程序而言,必要增补如下倚赖:

<dependency>   <groupId>com.alibaba.ververica</groupId>   <artifactId>flink-format-changelog-json</artifactId>   <version>1.0.0</version> </dependency

倘若要操纵Flink SQL Client,必要增补如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink装配现在录的lib文件夹下即可。

mysql-cdc的操作实践
-- MySQL /*Table structure for table `order_info` */ DROP TABLE IF EXISTS `order_info`; CREATE TABLE `order_info` (   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',   `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',   `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',   `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',   `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1外示下单,2外示支付',   `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',   `payment_way` varchar(20) DEFAULT NULL COMMENT '付款手段',   `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',   `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',   `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单营业编号(第三方支付用)',   `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',   `create_time` datetime DEFAULT NULL COMMENT '创建时间',   `operate_time` datetime DEFAULT NULL COMMENT '操作时间',   `expire_time` datetime DEFAULT NULL COMMENT '失效时间',   `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',   `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',   `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',   `province_id` int(20) DEFAULT NULL COMMENT '地区',   PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单外'; -- ---------------------------- -- Records of order_info -- ---------------------------- INSERT INTO `order_info`  VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9); INSERT INTO `order_info` VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3); INSERT INTO `order_info` VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);  /*Table structure for table `order_detail` */ CREATE TABLE `order_detail` (   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',   `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',   `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',   `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',   `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',   `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',   `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',   `create_time` datetime DEFAULT NULL COMMENT '创建时间',   PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细外';  -- ---------------------------- -- Records of order_detail -- ---------------------------- INSERT INTO `order_detail`  VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38'); INSERT INTO `order_detail`  VALUES (1330, 477, 9, '荣耀10 GT游玩添速 AIS手持夜景 6GB+64GB 幻影蓝全网通 移动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25'); INSERT INTO `order_detail` VALUES (1331, 478, 4, '幼米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 幼水滴周详屏拍照游玩智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34'); INSERT INTO `order_detail`  VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34'); INSERT INTO `order_detail`  VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34'); 

Flink SQL Cli创建CDC数据源

启动 Flink 集群,再启动 SQL CLI,实走下面命令:

-- 创建订单新闻外 CREATE TABLE order_info(     id BIGINT,     user_id BIGINT,     create_time TIMESTAMP(0),     operate_time TIMESTAMP(0),     province_id INT,     order_status STRING,     total_amount DECIMAL(10, 5)   ) WITH (     'connector' = 'mysql-cdc',     'hostname' = 'kms-1',     'port' = '3306',     'username' = 'root',     'password' = '123qwe',     'database-name' = 'mydw',     'table-name' = 'order_info' ); 

在Flink SQL Cli中查询该外的数据:result-mode: tableau,+外示数据的insert

在SQL CLI中创建订单细目外:

CREATE TABLE order_detail(     id BIGINT,     order_id BIGINT,     sku_id BIGINT,     sku_name STRING,     sku_num BIGINT,     order_price DECIMAL(10, 5),  create_time TIMESTAMP(0)  ) WITH (     'connector' = 'mysql-cdc',     'hostname' = 'kms-1',     'port' = '3306',     'username' = 'root',     'password' = '123qwe',     'database-name' = 'mydw',     'table-name' = 'order_detail' ); 

查询效果如下:

实走JOIN操作:

SELECT     od.id,     oi.id order_id,     oi.user_id,     oi.province_id,     od.sku_id,     od.sku_name,     od.sku_num,     od.order_price,     oi.create_time,     oi.operate_time FROM    (     SELECT *      FROM order_info     WHERE        order_status = '2'-- 已支付    ) oi    JOIN   (     SELECT *     FROM order_detail   ) od    ON oi.id = od.order_id; 
canal-json的操作实践

关于cannal的操纵手段,能够参考吾的另一篇文章:基于Canal与Flink实现数据实时添量同步(一)。吾已经将下面的外始末canal同步到了kafka,详细格式为:

{     "data":[         {             "id":"1",             "region_name":"华北"         },         {             "id":"2",             "region_name":"华东"         },         {             "id":"3",             "region_name":"东北"         },         {             "id":"4",             "region_name":"华中"         },         {             "id":"5",             "region_name":"华南"         },         {             "id":"6",             "region_name":"西南"         },         {             "id":"7",             "region_name":"西北"         }     ],     "database":"mydw",     "es":1597128441000,     "id":102,     "isDdl":false,     "mysqlType":{         "id":"varchar(20)",         "region_name":"varchar(20)"     },     "old":null,     "pkNames":null,     "sql":"",     "sqlType":{         "id":12,         "region_name":12     },     "table":"base_region",     "ts":1597128441424,     "type":"INSERT" } 

在SQL CLI中创建该canal-json格式的外:

CREATE TABLE region (   id BIGINT,   region_name STRING ) WITH (  'connector' = 'kafka',  'topic' = 'mydw.base_region',  'properties.bootstrap.servers' = 'kms-3:9092',  'properties.group.id' = 'testGroup',  'format' = 'canal-json' ,  'scan.startup.mode' = 'earliest-offset'  ); 

查询效果如下:

changelog-json的操作实践

创建MySQL数据源

参见上面的order_info

Flink SQL Cli创建changelog-json外

CREATE TABLE order_gmv2kafka (   day_str STRING,   gmv DECIMAL(10, 5) ) WITH (     'connector' = 'kafka',     'topic' = 'order_gmv_kafka',     'scan.startup.mode' = 'earliest-offset',     'properties.bootstrap.servers' = 'kms-3:9092',     'format' = 'changelog-json' );  INSERT INTO order_gmv2kafka SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv FROM order_info WHERE order_status = '2' -- 订单已支付 GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');  

查询外望一下效果:

再查一下kafka的数据:

{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"} 

当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293再不悦目察数据:

再望kafka中的数据:

总结

本文主要介绍了基于FlinK构建实时数仓的技术点,并对其操纵手段进走了详细描述,始末本文你也许对实时数仓和流批一体的行使会有一个深切意识,期待本文对你有所协助。

【编辑选举】环保公司起名简介

数仓架构赓续演进与发展:云原生、湖仓一体、离线实时一体、SaaS Flink Metrics监控与 RestApi 数仓 | 该如何理解数据仓库的建设 汽车之家:基于 Flink + Iceberg 的湖仓一体架构实践 Flink12大数据实时流处理 Flink 和 Iceberg 如何解决数据入湖面临的挑衅

Powered by 中国体育彩票手机版 @2013-2021 RSS地图 HTML地图