Skip to content

Commit 4eae5d2

Browse files
authored
Merge pull request #4 from umanwizard/create_sources
Parse `CREATE SOURCES`
2 parents 0e68cee + 8b723fe commit 4eae5d2

File tree

4 files changed

+109
-29
lines changed

4 files changed

+109
-29
lines changed

src/ast/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,12 @@ pub enum Statement {
397397
schema: SourceSchema,
398398
with_options: Vec<SqlOption>,
399399
},
400+
/// CREATE SOURCES
401+
CreateSources {
402+
url: String,
403+
schema_registry: String,
404+
with_options: Vec<SqlOption>,
405+
},
400406
/// CREATE SINK
401407
CreateSink {
402408
name: ObjectName,
@@ -558,6 +564,22 @@ impl fmt::Display for Statement {
558564
}
559565
Ok(())
560566
}
567+
Statement::CreateSources {
568+
url,
569+
schema_registry,
570+
with_options,
571+
} => {
572+
write!(
573+
f,
574+
"CREATE SOURCES FROM {} USING SCHEMA REGISTRY {}",
575+
Value::SingleQuotedString(url.clone()).to_string(),
576+
Value::SingleQuotedString(schema_registry.clone()).to_string(),
577+
)?;
578+
if !with_options.is_empty() {
579+
write!(f, " WITH ({})", display_comma_separated(with_options))?;
580+
}
581+
Ok(())
582+
}
561583
Statement::CreateSink {
562584
name,
563585
from,

src/ast/visit.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,15 @@ pub trait Visit<'ast> {
316316
visit_create_source(self, name, url, schema, with_options)
317317
}
318318

319+
fn visit_create_sources(
320+
&mut self,
321+
url: &'ast String,
322+
schema_registry: &'ast String,
323+
with_options: &'ast Vec<SqlOption>,
324+
) {
325+
visit_create_sources(self, url, schema_registry, with_options)
326+
}
327+
319328
fn visit_source_schema(&mut self, source_schema: &'ast SourceSchema) {
320329
visit_source_schema(self, source_schema)
321330
}
@@ -504,6 +513,11 @@ pub fn visit_statement<'ast, V: Visit<'ast> + ?Sized>(visitor: &mut V, statement
504513
schema,
505514
with_options,
506515
} => visitor.visit_create_source(name, url, schema, with_options),
516+
Statement::CreateSources {
517+
url,
518+
schema_registry,
519+
with_options,
520+
} => visitor.visit_create_sources(url, schema_registry, with_options),
507521
Statement::CreateSink {
508522
name,
509523
from,
@@ -1103,6 +1117,19 @@ pub fn visit_create_source<'ast, V: Visit<'ast> + ?Sized>(
11031117
}
11041118
}
11051119

1120+
pub fn visit_create_sources<'ast, V: Visit<'ast> + ?Sized>(
1121+
visitor: &mut V,
1122+
url: &'ast String,
1123+
schema_registry: &'ast String,
1124+
with_options: &'ast Vec<SqlOption>,
1125+
) {
1126+
visitor.visit_literal_string(url);
1127+
visitor.visit_literal_string(schema_registry);
1128+
for option in with_options {
1129+
visitor.visit_option(option);
1130+
}
1131+
}
1132+
11061133
fn visit_source_schema<'ast, V: Visit<'ast> + ?Sized>(
11071134
visitor: &mut V,
11081135
source_schema: &'ast SourceSchema,

src/parser.rs

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,15 @@ impl Parser {
816816
}
817817
}
818818

