diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..415baf6
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,30 @@
+name: CI
+on:
+ pull_request:
+ push:
+ branches:
+ - master
+jobs:
+ tests:
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ php: ['7.4', '8.0', '8.1', '8.2']
+
+ name: PHP ${{ matrix.php }} tests
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - uses: shivammathur/setup-php@v2
+ with:
+ php-version: ${{ matrix.php }}
+ coverage: none
+ extensions: mongodb
+
+ - uses: "ramsey/composer-install@v1"
+ with:
+ composer-options: "--prefer-source"
+
+ - run: vendor/bin/phpunit --exclude-group=functional
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 6415d29..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,27 +0,0 @@
-sudo: false
-
-language: php
-
-php:
- - '7.1'
-
-git:
- depth: 10
-
-cache:
- directories:
- - $HOME/.composer/cache
-
-services:
- - mongodb
-
-before_install:
- - echo "extension = mongodb.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini
-
-install:
- - composer self-update
- - composer install --prefer-source
-
-script:
- - vendor/bin/phpunit --exclude-group=functional
-
diff --git a/Client/MongodbDriver.php b/Client/MongodbDriver.php
deleted file mode 100644
index aa0d4f9..0000000
--- a/Client/MongodbDriver.php
+++ /dev/null
@@ -1,186 +0,0 @@
- 0,
- MessagePriority::LOW => 1,
- MessagePriority::NORMAL => 2,
- MessagePriority::HIGH => 3,
- MessagePriority::VERY_HIGH => 4,
- ];
-
- /**
- * @param MongodbContext $context
- * @param Config $config
- * @param QueueMetaRegistry $queueMetaRegistry
- */
- public function __construct(MongodbContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
- {
- $this->context = $context;
- $this->config = $config;
- $this->queueMetaRegistry = $queueMetaRegistry;
- }
-
- /**
- * {@inheritdoc}
- *
- * @return MongodbMessage
- */
- public function createTransportMessage(Message $message)
- {
- $properties = $message->getProperties();
-
- $headers = $message->getHeaders();
- $headers['content_type'] = $message->getContentType();
-
- $transportMessage = $this->context->createMessage();
- $transportMessage->setBody($message->getBody());
- $transportMessage->setHeaders($headers);
- $transportMessage->setProperties($properties);
- $transportMessage->setMessageId($message->getMessageId());
- $transportMessage->setTimestamp($message->getTimestamp());
- $transportMessage->setDeliveryDelay($message->getDelay());
- $transportMessage->setReplyTo($message->getReplyTo());
- $transportMessage->setCorrelationId($message->getCorrelationId());
- if (array_key_exists($message->getPriority(), self::$priorityMap)) {
- $transportMessage->setPriority(self::$priorityMap[$message->getPriority()]);
- }
-
- return $transportMessage;
- }
-
- /**
- * @param MongodbMessage $message
- *
- * {@inheritdoc}
- */
- public function createClientMessage(PsrMessage $message)
- {
- $clientMessage = new Message();
-
- $clientMessage->setBody($message->getBody());
- $clientMessage->setHeaders($message->getHeaders());
- $clientMessage->setProperties($message->getProperties());
-
- $clientMessage->setContentType($message->getHeader('content_type'));
- $clientMessage->setMessageId($message->getMessageId());
- $clientMessage->setTimestamp($message->getTimestamp());
- $clientMessage->setDelay($message->getDeliveryDelay());
- $clientMessage->setReplyTo($message->getReplyTo());
- $clientMessage->setCorrelationId($message->getCorrelationId());
-
- $priorityMap = array_flip(self::$priorityMap);
- $priority = array_key_exists($message->getPriority(), $priorityMap) ?
- $priorityMap[$message->getPriority()] :
- MessagePriority::NORMAL;
- $clientMessage->setPriority($priority);
-
- return $clientMessage;
- }
-
- /**
- * {@inheritdoc}
- */
- public function sendToRouter(Message $message)
- {
- if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
- throw new \LogicException('Topic name parameter is required but is not set');
- }
-
- $queue = $this->createQueue($this->config->getRouterQueueName());
- $transportMessage = $this->createTransportMessage($message);
-
- $this->context->createProducer()->send($queue, $transportMessage);
- }
-
- /**
- * {@inheritdoc}
- */
- public function sendToProcessor(Message $message)
- {
- if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
- throw new \LogicException('Processor name parameter is required but is not set');
- }
-
- if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
- throw new \LogicException('Queue name parameter is required but is not set');
- }
-
- $transportMessage = $this->createTransportMessage($message);
- $destination = $this->createQueue($queueName);
-
- $this->context->createProducer()->send($destination, $transportMessage);
- }
-
- /**
- * {@inheritdoc}
- */
- public function createQueue($queueName)
- {
- $transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();
-
- return $this->context->createQueue($transportName);
- }
-
- /**
- * {@inheritdoc}
- */
- public function setupBroker(LoggerInterface $logger = null)
- {
- $logger = $logger ?: new NullLogger();
- $log = function ($text, ...$args) use ($logger) {
- $logger->debug(sprintf('[MongodbDriver] '.$text, ...$args));
- };
- $contextConfig = $this->context->getConfig();
- $log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']);
- $this->context->createCollection();
- }
-
- /**
- * {@inheritdoc}
- */
- public function getConfig()
- {
- return $this->config;
- }
-
- /**
- * @return array
- */
- public static function getPriorityMap()
- {
- return self::$priorityMap;
- }
-}
diff --git a/JSON.php b/JSON.php
index 84cac50..481b7f9 100644
--- a/JSON.php
+++ b/JSON.php
@@ -14,10 +14,7 @@ class JSON
public static function decode($string)
{
if (!is_string($string)) {
- throw new \InvalidArgumentException(sprintf(
- 'Accept only string argument but got: "%s"',
- is_object($string) ? get_class($string) : gettype($string)
- ));
+ throw new \InvalidArgumentException(sprintf('Accept only string argument but got: "%s"', is_object($string) ? $string::class : gettype($string)));
}
// PHP7 fix - empty string and null cause syntax error
@@ -26,32 +23,22 @@ public static function decode($string)
}
$decoded = json_decode($string, true);
- if (JSON_ERROR_NONE !== json_last_error()) {
- throw new \InvalidArgumentException(sprintf(
- 'The malformed json given. Error %s and message %s',
- json_last_error(),
- json_last_error_msg()
- ));
+ if (\JSON_ERROR_NONE !== json_last_error()) {
+ throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg()));
}
return $decoded;
}
/**
- * @param mixed $value
- *
* @return string
*/
public static function encode($value)
{
- $encoded = json_encode($value, JSON_UNESCAPED_UNICODE);
-
- if (JSON_ERROR_NONE !== json_last_error()) {
- throw new \InvalidArgumentException(sprintf(
- 'Could not encode value into json. Error %s and message %s',
- json_last_error(),
- json_last_error_msg()
- ));
+ $encoded = json_encode($value, \JSON_UNESCAPED_UNICODE);
+
+ if (\JSON_ERROR_NONE !== json_last_error()) {
+ throw new \InvalidArgumentException(sprintf('Could not encode value into json. Error %s and message %s', json_last_error(), json_last_error_msg()));
}
return $encoded;
diff --git a/MongodbConnectionFactory.php b/MongodbConnectionFactory.php
index 1bc2c2d..3d34f73 100644
--- a/MongodbConnectionFactory.php
+++ b/MongodbConnectionFactory.php
@@ -1,11 +1,14 @@
parseDsn($config);
} elseif (is_array($config)) {
- $config = $this->parseDsn(empty($config['dsn']) ? 'mongodb:' : $config['dsn']);
+ $config = array_replace(
+ $config,
+ $this->parseDsn(empty($config['dsn']) ? 'mongodb:' : $config['dsn'])
+ );
} else {
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}
@@ -48,14 +54,17 @@ public function __construct($config = 'mongodb:')
$this->config = $config;
}
- public function createContext()
+ /**
+ * @return MongodbContext
+ */
+ public function createContext(): Context
{
$client = new Client($this->config['dsn']);
return new MongodbContext($client, $this->config);
}
- public static function parseDsn($dsn)
+ public static function parseDsn(string $dsn): array
{
$parsedUrl = parse_url($dsn);
if (false === $parsedUrl) {
@@ -68,11 +77,7 @@ public static function parseDsn($dsn)
'mongodb' => true,
];
if (false == isset($parsedUrl['scheme'])) {
- throw new \LogicException(sprintf(
- 'The given DSN schema "%s" is not supported. There are supported schemes: "%s".',
- $parsedUrl['scheme'],
- implode('", "', array_keys($supported))
- ));
+ throw new \LogicException(sprintf('The given DSN schema "%s" is not supported. There are supported schemes: "%s".', $parsedUrl['scheme'], implode('", "', array_keys($supported))));
}
if ('mongodb:' === $dsn) {
return [
@@ -80,9 +85,11 @@ public static function parseDsn($dsn)
];
}
$config['dsn'] = $dsn;
+ // FIXME this is NOT a dbname but rather authdb. But removing this would be a BC break.
+ // see: https://github.com/php-enqueue/enqueue-dev/issues/1027
if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) {
$pathParts = explode('/', $parsedUrl['path']);
- //DB name
+ // DB name
if ($pathParts[1]) {
$config['dbname'] = $pathParts[1];
}
@@ -90,13 +97,16 @@ public static function parseDsn($dsn)
if (isset($parsedUrl['query'])) {
$queryParts = null;
parse_str($parsedUrl['query'], $queryParts);
- //get enqueue attributes values
+ // get enqueue attributes values
if (!empty($queryParts['polling_interval'])) {
- $config['polling_interval'] = $queryParts['polling_interval'];
+ $config['polling_interval'] = (int) $queryParts['polling_interval'];
}
if (!empty($queryParts['enqueue_collection'])) {
$config['collection_name'] = $queryParts['enqueue_collection'];
}
+ if (!empty($queryParts['enqueue_database'])) {
+ $config['dbname'] = $queryParts['enqueue_database'];
+ }
}
return $config;
diff --git a/MongodbConsumer.php b/MongodbConsumer.php
index e3ef368..37ef125 100644
--- a/MongodbConsumer.php
+++ b/MongodbConsumer.php
@@ -1,12 +1,15 @@
context = $context;
$this->queue = $queue;
+
+ $this->pollingInterval = 1000;
}
/**
* Set polling interval in milliseconds.
- *
- * @param int $msec
*/
- public function setPollingInterval($msec)
+ public function setPollingInterval(int $msec): void
{
- $this->pollingInterval = $msec * 1000;
+ $this->pollingInterval = $msec;
}
/**
* Get polling interval in milliseconds.
- *
- * @return int
*/
- public function getPollingInterval()
+ public function getPollingInterval(): int
{
- return (int) $this->pollingInterval / 1000;
+ return $this->pollingInterval;
}
/**
- * {@inheritdoc}
- *
* @return MongodbDestination
*/
- public function getQueue()
+ public function getQueue(): Queue
{
return $this->queue;
}
/**
- * {@inheritdoc}
- *
- * @return MongodbMessage|null
+ * @return MongodbMessage
*/
- public function receive($timeout = 0)
+ public function receive(int $timeout = 0): ?Message
{
$timeout /= 1000;
$startAt = microtime(true);
@@ -81,57 +74,49 @@ public function receive($timeout = 0)
}
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
- return;
+ return null;
}
- usleep($this->pollingInterval);
+ usleep($this->pollingInterval * 1000);
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
- return;
+ return null;
}
}
}
/**
- * {@inheritdoc}
- *
- * @return MongodbMessage|null
+ * @return MongodbMessage
*/
- public function receiveNoWait()
+ public function receiveNoWait(): ?Message
{
return $this->receiveMessage();
}
/**
- * {@inheritdoc}
- *
* @param MongodbMessage $message
*/
- public function acknowledge(PsrMessage $message)
+ public function acknowledge(Message $message): void
{
// does nothing
}
/**
- * {@inheritdoc}
- *
* @param MongodbMessage $message
*/
- public function reject(PsrMessage $message, $requeue = false)
+ public function reject(Message $message, bool $requeue = false): void
{
InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class);
if ($requeue) {
+ $message->setRedelivered(true);
$this->context->createProducer()->send($this->queue, $message);
return;
}
}
- /**
- * @return MongodbMessage|null
- */
- protected function receiveMessage()
+ private function receiveMessage(): ?MongodbMessage
{
$now = time();
$collection = $this->context->getCollection();
@@ -153,26 +138,9 @@ protected function receiveMessage()
return null;
}
if (empty($message['time_to_live']) || $message['time_to_live'] > time()) {
- return $this->convertMessage($message);
+ return $this->context->convertMessage($message);
}
- }
-
- /**
- * @param array $dbalMessage
- *
- * @return MongodbMessage
- */
- protected function convertMessage(array $mongodbMessage)
- {
- $properties = JSON::decode($mongodbMessage['properties']);
- $headers = JSON::decode($mongodbMessage['headers']);
-
- $message = $this->context->createMessage($mongodbMessage['body'], $properties, $headers);
- $message->setId((string) $mongodbMessage['_id']);
- $message->setPriority((int) $mongodbMessage['priority']);
- $message->setRedelivered((bool) $mongodbMessage['redelivered']);
- $message->setPublishedAt((int) $mongodbMessage['published_at']);
- return $message;
+ return null;
}
}
diff --git a/MongodbContext.php b/MongodbContext.php
index ce8945e..2e52ebd 100644
--- a/MongodbContext.php
+++ b/MongodbContext.php
@@ -1,13 +1,23 @@
client = $client;
}
- public function createMessage($body = '', array $properties = [], array $headers = [])
+ /**
+ * @return MongodbMessage
+ */
+ public function createMessage(string $body = '', array $properties = [], array $headers = []): Message
{
$message = new MongodbMessage();
$message->setBody($body);
@@ -40,27 +53,41 @@ public function createMessage($body = '', array $properties = [], array $headers
return $message;
}
- public function createTopic($name)
+ /**
+ * @return MongodbDestination
+ */
+ public function createTopic(string $name): Topic
{
return new MongodbDestination($name);
}
- public function createQueue($queueName)
+ /**
+ * @return MongodbDestination
+ */
+ public function createQueue(string $queueName): Queue
{
return new MongodbDestination($queueName);
}
- public function createTemporaryQueue()
+ public function createTemporaryQueue(): Queue
{
- throw new \BadMethodCallException('Mongodb transport does not support temporary queues');
+ throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
}
- public function createProducer()
+ /**
+ * @return MongodbProducer
+ */
+ public function createProducer(): Producer
{
return new MongodbProducer($this);
}
- public function createConsumer(PsrDestination $destination)
+ /**
+ * @param MongodbDestination $destination
+ *
+ * @return MongodbConsumer
+ */
+ public function createConsumer(Destination $destination): Consumer
{
InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class);
@@ -73,38 +100,67 @@ public function createConsumer(PsrDestination $destination)
return $consumer;
}
- public function close()
+ public function close(): void
{
- // TODO: Implement close() method.
}
- public function getCollection()
+ public function createSubscriptionConsumer(): SubscriptionConsumer
{
- return $this->client
- ->selectDatabase($this->config['dbname'])
- ->selectCollection($this->config['collection_name']);
+ return new MongodbSubscriptionConsumer($this);
}
/**
- * @return Client
+ * @internal It must be used here and in the consumer only
*/
- public function getClient()
+ public function convertMessage(array $mongodbMessage): MongodbMessage
{
- return $this->client;
+ $mongodbMessageObj = $this->createMessage(
+ $mongodbMessage['body'],
+ JSON::decode($mongodbMessage['properties']),
+ JSON::decode($mongodbMessage['headers'])
+ );
+
+ $mongodbMessageObj->setId((string) $mongodbMessage['_id']);
+ $mongodbMessageObj->setPriority((int) $mongodbMessage['priority']);
+ $mongodbMessageObj->setRedelivered((bool) $mongodbMessage['redelivered']);
+ $mongodbMessageObj->setPublishedAt((int) $mongodbMessage['published_at']);
+
+ return $mongodbMessageObj;
}
/**
- * @return array
+ * @param MongodbDestination $queue
*/
- public function getConfig()
+ public function purgeQueue(Queue $queue): void
+ {
+ $this->getCollection()->deleteMany([
+ 'queue' => $queue->getQueueName(),
+ ]);
+ }
+
+ public function getCollection(): Collection
+ {
+ return $this->client
+ ->selectDatabase($this->config['dbname'])
+ ->selectCollection($this->config['collection_name']);
+ }
+
+ public function getClient(): Client
+ {
+ return $this->client;
+ }
+
+ public function getConfig(): array
{
return $this->config;
}
- public function createCollection()
+ public function createCollection(): void
{
$collection = $this->getCollection();
+ $collection->createIndex(['queue' => 1], ['name' => 'enqueue_queue']);
$collection->createIndex(['priority' => -1, 'published_at' => 1], ['name' => 'enqueue_priority']);
$collection->createIndex(['delayed_until' => 1], ['name' => 'enqueue_delayed']);
+ $collection->createIndex(['queue' => 1, 'priority' => -1, 'published_at' => 1, 'delayed_until' => 1], ['name' => 'enqueue_combined']);
}
}
diff --git a/MongodbDestination.php b/MongodbDestination.php
index 360f58a..06653b4 100644
--- a/MongodbDestination.php
+++ b/MongodbDestination.php
@@ -1,46 +1,35 @@
destinationName = $name;
}
- /**
- * {@inheritdoc}
- */
- public function getQueueName()
+ public function getQueueName(): string
{
return $this->destinationName;
}
- /**
- * Alias for getQueueName()
- * {@inheritdoc}
- */
- public function getName()
+ public function getTopicName(): string
{
- return $this->getQueueName();
+ return $this->destinationName;
}
- /**
- * {@inheritdoc}
- */
- public function getTopicName()
+ public function getName(): string
{
return $this->destinationName;
}
diff --git a/MongodbMessage.php b/MongodbMessage.php
index 9a10e5f..fadc5dd 100644
--- a/MongodbMessage.php
+++ b/MongodbMessage.php
@@ -1,10 +1,12 @@
body = $body;
$this->properties = $properties;
@@ -68,248 +65,163 @@ public function __construct($body = '', array $properties = [], array $headers =
$this->redelivered = false;
}
- /**
- * @param string $id
- */
- public function setId($id)
+ public function setId(?string $id = null): void
{
$this->id = $id;
}
- /**
- * @return string $id
- */
- public function getId()
+ public function getId(): ?string
{
return $this->id;
}
- /**
- * @param string $body
- */
- public function setBody($body)
+ public function setBody(string $body): void
{
$this->body = $body;
}
- /**
- * {@inheritdoc}
- */
- public function getBody()
+ public function getBody(): string
{
return $this->body;
}
- /**
- * {@inheritdoc}
- */
- public function setProperties(array $properties)
+ public function setProperties(array $properties): void
{
$this->properties = $properties;
}
- /**
- * {@inheritdoc}
- */
- public function setProperty($name, $value)
+ public function setProperty(string $name, $value): void
{
$this->properties[$name] = $value;
}
- /**
- * {@inheritdoc}
- */
- public function getProperties()
+ public function getProperties(): array
{
return $this->properties;
}
- /**
- * {@inheritdoc}
- */
- public function getProperty($name, $default = null)
+ public function getProperty(string $name, $default = null)
{
return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default;
}
- /**
- * {@inheritdoc}
- */
- public function setHeader($name, $value)
+ public function setHeader(string $name, $value): void
{
$this->headers[$name] = $value;
}
- /**
- * @param array $headers
- */
- public function setHeaders(array $headers)
+ public function setHeaders(array $headers): void
{
$this->headers = $headers;
}
- /**
- * {@inheritdoc}
- */
- public function getHeaders()
+ public function getHeaders(): array
{
return $this->headers;
}
- /**
- * {@inheritdoc}
- */
- public function getHeader($name, $default = null)
+ public function getHeader(string $name, $default = null)
{
return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default;
}
- /**
- * {@inheritdoc}
- */
- public function isRedelivered()
+ public function isRedelivered(): bool
{
return $this->redelivered;
}
- /**
- * {@inheritdoc}
- */
- public function setRedelivered($redelivered)
+ public function setRedelivered(bool $redelivered): void
{
$this->redelivered = $redelivered;
}
- /**
- * {@inheritdoc}
- */
- public function setReplyTo($replyTo)
+ public function setReplyTo(?string $replyTo = null): void
{
$this->setHeader('reply_to', $replyTo);
}
- /**
- * {@inheritdoc}
- */
- public function getReplyTo()
+ public function getReplyTo(): ?string
{
return $this->getHeader('reply_to');
}
- /**
- * @return int
- */
- public function getPriority()
+ public function getPriority(): ?int
{
return $this->priority;
}
- /**
- * @param int $priority
- */
- public function setPriority($priority)
+ public function setPriority(?int $priority = null): void
{
$this->priority = $priority;
}
- /**
- * @return int
- */
- public function getDeliveryDelay()
+ public function getDeliveryDelay(): ?int
{
return $this->deliveryDelay;
}
/**
- * Set delay in milliseconds.
- *
- * @param int $deliveryDelay
+ * In milliseconds.
*/
- public function setDeliveryDelay($deliveryDelay)
+ public function setDeliveryDelay(?int $deliveryDelay = null): void
{
$this->deliveryDelay = $deliveryDelay;
}
- /**
- * @return int|float|null
- */
- public function getTimeToLive()
+ public function getTimeToLive(): ?int
{
return $this->timeToLive;
}
/**
- * Set time to live in milliseconds.
- *
- * @param int|float|null $timeToLive
+ * In milliseconds.
*/
- public function setTimeToLive($timeToLive)
+ public function setTimeToLive(?int $timeToLive = null): void
{
$this->timeToLive = $timeToLive;
}
- /**
- * {@inheritdoc}
- */
- public function setCorrelationId($correlationId)
+ public function setCorrelationId(?string $correlationId = null): void
{
$this->setHeader('correlation_id', $correlationId);
}
- /**
- * {@inheritdoc}
- */
- public function getCorrelationId()
+ public function getCorrelationId(): ?string
{
return $this->getHeader('correlation_id', null);
}
- /**
- * {@inheritdoc}
- */
- public function setMessageId($messageId)
+ public function setMessageId(?string $messageId = null): void
{
$this->setHeader('message_id', $messageId);
}
- /**
- * {@inheritdoc}
- */
- public function getMessageId()
+ public function getMessageId(): ?string
{
return $this->getHeader('message_id', null);
}
- /**
- * {@inheritdoc}
- */
- public function getTimestamp()
+ public function getTimestamp(): ?int
{
$value = $this->getHeader('timestamp');
return null === $value ? null : (int) $value;
}
- /**
- * {@inheritdoc}
- */
- public function setTimestamp($timestamp)
+ public function setTimestamp(?int $timestamp = null): void
{
$this->setHeader('timestamp', $timestamp);
}
- /**
- * @return int
- */
- public function getPublishedAt()
+ public function getPublishedAt(): ?int
{
return $this->publishedAt;
}
/**
- * @param int $publishedAt
+ * In milliseconds.
*/
- public function setPublishedAt($publishedAt)
+ public function setPublishedAt(?int $publishedAt = null): void
{
$this->publishedAt = $publishedAt;
}
diff --git a/MongodbProducer.php b/MongodbProducer.php
index c5132b6..ed28a66 100644
--- a/MongodbProducer.php
+++ b/MongodbProducer.php
@@ -1,15 +1,17 @@
context = $context;
}
/**
- * {@inheritdoc}
- *
* @param MongodbDestination $destination
* @param MongodbMessage $message
- *
- * @throws Exception
*/
- public function send(PsrDestination $destination, PsrMessage $message)
+ public function send(Destination $destination, Message $message): void
{
InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class);
InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class);
@@ -63,14 +58,6 @@ public function send(PsrDestination $destination, PsrMessage $message)
}
$body = $message->getBody();
- if (is_scalar($body) || null === $body) {
- $body = (string) $body;
- } else {
- throw new InvalidMessageException(sprintf(
- 'The message body must be a scalar or null. Got: %s',
- is_object($body) ? get_class($body) : gettype($body)
- ));
- }
$publishedAt = null !== $message->getPublishedAt() ?
$message->getPublishedAt() :
@@ -90,10 +77,7 @@ public function send(PsrDestination $destination, PsrMessage $message)
$delay = $message->getDeliveryDelay();
if ($delay) {
if (!is_int($delay)) {
- throw new \LogicException(sprintf(
- 'Delay must be integer but got: "%s"',
- is_object($delay) ? get_class($delay) : gettype($delay)
- ));
+ throw new \LogicException(sprintf('Delay must be integer but got: "%s"', is_object($delay) ? $delay::class : gettype($delay)));
}
if ($delay <= 0) {
@@ -106,10 +90,7 @@ public function send(PsrDestination $destination, PsrMessage $message)
$timeToLive = $message->getTimeToLive();
if ($timeToLive) {
if (!is_int($timeToLive)) {
- throw new \LogicException(sprintf(
- 'TimeToLive must be integer but got: "%s"',
- is_object($timeToLive) ? get_class($timeToLive) : gettype($timeToLive)
- ));
+ throw new \LogicException(sprintf('TimeToLive must be integer but got: "%s"', is_object($timeToLive) ? $timeToLive::class : gettype($timeToLive)));
}
if ($timeToLive <= 0) {
@@ -123,58 +104,51 @@ public function send(PsrDestination $destination, PsrMessage $message)
$collection = $this->context->getCollection();
$collection->insertOne($mongoMessage);
} catch (\Exception $e) {
- throw new Exception('The transport has failed to send the message due to some internal error.', null, $e);
+ throw new Exception('The transport has failed to send the message due to some internal error.', $e->getCode(), $e);
}
}
/**
- * {@inheritdoc}
+ * @return self
*/
- public function setDeliveryDelay($deliveryDelay)
+ public function setDeliveryDelay(?int $deliveryDelay = null): Producer
{
$this->deliveryDelay = $deliveryDelay;
return $this;
}
- /**
- * {@inheritdoc}
- */
- public function getDeliveryDelay()
+ public function getDeliveryDelay(): ?int
{
return $this->deliveryDelay;
}
/**
- * {@inheritdoc}
+ * @return self
*/
- public function setPriority($priority)
+ public function setPriority(?int $priority = null): Producer
{
$this->priority = $priority;
return $this;
}
- /**
- * {@inheritdoc}
- */
- public function getPriority()
+ public function getPriority(): ?int
{
return $this->priority;
}
/**
- * {@inheritdoc}
+ * @return self
*/
- public function setTimeToLive($timeToLive)
+ public function setTimeToLive(?int $timeToLive = null): Producer
{
$this->timeToLive = $timeToLive;
+
+ return $this;
}
- /**
- * {@inheritdoc}
- */
- public function getTimeToLive()
+ public function getTimeToLive(): ?int
{
return $this->timeToLive;
}
diff --git a/MongodbSubscriptionConsumer.php b/MongodbSubscriptionConsumer.php
new file mode 100644
index 0000000..9fa6245
--- /dev/null
+++ b/MongodbSubscriptionConsumer.php
@@ -0,0 +1,133 @@
+context = $context;
+ $this->subscribers = [];
+ }
+
+ public function consume(int $timeout = 0): void
+ {
+ if (empty($this->subscribers)) {
+ throw new \LogicException('No subscribers');
+ }
+
+ $timeout = (int) ceil($timeout / 1000);
+ $endAt = time() + $timeout;
+
+ $queueNames = [];
+ foreach (array_keys($this->subscribers) as $queueName) {
+ $queueNames[$queueName] = $queueName;
+ }
+
+ $currentQueueNames = [];
+ while (true) {
+ if (empty($currentQueueNames)) {
+ $currentQueueNames = $queueNames;
+ }
+
+ $result = $this->context->getCollection()->findOneAndDelete(
+ [
+ 'queue' => ['$in' => array_keys($currentQueueNames)],
+ '$or' => [
+ ['delayed_until' => ['$exists' => false]],
+ ['delayed_until' => ['$lte' => time()]],
+ ],
+ ],
+ [
+ 'sort' => ['priority' => -1, 'published_at' => 1],
+ 'typeMap' => ['root' => 'array', 'document' => 'array'],
+ ]
+ );
+
+ if ($result) {
+ list($consumer, $callback) = $this->subscribers[$result['queue']];
+
+ $message = $this->context->convertMessage($result);
+
+ if (false === call_user_func($callback, $message, $consumer)) {
+ return;
+ }
+
+ unset($currentQueueNames[$result['queue']]);
+ } else {
+ $currentQueueNames = [];
+
+ usleep(200000); // 200ms
+ }
+
+ if ($timeout && microtime(true) >= $endAt) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * @param MongodbConsumer $consumer
+ */
+ public function subscribe(Consumer $consumer, callable $callback): void
+ {
+ if (false == $consumer instanceof MongodbConsumer) {
+ throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, $consumer::class));
+ }
+
+ $queueName = $consumer->getQueue()->getQueueName();
+ if (array_key_exists($queueName, $this->subscribers)) {
+ if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) {
+ return;
+ }
+
+ throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName));
+ }
+
+ $this->subscribers[$queueName] = [$consumer, $callback];
+ }
+
+ /**
+ * @param MongodbConsumer $consumer
+ */
+ public function unsubscribe(Consumer $consumer): void
+ {
+ if (false == $consumer instanceof MongodbConsumer) {
+ throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, $consumer::class));
+ }
+
+ $queueName = $consumer->getQueue()->getQueueName();
+
+ if (false == array_key_exists($queueName, $this->subscribers)) {
+ return;
+ }
+
+ if ($this->subscribers[$queueName][0] !== $consumer) {
+ return;
+ }
+
+ unset($this->subscribers[$queueName]);
+ }
+
+ public function unsubscribeAll(): void
+ {
+ $this->subscribers = [];
+ }
+}
diff --git a/README.md b/README.md
index d29cde0..2e9bbd1 100644
--- a/README.md
+++ b/README.md
@@ -1,27 +1,36 @@
+
Supporting Enqueue
+
+Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider:
+
+- [Become a sponsor](https://www.patreon.com/makasim)
+- [Become our client](http://forma-pro.com/)
+
+---
+
# Mongodb Transport
[](https://gitter.im/php-enqueue/Lobby)
-[](https://travis-ci.org/php-enqueue/mongodb)
+[](https://github.com/php-enqueue/mongodb/actions?query=workflow%3ACI)
[](https://packagist.org/packages/enqueue/mongodb)
[](https://packagist.org/packages/enqueue/mongodb)
-
-This is an implementation of the queue specification. It allows you to use MongoDB database as a message broker.
+
+This is an implementation of the queue specification. It allows you to use MongoDB database as a message broker.
## Resources
* [Site](https://enqueue.forma-pro.com/)
-* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)
+* [Documentation](https://php-enqueue.github.io/transport/mongodb/)
* [Questions](https://gitter.im/php-enqueue/Lobby)
* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)
## Developed by Forma-Pro
-Forma-Pro is a full stack development company which interests also spread to open source development.
-Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience.
+Forma-Pro is a full stack development company which interests also spread to open source development.
+Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience.
Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability.
If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com
## License
-It is released under the [MIT License](LICENSE).
\ No newline at end of file
+It is released under the [MIT License](LICENSE).
diff --git a/Symfony/MongodbTransportFactory.php b/Symfony/MongodbTransportFactory.php
deleted file mode 100644
index 3486d8a..0000000
--- a/Symfony/MongodbTransportFactory.php
+++ /dev/null
@@ -1,119 +0,0 @@
-name = $name;
- }
-
- /**
- * {@inheritdoc}
- */
- public function addConfiguration(ArrayNodeDefinition $builder)
- {
- $builder
- ->beforeNormalization()
- ->ifString()
- ->then(function ($v) {
- return ['dsn' => $v];
- })
- ->end()
- ->children()
- ->scalarNode('dsn')
- ->info('The Mongodb DSN. Other parameters are ignored if set')
- ->isRequired()
- ->end()
- ->scalarNode('dbname')
- ->defaultValue('enqueue')
- ->info('Database name.')
- ->end()
- ->scalarNode('collection_name')
- ->defaultValue('enqueue')
- ->info('Collection')
- ->end()
- ->integerNode('polling_interval')
- ->defaultValue(1000)
- ->min(100)
- ->info('How often query for new messages.')
- ->end()
- ;
- }
-
- /**
- * {@inheritdoc}
- */
- public function createConnectionFactory(ContainerBuilder $container, array $config)
- {
- $factory = new Definition(MongodbConnectionFactory::class, [$config]);
-
- $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
- $container->setDefinition($factoryId, $factory);
-
- return $factoryId;
- }
-
- /**
- * {@inheritdoc}
- */
- public function createContext(ContainerBuilder $container, array $config)
- {
- $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
-
- $context = new Definition(MongodbContext::class);
- $context->setPublic(true);
- $context->setFactory([new Reference($factoryId), 'createContext']);
-
- $contextId = sprintf('enqueue.transport.%s.context', $this->getName());
- $container->setDefinition($contextId, $context);
-
- return $contextId;
- }
-
- /**
- * {@inheritdoc}
- */
- public function createDriver(ContainerBuilder $container, array $config)
- {
- $driver = new Definition(MongodbDriver::class);
- $driver->setPublic(true);
- $driver->setArguments([
- new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
- new Reference('enqueue.client.config'),
- new Reference('enqueue.client.meta.queue_meta_registry'),
- ]);
-
- $driverId = sprintf('enqueue.client.%s.driver', $this->getName());
- $container->setDefinition($driverId, $driver);
-
- return $driverId;
- }
-
- /**
- * {@inheritdoc}
- */
- public function getName()
- {
- return $this->name;
- }
-}
diff --git a/Tests/Client/MongodbDriverTest.php b/Tests/Client/MongodbDriverTest.php
deleted file mode 100644
index 1d4bb60..0000000
--- a/Tests/Client/MongodbDriverTest.php
+++ /dev/null
@@ -1,351 +0,0 @@
-assertClassImplements(DriverInterface::class, MongodbDriver::class);
- }
-
- public function testCouldBeConstructedWithRequiredArguments()
- {
- new MongodbDriver(
- $this->createPsrContextMock(),
- $this->createDummyConfig(),
- $this->createDummyQueueMetaRegistry()
- );
- }
-
- public function testShouldReturnConfigObject()
- {
- $config = $this->createDummyConfig();
-
- $driver = new MongodbDriver(
- $this->createPsrContextMock(),
- $config,
- $this->createDummyQueueMetaRegistry()
- );
-
- $this->assertSame($config, $driver->getConfig());
- }
-
- public function testShouldCreateAndReturnQueueInstance()
- {
- $expectedQueue = new MongodbDestination('aName');
-
- $context = $this->createPsrContextMock();
- $context
- ->expects($this->once())
- ->method('createQueue')
- ->with('aprefix.afooqueue')
- ->willReturn($expectedQueue)
- ;
-
- $driver = new MongodbDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry());
-
- $queue = $driver->createQueue('aFooQueue');
-
- $this->assertSame($expectedQueue, $queue);
- }
-
- public function testShouldCreateAndReturnQueueInstanceWithHardcodedTransportName()
- {
- $expectedQueue = new MongodbDestination('aName');
-
- $context = $this->createPsrContextMock();
- $context
- ->expects($this->once())
- ->method('createQueue')
- ->with('aBarQueue')
- ->willReturn($expectedQueue)
- ;
-
- $driver = new MongodbDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry());
-
- $queue = $driver->createQueue('aBarQueue');
-
- $this->assertSame($expectedQueue, $queue);
- }
-
- public function testShouldConvertTransportMessageToClientMessage()
- {
- $transportMessage = new MongodbMessage();
- $transportMessage->setBody('body');
- $transportMessage->setHeaders(['hkey' => 'hval']);
- $transportMessage->setProperties(['key' => 'val']);
- $transportMessage->setHeader('content_type', 'ContentType');
- $transportMessage->setMessageId('MessageId');
- $transportMessage->setTimestamp(1000);
- $transportMessage->setPriority(2);
- $transportMessage->setDeliveryDelay(12345);
-
- $driver = new MongodbDriver(
- $this->createPsrContextMock(),
- $this->createDummyConfig(),
- $this->createDummyQueueMetaRegistry()
- );
-
- $clientMessage = $driver->createClientMessage($transportMessage);
-
- $this->assertInstanceOf(Message::class, $clientMessage);
- $this->assertSame('body', $clientMessage->getBody());
- $this->assertSame([
- 'hkey' => 'hval',
- 'content_type' => 'ContentType',
- 'message_id' => 'MessageId',
- 'timestamp' => 1000,
- ], $clientMessage->getHeaders());
- $this->assertSame([
- 'key' => 'val',
- ], $clientMessage->getProperties());
- $this->assertSame('MessageId', $clientMessage->getMessageId());
- $this->assertSame('ContentType', $clientMessage->getContentType());
- $this->assertSame(1000, $clientMessage->getTimestamp());
- $this->assertSame(12345, $clientMessage->getDelay());
-
- $this->assertNull($clientMessage->getExpire());
- $this->assertSame(MessagePriority::NORMAL, $clientMessage->getPriority());
- }
-
- public function testShouldConvertClientMessageToTransportMessage()
- {
- $clientMessage = new Message();
- $clientMessage->setBody('body');
- $clientMessage->setHeaders(['hkey' => 'hval']);
- $clientMessage->setProperties(['key' => 'val']);
- $clientMessage->setContentType('ContentType');
- $clientMessage->setExpire(123);
- $clientMessage->setPriority(MessagePriority::VERY_HIGH);
- $clientMessage->setMessageId('MessageId');
- $clientMessage->setTimestamp(1000);
-
- $context = $this->createPsrContextMock();
- $context
- ->expects($this->once())
- ->method('createMessage')
- ->willReturn(new MongodbMessage())
- ;
-
- $driver = new MongodbDriver(
- $context,
- $this->createDummyConfig(),
- $this->createDummyQueueMetaRegistry()
- );
-
- $transportMessage = $driver->createTransportMessage($clientMessage);
-
- $this->assertInstanceOf(MongodbMessage::class, $transportMessage);
- $this->assertSame('body', $transportMessage->getBody());
- $this->assertSame([
- 'hkey' => 'hval',
- 'content_type' => 'ContentType',
- 'message_id' => 'MessageId',
- 'timestamp' => 1000,
- 'reply_to' => null,
- 'correlation_id' => null,
- ], $transportMessage->getHeaders());
- $this->assertSame([
- 'key' => 'val',
- ], $transportMessage->getProperties());
- $this->assertSame('MessageId', $transportMessage->getMessageId());
- $this->assertSame(1000, $transportMessage->getTimestamp());
- }
-
- public function testShouldSendMessageToRouter()
- {
- $topic = new MongodbDestination('queue-name');
- $transportMessage = new MongodbMessage();
-
- $producer = $this->createPsrProducerMock();
- $producer
- ->expects($this->once())
- ->method('send')
- ->with($this->identicalTo($topic), $this->identicalTo($transportMessage))
- ;
- $context = $this->createPsrContextMock();
- $context
- ->expects($this->once())
- ->method('createQueue')
- ->with('aprefix.default')
- ->willReturn($topic)
- ;
- $context
- ->expects($this->once())
- ->method('createProducer')
- ->willReturn($producer)
- ;
- $context
- ->expects($this->once())
- ->method('createMessage')
- ->willReturn($transportMessage)
- ;
-
- $driver = new MongodbDriver(
- $context,
- $this->createDummyConfig(),
- $this->createDummyQueueMetaRegistry()
- );
-
- $message = new Message();
- $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic');
-
- $driver->sendToRouter($message);
- }
-
- public function testShouldThrowExceptionIfTopicParameterIsNotSet()
- {
- $driver = new MongodbDriver(
- $this->createPsrContextMock(),
- $this->createDummyConfig(),
- $this->createDummyQueueMetaRegistry()
- );
-
- $this->expectException(\LogicException::class);
- $this->expectExceptionMessage('Topic name parameter is required but is not set');
-
- $driver->sendToRouter(new Message());
- }
-
- public function testShouldSendMessageToProcessor()
- {
- $queue = new MongodbDestination('queue-name');
- $transportMessage = new MongodbMessage();
-
- $producer = $this->createPsrProducerMock();
- $producer
- ->expects($this->once())
- ->method('send')
- ->with($this->identicalTo($queue), $this->identicalTo($transportMessage))
- ;
- $context = $this->createPsrContextMock();
- $context
- ->expects($this->once())
- ->method('createQueue')
- ->willReturn($queue)
- ;
- $context
- ->expects($this->once())
- ->method('createProducer')
- ->willReturn($producer)
- ;
- $context
- ->expects($this->once())
- ->method('createMessage')
- ->willReturn($transportMessage)
- ;
-
- $driver = new MongodbDriver(
- $context,
- $this->createDummyConfig(),
- $this->createDummyQueueMetaRegistry()
- );
-
- $message = new Message();
- $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor');
- $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aFooQueue');
-
- $driver->sendToProcessor($message);
- }
-
- public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet()
- {
- $driver = new MongodbDriver(
- $this->createPsrContextMock(),
- $this->createDummyConfig(),
- $this->createDummyQueueMetaRegistry()
- );
-
- $this->expectException(\LogicException::class);
- $this->expectExceptionMessage('Processor name parameter is required but is not set');
-
- $driver->sendToProcessor(new Message());
- }
-
- public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet()
- {
- $driver = new MongodbDriver(
- $this->createPsrContextMock(),
- $this->createDummyConfig(),
- $this->createDummyQueueMetaRegistry()
- );
-
- $this->expectException(\LogicException::class);
- $this->expectExceptionMessage('Queue name parameter is required but is not set');
-
- $message = new Message();
- $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor');
-
- $driver->sendToProcessor($message);
- }
-
- public function testShouldSetupBroker()
- {
- $context = $this->createPsrContextMock();
-
- $context
- ->expects($this->once())
- ->method('createCollection')
- ;
-
- $driver = new MongodbDriver(
- $context,
- $this->createDummyConfig(),
- $this->createDummyQueueMetaRegistry()
- );
-
- $driver->setupBroker();
- }
-
- /**
- * @return \PHPUnit_Framework_MockObject_MockObject|MongodbContext
- */
- private function createPsrContextMock()
- {
- return $this->createMock(MongodbContext::class);
- }
-
- /**
- * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer
- */
- private function createPsrProducerMock()
- {
- return $this->createMock(PsrProducer::class);
- }
-
- /**
- * @return QueueMetaRegistry
- */
- private function createDummyQueueMetaRegistry()
- {
- $registry = new QueueMetaRegistry($this->createDummyConfig(), []);
- $registry->add('default');
- $registry->add('aFooQueue');
- $registry->add('aBarQueue', 'aBarQueue');
-
- return $registry;
- }
-
- /**
- * @return Config
- */
- private function createDummyConfig()
- {
- return Config::create('aPrefix');
- }
-}
diff --git a/Tests/Functional/MongodbConsumerTest.php b/Tests/Functional/MongodbConsumerTest.php
index 609a22e..b6b644b 100644
--- a/Tests/Functional/MongodbConsumerTest.php
+++ b/Tests/Functional/MongodbConsumerTest.php
@@ -19,12 +19,12 @@ class MongodbConsumerTest extends TestCase
*/
private $context;
- public function setUp()
+ protected function setUp(): void
{
$this->context = $this->buildMongodbContext();
}
- protected function tearDown()
+ protected function tearDown(): void
{
if ($this->context) {
$this->context->close();
diff --git a/Tests/MongodbConnectionFactoryTest.php b/Tests/MongodbConnectionFactoryTest.php
index fa89b34..d5dd9ca 100644
--- a/Tests/MongodbConnectionFactoryTest.php
+++ b/Tests/MongodbConnectionFactoryTest.php
@@ -5,18 +5,21 @@
use Enqueue\Mongodb\MongodbConnectionFactory;
use Enqueue\Mongodb\MongodbContext;
use Enqueue\Test\ClassExtensionTrait;
-use Interop\Queue\PsrConnectionFactory;
+use Enqueue\Test\ReadAttributeTrait;
+use Interop\Queue\ConnectionFactory;
+use PHPUnit\Framework\TestCase;
/**
* @group mongodb
*/
-class MongodbConnectionFactoryTest extends \PHPUnit_Framework_TestCase
+class MongodbConnectionFactoryTest extends TestCase
{
use ClassExtensionTrait;
+ use ReadAttributeTrait;
public function testShouldImplementConnectionFactoryInterface()
{
- $this->assertClassImplements(PsrConnectionFactory::class, MongodbConnectionFactory::class);
+ $this->assertClassImplements(ConnectionFactory::class, MongodbConnectionFactory::class);
}
public function testCouldBeConstructedWithEmptyConfiguration()
@@ -44,6 +47,20 @@ public function testCouldBeConstructedWithCustomConfiguration()
$this->assertAttributeEquals($params, 'config', $factory);
}
+ public function testCouldBeConstructedWithCustomConfigurationFromDsn()
+ {
+ $params = [
+ 'dsn' => 'mongodb://127.0.0.3/test-db-name?enqueue_collection=collection-name&polling_interval=3000',
+ 'dbname' => 'test-db-name',
+ 'collection_name' => 'collection-name',
+ 'polling_interval' => 3000,
+ ];
+
+ $factory = new MongodbConnectionFactory($params['dsn']);
+
+ $this->assertAttributeEquals($params, 'config', $factory);
+ }
+
public function testShouldCreateContext()
{
$factory = new MongodbConnectionFactory();
diff --git a/Tests/MongodbConsumerTest.php b/Tests/MongodbConsumerTest.php
index 0c2e5c6..6cd5975 100644
--- a/Tests/MongodbConsumerTest.php
+++ b/Tests/MongodbConsumerTest.php
@@ -8,25 +8,21 @@
use Enqueue\Mongodb\MongodbMessage;
use Enqueue\Mongodb\MongodbProducer;
use Enqueue\Test\ClassExtensionTrait;
-use Interop\Queue\InvalidMessageException;
-use Interop\Queue\PsrConsumer;
-use Interop\Queue\PsrMessage;
+use Interop\Queue\Consumer;
+use Interop\Queue\Exception\InvalidMessageException;
+use Interop\Queue\Message;
+use PHPUnit\Framework\TestCase;
/**
* @group mongodb
*/
-class MongodbConsumerTest extends \PHPUnit_Framework_TestCase
+class MongodbConsumerTest extends TestCase
{
use ClassExtensionTrait;
public function testShouldImplementConsumerInterface()
{
- $this->assertClassImplements(PsrConsumer::class, MongodbConsumer::class);
- }
-
- public function testCouldBeConstructedWithRequiredArguments()
- {
- new MongodbConsumer($this->createContextMock(), new MongodbDestination('queue'));
+ $this->assertClassImplements(Consumer::class, MongodbConsumer::class);
}
public function testShouldReturnInstanceOfDestination()
@@ -38,6 +34,9 @@ public function testShouldReturnInstanceOfDestination()
$this->assertSame($destination, $consumer->getQueue());
}
+ /**
+ * @doesNotPerformAssertions
+ */
public function testCouldCallAcknowledgedMethod()
{
$consumer = new MongodbConsumer($this->createContextMock(), new MongodbDestination('queue'));
@@ -103,7 +102,7 @@ public function testRejectShouldReSendMessageToSameQueueOnRequeue()
$context
->expects($this->once())
->method('createProducer')
- ->will($this->returnValue($producerMock))
+ ->willReturn($producerMock)
;
$consumer = new MongodbConsumer($context, $queue);
@@ -112,7 +111,7 @@ public function testRejectShouldReSendMessageToSameQueueOnRequeue()
}
/**
- * @return MongodbProducer|\PHPUnit_Framework_MockObject_MockObject
+ * @return MongodbProducer|\PHPUnit\Framework\MockObject\MockObject
*/
private function createProducerMock()
{
@@ -120,7 +119,7 @@ private function createProducerMock()
}
/**
- * @return \PHPUnit_Framework_MockObject_MockObject|MongodbContext
+ * @return \PHPUnit\Framework\MockObject\MockObject|MongodbContext
*/
private function createContextMock()
{
@@ -128,85 +127,93 @@ private function createContextMock()
}
}
-class InvalidMessage implements PsrMessage
+class InvalidMessage implements Message
{
- public function getBody()
+ public function getBody(): string
{
+ throw new \BadMethodCallException('This should not be called directly');
}
- public function setBody($body)
+ public function setBody(string $body): void
{
}
- public function setProperties(array $properties)
+ public function setProperties(array $properties): void
{
}
- public function getProperties()
+ public function getProperties(): array
{
+ throw new \BadMethodCallException('This should not be called directly');
}
- public function setProperty($name, $value)
+ public function setProperty(string $name, $value): void
{
}
- public function getProperty($name, $default = null)
+ public function getProperty(string $name, $default = null)
{
}
- public function setHeaders(array $headers)
+ public function setHeaders(array $headers): void
{
}
- public function getHeaders()
+ public function getHeaders(): array
{
+ throw new \BadMethodCallException('This should not be called directly');
}
- public function setHeader($name, $value)
+ public function setHeader(string $name, $value): void
{
}
- public function getHeader($name, $default = null)
+ public function getHeader(string $name, $default = null)
{
}
- public function setRedelivered($redelivered)
+ public function setRedelivered(bool $redelivered): void
{
}
- public function isRedelivered()
+ public function isRedelivered(): bool
{
+ throw new \BadMethodCallException('This should not be called directly');
}
- public function setCorrelationId($correlationId)
+ public function setCorrelationId(?string $correlationId = null): void
{
}
- public function getCorrelationId()
+ public function getCorrelationId(): ?string
{
+ throw new \BadMethodCallException('This should not be called directly');
}
- public function setMessageId($messageId)
+ public function setMessageId(?string $messageId = null): void
{
}
- public function getMessageId()
+ public function getMessageId(): ?string
{
+ throw new \BadMethodCallException('This should not be called directly');
}
- public function getTimestamp()
+ public function getTimestamp(): ?int
{
+ throw new \BadMethodCallException('This should not be called directly');
}
- public function setTimestamp($timestamp)
+ public function setTimestamp(?int $timestamp = null): void
{
}
- public function setReplyTo($replyTo)
+ public function setReplyTo(?string $replyTo = null): void
{
}
- public function getReplyTo()
+ public function getReplyTo(): ?string
{
+ throw new \BadMethodCallException('This should not be called directly');
}
}
diff --git a/Tests/MongodbContextTest.php b/Tests/MongodbContextTest.php
index 0c06397..8cdef79 100644
--- a/Tests/MongodbContextTest.php
+++ b/Tests/MongodbContextTest.php
@@ -8,26 +8,25 @@
use Enqueue\Mongodb\MongodbMessage;
use Enqueue\Mongodb\MongodbProducer;
use Enqueue\Test\ClassExtensionTrait;
-use Interop\Queue\InvalidDestinationException;
-use Interop\Queue\PsrContext;
-use Interop\Queue\PsrDestination;
+use Enqueue\Test\ReadAttributeTrait;
+use Interop\Queue\Context;
+use Interop\Queue\Destination;
+use Interop\Queue\Exception\InvalidDestinationException;
+use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
use MongoDB\Client;
+use PHPUnit\Framework\TestCase;
/**
* @group mongodb
*/
-class MongodbContextTest extends \PHPUnit_Framework_TestCase
+class MongodbContextTest extends TestCase
{
use ClassExtensionTrait;
+ use ReadAttributeTrait;
public function testShouldImplementContextInterface()
{
- $this->assertClassImplements(PsrContext::class, MongodbContext::class);
- }
-
- public function testCouldBeConstructedWithRequiredArguments()
- {
- new MongodbContext($this->createClientMock());
+ $this->assertClassImplements(Context::class, MongodbContext::class);
}
public function testCouldBeConstructedWithEmptyConfiguration()
@@ -69,6 +68,32 @@ public function testShouldCreateMessage()
$this->assertFalse($message->isRedelivered());
}
+ public function testShouldConvertFromArrayToMongodbMessage()
+ {
+ $arrayData = [
+ '_id' => 'stringId',
+ 'body' => 'theBody',
+ 'properties' => json_encode(['barProp' => 'barPropVal']),
+ 'headers' => json_encode(['fooHeader' => 'fooHeaderVal']),
+ 'priority' => '12',
+ 'published_at' => 1525935820,
+ 'redelivered' => false,
+ ];
+
+ $context = new MongodbContext($this->createClientMock());
+ $message = $context->convertMessage($arrayData);
+
+ $this->assertInstanceOf(MongodbMessage::class, $message);
+
+ $this->assertEquals('stringId', $message->getId());
+ $this->assertEquals('theBody', $message->getBody());
+ $this->assertEquals(['barProp' => 'barPropVal'], $message->getProperties());
+ $this->assertEquals(['fooHeader' => 'fooHeaderVal'], $message->getHeaders());
+ $this->assertEquals(12, $message->getPriority());
+ $this->assertEquals(1525935820, $message->getPublishedAt());
+ $this->assertFalse($message->isRedelivered());
+ }
+
public function testShouldCreateTopic()
{
$context = new MongodbContext($this->createClientMock());
@@ -145,18 +170,17 @@ public function testShouldReturnConfig()
], $context->getConfig());
}
- public function testShouldThrowBadMethodCallExceptionOncreateTemporaryQueueCall()
+ public function testShouldThrowNotSupportedOnCreateTemporaryQueueCall()
{
$context = new MongodbContext($this->createClientMock());
- $this->expectException(\BadMethodCallException::class);
- $this->expectExceptionMessage('Mongodb transport does not support temporary queues');
+ $this->expectException(TemporaryQueueNotSupportedException::class);
$context->createTemporaryQueue();
}
/**
- * @return \PHPUnit_Framework_MockObject_MockObject|Client
+ * @return \PHPUnit\Framework\MockObject\MockObject|Client
*/
private function createClientMock()
{
@@ -164,6 +188,6 @@ private function createClientMock()
}
}
-class NotSupportedDestination2 implements PsrDestination
+class NotSupportedDestination2 implements Destination
{
}
diff --git a/Tests/MongodbDestinationTest.php b/Tests/MongodbDestinationTest.php
index ef81e8f..4e94ef0 100644
--- a/Tests/MongodbDestinationTest.php
+++ b/Tests/MongodbDestinationTest.php
@@ -1,33 +1,34 @@
assertClassImplements(PsrDestination::class, MongodbDestination::class);
+ $this->assertClassImplements(Destination::class, MongodbDestination::class);
}
public function testShouldImplementTopicInterface()
{
- $this->assertClassImplements(PsrTopic::class, MongodbDestination::class);
+ $this->assertClassImplements(Topic::class, MongodbDestination::class);
}
public function testShouldImplementQueueInterface()
{
- $this->assertClassImplements(PsrQueue::class, MongodbDestination::class);
+ $this->assertClassImplements(Queue::class, MongodbDestination::class);
}
public function testShouldReturnTopicAndQueuePreviouslySetInConstructor()
diff --git a/Tests/MongodbMessageTest.php b/Tests/MongodbMessageTest.php
index b9e5e75..391d8f7 100644
--- a/Tests/MongodbMessageTest.php
+++ b/Tests/MongodbMessageTest.php
@@ -4,11 +4,12 @@
use Enqueue\Mongodb\MongodbMessage;
use Enqueue\Test\ClassExtensionTrait;
+use PHPUnit\Framework\TestCase;
/**
* @group mongodb
*/
-class MongodbMessageTest extends \PHPUnit_Framework_TestCase
+class MongodbMessageTest extends TestCase
{
use ClassExtensionTrait;
diff --git a/Tests/MongodbProducerTest.php b/Tests/MongodbProducerTest.php
index ca59d55..6987b1a 100644
--- a/Tests/MongodbProducerTest.php
+++ b/Tests/MongodbProducerTest.php
@@ -3,42 +3,24 @@
namespace Enqueue\Mongodb\Tests;
use Enqueue\Mongodb\MongodbContext;
-use Enqueue\Mongodb\MongodbDestination;
use Enqueue\Mongodb\MongodbMessage;
use Enqueue\Mongodb\MongodbProducer;
use Enqueue\Test\ClassExtensionTrait;
-use Interop\Queue\InvalidDestinationException;
-use Interop\Queue\InvalidMessageException;
-use Interop\Queue\PsrDestination;
-use Interop\Queue\PsrProducer;
+use Interop\Queue\Destination;
+use Interop\Queue\Exception\InvalidDestinationException;
+use Interop\Queue\Producer;
+use PHPUnit\Framework\TestCase;
/**
* @group mongodb
*/
-class MongodbProducerTest extends \PHPUnit_Framework_TestCase
+class MongodbProducerTest extends TestCase
{
use ClassExtensionTrait;
public function testShouldImplementProducerInterface()
{
- $this->assertClassImplements(PsrProducer::class, MongodbProducer::class);
- }
-
- public function testCouldBeConstructedWithRequiredArguments()
- {
- new MongodbProducer($this->createContextMock());
- }
-
- public function testShouldThrowIfBodyOfInvalidType()
- {
- $this->expectException(InvalidMessageException::class);
- $this->expectExceptionMessage('The message body must be a scalar or null. Got: stdClass');
-
- $producer = new MongodbProducer($this->createContextMock());
-
- $message = new MongodbMessage(new \stdClass());
-
- $producer->send(new MongodbDestination(''), $message);
+ $this->assertClassImplements(Producer::class, MongodbProducer::class);
}
public function testShouldThrowIfDestinationOfInvalidType()
@@ -56,7 +38,7 @@ public function testShouldThrowIfDestinationOfInvalidType()
}
/**
- * @return \PHPUnit_Framework_MockObject_MockObject|MongodbContext
+ * @return \PHPUnit\Framework\MockObject\MockObject|MongodbContext
*/
private function createContextMock()
{
@@ -64,6 +46,6 @@ private function createContextMock()
}
}
-class NotSupportedDestination1 implements PsrDestination
+class NotSupportedDestination1 implements Destination
{
}
diff --git a/Tests/MongodbSubscriptionConsumerTest.php b/Tests/MongodbSubscriptionConsumerTest.php
new file mode 100644
index 0000000..d982e04
--- /dev/null
+++ b/Tests/MongodbSubscriptionConsumerTest.php
@@ -0,0 +1,178 @@
+assertTrue($rc->implementsInterface(SubscriptionConsumer::class));
+ }
+
+ public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe()
+ {
+ $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
+
+ $fooCallback = function () {};
+ $fooConsumer = $this->createConsumerStub('foo_queue');
+
+ $barCallback = function () {};
+ $barConsumer = $this->createConsumerStub('bar_queue');
+
+ $subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
+ $subscriptionConsumer->subscribe($barConsumer, $barCallback);
+
+ $this->assertAttributeSame([
+ 'foo_queue' => [$fooConsumer, $fooCallback],
+ 'bar_queue' => [$barConsumer, $barCallback],
+ ], 'subscribers', $subscriptionConsumer);
+ }
+
+ public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue()
+ {
+ $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
+
+ $fooCallback = function () {};
+ $fooConsumer = $this->createConsumerStub('foo_queue');
+
+ $barCallback = function () {};
+ $barConsumer = $this->createConsumerStub('foo_queue');
+
+ $subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
+
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"');
+ $subscriptionConsumer->subscribe($barConsumer, $barCallback);
+ }
+
+ /**
+ * @doesNotPerformAssertions
+ */
+ public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime()
+ {
+ $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
+
+ $fooCallback = function () {};
+ $fooConsumer = $this->createConsumerStub('foo_queue');
+
+ $subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
+ $subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
+ }
+
+ public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall()
+ {
+ $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
+
+ $fooConsumer = $this->createConsumerStub('foo_queue');
+ $barConsumer = $this->createConsumerStub('bar_queue');
+
+ $subscriptionConsumer->subscribe($fooConsumer, function () {});
+ $subscriptionConsumer->subscribe($barConsumer, function () {});
+
+ // guard
+ $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer);
+
+ $subscriptionConsumer->unsubscribe($fooConsumer);
+
+ $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
+ }
+
+ public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName()
+ {
+ $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
+
+ $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {});
+
+ // guard
+ $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
+
+ $subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue'));
+
+ $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
+ }
+
+ public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer()
+ {
+ $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
+
+ $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {});
+
+ // guard
+ $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
+
+ $subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue'));
+
+ $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
+ }
+
+ public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall()
+ {
+ $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
+
+ $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {});
+ $subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {});
+
+ // guard
+ $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer);
+
+ $subscriptionConsumer->unsubscribeAll();
+
+ $this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer);
+ }
+
+ public function testThrowsIfTryConsumeWithoutSubscribers()
+ {
+ $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('No subscribers');
+
+ $subscriptionConsumer->consume();
+ }
+
+ /**
+ * @return MongodbContext|\PHPUnit\Framework\MockObject\MockObject
+ */
+ private function createMongodbContextMock()
+ {
+ return $this->createMock(MongodbContext::class);
+ }
+
+ /**
+ * @param mixed|null $queueName
+ *
+ * @return Consumer|\PHPUnit\Framework\MockObject\MockObject
+ */
+ private function createConsumerStub($queueName = null)
+ {
+ $queueMock = $this->createMock(Queue::class);
+ $queueMock
+ ->expects($this->any())
+ ->method('getQueueName')
+ ->willReturn($queueName);
+
+ $consumerMock = $this->createMock(MongodbConsumer::class);
+ $consumerMock
+ ->expects($this->any())
+ ->method('getQueue')
+ ->willReturn($queueMock)
+ ;
+
+ return $consumerMock;
+ }
+}
diff --git a/Tests/Spec/MongodbConnectionFactoryTest.php b/Tests/Spec/MongodbConnectionFactoryTest.php
index 324fe52..9f0d195 100644
--- a/Tests/Spec/MongodbConnectionFactoryTest.php
+++ b/Tests/Spec/MongodbConnectionFactoryTest.php
@@ -3,16 +3,13 @@
namespace Enqueue\Mongodb\Tests\Spec;
use Enqueue\Mongodb\MongodbConnectionFactory;
-use Interop\Queue\Spec\PsrConnectionFactorySpec;
+use Interop\Queue\Spec\ConnectionFactorySpec;
/**
* @group mongodb
*/
-class MongodbConnectionFactoryTest extends PsrConnectionFactorySpec
+class MongodbConnectionFactoryTest extends ConnectionFactorySpec
{
- /**
- * {@inheritdoc}
- */
protected function createConnectionFactory()
{
return new MongodbConnectionFactory();
diff --git a/Tests/Spec/MongodbContextTest.php b/Tests/Spec/MongodbContextTest.php
index 51d4c4b..dfd5de3 100644
--- a/Tests/Spec/MongodbContextTest.php
+++ b/Tests/Spec/MongodbContextTest.php
@@ -3,19 +3,16 @@
namespace Enqueue\Mongodb\Tests\Spec;
use Enqueue\Test\MongodbExtensionTrait;
-use Interop\Queue\Spec\PsrContextSpec;
+use Interop\Queue\Spec\ContextSpec;
/**
* @group functional
* @group mongodb
*/
-class MongodbContextTest extends PsrContextSpec
+class MongodbContextTest extends ContextSpec
{
use MongodbExtensionTrait;
- /**
- * {@inheritdoc}
- */
protected function createContext()
{
return $this->buildMongodbContext();
diff --git a/Tests/Spec/MongodbMessageTest.php b/Tests/Spec/MongodbMessageTest.php
index 51ab55a..92983d4 100644
--- a/Tests/Spec/MongodbMessageTest.php
+++ b/Tests/Spec/MongodbMessageTest.php
@@ -3,16 +3,13 @@
namespace Enqueue\Mongodb\Tests\Spec;
use Enqueue\Mongodb\MongodbMessage;
-use Interop\Queue\Spec\PsrMessageSpec;
+use Interop\Queue\Spec\MessageSpec;
/**
* @group mongodb
*/
-class MongodbMessageTest extends PsrMessageSpec
+class MongodbMessageTest extends MessageSpec
{
- /**
- * {@inheritdoc}
- */
protected function createMessage()
{
return new MongodbMessage();
diff --git a/Tests/Spec/MongodbProducerTest.php b/Tests/Spec/MongodbProducerTest.php
index 54eb096..68b6007 100644
--- a/Tests/Spec/MongodbProducerTest.php
+++ b/Tests/Spec/MongodbProducerTest.php
@@ -3,19 +3,16 @@
namespace Enqueue\Mongodb\Tests\Spec;
use Enqueue\Test\MongodbExtensionTrait;
-use Interop\Queue\Spec\PsrProducerSpec;
+use Interop\Queue\Spec\ProducerSpec;
/**
* @group functional
* @group mongodb
*/
-class MongodbProducerTest extends PsrProducerSpec
+class MongodbProducerTest extends ProducerSpec
{
use MongodbExtensionTrait;
- /**
- * {@inheritdoc}
- */
protected function createProducer()
{
return $this->buildMongodbContext()->createProducer();
diff --git a/Tests/Spec/MongodbQueueTest.php b/Tests/Spec/MongodbQueueTest.php
index a555461..25e437b 100644
--- a/Tests/Spec/MongodbQueueTest.php
+++ b/Tests/Spec/MongodbQueueTest.php
@@ -3,16 +3,13 @@
namespace Enqueue\Mongodb\Tests\Spec;
use Enqueue\Mongodb\MongodbDestination;
-use Interop\Queue\Spec\PsrQueueSpec;
+use Interop\Queue\Spec\QueueSpec;
/**
* @group mongodb
*/
-class MongodbQueueTest extends PsrQueueSpec
+class MongodbQueueTest extends QueueSpec
{
- /**
- * {@inheritdoc}
- */
protected function createQueue()
{
return new MongodbDestination(self::EXPECTED_QUEUE_NAME);
diff --git a/Tests/Spec/MongodbRequeueMessageTest.php b/Tests/Spec/MongodbRequeueMessageTest.php
index 454d357..8a90724 100644
--- a/Tests/Spec/MongodbRequeueMessageTest.php
+++ b/Tests/Spec/MongodbRequeueMessageTest.php
@@ -13,9 +13,6 @@ class MongodbRequeueMessageTest extends RequeueMessageSpec
{
use MongodbExtensionTrait;
- /**
- * {@inheritdoc}
- */
protected function createContext()
{
return $this->buildMongodbContext();
diff --git a/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php b/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php
index a5eb351..f54513f 100644
--- a/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php
+++ b/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php
@@ -13,9 +13,6 @@ class MongodbSendAndReceiveDelayedMessageFromQueueTest extends SendAndReceiveDel
{
use MongodbExtensionTrait;
- /**
- * {@inheritdoc}
- */
protected function createContext()
{
return $this->buildMongodbContext();
diff --git a/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php b/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php
index 5400820..6aadef7 100644
--- a/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php
+++ b/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php
@@ -5,7 +5,7 @@
use Enqueue\Mongodb\MongodbContext;
use Enqueue\Mongodb\MongodbMessage;
use Enqueue\Test\MongodbExtensionTrait;
-use Interop\Queue\PsrContext;
+use Interop\Queue\Context;
use Interop\Queue\Spec\SendAndReceivePriorityMessagesFromQueueSpec;
/**
@@ -18,7 +18,7 @@ class MongodbSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceiveP
private $publishedAt;
- public function setUp()
+ protected function setUp(): void
{
parent::setUp();
@@ -26,7 +26,7 @@ public function setUp()
}
/**
- * @return PsrContext
+ * @return Context
*/
protected function createContext()
{
@@ -34,13 +34,11 @@ protected function createContext()
}
/**
- * {@inheritdoc}
- *
* @param MongodbContext $context
*
* @return MongodbMessage
*/
- protected function createMessage(PsrContext $context, $body)
+ protected function createMessage(Context $context, $body)
{
/** @var MongodbMessage $message */
$message = parent::createMessage($context, $body);
diff --git a/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php b/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php
index d87ac10..f16e80b 100644
--- a/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php
+++ b/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php
@@ -13,9 +13,6 @@ class MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest extends SendAndReceiv
{
use MongodbExtensionTrait;
- /**
- * {@inheritdoc}
- */
protected function createContext()
{
return $this->buildMongodbContext();
diff --git a/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php b/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php
index 992c062..c9b9cb2 100644
--- a/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php
+++ b/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php
@@ -13,9 +13,6 @@ class MongodbSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
{
use MongodbExtensionTrait;
- /**
- * {@inheritdoc}
- */
protected function createContext()
{
return $this->buildMongodbContext();
diff --git a/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php b/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php
index c539386..a416d3c 100644
--- a/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php
+++ b/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php
@@ -13,9 +13,6 @@ class MongodbSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec
{
use MongodbExtensionTrait;
- /**
- * {@inheritdoc}
- */
protected function createContext()
{
return $this->buildMongodbContext();
diff --git a/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php b/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php
index ea4febc..43ae34c 100644
--- a/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php
+++ b/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php
@@ -13,9 +13,6 @@ class MongodbSendToAndReceiveNoWaitFromQueueTest extends SendToAndReceiveNoWaitF
{
use MongodbExtensionTrait;
- /**
- * {@inheritdoc}
- */
protected function createContext()
{
return $this->buildMongodbContext();
diff --git a/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php b/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php
index 1e1be32..0fe9f0e 100644
--- a/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php
+++ b/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php
@@ -13,9 +13,6 @@ class MongodbSendToAndReceiveNoWaitFromTopicTest extends SendToAndReceiveNoWaitF
{
use MongodbExtensionTrait;
- /**
- * {@inheritdoc}
- */
protected function createContext()
{
return $this->buildMongodbContext();
diff --git a/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php
new file mode 100644
index 0000000..2fe16e8
--- /dev/null
+++ b/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php
@@ -0,0 +1,40 @@
+buildMongodbContext();
+ }
+
+ /**
+ * @param MongodbContext $context
+ */
+ protected function createQueue(Context $context, $queueName)
+ {
+ /** @var MongodbDestination $queue */
+ $queue = parent::createQueue($context, $queueName);
+ $context->purgeQueue($queue);
+
+ return $queue;
+ }
+}
diff --git a/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php b/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php
new file mode 100644
index 0000000..b18e0bf
--- /dev/null
+++ b/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php
@@ -0,0 +1,40 @@
+buildMongodbContext();
+ }
+
+ /**
+ * @param MongodbContext $context
+ */
+ protected function createQueue(Context $context, $queueName)
+ {
+ /** @var MongodbDestination $queue */
+ $queue = parent::createQueue($context, $queueName);
+ $context->purgeQueue($queue);
+
+ return $queue;
+ }
+}
diff --git a/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php b/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php
new file mode 100644
index 0000000..3acfa94
--- /dev/null
+++ b/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php
@@ -0,0 +1,40 @@
+buildMongodbContext();
+ }
+
+ /**
+ * @param MongodbContext $context
+ */
+ protected function createQueue(Context $context, $queueName)
+ {
+ /** @var MongodbDestination $queue */
+ $queue = parent::createQueue($context, $queueName);
+ $context->getClient()->dropDatabase($queueName);
+
+ return $queue;
+ }
+}
diff --git a/Tests/Spec/MongodbTopicTest.php b/Tests/Spec/MongodbTopicTest.php
index 14a79f5..ab5c025 100644
--- a/Tests/Spec/MongodbTopicTest.php
+++ b/Tests/Spec/MongodbTopicTest.php
@@ -3,16 +3,13 @@
namespace Enqueue\Mongodb\Tests\Spec;
use Enqueue\Mongodb\MongodbDestination;
-use Interop\Queue\Spec\PsrTopicSpec;
+use Interop\Queue\Spec\TopicSpec;
/**
* @group mongodb
*/
-class MongodbTopicTest extends PsrTopicSpec
+class MongodbTopicTest extends TopicSpec
{
- /**
- * {@inheritdoc}
- */
protected function createTopic()
{
return new MongodbDestination(self::EXPECTED_TOPIC_NAME);
diff --git a/Tests/Symfony/MongodbTransportFactoryTest.php b/Tests/Symfony/MongodbTransportFactoryTest.php
deleted file mode 100644
index 3621aca..0000000
--- a/Tests/Symfony/MongodbTransportFactoryTest.php
+++ /dev/null
@@ -1,164 +0,0 @@
-assertClassImplements(TransportFactoryInterface::class, MongodbTransportFactory::class);
- }
-
- public function testCouldBeConstructedWithDefaultName()
- {
- $transport = new MongodbTransportFactory();
-
- $this->assertEquals('mongodb', $transport->getName());
- }
-
- public function testCouldBeConstructedWithCustomName()
- {
- $transport = new MongodbTransportFactory('theCustomName');
-
- $this->assertEquals('theCustomName', $transport->getName());
- }
-
- public function testShouldAllowAddConfiguration()
- {
- $transport = new MongodbTransportFactory();
- $tb = new TreeBuilder();
- $rootNode = $tb->root('foo');
-
- $transport->addConfiguration($rootNode);
- $processor = new Processor();
- $config = $processor->process($tb->buildTree(), [[
- 'dsn' => 'mongodb://127.0.0.1/',
- ]]);
-
- $this->assertEquals([
- 'dsn' => 'mongodb://127.0.0.1/',
- 'dbname' => 'enqueue',
- 'collection_name' => 'enqueue',
- 'polling_interval' => 1000,
- ], $config);
- }
-
- public function testShouldAllowAddConfigurationAsString()
- {
- $transport = new MongodbTransportFactory();
- $tb = new TreeBuilder();
- $rootNode = $tb->root('foo');
-
- $transport->addConfiguration($rootNode);
- $processor = new Processor();
- $config = $processor->process($tb->buildTree(), ['mysqlDSN']);
-
- $this->assertEquals([
- 'dsn' => 'mysqlDSN',
- 'dbname' => 'enqueue',
- 'collection_name' => 'enqueue',
- 'polling_interval' => 1000,
- ], $config);
- }
-
- public function testShouldCreateMongodbConnectionFactory()
- {
- $container = new ContainerBuilder();
-
- $transport = new MongodbTransportFactory();
-
- $serviceId = $transport->createConnectionFactory($container, [
- 'dsn' => 'mysqlDSN',
- 'dbname' => 'enqueue',
- 'collection_name' => 'enqueue',
- 'polling_interval' => 1000,
- ]);
-
- $this->assertTrue($container->hasDefinition($serviceId));
- $factory = $container->getDefinition($serviceId);
- $this->assertEquals(MongodbConnectionFactory::class, $factory->getClass());
-
- $this->assertSame([
- 'dsn' => 'mysqlDSN',
- 'dbname' => 'enqueue',
- 'collection_name' => 'enqueue',
- 'polling_interval' => 1000,
- ], $factory->getArgument(0));
- }
-
- public function testShouldCreateConnectionFactoryFromDsnString()
- {
- $container = new ContainerBuilder();
-
- $transport = new MongodbTransportFactory();
-
- $serviceId = $transport->createConnectionFactory($container, [
- 'dsn' => 'theDSN',
- 'connection' => [],
- 'lazy' => true,
- 'table_name' => 'enqueue',
- 'polling_interval' => 1000,
- ]);
-
- $this->assertTrue($container->hasDefinition($serviceId));
- $factory = $container->getDefinition($serviceId);
- $this->assertEquals(MongodbConnectionFactory::class, $factory->getClass());
- $this->assertSame('theDSN', $factory->getArgument(0)['dsn']);
- }
-
- public function testShouldCreateContext()
- {
- $container = new ContainerBuilder();
-
- $transport = new MongodbTransportFactory();
-
- $serviceId = $transport->createContext($container, []);
-
- $this->assertEquals('enqueue.transport.mongodb.context', $serviceId);
- $this->assertTrue($container->hasDefinition($serviceId));
-
- $context = $container->getDefinition('enqueue.transport.mongodb.context');
- $this->assertInstanceOf(Reference::class, $context->getFactory()[0]);
- $this->assertEquals('enqueue.transport.mongodb.connection_factory', (string) $context->getFactory()[0]);
- $this->assertEquals('createContext', $context->getFactory()[1]);
- }
-
- public function testShouldCreateDriver()
- {
- $container = new ContainerBuilder();
-
- $transport = new MongodbTransportFactory();
-
- $serviceId = $transport->createDriver($container, []);
-
- $this->assertEquals('enqueue.client.mongodb.driver', $serviceId);
- $this->assertTrue($container->hasDefinition($serviceId));
-
- $driver = $container->getDefinition($serviceId);
- $this->assertSame(MongodbDriver::class, $driver->getClass());
-
- $this->assertInstanceOf(Reference::class, $driver->getArgument(0));
- $this->assertEquals('enqueue.transport.mongodb.context', (string) $driver->getArgument(0));
-
- $this->assertInstanceOf(Reference::class, $driver->getArgument(1));
- $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1));
-
- $this->assertInstanceOf(Reference::class, $driver->getArgument(2));
- $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2));
- }
-}
diff --git a/composer.json b/composer.json
index 9642adb..f64d53e 100644
--- a/composer.json
+++ b/composer.json
@@ -10,19 +10,16 @@
"homepage": "https://enqueue.forma-pro.com/",
"license": "MIT",
"require": {
- "php": "^7.0",
- "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1",
+ "php": "^8.1",
+ "queue-interop/queue-interop": "^0.8",
"mongodb/mongodb": "^1.2",
- "ext-mongodb": "^1.3"
+ "ext-mongodb": "^1.5"
},
"require-dev": {
- "phpunit/phpunit": "~5.4.0",
- "symfony/dependency-injection": "^4",
- "symfony/config": "^4",
- "queue-interop/queue-spec": "^0.5.5@dev",
- "enqueue/test": "^0.8.25@dev",
- "enqueue/enqueue": "^0.8@dev",
- "enqueue/null": "^0.8@dev"
+ "phpunit/phpunit": "^9.5",
+ "queue-interop/queue-spec": "^0.6.2",
+ "enqueue/test": "0.10.x-dev",
+ "enqueue/null": "0.10.x-dev"
},
"support": {
"email": "opensource@forma-pro.com",
@@ -32,20 +29,15 @@
"docs": "https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md"
},
"autoload": {
- "psr-4": {
- "Enqueue\\Mongodb\\": ""
- },
+ "psr-4": { "Enqueue\\Mongodb\\": "" },
"exclude-from-classmap": [
"/Tests/"
]
},
- "suggest": {
- "enqueue/enqueue": "If you'd like to use advanced features like Client abstract layer or Symfony integration features"
- },
"minimum-stability": "dev",
"extra": {
"branch-alias": {
- "dev-master": "0.8.x-dev"
+ "dev-master": "0.10.x-dev"
}
}
}
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 1f34af0..6b99609 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -1,16 +1,11 @@
-
+