Skip to content

Commit 65d199b

Browse files
committed
[FLINK-37548] Add Model DDL methods in TABLE API
1 parent 0e7b44c commit 65d199b

File tree

12 files changed

+1088
-22
lines changed

12 files changed

+1088
-22
lines changed

Diff for: docs/content.zh/docs/dev/python/table/table_environment.md

+66
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,50 @@ TableEnvironment API
199199
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_table" name="link">}}
200200
</td>
201201
</tr>
202+
<tr>
203+
<td>
204+
<strong>create_model(model_path, model_descriptor, ignore_if_exists=False)</strong>
205+
</td>
206+
<td>
207+
Registers the model described by the given `ModelDescriptor` as a model, similar to SQL models.
208+
</td>
209+
<td class="text-center">
210+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_model" name="link">}}
211+
</td>
212+
</tr>
213+
<tr>
214+
<td>
215+
<strong>create_temporary_model(model_path, model_descriptor, ignore_if_exists=False)</strong>
216+
</td>
217+
<td>
218+
Registers the model described by the given `ModelDescriptor` as a temporary model, similar to SQL temporary models.
219+
</td>
220+
<td class="text-center">
221+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_temporary_model" name="link">}}
222+
</td>
223+
</tr>
224+
<tr>
225+
<td>
226+
<strong>drop_model(model_path, ignore_if_not_exists=True)</strong>
227+
</td>
228+
<td>
229+
Drops a model registered in the given path.
230+
</td>
231+
<td class="text-center">
232+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_model" name="link">}}
233+
</td>
234+
</tr>
235+
<tr>
236+
<td>
237+
<strong>drop_temporary_model(model_path, ignore_if_not_exists=True)</strong>
238+
</td>
239+
<td>
240+
Drops a temporary model registered in the given path.
241+
</td>
242+
<td class="text-center">
243+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_temporary_model" name="link">}}
244+
</td>
245+
</tr>
202246
<tr>
203247
<td>
204248
<strong>execute_sql(stmt)</strong>
@@ -791,6 +835,28 @@ table_env.get_config().set("pipeline.name", "my_first_job")
791835
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_temporary_views" name="链接">}}
792836
</td>
793837
</tr>
838+
<tr>
839+
<td>
840+
<strong>list_models()</strong>
841+
</td>
842+
<td>
843+
获取当前命名空间(当前 catalog 的当前数据库)中所有可用的模型和临时模型名称。
844+
</td>
845+
<td class="text-center">
846+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_models" name="链接">}}
847+
</td>
848+
</tr>
849+
<tr>
850+
<td>
851+
<strong>list_temporary_models()</strong>
852+
</td>
853+
<td>
854+
获取当前命名空间(当前 catalog 的当前数据库)中所有可用的临时模型名称。
855+
</td>
856+
<td class="text-center">
857+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_temporary_models" name="链接">}}
858+
</td>
859+
</tr>
794860
</tbody>
795861
</table>
796862

Diff for: docs/content/docs/dev/python/table/table_environment.md