819+
/// Bail out if the following tokens are not the excpected sequence of keywords,
820+
/// or consume them if they are
821+
pub fn expect_keywords(&mut self, expected: &[&'static str]) -> Result<(), ParserError> {
822+
for kw in expected {
823+
self.expect_keyword(kw)?;
824+
}
825+
Ok(())
826+
}
827+
819828
/// Consume the next token if it matches the expected token, otherwise return false
820829
#[must_use]
821830
pub fn consume_token(&mut self, expected: &Token) -> bool {
@@ -846,6 +855,8 @@ impl Parser {
846855
self.parse_create_view()
847856
} else if self.parse_keyword("SOURCE") {
848857
self.parse_create_source()
858+
} else if self.parse_keyword("SOURCES") {
859+
self.parse_create_sources()
849860
} else if self.parse_keyword("SINK") {
850861
self.parse_create_sink()
851862
} else if self.parse_keyword("EXTERNAL") {
@@ -869,10 +880,7 @@ impl Parser {
869880
} else {
870881
SourceSchema::Raw(self.parse_literal_string()?)
871882
};
872-
let mut with_options = vec![];
873-
if self.parse_keyword("WITH") {
874-
with_options = self.parse_with_options()?;
875-
}
883+
let with_options = self.parse_with_options()?;
876884
Ok(Statement::CreateSource {
877885
name,
878886
url,
@@ -881,16 +889,26 @@ impl Parser {
881889
})
882890
}
883891

892+
pub fn parse_create_sources(&mut self) -> Result<Statement, ParserError> {
893+
self.expect_keyword("FROM")?;
894+
let url = self.parse_literal_string()?;
895+
self.expect_keywords(&["USING", "SCHEMA", "REGISTRY"])?;
896+
let schema_registry = self.parse_literal_string()?;
897+
let with_options = self.parse_with_options()?;
898+
Ok(Statement::CreateSources {
899+
url,
900+
schema_registry,
901+
with_options,
902+
})
903+
}
904+
884905
pub fn parse_create_sink(&mut self) -> Result<Statement, ParserError> {
885906
let name = self.parse_object_name()?;
886907
self.expect_keyword("FROM")?;
887908
let from = self.parse_object_name()?;
888909
self.expect_keyword("INTO")?;
889910
let url = self.parse_literal_string()?;
890-
let mut with_options = vec![];
891-
if self.parse_keyword("WITH") {
892-
with_options = self.parse_with_options()?;
893-
}
911+
let with_options = self.parse_with_options()?;
894912
Ok(Statement::CreateSink {
895913
name,
896914
from,
@@ -928,11 +946,7 @@ impl Parser {
928946
// ANSI SQL and Postgres support RECURSIVE here, but we don't support it either.
929947
let name = self.parse_object_name()?;
930948
let columns = self.parse_parenthesized_column_list(Optional)?;
931-
let with_options = if self.parse_keyword("WITH") {
932-
self.parse_with_options()?
933-
} else {
934-
vec![]
935-
};
949+
let with_options = self.parse_with_options()?;
936950
self.expect_keyword("AS")?;
937951
let query = Box::new(self.parse_query()?);
938952
// Optional `WITH [ CASCADED | LOCAL ] CHECK OPTION` is widely supported here.
@@ -983,11 +997,7 @@ impl Parser {
983997
// parse optional column list (schema)
984998
let (columns, constraints) = self.parse_columns()?;
985999

986-
let with_options = if self.parse_keyword("WITH") {
987-
self.parse_with_options()?
988-
} else {
989-
vec![]
990-
};
1000+
let with_options = self.parse_with_options()?;
9911001

9921002
Ok(Statement::CreateTable {
9931003
name: table_name,
@@ -1135,19 +1145,23 @@ impl Parser {
11351145
}
11361146

11371147
pub fn parse_with_options(&mut self) -> Result<Vec<SqlOption>, ParserError> {
1138-
self.expect_token(&Token::LParen)?;
1139-
let mut options = vec![];
1140-
loop {
1141-
let name = self.parse_identifier()?;
1142-
self.expect_token(&Token::Eq)?;
1143-
let value = self.parse_value()?;
1144-
options.push(SqlOption { name, value });
1145-
if !self.consume_token(&Token::Comma) {
1146-
break;
1148+
if self.parse_keyword("WITH") {
1149+
self.expect_token(&Token::LParen)?;
1150+
let mut options = vec![];
1151+
loop {
1152+
let name = self.parse_identifier()?;
1153+
self.expect_token(&Token::Eq)?;
1154+
let value = self.parse_value()?;
1155+
options.push(SqlOption { name, value });
1156+
if !self.consume_token(&Token::Comma) {
1157+
break;
1158+
}
11471159
}
1160+
self.expect_token(&Token::RParen)?;
1161+
Ok(options)
1162+
} else {
1163+
Ok(vec![])
11481164
}
1149-
self.expect_token(&Token::RParen)?;
1150-
Ok(options)
11511165
}
11521166

11531167
pub fn parse_alter(&mut self) -> Result<Statement, ParserError> {

tests/sqlparser_common.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2216,6 +2216,23 @@ fn parse_create_source_registry() {
22162216
}
22172217
}
22182218

2219+
#[test]
2220+
fn parse_create_sources() {
2221+
let sql = "CREATE SOURCES FROM 'kafka://whatever' USING SCHEMA REGISTRY 'http://foo.bar:8081'";
2222+
match verified_stmt(sql) {
2223+
Statement::CreateSources {
2224+
url,
2225+
schema_registry,
2226+
with_options,
2227+
} => {
2228+
assert_eq!("kafka://whatever", url);
2229+
assert_eq!("http://foo.bar:8081", schema_registry);
2230+
assert!(with_options.is_empty());
2231+
}
2232+
_ => assert!(false)
2233+
}
2234+
}
2235+
22192236
#[test]
22202237
fn parse_create_sink() {
22212238
let sql = "CREATE SINK foo FROM bar INTO 'baz' WITH (name = 'val')";

0 commit comments

Comments
 (0)