diff --git a/src/Broker.php b/src/Broker.php index b0553d43..fc6c365c 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -7,6 +7,7 @@ use Kafka\Sasl\Plain; use Kafka\Sasl\Scram; use function array_keys; +use function array_walk_recursive; use function explode; use function in_array; use function serialize; @@ -18,6 +19,9 @@ class Broker { use SingletonTrait; + public const SOCKET_MODE_ASYNC = 0; + public const SOCKET_MODE_SYNC = 1; + /** * @var int */ @@ -43,6 +47,9 @@ class Broker */ private $dataSockets = []; + /** @var SocketFactory */ + private $socketFactory; + /** * @var callable|null */ @@ -53,6 +60,11 @@ class Broker */ private $config; + public function setSocketFactory(SocketFactory $socketFactory): void + { + $this->socketFactory = $socketFactory; + } + public function setProcess(callable $process): void { $this->process = $process; @@ -134,12 +146,12 @@ public function getBrokers(): array return $this->brokers; } - public function getMetaConnect(string $key, bool $modeSync = false): ?CommonSocket + public function getMetaConnect(string $key, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket { - return $this->getConnect($key, 'metaSockets', $modeSync); + return $this->getConnect($key, 'metaSockets', $mode); } - public function getRandConnect(bool $modeSync = false): ?CommonSocket + public function getRandConnect(int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket { $nodeIds = array_keys($this->brokers); shuffle($nodeIds); @@ -148,24 +160,24 @@ public function getRandConnect(bool $modeSync = false): ?CommonSocket return null; } - return $this->getMetaConnect((string) $nodeIds[0], $modeSync); + return $this->getMetaConnect((string) $nodeIds[0], $mode); } - public function getDataConnect(string $key, bool $modeSync = false): ?CommonSocket + public function getDataConnect(string $key, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket { - return $this->getConnect($key, 'dataSockets', $modeSync); + return $this->getConnect($key, 'dataSockets', $mode); } - public function getConnect(string $key, string $type, bool $modeSync = false): ?CommonSocket + public function getConnect(string $key, string $type, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket { - if (isset($this->{$type}[$key])) { - return $this->{$type}[$key]; + if (isset($this->{$type}[$key][$mode])) { + return $this->{$type}[$key][$mode]; } if (isset($this->brokers[$key])) { $hostname = $this->brokers[$key]; - if (isset($this->{$type}[$hostname])) { - return $this->{$type}[$hostname]; + if (isset($this->{$type}[$hostname][$mode])) { + return $this->{$type}[$hostname][$mode]; } } @@ -182,19 +194,19 @@ public function getConnect(string $key, string $type, bool $modeSync = false): ? [$host, $port] = explode(':', $key); } - if ($host === null || $port === null || (! $modeSync && $this->process === null)) { + if ($host === null || $port === null || ($mode === self::SOCKET_MODE_ASYNC && $this->process === null)) { return null; } try { - $socket = $this->getSocket((string) $host, (int) $port, $modeSync); + $socket = $this->getSocket((string) $host, (int) $port, $mode); if ($socket instanceof Socket && $this->process !== null) { $socket->setOnReadable($this->process); } $socket->connect(); - $this->{$type}[$key] = $socket; + $this->{$type}[$key][$mode] = $socket; return $socket; } catch (\Throwable $e) { @@ -205,30 +217,29 @@ public function getConnect(string $key, string $type, bool $modeSync = false): ? public function clear(): void { - foreach ($this->metaSockets as $key => $socket) { - $socket->close(); - } - foreach ($this->dataSockets as $key => $socket) { + $sockets = [$this->metaSockets, $this->dataSockets]; + + array_walk_recursive($sockets, function (CommonSocket $socket): void { $socket->close(); - } + }); + $this->brokers = []; } /** * @throws \Kafka\Exception */ - public function getSocket(string $host, int $port, bool $modeSync): CommonSocket + public function getSocket(string $host, int $port, int $mode): CommonSocket { $saslProvider = $this->judgeConnectionConfig(); - if ($modeSync) { - return new SocketSync($host, $port, $this->config, $saslProvider); + if ($mode === self::SOCKET_MODE_SYNC) { + return $this->getSocketFactory()->createSocketSync($host, $port, $this->config, $saslProvider); } - return new Socket($host, $port, $this->config, $saslProvider); + return $this->getSocketFactory()->createSocket($host, $port, $this->config, $saslProvider); } - /** * @throws \Kafka\Exception */ @@ -281,4 +292,13 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism throw new Exception(sprintf('"%s" is an invalid SASL mechanism', $mechanism)); } + + private function getSocketFactory(): SocketFactory + { + if ($this->socketFactory === null) { + $this->socketFactory = new SocketFactory(); + } + + return $this->socketFactory; + } } diff --git a/src/Config.php b/src/Config.php index bb3827e2..02aea8cd 100644 --- a/src/Config.php +++ b/src/Config.php @@ -24,6 +24,7 @@ * @method int getMetadataRequestTimeoutMs() * @method int getMetadataRefreshIntervalMs() * @method int getMetadataMaxAgeMs() + * @method bool getAutoCreateTopicsEnable() * @method string getSecurityProtocol() * @method bool getSslEnable() * @method void setSslEnable(bool $sslEnable) @@ -84,6 +85,7 @@ abstract class Config 'metadataRequestTimeoutMs' => 60000, 'metadataRefreshIntervalMs' => 300000, 'metadataMaxAgeMs' => -1, + 'autoCreateTopicsEnable' => true, 'securityProtocol' => self::SECURITY_PROTOCOL_PLAINTEXT, 'sslEnable' => false, // this config item will override, don't config it. 'sslLocalCert' => '', @@ -241,6 +243,11 @@ public function setMetadataMaxAgeMs(int $metadataMaxAgeMs): void static::$options['metadataMaxAgeMs'] = $metadataMaxAgeMs; } + public function setAutoCreateTopicsEnable(bool $flag = true): void + { + static::$options['autoCreateTopicsEnable'] = $flag; + } + /** * @throws Exception\Config */ diff --git a/src/Producer/RecordValidator.php b/src/Producer/RecordValidator.php index 22e07cee..d962710d 100644 --- a/src/Producer/RecordValidator.php +++ b/src/Producer/RecordValidator.php @@ -4,6 +4,7 @@ namespace Kafka\Producer; use Kafka\Exception; +use Kafka\ProducerConfig; use function is_string; use function trim; @@ -30,7 +31,10 @@ public function validate(array $record, array $topicList): void throw Exception\InvalidRecordInSet::missingTopic(); } - if (! isset($topicList[$record['topic']])) { + /** @var ProducerConfig $config */ + $config = ProducerConfig::getInstance(); + + if (! isset($topicList[$record['topic']]) && ! $config->getAutoCreateTopicsEnable()) { throw Exception\InvalidRecordInSet::nonExististingTopic($record['topic']); } diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 71b1cb06..206a8045 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -67,7 +67,7 @@ public function send(array $recordSet): array $sendData = $this->convertRecordSet($recordSet); $result = []; foreach ($sendData as $brokerId => $topicList) { - $connect = $broker->getDataConnect((string) $brokerId, true); + $connect = $broker->getDataConnect((string) $brokerId, Broker::SOCKET_MODE_SYNC); if ($connect === null) { return []; @@ -118,7 +118,7 @@ public function syncMeta(): void $broker = $this->getBroker(); foreach ($brokerHost as $host) { - $socket = $broker->getMetaConnect($host, true); + $socket = $broker->getMetaConnect($host, Broker::SOCKET_MODE_SYNC); if ($socket === null) { continue; diff --git a/src/SocketFactory.php b/src/SocketFactory.php new file mode 100644 index 00000000..4633800a --- /dev/null +++ b/src/SocketFactory.php @@ -0,0 +1,25 @@ +assertEquals($topics, $broker->getTopics()); } - public function getConnect(): void + public function testGetConnect(): void { $broker = $this->getBroker(); $data = [ @@ -128,6 +129,34 @@ public function getConnect(): void $this->assertNull($result); } + public function testGetConnectSyncAndAsyncForTheSameBroker(): void + { + $socket = $this->getMockBuilder(Socket::class) + ->disableOriginalConstructor() + ->getMock(); + + $socketSync = $this->getMockBuilder(SocketSync::class) + ->disableOriginalConstructor() + ->getMock(); + + $socketFactory = $this->getMockBuilder(SocketFactory::class) + ->setMethods(['createSocket', 'createSocketSync']) + ->getMock(); + + $socketFactory->method('createSocket') + ->willReturn($socket); + $socketFactory->method('createSocketSync') + ->willReturn($socketSync); + + $broker = $this->getBroker(); + $broker->setSocketFactory($socketFactory); + $broker->setProcess(function (): void { + }); + + $this->assertSame($socket, $broker->getConnect('kafka:9092', 'metaSockets')); + $this->assertSame($socketSync, $broker->getConnect('kafka:9092', 'metaSockets', Broker::SOCKET_MODE_SYNC)); + } + public function testConnectRandFalse(): void { $broker = $this->getBroker(); @@ -141,7 +170,7 @@ public function testGetSocketNotSetConfig(): void $broker = $this->getBroker(); $hostname = '127.0.0.1'; $port = 9092; - $socket = $broker->getSocket($hostname, $port, true); + $socket = $broker->getSocket($hostname, $port, Broker::SOCKET_MODE_SYNC); $this->assertInstanceOf(SocketSync::class, $socket); } diff --git a/tests/Base/Producer/RecordValidatorTest.php b/tests/Base/Producer/RecordValidatorTest.php index 7572213a..459b1f2b 100644 --- a/tests/Base/Producer/RecordValidatorTest.php +++ b/tests/Base/Producer/RecordValidatorTest.php @@ -5,6 +5,7 @@ use Kafka\Exception\InvalidRecordInSet; use Kafka\Producer\RecordValidator; +use Kafka\ProducerConfig; use PHPUnit\Framework\TestCase; final class RecordValidatorTest extends TestCase @@ -16,6 +17,10 @@ public function setUp(): void { $this->recordValidator = new RecordValidator(); + /** @var ProducerConfig $config */ + $config = ProducerConfig::getInstance(); + $config->setAutoCreateTopicsEnable(false); + parent::setUp(); } @@ -61,4 +66,17 @@ public function invalidRecordThrowsExceptionDataProvider(): array ], ]; } + + /** + * @doesNotPerformAssertions + * @phpcsSuppress SlevomatCodingStandard.TypeHints.TypeHintDeclaration.UselessDocComment + */ + public function testValidateNonExistingTopicWhenAutoCreateTopicsEnabled(): void + { + /** @var ProducerConfig $config */ + $config = ProducerConfig::getInstance(); + $config->setAutoCreateTopicsEnable(true); + + $this->recordValidator->validate(['topic' => 'test', 'value' => 'a value'], []); + } } diff --git a/tests/Functional/Ticket/GH181Test.php b/tests/Functional/Ticket/GH181Test.php index 224c4f61..5dcae2c1 100644 --- a/tests/Functional/Ticket/GH181Test.php +++ b/tests/Functional/Ticket/GH181Test.php @@ -22,9 +22,11 @@ public function setUp(): void ); } + /** @var ProducerConfig $config */ $config = ProducerConfig::getInstance(); $config->setMetadataBrokerList($brokers); $config->setBrokerVersion($version); + $config->setAutoCreateTopicsEnable(false); parent::setUp(); }