ScalaでElasticsearchを使う(elastic4s)

Posted: 2019-11-27

ScalaでElasticsearchを扱うライブラリelastic4sについて、比較的古めの情報しか見つからなかったので、使い方をまとめておきます。HTTPのAPIを提供するサーバーを作るコードを使って解説していきます。完全なコードはpecorarista/elastic4s-exampleを参照してください。

elastic4sを使う理由ですがAlpakka Elasticsearchと違い、日本語の文字を使ったクエリに対応できていたからです。ただAlpakka ElasticsearchにPRを出したところマージされたので、今後のリリースでは解消されると思います。elastic4sとAlpakka Elasticsearchの違いとしては前者がDSLを使ってクエリを作ることが挙げられます。ここらへんは好みが分かれるところだと思いますが、個人的にはDSLを使う方が好きなのでelastic4sを使うことにしてよかったです。

Elasticsearchの準備

elastic4sのバージョンと整合性をとる必要があるため、今回は7.3.2をインストールします。インストール方法についてはよい解説がたくさんあるため、ここでは特に説明しません。

今回使うデータの仕様を定義したindex.jsonを準備します。retrieved(取得日時)はUNIX Timeを表す整数です。

index.json
{
  "settings": {
    "analysis": {
      "analyzer": {
        "document_analyzer": { "type": "custom", "tokenizer": "kuromoji_tokenizer" }
      }
    }
  },
  "mappings":{
    "properties": {
      "uri": { "type": "keyword" },
      "title": { "type": "keyword" },
      "retrieved": { "type": "date", "format": "epoch_second" },
      "content": { "type": "text", "analyzer": "document_analyzer" }
    }
  }
}

これを使用してインデックスを作成します。

curl -X PUT \
    --header "Content-Type: application/json"  \
    "http://localhost:9200/documents" \
    --data-binary @index.json

次に登録するデータを準備します。ここではbulk APIを使うので、以下のような形式のファイルを作成します。

data.jsonl
{"index": {"_index": "documents", "_id": "http://example.com/article/123"}}
{"uri": "http://example.com/article/123", "title": "テスト", "retrieved": 1319937300, "content": "本文"}
...

これを以下のコマンドでElasticsearchに送ります。

curl -X POST \
    --header "Content-Type: application/json"
    "http://localhost:9200/_bulk" \
    --data-binary @data.jsonl

依存関係の定義

elastic4sのREADMEを参考にbuild.sbtを編集します。今回はHTTPサーバーの機能を使うのでAkka HTTPの関連するライブラリも依存関係に含めます。このときelastic4sのメジャーバージョン・マイナーバージョンがElasticsearchのそれと一致するように気をつけてください。

${project_root}/build.sbt
name := "example"
version := "0.1.0"
lazy val root = (project in file("."))
scalaVersion := "2.12.10"

val elastic4sVersion = "7.3.1"
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.26"
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.1.10"
libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.10"
libraryDependencies += "com.typesafe.akka" %% "akka-slf4j" % "2.5.26"
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-client-akka" % elastic4sVersion
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-json-spray" % elastic4sVersion
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % "test"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime
libraryDependencies += "ch.megard" %% "akka-http-cors" % "0.4.1"

Scalaでのデータ型の定義

まずは型クラス(case class)を定義します。今回はGETリクエストによる検索と、POSTリクエストによる更新のみの単純なAPIを作るので、以下の2つの型クラスを定義すれば十分です。

${project_root}/src/main/scala/${organization}/${app}/ApplicationTypes.scala
object ApplicationTypes {
  case class Document(
    uri: String,
    title: String,
    retrieved: Long,
    content: String
  )
  case class UpdateDocumentRequest(
    uri: String,
    title: Option[String],
    retrieved: Option[Long],
    content: Option[String]
  )
}

次にJSONをScalaの型クラスに変換する方法を定義します。elastic4sは以下のライブラリをサポートしているので好きなものを使ってください。今回はSpray Jsonを使います。これはAkka HTTPがSpray Jsonをサポートしているため、自分でコードを書く量が減らせるからです。

  • Jackson
  • Circe
  • Json4s
  • PlayJson
  • Spray Json

