Amazon Simple Queue Service (SQS) は AWS のメッセージキューイングサービスです。完全マネージドなサービスであり、ボリューム等の細かい心配をせずにシステム間の連携に使用できるのが嬉しいのですが、SQS にはいくつか単純なキューにはない独自の実装を持っています。

その一つとして visibility timeout というものがあります。これは 「自分以外の他の consumer が同じメッセージを (できるだけ) 受信しないようにするため」 の機能なのですが、ドキュメントを読んだだけでは細かい挙動でわからないところがあったので、実際に動かして調査してみました。

テストに使用したコードは こちら に置いています。

Queue の作成

まずはテストに使用するキューを作成します。

# Create queue
$ aws sqs create-queue --queue-name sample-queue

作成したキューのデフォルト設定は以下のようになります。

VisibilityTimeout: 30 となっているので、デフォルトでは受信したメッセージが 30 秒は他の consumer から見えなくなるということがわかります。

# Describe queue
$ $ aws sqs get-queue-attributes --queue-url <queue_url> --attribute-names All
{
    "Attributes": {
        "QueueArn": "<queue_arn>",
        "ApproximateNumberOfMessages": "0",
        "ApproximateNumberOfMessagesNotVisible": "0",
        "ApproximateNumberOfMessagesDelayed": "0",
        "CreatedTimestamp": "1505547619",
        "LastModifiedTimestamp": "1505547619",
        "VisibilityTimeout": "30",
        "MaximumMessageSize": "262144",
        "MessageRetentionPeriod": "345600",
        "DelaySeconds": "0",
        "ReceiveMessageWaitTimeSeconds": "0"
    }
}

Queue へのメッセージ送受信

Visibility timeout について調査する前に、SQS を介した簡単なメッセージの送受信を行ってみます。

以下では aws-java-sdk-sqs を使用して Scala の簡単なコードを書いて試しています。

送信

  • SQS へのメッセージ送信のために SimpleMessagaeSender を定義
    • AmazonSQS を介して SQS とやり取りをする
    • SendMessageRequest が送信するメッセージを表す
package com.tiqwab.example.sqs

import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import com.amazonaws.services.sqs.model.SendMessageRequest

class SimpleMessageSender(
    val queueUrl: String
) {

  lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()

  def send(msg: String): Unit = {
    val request = new SendMessageRequest()
      .withMessageBody(msg)
      .withQueueUrl(queueUrl)
    sqs.sendMessage(request)
  }

}

SimpleMessageSender を使用してキューにメッセージを投げてみます。

package com.tiqwab.example.sqs

object SendMessageMain {

  def main(args: Array[String]): Unit = {
    val sender = new SimpleMessageSender(
      queueUrl =
        "https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
    )
    sender.send("hello")
  }

}

AWS の Web コンソールを見るとたしかにメッセージ数が増えていることがわかります。

受信

  • SQS からのメッセージ受信のために SimpleMessageReceiver を定義
    • AmazonSQS#receiveMessageReceiveMessageRequest オブジェクトを渡す
    • AmazonSQS#deleteMessage でメッセージを処理したことになる
      • これを呼ばないと visibility timeout (default 30s) 後に再度キューから取得可能になる
package com.tiqwab.example.sqs

import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import com.amazonaws.services.sqs.model.Message
import scala.collection.JavaConverters._

class SimpleMessageReceiver(
    val queueUrl: String
) {

  lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()

  def receive(f: Message => Unit): Unit = {
    val request = new ReceiveMessageRequest(queueUrl)
    val messages = sqs.receiveMessage(request).getMessages.asScala
    messages.foreach { m =>
      f(m)
      sqs.deleteMessage(queueUrl, m.getReceiptHandle)
    }
  }

}

実際にメッセージを受信してみます。

package com.tiqwab.example.sqs

object ReceiveMessageMain {

  def main(args: Array[String]): Unit = {
    val receiver = new SimpleMessageReceiver(
      queueUrl =
        "https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
    )
    receiver.receive(m => println(m.getBody))
  }

}

