ScalaでElasticsearchを使う(elastic4s)
ScalaでElasticsearchを扱うライブラリelastic4sについて、比較的古めの情報しか見つからなかったので、使い方をまとめておきます。HTTPのAPIを提供するサーバーを作るコードを使って解説していきます。完全なコードはpecorarista/elastic4s-exampleを参照してください。
elastic4sを使う理由ですがAlpakka Elasticsearchと違い、日本語の文字を使ったクエリに対応できていたからです。ただAlpakka ElasticsearchにPRを出したところマージされたので、今後のリリースでは解消されると思います。elastic4sとAlpakka Elasticsearchの違いとしては前者がDSLを使ってクエリを作ることが挙げられます。ここらへんは好みが分かれるところだと思いますが、個人的にはDSLを使う方が好きなのでelastic4sを使うことにしてよかったです。
Elasticsearchの準備
elastic4sのバージョンと整合性をとる必要があるため、今回は7.3.2をインストールします。インストール方法についてはよい解説がたくさんあるため、ここでは特に説明しません。
今回使うデータの仕様を定義したindex.json
を準備します。retrieved
(取得日時)はUNIX Timeを表す整数です。
{
"settings": {
"analysis": {
"analyzer": {
"document_analyzer": { "type": "custom", "tokenizer": "kuromoji_tokenizer" }
}
}
},
"mappings":{
"properties": {
"uri": { "type": "keyword" },
"title": { "type": "keyword" },
"retrieved": { "type": "date", "format": "epoch_second" },
"content": { "type": "text", "analyzer": "document_analyzer" }
}
}
}
これを使用してインデックスを作成します。
curl -X PUT \
--header "Content-Type: application/json" \
"http://localhost:9200/documents" \
--data-binary @index.json
次に登録するデータを準備します。ここではbulk APIを使うので、以下のような形式のファイルを作成します。
{"index": {"_index": "documents", "_id": "http://example.com/article/123"}}
{"uri": "http://example.com/article/123", "title": "テスト", "retrieved": 1319937300, "content": "本文"}
...
これを以下のコマンドでElasticsearchに送ります。
curl -X POST \
--header "Content-Type: application/json"
"http://localhost:9200/_bulk" \
--data-binary @data.jsonl
依存関係の定義
elastic4sのREADMEを参考にbuild.sbtを編集します。今回はHTTPサーバーの機能を使うのでAkka HTTPの関連するライブラリも依存関係に含めます。このときelastic4sのメジャーバージョン・マイナーバージョンがElasticsearchのそれと一致するように気をつけてください。
name := "example"
version := "0.1.0"
lazy val root = (project in file("."))
scalaVersion := "2.12.10"
val elastic4sVersion = "7.3.1"
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.26"
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.1.10"
libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.10"
libraryDependencies += "com.typesafe.akka" %% "akka-slf4j" % "2.5.26"
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-client-akka" % elastic4sVersion
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-json-spray" % elastic4sVersion
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % "test"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime
libraryDependencies += "ch.megard" %% "akka-http-cors" % "0.4.1"
Scalaでのデータ型の定義
まずは型クラス(case class)を定義します。今回はGETリクエストによる検索と、POSTリクエストによる更新のみの単純なAPIを作るので、以下の2つの型クラスを定義すれば十分です。
object ApplicationTypes {
case class Document(
uri: String,
title: String,
retrieved: Long,
content: String
)
case class UpdateDocumentRequest(
uri: String,
title: Option[String],
retrieved: Option[Long],
content: Option[String]
)
}
次にJSONをScalaの型クラスに変換する方法を定義します。elastic4sは以下のライブラリをサポートしているので好きなものを使ってください。今回はSpray Jsonを使います。これはAkka HTTPがSpray Jsonをサポートしているため、自分でコードを書く量が減らせるからです。
- Jackson
- Circe
- Json4s
- PlayJson
- Spray Json
今回はJSONとScalaにおける型が1対1で対応するため、jsonFormatN
を指定するだけで自動的に変換規則が生成されます。jsonFormatN
のN
は型クラスのフィールド数に合わせる必要があります。
import ApplicationTypes._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
trait JsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
implicit val documentFormat = jsonFormat4(Document)
implicit val documentUpdateRequestFormat = jsonFormat4(UpdateDocumentRequest)
}
object JsonProtocol extends JsonProtocol
クエリの組み立て方
elastic4sのドキュメントはやや古くなっている箇所が多いのでelastic4s/src/main/scala/com/requests/searches/queriesのあたりを参考にしながら書くとよいと思います。
private def findDocuments(term: Option[String]): Future[Seq[Document]] = {
val queryBody = term match {
case Some(s) => matchQuery("content", s)
case None => matchAllQuery()
}
elasticClient
.execute(
search(documentsIndexName).limit(20).query(queryBody)
)
.map { response =>
response match {
case RequestSuccess(status @ _, body @ _, headers @ _, result) =>
result.hits.hits.to[Seq].map(_.to[Document])
case _ => Seq.empty[Document]
}
}
}
private def updateDocument(request: UpdateDocumentRequest): Future[Option[String]] =
elasticClient
.execute {
update(request.uri)
.in(documentsIndexName)
.doc(UpdateDocumentRequest(request.uri, request.title, request.retrieved, request.content))
}
.map { response =>
response match {
case RequestFailure(status @ _, body @ _, headers @ _, error) => Some(error.reason)
case _ => None
}
}
ルーティング規則の定義
Akka HTTPのドキュメントを参考に、ルーティング規則を書いていきます。またAPIを提供するときオリジン間リソース共有 (CORS)について制御する必要が生じると思うので、cors()
を呼んでいます。今回の範囲では特に必要ではないので省いても大丈夫です。
trait Route {
implicit val elasticClient: ElasticClient
implicit val executionContext: ExecutionContext
val documentsIndexName = "documents"
val routes = cors() {
path(documentsIndexName) {
get {
parameter('term.?) { term =>
complete(findDocuments(term))
}
} ~
post {
entity(as[UpdateDocumentRequest]) { request =>
val message: Future[String] = updateDocument(request).map(_.getOrElse("Success!"))
complete(message)
}
}
}
}
private def findDocuments(term: Option[String]): Future[Seq[Document]] = ...
private def updateDocument(request: UpdateDocumentRequest): Future[Option[String]] = ...
}
実行
サーバーを起動するため、実行可能なオブジェクトを作成します
object Main extends App with Route {
implicit val system = ActorSystem()
implicit val executionContext = system.dispatcher
implicit val materializer = ActorMaterializer()
val akkaClient = AkkaHttpClient(AkkaHttpClientSettings(Seq("localhost:9200")))
val elasticClient = ElasticClient(akkaClient)
val bindingFuture = Http().bindAndHandle(routes, "localhost", 8080)
println(s"Server online at http://localhost:8080/.\nPress Return to exit.")
StdIn.readLine()
elasticClient.close()
bindingFuture
.flatMap(_.unbind())
.onComplete { _ => system.terminate() }
}
Scala側でサーバーを起動する前に、Elasticsearchが9200番のポートでアクセス可能なことを確認します。それができたらsbt
コマンドで対話環境に入り、run
と打ってHTTPサーバーを起動します。ここまでできたら、ブラウザのアドレスバーにhttp://localhost:8080/documents?term=...
(...
の部分は好きな文字列)と入力すると、検索結果が返ってくるはずです(termクエリなので単語の分かち書きを考慮した検索になっていることに注意してください。)。また更新については以下のようなPOSTリクエストを投げると“Success!”と帰ってくるはずです。もう一度検索し、タイトルが更新されていることを確認してみてください。
curl -X POST \
--header "Content-Type: application/json" \
"http://localhost:8080/documents" \
--data '{"uri": "http://example.com/article/123", "title": "更新"}'