Skip to content

ClusterCache #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions ClusterCache.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
<?php

namespace heyanlong\redis;

/**
* Redis Cluster Cache implementation.
* For clustered MGET/MSET usage.
*
* @author arkham.vm <[email protected]>
*/
class ClusterCache extends \yii\redis\Cache
{
/**
* Get a hash group string identifier for specified key.
*
* @param string[]|string $key
* @return string
*/
protected static function getHashGroup($key) {
$key = is_array($key) ? $key['key'] : $key;
// We need to keep key if it doesn't have the hash prefix
$group = $key;

if (1 === preg_match('~^\{(?P<group>.+)\}~Uu', $key, $preg)) {
$group = $preg['group'];
}

/**
* Prefix needed because some hash keys may contains only numbers.
* Thus, such hash keys can rewrite plain keys.
*/
return 'gr:' . $group;
}

/**
* @inheritdoc
*
* Multi keys operations in Redis Cluster only possible while all query hash keys stored in one cluster node.
* Thus we need to divide query to several queries by hash key and run them separately.
*/
protected function getValues($keys)
{
$result = [];

// Divide keys to groups by hash prefix.
$commands = [];
foreach ($keys as $key) {
$commands[static::getHashGroup($key)][] = $key;
}

// Execute commands
foreach ($commands as $commandKeys) {
$commandKeys = is_array($commandKeys) ? $commandKeys : [$commandKeys];
$response = $this->redis->executeCommand('MGET', $commandKeys);

$i = 0;
foreach ($commandKeys as $commandkey) {
$result[$commandkey] = $response[$i++];
}
}

return $result;
}

/**
* @inheritdoc
*
* Multi keys operations in Redis Cluster only possible while all query hash keys stored in one cluster node.
* Thus we need to divide query to several queries by hash key and run them separately.
*/
protected function setValues($data, $expire)
{
$expire = (int) ($expire * 1000);

$tempData = [];
foreach ($data as $key => $value) {
$tempItem['key'] = $key;
$tempItem['value'] = $value;
$tempData[] = $tempItem;
}
unset($data, $tempItem);

// Divide keys to groups by hash prefix.
$commands = [];
foreach ($tempData as $tempKey) {
$commands[static::getHashGroup($tempKey)][] = $tempKey;
}
// Execute commands
$failedKeys = [];
foreach ($commands as $commandKeys) {
$args = [];
foreach ($commandKeys as $commandKey) {
$args[] = $commandKey['key'];
$args[] = $commandKey['value'];
}

if ($expire == 0) {
$this->redis->executeCommand('MSET', $args);
} else {
// Cluster doesn't support transactions - there is no "MULTI"
$this->redis->executeCommand('MSET', $args);
foreach ($commandKeys as $expireKey) {
if ('1' !== $this->redis->executeCommand('PEXPIRE', [$expireKey['key'], $expire])) {
$failedKeys[] = $expireKey['key'];
}
}
}

unset($args);
}

return $failedKeys;
}

/**
* @inheritdoc
*
* FLUSHDB and FLUSHALL command works only for specified connected cluster node.
* Thus, we need to execute it on each node.
*/
protected function flushValues()
{
foreach ($this->redis->master as $node) {
$this->redis->open($node);
if (true !== $this->redis->executeCommand('FLUSHALL')) {
throw new Exception('Can\'t flush values in node: ' . $node);
}
$this->redis->close();
}

return true;
}
}
69 changes: 54 additions & 15 deletions Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,54 @@ class Connection extends \yii\redis\Connection

const EVENT_AFTER_OPEN = 'afterOpen';

/** @var string[] $master Parameters for connections */
public $master = [];

/** @var resource $_socket Current connection */
private $_socket;

/** @var resource[] $_sockets Store all connections, to prevent recconects */
private $_sockets = [];

/**
* @inheritdoc
*/
public function __sleep()
{
$this->close();

return array_keys(get_object_vars($this));
}

/**
* @inheritdoc
*/
public function getIsActive()
{
return $this->_socket !== null;
}

