Основа

Основой для разработки плагина послужила статья Нагрузочное тестирование «не-HTTP». Ч.2 Gatling. Механизм отправки был написан по примеру из статьи.

Использовать существующий плагин AMQP plugin by @maiha, указанный в документации на Gatling 3: https://gatling.io/docs/3.1/extensions/ для версии Gatling 3.1.2 не удалось. Плагин устарел. А переделать его оперативно не вышло из-за больших изменений в самом Gatling.

Разработка

Получившийся скрипт, который отправляет просто сообщение с фиксированным тестом «Hello, World»: AmqpGatlingSimulation.scala (https://gist.github.com/polarnik/10dce91e372ed74ba7038250125f26a6)

package simulations

import io.gatling.commons.validation.Validation
import io.gatling.core.Predef._
import io.gatling.core.action.{RequestAction}
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.session.{Expression, Session}
import io.gatling.core.structure.ScenarioContext
import io.gatling.core.action.Action

import scala.concurrent.duration._
import scala.language.postfixOps
import com.rabbitmq.client.ConnectionFactory
import com.typesafe.scalalogging.StrictLogging
import io.gatling.commons.stats.OK
import io.gatling.commons.util.Clock
import io.gatling.core.stats.StatsEngine
import io.gatling.core.util.NameGen
import io.gatling.commons.validation._

class RabbitTest extends Simulation {

  def amqpSend(message: String) = new ActionBuilder {
    def build(ctx: ScenarioContext, next: Action): Action = {
      new AmqpSend(message, ctx, next)
    }
  }

  val scn = scenario("AMQP protocol test")
    .exec(amqpSend("Hello, World"))

  setUp(
    scn.inject(
      rampUsersPerSec(1) to (60) during(180 seconds),
    )
  )

}

class Around(before: () => Unit, after: () => Unit) {

  def apply(f: => Any): Unit = {
    before()
    f
    after()
  }
}

trait AmqpLogging extends StrictLogging {

  def logMessage(text: => String, msg: String): Unit = {
    logger.debug(text)
    logger.trace(msg.toString)
  }
}

class AmqpSend(val message: String, val ctx: ScenarioContext, val next: Action) extends RequestAction with AmqpLogging with NameGen {

  override val name: String = genName("amqp publish")
  override val requestName: Expression[String] = "amqp publish"

  override def clock: Clock = ctx.coreComponents.clock

  override def statsEngine: StatsEngine = ctx.coreComponents.statsEngine

  def send(): Unit = {
    val factory = new ConnectionFactory()
    factory.setUsername("guest")
    factory.setPassword("guest")
    factory.setHost("localhost")
    factory.setVirtualHost("/")
    factory.setPort(5672)

    val conn = factory.newConnection()

    val channel = conn.createChannel()

    channel.exchangeDeclare("amqp.direct.1", "direct", true)

    val messageBodyBytes = message.getBytes()

    channel.basicPublish("amqp.direct.1", "routing.key.1", null, messageBodyBytes)

    channel.close()
    conn.close()
  }

  override def sendRequest(requestName: String, session: Session): Validation[Unit] =
  for {
    around <- aroundSend(requestName, session)
  } yield {
    around(
      send()
    )
  }

  protected def aroundSend(requestName: String, session: Session): Validation[Around] = {
    new Around(
      before = () => {
        if (logger.underlying.isDebugEnabled) {
          logMessage(s"Message sent", message)
        }
      },
      after = () => {
        ctx.coreComponents.configuration.resolve(
          {
            val now = clock.nowMillis
            statsEngine.logResponse(session, requestName, session.startDate, now, OK, None, None)
          }
        )
        next ! session
      }

    ).success
  }
}

Результаты

Таким простым скриптом на локальном RabbitMQ с настройками по умолчанию получилось достичь интенсивности до 700 запросов в сек.

По профилю нагрузки, мне нужен скрипт, который будет делать не более 50 запросов в сек, поэтому даже такой неоптимальный механизм нагрузки полностью устраивает.

Неоптимальность заключается в том, что перед каждым запросом подключаюсь к rabbitmq.

Также в коде заданы параметры подключения. Но это несложно параметризировать.

Нагрузка была возрастающая, но был достигнут предел в 650-700 запросов в сек

Плагин от Вячеслава Калёкина (Tinkoff Credit System)

https://github.com/TinkoffCreditSystems/gatling-amqp-plugin

Распространяется в исходных кодах. Для сборки нужно выполнить команду:

sbt publishLocal

Поддерживает две операции (на момент 29 сентября 2019 года):

  • publish — отправка сообщения
  • requestReply — отправка сообщения и получение ответа по CorrelationID из другой очереди

Отправка запросов в RabbitMQ (AMQP) из Gatling: 2 комментария

Добавить комментарий

Заполните поля или щелкните по значку, чтобы оставить свой комментарий:

Логотип WordPress.com

Для комментария используется ваша учётная запись WordPress.com. Выход /  Изменить )

Фотография Facebook

Для комментария используется ваша учётная запись Facebook. Выход /  Изменить )

Connecting to %s