Skip to content

Commit e6f52d9

Browse files
authored
Throw Runtime Exception for tables without primary key (CDAP-19607) (#210)
* Throw Runtime Exception when pipeline is run without primary key in table * Resolved all comments(except adding test for scenario where primary key exists)
1 parent f2327ed commit e6f52d9

File tree

3 files changed

+86
-0
lines changed

3 files changed

+86
-0
lines changed

sqlserver-delta-plugins/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
<port.file>${project.build.outputDirectory}/port.properties</port.file>
3232
<sqlserver.image>mcr.microsoft.com/mssql/server:2017-CU19-ubuntu-16.04</sqlserver.image>
3333
<sqlserver.password>D3ltaPass</sqlserver.password>
34+
<powermock.version>2.0.9</powermock.version>
3435
</properties>
3536

3637
<dependencies>
@@ -56,6 +57,18 @@
5657
<version>8.2.1.jre8</version>
5758
<scope>test</scope>
5859
</dependency>
60+
<dependency>
61+
<groupId>org.powermock</groupId>
62+
<artifactId>powermock-module-junit4</artifactId>
63+
<version>${powermock.version}</version>
64+
<scope>test</scope>
65+
</dependency>
66+
<dependency>
67+
<groupId>org.powermock</groupId>
68+
<artifactId>powermock-api-mockito2</artifactId>
69+
<version>${powermock.version}</version>
70+
<scope>test</scope>
71+
</dependency>
5972
</dependencies>
6073

6174
<build>

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.cdap.delta.api.DDLOperation;
2323
import io.cdap.delta.api.DMLEvent;
2424
import io.cdap.delta.api.DMLOperation;
25+
import io.cdap.delta.api.DeltaFailureRuntimeException;
2526
import io.cdap.delta.api.DeltaSourceContext;
2627
import io.cdap.delta.api.EventEmitter;
2728
import io.cdap.delta.api.Offset;
@@ -122,6 +123,11 @@ public void accept(SourceRecord sourceRecord) {
122123
// shouldn't happen
123124
return;
124125
}
126+
if (sourceRecord.key() == null) {
127+
throw new DeltaFailureRuntimeException(String.format("Table '%s' in database '%s' has no primary key. " +
128+
"Tables without a primary key are" +
129+
" not supported.", tableName, databaseName));
130+
}
125131

126132
StructuredRecord before = val.get("before");
127133
StructuredRecord after = val.get("after");
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright © 2022 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.delta.sqlserver;
18+
19+
import com.microsoft.sqlserver.jdbc.SQLServerDriver;
20+
import io.cdap.delta.api.DeltaFailureRuntimeException;
21+
import io.cdap.delta.api.DeltaSourceContext;
22+
import io.cdap.delta.api.Offset;
23+
import io.cdap.delta.plugin.mock.MockContext;
24+
import io.cdap.delta.plugin.mock.MockEventEmitter;
25+
import org.apache.kafka.connect.data.ConnectSchema;
26+
import org.apache.kafka.connect.data.Field;
27+
import org.apache.kafka.connect.data.Schema;
28+
import org.apache.kafka.connect.data.Struct;
29+
import org.apache.kafka.connect.source.SourceRecord;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
import org.mockito.Mockito;
33+
import org.mockito.junit.MockitoJUnitRunner;
34+
35+
import java.util.Arrays;
36+
import java.util.HashMap;
37+
import java.util.HashSet;
38+
import java.util.List;
39+
40+
@RunWith(MockitoJUnitRunner.class)
41+
public class SqlServerRecordConsumerTest {
42+
private static final String DATABASE = "AdventureWorks2014";
43+
private static final String TOPICNAME = "dbo.testreplication.npe";
44+
45+
@Test(expected = DeltaFailureRuntimeException.class)
46+
public void testTableWithoutPrimaryKey() {
47+
DeltaSourceContext context = new MockContext(SQLServerDriver.class);
48+
MockEventEmitter eventEmitter = new MockEventEmitter(5);
49+
SqlServerRecordConsumer sqlServerRecordConsumer = new SqlServerRecordConsumer
50+
(context, eventEmitter, DATABASE, new HashSet<>(), new HashMap<>(), new Offset(), true);
51+
SourceRecord sourceRecordMock = Mockito.mock(SourceRecord.class);
52+
// the topic name should be in this form: [db.server.name].[schema].[table]
53+
Mockito.when(sourceRecordMock.topic()).thenReturn(TOPICNAME);
54+
List<Field> fields = Arrays.asList(new Field("op", 0, new ConnectSchema(Schema.Type.STRING)));
55+
Struct valueStruct = new Struct(new ConnectSchema(Schema.Type.STRUCT, false, null,
56+
null, null, null, null, fields, null, null));
57+
valueStruct.put("op", "c");
58+
Mockito.when(sourceRecordMock.value()).thenReturn(valueStruct);
59+
Mockito.when(sourceRecordMock.sourceOffset()).thenReturn(new HashMap() {{
60+
put("snapshot", true);
61+
}});
62+
//setting primary key as NULL
63+
Mockito.when(sourceRecordMock.key()).thenReturn(null);
64+
sqlServerRecordConsumer.accept(sourceRecordMock);
65+
}
66+
67+
}

0 commit comments

Comments
 (0)