diff --git a/testgres/__init__.py b/testgres/__init__.py index 339ae62..959aaa4 100644 --- a/testgres/__init__.py +++ b/testgres/__init__.py @@ -39,7 +39,6 @@ from .utils import \ reserve_port, \ release_port, \ - bound_ports, \ get_bin_path, \ get_pg_config, \ get_pg_version @@ -51,6 +50,7 @@ from .config import testgres_config from .operations.os_ops import OsOperations, ConnectionParams +from .operations.os_ops import OsLockObj from .operations.local_ops import LocalOperations from .operations.remote_ops import RemoteOperations @@ -64,7 +64,8 @@ "XLogMethod", "IsolationLevel", "NodeStatus", "ProcessType", "DumpFormat", "PostgresNode", "NodeApp", "PortManager", - "reserve_port", "release_port", "bound_ports", "get_bin_path", "get_pg_config", "get_pg_version", + "reserve_port", "release_port", "get_bin_path", "get_pg_config", "get_pg_version", "First", "Any", - "OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams" + "OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams", + "OsLockObj", ] diff --git a/testgres/consts.py b/testgres/consts.py index 89c49ab..d358920 100644 --- a/testgres/consts.py +++ b/testgres/consts.py @@ -10,6 +10,10 @@ TMP_CACHE = 'tgsc_' TMP_BACKUP = 'tgsb_' +TMP_TESTGRES = "testgres" + +TMP_TESTGRES_PORTS = TMP_TESTGRES + "/ports" + # path to control file XLOG_CONTROL_FILE = "global/pg_control" diff --git a/testgres/impl/port_manager__generic.py b/testgres/impl/port_manager__generic.py index a51af2b..6794910 100755 --- a/testgres/impl/port_manager__generic.py +++ b/testgres/impl/port_manager__generic.py @@ -1,64 +1,142 @@ from ..operations.os_ops import OsOperations +from ..operations.os_ops import OsLockObj from ..port_manager import PortManager from ..exceptions import PortForException +from .. import consts +import os import threading import random import typing +import logging class PortManager__Generic(PortManager): + C_MIN_PORT_NUMBER = 1024 + C_MAX_PORT_NUMBER = 65535 + _os_ops: OsOperations _guard: object # TODO: is there better to use bitmap fot _available_ports? _available_ports: typing.Set[int] - _reserved_ports: typing.Set[int] + _reserved_ports: typing.Dict[int, OsLockObj] + + _lock_dir: str def __init__(self, os_ops: OsOperations): + assert __class__.C_MIN_PORT_NUMBER <= __class__.C_MAX_PORT_NUMBER + assert os_ops is not None assert isinstance(os_ops, OsOperations) self._os_ops = os_ops self._guard = threading.Lock() - self._available_ports: typing.Set[int] = set(range(1024, 65535)) - self._reserved_ports: typing.Set[int] = set() + + self._available_ports = set( + range(__class__.C_MIN_PORT_NUMBER, __class__.C_MAX_PORT_NUMBER + 1) + ) + assert len(self._available_ports) == ( + (__class__.C_MAX_PORT_NUMBER - __class__.C_MIN_PORT_NUMBER) + 1 + ) + + self._reserved_ports = dict() + self._lock_dir = None def reserve_port(self) -> int: assert self._guard is not None assert type(self._available_ports) == set # noqa: E721t - assert type(self._reserved_ports) == set # noqa: E721 + assert type(self._reserved_ports) == dict # noqa: E721 + assert isinstance(self._os_ops, OsOperations) with self._guard: + if self._lock_dir is None: + temp_dir = self._os_ops.get_tempdir() + assert type(temp_dir) == str # noqa: E721 + lock_dir = os.path.join(temp_dir, consts.TMP_TESTGRES_PORTS) + assert type(lock_dir) == str # noqa: E721 + self._os_ops.makedirs(lock_dir) + self._lock_dir = lock_dir + + assert self._lock_dir is not None + assert type(self._lock_dir) == str # noqa: E721 + t = tuple(self._available_ports) assert len(t) == len(self._available_ports) sampled_ports = random.sample(t, min(len(t), 100)) t = None for port in sampled_ports: + assert type(port) == int # noqa: E721 assert not (port in self._reserved_ports) assert port in self._available_ports + assert port >= __class__.C_MIN_PORT_NUMBER + assert port <= __class__.C_MAX_PORT_NUMBER + if not self._os_ops.is_port_free(port): continue - self._reserved_ports.add(port) - self._available_ports.discard(port) + try: + lock_path = self.helper__make_lock_path(port) + lock_obj = self._os_ops.create_lock_fs_obj(lock_path) # raise + except: # noqa: 722 + continue + + assert isinstance(lock_obj, OsLockObj) + assert self._os_ops.path_exists(lock_path) + + try: + self._reserved_ports[port] = lock_obj + except: # noqa: 722 + assert not (port in self._reserved_ports) + lock_obj.release() + raise + assert port in self._reserved_ports + self._available_ports.discard(port) assert not (port in self._available_ports) + __class__.helper__send_debug_msg("Port {} is reserved.".format(port)) return port raise PortForException("Can't select a port.") def release_port(self, number: int) -> None: assert type(number) == int # noqa: E721 + assert number >= __class__.C_MIN_PORT_NUMBER + assert number <= __class__.C_MAX_PORT_NUMBER assert self._guard is not None - assert type(self._reserved_ports) == set # noqa: E721 + assert type(self._reserved_ports) == dict # noqa: E721 with self._guard: assert number in self._reserved_ports assert not (number in self._available_ports) self._available_ports.add(number) - self._reserved_ports.discard(number) + lock_obj = self._reserved_ports.pop(number) assert not (number in self._reserved_ports) assert number in self._available_ports + assert isinstance(lock_obj, OsLockObj) + lock_obj.release() + + __class__.helper__send_debug_msg("Port {} is released.", number) + return + + @staticmethod + def helper__send_debug_msg(msg_template: str, *args) -> None: + assert msg_template is not None + assert str is not None + assert type(msg_template) == str # noqa: E721 + assert type(args) == tuple # noqa: E721 + assert msg_template != "" + s = "[port manager] " + s += msg_template.format(*args) + logging.debug(s) + + def helper__make_lock_path(self, port_number: int) -> str: + assert type(port_number) == int # noqa: E721 + # You have to call the reserve_port at first! + assert type(self._lock_dir) == str # noqa: E721 + + result = os.path.join(self._lock_dir, str(port_number) + ".lock") + assert type(result) == str # noqa: E721 + return result diff --git a/testgres/operations/local_ops.py b/testgres/operations/local_ops.py index ccf1ab8..e794507 100644 --- a/testgres/operations/local_ops.py +++ b/testgres/operations/local_ops.py @@ -15,6 +15,7 @@ from ..exceptions import ExecUtilException from ..exceptions import InvalidOperationException from .os_ops import ConnectionParams, OsOperations, get_default_encoding +from .os_ops import OsLockObj from .raise_error import RaiseError from .helpers import Helpers @@ -28,6 +29,23 @@ CMD_TIMEOUT_SEC = 60 +class LocalOsLockFsObj(OsLockObj): + _path: str + + def __init__(self, path: str): + assert type(path) == str # noqa: str + self._path = path + os.mkdir(path) # throw + assert os.path.exists(path) + self._path = path + + def release(self) -> None: + assert type(self._path) == str # noqa: str + assert os.path.exists(self._path) + os.rmdir(self._path) + self._path = None + + class LocalOperations(OsOperations): sm_single_instance: OsOperations = None sm_single_instance_guard = threading.Lock() @@ -535,3 +553,7 @@ def get_tempdir(self) -> str: assert type(r) == str # noqa: E721 assert os.path.exists(r) return r + + def create_lock_fs_obj(self, path: str) -> OsLockObj: + assert type(path) == str # noqa: E721 + return LocalOsLockFsObj(path) diff --git a/testgres/operations/os_ops.py b/testgres/operations/os_ops.py index 45e4f71..891c99b 100644 --- a/testgres/operations/os_ops.py +++ b/testgres/operations/os_ops.py @@ -16,6 +16,11 @@ def get_default_encoding(): return locale.getencoding() or 'UTF-8' +class OsLockObj: + def release(self) -> None: + raise NotImplementedError() + + class OsOperations: def __init__(self, username=None): self.ssh_key = None @@ -133,3 +138,7 @@ def is_port_free(self, number: int): def get_tempdir(self) -> str: raise NotImplementedError() + + def create_lock_fs_obj(self, path: str) -> OsLockObj: + assert type(path) == str # noqa: E721 + raise NotImplementedError() diff --git a/testgres/operations/remote_ops.py b/testgres/operations/remote_ops.py index a478b45..9ecfed5 100644 --- a/testgres/operations/remote_ops.py +++ b/testgres/operations/remote_ops.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import getpass import os import platform @@ -10,6 +12,7 @@ from ..exceptions import ExecUtilException from ..exceptions import InvalidOperationException from .os_ops import OsOperations, ConnectionParams, get_default_encoding +from .os_ops import OsLockObj from .raise_error import RaiseError from .helpers import Helpers @@ -40,6 +43,31 @@ def cmdline(self): return cmdline.split() +class RemoteOsLockFsObj(OsLockObj): + _os_ops: RemoteOperations + _path: str + + def __init__(self, os_ops: RemoteOperations, path: str): + assert isinstance(os_ops, RemoteOperations) + assert type(path) == str # noqa: str + + os_ops.makedir(path) # throw + assert os_ops.path_exists(path) + + self._os_ops = os_ops + self._path = path + + def release(self) -> None: + assert type(self._path) == str # noqa: str + assert isinstance(self._os_ops, RemoteOperations) + assert self._os_ops.path_exists(self._path) + + self._os_ops.rmdir(self._path) # throw + + self._path = None + self._os_ops = None + + class RemoteOperations(OsOperations): def __init__(self, conn_params: ConnectionParams): if not platform.system().lower() == "linux": @@ -687,6 +715,10 @@ def get_tempdir(self) -> str: assert type(temp_dir) == str # noqa: E721 return temp_dir + def create_lock_fs_obj(self, path: str) -> OsLockObj: + assert type(path) == str # noqa: E721 + return RemoteOsLockFsObj(self, path) + @staticmethod def _is_port_free__process_0(error: str) -> bool: assert type(error) == str # noqa: E721 diff --git a/testgres/utils.py b/testgres/utils.py index d231eec..d808665 100644 --- a/testgres/utils.py +++ b/testgres/utils.py @@ -30,9 +30,6 @@ # _old_port_manager = PortManager__Generic(LocalOperations.get_single_instance()) -# ports used by nodes -bound_ports = _old_port_manager._reserved_ports - # re-export version type class PgVer(Version): @@ -46,7 +43,7 @@ def __init__(self, version: str) -> None: def internal__reserve_port(): """ - Generate a new port and add it to 'bound_ports'. + Reserve a port. """ return _old_port_manager.reserve_port() diff --git a/tests/test_os_ops_common.py b/tests/test_os_ops_common.py index 5ae3a61..c999f44 100644 --- a/tests/test_os_ops_common.py +++ b/tests/test_os_ops_common.py @@ -17,6 +17,7 @@ from testgres import InvalidOperationException from testgres import ExecUtilException +from testgres.operations.os_ops import OsLockObj from concurrent.futures import ThreadPoolExecutor from concurrent.futures import Future as ThreadFuture @@ -1113,3 +1114,37 @@ class tadWorkerData: logging.info("Test is finished! Total error count is {}.".format(nErrors)) return + + def test_create_lock_fs_obj(self, os_ops: OsOperations): + assert isinstance(os_ops, OsOperations) + + tmp = os_ops.mkdtemp() + assert type(tmp) == str # noqa: E721 + assert os_ops.path_exists(tmp) + logging.info("tmp dir is [{}]".format(tmp)) + + p1 = os.path.join(tmp, "a.lock") + obj1 = os_ops.create_lock_fs_obj(p1) + assert obj1 is not None + assert isinstance(obj1, OsLockObj) + assert os_ops.path_exists(tmp) + assert os_ops.path_exists(p1) + + while True: + try: + os_ops.create_lock_fs_obj(p1) + except Exception as e: + logging.info("OK. We got the error ({}): {}".format(type(e).__name__, e)) + break + raise Exception("We wait the exception!") + + assert isinstance(obj1, OsLockObj) + assert os_ops.path_exists(tmp) + assert os_ops.path_exists(p1) + + obj1.release() + assert not os_ops.path_exists(p1) + + assert os_ops.path_exists(tmp) + os_ops.rmdir(tmp) + assert not os_ops.path_exists(tmp) diff --git a/tests/test_testgres_local.py b/tests/test_testgres_local.py index 63e5f37..d8765f8 100644 --- a/tests/test_testgres_local.py +++ b/tests/test_testgres_local.py @@ -19,7 +19,6 @@ from testgres import get_pg_version # NOTE: those are ugly imports -from testgres.utils import bound_ports from testgres.utils import PgVer from testgres.node import ProcessProxy @@ -90,40 +89,6 @@ def test_pg_config(self): b = get_pg_config() assert (id(a) != id(b)) - def test_ports_management(self): - assert bound_ports is not None - assert type(bound_ports) == set # noqa: E721 - - if len(bound_ports) != 0: - logging.warning("bound_ports is not empty: {0}".format(bound_ports)) - - stage0__bound_ports = bound_ports.copy() - - with get_new_node() as node: - assert bound_ports is not None - assert type(bound_ports) == set # noqa: E721 - - assert node.port is not None - assert type(node.port) == int # noqa: E721 - - logging.info("node port is {0}".format(node.port)) - - assert node.port in bound_ports - assert node.port not in stage0__bound_ports - - assert stage0__bound_ports <= bound_ports - assert len(stage0__bound_ports) + 1 == len(bound_ports) - - stage1__bound_ports = stage0__bound_ports.copy() - stage1__bound_ports.add(node.port) - - assert stage1__bound_ports == bound_ports - - # check that port has been freed successfully - assert bound_ports is not None - assert type(bound_ports) == set # noqa: E721 - assert bound_ports == stage0__bound_ports - def test_child_process_dies(self): # test for FileNotFound exception during child_processes() function cmd = ["timeout", "60"] if os.name == 'nt' else ["sleep", "60"]