+67
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,50 @@ These APIs are used to create/remove Table API/SQL Tables and write queries:
199199
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_table" name="link">}}
200200
</td>
201201
</tr>
202+
<tr>
203+
<td>
204+
<strong>create_model(model_path, model_descriptor, ignore_if_exists=False)</strong>
205+
</td>
206+
<td>
207+
Registers the model described by the given `ModelDescriptor` as a model, similar to SQL models.
208+
</td>
209+
<td class="text-center">
210+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_model" name="link">}}
211+
</td>
212+
</tr>
213+
<tr>
214+
<td>
215+
<strong>create_temporary_model(model_path, model_descriptor, ignore_if_exists=False)</strong>
216+
</td>
217+
<td>
218+
Registers the model described by the given `ModelDescriptor` as a temporary model, similar to SQL temporary models.
219+
</td>
220+
<td class="text-center">
221+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_temporary_model" name="link">}}
222+
</td>
223+
</tr>
224+
<tr>
225+
<td>
226+
<strong>drop_model(model_path, ignore_if_not_exists=True)</strong>
227+
</td>
228+
<td>
229+
Drops a model registered in the given path.
230+
</td>
231+
<td class="text-center">
232+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_model" name="link">}}
233+
</td>
234+
</tr>
235+
<tr>
236+
<td>
237+
<strong>drop_temporary_model(model_path, ignore_if_not_exists=True)</strong>
238+
</td>
239+
<td>
240+
Drops a temporary model registered in the given path.
241+
</td>
242+
<td class="text-center">
243+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_temporary_model" name="link">}}
244+
</td>
245+
</tr>
202246
<tr>
203247
<td>
204248
<strong>execute_sql(stmt)</strong>
@@ -795,6 +839,29 @@ These APIs are used to access catalogs and modules. You can find more detailed i
795839
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_temporary_views" name="link">}}
796840
</td>
797841
</tr>
842+
<tr>
843+
<td>
844+
<strong>list_models()</strong>
845+
</td>
846+
<td>
847+
Gets the names of all models in the current database of the current catalog.
848+
It returns both temporary and permanent models.
849+
</td>
850+
<td class="text-center">
851+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_models" name="link">}}
852+
</td>
853+
</tr>
854+
<tr>
855+
<td>
856+
<strong>list_temporary_models()</strong>
857+
</td>
858+
<td>
859+
Gets the names of all temporary models available in the current namespace (the current database of the current catalog).
860+
</td>
861+
<td class="text-center">
862+
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_temporary_models" name="link">}}
863+
</td>
864+
</tr>
798865
</tbody>
799866
</table>
800867

Diff for: flink-python/docs/reference/pyflink.table/table_environment.rst

+6
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,17 @@ keyword, thus must be escaped) in a catalog named 'cat.1' and database named 'db
163163
TableEnvironment.create_temporary_table
164164
TableEnvironment.create_temporary_view
165165
TableEnvironment.create_view
166+
TableEnvironment.create_model
167+
TableEnvironment.create_temporary_model
166168
TableEnvironment.drop_function
167169
TableEnvironment.drop_table
168170
TableEnvironment.drop_temporary_function
169171
TableEnvironment.drop_temporary_system_function
170172
TableEnvironment.drop_temporary_table
171173
TableEnvironment.drop_temporary_view
172174
TableEnvironment.drop_view
175+
TableEnvironment.drop_model
176+
TableEnvironment.drop_temporary_model
173177
TableEnvironment.execute_plan
174178
TableEnvironment.execute_sql
175179
TableEnvironment.explain_sql
@@ -191,6 +195,8 @@ keyword, thus must be escaped) in a catalog named 'cat.1' and database named 'db
191195
TableEnvironment.list_temporary_views
192196
TableEnvironment.list_user_defined_functions
193197
TableEnvironment.list_views
198+
TableEnvironment.list_models
199+
TableEnvironment.list_temporary_models
194200
TableEnvironment.load_module
195201
TableEnvironment.load_plan
196202
TableEnvironment.create_catalog

Diff for: flink-python/pyflink/table/model_descriptor.py

