Skip to content

Commit ef0427f

Browse files
authored
Merge pull request #214 from data-integrations/CDAP-20121
MYSQL Create Table doesn't pick up the schema according to debezium's…
2 parents 57ff488 + 27b1461 commit ef0427f

File tree

3 files changed

+49
-4
lines changed

3 files changed

+49
-4
lines changed

delta-plugins-common/src/main/java/io/cdap/delta/plugin/mock/MockContext.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
public class MockContext implements DeltaSourceContext {
3737
private final Class<?> driverClass;
3838
private final Map<String, byte[]> state = new HashMap<>();
39+
private Map<String, String> runtimeArguments;
3940

4041
public MockContext(Class<?> driverClass) {
4142
this.driverClass = driverClass;
43+
this.runtimeArguments = new HashMap<>();
4244
}
4345

4446
@Override
@@ -88,7 +90,7 @@ public Map<String, String> getTags() {
8890

8991
@Override
9092
public Map<String, String> getRuntimeArguments() {
91-
return new HashMap<>();
93+
return this.runtimeArguments;
9294
}
9395

9496
@Override
@@ -151,4 +153,8 @@ public <T> T newPluginInstance(String pluginId, MacroEvaluator evaluator) {
151153
public void notifyFailed(Throwable throwable) {
152154
// no-op
153155
}
156+
157+
public void addRuntimeArgument(String key, String value) {
158+
this.runtimeArguments.put(key, value);
159+
}
154160
}

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
*/
6060
public class MySqlEventReader implements EventReader {
6161
public static final Logger LOG = LoggerFactory.getLogger(MySqlEventReader.class);
62-
private static final String SOURCE_CONNECTOR_PREFIX = "source.connector.";
62+
protected static final String SOURCE_CONNECTOR_PREFIX = "source.connector.";
6363
private final MySqlConfig config;
6464
private final EventEmitter emitter;
6565
private final ExecutorService executorService;
@@ -200,7 +200,7 @@ private static MySqlValueConverters getValueConverters(MySqlConnectorConfig conf
200200

201201
boolean timeAdjusterEnabled = configuration.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
202202
return new MySqlValueConverters(decimalMode, timePrecisionMode, bigIntUnsignedMode,
203-
CommonConnectorConfig.BinaryHandlingMode.BYTES,
203+
configuration.binaryHandlingMode(),
204204
timeAdjusterEnabled ? MySqlEventReader::adjustTemporal : x -> x,
205205
(message, exception) -> {
206206
throw new DebeziumException(message, exception);

mysql-delta-plugins/src/test/java/io/cdap/delta/mysql/MySqlEventReaderIntegrationTest.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ public class MySqlEventReaderIntegrationTest {
6464
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
6565
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
6666
Schema.Field.of("bday", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))));
67+
private static final String BINARYCOL_TABLE = "binarycoltable";
68+
private static final Schema EXPECTED_BINARYCOL_SCHEMA = Schema.recordOf(
69+
BINARYCOL_TABLE,
70+
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
71+
Schema.Field.of("bincol", Schema.of(Schema.Type.STRING)));
72+
6773
private static String password;
6874
private static int port;
6975

@@ -109,6 +115,13 @@ public static void setupClass() throws Exception {
109115
CUSTOMERS_TABLE));
110116
}
111117

118+
// create table with Binary col
119+
try (Statement statement = connection.createStatement()) {
120+
statement.execute(
121+
String.format("CREATE TABLE %s (id int PRIMARY KEY, bincol BINARY(16) not null)",
122+
BINARYCOL_TABLE));
123+
}
124+
112125
// insert sample data
113126
try (PreparedStatement ps = connection.prepareStatement(String.format("INSERT INTO %s VALUES (?, ?, ?)",
114127
CUSTOMERS_TABLE))) {
@@ -239,4 +252,30 @@ public void stopReaderTest() throws Exception {
239252
eventReader.stop();
240253
Assert.assertFalse(eventReader.failedToStop());
241254
}
242-
}
255+
256+
@Test
257+
public void testBinaryHandlingModebyDebezium() throws InterruptedException {
258+
//https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-binary-handling-mode
259+
SourceTable sourceTable = new SourceTable(DB, BINARYCOL_TABLE, null,
260+
Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
261+
262+
MockContext context = new MockContext(Driver.class);
263+
context.addRuntimeArgument(MySqlEventReader.SOURCE_CONNECTOR_PREFIX + "binary.handling.mode" , "HEX");
264+
265+
MockEventEmitter eventEmitter = new MockEventEmitter(4);
266+
MySqlConfig config = new MySqlConfig("localhost", port, "root", password, 13, DB,
267+
TimeZone.getDefault().getID());
268+
269+
MySqlEventReader eventReader = new MySqlEventReader(Collections.singleton(sourceTable), config,
270+
context, eventEmitter);
271+
eventReader.start(new Offset());
272+
273+
eventEmitter.waitForExpectedEvents(30, TimeUnit.SECONDS);
274+
275+
DDLEvent ddlEvent = eventEmitter.getDdlEvents().get(3);
276+
Assert.assertEquals(DDLOperation.Type.CREATE_TABLE, ddlEvent.getOperation().getType());
277+
Assert.assertEquals(DB, ddlEvent.getOperation().getDatabaseName());
278+
Assert.assertEquals(BINARYCOL_TABLE, ddlEvent.getOperation().getTableName());
279+
Assert.assertEquals(EXPECTED_BINARYCOL_SCHEMA, ddlEvent.getSchema());
280+
}
281+
}

0 commit comments

Comments
 (0)