From 0da47ecad9aad6cf356ce7a2e4599d656c701e40 Mon Sep 17 00:00:00 2001 From: vladimir Date: Thu, 24 Mar 2016 19:21:05 +1000 Subject: [PATCH 1/2] ClusterCache --- ClusterCache.php | 133 +++++++++++++++++++++++++++++++++++++++++++++++ Connection.php | 20 +++---- README.md | 43 +++++++++++++++ 3 files changed, 187 insertions(+), 9 deletions(-) create mode 100644 ClusterCache.php diff --git a/ClusterCache.php b/ClusterCache.php new file mode 100644 index 0000000..e5be1ca --- /dev/null +++ b/ClusterCache.php @@ -0,0 +1,133 @@ + + */ +class ClusterCache extends \yii\redis\Cache +{ + /** + * Get a hash group string identifier for specified key. + * + * @param string[]|string $key + * @return string + */ + protected static function getHashGroup($key) { + $key = is_array($key) ? $key['key'] : $key; + // We need to keep key if it doesn't have the hash prefix + $group = $key; + + if (1 === preg_match('~^\{(?P.+)\}~Uu', $key, $preg)) { + $group = $preg['group']; + } + + /** + * Prefix needed because some hash keys may contains only numbers. + * Thus, such hash keys can rewrite plain keys. + */ + return 'gr:' . $group; + } + + /** + * @inheritdoc + * + * Multi keys operations in Redis Cluster only possible while all query hash keys stored in one cluster node. + * Thus we need to divide query to several queries by hash key and run them separately. + */ + protected function getValues($keys) + { + $result = []; + + // Divide keys to groups by hash prefix. + $commands = []; + foreach ($keys as $key) { + $commands[static::getHashGroup($key)][] = $key; + } + + // Execute commands + foreach ($commands as $commandKeys) { + $commandKeys = is_array($commandKeys) ? $commandKeys : [$commandKeys]; + $response = $this->redis->executeCommand('MGET', $commandKeys); + + $i = 0; + foreach ($commandKeys as $commandkey) { + $result[$commandkey] = $response[$i++]; + } + } + + return $result; + } + + /** + * @inheritdoc + * + * Multi keys operations in Redis Cluster only possible while all query hash keys stored in one cluster node. + * Thus we need to divide query to several queries by hash key and run them separately. + */ + protected function setValues($data, $expire) + { + $expire = (int) ($expire * 1000); + + $tempData = []; + foreach ($data as $key => $value) { + $tempItem['key'] = $key; + $tempItem['value'] = $value; + $tempData[] = $tempItem; + } + unset($data, $tempItem); + + // Divide keys to groups by hash prefix. + $commands = []; + foreach ($tempData as $tempKey) { + $commands[static::getHashGroup($tempKey)][] = $tempKey; + } + // Execute commands + $failedKeys = []; + foreach ($commands as $commandKeys) { + $args = []; + foreach ($commandKeys as $commandKey) { + $args[] = $commandKey['key']; + $args[] = $commandKey['value']; + } + + if ($expire == 0) { + $this->redis->executeCommand('MSET', $args); + } else { + // Cluster doesn't support transactions - there is no "MULTI" + $this->redis->executeCommand('MSET', $args); + foreach ($commandKeys as $expireKey) { + if ('1' !== $this->redis->executeCommand('PEXPIRE', [$expireKey['key'], $expire])) { + $failedKeys[] = $expireKey['key']; + } + } + } + + unset($args); + } + + return $failedKeys; + } + + /** + * @inheritdoc + * + * FLUSHDB and FLUSHALL command works only for specified connected cluster node. + * Thus, we need to execute it on each node. + */ + protected function flushValues() + { + foreach ($this->redis->master as $node) { + $this->redis->open($node); + if (true !== $this->redis->executeCommand('FLUSHALL')) { + throw new Exception('Can\'t flush values in node: ' . $node); + } + $this->redis->close(); + } + + return true; + } +} diff --git a/Connection.php b/Connection.php index 22a4d51..28ad979 100644 --- a/Connection.php +++ b/Connection.php @@ -80,13 +80,17 @@ public function executeCommand($name, $params = [], $hostname = '') \Yii::trace("Executing Redis Command: {$name}", __METHOD__); fwrite($this->_socket, $command); - return $this->parseResponse(implode(' ', $params), $this->_socket); + /** + * There is a bug with "implode(' ', $params)". If not standart serializer is used (e.g. igbinary) - key value may contains spaces and "\n". + * Therefore we need to pass original parameters array. + */ + return $this->parseResponse($params, $this->_socket); } private function parseResponse($command, $socket) { if (($line = fgets($socket)) === false) { - throw new Exception("Failed to read from socket.\nRedis command was: " . $command); + throw new Exception("Failed to read from socket.\nRedis command was: " . implode(' ', $command)); } $type = $line[0]; @@ -106,13 +110,11 @@ private function parseResponse($command, $socket) $hostname = $moved[2]; - $name = explode(' ', $command)[0]; - $param = array_slice(explode(' ', $command), 1); - - return $this->executeCommand($name, $param, $hostname); + // Some tricky with array_shift - no need to create an extra variables + return $this->executeCommand(array_shift($command), $command, $hostname); } else { - throw new Exception("Redis error: " . $line . "\nRedis command was: " . $command); + throw new Exception("Redis error: " . $line . "\nRedis command was: " . implode(' ', $command)); } case ':': // Integer reply @@ -126,7 +128,7 @@ private function parseResponse($command, $socket) $data = ''; while ($length > 0) { if (($block = fread($socket, $length)) === false) { - throw new Exception("Failed to read from socket.\nRedis command was: " . $command); + throw new Exception("Failed to read from socket.\nRedis command was: " . implode(' ', $command)); } $data .= $block; $length -= mb_strlen($block, '8bit'); @@ -140,7 +142,7 @@ private function parseResponse($command, $socket) } return $data; default: - throw new Exception('Received illegal data from redis: ' . $line . "\nRedis command was: " . $command); + throw new Exception('Received illegal data from redis: ' . $line . "\nRedis command was: " . implode(' ', $command)); } } diff --git a/README.md b/README.md index 126cace..9081612 100644 --- a/README.md +++ b/README.md @@ -62,3 +62,46 @@ return [ ] ]; ``` + +Cluster Cache Configuration +--------------------------- + +By default Redis Cluster doesn't support MGET/MSET queries from several cluster nodes like this: + +``` +MGET {user10}.name {user10}.email {user10}.pass {user20}.name {user20}.email {user20}.pass someNotHashedKey +``` + +To implement this feature you can use ClusterCache: + +```php +return [ + //.... + 'components' => [ + 'class' => 'heyanlong\redis\ClusterCache', + 'redis' => [ + 'class' => 'heyanlong\redis\Connection', + 'master' => [ + '10.155.20.169:6379', + '10.155.20.167:6391', + '10.155.20.168:6379', + '10.155.20.167:6380', +// 'localhost:6379', + ], + 'database' => 0, + ], + + + ] +]; +``` + +ClusterCache will group keys by hash, and divide one query to several queries: + +``` +MGET {user10}.name {user10}.email {user10}.pass +MGET {user20}.name {user20}.email {user20}.pass +MGET someNotHashedKey +``` + +...execute them, and then merge results. \ No newline at end of file From ecdfbf1428efabe8869e86cc182479385db80d08 Mon Sep 17 00:00:00 2001 From: vladimir Date: Fri, 25 Mar 2016 12:51:37 +1000 Subject: [PATCH 2/2] prevent useless reconnects --- Connection.php | 49 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/Connection.php b/Connection.php index 28ad979..3b99581 100644 --- a/Connection.php +++ b/Connection.php @@ -15,10 +15,18 @@ class Connection extends \yii\redis\Connection const EVENT_AFTER_OPEN = 'afterOpen'; + /** @var string[] $master Parameters for connections */ public $master = []; + /** @var resource $_socket Current connection */ private $_socket; + /** @var resource[] $_sockets Store all connections, to prevent recconects */ + private $_sockets = []; + + /** + * @inheritdoc + */ public function __sleep() { $this->close(); @@ -26,21 +34,35 @@ public function __sleep() return array_keys(get_object_vars($this)); } + /** + * @inheritdoc + */ public function getIsActive() { return $this->_socket !== null; } + /** + * @inheritdoc + * + * Store all opened connections, to prevent recconects + */ public function open($hostname = '') { - if ($this->getIsActive() && $hostname == '') { - return; - } - - $hostname = $hostname == '' ? $this->master[array_rand($this->master)] : $hostname; - $this->_socket = $this->connect($hostname); + if (!isset($this->_sockets[$hostname])) { + if ($this->getIsActive() && $hostname == '') { + return; + } + + $hostname = $hostname == '' ? $this->master[array_rand($this->master)] : $hostname; + $this->_sockets[$hostname] = $this->connect($hostname); + } + $this->_socket = &$this->_sockets[$hostname]; } + /** + * @inheritdoc + */ public function close() { if ($this->_socket !== null) { @@ -50,11 +72,17 @@ public function close() } } + /** + * @inheritdoc + */ protected function initConnection() { $this->trigger(self::EVENT_AFTER_OPEN); } + /** + * @inheritdoc + */ public function __call($name, $params) { $redisCommand = strtoupper(Inflector::camel2words($name, false)); @@ -67,6 +95,9 @@ public function __call($name, $params) } } + /** + * @inheritdoc + */ public function executeCommand($name, $params = [], $hostname = '') { $this->open($hostname); @@ -87,6 +118,9 @@ public function executeCommand($name, $params = [], $hostname = '') return $this->parseResponse($params, $this->_socket); } + /** + * @inheritdoc + */ private function parseResponse($command, $socket) { if (($line = fgets($socket)) === false) { @@ -146,6 +180,9 @@ private function parseResponse($command, $socket) } } + /** + * @inheritdoc + */ private function connect($node) { $socket = null;