受信の挙動は以下のようになります。

  • 初回実行後 “hello” というメッセージが出力される
  • 二度目以降はメッセージが削除されているので取得されない

Visibility timeout の挙動

ここからが本題で visibility timeout を使用したメッセージ受信の挙動を確認していきます。

基本

  • メッセージ受診時に削除を行わない場合、visibility timeout 後に再度メッセージが取得される
    • 削除しない限り何度でもメッセージが受信され得る

この挙動を確認するためにメッセージを受信後削除しないよう受信側のクラスを変更しました。

class SimpleMessageReceiver(
    val queueUrl: String
) {

  lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()

  // Accept Boolean to control deletion
  def receive(f: String => Unit, doesDelete: Boolean = true): Unit = {
    val request = new ReceiveMessageRequest(queueUrl)
    val messages = sqs.receiveMessage(request).getMessages.asScala
    messages.foreach { m =>
      f(m)
      if (doesDelete) sqs.deleteMessage(queueUrl, m.getReceiptHandle)
    }
  }

}
object ReceiveMessageMain {

  def main(args: Array[String]): Unit = {
    val receiver = new SimpleMessageReceiver(
      queueUrl =
        "https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
    )

    @tailrec
    def loop(n: Int): Unit = n match {
      case 0 =>
        ()
      case _ =>
        println(s"${System.currentTimeMillis()}: ")
        receiver.receive(msg => println(s"$msg.getBody"), false)
        Thread.sleep(1000)
        loop(n - 1)
    }
    loop(65)
  }
}
  • Visibility timeout をデフォルト値の 30 秒とし、1 秒ごとにメッセージ受信を試みている
    • おおよそ 30 秒後にメッセージが繰り返し受信されることがわかる
...
1505550644841: 
visibility_timeout_sample
...
1505550686487: 
visibility_timeout_sample
...
1505550717256: 
visibility_timeout_sample
...

Attribute の利用

  • メッセージには body 以外にも Attribute, MessageAttribute と呼ばれる属性が付属している
    • Attribute: システム側が自動的に設定している属性?
    • MessageAttribute: ユーザ側が設定できる属性
  • ReceiveMessageRequest 作成時にメッセージと一緒に持ってきたい属性を指定する
  • 受信後に属性を変更し、visibility timeout した場合、セットした属性は反映されない

上の挙動を確認するために、メッセージ受信時に ApproximateReceiveCountLastReceivedTimestamp という二つの属性を表示するようにしました。 ApproximateReceiveCount はシステムが自動的につけている値、LastReceivedTimestamp は自分で独自にメッセージ受信後につけている値です。

class SimpleMessageReceiver(
    val queueUrl: String
) {

  lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()

  // Receive messages with attributes
  def receive(f: Message => Unit, doesDelete: Boolean = true): Unit = {
    val request = new ReceiveMessageRequest(queueUrl)
      .withAttributeNames("ApproximateReceiveCount")
      .withMessageAttributeNames("LastReceivedTimestamp")
    val messages = sqs.receiveMessage(request).getMessages.asScala
    messages.foreach { m =>
      f(m)
      if (doesDelete) sqs.deleteMessage(queueUrl, m.getReceiptHandle)
    }
  }

}
object ReceiveMessageMain {

  def main(args: Array[String]): Unit = {
    val receiver = new SimpleMessageReceiver(
      queueUrl =
        "https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
    )

    @tailrec
    def loop(n: Int): Unit = n match {
      case 0 =>
        ()
      case _ =>
        println(s"${System.currentTimeMillis()}: ")
        receiver.receive(
          msg => {
            println(
              s"body: ${msg.getBody}, attributes: ${msg.getAttributes}, messageAttributes: ${msg.getMessageAttributes}")

            val attributes = msg.getMessageAttributes.asScala
            val lastReceivedTimestamp = new MessageAttributeValue()
              .withDataType("Number")
              .withStringValue(System.currentTimeMillis.toString)
            msg.setMessageAttributes(
              attributes
                .updated("LastReceivedTimestamp", lastReceivedTimestamp)
                .asJava)
          },
          false
        )
        Thread.sleep(1000)
        loop(n - 1)
    }
    loop(65)
  }
}
  • ApproximateReceiveCount は受信毎にカウントが増えている
  • メッセージ受信後、自分で LastReceivedTimestamp を設定しているが、次回受診時には反映されていない
