diff --git a/docker-compose.yml b/docker-compose.yml index c5fa5545b..b8853d946 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -38,16 +38,16 @@ services: - PREDIS_DSN=redis+predis://redis - PHPREDIS_DSN=redis+phpredis://redis - GPS_DSN=gps:?projectId=mqdev&emulatorHost=http://google-pubsub:8085 - - SQS_DSN=sqs:?key=key&secret=secret®ion=us-east-1&endpoint=http://localstack:4576&version=latest - - SNS_DSN=sns:?key=key&secret=secret®ion=us-east-1&endpoint=http://localstack:4575&version=latest - - SNSQS_DSN=snsqs:?key=key&secret=secret®ion=us-east-1&sns_endpoint=http://localstack:4575&sqs_endpoint=http://localstack:4576&version=latest + - SQS_DSN=sqs:?key=key&secret=secret®ion=us-east-1&endpoint=http://localstack:4566&version=latest + - SNS_DSN=sns:?key=key&secret=secret®ion=us-east-1&endpoint=http://localstack:4566&version=latest + - SNSQS_DSN=snsqs:?key=key&secret=secret®ion=us-east-1&sns_endpoint=http://localstack:4566&sqs_endpoint=http://localstack:4566&version=latest - WAMP_DSN=wamp://thruway:9090 - REDIS_HOST=redis - REDIS_PORT=6379 - AWS_SQS_KEY=key - AWS_SQS_SECRET=secret - AWS_SQS_REGION=us-east-1 - - AWS_SQS_ENDPOINT=http://localstack:4576 + - AWS_SQS_ENDPOINT=http://localstack:4566 - AWS_SQS_VERSION=latest - BEANSTALKD_DSN=beanstalk://beanstalkd:11300 - GEARMAN_DSN=gearman://gearmand:4730 @@ -127,10 +127,9 @@ services: - '9090:9090' localstack: - image: 'localstack/localstack:0.8.10' + image: 'localstack/localstack:3.0.2' ports: - - '4576:4576' - - '4575:4575' + - '4566:4566' environment: HOSTNAME_EXTERNAL: 'localstack' SERVICES: 'sqs,sns' diff --git a/docker/bin/test.sh b/docker/bin/test.sh index 2070584bb..cfc94aab5 100755 --- a/docker/bin/test.sh +++ b/docker/bin/test.sh @@ -39,7 +39,7 @@ waitForService gearmand 4730 50 waitForService kafka 9092 50 waitForService mongo 27017 50 waitForService thruway 9090 50 -waitForService localstack 4576 50 +waitForService localstack 4566 50 php docker/bin/refresh-mysql-database.php || exit 1 php docker/bin/refresh-postgres-database.php || exit 1 diff --git a/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php b/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php index a90b1d306..91ec9cb2e 100644 --- a/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php +++ b/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php @@ -158,10 +158,11 @@ public function testConsumerReceiveMessageFromTopicDirectly() $this->amqpContext->declareTopic($topic); $consumer = $this->amqpContext->createConsumer($topic); - //guard + // guard $this->assertNull($consumer->receive(1000)); $message = $this->amqpContext->createMessage(__METHOD__); + $message->setDeliveryTag(0); $producer = $this->amqpContext->createProducer(); $producer->send($topic, $message); @@ -181,10 +182,11 @@ public function testConsumerReceiveMessageWithZeroTimeout() $this->amqpContext->declareTopic($topic); $consumer = $this->amqpContext->createConsumer($topic); - //guard + // guard $this->assertNull($consumer->receive(1000)); $message = $this->amqpContext->createMessage(__METHOD__); + $message->setDeliveryTag(0); $producer = $this->amqpContext->createProducer(); $producer->send($topic, $message); diff --git a/pkg/gps/GpsConnectionFactory.php b/pkg/gps/GpsConnectionFactory.php index c15854763..9b59923d3 100644 --- a/pkg/gps/GpsConnectionFactory.php +++ b/pkg/gps/GpsConnectionFactory.php @@ -77,10 +77,14 @@ public function createContext(): Context if ($this->config['lazy']) { return new GpsContext(function () { return $this->establishConnection(); - }); + }, [ + 'serilalizeToJson' => $this->config['serilalizeToJson'], + ]); } - return new GpsContext($this->establishConnection()); + return new GpsContext($this->establishConnection(), [ + 'serilalizeToJson' => $this->config['serilalizeToJson'], + ]); } private function parseDsn(string $dsn): array @@ -88,10 +92,7 @@ private function parseDsn(string $dsn): array $dsn = Dsn::parseFirst($dsn); if ('gps' !== $dsn->getSchemeProtocol()) { - throw new \LogicException(sprintf( - 'The given scheme protocol "%s" is not supported. It must be "gps"', - $dsn->getSchemeProtocol() - )); + throw new \LogicException(sprintf('The given scheme protocol "%s" is not supported. It must be "gps"', $dsn->getSchemeProtocol())); } $emulatorHost = $dsn->getString('emulatorHost'); @@ -105,6 +106,7 @@ private function parseDsn(string $dsn): array 'emulatorHost' => $emulatorHost, 'hasEmulator' => $hasEmulator, 'lazy' => $dsn->getBool('lazy'), + 'serilalizeToJson' => $dsn->getBool('serilalizeToJson'), ]), function ($value) { return null !== $value; }); } @@ -121,6 +123,7 @@ private function defaultConfig(): array { return [ 'lazy' => true, + 'serilalizeToJson' => true, ]; } } diff --git a/pkg/gps/GpsConsumer.php b/pkg/gps/GpsConsumer.php index e2a1be272..de67ca085 100644 --- a/pkg/gps/GpsConsumer.php +++ b/pkg/gps/GpsConsumer.php @@ -110,7 +110,14 @@ private function getSubscription(): Subscription private function convertMessage(GoogleMessage $message): GpsMessage { - $gpsMessage = GpsMessage::jsonUnserialize($message->data()); + $options = $this->context->getOptions(); + + if ($options['serilalizeToJson']) { + $gpsMessage = GpsMessage::jsonUnserialize($message->data()); + } else { + $gpsMessage = new GpsMessage($message->data(), $message->attributes()); + } + $gpsMessage->setNativeMessage($message); return $gpsMessage; diff --git a/pkg/gps/GpsContext.php b/pkg/gps/GpsContext.php index 77e6200cf..ab88cc0c3 100644 --- a/pkg/gps/GpsContext.php +++ b/pkg/gps/GpsContext.php @@ -52,11 +52,7 @@ public function __construct($client, array $options = []) } elseif (is_callable($client)) { $this->clientFactory = $client; } else { - throw new \InvalidArgumentException(sprintf( - 'The $client argument must be either %s or callable that returns %s once called.', - PubSubClient::class, - PubSubClient::class - )); + throw new \InvalidArgumentException(sprintf('The $client argument must be either %s or callable that returns %s once called.', PubSubClient::class, PubSubClient::class)); } } @@ -148,11 +144,7 @@ public function getClient(): PubSubClient if (false == $this->client) { $client = call_user_func($this->clientFactory); if (false == $client instanceof PubSubClient) { - throw new \LogicException(sprintf( - 'The factory must return instance of %s. It returned %s', - PubSubClient::class, - is_object($client) ? get_class($client) : gettype($client) - )); + throw new \LogicException(sprintf('The factory must return instance of %s. It returned %s', PubSubClient::class, is_object($client) ? get_class($client) : gettype($client))); } $this->client = $client; @@ -160,4 +152,9 @@ public function getClient(): PubSubClient return $this->client; } + + public function getOptions(): array + { + return $this->options; + } } diff --git a/pkg/gps/Tests/GpsConnectionFactoryConfigTest.php b/pkg/gps/Tests/GpsConnectionFactoryConfigTest.php index 039e86cd4..8f843668a 100644 --- a/pkg/gps/Tests/GpsConnectionFactoryConfigTest.php +++ b/pkg/gps/Tests/GpsConnectionFactoryConfigTest.php @@ -41,9 +41,6 @@ public function testThrowIfDsnCouldNotBeParsed() /** * @dataProvider provideConfigs - * - * @param mixed $config - * @param mixed $expectedConfig */ public function testShouldParseConfigurationAsExpected($config, $expectedConfig) { @@ -58,6 +55,7 @@ public static function provideConfigs() null, [ 'lazy' => true, + 'serilalizeToJson' => true, ], ]; @@ -65,6 +63,7 @@ public static function provideConfigs() 'gps:', [ 'lazy' => true, + 'serilalizeToJson' => true, ], ]; @@ -72,6 +71,7 @@ public static function provideConfigs() [], [ 'lazy' => true, + 'serilalizeToJson' => true, ], ]; @@ -83,6 +83,7 @@ public static function provideConfigs() 'emulatorHost' => 'http://google-pubsub:8085', 'hasEmulator' => true, 'lazy' => true, + 'serilalizeToJson' => true, ], ]; @@ -94,6 +95,7 @@ public static function provideConfigs() 'emulatorHost' => 'http://google-pubsub:8085', 'hasEmulator' => true, 'lazy' => true, + 'serilalizeToJson' => true, ], ]; @@ -104,6 +106,19 @@ public static function provideConfigs() 'projectId' => 'mqdev', 'emulatorHost' => 'http://Fgoogle-pubsub:8085', 'lazy' => false, + 'serilalizeToJson' => true, + ], + ]; + + yield [ + ['dsn' => 'gps:?foo=fooVal&projectId=mqdev&emulatorHost=http%3A%2F%2Fgoogle-pubsub%3A8085&serilalizeToJson=false'], + [ + 'foo' => 'fooVal', + 'projectId' => 'mqdev', + 'emulatorHost' => 'http://google-pubsub:8085', + 'hasEmulator' => true, + 'lazy' => true, + 'serilalizeToJson' => false, ], ]; } diff --git a/pkg/gps/Tests/GpsConsumerTest.php b/pkg/gps/Tests/GpsConsumerTest.php index 0b4b925ce..a11bcd7ff 100644 --- a/pkg/gps/Tests/GpsConsumerTest.php +++ b/pkg/gps/Tests/GpsConsumerTest.php @@ -131,6 +131,11 @@ public function testShouldReceiveMessageNoWait() ->willReturn($client) ; + $context + ->expects($this->once()) + ->method('getOptions') + ->willReturn(['serilalizeToJson' => true]); + $consumer = new GpsConsumer($context, new GpsQueue('queue-name')); $message = $consumer->receiveNoWait(); @@ -171,6 +176,11 @@ public function testShouldReceiveMessage() ->willReturn($client) ; + $context + ->expects($this->once()) + ->method('getOptions') + ->willReturn(['serilalizeToJson' => true]); + $consumer = new GpsConsumer($context, new GpsQueue('queue-name')); $message = $consumer->receive(12345); @@ -179,6 +189,94 @@ public function testShouldReceiveMessage() $this->assertSame('the body', $message->getBody()); } + public function testShouldReceiveMessageUnSerialize() + { + $message = new GpsMessage('the body'); + $nativeMessage = new Message([ + 'data' => json_encode($message), + ], []); + + $subscription = $this->createSubscriptionMock(); + $subscription + ->expects($this->once()) + ->method('pull') + ->with($this->identicalTo([ + 'maxMessages' => 1, + 'requestTimeout' => 12.345, + ])) + ->willReturn([$nativeMessage]); + + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('subscription') + ->willReturn($subscription); + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getClient') + ->willReturn($client); + $context + ->expects($this->once()) + ->method('getOptions') + ->willReturn(['serilalizeToJson' => false]); + + $consumer = new GpsConsumer($context, new GpsQueue('queue-name')); + + $message = $consumer->receive(12345); + + $this->assertInstanceOf(GpsMessage::class, $message); + $this->assertSame($nativeMessage->data(), $message->getBody()); + $this->assertSame($nativeMessage->attributes(), $message->getProperties()); + } + + public function testShouldReceiveMessageUnSerializeWithAttributes() + { + $message = new GpsMessage('the body'); + $nativeMessage = new Message([ + 'data' => json_encode($message), + 'attributes' => [ + 'foo' => 'fooVal', + 'bar' => 'barVal', + ], + ], []); + + $subscription = $this->createSubscriptionMock(); + $subscription + ->expects($this->once()) + ->method('pull') + ->with($this->identicalTo([ + 'maxMessages' => 1, + 'requestTimeout' => 12.345, + ])) + ->willReturn([$nativeMessage]); + + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('subscription') + ->willReturn($subscription); + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getClient') + ->willReturn($client); + $context + ->expects($this->once()) + ->method('getOptions') + ->willReturn(['serilalizeToJson' => false]); + + $consumer = new GpsConsumer($context, new GpsQueue('queue-name')); + + $message = $consumer->receive(12345); + + $this->assertInstanceOf(GpsMessage::class, $message); + $this->assertSame($nativeMessage->data(), $message->getBody()); + $this->assertSame($nativeMessage->attributes(), $message->getProperties()); + } + /** * @return \PHPUnit\Framework\MockObject\MockObject|GpsContext */ diff --git a/pkg/gps/Tests/GpsContextTest.php b/pkg/gps/Tests/GpsContextTest.php index 658e2659e..d7640bff8 100644 --- a/pkg/gps/Tests/GpsContextTest.php +++ b/pkg/gps/Tests/GpsContextTest.php @@ -86,6 +86,13 @@ public function testCreateConsumerShouldThrowExceptionIfInvalidDestination() $context->createConsumer(new GpsTopic('')); } + public function testShouldReturnOptions() + { + $context = new GpsContext($this->createPubSubClientMock(), ['foo' => 'fooVal']); + + $this->assertSame(['ackDeadlineSeconds' => 10, 'foo' => 'fooVal'], $context->getOptions()); + } + /** * @return PubSubClient|\PHPUnit\Framework\MockObject\MockObject|PubSubClient */ diff --git a/pkg/gps/Tests/GpsMessageTest.php b/pkg/gps/Tests/GpsMessageTest.php index a43a22315..8182333cc 100644 --- a/pkg/gps/Tests/GpsMessageTest.php +++ b/pkg/gps/Tests/GpsMessageTest.php @@ -29,7 +29,7 @@ public function testCouldBeUnserializedFromJson() $json = json_encode($message); - //guard + // guard $this->assertNotEmpty($json); $unserializedMessage = GpsMessage::jsonUnserialize($json); diff --git a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php index 30485b072..4bcaf357b 100644 --- a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php +++ b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php @@ -23,7 +23,6 @@ protected function createContext() /** * @param GpsContext $context - * @param mixed $queueName */ protected function createQueue(Context $context, $queueName) { diff --git a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php index 240c71a3b..3a96bb533 100644 --- a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php +++ b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php @@ -23,7 +23,6 @@ protected function createContext() /** * @param GpsContext $context - * @param mixed $queueName */ protected function createQueue(Context $context, $queueName) {