Skip to content
This repository was archived by the owner on Jun 10, 2022. It is now read-only.

Auto create topics producer #215

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 44 additions & 24 deletions src/Broker.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Kafka\Sasl\Plain;
use Kafka\Sasl\Scram;
use function array_keys;
use function array_walk_recursive;
use function explode;
use function in_array;
use function serialize;
Expand All @@ -18,6 +19,9 @@ class Broker
{
use SingletonTrait;

public const SOCKET_MODE_ASYNC = 0;
public const SOCKET_MODE_SYNC = 1;

/**
* @var int
*/
Expand All @@ -43,6 +47,9 @@ class Broker
*/
private $dataSockets = [];

/** @var SocketFactory */
private $socketFactory;

/**
* @var callable|null
*/
Expand All @@ -53,6 +60,11 @@ class Broker
*/
private $config;

public function setSocketFactory(SocketFactory $socketFactory): void
{
$this->socketFactory = $socketFactory;
}

public function setProcess(callable $process): void
{
$this->process = $process;
Expand Down Expand Up @@ -134,12 +146,12 @@ public function getBrokers(): array
return $this->brokers;
}

public function getMetaConnect(string $key, bool $modeSync = false): ?CommonSocket
public function getMetaConnect(string $key, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket
{
return $this->getConnect($key, 'metaSockets', $modeSync);
return $this->getConnect($key, 'metaSockets', $mode);
}

public function getRandConnect(bool $modeSync = false): ?CommonSocket
public function getRandConnect(int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket
{
$nodeIds = array_keys($this->brokers);
shuffle($nodeIds);
Expand All @@ -148,24 +160,24 @@ public function getRandConnect(bool $modeSync = false): ?CommonSocket
return null;
}

return $this->getMetaConnect((string) $nodeIds[0], $modeSync);
return $this->getMetaConnect((string) $nodeIds[0], $mode);
}

public function getDataConnect(string $key, bool $modeSync = false): ?CommonSocket
public function getDataConnect(string $key, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket
{
return $this->getConnect($key, 'dataSockets', $modeSync);
return $this->getConnect($key, 'dataSockets', $mode);
}

public function getConnect(string $key, string $type, bool $modeSync = false): ?CommonSocket
public function getConnect(string $key, string $type, int $mode = self::SOCKET_MODE_ASYNC): ?CommonSocket
{
if (isset($this->{$type}[$key])) {
return $this->{$type}[$key];
if (isset($this->{$type}[$key][$mode])) {
return $this->{$type}[$key][$mode];
}

if (isset($this->brokers[$key])) {
$hostname = $this->brokers[$key];
if (isset($this->{$type}[$hostname])) {
return $this->{$type}[$hostname];
if (isset($this->{$type}[$hostname][$mode])) {
return $this->{$type}[$hostname][$mode];
}
}

Expand All @@ -182,19 +194,19 @@ public function getConnect(string $key, string $type, bool $modeSync = false): ?
[$host, $port] = explode(':', $key);
}

if ($host === null || $port === null || (! $modeSync && $this->process === null)) {
if ($host === null || $port === null || ($mode === self::SOCKET_MODE_ASYNC && $this->process === null)) {
return null;
}

try {
$socket = $this->getSocket((string) $host, (int) $port, $modeSync);
$socket = $this->getSocket((string) $host, (int) $port, $mode);

if ($socket instanceof Socket && $this->process !== null) {
$socket->setOnReadable($this->process);
}

$socket->connect();
$this->{$type}[$key] = $socket;
$this->{$type}[$key][$mode] = $socket;

return $socket;
} catch (\Throwable $e) {
Expand All @@ -205,30 +217,29 @@ public function getConnect(string $key, string $type, bool $modeSync = false): ?

public function clear(): void
{
foreach ($this->metaSockets as $key => $socket) {
$socket->close();
}
foreach ($this->dataSockets as $key => $socket) {
$sockets = [$this->metaSockets, $this->dataSockets];

array_walk_recursive($sockets, function (CommonSocket $socket): void {
$socket->close();
}
});

$this->brokers = [];
}

/**
* @throws \Kafka\Exception
*/
public function getSocket(string $host, int $port, bool $modeSync): CommonSocket
public function getSocket(string $host, int $port, int $mode): CommonSocket
{
$saslProvider = $this->judgeConnectionConfig();

if ($modeSync) {
return new SocketSync($host, $port, $this->config, $saslProvider);
if ($mode === self::SOCKET_MODE_SYNC) {
return $this->getSocketFactory()->createSocketSync($host, $port, $this->config, $saslProvider);
}

return new Socket($host, $port, $this->config, $saslProvider);
return $this->getSocketFactory()->createSocket($host, $port, $this->config, $saslProvider);
}


/**
* @throws \Kafka\Exception
*/
Expand Down Expand Up @@ -281,4 +292,13 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism

throw new Exception(sprintf('"%s" is an invalid SASL mechanism', $mechanism));
}

private function getSocketFactory(): SocketFactory
{
if ($this->socketFactory === null) {
$this->socketFactory = new SocketFactory();
}

return $this->socketFactory;
}
}
7 changes: 7 additions & 0 deletions src/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* @method int getMetadataRequestTimeoutMs()
* @method int getMetadataRefreshIntervalMs()
* @method int getMetadataMaxAgeMs()
* @method bool getAutoCreateTopicsEnable()
* @method string getSecurityProtocol()
* @method bool getSslEnable()
* @method void setSslEnable(bool $sslEnable)
Expand Down Expand Up @@ -84,6 +85,7 @@ abstract class Config
'metadataRequestTimeoutMs' => 60000,
'metadataRefreshIntervalMs' => 300000,
'metadataMaxAgeMs' => -1,
'autoCreateTopicsEnable' => true,
'securityProtocol' => self::SECURITY_PROTOCOL_PLAINTEXT,
'sslEnable' => false, // this config item will override, don't config it.
'sslLocalCert' => '',
Expand Down Expand Up @@ -241,6 +243,11 @@ public function setMetadataMaxAgeMs(int $metadataMaxAgeMs): void
static::$options['metadataMaxAgeMs'] = $metadataMaxAgeMs;
}

public function setAutoCreateTopicsEnable(bool $flag = true): void
{
static::$options['autoCreateTopicsEnable'] = $flag;
}

/**
* @throws Exception\Config
*/
Expand Down
6 changes: 5 additions & 1 deletion src/Producer/RecordValidator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Kafka\Producer;

use Kafka\Exception;
use Kafka\ProducerConfig;
use function is_string;
use function trim;

Expand All @@ -30,7 +31,10 @@ public function validate(array $record, array $topicList): void
throw Exception\InvalidRecordInSet::missingTopic();
}

if (! isset($topicList[$record['topic']])) {
/** @var ProducerConfig $config */
$config = ProducerConfig::getInstance();

if (! isset($topicList[$record['topic']]) && ! $config->getAutoCreateTopicsEnable()) {
throw Exception\InvalidRecordInSet::nonExististingTopic($record['topic']);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Producer/SyncProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public function send(array $recordSet): array
$sendData = $this->convertRecordSet($recordSet);
$result = [];
foreach ($sendData as $brokerId => $topicList) {
$connect = $broker->getDataConnect((string) $brokerId, true);
$connect = $broker->getDataConnect((string) $brokerId, Broker::SOCKET_MODE_SYNC);

if ($connect === null) {
return [];
Expand Down Expand Up @@ -118,7 +118,7 @@ public function syncMeta(): void
$broker = $this->getBroker();

foreach ($brokerHost as $host) {
$socket = $broker->getMetaConnect($host, true);
$socket = $broker->getMetaConnect($host, Broker::SOCKET_MODE_SYNC);

if ($socket === null) {
continue;
Expand Down
25 changes: 25 additions & 0 deletions src/SocketFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php
declare(strict_types=1);

namespace Kafka;

class SocketFactory
{
public function createSocket(
string $host,
int $port,
?Config $config = null,
?SaslMechanism $saslProvider = null
): Socket {
return new Socket($host, $port, $config, $saslProvider);
}

public function createSocketSync(
string $host,
int $port,
?Config $config = null,
?SaslMechanism $saslProvider = null
): SocketSync {
return new SocketSync($host, $port, $config, $saslProvider);
}
}
33 changes: 31 additions & 2 deletions tests/Base/BrokerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use Kafka\Broker;
use Kafka\Socket;
use Kafka\SocketFactory;
use Kafka\SocketSync;
use PHPUnit\Framework\TestCase;

Expand Down Expand Up @@ -95,7 +96,7 @@ public function testData(): void
$this->assertEquals($topics, $broker->getTopics());
}

public function getConnect(): void
public function testGetConnect(): void
{
$broker = $this->getBroker();
$data = [
Expand Down Expand Up @@ -128,6 +129,34 @@ public function getConnect(): void
$this->assertNull($result);
}

public function testGetConnectSyncAndAsyncForTheSameBroker(): void
{
$socket = $this->getMockBuilder(Socket::class)
->disableOriginalConstructor()
->getMock();

$socketSync = $this->getMockBuilder(SocketSync::class)
->disableOriginalConstructor()
->getMock();

$socketFactory = $this->getMockBuilder(SocketFactory::class)
->setMethods(['createSocket', 'createSocketSync'])
->getMock();

$socketFactory->method('createSocket')
->willReturn($socket);
$socketFactory->method('createSocketSync')
->willReturn($socketSync);

$broker = $this->getBroker();
$broker->setSocketFactory($socketFactory);
$broker->setProcess(function (): void {
});

$this->assertSame($socket, $broker->getConnect('kafka:9092', 'metaSockets'));
$this->assertSame($socketSync, $broker->getConnect('kafka:9092', 'metaSockets', Broker::SOCKET_MODE_SYNC));
}

public function testConnectRandFalse(): void
{
$broker = $this->getBroker();
Expand All @@ -141,7 +170,7 @@ public function testGetSocketNotSetConfig(): void
$broker = $this->getBroker();
$hostname = '127.0.0.1';
$port = 9092;
$socket = $broker->getSocket($hostname, $port, true);
$socket = $broker->getSocket($hostname, $port, Broker::SOCKET_MODE_SYNC);

$this->assertInstanceOf(SocketSync::class, $socket);
}
Expand Down
18 changes: 18 additions & 0 deletions tests/Base/Producer/RecordValidatorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use Kafka\Exception\InvalidRecordInSet;
use Kafka\Producer\RecordValidator;
use Kafka\ProducerConfig;
use PHPUnit\Framework\TestCase;

final class RecordValidatorTest extends TestCase
Expand All @@ -16,6 +17,10 @@ public function setUp(): void
{
$this->recordValidator = new RecordValidator();

/** @var ProducerConfig $config */
$config = ProducerConfig::getInstance();
$config->setAutoCreateTopicsEnable(false);

parent::setUp();
}

Expand Down Expand Up @@ -61,4 +66,17 @@ public function invalidRecordThrowsExceptionDataProvider(): array
],
];
}

/**
* @doesNotPerformAssertions
* @phpcsSuppress SlevomatCodingStandard.TypeHints.TypeHintDeclaration.UselessDocComment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't suppress CS violations

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took it from the test above

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh I get it, it's because we're using an old version of doctrine/coding-standard. Let's keep it for now and I created #216 to handle that. Thanks!

*/
public function testValidateNonExistingTopicWhenAutoCreateTopicsEnabled(): void
{
/** @var ProducerConfig $config */
$config = ProducerConfig::getInstance();
$config->setAutoCreateTopicsEnable(true);

$this->recordValidator->validate(['topic' => 'test', 'value' => 'a value'], []);
}
}
2 changes: 2 additions & 0 deletions tests/Functional/Ticket/GH181Test.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ public function setUp(): void
);
}

/** @var ProducerConfig $config */
$config = ProducerConfig::getInstance();
$config->setMetadataBrokerList($brokers);
$config->setBrokerVersion($version);
$config->setAutoCreateTopicsEnable(false);

parent::setUp();
}
Expand Down