...
1505629833497: 
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=14}, messageAttributes: {}
...
1505629866777: 
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=15}, messageAttributes: {}
...
1505629898377: 
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=16}, messageAttributes: {}
...

Visibility timeout の設定

  • visibility timeout は受診時に再設定することができる

以下のクラスでは ApproximateReceiveCount を利用してメッセージの受信毎に visibility timeout 値を再設定しています。

class SimpleMessageReceiver(
    val queueUrl: String
) {

  lazy val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()

  def receive(f: Message => Unit, doesDelete: Boolean = true): Unit = {
    val request = new ReceiveMessageRequest(queueUrl)
      .withAttributeNames("ApproximateReceiveCount")
    val messages = sqs.receiveMessage(request).getMessages.asScala
    messages.foreach { m =>
      f(m)
      // visibility timeout の再設定
      m.getAttributes
        .get("ApproximateReceiveCount")
        .asInstanceOf[String]
        .toInt match {
        case 1 => // 1 回目の受信
          sqs.changeMessageVisibility(
            createChangeVisibilityRequest(10, m.getReceiptHandle))
        case 2 => // 2 回目の受信
          sqs.changeMessageVisibility(
            createChangeVisibilityRequest(20, m.getReceiptHandle))
        case 3 => // 3 回目の受信
          sqs.changeMessageVisibility(
            createChangeVisibilityRequest(120, m.getReceiptHandle))
          Thread.sleep(30000)
          sqs.changeMessageVisibility(
            createChangeVisibilityRequest(60, m.getReceiptHandle))
        case _ => // それ以降
          ()
      }
      if (doesDelete) sqs.deleteMessage(queueUrl, m.getReceiptHandle)
    }
  }

  def createChangeVisibilityRequest(
      timeout: Int,
      receiptHandle: String
  ): ChangeMessageVisibilityRequest =
    new ChangeMessageVisibilityRequest()
      .withQueueUrl(queueUrl)
      .withReceiptHandle(receiptHandle)
      .withVisibilityTimeout(timeout)

}
object ReceiveMessageMain {

  def main(args: Array[String]): Unit = {
    val receiver = new SimpleMessageReceiver(
      queueUrl =
        "https://sqs.ap-northeast-1.amazonaws.com/517530801117/sample-queue"
    )

    @tailrec
    def loop(n: Int): Unit = n match {
      case 0 =>
        ()
      case _ =>
        println(s"${System.currentTimeMillis()}: ")
        receiver.receive(
          msg => {
            println(s"body: ${msg.getBody}, attributes: ${msg.getAttributes}")
          },
          false
        )
        Thread.sleep(1000)
        loop(n - 1)
    }
    loop(180)
  }
}

結果は以下のようになります。

  • ChangeVisibilityRequest で設定した時間がちゃんと反映されている
  • 設定した visibility timeout は次のメッセージにのみ有効
    • 一度設定しても次々回の受信はデフォルトの visibility timeout が使用される
  • あるメッセージ受信時に複数回 ChangeVisibilityRequest を行った場合、リクエストを送信したタイミングから設定した visibility timeout が有効になる
    • 換言すれば visibility timeout の延長ができるということ
1505632893061: # <- (set visibility timeout to 10s)
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=1}
...
1505632905142: # <- (set visibility timeout to 20s)
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=2}
...
1505632929236: # (set visibility timeout to 120s)
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=3}
(wait 30s)
1505632960979: # (set visibility timeout to 60s)
...
1505633021165: 
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=4}
...
1505633058395: 
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=5}
...
1505633092792: 
body: visibility_timeout_sample, attributes: {ApproximateReceiveCount=6}
...

Memo

アプリケーションをデーモンとして実行するために sbt-native-packager が便利

$ sbt stage
$ ./target/universal/stage/bin/extension-receive-message-main &