Skip to content

Commit 4781ba6

Browse files
authored
[flink] Add default_catalog and default_database option (#4015)
- `default_catalog` and `default_database` parameters make it possible to switch to desired Flink catalog and database automatically after the session is created
1 parent b5fc7fa commit 4781ba6

File tree

1 file changed

+22
-0
lines changed

1 file changed

+22
-0
lines changed

desktop/libs/notebook/src/notebook/connectors/flink_sql.py

+22
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
_API_VERSION = 'v3'
3434
SESSION_KEY = '%(username)s-%(connector_name)s'
3535
OPERATION_TOKEN = '%(username)s-%(connector_name)s' + '-operation-token'
36+
DEFAULT_CATALOG_PARAM = "default_catalog"
37+
DEFAULT_DATABASE_PARAM = "default_database"
3638

3739

3840
def query_error_handler(func):
@@ -70,6 +72,8 @@ def __init__(self, user, interpreter=None):
7072

7173
self.options = interpreter['options']
7274
api_url = self.options['url']
75+
self.default_catalog = self.options.get(DEFAULT_CATALOG_PARAM)
76+
self.default_database = self.options.get(DEFAULT_DATABASE_PARAM)
7377

7478
self.db = FlinkSqlClient(user=user, api_url=api_url)
7579

@@ -159,6 +163,10 @@ def _get_session(self):
159163

160164
if not session:
161165
session = self.db.create_session()
166+
if self.default_database:
167+
self._use_database(self.default_catalog, self.default_database)
168+
elif self.default_catalog:
169+
self._use_catalog(self.default_catalog)
162170

163171
try:
164172
self.db.session_heartbeat(session_handle=session['sessionHandle'])
@@ -453,6 +461,20 @@ def _show_functions(self, database):
453461

454462
return [{'name': function[0]} for function in function_list]
455463

464+
def _use_catalog(self, catalog):
465+
session = self._get_session()
466+
self.db.configure_session(session_handle=(session['id']), statement="USE CATALOG `%s`" % catalog)
467+
468+
def _use_database(self, catalog, database):
469+
session = self._get_session()
470+
if catalog:
471+
self.db.configure_session(session_handle=(session['id']),
472+
statement="USE `%(catalog)s`.`%(database)s`" % {'catalog': catalog,
473+
'database': database})
474+
else:
475+
self.db.configure_session(session_handle=(session['id']),
476+
statement="USE `%(database)s`" % {'database': database})
477+
456478

457479
class FlinkSqlClient:
458480
"""

0 commit comments

Comments
 (0)