+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
from typing import Dict, Union, Optional
19+
20+
from pyflink.common.config_options import ConfigOption
21+
from pyflink.java_gateway import get_gateway
22+
from pyflink.table.schema import Schema
23+
24+
__all__ = ['ModelDescriptor']
25+
26+
27+
class ModelDescriptor(object):
    """
    Describes a CatalogModel representing a model.

    ModelDescriptor is a template for creating a CatalogModel instance. It closely resembles the
    "CREATE MODEL" SQL DDL statement, containing input schema, output schema, and other
    characteristics.

    This can be used to register a model in the Table API.

    Instances wrap a Java ``ModelDescriptor`` handle and delegate every call to it. Use
    :func:`for_provider` to obtain a :class:`~pyflink.table.ModelDescriptor.Builder`.
    """

    def __init__(self, j_model_descriptor):
        """
        :param j_model_descriptor: The Java descriptor object that this instance wraps.
        """
        # Every accessor below forwards to this Java handle.
        self._j_model_descriptor = j_model_descriptor

    @staticmethod
    def for_provider(provider: str) -> 'ModelDescriptor.Builder':
        """
        Creates a new :class:`~pyflink.table.ModelDescriptor.Builder` for a model using the given
        provider value.

        :param provider: The provider value for the model.
        :return: A builder wrapping the Java builder returned by ``forProvider``.
        """
        # Imported names (get_gateway) come from the module's top-level imports.
        gateway = get_gateway()
        j_builder = gateway.jvm.ModelDescriptor.forProvider(provider)
        return ModelDescriptor.Builder(j_builder)

    def get_options(self) -> Dict[str, str]:
        """
        Returns the options of this model.

        The value is handed back exactly as returned by the Java ``getOptions()`` call; no
        conversion is applied here.
        """
        return self._j_model_descriptor.getOptions()

    def get_input_schema(self) -> Optional[Schema]:
        """
        Returns the input schema of this model, or ``None`` if the Java descriptor reports that
        no input schema is present.
        """
        j_input_schema = self._j_model_descriptor.getInputSchema()
        if j_input_schema.isPresent():
            return Schema(j_input_schema.get())
        else:
            return None

    def get_output_schema(self) -> Optional[Schema]:
        """
        Returns the output schema of this model, or ``None`` if the Java descriptor reports that
        no output schema is present.
        """
        j_output_schema = self._j_model_descriptor.getOutputSchema()
        if j_output_schema.isPresent():
            return Schema(j_output_schema.get())
        else:
            return None

    def get_comment(self) -> Optional[str]:
        """
        Returns the comment of this model, or ``None`` if the Java descriptor reports that no
        comment is present.
        """
        j_comment = self._j_model_descriptor.getComment()
        if j_comment.isPresent():
            return j_comment.get()
        else:
            return None

    def __str__(self):
        # String form is whatever the Java descriptor's toString() produces.
        return self._j_model_descriptor.toString()

    def __eq__(self, other):
        # The class check runs first, so ``other._j_model_descriptor`` is only touched when
        # ``other`` is known to be of the same class.
        return (self.__class__ == other.__class__ and
                self._j_model_descriptor.equals(other._j_model_descriptor))

    def __hash__(self):
        # Kept in step with __eq__: both delegate to the wrapped Java object.
        return self._j_model_descriptor.hashCode()

    class Builder(object):
        """
        Builder for ModelDescriptor.

        Every setter forwards to the wrapped Java builder and returns ``self`` so that calls can
        be chained.
        """

        def __init__(self, j_builder):
            """
            :param j_builder: The Java builder object that this instance wraps.
            """
            self._j_builder = j_builder

        def input_schema(self, input_schema: Schema) -> 'ModelDescriptor.Builder':
            """
            Define the input schema of the ModelDescriptor.

            :param input_schema: The input schema; its ``_j_schema`` handle is passed to Java.
            :return: This builder.
            """
            self._j_builder.inputSchema(input_schema._j_schema)
            return self

        def output_schema(self, output_schema: Schema) -> 'ModelDescriptor.Builder':
            """
            Define the output schema of the ModelDescriptor.

            :param output_schema: The output schema; its ``_j_schema`` handle is passed to Java.
            :return: This builder.
            """
            self._j_builder.outputSchema(output_schema._j_schema)
            return self

        def option(self, key: Union[str, ConfigOption], value) -> 'ModelDescriptor.Builder':
            """
            Sets the given option on the model.

            Option keys must be fully specified.

            Example:
            ::

                >>> descriptor = (ModelDescriptor.for_provider("OPENAI")
                ...               .input_schema(input_schema)
                ...               .output_schema(output_schema)
                ...               .option("task", "regression")
                ...               .build())

            :param key: Either the option key as a string, or a
                :class:`~pyflink.common.config_options.ConfigOption`.
            :param value: The value to set for the option.
            :return: This builder.
            """
            if isinstance(key, str):
                self._j_builder.option(key, value)
            else:
                # A ConfigOption is unwrapped to its Java counterpart before the call.
                self._j_builder.option(key._j_config_option, value)
            return self

        def comment(self, comment: str) -> 'ModelDescriptor.Builder':
            """
            Define the comment for this model.

            :param comment: The comment text.
            :return: This builder.
            """
            self._j_builder.comment(comment)
            return self

        def build(self) -> 'ModelDescriptor':
            """
            Returns an immutable instance of :class:`~pyflink.table.ModelDescriptor`.
            """
            return ModelDescriptor(self._j_builder.build())

0 commit comments

Comments
 (0)