Основа
Основой для разработки плагина послужила статья Нагрузочное тестирование «не-HTTP». Ч.2 Gatling. Механизм отправки был написан по примеру из статьи.
Использовать существующий плагин AMQP plugin by @maiha, указанный в документации на Gatling 3: https://gatling.io/docs/3.1/extensions/ для версии Gatling 3.1.2 не удалось. Плагин устарел. А переделать его оперативно не вышло из-за больших изменений в самом Gatling.
Разработка
Получившийся скрипт, который отправляет просто сообщение с фиксированным тестом «Hello, World»: AmqpGatlingSimulation.scala (https://gist.github.com/polarnik/10dce91e372ed74ba7038250125f26a6)
package simulations
import io.gatling.commons.validation.Validation
import io.gatling.core.Predef._
import io.gatling.core.action.{RequestAction}
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.session.{Expression, Session}
import io.gatling.core.structure.ScenarioContext
import io.gatling.core.action.Action
import scala.concurrent.duration._
import scala.language.postfixOps
import com.rabbitmq.client.ConnectionFactory
import com.typesafe.scalalogging.StrictLogging
import io.gatling.commons.stats.OK
import io.gatling.commons.util.Clock
import io.gatling.core.stats.StatsEngine
import io.gatling.core.util.NameGen
import io.gatling.commons.validation._
class RabbitTest extends Simulation {
def amqpSend(message: String) = new ActionBuilder {
def build(ctx: ScenarioContext, next: Action): Action = {
new AmqpSend(message, ctx, next)
}
}
val scn = scenario("AMQP protocol test")
.exec(amqpSend("Hello, World"))
setUp(
scn.inject(
rampUsersPerSec(1) to (60) during(180 seconds),
)
)
}
class Around(before: () => Unit, after: () => Unit) {
def apply(f: => Any): Unit = {
before()
f
after()
}
}
trait AmqpLogging extends StrictLogging {
def logMessage(text: => String, msg: String): Unit = {
logger.debug(text)
logger.trace(msg.toString)
}
}
class AmqpSend(val message: String, val ctx: ScenarioContext, val next: Action) extends RequestAction with AmqpLogging with NameGen {
override val name: String = genName("amqp publish")
override val requestName: Expression[String] = "amqp publish"
override def clock: Clock = ctx.coreComponents.clock
override def statsEngine: StatsEngine = ctx.coreComponents.statsEngine
def send(): Unit = {
val factory = new ConnectionFactory()
factory.setUsername("guest")
factory.setPassword("guest")
factory.setHost("localhost")
factory.setVirtualHost("/")
factory.setPort(5672)
val conn = factory.newConnection()
val channel = conn.createChannel()
channel.exchangeDeclare("amqp.direct.1", "direct", true)
val messageBodyBytes = message.getBytes()
channel.basicPublish("amqp.direct.1", "routing.key.1", null, messageBodyBytes)
channel.close()
conn.close()
}
override def sendRequest(requestName: String, session: Session): Validation[Unit] =
for {
around <- aroundSend(requestName, session)
} yield {
around(
send()
)
}
protected def aroundSend(requestName: String, session: Session): Validation[Around] = {
new Around(
before = () => {
if (logger.underlying.isDebugEnabled) {
logMessage(s"Message sent", message)
}
},
after = () => {
ctx.coreComponents.configuration.resolve(
{
val now = clock.nowMillis
statsEngine.logResponse(session, requestName, session.startDate, now, OK, None, None)
}
)
next ! session
}
).success
}
}
Результаты
Таким простым скриптом на локальном RabbitMQ с настройками по умолчанию получилось достичь интенсивности до 700 запросов в сек.
По профилю нагрузки, мне нужен скрипт, который будет делать не более 50 запросов в сек, поэтому даже такой неоптимальный механизм нагрузки полностью устраивает.
Неоптимальность заключается в том, что перед каждым запросом подключаюсь к rabbitmq.
Также в коде заданы параметры подключения. Но это несложно параметризировать.

Плагин от Вячеслава Калёкина (Tinkoff Credit System)
https://github.com/TinkoffCreditSystems/gatling-amqp-plugin
Распространяется в исходных кодах. Для сборки нужно выполнить команду:
sbt publishLocal
Поддерживает две операции (на момент 29 сентября 2019 года):
- publish — отправка сообщения
- requestReply — отправка сообщения и получение ответа по CorrelationID из другой очереди
СЛОЖНО
НравитсяНравится
51
НравитсяНравится