Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[computes] fixed session-type extraction for connectors #3724

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions apps/beeswax/src/beeswax/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,26 @@ def tokenize_and_convert(item, key=None):
return sorted(collection, key=lambda i: tokenize_and_convert(i, key=key))


def is_compute(cluster):
def find_compute_in_cluster(cluster):
if not cluster:
return False
return None
connector = cluster.get('connector')
compute = cluster.get('compute')

def compute_check(x):
def _compute_check(x):
return x and x.get('type') in COMPUTE_TYPES
return compute_check(cluster) or compute_check(connector) or compute_check(compute)

return (
cluster if _compute_check(cluster)
else compute if _compute_check(compute)
else connector if _compute_check(connector) else None)


def extract_session_type(snippet):
  """Return the session type to use for a snippet.

  The compute object located by ``find_compute_in_cluster`` takes precedence:
  when it exists and carries a non-empty ``name``, that name is returned.
  Otherwise the snippet's own ``type`` entry is returned, or ``None`` when the
  snippet itself is empty or missing.
  """
  matched = find_compute_in_cluster(snippet)
  compute_name = matched.get('name') if matched else None
  if compute_name:
    return compute_name

  # No named compute: fall back to the snippet's declared type, if any.
  if not snippet:
    return None
  return snippet.get('type')