今回はJSONとScalaにおける型が1対1で対応するため、jsonFormatNを指定するだけで自動的に変換規則が生成されます。jsonFormatNNは型クラスのフィールド数に合わせる必要があります。

${project_root}/src/main/scala/${organization}/${app}/JsonProtocol.scala
import ApplicationTypes._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport

trait JsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val documentFormat = jsonFormat4(Document)
  implicit val documentUpdateRequestFormat = jsonFormat4(UpdateDocumentRequest)
}
object JsonProtocol extends JsonProtocol

クエリの組み立て方

elastic4sのドキュメントはやや古くなっている箇所が多いのでelastic4s/src/main/scala/com/requests/searches/queriesのあたりを参考にしながら書くとよいと思います。

${project_root}/src/main/scala/${organization}/${app}/Route.scala
private def findDocuments(term: Option[String]): Future[Seq[Document]] = {
  val queryBody = term match {
    case Some(s) => matchQuery("content", s)
    case None => matchAllQuery()
  }
  elasticClient
    .execute(
      search(documentsIndexName).limit(20).query(queryBody)
    )
    .map { response =>
      response match {
        case RequestSuccess(status @ _, body @ _, headers @ _, result) =>
          result.hits.hits.to[Seq].map(_.to[Document])
        case _ => Seq.empty[Document]
      }
    }
}

private def updateDocument(request: UpdateDocumentRequest): Future[Option[String]] =
  elasticClient
    .execute {
      update(request.uri)
        .in(documentsIndexName)
        .doc(UpdateDocumentRequest(request.uri, request.title, request.retrieved, request.content))
    }
    .map { response =>
      response match {
        case RequestFailure(status @ _, body @ _, headers @ _, error) => Some(error.reason)
        case _ => None
      }
    }

ルーティング規則の定義

Akka HTTPのドキュメントを参考に、ルーティング規則を書いていきます。またAPIを提供するときオリジン間リソース共有 (CORS)について制御する必要が生じると思うので、cors()を呼んでいます。今回の範囲では特に必要ではないので省いても大丈夫です。

${project_root}/src/main/scala/${organization}/${app}/Route.scala
trait Route {

  implicit val elasticClient: ElasticClient
  implicit val executionContext: ExecutionContext

  val documentsIndexName = "documents"

  val routes = cors() {
    path(documentsIndexName) {
      get {
        parameter('term.?) { term =>
          complete(findDocuments(term))
        }
      } ~
        post {
          entity(as[UpdateDocumentRequest]) { request =>
            val message: Future[String] = updateDocument(request).map(_.getOrElse("Success!"))
            complete(message)
          }
        }
    }
  }

  private def findDocuments(term: Option[String]): Future[Seq[Document]] = ...

  private def updateDocument(request: UpdateDocumentRequest): Future[Option[String]] = ...

}

実行

サーバーを起動するため、実行可能なオブジェクトを作成します

${project_root}/src/main/scala/${organization}/${app}/Main.scala
object Main extends App with Route {

  implicit val system = ActorSystem()
  implicit val executionContext = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val akkaClient = AkkaHttpClient(AkkaHttpClientSettings(Seq("localhost:9200")))
  val elasticClient = ElasticClient(akkaClient)
  val bindingFuture = Http().bindAndHandle(routes, "localhost", 8080)

  println(s"Server online at http://localhost:8080/.\nPress Return to exit.")

  StdIn.readLine()
  elasticClient.close()
  bindingFuture
    .flatMap(_.unbind())
    .onComplete { _ => system.terminate() }

}

Scala側でサーバーを起動する前に、Elasticsearchが9200番のポートでアクセス可能なことを確認します。それができたらsbtコマンドで対話環境に入り、runと打ってHTTPサーバーを起動します。ここまでできたら、ブラウザのアドレスバーにhttp://localhost:8080/documents?term=......の部分は好きな文字列)と入力すると、検索結果が返ってくるはずです(termクエリなので単語の分かち書きを考慮した検索になっていることに注意してください。)。また更新については以下のようなPOSTリクエストを投げると“Success!”と帰ってくるはずです。もう一度検索し、タイトルが更新されていることを確認してみてください。

curl -X POST \
    --header "Content-Type: application/json"  \
    "http://localhost:8080/documents" \
    --data '{"uri": "http://example.com/article/123", "title": "更新"}'