Merge branch 'apache:master' into expose-newly-add-table-in-pipeline
qg-lin authored Aug 5, 2024
2 parents ee98165 + 47f5660 commit 014e7a3
Showing 163 changed files with 7,495 additions and 2,378 deletions.
1 change: 1 addition & 0 deletions .github/actions/get-workflow-origin
Submodule get-workflow-origin added at 1b5a8a
1 change: 1 addition & 0 deletions .github/actions/label-when-approved-action
64 changes: 64 additions & 0 deletions .github/workflows/approve_label.yml
@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: "Label when approved workflow run"
on:
workflow_run:
workflows: [Label-when-reviewed]
types: [requested]
permissions:
# All other permissions are set to none
checks: write
contents: read
pull-requests: write
jobs:
label-when-approved:
name: "Label when approved"
runs-on: ubuntu-latest
outputs:
isApprovedByCommitters: ${{ steps.label-when-approved-by-committers.outputs.isApproved }}
isApprovedByAnyone: ${{ steps.label-when-approved-by-anyone.outputs.isApproved }}
steps:
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v2
with:
persist-credentials: false
submodules: recursive
- name: "Get information about the original trigger of the run"
uses: ./.github/actions/get-workflow-origin
id: source-run-info
with:
token: ${{ secrets.GITHUB_TOKEN }}
sourceRunId: ${{ github.event.workflow_run.id }}
- name: Label when approved by committers
uses: ./.github/actions/label-when-approved-action
id: label-when-approved-by-committers
with:
token: ${{ secrets.GITHUB_TOKEN }}
label: 'approved'
require_committers_approval: 'true'
remove_label_when_approval_missing: 'true'
pullRequestNumber: ${{ steps.source-run-info.outputs.pullRequestNumber }}
- name: Label when approved by anyone
uses: ./.github/actions/label-when-approved-action
id: label-when-approved-by-anyone
with:
token: ${{ secrets.GITHUB_TOKEN }}
label: 'reviewed'
pullRequestNumber: ${{ steps.source-run-info.outputs.pullRequestNumber }}
remove_label_when_approval_missing: 'true'
28 changes: 28 additions & 0 deletions .github/workflows/approve_label_trigger.yml
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: Label-when-reviewed
on: pull_request_review
jobs:

label-when-reviewed:
name: "Label PRs when reviewed"
runs-on: ubuntu-latest
steps:
- name: "Do nothing. Only trigger corresponding workflow_run event"
run: echo
56 changes: 56 additions & 0 deletions .github/workflows/close_stale.yml
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# https://github.com/actions/stale
name: 'Close stale issues and PRs'
on:
schedule:
- cron: '0 0 * * *'
permissions:
# Stale recommended permissions
pull-requests: write
issues: write
jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v9
with:
operations-per-run: 1000
# Stale Issues
days-before-issue-stale: 0
days-before-issue-close: 0
stale-issue-message: >
As required by Apache Flink, please report bugs and new feature requests on Apache Jira
under the project Flink, using the component tag Flink CDC. You must have a JIRA account to log cases and issues.
If you don’t have an ASF JIRA account, you can request one at the ASF Self-serve portal;
account creation requires review by a PMC member of the project, which normally takes one to two working days.
close-issue-message: >
This issue has been closed because Flink CDC doesn't use GitHub issue trackers.
# Stale PRs
days-before-pr-stale: 60
days-before-pr-close: 30
stale-pr-message: >
This pull request has been automatically marked as stale because it has not had any activity
in the last 60 days. It will be closed in 30 days if no further activity occurs.
close-pr-message: >
This pull request has been closed because it has not had recent activity. You can reopen it
if you want to continue your work, and anyone who is interested in it is encouraged to continue
working on this pull request.
close-pr-label: stale
remove-pr-stale-when-updated: true
remove-issue-stale-when-updated: true
labels-to-add-when-unstale: 'waiting for review'
110 changes: 98 additions & 12 deletions .github/workflows/flink_cdc.yml
@@ -115,19 +115,8 @@ jobs:
- name: Run license check
run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb

migration_test:
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
with:
submodules: true
- name: Compile snapshot CDC version
run: mvn --no-snapshot-updates -B install -DskipTests
- name: Run migration tests
run: cd flink-cdc-migration-tests && mvn clean verify

compile_and_test:
needs: license_check
# Only run the CI pipeline for the flink-cdc-connectors repository
# if: github.repository == 'apache/flink-cdc-connectors'
runs-on: ubuntu-latest
@@ -263,3 +252,100 @@ jobs:
done
fi
exit 0
migration_test_ut:
needs: license_check
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
with:
submodules: true
- name: Compile snapshot CDC version
run: mvn --no-snapshot-updates -B install -DskipTests
- name: Run migration tests
run: cd flink-cdc-migration-tests && mvn clean verify

pipeline_migration_test:
needs: migration_test_ut
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [ '8', '11' ]

steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 3.0
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java-version }}
distribution: temurin
cache: maven
- name: Install dependencies
run: gem install terminal-table
- name: Prepare CDC versions
run: CDC_SOURCE_HOME=$PWD ruby tools/mig-test/prepare_libs.rb
- name: Prepare Flink distro
run: wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz && tar -xzvf flink-1.18.1-bin-scala_2.12.tgz
working-directory: ./tools/mig-test
- name: Patch Flink configs
run: FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
working-directory: ./tools/mig-test
- name: Start containers
run: cd conf && docker compose up -d
working-directory: ./tools/mig-test
- name: Run migration tests
run: FLINK_HOME=./flink-1.18.1/ ruby run_migration_test.rb
working-directory: ./tools/mig-test
- name: Stop containers
if: always()
run: cd conf && docker compose down
working-directory: ./tools/mig-test

data_stream_migration_test:
needs: migration_test_ut
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [ '8', '11' ]

steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 3.0
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java-version }}
distribution: temurin
cache: maven
- name: Install dependencies
run: gem install terminal-table
- name: Prepare CDC versions
run: CDC_SOURCE_HOME=$PWD ruby tools/mig-test/prepare_libs.rb
- name: Prepare Flink distro
run: wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz && tar -xzvf flink-1.18.1-bin-scala_2.12.tgz
working-directory: ./tools/mig-test
- name: Patch Flink configs
run: FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
working-directory: ./tools/mig-test
- name: Compile Dummy DataStream Jobs
run: cd datastream && ruby compile_jobs.rb
working-directory: ./tools/mig-test
- name: Start containers
run: cd conf && docker compose up -d
working-directory: ./tools/mig-test
- name: Run migration tests
run: cd datastream && FLINK_HOME=../flink-1.18.1/ ruby run_migration_test.rb
working-directory: ./tools/mig-test
- name: Stop containers
if: always()
run: cd conf && docker compose down
working-directory: ./tools/mig-test
6 changes: 6 additions & 0 deletions .gitmodules
@@ -1,3 +1,9 @@
[submodule "docs/themes/book"]
path = docs/themes/book
url = https://github.com/alex-shpak/hugo-book
[submodule ".github/actions/get-workflow-origin"]
path = .github/actions/get-workflow-origin
url = https://github.com/potiuk/get-workflow-origin
[submodule ".github/actions/label-when-approved-action"]
path = .github/actions/label-when-approved-action
url = https://github.com/TobKed/label-when-approved-action
22 changes: 11 additions & 11 deletions README.md
@@ -53,19 +53,19 @@ full database synchronization, sharding table synchronization, schema evolution
password: pass

transform:
- source-table: adb.web_order01
projection: \*, UPPER(product_name) as product_name
filter: id > 10 AND order_id > 100
description: project fields and filter
- source-table: adb.web_order02
projection: \*, UPPER(product_name) as product_name
filter: id > 20 AND order_id > 200
description: project fields and filter
- source-table: adb.web_order01
projection: \*, UPPER(product_name) as product_name
filter: id > 10 AND order_id > 100
description: project fields and filter
- source-table: adb.web_order02
projection: \*, UPPER(product_name) as product_name
filter: id > 20 AND order_id > 200
description: project fields and filter

route:
- source-table: adb.web_order\.*
sink-table: adb.ods_web_orders
description: sync sharding tables to one destination table
- source-table: adb.web_order\.*
sink-table: adb.ods_web_orders
description: sync sharding tables to one destination table

pipeline:
name: MySQL to Doris Pipeline
19 changes: 15 additions & 4 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -261,7 +261,7 @@ Flink SQL> SELECT * FROM orders;
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td> Optional startup mode for the MySQL CDC consumer,
valid modes are "initial", "earliest-offset", "latest-offset", "specific-offset" and "timestamp".
valid modes are "initial", "earliest-offset", "latest-offset", "specific-offset", "timestamp" and "snapshot".
Please see the <a href="#a-name-id-002-a">startup modes</a> section for more details.</td>
</tr>
<tr>
@@ -285,6 +285,13 @@ Flink SQL> SELECT * FROM orders;
<td>String</td>
<td>The GTID set of the starting position under the "specific-offset" startup mode.</td>
</tr>
<tr>
<td>scan.startup.timestamp-millis</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>The millisecond timestamp of the starting position under the "timestamp" startup mode.</td>
</tr>
<tr>
<td>scan.startup.specific-offset.skip-events</td>
<td>optional</td>
@@ -493,7 +500,7 @@ CREATE TABLE products (
* (3) The source does not require database lock privileges before snapshot reading.

If you want the source to run in parallel, each parallel reader should have a unique server id, so the `server id` range must be like `5400-6400`,
and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC source first splits the table into multiple chunks by the table's primary key,
and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC source first splits the table into chunks by the chunk key column you specify,
and then assigns the chunks to multiple readers so that the table data is read in parallel.
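
As a quick sketch of this rule (hostname, credentials, and table name below are placeholders, not part of this commit), a source declared with a `server-id` range of five ids can run with a parallelism of four:

```sql
-- Illustrative only: the server-id range (5401-5405) is larger than the
-- configured parallelism (4), so every parallel reader gets a unique id.
SET 'parallelism.default' = '4';

CREATE TABLE orders_src (
    id BIGINT,
    order_id BIGINT,
    product_name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'adb',
    'table-name' = 'web_order01',
    'server-id' = '5401-5405'
);
```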

#### Concurrent Reading
@@ -543,7 +550,7 @@ After the server you are monitoring in the MySQL cluster fails, you only need to switch the monitored

When the MySQL CDC source starts, it reads the snapshot of the table in parallel and then reads the binlog of the table with a single parallelism.

In the snapshot phase, the snapshot is split into multiple snapshot chunks according to the primary key of the table and the size of the table rows.
In the snapshot phase, the snapshot is split into multiple snapshot chunks according to the chunk key of the table and the size of the table rows.
Snapshot chunks are assigned to multiple snapshot readers. Each snapshot reader reads its chunks using the [chunk reading algorithm](#snapshot-chunk-reading) and sends the read data downstream.
The source manages the process status of the chunks (finished or not finished), so the source in the snapshot phase can support chunk-level checkpoints.
If a failure happens, the source can be restored and continue to read chunks from the last finished chunk.
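
Because chunk progress is persisted through Flink checkpoints, jobs that use the source normally enable periodic checkpointing; a minimal Flink SQL client sketch (the 3-second interval is only an example value):

```sql
-- Illustrative only: enable periodic checkpoints so that, after a failure,
-- the snapshot phase can resume from the last finished chunk.
SET 'execution.checkpointing.interval' = '3s';
```
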
@@ -558,7 +565,9 @@ Flink periodically performs checkpoints for the source; in the case of failover, the job

When performing incremental snapshot reading, the MySQL CDC source needs an algorithm for splitting the table into chunks.
The MySQL CDC source uses the primary key column to split the table into multiple chunks. By default, the MySQL CDC source identifies the primary key column of the table and uses the first column of the primary key as the chunk key column.
If the table has no primary key, incremental snapshot reading will fail; you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
If the table has no primary key, the user must specify `scan.incremental.snapshot.chunk.key-column`;
otherwise incremental snapshot reading will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
Note that using a column that is not part of the primary key as the chunk key may degrade the query performance of the table.
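
For instance (a hypothetical table, not part of this commit), a table without a primary key can declare the chunk key column explicitly:

```sql
-- Illustrative only: web_order_log has no primary key, so a chunk key
-- column is supplied for incremental snapshot reading.
CREATE TABLE web_order_log (
    order_id BIGINT,
    product_name STRING,
    log_time TIMESTAMP(3)
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'adb',
    'table-name' = 'web_order_log',
    'scan.incremental.snapshot.chunk.key-column' = 'order_id'
);
```

Choosing an indexed, evenly distributed column as the chunk key keeps the chunk queries cheap, in line with the performance note above.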

For numeric and auto-increment splitting columns, the MySQL CDC source splits chunks efficiently with a fixed step size.
For example, if you have a table whose primary key column `id` is an auto-increment BIGINT type with a minimum value of `0` and a maximum value of `100`,
@@ -622,6 +631,7 @@ MySQLSource.builder()
.startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动
.startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动
.startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动
.startupOptions(StartupOptions.snapshot()) // 仅读取快照
...
.build()
```
@@ -635,6 +645,7 @@ CREATE TABLE mysql_source (...) WITH (
'scan.startup.mode' = 'latest-offset', -- start from the latest offset
'scan.startup.mode' = 'specific-offset', -- start from a specific offset
'scan.startup.mode' = 'timestamp', -- start from a timestamp
'scan.startup.mode' = 'snapshot', -- read the snapshot only
'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- set the binlog file name under the specific-offset startup mode
'scan.startup.specific-offset.pos' = '4', -- set the binlog position under the specific-offset startup mode
'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- set the GTID set under the specific-offset startup mode
