Skip to content

Commit 65e3505

Browse files
authored
CDAP-1938901: Fix assessment of time, datetime and datetime2 types for MS SQL (#215)
* CDAP-1938901: Fix assessment of datetime types for MS SQL * CDAP-1938901: Added unit tests
1 parent ef0427f commit 65e3505

File tree

2 files changed

+177
-14
lines changed

2 files changed

+177
-14
lines changed

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerTableAssessor.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ public class SqlServerTableAssessor implements TableAssessor<TableDetail> {
4141
static final String TYPE_NAME = "TYPE_NAME";
4242
private static final String GEOMETRY = "GEOMETRY";
4343
private static final String GEOGRAPHY = "GEOGRAPHY";
44-
private static final String DATETIME2 = "DATETIME2";
44+
static final String DATETIME2 = "DATETIME2";
45+
public static final int MAX_SUPPORTED_SCALE = 6;
46+
public static final int MILLIS_SCALE = 3;
4547

4648
@Override
4749
public TableAssessment assess(TableDetail tableDetail) {
@@ -102,19 +104,27 @@ static ColumnEvaluation evaluateColumn(ColumnDetail detail) throws IllegalArgume
102104
schema = Schema.of(Schema.LogicalType.DATE);
103105
break;
104106
case Types.TIME:
105-
support = ColumnSupport.PARTIAL;
106-
suggestion = new ColumnSuggestion("The precision will be reduced to microseconds from nanoseconds",
107-
Collections.emptyList());
108-
schema = Schema.of(Schema.LogicalType.TIME_MICROS);
109-
break;
110-
case Types.TIMESTAMP:
111-
if (DATETIME2.equals(upperCaseTypeName)) {
107+
scale = Integer.parseInt(properties.get(SCALE));
108+
if (scale > MAX_SUPPORTED_SCALE) {
112109
support = ColumnSupport.PARTIAL;
113110
suggestion = new ColumnSuggestion("The precision will be reduced to microseconds from nanoseconds",
114111
Collections.emptyList());
115-
schema = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
112+
}
113+
if (scale <= MILLIS_SCALE) {
114+
schema = Schema.of(Schema.LogicalType.TIME_MILLIS);
116115
} else {
117-
schema = Schema.of(Schema.LogicalType.TIMESTAMP_MILLIS);
116+
schema = Schema.of(Schema.LogicalType.TIME_MICROS);
117+
}
118+
break;
119+
case Types.TIMESTAMP:
120+
schema = Schema.of(Schema.LogicalType.DATETIME);
121+
if (DATETIME2.equals(upperCaseTypeName)) {
122+
scale = Integer.parseInt(properties.get(SCALE));
123+
if (scale > MAX_SUPPORTED_SCALE) {
124+
support = ColumnSupport.PARTIAL;
125+
suggestion = new ColumnSuggestion("The precision will be reduced to microseconds from nanoseconds",
126+
Collections.emptyList());
127+
}
118128
}
119129
break;
120130

@@ -150,11 +160,11 @@ static ColumnEvaluation evaluateColumn(ColumnDetail detail) throws IllegalArgume
150160
}
151161

152162
Schema.Field field = schema == null ? null :
153-
Schema.Field.of(detail.getName(), detail.isNullable() ? Schema.nullableOf(schema) : schema);
163+
Schema.Field.of(detail.getName(), detail.isNullable() ? Schema.nullableOf(schema) : schema);
154164
ColumnAssessment assessment = ColumnAssessment.builder(detail.getName(), detail.getType().getName())
155-
.setSupport(support)
156-
.setSuggestion(suggestion)
157-
.build();
165+
.setSupport(support)
166+
.setSuggestion(suggestion)
167+
.build();
158168
return new ColumnEvaluation(field, assessment);
159169
}
160170
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright © 2022 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package io.cdap.delta.sqlserver;
17+
18+
import io.cdap.cdap.api.data.schema.Schema;
19+
import io.cdap.delta.api.assessment.ColumnDetail;
20+
import io.cdap.delta.api.assessment.ColumnSupport;
21+
import io.cdap.delta.api.assessment.TableAssessment;
22+
import io.cdap.delta.api.assessment.TableDetail;
23+
import io.cdap.delta.plugin.common.ColumnEvaluation;
24+
import org.junit.Assert;
25+
import org.junit.Test;
26+
27+
import java.sql.JDBCType;
28+
import java.util.Arrays;
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
public class SqlServerTableAssessorTest {
33+
34+
private static final String SCALE_100_NANOS = "7";
35+
private static final String SCALE_MICROS = "6";
36+
private static final String SCALE_MILLIS = "3";
37+
private static final String DATETIME = "DATETIME";
38+
private static final String TIME = "TIME";
39+
private static final String DB = "db";
40+
private static final String TABLE = "table";
41+
private static final String SCHEMA = "schema";
42+
private static final String UPDATED_TIME_COLUMN = "updated_time";
43+
private static final String UPDATED_AT_COLUMN = "updated_at";
44+
45+
private SqlServerTableAssessor tableAssessor = new SqlServerTableAssessor();
46+
47+
@Test
48+
public void testDateTime2WithNanoScaleMapping() {
49+
Map<String, String> props = new HashMap<>();
50+
props.put(SqlServerTableAssessor.TYPE_NAME, SqlServerTableAssessor.DATETIME2);
51+
props.put(SqlServerTableAssessor.SCALE, SCALE_100_NANOS);
52+
ColumnDetail columnDetail = new ColumnDetail(UPDATED_AT_COLUMN, JDBCType.TIMESTAMP, true, props);
53+
TableDetail tableDetail = new TableDetail.Builder(DB, TABLE, SCHEMA)
54+
.setColumns(Arrays.asList(columnDetail)).build();
55+
56+
ColumnEvaluation columnEvaluation = SqlServerTableAssessor.evaluateColumn(columnDetail);
57+
TableAssessment assessment = tableAssessor.assess(tableDetail);
58+
59+
Assert.assertEquals(Schema.LogicalType.DATETIME,
60+
columnEvaluation.getField().getSchema().getNonNullable().getLogicalType());
61+
Assert.assertEquals(1, assessment.getColumns().size());
62+
Assert.assertEquals(ColumnSupport.PARTIAL, assessment.getColumns().get(0).getSupport());
63+
}
64+
65+
@Test
66+
public void testDateTime2WithMicrosScaleMapping() {
67+
Map<String, String> props = new HashMap<>();
68+
props.put(SqlServerTableAssessor.TYPE_NAME, SqlServerTableAssessor.DATETIME2);
69+
props.put(SqlServerTableAssessor.SCALE, SCALE_MICROS);
70+
ColumnDetail columnDetail = new ColumnDetail(UPDATED_AT_COLUMN, JDBCType.TIMESTAMP, true, props);
71+
TableDetail tableDetail = new TableDetail.Builder(DB, TABLE, SCHEMA)
72+
.setColumns(Arrays.asList(columnDetail)).build();
73+
74+
ColumnEvaluation columnEvaluation = SqlServerTableAssessor.evaluateColumn(columnDetail);
75+
TableAssessment assessment = tableAssessor.assess(tableDetail);
76+
77+
Assert.assertEquals(Schema.LogicalType.DATETIME,
78+
columnEvaluation.getField().getSchema().getNonNullable().getLogicalType());
79+
Assert.assertEquals(1, assessment.getColumns().size());
80+
Assert.assertEquals(ColumnSupport.YES, assessment.getColumns().get(0).getSupport());
81+
}
82+
83+
@Test
84+
public void testDateTimeMapping() {
85+
Map<String, String> props = new HashMap<>();
86+
props.put(SqlServerTableAssessor.TYPE_NAME, DATETIME);
87+
ColumnDetail columnDetail = new ColumnDetail(UPDATED_AT_COLUMN, JDBCType.TIMESTAMP, true, props);
88+
TableDetail tableDetail = new TableDetail.Builder(DB, TABLE, SCHEMA)
89+
.setColumns(Arrays.asList(columnDetail)).build();
90+
91+
ColumnEvaluation columnEvaluation = SqlServerTableAssessor.evaluateColumn(columnDetail);
92+
TableAssessment assessment = tableAssessor.assess(tableDetail);
93+
94+
Assert.assertEquals(Schema.LogicalType.DATETIME,
95+
columnEvaluation.getField().getSchema().getNonNullable().getLogicalType());
96+
Assert.assertEquals(1, assessment.getColumns().size());
97+
Assert.assertEquals(ColumnSupport.YES, assessment.getColumns().get(0).getSupport());
98+
}
99+
100+
@Test
101+
public void testTimeWithNanoScaleMapping() {
102+
Map<String, String> props = new HashMap<>();
103+
props.put(SqlServerTableAssessor.TYPE_NAME, TIME);
104+
props.put(SqlServerTableAssessor.SCALE, SCALE_100_NANOS);
105+
ColumnDetail columnDetail = new ColumnDetail(UPDATED_TIME_COLUMN, JDBCType.TIME, true, props);
106+
TableDetail tableDetail = new TableDetail.Builder(DB, TABLE, SCHEMA)
107+
.setColumns(Arrays.asList(columnDetail)).build();
108+
109+
ColumnEvaluation columnEvaluation = SqlServerTableAssessor.evaluateColumn(columnDetail);
110+
TableAssessment assessment = tableAssessor.assess(tableDetail);
111+
112+
Assert.assertEquals(Schema.LogicalType.TIME_MICROS,
113+
columnEvaluation.getField().getSchema().getNonNullable().getLogicalType());
114+
Assert.assertEquals(1, assessment.getColumns().size());
115+
Assert.assertEquals(ColumnSupport.PARTIAL, assessment.getColumns().get(0).getSupport());
116+
}
117+
118+
@Test
119+
public void testTimeWithMicrosScaleMapping() {
120+
Map<String, String> props = new HashMap<>();
121+
props.put(SqlServerTableAssessor.TYPE_NAME, TIME);
122+
props.put(SqlServerTableAssessor.SCALE, SCALE_MICROS);
123+
ColumnDetail columnDetail = new ColumnDetail(UPDATED_TIME_COLUMN, JDBCType.TIME, true, props);
124+
TableDetail tableDetail = new TableDetail.Builder(DB, TABLE, SCHEMA)
125+
.setColumns(Arrays.asList(columnDetail)).build();
126+
127+
ColumnEvaluation columnEvaluation = SqlServerTableAssessor.evaluateColumn(columnDetail);
128+
TableAssessment assessment = tableAssessor.assess(tableDetail);
129+
130+
Assert.assertEquals(Schema.LogicalType.TIME_MICROS,
131+
columnEvaluation.getField().getSchema().getNonNullable().getLogicalType());
132+
Assert.assertEquals(1, assessment.getColumns().size());
133+
Assert.assertEquals(ColumnSupport.YES, assessment.getColumns().get(0).getSupport());
134+
}
135+
136+
@Test
137+
public void testTimeWithMillisScaleMapping() {
138+
Map<String, String> props = new HashMap<>();
139+
props.put(SqlServerTableAssessor.TYPE_NAME, TIME);
140+
props.put(SqlServerTableAssessor.SCALE, SCALE_MILLIS);
141+
ColumnDetail columnDetail = new ColumnDetail(UPDATED_TIME_COLUMN, JDBCType.TIME, true, props);
142+
TableDetail tableDetail = new TableDetail.Builder(DB, TABLE, SCHEMA)
143+
.setColumns(Arrays.asList(columnDetail)).build();
144+
145+
ColumnEvaluation columnEvaluation = SqlServerTableAssessor.evaluateColumn(columnDetail);
146+
TableAssessment assessment = tableAssessor.assess(tableDetail);
147+
148+
Assert.assertEquals(Schema.LogicalType.TIME_MILLIS,
149+
columnEvaluation.getField().getSchema().getNonNullable().getLogicalType());
150+
Assert.assertEquals(1, assessment.getColumns().size());
151+
Assert.assertEquals(ColumnSupport.YES, assessment.getColumns().get(0).getSupport());
152+
}
153+
}

0 commit comments

Comments
 (0)