diff --git a/.gitignore b/.gitignore index 8392c37..a7eb277 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ dump.rdb # Distribution / packaging .Python +.idea/ env/ bin/ build/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ff03e22 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM python:2 +WORKDIR /dist +RUN pip install hiredis retrying Werkzeug click + +ADD ./ /dist/ diff --git a/README.md b/README.md index 5578276..a96c397 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,10 @@ See also https://github.com/antirez/redis/blob/3.0/src/redis-trib.rb Create a cluster in some redis nodes (the nodes are cluster enabled, and none of them in any cluster) redis-trib.py create NODE_HOST_a:PORT_a NODE_HOST_b:PORT_b ... + +Use with password + + redis-trib.py create --password MyPassword NODE_HOST_a:PORT_a NODE_HOST_b:PORT_b ... Add another node to a cluster, but neither set as slave nor migrating slots to it @@ -166,3 +170,8 @@ Output: * `master_id`: master's `node_id` if it's a slave, or `None` otherwise * `assigned_slots`: a list of assigned slots if it's a master; it won't contain slots being migrated * `slots_migrating`: boolean value for whether there are any slot(s) migrating or importing on this node + +For sett password for all nodes use: + + r = redistrib.clusternode.ClusterNode(...) + r.password = MyPassword \ No newline at end of file diff --git a/redistrib/__init__.py b/redistrib/__init__.py index 0ebbd51..f5d9a42 100644 --- a/redistrib/__init__.py +++ b/redistrib/__init__.py @@ -1,2 +1,2 @@ -__version__ = '0.5.0' -REPO = 'https://github.com/HunanTV/redis-trib.py' +__version__ = '0.0.1' +REPO = 'https://github.com/Negashev/redis-trib.py' \ No newline at end of file diff --git a/redistrib/clusternode.py b/redistrib/clusternode.py index f4492f7..dbaa86d 100644 --- a/redistrib/clusternode.py +++ b/redistrib/clusternode.py @@ -2,6 +2,7 @@ from connection import Connection + class ClusterNode(object): def __init__(self, node_id, latest_know_ip_address_and_port, flags, master_id, last_ping_sent_time, last_pong_received_time, @@ -10,6 +11,7 @@ def __init__(self, node_id, latest_know_ip_address_and_port, flags, host, port = latest_know_ip_address_and_port.split(':') self.host = host self.port = int(port) + self.password = None self.flags = flags.split(',') self.master_id = None if master_id == '-' else master_id self.assigned_slots = [] @@ -52,7 +54,7 @@ def fail(self): def get_conn(self): if self._conn is None: - self._conn = Connection(self.host, self.port) + self._conn = Connection(self.host, self.port, self.password) return self._conn def close(self): diff --git a/redistrib/command.py b/redistrib/command.py index 9e39bc1..c75a928 100644 --- a/redistrib/command.py +++ b/redistrib/command.py @@ -80,11 +80,11 @@ def addslots(t, begin, end): addslots(t, begin, end) -def create(host_port_list, max_slots=1024): +def create(host_port_list, password=None, max_slots=1024): conns = [] try: for host, port in set(host_port_list): - t = Connection(host, port) + t = Connection(host, port, password) conns.append(t) _ensure_cluster_status_unset(t) logging.info('Instance at %s:%d checked', t.host, t.port) @@ -111,8 +111,8 @@ def create(host_port_list, max_slots=1024): t.close() -def start_cluster(host, port, max_slots=SLOT_COUNT): - with Connection(host, port) as t: +def start_cluster(host, port, password=None, max_slots=SLOT_COUNT): + with Connection(host, port, password) as t: _ensure_cluster_status_unset(t) _add_slots(t, 0, SLOT_COUNT, max_slots) _poll_check_status(t) @@ -120,8 +120,8 @@ def start_cluster(host, port, max_slots=SLOT_COUNT): host, port) -def start_cluster_on_multi(host_port_list, max_slots=SLOT_COUNT): - return create(host_port_list, max_slots) +def start_cluster_on_multi(host_port_list, max_slots=SLOT_COUNT, password=None): + return create(host_port_list, max_slots, password) def _migr_keys(src_conn, target_host, target_port, slot): @@ -204,16 +204,16 @@ def _join_to_cluster(clst, new): def join_cluster(cluster_host, cluster_port, newin_host, newin_port, - balancer=None, balance_plan=base_balance_plan): - with Connection(newin_host, newin_port) as t, \ - Connection(cluster_host, cluster_port) as cnode: + balancer=None, balance_plan=base_balance_plan, password=None): + with Connection(newin_host, newin_port, password) as t, \ + Connection(cluster_host, cluster_port, password) as cnode: _join_to_cluster(cnode, t) nodes = [] try: logging.info( 'Instance at %s:%d has joined %s:%d; now balancing slots', newin_host, newin_port, cluster_host, cluster_port) - nodes = _list_nodes(t, default_host=newin_host)[0] + nodes = _list_nodes(t, default_host=newin_host, password=password)[0] for src, dst, count in balance_plan(nodes, balancer): _migr_slots(src, dst, src.assigned_slots[:count], nodes) finally: @@ -221,9 +221,9 @@ def join_cluster(cluster_host, cluster_port, newin_host, newin_port, n.close() -def add_node(cluster_host, cluster_port, newin_host, newin_port): - with Connection(newin_host, newin_port) as t, \ - Connection(cluster_host, cluster_port) as c: +def add_node(cluster_host, cluster_port, newin_host, newin_port, password=None): + with Connection(newin_host, newin_port, password) as t, \ + Connection(cluster_host, cluster_port, password) as c: _join_to_cluster(c, t) @@ -253,13 +253,13 @@ def _check_master_and_migrate_slots(nodes, myself): _migr_slots(myself, node, myself.assigned_slots, nodes) -def del_node(host, port): +def del_node(host, port, password=None): myself = None nodes = [] - t = Connection(host, port) + t = Connection(host, port, password) try: _ensure_cluster_status_set(t) - nodes, myself = _list_nodes(t, filter_func=lambda n: not n.fail) + nodes, myself = _list_nodes(t, filter_func=lambda n: not n.fail, password=password) nodes.remove(myself) if myself.master: _check_master_and_migrate_slots(nodes, myself) @@ -284,8 +284,8 @@ def quit_cluster(host, port): return del_node(host, port) -def shutdown_cluster(host, port): - with Connection(host, port) as t: +def shutdown_cluster(host, port, password=None): + with Connection(host, port, password) as t: _ensure_cluster_status_set(t) myself = None m = t.send_raw(CMD_CLUSTER_NODES) @@ -302,11 +302,11 @@ def shutdown_cluster(host, port): logging.debug('Ask `cluster delslots` Rsp %s', m) -def fix_migrating(host, port): +def fix_migrating(host, port, password=None): nodes = dict() mig_srcs = [] mig_dsts = [] - t = Connection(host, port) + t = Connection(host, port, password) try: m = t.send_raw(CMD_CLUSTER_NODES) logging.debug('Ask `cluster nodes` Rsp %s', m) @@ -314,6 +314,7 @@ def fix_migrating(host, port): if not _valid_node_info(node_info): continue node = ClusterNode(*node_info.split(' ')) + node.password = password node.host = node.host or host nodes[node.node_id] = node @@ -356,16 +357,25 @@ def _check_slave(slave_host, slave_port, t): t.raise_('%s not switched to a slave' % slave_addr) -def replicate(master_host, master_port, slave_host, slave_port): - with Connection(slave_host, slave_port) as t, \ - Connection(master_host, master_port) as master_conn: +def replicate(master_host, master_port, slave_host, slave_port, password=None): + with Connection(slave_host, slave_port, password) as t, \ + Connection(master_host, master_port, password) as master_conn: _ensure_cluster_status_set(master_conn) - myself = _list_nodes(master_conn)[1] + myself = _list_nodes(master_conn, password=password)[1] myid = myself.node_id if myself.master else myself.master_id _join_to_cluster(master_conn, t) logging.info('Instance at %s:%d has joined %s:%d; now set replica', slave_host, slave_port, master_host, master_port) + if password: + m = t.execute('config', 'set', 'masterauth', password) + logging.debug('Set `masterauth` for slave Rsp %s', m) + if m.lower() != 'ok': + t.raise_('Unexpected reply after CONFIG SET MASTERAUTH for slave: %s' % m) + m = master_conn.execute('config', 'set', 'masterauth', password) + logging.debug('Set `masterauth` for master Rsp %s', m) + if m.lower() != 'ok': + t.raise_('Unexpected reply after CONFIG SET MASTERAUTH for master: %s' % m) m = t.execute('cluster', 'replicate', myid) logging.debug('Ask `cluster replicate` Rsp %s', m) @@ -384,7 +394,7 @@ def _filter_master(node): return node.master -def _list_nodes(conn, default_host=None, filter_func=lambda node: True): +def _list_nodes(conn, default_host=None, filter_func=lambda node: True, password=None): m = conn.send_raw(CMD_CLUSTER_NODES) logging.debug('Ask `cluster nodes` Rsp %s', m) default_host = default_host or conn.host @@ -395,6 +405,7 @@ def _list_nodes(conn, default_host=None, filter_func=lambda node: True): if not _valid_node_info(node_info): continue node = ClusterNode(*node_info.split(' ')) + node.password = password if 'myself' in node_info: myself = node if myself.host == '': @@ -404,26 +415,26 @@ def _list_nodes(conn, default_host=None, filter_func=lambda node: True): return nodes, myself -def _list_masters(conn, default_host=None): +def _list_masters(conn, default_host=None, password=None): return _list_nodes(conn, default_host or conn.host, - filter_func=_filter_master) + filter_func=_filter_master, password=password) -def list_nodes(host, port, default_host=None, filter_func=lambda node: True): - with Connection(host, port) as t: - return _list_nodes(t, default_host or host, filter_func) +def list_nodes(host, port, default_host=None, filter_func=lambda node: True, password=None): + with Connection(host, port, password) as t: + return _list_nodes(t, default_host or host, filter_func, password) -def list_masters(host, port, default_host=None): - with Connection(host, port) as t: - return _list_masters(t, default_host or host) +def list_masters(host, port, default_host=None, password=None): + with Connection(host, port, password) as t: + return _list_masters(t, default_host or host, password) -def migrate_slots(src_host, src_port, dst_host, dst_port, slots): +def migrate_slots(src_host, src_port, dst_host, dst_port, slots, password=None): if src_host == dst_host and src_port == dst_port: raise ValueError('Same node') - with Connection(src_host, src_port) as t: - nodes, myself = _list_masters(t, src_host) + with Connection(src_host, src_port, password) as t: + nodes, myself = _list_masters(t, src_host, password) slots = set(slots) logging.debug('Migrating %s', slots) @@ -435,18 +446,18 @@ def migrate_slots(src_host, src_port, dst_host, dst_port, slots): raise ValueError('Two nodes are not in the same cluster') -def rescue_cluster(host, port, subst_host, subst_port): +def rescue_cluster(host, port, subst_host, subst_port, password=None): failed_slots = set(xrange(SLOT_COUNT)) - with Connection(host, port) as t: + with Connection(host, port, password) as t: _ensure_cluster_status_set(t) - for node in _list_masters(t)[0]: + for node in _list_masters(t, password=password)[0]: if not node.fail: failed_slots -= set(node.assigned_slots) if len(failed_slots) == 0: logging.info('No need to rescue cluster at %s:%d', host, port) return - with Connection(subst_host, subst_port) as s: + with Connection(subst_host, subst_port, password) as s: _ensure_cluster_status_unset(s) m = s.execute('cluster', 'meet', host, port) @@ -466,14 +477,14 @@ def rescue_cluster(host, port, subst_host, subst_port): subst_host, subst_port, len(failed_slots)) -def execute(host, port, master_only, slave_only, commands): - with Connection(host, port) as c: +def execute(host, port, master_only, slave_only, commands, password=None): + with Connection(host, port, password) as c: filter_func = lambda n: True if master_only: filter_func = _filter_master elif slave_only: filter_func = lambda n: n.slave - nodes = _list_nodes(c, filter_func=filter_func)[0] + nodes = _list_nodes(c, filter_func=filter_func, password=password)[0] result = [] for n in nodes: diff --git a/redistrib/connection.py b/redistrib/connection.py index 602d087..c23acbd 100644 --- a/redistrib/connection.py +++ b/redistrib/connection.py @@ -3,7 +3,7 @@ import hiredis from functools import wraps -from exceptions import RedisStatusError, RedisIOError +from redistrib.exceptions import RedisIOError, RedisStatusError SYM_STAR = '*' SYM_DOLLAR = '$' @@ -61,13 +61,15 @@ def g(conn, *args, **kwargs): return f(conn, *args, **kwargs) except IOError as e: raise RedisIOError(e, conn.host, conn.port) + return g class Connection(object): - def __init__(self, host, port, timeout=5): + def __init__(self, host, port, password=None, timeout=5): self.host = host self.port = port + self.password = password self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.reader = hiredis.Reader() self.last_raw_message = '' @@ -79,6 +81,10 @@ def __init__(self, host, port, timeout=5): @_wrap_sock_op def _conn(self): self.sock.connect((self.host, self.port)) + if self.password: + # auth with pass + self.execute('auth', self.password) + logging.debug('Connected to %s:%d with password', self.host, self.port) @_wrap_sock_op def _recv(self): diff --git a/redistrib/console.py b/redistrib/console.py index 607c1f3..8108458 100644 --- a/redistrib/console.py +++ b/redistrib/console.py @@ -2,7 +2,7 @@ import click import command -from . import __version__ +from redistrib import __version__ def _parse_host_port(addr): @@ -20,47 +20,53 @@ def cli(): @click.option( '--max-slots', type=int, default=1024, help='maxium number of slots in a single CLUSTER ADDSLOTS command') +@click.option('--password', help='Password for all nodes the cluster') @click.argument('addrs', nargs=-1, required=True) -def create(addrs, max_slots=1024): - command.create([_parse_host_port(hp) for hp in addrs], max_slots) +def create(addrs, password=None, max_slots=1024): + command.create([_parse_host_port(hp) for hp in addrs], password, max_slots) @cli.command(help='Add a new Redis node to a cluster') @click.option('--existing-addr', required=True, help='Address of any node in the cluster') @click.option('--new-addr', required=True, help='Address of the new node') -def add_node(existing_addr, new_addr): +@click.option('--password', help='Password for all nodes the cluster') +def add_node(existing_addr, new_addr, password=None): cluster_host, cluster_port = _parse_host_port(existing_addr) newin_host, newin_port = _parse_host_port(new_addr) - command.add_node(cluster_host, cluster_port, newin_host, newin_port) + command.add_node(cluster_host, cluster_port, newin_host, newin_port, password) @cli.command(help='Add a slave node to a master') @click.option('--master-addr', required=True, help='Address of the master') @click.option('--slave-addr', required=True, help='Address of the slave') -def replicate(master_addr, slave_addr): +@click.option('--password', help='Password for all nodes the cluster') +def replicate(master_addr, slave_addr, password=None): master_host, master_port = _parse_host_port(master_addr) slave_host, slave_port = _parse_host_port(slave_addr) - command.replicate(master_host, master_port, slave_host, slave_port) + command.replicate(master_host, master_port, slave_host, slave_port, password) @cli.command(help='Remove a Redis node from a cluster') @click.option('--addr', required=True, help='Address of the node') -def del_node(addr): - command.del_node(*_parse_host_port(addr)) +@click.option('--password', help='Password for all nodes the cluster') +def del_node(addr, password=None): + command.del_node(*_parse_host_port(addr), password=password) @cli.command(help='Shutdown a cluster. The cluster should not contain more' ' than one Redis node and there is no key in that Redis') @click.option('--addr', required=True, help='Address of the node') -def shutdown(addr): - command.shutdown_cluster(*_parse_host_port(addr)) +@click.option('--password', help='Password for all nodes the cluster') +def shutdown(addr, password=None): + command.shutdown_cluster(*_parse_host_port(addr), password=password) @cli.command(help='Fix migrating status') @click.option('--addr', required=True, help='Address of the node') -def fix(addr): - command.fix_migrating(*_parse_host_port(addr)) +@click.option('--password', help='Password for all nodes the cluster') +def fix(addr, password=None): + command.fix_migrating(*_parse_host_port(addr), password=password) @cli.command(help='Add a Redis node to a broken cluster to undertake missing' @@ -68,9 +74,10 @@ def fix(addr): @click.option('--existing-addr', required=True, help='Address of any node in the cluster') @click.option('--new-addr', required=True, help='Address of the new node') -def rescue(existing_addr, new_addr): +@click.option('--password', help='Password for all nodes the cluster') +def rescue(existing_addr, new_addr, password=None): host, port = _parse_host_port(existing_addr) - command.rescue_cluster(host, port, *_parse_host_port(new_addr)) + command.rescue_cluster(host, port, *_parse_host_port(new_addr), password=password) @cli.command(help='Migrate slots from one Redis node to another') @@ -79,7 +86,8 @@ def rescue(existing_addr, new_addr): @click.option('--dst-addr', required=True, help='Address of the migrating destination') @click.argument('slots_ranges', nargs=-1, required=True) -def migrate(src_addr, dst_addr, slots_ranges): +@click.option('--password', help='Password for all nodes the cluster') +def migrate(src_addr, dst_addr, slots_ranges, password=None): src_host, src_port = _parse_host_port(src_addr) dst_host, dst_port = _parse_host_port(dst_addr) @@ -91,7 +99,7 @@ def migrate(src_addr, dst_addr, slots_ranges): else: slots.append(int(rg)) - command.migrate_slots(src_host, src_port, dst_host, dst_port, slots) + command.migrate_slots(src_host, src_port, dst_host, dst_port, slots, password) def _format_master(node): @@ -109,10 +117,11 @@ def _format_slave(node, master): @cli.command(help='List Redis nodes in a cluster') @click.option('--addr', required=True, help='Address of any node in the cluster') -def list(addr): +@click.option('--password', help='Password for all nodes the cluster') +def list(addr, password=None): host, port = _parse_host_port(addr) id_map = {} - nodes = sorted(command.list_nodes(host, port)[0], key=lambda n: n.addr()) + nodes = sorted(command.list_nodes(host, port, password=password)[0], key=lambda n: n.addr()) master_count = 0 fail_count = 0 for node in nodes: @@ -141,10 +150,11 @@ def list(addr): @click.option('--slave-only', is_flag=True, help='Only send to slaves') @click.option('--addr', required=True, help='Address of any node in the cluster') +@click.option('--password', help='Password for all nodes the cluster') @click.argument('commands', nargs=-1, required=True) -def execute(master_only, slave_only, addr, commands): +def execute(master_only, slave_only, addr, commands, password=None): host, port = _parse_host_port(addr) - for r in command.execute(host, port, master_only, slave_only, commands): + for r in command.execute(host, port, master_only, slave_only, commands, password): if r['result'] is None: print('%s -%s' % (r['node'].addr(), r['exception'])) else: @@ -153,6 +163,11 @@ def execute(master_only, slave_only, addr, commands): def main(): logging.basicConfig(level=logging.INFO) - click.echo('Redis-trib %s Copyright (c) HunanTV Platform developers' % + click.echo('Redis-trib 0.5.0 Copyright (c) HunanTV Platform developers') + click.echo('Redis-trib %s by Negash' % __version__) cli() + + +if __name__ == "__main__": + main()