Skip to content

add password option #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dump.rdb

# Distribution / packaging
.Python
.idea/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use an IDE that leaves something in the project directory, please add it to your gitignore_global.

env/
bin/
build/
Expand Down
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Image carrying redis-trib.py plus its runtime dependencies.
# NOTE(review): python:2 is end-of-life -- confirm whether the tool still
# requires Python 2 before changing the base image.
FROM python:2
WORKDIR /dist
# Install the libraries redis-trib.py imports at runtime.
RUN pip install hiredis retrying Werkzeug click

# Copy the project sources into the image.
ADD ./ /dist/
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ See also https://github.com/antirez/redis/blob/3.0/src/redis-trib.rb
Create a cluster in some redis nodes (the nodes are cluster enabled, and none of them in any cluster)

redis-trib.py create NODE_HOST_a:PORT_a NODE_HOST_b:PORT_b ...

Use with password

redis-trib.py create --password MyPassword NODE_HOST_a:PORT_a NODE_HOST_b:PORT_b ...

Add another node to a cluster, but neither set as slave nor migrating slots to it

Expand Down Expand Up @@ -166,3 +170,8 @@ Output:
* `master_id`: master's `node_id` if it's a slave, or `None` otherwise
* `assigned_slots`: a list of assigned slots if it's a master; it won't contain slots being migrated
* `slots_migrating`: boolean value for whether there are any slot(s) migrating or importing on this node

To set a password for all nodes, use:

r = redistrib.clusternode.ClusterNode(...)
r.password = MyPassword
4 changes: 2 additions & 2 deletions redistrib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version__ = '0.5.0'
REPO = 'https://github.com/HunanTV/redis-trib.py'
__version__ = '0.0.1'
REPO = 'https://github.com/Negashev/redis-trib.py'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would appreciate it if you could take care of this project, but could you please use a higher version number?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

4 changes: 3 additions & 1 deletion redistrib/clusternode.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from connection import Connection


