読者です 読者をやめる 読者になる 読者になる

Android Akkaの話

はじめまして、BONX Androidエンジニアの麻植です。

さて、BONX Android版は6月にリニューアルをしまして、その前後でUIだけでなく内部の設計も変更しています。 まだまだ機種依存バグへの対策などは継続中で、一部端末をお使いの方にはご不便をおかけしていて申し訳ありませんが、βの頃から比べると通話品質などについて大きく改善しています。

具体的には内部設計をEvent Driven APIと、Thread / lockによる明示的な排他制御から、Akkaによるメッセージ駆動の設計に変更しました。

AndroidでAkka?

Androidエンジニアは、Akkaの名前を初めて聞かれる方も多いかもしれません。 Scalaエンジニアにはお馴染みのAkkaでも、Androidで使うとなると、どうやっていいのかわからず二の足を踏まれるかもしれません。

しかし、ユースケースによってはAndroidでもAkkaが有力な選択肢になりえますし、おそらく使用方法もご想像されてるより簡単です。

Akkaとメッセージ駆動

Lightbend社CTOのJonas Bonér氏が開発した、メッセージ駆動による並行・分散処理のtoolkitです。 Actorと呼ばれる並行処理の基礎単位と、Actor間のメッセージによる相互作用を基に設計します。

まずは雰囲気を掴んでいただくために、VoIPのクライアントアプリにおける、最も単純な再生側のActorの構成を見てみましょう。

TCP接続を管理するTCPActor、プロトコルに基づき解釈をするReceiveActor、音声バイト列のデコード処理を担うDecodeActor、音声PCMデータの再生を担うPlayActorから構成されます。 各々のアクターは、!関数を利用して、他のActorへメッセージを送ることができます。

※ ここでは解説しやすいようにTCPを使っていますが、一般的なVoIPではUDPが採用されることが多いでしょう。なお、akka.ioはUDPに対応しています。

class TCPActor(inetAddress: InetSocketAddress, receiveActor: ActorRef) extends Actor {
  override def preStart = {
    IO(Tcp) ! Connect(inetAddress, timeout = Option(5000 millis))
  }

  def receive: Receive = {
    case c: Connected =>
      sender ! Register(self)
      context become receiveAfterConnection(sender)
    /* 実際にはその他のエラーハンドリングなどが色々 */
  }

  def receiveAfterConnection(conn: ActorRef): Receive = {
    case Received(bytes) => receiveActor ! bytes
    case bs: ByteString => conn ! Write(bs)
    /* 実際にはその他のエラーハンドリングなどが色々 */
  }
}
class ReceiveActor() extends Actor {
  lazy val decodeTag = "decode"

  override def preStart = {
    context.actorOf(Props(classOf[DecodeActor]), decodeTag)
  }

  def receive = {
    case bytes: ByteString =>
      for {
        decodeActor <- context.child(decodeTag)
        byteArray <- checkProtocol(bytes)
      } decodeActor ! AudioPacket(byteArray)
  }

  //受信バイト列のparse
  private def checkProtocol(bytes: ByteString): Seq[Array[Byte]] = ???
}
class DecodeActor() extends Actor {
  private var decoder: Option[Decoder] = None
  lazy val playTag = "play"

  override def preStart = {
    decoder = Option(new Decoder())
    context.actorOf(Props(classOf[PlayActor], YOUR_SAMPLE_RATE, YOUR_BUFFER_SIZE), playTag)
  }

  def receive = {
    case AudioPacket(bytes) =>
      for {
        playActor <- context.child(playTag)
        dec <- decoder
      } playActor ! PCM(dec.decode(bytes))
  }

  override def postStop = {
    decoder.foreach(_.release())
    decoder = None
  }
}
class PlayActor(samplingRate: Int, trackBufSize: Int) extends Actor {
  private var track: Option[AudioTrack] = None

  override def preStart = {
    track = Option(new AudioTrack(android.media.AudioManager.STREAM_VOICE_CALL, samplingRate, AudioFormat.CHANNEL_OUT_MONO, AudioFormat.ENCODING_PCM_16BIT, trackBufSize, AudioTrack.MODE_STREAM, AudioManager.AUDIO_SESSION_ID_GENERATE))
    track.foreach(_.play)
  }

