diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index a252fcfd5..cd0ff3c1a 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -54,8 +54,7 @@ public function __construct(array $config) $this->config = $config; $this->kafkaConsumers = []; $this->rdKafkaConsumers = []; - - $this->setSerializer(new JsonSerializer()); + $this->configureSerializer($config); } /** @@ -180,6 +179,58 @@ public static function getLibrdKafkaVersion(): string return "$major.$minor.$patch"; } + /** + * @return void + * JsonSerializer should be the default fallback if no serializer is specified + */ + private function configureSerializer(array $config): void + { + if (!isset($config['serializer'])) { + $this->setSerializer(new JsonSerializer()); + + return; + } + + $serializer = $config['serializer']; + + if ($serializer instanceof Serializer) { + $this->setSerializer($serializer); + + return; + } + + $serializerClass = $this->resolveSerializerClass($serializer); + + if (!class_exists($serializerClass) || !is_a($serializerClass, Serializer::class, true)) { + throw $this->createInvalidSerializerException($serializerClass); + } + + $serializerOptions = $serializer['options'] ?? []; + $this->setSerializer(new $serializerClass($serializerOptions)); + } + + private function resolveSerializerClass(mixed $serializer): string + { + if (is_string($serializer)) { + return $serializer; + } + + if (is_array($serializer) && isset($serializer['class'])) { + return $serializer['class']; + } + + throw $this->createInvalidSerializerException($serializer); + } + + private function createInvalidSerializerException(mixed $value): \InvalidArgumentException + { + return new \InvalidArgumentException(sprintf( + 'Invalid serializer configuration. Expected "serializer" to be a string, an array with a "class" key, or a %s instance. Received %s instead.', + Serializer::class, + get_debug_type($value) + )); + } + private function getConf(): Conf { if (null === $this->conf) { diff --git a/pkg/rdkafka/Tests/RdKafkaContextTest.php b/pkg/rdkafka/Tests/RdKafkaContextTest.php index dc1b597de..a691fcc5b 100644 --- a/pkg/rdkafka/Tests/RdKafkaContextTest.php +++ b/pkg/rdkafka/Tests/RdKafkaContextTest.php @@ -36,6 +36,34 @@ public function testShouldSetJsonSerializerInConstructor() $this->assertInstanceOf(JsonSerializer::class, $context->getSerializer()); } + public function testShouldUseStringSerializerClassFromConfig() + { + $mockSerializerClass = get_class($this->createMock(Serializer::class)); + + $context = new RdKafkaContext([ + 'serializer' => $mockSerializerClass, + ]); + + $this->assertInstanceOf($mockSerializerClass, $context->getSerializer()); + } + + public function testShouldUseJsonSerializer() + { + $context = new RdKafkaContext([]); + + $this->assertInstanceOf(JsonSerializer::class, $context->getSerializer()); + } + + public function testShouldThrowExceptionOnInvalidSerializerConfig() + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('Invalid serializer configuration'); + + new RdKafkaContext([ + 'serializer' => 123, + ]); + } + public function testShouldAllowGetPreviouslySetSerializer() { $context = new RdKafkaContext([]);