/**
* @inheritdoc
*
* Store all opened connections, to prevent recconects
*/
public function open($hostname = '')
{
if ($this->getIsActive() && $hostname == '') {
return;
}

$hostname = $hostname == '' ? $this->master[array_rand($this->master)] : $hostname;
$this->_socket = $this->connect($hostname);
if (!isset($this->_sockets[$hostname])) {
if ($this->getIsActive() && $hostname == '') {
return;
}

$hostname = $hostname == '' ? $this->master[array_rand($this->master)] : $hostname;
$this->_sockets[$hostname] = $this->connect($hostname);
}
$this->_socket = &$this->_sockets[$hostname];
}

/**
* @inheritdoc
*/
public function close()
{
if ($this->_socket !== null) {
Expand All @@ -50,11 +72,17 @@ public function close()
}
}

/**
* @inheritdoc
*/
protected function initConnection()
{
$this->trigger(self::EVENT_AFTER_OPEN);
}

/**
* @inheritdoc
*/
public function __call($name, $params)
{
$redisCommand = strtoupper(Inflector::camel2words($name, false));
Expand All @@ -67,6 +95,9 @@ public function __call($name, $params)
}
}

/**
* @inheritdoc
*/
public function executeCommand($name, $params = [], $hostname = '')
{
$this->open($hostname);
Expand All @@ -80,13 +111,20 @@ public function executeCommand($name, $params = [], $hostname = '')
\Yii::trace("Executing Redis Command: {$name}", __METHOD__);
fwrite($this->_socket, $command);

return $this->parseResponse(implode(' ', $params), $this->_socket);
/**
* There is a bug with "implode(' ', $params)". If not standart serializer is used (e.g. igbinary) - key value may contains spaces and "\n".
* Therefore we need to pass original parameters array.
*/
return $this->parseResponse($params, $this->_socket);
}

/**
* @inheritdoc
*/
private function parseResponse($command, $socket)
{
if (($line = fgets($socket)) === false) {
throw new Exception("Failed to read from socket.\nRedis command was: " . $command);
throw new Exception("Failed to read from socket.\nRedis command was: " . implode(' ', $command));
}

$type = $line[0];
Expand All @@ -106,13 +144,11 @@ private function parseResponse($command, $socket)

$hostname = $moved[2];

$name = explode(' ', $command)[0];
$param = array_slice(explode(' ', $command), 1);

return $this->executeCommand($name, $param, $hostname);
// Some tricky with array_shift - no need to create an extra variables
return $this->executeCommand(array_shift($command), $command, $hostname);

} else {
throw new Exception("Redis error: " . $line . "\nRedis command was: " . $command);
throw new Exception("Redis error: " . $line . "\nRedis command was: " . implode(' ', $command));
}

case ':': // Integer reply
Expand All @@ -126,7 +162,7 @@ private function parseResponse($command, $socket)
$data = '';
while ($length > 0) {
if (($block = fread($socket, $length)) === false) {
throw new Exception("Failed to read from socket.\nRedis command was: " . $command);
throw new Exception("Failed to read from socket.\nRedis command was: " . implode(' ', $command));
}
$data .= $block;
$length -= mb_strlen($block, '8bit');
Expand All @@ -140,10 +176,13 @@ private function parseResponse($command, $socket)
}
return $data;
default:
throw new Exception('Received illegal data from redis: ' . $line . "\nRedis command was: " . $command);
throw new Exception('Received illegal data from redis: ' . $line . "\nRedis command was: " . implode(' ', $command));
}
}

/**
* @inheritdoc
*/
private function connect($node)
{
$socket = null;
Expand Down
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,46 @@ return [
]
];
```

Cluster Cache Configuration
---------------------------

By default Redis Cluster doesn't support MGET/MSET queries from several cluster nodes like this:

```
MGET {user10}.name {user10}.email {user10}.pass {user20}.name {user20}.email {user20}.pass someNotHashedKey
```

To implement this feature you can use ClusterCache:

```php
return [
//....
'components' => [
'class' => 'heyanlong\redis\ClusterCache',
'redis' => [
'class' => 'heyanlong\redis\Connection',
'master' => [
'10.155.20.169:6379',
'10.155.20.167:6391',
'10.155.20.168:6379',
'10.155.20.167:6380',
// 'localhost:6379',
],
'database' => 0,
],


]
];
```

ClusterCache will group keys by hash, and divide one query to several queries:

```
MGET {user10}.name {user10}.email {user10}.pass
MGET {user20}.name {user20}.email {user20}.pass
MGET someNotHashedKey
```

...execute them, and then merge results.