  def receive = {
    case PCM(pcms) => track.foreach(_.write(pcms, 0, pcms.length))
  }

  override def postStop = {
    track.foreach {
      tr =>
        tr.flush()
        tr.stop()
        tr.release()
    }
    track = None
  }
}
//何かしらのcodecのDecoder
class Decoder {
  def decode(bytes: Array[Byte]): Array[Short] = ???

  def release(): Unit = ???
}

case class AudioPacket(byte: Array[Byte])

case class PCM(data: Array[Short])
val system = ActorSystem("bonx")
val receiveActor = system.actorOf(Props(classOf[ReceiveActor]))
val tcpActor = system.actorOf(Props(classOf[TCPActor], yourInetAddress, receiveActor))

TCPActorでは、akka.ioパッケージを利用してサーバーとTCP接続し、byte列の送受信を管理します。実際のプロダクトではreceive関数ではエラー処理など他にも様々なことが必要なりますが、簡単のために端折っています。

ReceiveActorは送られてきたパケットを解釈し、後続のDecode、そして再生へ回します。 また、このActorは後述のDecodeActorとPlayActorの親Actorにしており、それぞれの子Actor内でエラーが起きた場合のハンドリングも行います。

DecodeActorでは、Actor内部でDecoderオブジェクトを初期化し、かつ隠蔽しています。 ここで重要なのが、Actorの実行モデルとして、基本的に1Actorインスタンスは1スレッドで逐次処理をするということです。 このため、DecoderがThreadセーフでなくても、明示的な排他制御が不要となります。

PlayActorでも、Actor内部でAudioTrackオブジェクトを初期化し、かつ隠蔽しています。 AudioTrackもDecoder同様にThreadセーフである必要がありません。

各々のActorが担う処理を終えた後、次のアクターへメッセージを送信します。このメッセージの後続処理に成功するかどうかは、基本的には送り手のActorは気にする必要がありません(Fire and Forget)。代わりにエラーが発生した場合などには親ActorのsupervisorStrategyに応じて自動的にRestartをさせたり、必要な場合にはTimeoutを設定することができます。

Actor間の通信も直接Actorインスタンスの扱ったメソッド呼び出しではなく、Actorへの参照を表すActorRefに対しメッセージを送信し、受信側のActorもメールボックスから逐次取り出すため、結合が疎に保つことができます。

これらのメッセージ駆動特徴により、複雑な並行処理になりがちな音声ストリーム処理でも、Actorの実行モデルが別途toolkitにて適切に管理されており、かつ不変なメッセージオブジェクトを利用しActorの内部構造を露出させない限りにおいて、明示的な排他制御から開放された見通しの良いコードを実現できます。

Why Akka?

Actorはメッセージに反応するReceive関数やライフサイクル管理のための関数のほか、重要なコンポーネントとしてmailboxを持ちます。

Akkaにおいては、Receive関数は当然ながら、mailboxもカスタマイズ可能です。 また、Actorの実行モデルであるdispatcherの設定を変更できます。

Androidでは端末スペックの向上により、オクタコアCPU、3GB RAMを積んだ端末も珍しくなくなりました。 しかしながら、まだまだサーバーに比べCPUリソースもメモリ量も少ない実行環境に配慮したmailboxと実行モデルの採用が必須です。

その点、Akkaは柔軟なカスタマイズが可能である点が優れています。

Androidで使う上での制限

Akkaのversion

Android SDKJava 6相当のJava bytecodeをdex(Dalvik Executable)に変換します。 そのため、Akkaは(Java 8向けである)最新版2.4系ではなく、2.3系を使う必要が有ります。

Proguard

AndroidScalaを扱う上では、Proguardは(少なくともプロダクションにおいては)必須です。 Akkaは application.conf や、Propsとリフレクションによるインスタンス化ししばしば行うため、Proguardと相性が悪いです。

実際のProguardの設定は、MacroidのAkka integrationの項目が詳しいでしょう。

それでは。