'''
Expand All @@ -119,13 +130,8 @@ def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
connector = cluster.get('connector')
compute = cluster.get('compute')

def compute_check(x):
return x and x.get('type') in COMPUTE_TYPES

# Pick the most probable compute object
selected_compute = (cluster if compute_check(cluster)
else compute if compute_check(compute)
else connector if compute_check(connector) else None)
selected_compute = find_compute_in_cluster(cluster)

# If found, we will attempt to reload it, first by id then by name
if selected_compute:
Expand Down
12 changes: 6 additions & 6 deletions apps/beeswax/src/beeswax/server/dbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from kazoo.client import KazooClient

from azure.abfs import abfspath
from beeswax.common import apply_natural_sort, is_compute
from beeswax.common import apply_natural_sort, find_compute_in_cluster
from beeswax.conf import (
APPLY_NATURAL_SORT_MAX,
AUTH_PASSWORD,
Expand Down Expand Up @@ -164,7 +164,7 @@ def get(user, query_server=None, cluster=None):


def get_query_server_config(name='beeswax', connector=None):
if connector and (has_connectors() or is_compute(connector)):
if connector and (has_connectors() or find_compute_in_cluster(connector)):
LOG.debug("Query via connector %s (%s)" % (name, connector.get('type')))
query_server = get_query_server_config_via_connector(connector)
else:
Expand Down Expand Up @@ -1042,14 +1042,14 @@ def use(self, database, session=None):
query = hql_query('USE `%s`' % database)
return self.client.use(query, session=session)

def get_log(self, query_handle, start_over=True):
return self.client.get_log(query_handle, start_over)
def get_log(self, query_handle, start_over=True, session=None):
return self.client.get_log(query_handle, start_over, session=session)

def get_state(self, handle):
return self.client.get_state(handle)

def get_operation_status(self, handle):
return self.client.get_operation_status(handle)
def get_operation_status(self, handle, session=None):
return self.client.get_operation_status(handle, session=session)

def execute_and_wait(self, query, timeout_sec=30.0, sleep_interval=0.5):
"""
Expand Down
2 changes: 1 addition & 1 deletion apps/beeswax/src/beeswax/server/hive_metastore_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def get_state(self, handle):
def close(self, handle):
pass

def get_operation_status(self, handle):
def get_operation_status(self, handle, session=None):
return MockFinishedOperation()

def get_default_configuration(self, *args, **kwargs):
Expand Down
18 changes: 9 additions & 9 deletions apps/beeswax/src/beeswax/server/hive_server2_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,18 +1103,18 @@ def fetch_log(self, operation_handle, orientation=TFetchOrientation.FETCH_NEXT,

return '\n'.join(lines)

def get_operation_status(self, operation_handle):
def get_operation_status(self, operation_handle, session=None):
req = TGetOperationStatusReq(operationHandle=operation_handle)
(res, session) = self.call(self._client.GetOperationStatus, req)
(res, session) = self.call(self._client.GetOperationStatus, req, session=session)
return res

def get_log(self, operation_handle):
def get_log(self, operation_handle, session=None):
try:
req = TGetLogReq(operationHandle=operation_handle)
(res, session) = self.call(self._client.GetLog, req)
(res, session) = self.call(self._client.GetLog, req, session=session)
return res.log
except Exception as e:
if 'Invalid query handle' in str(e):
if 'Invalid query handle' in str(e) or 'Invalid or unknown query handle' in str(e):
message = 'Invalid query handle'
LOG.error('%s: %s' % (message, e))
else:
Expand Down Expand Up @@ -1436,9 +1436,9 @@ def get_state(self, handle):
res = self._client.get_operation_status(operationHandle)
return HiveServerQueryHistory.STATE_MAP[res.operationState]

def get_operation_status(self, handle):
def get_operation_status(self, handle, session=None):
operationHandle = handle.get_rpc_handle()
return self._client.get_operation_status(operationHandle)
return self._client.get_operation_status(operationHandle, session=session)

def use(self, query, session=None):
data = self._client.execute_query(query, session=session)
Expand Down Expand Up @@ -1482,11 +1482,11 @@ def close_session(self, session):
def dump_config(self):
return 'Does not exist in HS2'

def get_log(self, handle, start_over=True):
def get_log(self, handle, start_over=True, session=None):
operationHandle = handle.get_rpc_handle()

if beeswax_conf.USE_GET_LOG_API.get() or self.query_server.get('dialect') == 'impala':
return self._client.get_log(operationHandle)
return self._client.get_log(operationHandle, session=session)
else:
if start_over:
orientation = TFetchOrientation.FETCH_FIRST
Expand Down
3 changes: 2 additions & 1 deletion apps/jobbrowser/src/jobbrowser/apis/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from babel import localtime
from django.utils.translation import gettext as _

from beeswax.common import extract_session_type
from desktop.lib import export_csvxls
from impala.conf import COORDINATOR_UI_SPNEGO
from jobbrowser.apis.base_api import Api
Expand All @@ -54,7 +55,7 @@ def _get_api(user, cluster=None):
server_url = compute['options'].get('api_url')
else:
# TODO: multi computes if snippet.get('compute') or snippet['type'] has computes
application = cluster['compute']['type'] if cluster.get('compute') else cluster.get('interface', 'impala')
application = extract_session_type(cluster) or 'impala'
session = Session.objects.get_session(user, application=application)
server_url = _get_impala_server_url(session)
return get_impalad_api(user=user, url=server_url)
Expand Down
6 changes: 3 additions & 3 deletions desktop/libs/notebook/src/notebook/connectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from django.utils.encoding import smart_str
from django.utils.translation import gettext as _

from beeswax.common import find_compute, is_compute
from beeswax.common import find_compute, find_compute_in_cluster
from desktop.auth.backend import is_admin
from desktop.conf import TASK_SERVER, has_connectors, is_cdw_compute_enabled
from desktop.lib import export_csvxls
Expand Down Expand Up @@ -402,7 +402,7 @@ def patch_snippet_for_connector(snippet, user=None):
Connector backward compatibility switcher.
# TODO Connector unification
"""
if is_compute(snippet):
if find_compute_in_cluster(snippet):
snippet['connector'] = find_compute(cluster=snippet, user=user)
if snippet['connector'] and snippet['connector'].get('dialect'):
snippet['dialect'] = snippet['connector']['dialect']
Expand Down Expand Up @@ -433,7 +433,7 @@ def get_api(request, snippet):
if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user):
LOG.debug('Using the interpreter from snippet')
interpreter = snippet.get('interpreter')
elif is_cdw_compute_enabled():
elif find_compute_in_cluster(snippet):
LOG.debug("Finding the compute from db using snippet: %s" % snippet)
interpreter = find_compute(cluster=snippet, user=request.user)
if interpreter is None:
Expand Down
5 changes: 2 additions & 3 deletions desktop/libs/notebook/src/notebook/connectors/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from django.urls import reverse
from django.utils.translation import gettext as _

from beeswax.common import is_compute
from beeswax.common import extract_session_type
from desktop.auth.backend import is_admin
from desktop.conf import USE_DEFAULT_CONFIGURATION, has_connectors
from desktop.lib.conf import BoundConfig
Expand Down Expand Up @@ -321,8 +321,7 @@ def execute(self, notebook, snippet):
db = self._get_db(snippet, interpreter=self.interpreter)

statement = self._get_current_statement(notebook, snippet)
compute = snippet.get('compute', {})
session_type = compute['name'] if is_compute(snippet) and compute.get('name') else snippet['type']
session_type = extract_session_type(snippet)
session = self._get_session(notebook, session_type)

query = self._prepare_hql_query(snippet, statement['statement'], session)
Expand Down