class ClusterNode(object):
def __init__(self, node_id, latest_know_ip_address_and_port, flags,
master_id, last_ping_sent_time, last_pong_received_time,
Expand All @@ -10,6 +11,7 @@ def __init__(self, node_id, latest_know_ip_address_and_port, flags,
host, port = latest_know_ip_address_and_port.split(':')
self.host = host
self.port = int(port)
self.password = None
self.flags = flags.split(',')
self.master_id = None if master_id == '-' else master_id
self.assigned_slots = []
Expand Down Expand Up @@ -52,7 +54,7 @@ def fail(self):

def get_conn(self):
    """Return the cached Connection to this node, creating it on first use.

    The connection is authenticated with ``self.password`` when one is set.
    """
    if self._conn is not None:
        return self._conn
    self._conn = Connection(self.host, self.port, self.password)
    return self._conn

def close(self):
Expand Down
97 changes: 54 additions & 43 deletions redistrib/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ def addslots(t, begin, end):
addslots(t, begin, end)


def create(host_port_list, max_slots=1024):
def create(host_port_list, password=None, max_slots=1024):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new optional parameter should be appended after the existing parameters to keep the API compatible.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to fix it

conns = []
try:
for host, port in set(host_port_list):
t = Connection(host, port)
t = Connection(host, port, password)
conns.append(t)
_ensure_cluster_status_unset(t)
logging.info('Instance at %s:%d checked', t.host, t.port)
Expand All @@ -111,17 +111,17 @@ def create(host_port_list, max_slots=1024):
t.close()


def start_cluster(host, port, max_slots=SLOT_COUNT):
with Connection(host, port) as t:
def start_cluster(host, port, password=None, max_slots=SLOT_COUNT):
with Connection(host, port, password) as t:
_ensure_cluster_status_unset(t)
_add_slots(t, 0, SLOT_COUNT, max_slots)
_poll_check_status(t)
logging.info('Instance at %s:%d started as a standalone cluster',
host, port)


def start_cluster_on_multi(host_port_list, max_slots=SLOT_COUNT, password=None):
    """Create a cluster from the given (host, port) pairs.

    Thin wrapper over create(); ``password`` is appended last so existing
    positional callers keep working.
    """
    # BUG FIX: create()'s signature is (host_port_list, password, max_slots);
    # forwarding positionally as create(host_port_list, max_slots, password)
    # sent max_slots as the password and the password as max_slots.
    # Keyword arguments make the call order-proof.
    return create(host_port_list, password=password, max_slots=max_slots)


def _migr_keys(src_conn, target_host, target_port, slot):
Expand Down Expand Up @@ -204,26 +204,26 @@ def _join_to_cluster(clst, new):


def join_cluster(cluster_host, cluster_port, newin_host, newin_port,
                 balancer=None, balance_plan=base_balance_plan, password=None):
    """Join a new node to an existing cluster, then rebalance slots onto it.

    :param password: optional AUTH password, used for every connection opened
        here and propagated to the per-node connections created below.
    """
    with Connection(newin_host, newin_port, password) as newcomer, \
            Connection(cluster_host, cluster_port, password) as anchor:
        _join_to_cluster(anchor, newcomer)
        nodes = []
        try:
            logging.info(
                'Instance at %s:%d has joined %s:%d; now balancing slots',
                newin_host, newin_port, cluster_host, cluster_port)
            nodes = _list_nodes(newcomer, default_host=newin_host,
                                password=password)[0]
            # Execute the plan one (source, destination, count) step at a time.
            for src, dst, count in balance_plan(nodes, balancer):
                _migr_slots(src, dst, src.assigned_slots[:count], nodes)
        finally:
            # Close every per-node connection opened by _list_nodes.
            for n in nodes:
                n.close()


def add_node(cluster_host, cluster_port, newin_host, newin_port):
with Connection(newin_host, newin_port) as t, \
Connection(cluster_host, cluster_port) as c:
def add_node(cluster_host, cluster_port, newin_host, newin_port, password=None):
with Connection(newin_host, newin_port, password) as t, \
Connection(cluster_host, cluster_port, password) as c:
_join_to_cluster(c, t)


Expand Down Expand Up @@ -253,13 +253,13 @@ def _check_master_and_migrate_slots(nodes, myself):
_migr_slots(myself, node, myself.assigned_slots, nodes)


def del_node(host, port):
def del_node(host, port, password=None):
myself = None
nodes = []
t = Connection(host, port)
t = Connection(host, port, password)
try:
_ensure_cluster_status_set(t)
nodes, myself = _list_nodes(t, filter_func=lambda n: not n.fail)
nodes, myself = _list_nodes(t, filter_func=lambda n: not n.fail, password=password)
nodes.remove(myself)
if myself.master:
_check_master_and_migrate_slots(nodes, myself)
Expand All @@ -284,8 +284,8 @@ def quit_cluster(host, port):
return del_node(host, port)


def shutdown_cluster(host, port):
with Connection(host, port) as t:
def shutdown_cluster(host, port, password=None):
with Connection(host, port, password) as t:
_ensure_cluster_status_set(t)
myself = None
m = t.send_raw(CMD_CLUSTER_NODES)
Expand All @@ -302,18 +302,19 @@ def shutdown_cluster(host, port):
logging.debug('Ask `cluster delslots` Rsp %s', m)


def fix_migrating(host, port):
def fix_migrating(host, port, password=None):
nodes = dict()
mig_srcs = []
mig_dsts = []
t = Connection(host, port)
t = Connection(host, port, password)
try:
m = t.send_raw(CMD_CLUSTER_NODES)
logging.debug('Ask `cluster nodes` Rsp %s', m)
for node_info in m.split('\n'):
if not _valid_node_info(node_info):
continue
node = ClusterNode(*node_info.split(' '))
node.password = password
node.host = node.host or host
nodes[node.node_id] = node

Expand Down Expand Up @@ -356,16 +357,25 @@ def _check_slave(slave_host, slave_port, t):
t.raise_('%s not switched to a slave' % slave_addr)


def replicate(master_host, master_port, slave_host, slave_port):
with Connection(slave_host, slave_port) as t, \
Connection(master_host, master_port) as master_conn:
def replicate(master_host, master_port, slave_host, slave_port, password=None):
with Connection(slave_host, slave_port, password) as t, \
Connection(master_host, master_port, password) as master_conn:
_ensure_cluster_status_set(master_conn)
myself = _list_nodes(master_conn)[1]
myself = _list_nodes(master_conn, password=password)[1]
myid = myself.node_id if myself.master else myself.master_id

_join_to_cluster(master_conn, t)
logging.info('Instance at %s:%d has joined %s:%d; now set replica',
slave_host, slave_port, master_host, master_port)
if password:
m = t.execute('config', 'set', 'masterauth', password)
logging.debug('Set `masterauth` for slave Rsp %s', m)
if m.lower() != 'ok':
t.raise_('Unexpected reply after CONFIG SET MASTERAUTH for slave: %s' % m)
m = master_conn.execute('config', 'set', 'masterauth', password)
logging.debug('Set `masterauth` for master Rsp %s', m)
if m.lower() != 'ok':
t.raise_('Unexpected reply after CONFIG SET MASTERAUTH for master: %s' % m)

m = t.execute('cluster', 'replicate', myid)
logging.debug('Ask `cluster replicate` Rsp %s', m)
Expand All @@ -384,7 +394,7 @@ def _filter_master(node):
return node.master


def _list_nodes(conn, default_host=None, filter_func=lambda node: True):
def _list_nodes(conn, default_host=None, filter_func=lambda node: True, password=None):
m = conn.send_raw(CMD_CLUSTER_NODES)
logging.debug('Ask `cluster nodes` Rsp %s', m)
default_host = default_host or conn.host
Expand All @@ -395,6 +405,7 @@ def _list_nodes(conn, default_host=None, filter_func=lambda node: True):
if not _valid_node_info(node_info):
continue
node = ClusterNode(*node_info.split(' '))
node.password = password
if 'myself' in node_info:
myself = node
if myself.host == '':
Expand All @@ -404,26 +415,26 @@ def _list_nodes(conn, default_host=None, filter_func=lambda node: True):
return nodes, myself


def _list_masters(conn, default_host=None, password=None):
    """Like _list_nodes, but keep only master nodes."""
    host = default_host or conn.host
    return _list_nodes(conn, host, filter_func=_filter_master,
                       password=password)


def list_nodes(host, port, default_host=None, filter_func=lambda node: True, password=None):
    """Connect to host:port and list cluster nodes accepted by filter_func.

    :param password: optional AUTH password, also stamped onto each returned
        ClusterNode so their own connections can authenticate.
    """
    with Connection(host, port, password) as conn:
        return _list_nodes(conn, default_host or host, filter_func, password)


def list_masters(host, port, default_host=None, password=None):
    """Connect to host:port and list only the cluster's master nodes."""
    with Connection(host, port, password) as conn:
        return _list_masters(conn, default_host or host, password)


def migrate_slots(src_host, src_port, dst_host, dst_port, slots):
def migrate_slots(src_host, src_port, dst_host, dst_port, slots, password=None):
if src_host == dst_host and src_port == dst_port:
raise ValueError('Same node')
with Connection(src_host, src_port) as t:
nodes, myself = _list_masters(t, src_host)
with Connection(src_host, src_port, password) as t:
nodes, myself = _list_masters(t, src_host, password)

slots = set(slots)
logging.debug('Migrating %s', slots)
Expand All @@ -435,18 +446,18 @@ def migrate_slots(src_host, src_port, dst_host, dst_port, slots):
raise ValueError('Two nodes are not in the same cluster')


def rescue_cluster(host, port, subst_host, subst_port):
def rescue_cluster(host, port, subst_host, subst_port, password=None):
failed_slots = set(xrange(SLOT_COUNT))
with Connection(host, port) as t:
with Connection(host, port, password) as t:
_ensure_cluster_status_set(t)
for node in _list_masters(t)[0]:
for node in _list_masters(t, password=password)[0]:
if not node.fail:
failed_slots -= set(node.assigned_slots)
if len(failed_slots) == 0:
logging.info('No need to rescue cluster at %s:%d', host, port)
return

with Connection(subst_host, subst_port) as s:
with Connection(subst_host, subst_port, password) as s:
_ensure_cluster_status_unset(s)

m = s.execute('cluster', 'meet', host, port)
Expand All @@ -466,14 +477,14 @@ def rescue_cluster(host, port, subst_host, subst_port):
subst_host, subst_port, len(failed_slots))


def execute(host, port, master_only, slave_only, commands):
with Connection(host, port) as c:
def execute(host, port, master_only, slave_only, commands, password=None):
with Connection(host, port, password) as c:
filter_func = lambda n: True
if master_only:
filter_func = _filter_master
elif slave_only:
filter_func = lambda n: n.slave
nodes = _list_nodes(c, filter_func=filter_func)[0]
nodes = _list_nodes(c, filter_func=filter_func, password=password)[0]

result = []
for n in nodes:
Expand Down
10 changes: 8 additions & 2 deletions redistrib/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import hiredis
from functools import wraps

from exceptions import RedisStatusError, RedisIOError
from redistrib.exceptions import RedisIOError, RedisStatusError

SYM_STAR = '*'
SYM_DOLLAR = '$'
Expand Down Expand Up @@ -61,13 +61,15 @@ def g(conn, *args, **kwargs):
return f(conn, *args, **kwargs)
except IOError as e:
raise RedisIOError(e, conn.host, conn.port)

return g


class Connection(object):
def __init__(self, host, port, timeout=5):
def __init__(self, host, port, password=None, timeout=5):
self.host = host
self.port = port
self.password = password
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.reader = hiredis.Reader()
self.last_raw_message = ''
Expand All @@ -79,6 +81,10 @@ def __init__(self, host, port, timeout=5):
@_wrap_sock_op
def _conn(self):
    # Open the TCP connection; _wrap_sock_op converts IOError into a
    # RedisIOError tagged with this host/port.
    self.sock.connect((self.host, self.port))
    if self.password:
        # auth with pass
        # AUTH must precede any other command so later sends are authorized.
        # NOTE(review): presumably execute() raises on an -ERR reply (e.g.
        # when the server has no requirepass set) -- confirm in execute().
        # NOTE(review): confirm `logging` is imported at module top; the
        # import block is not fully visible in this diff.
        self.execute('auth', self.password)
        logging.debug('Connected to %s:%d with password', self.host, self.port)

@_wrap_sock_op
def _recv(self):
Expand Down
Loading