You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
{{ message }}
This repository has been archived by the owner on Jun 11, 2021. It is now read-only.
In one of my applications I am generating an array of messages and iterating over them and pushing them (Publish) to the producer.
I am creating the producer as follows -
val deadLetterOption = config.deadletter match {
case true => clientProps ++ Map("x-dead-letter-routing-key" -> config.deadletterRoutingKey, "x-dead-letter-exchange" -> config.deadletterExchange)
case _ => clientProps
}
val ttlOption = config.ttl > 0 match {
case true => deadLetterOption ++ Map("x-message-ttl" -> new Integer(config.ttl))
case false => deadLetterOption
}
val channelParameters = Option(ChannelParameters(1))
val exchangeParams = ExchangeParameters(name = exchange, passive = false,
exchangeType = "direct", durable = true, autodelete = false, clientProps)
val queueParams = QueueParameters(queueName, passive = false, durable = true, exclusive = false, autodelete = false,
clientProps)
val producer = ConnectionOwner.createChildActor(connection, ChannelOwner.props(channelParams = channelParameters),
timeout = timeout.second)
Amqp.waitForConnection(system, producer).await()
producer ! DeclareExchange(exchangeParams)
producer ! DeclareQueue(queueParams)
producer ! QueueBind(queue = queueName, exchange = exchange, routing_key = queueName)
I am getting the following in my deadletter watcher -
Publish(my_ex,my_q,[B@f94d482,None,true,false)
When I generate 100k messages, I get this dropped messages for > 10k messages.
I am using unbounded mailbox (unless this library uses some other mailbox from code).
What could be the reason for these drops?
The text was updated successfully, but these errors were encountered:
Hi,
Sorry for the delay, I was off in the mountains!
And sorry for asking a basic question, but are you sure that your producer program does not exit before all messages have actually been pushed to the broker ? (see #72 for something similar).
Is there a way that you can package a small, complete sample so that I can try and reproduce the problem ?
Thanks
I am absolutely sure that the producer program has not exited, because when I am testing locally my producer and consumer actors are in the same executable.
I will try to reproduce with a self contained example.
This might be a bug in the way the producer actor is killed. For this specific use case I iterate over the messages and send them to a producer. Once the loop is done I send a PoisonPill message to the producer. It seems the producer actor and the RMQ channel is getting closed before the messages I have already sent are sent to RMQ.
For now I have added a delay (thread sleep) after sending the last message to producer actor to delay issuing the PoisonPill.
Let me know if this information helps in finding the issue.
Sign up for freeto subscribe to this conversation on GitHub.
Already have an account?
Sign in.
In one of my applications I am generating an array of messages and iterating over them and pushing them (Publish) to the producer.
I am creating the producer as follows -
val deadLetterOption = config.deadletter match {
case true => clientProps ++ Map("x-dead-letter-routing-key" -> config.deadletterRoutingKey, "x-dead-letter-exchange" -> config.deadletterExchange)
case _ => clientProps
}
val ttlOption = config.ttl > 0 match {
case true => deadLetterOption ++ Map("x-message-ttl" -> new Integer(config.ttl))
case false => deadLetterOption
}
val channelParameters = Option(ChannelParameters(1))
val exchangeParams = ExchangeParameters(name = exchange, passive = false,
exchangeType = "direct", durable = true, autodelete = false, clientProps)
val queueParams = QueueParameters(queueName, passive = false, durable = true, exclusive = false, autodelete = false,
clientProps)
val producer = ConnectionOwner.createChildActor(connection, ChannelOwner.props(channelParams = channelParameters),
timeout = timeout.second)
Amqp.waitForConnection(system, producer).await()
producer ! DeclareExchange(exchangeParams)
producer ! DeclareQueue(queueParams)
producer ! QueueBind(queue = queueName, exchange = exchange, routing_key = queueName)
I am getting the following in my deadletter watcher -
Publish(my_ex,my_q,[B@f94d482,None,true,false)
When I generate 100k messages, I get this dropped messages for > 10k messages.
I am using unbounded mailbox (unless this library uses some other mailbox from code).
What could be the reason for these drops?
The text was updated successfully, but these errors were encountered: