Skip to content

cloudstorage: move cloud storage sink to the specified package #1222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Apr 22, 2025
Merged
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
20531ed
move cloud storage sink to the specified package
3AceShowHand Apr 14, 2025
a30a2fe
move cloud storage writer to the corresponding package
3AceShowHand Apr 15, 2025
9da22e8
lower the names
3AceShowHand Apr 15, 2025
187e9c6
Merge branch 'master' into refactor-cloud-storage
3AceShowHand Apr 15, 2025
607c7de
some tiny fix
3AceShowHand Apr 15, 2025
b677d41
some tiny fix
3AceShowHand Apr 15, 2025
da9becf
remove ddl worker
3AceShowHand Apr 15, 2025
e0fe6c1
make fmt
3AceShowHand Apr 16, 2025
8dae206
Merge branch 'master' into refactor-cloud-storage
3AceShowHand Apr 16, 2025
41eff9b
adjust how to call post flush
3AceShowHand Apr 16, 2025
6bad10b
remove the is dead
3AceShowHand Apr 16, 2025
888c36d
fix some
3AceShowHand Apr 16, 2025
4a3946f
move ddl worker ut to the sink
3AceShowHand Apr 16, 2025
64622b1
fix fmt
3AceShowHand Apr 16, 2025
81e5508
simply fix
3AceShowHand Apr 16, 2025
bcd3413
Merge branch 'master' into refactor-cloud-storage
3AceShowHand Apr 16, 2025
916f07a
fix tests
3AceShowHand Apr 16, 2025
67c549f
adjust the ut
3AceShowHand Apr 17, 2025
7f6710d
refactor the dml workers
3AceShowHand Apr 17, 2025
0a59dca
fix unit test
3AceShowHand Apr 17, 2025
ff6fa18
send checkpoint like other sink
3AceShowHand Apr 17, 2025
12df262
add one comment
3AceShowHand Apr 17, 2025
266e090
fix test
3AceShowHand Apr 18, 2025
62177d7
add schems
3AceShowHand Apr 18, 2025
fd69fb1
fix ut
3AceShowHand Apr 18, 2025
2775f0c
Merge branch 'master' into refactor-cloud-storage
3AceShowHand Apr 18, 2025
8afb37a
merge master
3AceShowHand Apr 18, 2025
9037165
fix test
3AceShowHand Apr 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -113,7 +113,7 @@ P=3

# The following packages are used in unit tests.
# Add new packages here if you want to include them in unit tests.
UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./downstreamadapter/worker/... ./downstreamadapter/worker/writer/... ./pkg/sink/codec/avro/... ./pkg/sink/codec/open/... ./pkg/sink/codec/csv/... ./pkg/sink/codec/canal/... ./pkg/sink/codec/debezium/... ./pkg/sink/codec/simple/...
UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./pkg/sink/codec/avro/... ./pkg/sink/codec/open/... ./pkg/sink/codec/csv/... ./pkg/sink/codec/canal/... ./pkg/sink/codec/debezium/... ./pkg/sink/codec/simple/...
UT_PACKAGES_MAINTAINER := ./maintainer/...
UT_PACKAGES_COORDINATOR := ./coordinator/...
UT_PACKAGES_LOGSERVICE := ./logservice/...
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package sink
package blackhole

import (
"context"
@@ -23,27 +23,26 @@ import (
"go.uber.org/zap"
)

// BlackHoleSink is responsible for writing data to blackhole.
// sink is responsible for writing data to blackhole.
// Including DDL and DML.
type BlackHoleSink struct{}
type sink struct{}

func newBlackHoleSink() (*BlackHoleSink, error) {
blackholeSink := BlackHoleSink{}
return &blackholeSink, nil
func New() (*sink, error) {
return &sink{}, nil
}

func (s *BlackHoleSink) IsNormal() bool {
func (s *sink) IsNormal() bool {
return true
}

func (s *BlackHoleSink) SinkType() common.SinkType {
func (s *sink) SinkType() common.SinkType {
return common.BlackHoleSinkType
}

func (s *BlackHoleSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
func (s *sink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
}

func (s *BlackHoleSink) AddDMLEvent(event *commonEvent.DMLEvent) {
func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) {
// NOTE: don't change the log, integration test `lossy_ddl` depends on it.
// ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L23
log.Debug("BlackHoleSink: WriteEvents", zap.Any("dml", event))
@@ -52,37 +51,31 @@ func (s *BlackHoleSink) AddDMLEvent(event *commonEvent.DMLEvent) {
}
}

func (s *BlackHoleSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error {
switch event.GetType() {
case commonEvent.TypeDDLEvent:
e := event.(*commonEvent.DDLEvent)
// NOTE: don't change the log, integration test `lossy_ddl` depends on it.
// ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L17
log.Debug("BlackHoleSink: DDL Event", zap.Any("ddl", e))
for _, callback := range e.PostTxnFlushed {
callback()
}
case commonEvent.TypeSyncPointEvent:
e := event.(*commonEvent.SyncPointEvent)
for _, callback := range e.PostTxnFlushed {
callback()
}
default:
log.Error("unknown event type",
zap.Any("event", event))
}
event.PostFlush()
return nil
}

func (s *BlackHoleSink) AddCheckpointTs(_ uint64) {
func (s *sink) AddCheckpointTs(_ uint64) {
}

func (s *BlackHoleSink) GetStartTsList(_ []int64, startTsList []int64, _ bool) ([]int64, []bool, error) {
func (s *sink) GetStartTsList(_ []int64, startTsList []int64, _ bool) ([]int64, []bool, error) {
return startTsList, make([]bool, len(startTsList)), nil
}

func (s *BlackHoleSink) Close(_ bool) {}
func (s *sink) Close(_ bool) {}

func (s *BlackHoleSink) Run(_ context.Context) error {
func (s *sink) Run(_ context.Context) error {
return nil
}
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package sink
package blackhole

import (
"testing"
@@ -23,7 +23,7 @@ import (

// Test callback and tableProgress works as expected after AddDMLEvent
func TestBlacHoleSinkBasicFunctionality(t *testing.T) {
sink, err := newBlackHoleSink()
sink, err := New()
require.NoError(t, err)

count := 0
198 changes: 0 additions & 198 deletions downstreamadapter/sink/cloudstorage.go

This file was deleted.

Loading