makenowjust-labs/blog

MakeNowJust Laboratory Tech Blog

Λ*とMAT*によるSymbolic Finite Automataの学習

2025-02-20 (更新: 2025-02-23) / 読むのにかかる時間: 約49.7

有限状態オートマトンの一種にSymbolic Finite Automata (SFA)と呼ばれるものがあります。 これは遷移が文字ではなく条件式でラベル付けされるオートマトンで、現実世界でSFAを使うことで効率的に表現できる例があることから、研究が進められています。

SFAに対するオートマトン学習アルゴリズムとして、Λ\Lambda*MAT\mathrm{MAT}*が知られています。 どちらも既存のオートマトン学習のアルゴリズムをベースとしてSFAに拡張したものですが、学習に用いるデータ構造の特徴を上手く利用した興味深いものになっています。

この記事ではまずSFAの定義を確認し、そのあとにΛ\Lambda*MAT\mathrm{MAT}*について解説します。

想定読者: オートマトン学習について知識・関心がある方。

SFA

Symbolic Finite Automata (SFA)とは有限状態オートマトンの一種で、遷移が文字ではなく文字についての条件式でラベル付けされるものです。

例えば次のような有限状態オートマトン (DFA)を考えます。 このDFAでは、状態q1q_1からq2q_2へ文字0,1,2,30, 1, 2, 3で遷移できます。 しかし、遷移が各文字について、合計で4本の遷移が必要となります。

mermaid diagram
図1: DFAの例。同じ状態の組 (q1q_1q2q_2) の遷移が4本存在する

SFAでは同じ状態の組の遷移を1つの遷移で表せます。 今回の例では、0x30 \le x \le 3という1つの式をラベルに持つ遷移になっています。

mermaid diagram
図2: SFAの例。図1では4本だった同じ状態の組 (q1q_1q2q_2) の遷移が1つの条件式にまとめられている

このようにSFAを使うことで、複数の同じ状態の組の遷移を効率的に表現できます。 現実の有限状態オートマトンでも同じ状態の組の遷移はよくあることで1、有限状態オートマトンを応用する上で重要だと考えられるため、研究が進められています。

実効的Bool代数

SFAの定義に入る前に、遷移のラベルに用いられる条件式として利用する実効的Bool代数 (Effective Boolean Algebra)について確認します。

実効的 (effective) というのは表示関数_\llbracket \_ \rrbracketが効率的に計算できるということを意味しているのかと思います。

実効的Bool代数の例を2つ挙げます。

SFAの定義

ここからはSFAを定義していきます。

文字とはDA\mathcal{D}_\mathcal{A}の要素で、その有限列のことを文字列と呼び、文字列全体の集合をDA\mathcal{D}_\mathcal{A}^\astで、空文字列はε\varepsilonで表します。 遷移(q,ψ,p)Δ(q, \psi, p) \in \Deltaについて、qq遷移元pp遷移先ψ\psi遷移条件と呼びます。 遷移(q,ψ,p)Δ(q, \psi, p) \in \Deltaと文字aDAa \in \mathcal{D}_\mathcal{A}について、aψa \in \llbracket \psi \rrbracketのときqqからppaaにより遷移可能と言い、qapq \xrightarrow{a} pと表します。 この関係は文字列wDAw \in \mathcal{D}_\mathcal{A}^\astに対して推移的に拡張され、qwpq \xrightarrow{w} pと表します。 文字列wDAw \in \mathcal{D}_\mathcal{A}^\astについて、ww受理されるとはqinitwqq_\mathsf{init} \xrightarrow{w} qという初期状態からの遷移が存在しqFq \in Fであることを意味します。 SFA MM言語とはMMの受理する文字列全体からなる集合で、L(M)\mathcal{L}(M)と表します。

SFA MM決定的 (deterministic) であるとは、遷移元の同じ2つの遷移(q,ψ1,q1),(q,ψ2,q2)Δ(q, \psi_1, q_1), (q, \psi_2, q_2) \in \Deltaq1q2q_1 \ne q_2のときψ1ψ2=\llbracket \psi_1 \land \psi_2 \rrbracket = \emptysetであることを意味します。 SFA MM完全 (complete) であるとは、すべての状態qQq \in Q(q,ψ,p)Δψ=DA\llbracket \bigvee_{(q, \psi, p) \in \Delta} \psi \rrbracket = \mathcal{D}_\mathcal{A}であることを意味します。 この記事ではこれ以降、とくに言及がなければSFAは決定的かつ完全なものとします。 (実際、任意のSFAは決定的かつ完全なものに変換可能なことが知られています [D'Antoni & Veanes, 2017]。)

次の区間代数上のSFAは決定的かつ完全で、偶数個の負の数を含む文字列を受理します。

mermaid diagram
図3: 区間代数上の決定的かつ完全なSFAの例。偶数個の負の数を含む文字列を受理する

Λ\Lambda* [Drews & D'Antoni, 2017]

Λ\Lambda*は2017年にDrewsとD'Antoniによって提案されたSFAに対するオートマトン学習のアルゴリズムです。

Λ\Lambda*はAngluinのLL*をシンボリックに拡張したアルゴリズムで、observation tableは完全でなくても仮説のDFAを構築できることに注目して、まず、部分的 (partial) な仮説のDFAを構築します。 さらに、その遷移関数に現れる文字から完全なSFAになるような述語を生成するpartitioning関数を利用して、部分的なDFAを完全なSFAにすることでSFAを学習します。

observation table

はじめにΛ\Lambda*で用いるobservation tableを導入します。

observation table (S,R,E,f)(S, R, E, f)wSRw \in S \cup Rについて、観測表の行row(w)\mathsf{row}(w)row(w) ⁣:eEf(we)\mathsf{row}(w)\colon e \in E \mapsto f(w \cdot e)として定義します。

observation table (S,R,E,f)(S, R, E, f)は、

  • すべてのrRr \in Rについてrow(r)=row(s)\mathsf{row}(r) = \mathsf{row}(s)となるsSs \in Sが存在するとき、closedといいます。
  • すべてのs1,s2Ss_1, s_2 \in Sについてs1s2s_1 \ne s_2ならrow(s1)row(s2)\mathsf{row}(s_1) \ne \mathsf{row}(s_2)のとき、reducedといいます。
  • すべてのw1,w2SRw_1, w_2 \in S \cup Rについて、もしaDAa \in \mathcal{D}_\mathcal{A}w1a,w2aSRw_1 a, w_2 a \in S \cup Rrow(w1)=row(w2)\mathsf{row}(w_1) = \mathsf{row}(w_2)なら、row(w1a)=row(w2a)\mathsf{row}(w_1 a) = \mathsf{row}(w_2 a)であるとき、consistentといいます。
  • すべてのeEe \in EsSs \in SについてseSRs \cdot e \in S \cup Rのとき、evidence-closedという。
  • closed、reduced、consistent、evidence-closedであるとき、cohensiveといいます。

observtation tableがconhebsiveなとき、次のevidence automaton Mevid=(DA,Q,qinit,F,Δ)M_\mathsf{evid} = (\mathcal{D}_\mathcal{A}, Q, q_\mathsf{init}, F, \Delta)を構築できます。

  • 状態の集合はQ={qssS}Q = \{ q_s \mid s \in S \}です。
  • 初期状態はqinit=qεq_\mathsf{init} = q_\varepsilonで、受理状態の集合はF={qssSf(s)=1}F = \{ q_s \mid s \in S \land f(s) = 1 \}です。
  • wSRw \in S \cup RaDAa \in \mathcal{D}_\mathcal{A}についてwaSRw \cdot a \in S \cup Rであれば、遷移(qg(w),a,qg(wa))Δ(q_{g(w)}, a, q_{g(w \cdot a)}) \in \Deltaが存在します。 ここでg(w)g(w)row(w)=row(s)\mathsf{row}(w) = \mathsf{row}(s)となるsSs \in Sとします。 (closedかつreducedなのでそのような接頭辞がただ1つ存在します。)

このようにして構築されたMevidM_\mathsf{evid}は決定性有限状態オートマトンなのですが、完全 (complete) とは限らないことに注意してください。 ただし、observation tableのffに対して、次のような性質を満たします。

partitioning関数

次にevidence automatonを完全なSFAへと変換する鍵となるpartitioning関数を定義します。

各実効的Bool代数に対して、次のようなpartitioning関数が考えられます。

partitioning関数があれば、evidence automatonから完全なSFAを得るのはそれほど難しくありません。 evidence automaton Mevid=(DA,Q,qinit,F,Δ)M_\mathsf{evid} = (\mathcal{D}_\mathcal{A}, Q, q_\mathsf{init}, F, \Delta)と状態qQq \in Qについて、状態の集合に順序を付けてqiQq_i \in Qとしたとき、iq={a(q,a,qi)Δ}\ell^q_i = \{ a \mid (q, a, q_i) \in \Delta \}としてLDq=1q2qQqL^q_\mathcal{D} = \ell^q_1 \ell^q_2 \cdots \ell^q_{|Q|}LΨq=P(LDq)=ψ1qψ2qψQqL^q_\Psi = P(L^q_\mathcal{D}) = \psi^q_1 \psi^q_2 \cdots \psi^q_{|Q|}が考えられます。 そこで、SFAでの遷移をΔ={(q,ψiq,qi)qQi{1,,Q}}\Delta' = \{ (q, \psi^q_i, q_i) \mid q \in Q \land i \in \{ 1, \dots, |Q| \} \}とすると、仮説のSFA H=(DA,Q,qinit,F,Δ)H = (\mathcal{D}_\mathcal{A}, Q, q_\mathsf{init}, F, \Delta')が得られます。

この仮説のSFA HHに対しても、MevidM_\mathsf{evid}と同様に次のような性質が成り立ちます。

Λ\Lambda*アルゴリズム

ここからはΛ\Lambda*アルゴリズムの詳細を説明します。

まず学習の設定ですがAngluinのLL*と同じように、目的の言語Ltgt\mathcal{L}_\mathsf{tgt}に対して次のMQEQが与えられるものとします。

  • MQ(w)\text{\htmlClass{katex-ps-funcname}{MQ}}(w): membership querywLtgtw \in \mathcal{L}_\mathsf{tgt}なら11、そうでないなら00を返す。
  • EQ(H)\text{\htmlClass{katex-ps-funcname}{EQ}}(H): equivalence queryL(H)=Ltgt\mathcal{L}(H) = \mathcal{L}_\mathsf{tgt}なら11を、そうでないなら反例となる文字列wL(H)Ltgtw \in \mathcal{L}(H) \bigtriangleup \mathcal{L}_\mathsf{tgt}を返す。

アルゴリズムではまず、AngluinのLL*でobservation tableがconsistentかつclosedとなることを目指したように、Λ\Lambda*でもobservation tableがcohensiveとなることを目指します。 そのために、次の手続きを繰り返し適用します。

  • Close: row(s)=row(r)\mathsf{row}(s) = \mathsf{row}(r)となるsSs \in Sが存在しないrRr \in Rを探索し、見つかったらそのrrRRからSSに移動し、適当なaarbDAa_\mathsf{arb} \in \mathcal{D}_\mathcal{A}raarbr \cdot a_\mathsf{arb}RRに追加します。
  • EvidenceClose: seSRs \cdot e \notin S \cup RであるsSs \in SeEe \in Eの組を探索し、見つかったらses \cdot eやその接頭辞をRRに追加します。
  • MakeConsistent: row(w1)=row(w2)\mathsf{row}(w_1) = \mathsf{row}(w_2)w1a,w2aSRw_1 a, w_2 a \in S \cup Rだがrow(w1a)row(w2a)\mathsf{row}(w_1 a) \ne \mathsf{row}(w_2 a)であるw1,w2SRw_1, w_2 \in S \cup RaDAa \in \mathcal{D}_\mathcal{A}を探索し、見つかったら、f(w1ae)f(w2ae)f(w_1 \cdot a e) \ne f(w_2 \cdot a e)であるeEe \in Eaea eEEに追加します。
  • Distribute: MakeConsistentaea eEEに追加する前に、row(u1)=row(u2)\mathsf{row}(u_1) = \mathsf{row}(u_2)だがf(u1e)f(u2e)f(u_1 e) \ne f(u_2 e)である各u1,u2SRu_1, u_2 \in S \cup Rで、({u2bu1bSR}{u1bu2bSR})S(\{ u_2 b \mid u_1 b \in S \cup R \} \cup \{ u_1 b \mid u_2 b \in S \cup R \}) \setminus SRRに追加します。

このうちDistributeはobservation tableに既にある情報を上手く利用することで実際の計算を効率的に行うための工夫で、学習の正しさには直接関係ないのではないかと思います3

そして、cohensiveなobservation tableから仮説のSFA HHを構築してEQに渡します。 反例の文字列が返ってきた場合は、SSに含まれない反例の接頭辞をすべてRRに追加します。 EQが反例の文字列を返す理由は2つありますが、次のようにどちらの場合もこの方法で正しく処理されることが分かります。

  1. 正しい状態が存在しなかった場合: MakeConsistentにより新しい接尾辞がEEに追加されCloseで新しい状態が見つかる。
  2. 遷移の述語が間違っていた場合: SRS \cup Rに反例の接頭辞が含まれているはずなので、次のP(LDq)P(L^q_\mathcal{D})の計算の際に新しい文字がLDqL^q_\mathcal{D}のいずれかの要素に追加される。

アルゴリズムの全体は次のようになります。 各手続きの疑似コードは (面倒なので) 省略します。 書いてある通りにするだけです。

Algorithm 1 Λ\Lambda* algorithm

function Λ*(MQ, EQ, aarb,Pa_\mathsf{arb}, P)

S{ε},R{aarb},E{ε}S \gets \{ \varepsilon \}, R \gets \{ a_\mathsf{arb} \}, E \gets \{ \varepsilon \}, and ff \gets \emptyset

f(ε)f(\varepsilon) \gets MQ(ε\varepsilon) and f(aarb)f(a_\mathsf{arb}) \gets MQ(aarba_\mathsf{arb})

repeat

if (S,R,E,f)(S, R, E, f) is not closed then

Close()

continue\mathbf{continue}

end if

if (S,R,E,f)(S, R, E, f) is not evidence-closed then

EvidenceClose()

continue\mathbf{continue}

end if

if (S,R,E,f)(S, R, E, f) is not consistent then

MakeConsistent() and Distribute()

continue\mathbf{continue}

end if

Let MevidM_\mathsf{evid} be the evidence automaton constructed from (S,R,E,f)(S, R, E, f)

Let HH be the hypothesis SFA constructed from MevidM_\mathsf{evid} and PP

if w=w = EQ(HH) is a counterexample then

RR({ww is a prefix of w}S)R \gets R \cup (\{ w' \mid w'\text{ is a prefix of }w \} \setminus S)

end if

until EQ(HH) = 1

return HH

end function

Scalaによる実装も示します。 基本的には説明した通りです。

Scalaによる実装

Gist: https://gist.github.com/makenowjust/bfca1fc504e4780a06f4fb3ab2d710ca

// This is an implementation of the Λ* algorithm in Scala 3.
//
// The Λ* algorithm is a learning algorithm for symbolic automata, proposed by
// Samuel Drews and Loris D'Antoni (2017), "Learning Symbolic Automata"
// https://doi.org/10.1007/978-3-662-54577-5_10.
 
import scala.collection.mutable
 
/** `BoolAlg` represents an effective Boolean algebra on the domain `D`.
  *
  * `P` is a type of predicates on the domain `D`.
  */
trait BoolAlg[D, P]:
 
  /** Returns the predicate that is always true. */
  def `true`: P
 
  /** Returns the predicate that is always false. */
  def `false`: P
 
  /** Returns the predicate: p ∧ q. */
  def and(p: P, q: P): P
 
  /** Returns the predicate: p ∨ q. */
  def or(p: P, q: P): P
 
  /** Returns the predicate: ¬p. */
  def not(p: P): P
 
  /** Checks if the denotation of `p` contains `d`. */
  def contains(p: P, d: D): Boolean
 
  /** Computes the partitioning function to `ds`.
    *
    * This returns the sequence of separating predicates of `ds`.
    */
  def partition(ds: Seq[Set[D]]): Seq[P]
 
/** `Pred` is a concrete representation of predicates on atomic proposition `A`.
  */
enum Pred[+A]:
  case True, False
  case Atom(a: A)
  case And(p: Pred[A], q: Pred[A])
  case Or(p: Pred[A], q: Pred[A])
  case Not(p: Pred[A])
 
  infix def and[AA >: A](that: Pred[AA]): Pred[AA] = (this, that) match
    case (True, q)               => q
    case (p, True)               => p
    case (False, _) | (_, False) => False
    case (p, q)                  => And(p, q)
 
  infix def or[AA >: A](that: Pred[AA]): Pred[AA] = (this, that) match
    case (True, _) | (_, True) => True
    case (p, False)            => p
    case (False, q)            => q
    case (p, q)                => Or(p, q)
 
  def not: Pred[A] = this match
    case True   => False
    case False  => True
    case Not(p) => p
    case p      => Not(p)
 
  /** Checks if the denotation of `p` contains `d`.
    *
    * `atom` is a function that checks if the atomic proposition contains a data value.
    */
  def contains[D](d: D)(atom: (A, D) => Boolean): Boolean = this match
    case True      => true
    case False     => false
    case Atom(a)   => atom(a, d)
    case And(p, q) => p.contains(d)(atom) && q.contains(d)(atom)
    case Or(p, q)  => p.contains(d)(atom) || q.contains(d)(atom)
    case Not(p)    => !p.contains(d)(atom)
 
object Pred:
 
  /** `EqualityAlgebra` is an instance of `BoolAlg` for the equality. */
  given EqualityAlgebra[A]: BoolAlg[A, Pred[A]] with
    def `true`: Pred[A] = Pred.True
    def `false`: Pred[A] = Pred.False
    def and(p: Pred[A], q: Pred[A]): Pred[A] = Pred.And(p, q)
    def or(p: Pred[A], q: Pred[A]): Pred[A] = Pred.Or(p, q)
    def not(p: Pred[A]): Pred[A] = Pred.Not(p)
    def contains(p: Pred[A], d: A): Boolean = p.contains(d)(_ == _)
    def partition(dss: Seq[Set[A]]): Seq[Pred[A]] =
      val maxIndex = dss.zipWithIndex.maxBy(_._1.size)._2
      val largePred = dss.iterator.zipWithIndex
        .filter(_._2 != maxIndex)
        .map(_._1)
        .flatten
        .map(Pred.Atom(_))
        .foldLeft(Pred.False)(_ or _)
      dss.zipWithIndex.map:
        case (ds, i) if i == maxIndex => largePred.not
        case (ds, i) =>
          ds.iterator.map(Pred.Atom(_)).foldLeft(Pred.False)(_ or _)
 
/** `Dfa` represents a deterministic finite automaton.
  *
  * This is used for representing evidence automata (Def. 3).
  */
final case class Dfa[S, A](
    initialState: S,
    acceptStateSet: Set[S],
    transitionFunction: Map[S, Map[A, S]]
):
 
  /** Converts this evidence automaton to an SFA using the partitioning function.
    */
  def toSfa[P](using P: BoolAlg[A, P]): Sfa[S, P] =
    val transitionFunction = this.transitionFunction.map:
      case (state, edgeMap) =>
        val nextStateToChars =
          edgeMap.toSeq.groupBy(_._2).view.mapValues(_.map(_._1)).toSeq
        val preds = P.partition(nextStateToChars.map(_._2.toSet))
        val newEdgeMap = nextStateToChars
          .zip(preds)
          .map:
            case ((nextState, _), pred) => (pred, nextState)
          .toMap
        state -> newEdgeMap
 
    Sfa(initialState, acceptStateSet, transitionFunction.toMap)
 
/** `Sfa` represents a symbolic finite automaton (Def. 1).
  *
  * In this implementation, SFAs are assumed to be deterministic and finite.
  */
final case class Sfa[S, P](
    initialState: S,
    acceptStateSet: Set[S],
    transitionFunction: Map[S, Map[P, S]]
):
  def transition[A](state: S, char: A)(using P: BoolAlg[A, P]): Option[S] =
    val edgeMap = transitionFunction(state)
    edgeMap.find((p, _) => P.contains(p, char)).map(_._2)
 
/** `Alphabet` represents an alphabet of characters. */
trait Alphabet[A]:
 
  /** Returns the arbitrary character. */
  def arbChar: A
 
object Alphabet:
 
  /** Creates an instance of `Alphabet` with the given character. */
  def apply[A](char: A): Alphabet[A] = new Alphabet[A]:
    def arbChar = char
 
/** `Oracle` represents an oracle that provides membership and equivalence queries. */
trait Oracle[A]:
 
  /** Checks if the given word is in the target language. */
  def membershipQuery(word: Seq[A]): Boolean
 
  /** Checks if the given SFA is equivalent to the target language.
    *
    * This returns a counterexample if the given SFA is not equivalent to the target language.
    */
  def equivalenceQuery[P](sfa: Sfa[?, P])(using BoolAlg[A, P]): Option[Seq[A]]
 
object Oracle:
 
  /** Creates an instance of `Oracle` with the given function. */
  def fromFunction[A](
    finiteAlphabet: Set[A],
    minWordLength: Int = 10,
    maxWordLength: Int = 100,
    numWords: Int = 100,
    randomSeed: Long = 0L
  )(f: Seq[A] => Boolean): Oracle[A] =
    val alphabetIndexedSeq = finiteAlphabet.toIndexedSeq
 
    new Oracle[A]:
      def membershipQuery(word: Seq[A]): Boolean = f(word)
      def equivalenceQuery[P](sfa: Sfa[?, P])(using BoolAlg[A, P]): Option[Seq[A]] =
        val rand = util.Random(randomSeed)
        util.boundary:
          for i <- 0 until numWords do
            val size = rand.between(minWordLength, maxWordLength + 1)
            var word = Seq.empty[A]
            var state = sfa.initialState
            for j <- 0 until size do
              val char = alphabetIndexedSeq(rand.nextInt(alphabetIndexedSeq.size))
              word :+= char
              state = sfa.transition(state, char).get
              if sfa.acceptStateSet.contains(state) != f(word) then
                util.boundary.break(Some(word))
          None
 
/** `Prefix` is a prefix of a word. */
type Prefix[A] = Seq[A]
 
/** `Suffix` is a suffix of a word. */
type Suffix[A] = Seq[A]
 
/** `Sig` is a signature of a prefix.
  *
  * A signature is a sequence of booleans; for a prefix `s`, `sig(i)` is true
  * if `s ++ suffices(i)` is in the target language, otherwise false.
  */
type Sig = Seq[Boolean]
 
/** `ObservationTable` represents an observation table (Def. 2).
  *
  * In this implementation, an observation table `(S, R, E, f)` is represented as
  * four values `prefixSet, boundarySet, suffices` and `rowMap`, where:
  *
  * - `prefixSet` is corresponding to `S`,
  * - `boundarySet` is corresponding to `R`,
  * - `suffices` is corresponding to `E`, and
  * - `rowMap` is corresponding to `f`.
  */
final case class ObservationTable[A](
    prefixSet: Set[Prefix[A]],
    boundarySet: Set[Prefix[A]],
    suffices: Seq[Suffix[A]],
    rowMap: Map[Prefix[A], Sig]
):
 
  /** Returns a row of the given prefix.
    *
    * A row value is the signature of the prefix, and it returns the pre-computed value.
    */
  private def row(prefix: Prefix[A]): Sig = rowMap(prefix)
 
  /** Computes the signature of the given prefix. */
  private def sig(prefix: Prefix[A])(using O: Oracle[A]): Sig =
    suffices.map(suffix => O.membershipQuery(prefix ++ suffix))
 
  /** Returns the set of prefixes and boundaries. */
  private def prefixAndBoundarySet: Set[Prefix[A]] =
    prefixSet ++ boundarySet
 
  /** Returns the set of extensions of the given prefix.
    *
    * An extension is a word that is obtained by appending a character to the prefix.
    */
  private def extensionSet(prefix: Prefix[A]): Set[Prefix[A]] =
    prefixSet.filter(b => prefix.size + 1 == b.size && b.startsWith(prefix)) ++
      boundarySet.filter(b => prefix.size + 1 == b.size && b.startsWith(prefix))
 
  /** Finds and returns an unclosed boundary if it exists. */
  def findUnclosedBoundary(): Option[Prefix[A]] =
    val candidates = boundarySet.filter: boundary =>
      !prefixSet.exists(row(_) == row(boundary))
    if candidates.nonEmpty then Some(candidates.minBy(_.size))
    else None
 
  /** Does the "close" operation (§3.3) with the given prefix. */
  def close(prefix: Prefix[A])(using A: Alphabet[A], O: Oracle[A]): ObservationTable[A] =
    val newPrefixSet = prefixSet + prefix
    val newBoundary = prefix :+ A.arbChar
    val newBoundarySet = boundarySet - prefix + newBoundary
    val newRowMap =
      if rowMap.contains(newBoundary) then rowMap
      else rowMap + (newBoundary -> sig(newBoundary))
    ObservationTable(newPrefixSet, newBoundarySet, suffices, newRowMap)
 
  /** Finds and return a sequence of evidence-unclosed words if it exists. */
  def findEvidenceUnclosedWord(): Seq[Seq[A]] =
    val prefixAndBoundarySet = this.prefixAndBoundarySet
    prefixSet.iterator
      .flatMap: prefix =>
        suffices.iterator
          .map(prefix ++ _)
          .filterNot(prefixAndBoundarySet)
      .toSeq
 
  /** Does the "evidence-close" operation (§3.3) with the given word. */
  def evidenceClose(word: Seq[A])(using O: Oracle[A]): ObservationTable[A] =
    if prefixSet.contains(word) then this
    else
      var newBoundarySet = boundarySet
      var newRowMap = rowMap
      for newBoundary <- word.inits; if !boundarySet.contains(newBoundary) do
        newBoundarySet += newBoundary
        newRowMap += newBoundary -> sig(newBoundary)
      ObservationTable(prefixSet, newBoundarySet, suffices, newRowMap)
 
  /** Finds and returns an inconsistent suffix if it exists. */
  def findInconsistentSuffix(): Option[Suffix[A]] =
    val prefixAndBoundarySet = this.prefixAndBoundarySet
    val triple = (
      for
        prefix1 <- prefixAndBoundarySet.iterator
        ext1 <- extensionSet(prefix1).iterator
        char = ext1.last
        prefix2 <- prefixAndBoundarySet.iterator
        ext2 = prefix2 :+ char
        if prefixAndBoundarySet.contains(ext2)
        if row(prefix1) == row(prefix2)
        if row(ext1) != row(ext2)
      yield (prefix1, prefix2, char)
    ).nextOption()
    triple.map: (prefix1, prefix2, char) =>
      val ext1 = prefix1 :+ char
      val ext2 = prefix2 :+ char
      val index = row(ext1)
        .zip(row(ext2))
        .zipWithIndex
        .find:
          case ((b1, b2), i) => b1 != b2
        .map(_._2)
        .get
      char +: suffices(index)
 
  /** Does the "make-consistent" and "distribute" operations (§3.3) with the given suffix. */
  def makeConsistentAndDistribute(newSuffix: Suffix[A])(using O: Oracle[A]): ObservationTable[A] =
    import O.{membershipQuery => MQ}
 
    val prefixAndBoundarySet = this.prefixAndBoundarySet
    var boundarySetToAdd = Set.empty[Prefix[A]]
 
    // First, the learner do "distribute" operation.
    for
      prefix1 <- prefixAndBoundarySet
      prefix2 <- prefixAndBoundarySet
      if row(prefix1) == row(prefix2)
      if MQ(prefix1 ++ newSuffix) != MQ(prefix2 ++ newSuffix)
    do
      for
        char <- extensionSet(prefix1).map(_.last)
        if !prefixSet.contains(prefix2 :+ char)
      do boundarySetToAdd += prefix2 :+ char
      for
        char <- extensionSet(prefix2).map(_.last)
        if !prefixSet.contains(prefix1 :+ char)
      do boundarySetToAdd += prefix1 :+ char
 
    var newRowMap = rowMap
    val newBoundarySet = boundarySet ++ boundarySetToAdd
    for newBoundary <- boundarySetToAdd; if !rowMap.contains(newBoundary) do
      newRowMap += newBoundary -> sig(newBoundary)
 
    // Next, the learner do "make-consistent" operation.
    val newSuffices = suffices :+ newSuffix
    for prefix <- newRowMap.keys do newRowMap += prefix -> (newRowMap(prefix) :+ MQ(prefix ++ newSuffix))
 
    ObservationTable(prefixSet, newBoundarySet, newSuffices, newRowMap)
 
  /** Processes the given counterexample. */
  def process(cex: Seq[A])(using O: Oracle[A]): ObservationTable[A] =
    val prefixAndBoundarySet = this.prefixAndBoundarySet
    var newBoundarySet = boundarySet
    var newRowMap = rowMap
    for boundary <- cex.inits; if !prefixAndBoundarySet.contains(boundary) do
      newBoundarySet += boundary
      newRowMap += boundary -> sig(boundary)
    ObservationTable(prefixSet, newBoundarySet, suffices, newRowMap)
 
  /** Builds an evidence automaton from the observation table (§3.1). */
  def buildEvidence(): Dfa[Prefix[A], A] =
    val prefixAndBoundarySet = this.prefixAndBoundarySet
    val rowToPrefix = rowMap.view.filterKeys(prefixSet).map(_.swap).toMap
    val transitionFunction = mutable.Map.empty[Prefix[A], Map[A, Prefix[A]]]
    for
      prefix0 <- prefixAndBoundarySet
      ext0 <- extensionSet(prefix0)
    do
      val char = ext0.last
      val prefix = rowToPrefix(row(prefix0))
      val boundary = rowToPrefix(row(ext0))
      if !transitionFunction.contains(prefix) then
        transitionFunction(prefix) = Map.empty
      transitionFunction(prefix) += char -> boundary
    val initialState = rowToPrefix(rowMap(Seq.empty))
    val acceptSet = prefixSet.map(row).filter(_.head).map(rowToPrefix)
    Dfa(initialState, acceptSet, transitionFunction.toMap)
 
object ObservationTable:
 
  /** Creates an empty observation table. */
  def empty[A](using A: Alphabet[A], O: Oracle[A]): ObservationTable[A] =
    import O.{membershipQuery => MQ}
    val prefixSet = Set(Seq.empty[A])
    val boundarySet = Set(Seq(A.arbChar))
    val suffices = Seq(Seq.empty)
    val rowMap = Map(
      Seq.empty -> Seq(MQ(Seq.empty)),
      Seq(A.arbChar) -> Seq(MQ(Seq(A.arbChar)))
    )
    ObservationTable(prefixSet, boundarySet, suffices, rowMap)
 
/** `Learner` provides an implementation of the Λ* algorithm. */
object Learner:
 
  /** Infers an SFA from the given alphabet and oracle using the Λ* algorithm. */
  def learn[A, P](
    alphabet: Alphabet[A],
    oracle: Oracle[A],
  )(using P: BoolAlg[A, P]): Sfa[Prefix[A], P] =
    import oracle.{equivalenceQuery => EQ}
    given Alphabet[A] = alphabet
    given Oracle[A] = oracle
 
    var obs = ObservationTable.empty
    var result = Option.empty[Sfa[Prefix[A], P]]
 
    while result.isEmpty do
      util.boundary:
        println(s"obs = $obs")
 
        for boundary <- obs.findUnclosedBoundary() do
          println(s"close($boundary)")
          obs = obs.close(boundary)
          util.boundary.break()
 
        val words = obs.findEvidenceUnclosedWord()
        if words.nonEmpty then
          for word <- words do
            println(s"evidenceClose($word)")
            obs = obs.evidenceClose(word)
          util.boundary.break()
 
        for suffix <- obs.findInconsistentSuffix() do
          println(s"makeConsistentAndDistribute($suffix)")
          obs = obs.makeConsistentAndDistribute(suffix)
          util.boundary.break()
 
        val dfa = obs.buildEvidence()
        val sfa = dfa.toSfa
 
        println(s"EQ($sfa)")
        EQ(sfa) match
          case Some(cex) =>
            println(s"process($cex)")
            obs = obs.process(cex)
          case None      =>
            result = Some(sfa)
 
    result.get
 
val alphabet = Alphabet(0)
val oracle = Oracle.fromFunction(Set(0, 1, 2, 3)): word =>
  word.count(_ == 0) % 3 == 0 && word.count(_ == 1) % 2 == 0
 
val sfa = Learner.learn(alphabet, oracle)(using Pred.EqualityAlgebra[Int])
println(sfa)

直感的にはΛ\Lambda*は、LL*でアルファベットのすべての文字に対して遷移を求めていたものを、最初の1文字以外は反例やこれまでの観察に基づいて遷移を求めるようにしたものだと考えられます。 そのため、LL*と同様に次のことが言えます。

Λ\Lambda*の計算量

Λ\Lambda*の計算量は、partitioning関数やEQの返す反例の質に依存しています。 反例の質とは何でしょうか?

例えば、図3のSFA (負の数が偶数個含む文字列を受理する) を学習することを考えてみてください。 の数という述語が上手く学習できず、(,1000](-\infty, 1000]のような間違った区間で学習していたとします。

mermaid diagram
図4: 図3のSFAを学習している途中の仮説のSFA。赤文字の遷移の条件が間違っている

ここで、返ってきた反例が1,0\langle -1, 0 \rangleであれば、正しく述語を修正できます。 しかし反例が1,1000\langle -1, 1000 \rangleであれば、述語を(,999](-\infty, 999]としか修正できず、学習があまり進みません。 さらに、このあとに返ってくる反例が1,999\langle -1, 999 \rangle1,998\langle -1, 998 \rangle、…と2番目の値が減っていくのであれば、正しく学習するために1000回以上のEQを呼び出さなければいけません。

このように、Λ\Lambda*では反例の質やpartitioning関数の特徴についても考えなければ計算量について議論できません。 ですが、そのままでは議論が難しいので、まず sgs_g-learnability について考えます。 ただし、ここでは簡潔な説明のみにします。 詳細な説明は論文[Drews & D'Antoni, 2017]を参照してください。

適当な実効的Bool代数A\mathcal{A}とその上のpartitioning関数PPを考えます。 CCDA\mathcal{D}_\mathcal{A}を分割する述語のリストの集合 (つまりPPの返り値の集合) とします。 GGPPへの入力LLとその返り値P(L)P(L)、目的の分割ctgtCc_\mathsf{tgt} \in Cの3つを受け取って、より詳細なPPへの入力LL'を返す関数 (ジェネレータ) の集合とします。

ジェネレータgGg \in Gは目的の分割ctgtc_\mathsf{tgt}とpartitioning関数PPに対して、L0=L_0 = \langle \emptyset \rangleとおいてLi+1=g(Li,P(Li),ctgt)L_{i+1} = g(L_i, P(L_i), c_\mathsf{tgt})と繰り返し呼び出すことで、最終的にP(Li)=ctgtP(L_i) = c_\mathsf{tgt}となるようなLiL_iが求められる関数と考えられます。

つまり、ジェネレータはpartitioning関数の特徴や反例の質により、述語の学習がどのように進むかを抽象化しています。 例えば図4の場合、q1q_1でのPPへの入力はL1={1},{1001}L_1 = \langle \{ -1 \}, \{ 1001 \} \rangleのようになっていて、ここで反例として1,0\langle -1, 0\rangleが返ってきたことはg1(L1,P(L1),ctgt)={1},{0,1001}g_1(L_1, P(L_1), c_\mathsf{tgt}) = \langle \{ -1 \}, \{ 0, 1001 \} \rangleが、反例として1,1000\langle -1, 1000 \rangleが返ってきたことはg2(L1,P(L1),ctgt)={1},{1000,1001}g_2(L_1, P(L_1), c_\mathsf{tgt}) = \langle \{ -1 \}, \{ 1000, 1001 \} \rangleが相当します。

それではsgs_g-learnabilityを定義します。

ggを呼び出す毎に入力のリストは大きくなっていくので、sgs_gの定義は「高々sg(c)s_g(c)ggを呼び出すことで目的の分割が得られれる」とも言い換えられます。

論文ではこのあとsgs_g-learnableな実効的Bool代数のクラスを分類したり、2つのsgs_g-learnableな実行的Bool代数とpartitioning関数から、その直積や直和に対応するpartitioning関数やsgs_gが計算できる (つまり直積や直和もsgs_g-learnableである) ことを示しています。 これらも興味深い話題です。 詳細は論文は参照してください。

論文ではsgs_gを使って、次のような定理を述べています。 ただし赤文字の下線はこの記事の筆者によるもので、後で説明するように、筆者はこの定理に疑問的です。

Λ\Lambda*の疑問点

疑問1: Distributeは必要なのか

上で説明した通り、手続きDistributeはアルゴリズムの正しさを保証するために必要なものではないと考えています。 実装で実験した限りでもDistributeの有無で学習が停止しなくなる、といったことはありませんでした。

アルゴリズムでは、Closeでobservation tableがclosedになり、EvidenceCloseでevidence-closedになり、MakeConsistentでconsistentになります。 reducedかどうかはobservation tableの更新の仕方で保証されているので、これらの手続きのみでアルゴリズムは正しく動作するはずです。

計算量的にも全ての状態の組をループする必要があり、そこまで効率的なわけではないので、この処理を省いた方が効率的なこともあるかもしれません。 ただ、既知の観察を上手く使えなくなってしまうので、equivalence queryの回数などに微妙な影響があるのではないかと思います。

疑問2: equivalence queryの上界

定理 (SFAの学習可能性) ではn2ΣqiQsgi(ci)n^2 \Sigma_{q_i \in Q} s_{g_i}(c_i)で抑えれらると述べています。 上界なので間違ってはいないと思うのですが、過剰に多く見積っているように思えます。 とくにn2n^2というのがどこから出てきたのかよく分かりません。

Λ\Lambda*ではequivalence queryは、次の2つの目的で利用されます。

  1. 新しい状態を発見する。
  2. 遷移の述語をより詳細にする。

このうち1の目的で利用されるのは高々n=Qn = |Q|回です。 そして2の目的で利用されるのが高々ΣqiQsgi(ci)\Sigma_{q_i \in Q} s_{g_i}(c_i)回のはずです。 よってequivalence queryの回数は高々Q+ΣqiQsgi(ci)|Q| + \Sigma_{q_i \in Q} s_{g_i}(c_i)回というのが妥当な気がします。

さらにobservation tableで各状態は1文字は遷移が存在する (observation tableの定義の3の条件) ので、PPに渡されるリストも空ではなく1文字は存在する状態から学習が始まります。 よって、実際には2の目的で利用されるのは高々ΣqiQ(sgi(ci)1)=ΣqiQsgi(ci)Q\Sigma_{q_i \in Q} (s_{g_i}(c_i) - 1) = \Sigma_{q_i \in Q} s_{g_i}(c_i) - |Q|となり、equivalence queryの回数は高々ΣqiQsgi(ci)\Sigma_{q_i \in Q} s_{g_i}(c_i)回と考えられそうです。

それではn2n^2というのはどこから出てきたのでしょうか? 仮説の1つとして、membership queryの回数と間違えたのではないかと個人的に予想しています。

nn個の状態を区別するためには、接尾辞の集合EEnn程度の大きさになる可能性があります。 よって、DFAの時点で、nn状態のDFAを学習するためにO(n2)O(n^2)回のmembership queryが必要となります。 SFAの場合はそれに加えて述語を学習するためにO(ΣqiQsgi(ci))O(\Sigma_{q_i \in Q} s_{g_i}(c_i))個の接頭辞がRRに追加されているため、O((n+ΣqiQsgi(ci))n)O((n + \Sigma_{q_i \in Q} s_{g_i}(c_i)) n)回のmembership queryが必要となりそうです。

とはいえ、これは反例の文字列の長さについての考慮などが抜けていて、あまり良い見積りではないように感じます。

MAT\mathrm{MAT}* [Argyros & D'Antoni, 2018]

MAT\mathrm{MAT}*5は2018年にArgyrosとD'Antoniによって提案されたSFAに対するオートマトン学習のアルゴリズムです。

MAT\mathrm{MAT}*ではKearns-Vazirani (KV) のアルゴリズムで利用されるclassification treeを使います。 classification treeを構築したあとに、各遷移の述語がSFA全体で完全・決定的となるように学習していきます。 classification treeはobservation treeと異なり遷移先の状態が必ず求められるので、どのような文字である状態からある状態に遷移するかは分からなくても、ある文字である状態からある状態に遷移するかは分かります。 つまり、MAT\mathrm{MAT}*では各遷移に対して、

  • ある文字である状態からある状態に遷移するか→membership query
  • その述語がSFA全体で完全・決定的か→equivalence query

という通常のオートマトン学習でのMAT (Minimally Adequate Teacher) に相当する情報を使って、述語を学習していきます。 これがMAT\mathrm{MAT}*と呼ばれる所以です。

classification tree

まずclassification treeの定義を確認します。

classification treeにはどのノードからも遷移が存在しない根ノードvrootVv_\mathsf{root} \in Vがただ1つ存在します。 木の辺(v,vx,x)E(v, v_x, x) \in Evxv_xの下にある葉ノードlLl \in Lについて、学習対象の言語のmembership query MQについて、MQ(lv)=x\text{\htmlClass{katex-ps-funcname}{MQ}}(l \cdot v) = xとなることを意図しています。

classification tree T=(V,L,E)T = (V, L, E)に対して、木を辿って対応する葉ノードを取得する関数siftT\mathsf{sift}_Tを定義します。

siftT(v,w)={vif vLsiftT(vMQ(wv),w)if vL\mathsf{sift}_T(v, w) = \begin{cases} v & \text{if }v \in L \\ \mathsf{sift}_T(v_{\text{\htmlClass{katex-ps-funcname}{MQ}}(w \cdot v)}, w) & \text{if }v \notin L \end{cases}

ここで、vVv \in VwΣw \in \Sigma^\ast(v,v0,0),(v,v1,1)E(v, v_0, 0), (v, v_1, 1) \in Eです。 さらに、siftT(w)=siftT(vroot,w)\mathsf{sift}_T(w) = \mathsf{sift}_T(v_\mathsf{root}, w)とします。

classification tree TTの葉ノードloldLl_\mathsf{old} \in Lを新しいノードvvと葉ノードlnewl_\mathsf{new}で置き換えた木をsplitT(lold,lnew,v)\mathsf{split}_T(l_\mathsf{old}, l_\mathsf{new}, v)とします (図5)。

mermaid diagram
図5: 左のclassification tree T1T_1と、そのloldl_\mathsf{old}vvlnewl_\mathsf{new}で置き換えたT2=splitT(lold,lnew,v)T_2 = \mathsf{split}_T(l_\mathsf{old}, l_\mathsf{new}, v)です。赤いノードは更新する、更新された箇所を表します

空文字列の根ノードのみからなるclassification treeをTε=({ε},{ε},)T_\varepsilon = (\{ \varepsilon \}, \{ \varepsilon \}, \emptyset)とします6。 このTεT_\varepsilonからはじめてsplitT\mathsf{split}_Tで更新していくと、常に葉ノードlLl \in LについてsiftT(l)=l\mathsf{sift}_T(l) = lとなります。

アルファベットΣfin\Sigma_\mathsf{fin}が有限の大きさであれば、classification tree T=(V,L,E)T = (V, L, E)に対して、状態の集合をQ=LQ = Lで遷移関数をΔ={(l,σ,siftT(lσ))lLσΣfin}\Delta = \{ (l, \sigma, \mathsf{sift}_T(l \cdot \sigma)) \mid l \in L \land \sigma \in \Sigma_\mathsf{fin} \}としてDFAが得られます。

一方で、SFAではアルファベットは有限の大きさとは限らず、これだけでは構築できません。 ただし、実質的な遷移関数であるsiftT\mathsf{sift}_Tが既に手に入っているので、ここから遷移の述語が構築できればSFAが得られることになります。

SFAの構築

実効的Bool代数A\mathcal{A}とclassification tree T=(V,L,E)T = (V, L, E)、2つの葉ノードl1,l2Ll_1, l_2 \in Lについて、Λl1,l2\Lambda^{l_1, l_2}l1l_1からl2l_2の遷移の述語のlearnerとします。 このlearnerには次のような手続きが与えられています。

  • Conject(Λl1,l2)\text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l_1, l_2}): 学習中のl1l_1からl2l_2の遷移の述語ψΨ\psi \in \Psiを返す。
  • Update(Λl1,l2,a,MQTl1,l2)\text{\htmlClass{katex-ps-funcname}{Update}}(\Lambda^{l_1, l_2}, a, \text{\htmlClass{katex-ps-funcname}{MQ}}^{l_1,l_2}_T): learner Λl1,l2\Lambda^{l_1, l_2}を反例となる文字aDAa \in \mathcal{D}_\mathcal{A}で正しくなるように更新する (つまり更新後に(aConject(Λl1,l2))    (MQTl1,l2(a)=1)(a \in \llbracket \text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l_1, l_2}) \rrbracket) \iff (\text{\htmlClass{katex-ps-funcname}{MQ}}^{l_1,l_2}_T(a) = 1)となるようにする)。

ここでaDAa \in \mathcal{D}_\mathcal{A}についてMQTl1,l2\text{\htmlClass{katex-ps-funcname}{MQ}}^{l_1,l_2}_Tは次のように定義されます。

MQTl1,l2(a)={1if siftT(l1a)=l20otherwise\text{\htmlClass{katex-ps-funcname}{MQ}}^{l_1,l_2}_T(a) = \begin{cases} 1 & \text{if }\mathsf{sift}_T(l_1 \cdot a) = l_2 \\ 0 & \text{otherwise} \end{cases}

例えば、等価代数に対するlearnerは次のように与えられます。

各葉ノードの組l1,l2Ll_1, l_2 \in Lに対してlearner Λl1,l2\Lambda^{l_1, l_2}を初期化して、各葉ノードlLl \in Lについて次の処理を繰り返します。

  • 完全化: ϕ=¬(lLConject(Λl,l))\phi = \lnot (\bigvee_{l \in L} \text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l, l'}))とします。 ここでϕ=\llbracket \phi \rrbracket = \emptyであれば、構築されたSFAでllからの遷移はすべての文字について存在しますが、そうでなければ何かしら抜けている文字があることになります。 そのような文字aϕa \in \llbracket \phi \rrbracketがある場合、l=siftT(la)l' = \mathsf{sift}_T(l \cdot a)としてUpdate(Λl,l,a,MQTl,l)\text{\htmlClass{katex-ps-funcname}{Update}}(\Lambda^{l, l'}, a, \text{\htmlClass{katex-ps-funcname}{MQ}}^{l,l'}_T)を呼び出し、Λl,l\Lambda^{l, l'}を更新します。 この処理をaϕa \in \llbracket \phi \rrbracketが無くなるまで繰り返します。
  • 決定化: 異なる葉ノードl1,l2l_1, l_2について、ψ=Conject(Λl,l1)Conject(Λl,l2)\psi = \text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l, l_1}) \land \text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l, l_2})とします。 ここでψ=\llbracket \psi \rrbracket = \emptysetなら構築されたSFAでllからの遷移は決定的ですが、そうでなければある文字aψa \in \llbracket \psi \rrbracketの遷移先が少なくともl1l_1l2l_2の2つ存在することになります。 aψa \in \llbracket \psi \rrbracketがある場合、l{l1,l2}l' \in \{ l_1, l_2 \}について(aConject(Λl,l))    (MQTl,l(a)=1)(a \in \llbracket \text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l, l'}) \rrbracket) \iff (\text{\htmlClass{katex-ps-funcname}{MQ}}^{l, l'}_T(a) = 1)ではない場合、Update(Λl,l,a,MQTl,l)\text{\htmlClass{katex-ps-funcname}{Update}}(\Lambda^{l, l'}, a, \text{\htmlClass{katex-ps-funcname}{MQ}}^{l, l'}_T)を呼び出し、Λl,l\Lambda^{l, l'}を更新します。 この処理をaψa \in \llbracket \psi \rrbracketが無くなるまで繰り返します。

この処理を行なったあとにΔ={(l1,Conject(Λl1,l2),l2)l1,l2L}\Delta = \{ (l_1, \text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l_1, l_2}), l_2) \mid l_1, l_2 \in L \}として遷移関数を作ると、SFAは完全かつ決定的になります。 このSFAをMAT\mathrm{MAT}*の仮説のSFA HHと呼ぶことにします。

反例の処理

仮説のSFA HHが得られたら、それをequivalence query EQ(H)\text{\htmlClass{katex-ps-funcname}{EQ}}(H)に渡して目的の言語になっているか確認します。 ここで、反例の文字列wwが返ってきた場合にどうすればいいでしょうか?

MAT\mathrm{MAT}*ではまず[Isberner & Steffen, 2014]と同様に二分探索を利用して、反例の原因となる遷移を行なったインデックスi{1,,w}i \in \{ 1, \dots, |w| \}を特定します。 反例の原因となる遷移を行なったインデックスは、w[1,i1]w_{[1,i-1]}wwの先頭からi1i-1番目までの部分文字列、wiw_iii番目の文字、w[i+1,w]w_{[i+1,|w|]}i+1i+1番目から末尾までの部分文字列として、qinitw[1,i1]l1wil2q_\mathsf{init} \xrightarrow{w_{[1, i-1]}} l_1 \xrightarrow{w_i} l_2としたとき、MQ(l1wiw[i+1,w])MQ(l2w[i+1,w])\text{\htmlClass{katex-ps-funcname}{MQ}}(l_1 \cdot w_i \cdot w_{[i+1,|w|]}) \ne \text{\htmlClass{katex-ps-funcname}{MQ}}(l_2 \cdot w_{[i+1,|w|]})となるインデックスiiのこととします。

Keanrs-Vaziraniであれば、このようなインデックスiiが見つかったら、splitT(l2,l1wi,w[i+1,w])\mathsf{split}_T(l_2, l_1 \cdot w_i, w_{[i+1,|w|]})としてclassification treeを更新することで学習を進められました。 しかしSFAの学習の場合、これは新しい状態を発見することに相当するので、述語を詳細にするためには別の処理をしなければいけません。

まず、l3=siftT(l1wi)l_3 = \mathsf{sift}_T(l_1 \cdot w_i)としてwiConject(Λl1,l3)w_i \in \llbracket \text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l_1, l_3}) \rrbracketを確認します。 そして、次の処理を行います。

  • wiConject(Λl1,l3)w_i \in \llbracket \text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l_1, l_3}) \rrbracketなら、遷移は正しく行なわれていたので、新しい状態を追加するためにsplitT(l2,l1wi,w[i+1,w])\mathsf{split}_T(l_2, l_1 \cdot w_i, w_{[i+1,|w|]})でclassification treeを更新し、この影響を受けるlearnerを初期化し直す。
  • wiConject(Λl1,l3)w_i \notin \llbracket \text{\htmlClass{katex-ps-funcname}{Conject}}(\Lambda^{l_1, l_3}) \rrbracketなら、遷移の述語に不十分な点があるので、Λl1,l2\Lambda^{l_1,l_2}Λl1,l3\Lambda^{l_1,l_3}Updateで更新する。

こうすることで正しく学習を進められます。

MAT\mathrm{MAT}*アルゴリズム

最後に、ここまで説明してきたMAT\mathrm{MAT}*を疑似コードで示します。 引数のMQはmembership query、EQはequivalence queryで、Λ\Lambdaは実効的Bool代数A\mathcal{A}のlearnerの初期化や操作を与えるものです。

Algorithm 2 MAT\mathrm{MAT}* algorithm

function MAT*(MQ, EQ, Λ\Lambda)

TT \gets InitializeClassificationTree()

SΛS_\Lambda \gets InitializeGuardLearners(TT)

HH \gets GetSFAModel(T,SΛT, S_\Lambda)

while EQ(HH) 1\ne 1 do

ww \gets EQ(HH)

T,SΛT, S_\Lambda \gets ProcessCounterexample(T,SΛ,wT, S_\Lambda, w)

HH \gets GetSFAModel(T,SΛT, S_\Lambda)

end while

return HH

end function

アルゴリズム中に現れる手続きはそれぞれ、次のようなものです。 これらは、ここまでで解説してきた内容と対応しています。

  • InitializeClassificationTree()\text{\htmlClass{katex-ps-funcname}{InitializeClassificationTree}}(): 初期化されたclassification tree TεT_\varepsilonを返す。
  • InitializeGuardLearners(T)\text{\htmlClass{katex-ps-funcname}{InitializeGuardLearners}}(T): TTの各遷移に対してlearnerを初期化して返す。
  • GetSFAModel(T,SΛ)\text{\htmlClass{katex-ps-funcname}{GetSFAModel}}(T, S_\Lambda): TTの各遷移のlearnerを更新して、完全かつ決定的な仮説のSFAを返す。
  • ProcessCounterexample(T,SΛ,w)\text{\htmlClass{katex-ps-funcname}{ProcessCounterexample}}(T, S_\Lambda, w): 反例の文字列wwを処理して、classification treeと各遷移のlearnerを更新する。

このアルゴリズムをScalaで実装したものも示します。

Scalaによる実装

Gist: https://gist.github.com/makenowjust/53ed1b8e066952df9d4578d18d20097e

// This is an implementation of the MAT* algorithm in Scala 3.
//
// The MAT* algorithm is a learning algorithm for symbolic finite automata, proposed by
// George Argyros and Loris D'Antoni, "The Learnability of Symbolic Automata"
// https://doi.org/10.1007/978-3-319-96145-3_23.
 
import scala.annotation.tailrec
import scala.util.hashing.MurmurHash3
 
/** `BoolAlg` represents an effective Boolean algebra on the domain `D`.
  *
  * `P` is a type of predicates on the domain `D`.
  */
trait BoolAlg[D, P]:
 
  /** Returns the predicate that is always true. */
  def `true`: P
 
  /** Returns the predicate that is always false. */
  def `false`: P
 
  /** Returns the predicate: `p ∧ q`. */
  def and(p: P, q: P): P
 
  /** Returns the predicate: `p ∨ q`. */
  def or(p: P, q: P): P
 
  /** Returns the predicate: `¬p`. */
  def not(p: P): P
 
  /** Checks if the denotation of `p` contains `d`. */
  def contains(p: P, d: D): Boolean
 
  /** Returns a witness data of the predicate `p` if it exists. */
  def witness(p: P): Option[D]
 
/** `Interval` is a closed interval `[left, right]` of integers. */
final class Interval(val left: Int, val right: Int):
 
  /** Checks if the interval contains the given integer `n`. */
  def contains(n: Int): Boolean = left <= n && n <= right
 
  /** Checks if the interval overlaps with the given interval `that`. */
  def overlaps(that: Interval): Boolean =
    val l = math.max(left, that.left)
    val r = math.min(right, that.right)
    l <= r
 
  /** Checks if the interval is contiguous with the given interval `that`. */
  def contiguous(that: Interval): Boolean =
    val l = math.max(left, that.left)
    val r = math.min(right, that.right)
    l == r + 1
 
  /** Computes the union of the interval with the given interval `that` if it can be represented as a single interval.
    */
  def unionOf(that: Interval): Option[Interval] =
    Option.when(overlaps(that) || contiguous(that)):
      val l = math.min(left, that.left)
      val r = math.max(right, that.right)
      Interval(l, r)
 
  override def equals(that: Any): Boolean = that match
    case that: Interval => left == that.left && right == that.right
    case _              => false
 
  override def hashCode(): Int =
    var hash = "Interval".##
    hash = MurmurHash3.mix(hash, left.##)
    hash = MurmurHash3.mix(hash, right.##)
    MurmurHash3.finalizeHash(hash, 2)
 
  override def toString(): String = s"Interval($left, $right)"
 
object Interval:
 
  /** Returns a new interval `[left, right]`. If `left > right`, an exception is thrown. */
  def apply(left: Int, right: Int): Interval =
    require(left <= right, s"Invalid interval: $left > $right")
    new Interval(left, right)
 
/** `IntervalSet` is a set of intervals. */
final class IntervalSet(val intervals: IndexedSeq[Interval]):
 
  /** Check if the interval set is empty. */
  def isEmpty: Boolean = intervals.isEmpty
 
  /** Checks if the interval set contains the given integer `n`. */
  def contains(n: Int): Boolean =
    val index = intervals.search(Interval(n, n))(Ordering.by(_.right)).insertionPoint
    index < intervals.length && intervals(index).contains(n)
 
  /** Computes the union of the interval set with the given interval set `that`. */
  infix def union(that: IntervalSet): IntervalSet =
    IntervalSet.from(intervals ++ that.intervals)
 
  /** Computes the intersection of the interval set with the given interval set `that`. */
  infix def intersect(that: IntervalSet): IntervalSet =
    (complement union that.complement).complement
 
  /** Computes the complement of the interval set. */
  def complement: IntervalSet =
    if intervals.isEmpty then IntervalSet.universal
    else
      val complemented = IndexedSeq.newBuilder[Interval]
      if intervals.head.left != Int.MinValue then complemented += Interval(Int.MinValue, intervals.head.left - 1)
      for i <- 0 until intervals.length - 1 do
        val left = intervals(i).right
        val right = intervals(i + 1).left
        complemented += Interval(left + 1, right - 1)
      if intervals.last.right != Int.MaxValue then complemented += Interval(intervals.last.right + 1, Int.MaxValue)
      new IntervalSet(complemented.result())
 
  override def equals(that: Any): Boolean = that match
    case that: IntervalSet => intervals == that.intervals
    case _                 => false
 
  override def hashCode(): Int =
    val hash = "IntervalSet".##
    MurmurHash3.mixLast(hash, intervals.##)
 
  override def toString(): String = intervals.mkString("IntervalSet(", ", ", ")")
 
object IntervalSet:
 
  /** The empty interval set. */
  val empty: IntervalSet = new IntervalSet(IndexedSeq.empty)
 
  /** The universal interval set. */
  val universal: IntervalSet = new IntervalSet(IndexedSeq(Interval(Int.MinValue, Int.MaxValue)))
 
  /** Returns a new interval set from the given intervals. */
  def apply(intervals: Interval*): IntervalSet = from(intervals)
 
  /** Returns a new interval set from the given iterator of intervals. */
  def from(intervals: IterableOnce[Interval]): IntervalSet =
    val sorted = intervals.iterator.toSeq.sortBy(i => (i.left, i.right))
    if sorted.isEmpty then empty
    else
      val merged = IndexedSeq.newBuilder[Interval]
      var current = sorted.head
      for interval <- sorted.tail do
        current.unionOf(interval) match
          case Some(nextCurrent) => current = nextCurrent
          case None =>
            merged += current
            current = interval
      merged += current
      new IntervalSet(merged.result())
 
  given boolAlg: BoolAlg[Int, IntervalSet] with
    def `true`: IntervalSet = IntervalSet.universal
    def `false`: IntervalSet = IntervalSet.empty
    def and(p: IntervalSet, q: IntervalSet): IntervalSet = p intersect q
    def or(p: IntervalSet, q: IntervalSet): IntervalSet = p union q
    def not(p: IntervalSet): IntervalSet = p.complement
    def contains(p: IntervalSet, d: Int): Boolean = p.contains(d)
    def witness(p: IntervalSet): Option[Int] =
      Option.when(!p.isEmpty)(p.intervals.head.left)
 
/** `Membership` represents a membership query. */
trait Membership[A]:
 
  /** Checks if the given input is a member of the target language. */
  def apply(a: A): Boolean
 
/** `Learner` represents a Boolean algebra learner.
  *
  * This trait takes three type parameters:
  *
  *   - `L` is a type of learner instance.
  *   - `A` is a type of input data.
  *   - `H` is a type of hypothesis model.
  */
trait Learner[L, A, H]:
 
  /** Returns a new learner instance. */
  def create(using Membership[A]): L
 
  /** Returns a learner updated with the given cex (counterexample). */
  def update(learner: L, cex: A)(using Membership[A]): L
 
  /** Returns the hypothesis model conjected by the learner. */
  def conject(learner: L)(using Membership[A]): H
 
object Learner:
 
  /** Learns a hypothesis model from the given membership query and equivalence query. */
  def learn[L, A, H](mq: Membership[A], eq: (H) => Option[A])(using L: Learner[L, A, H]): H =
    given Membership[A] = mq
    var learner = L.create(using mq)
    var cex: Option[A] = eq(L.conject(learner))
    while cex.isDefined do
      learner = L.update(learner, cex.get)
      cex = eq(L.conject(learner))
    L.conject(learner)
 
/** `IntervalSetLearner` is a learner for the `IntervalSet` Boolean algebra. */
final case class IntervalSetLearner(posExampleSet: Set[Int], negExampleSet: Set[Int]):
 
  /** Returns a new learner updated with the given counterexample `cex`. */
  def update(cex: Int)(using mq: Membership[Int]): IntervalSetLearner =
    if mq(cex) then copy(posExampleSet = posExampleSet + cex)
    else copy(negExampleSet = negExampleSet + cex)
 
  /** Returns the hypothesis model conjected by the learner. */
  def conject(): IntervalSet =
    if posExampleSet.isEmpty then IntervalSet.empty
    else if negExampleSet.isEmpty then IntervalSet.universal
    else if posExampleSet.size < negExampleSet.size then
      posExampleSet.foldLeft(IntervalSet.empty)((l, r) => l union IntervalSet(Interval(r, r)))
    else negExampleSet.foldLeft(IntervalSet.empty)((l, r) => l union IntervalSet(Interval(r, r))).complement
 
object IntervalSetLearner:
 
  /** The empty interval set learner. */
  val empty: IntervalSetLearner = IntervalSetLearner(Set.empty, Set.empty)
 
  given learner: Learner[IntervalSetLearner, Int, IntervalSet] with
    def create(using Membership[Int]): IntervalSetLearner = IntervalSetLearner.empty
    def update(learner: IntervalSetLearner, cex: Int)(using mq: Membership[Int]): IntervalSetLearner =
      learner.update(cex)
    def conject(learner: IntervalSetLearner)(using Membership[Int]): IntervalSet =
      learner.conject()
 
/** `Sfa` represents a symbolic finite automaton.
  *
  * In this implementation, SFAs are assumed to be deterministic and finite.
  */
final case class Sfa[S, P](
    initialState: S,
    acceptStateSet: Set[S],
    transitionFunction: Map[S, Map[P, S]]
):
 
  /** Computes the next state from the given state and input data. */
  def transition[A](state: S, char: A)(using P: BoolAlg[A, P]): Option[S] =
    val edgeMap = transitionFunction(state)
    edgeMap.find((p, _) => P.contains(p, char)).map(_._2)
 
  /** Computes the next state from the given state and word. */
  def transitions[A](state: S, word: Seq[A])(using P: BoolAlg[A, P]): Option[S] =
    word.foldLeft(Option(state))((state, char) => state.flatMap(transition(_, char)))
 
/** `Prefix` is a prefix of a word. */
type Prefix[A] = Seq[A]
 
/** `Suffix` is a suffix of a word. */
type Suffix[A] = Seq[A]
 
/** `CTree` is a classification tree. */
enum CTree[A]:
  case Leaf(prefix: Prefix[A])
  case Node(suffix: Suffix[A], trueBranch: CTree[A], falseBranch: CTree[A])
 
  /** Returns the set of leaf nodes. */
  def leafSet: Set[Prefix[A]] = this match
    case Leaf(prefix) => Set(prefix)
    case Node(suffix, trueBranch, falseBranch) =>
      trueBranch.leafSet ++ falseBranch.leafSet
 
  /** Computes the leaf node that the given word belongs to. */
  @tailrec
  final def sift(word: Prefix[A])(using mq: Membership[Seq[A]]): Seq[A] = this match
    case Leaf(prefix) => prefix
    case Node(suffix, trueBranch, falseBranch) =>
      val branch =
        if mq(word ++ suffix) then trueBranch
        else falseBranch
      branch.sift(word)
 
  /** Returns a new classification tree by splitting the leaf node with given values. */
  final def split(oldLeaf: Prefix[A], newLeaf: Prefix[A], newSuffix: Suffix[A])(using
      mq: Membership[Seq[A]]
  ): CTree[A] =
    this match
      case Leaf(leaf) =>
        assert(leaf == oldLeaf, s"Invalid prefix: $leaf != $oldLeaf")
        if mq(oldLeaf ++ newSuffix) then Node(newSuffix, Leaf(oldLeaf), Leaf(newLeaf))
        else Node(newSuffix, Leaf(newLeaf), Leaf(oldLeaf))
      case Node(suffix, trueBranch, falseBranch) =>
        if mq(oldLeaf ++ suffix) then Node(suffix, trueBranch.split(oldLeaf, newLeaf, newSuffix), falseBranch)
        else Node(suffix, trueBranch, falseBranch.split(oldLeaf, newLeaf, newSuffix))
 
/** `SfaLearner` is a learner for symbolic finite automata. */
final case class SfaLearner[L, A](
    tree: CTree[A],
    acceptMap: Map[Prefix[A], Boolean],
    guardLearnerMap: Map[(Prefix[A], Prefix[A]), L]
):
 
  /** Returns a membership query for the learner of the given transition. */
  private def membership(leaf1: Prefix[A], leaf2: Prefix[A])(using Membership[Seq[A]]): Membership[A] =
    new Membership[A]:
      def apply(a: A): Boolean = tree.sift(leaf1 ++ Seq(a)) == leaf2
 
  /** Splits the classification tree and updates the learner. */
  private def split[P](oldLeaf: Prefix[A], newLeaf: Prefix[A], newSuffix: Suffix[A])(using
      mq: Membership[Seq[A]],
      L: Learner[L, A, P]
  ): SfaLearner[L, A] =
    println(s"split($oldLeaf, $newLeaf, $newSuffix)")
 
    val newTree = tree.split(oldLeaf, newLeaf, newSuffix)
    val newAcceptMap = acceptMap ++ Map(newLeaf -> mq(newLeaf))
    val newLeafPairs = newTree.leafSet.iterator.flatMap(leaf => Iterator((newLeaf, leaf), (leaf, newLeaf)))
    val oldLeafPairs = tree.leafSet.iterator.map(leaf => (leaf, oldLeaf))
    val newGuardLearnerMap = guardLearnerMap ++ (newLeafPairs ++ oldLeafPairs).map((leaf1, leaf2) =>
      (leaf1, leaf2) -> L.create(using membership(leaf1, leaf2))
    )
    SfaLearner(newTree, newAcceptMap, newGuardLearnerMap)
 
  /** Make the guards complete for the given source leaf. */
  private def makeGuardsComplete[P](
      srcLeaf: Prefix[A]
  )(using mq: Membership[Seq[A]], L: Learner[L, A, P], P: BoolAlg[A, P]): (Boolean, SfaLearner[L, A]) =
    println(s"makeGuardsComplete($srcLeaf)")
 
    val leafSet = tree.leafSet
    var newGuardLearnerMap = guardLearnerMap
    var updated = false
    var continue = true
 
    while continue do
      val guards = leafSet.map: leaf =>
        L.conject(newGuardLearnerMap(srcLeaf, leaf))(using membership(srcLeaf, leaf))
      val completePred = P.not(guards.foldLeft(P.`false`)(P.or(_, _)))
      P.witness(completePred) match
        case None => continue = false
        case Some(cex) =>
          val tgtLeaf = tree.sift(srcLeaf ++ Seq(cex))
          val learner = newGuardLearnerMap((srcLeaf, tgtLeaf))
          val newLearner = L.update(learner, cex)(using membership(srcLeaf, tgtLeaf))
          newGuardLearnerMap += (srcLeaf, tgtLeaf) -> newLearner
          updated = true
 
    (updated, copy(guardLearnerMap = newGuardLearnerMap))
 
  /** Make the guards deterministic for the given source leaf. */
  private def makeGuardsDeterministic[P](
      srcLeaf: Prefix[A]
  )(using mq: Membership[Seq[A]], L: Learner[L, A, P], P: BoolAlg[A, P]): (Boolean, SfaLearner[L, A]) =
    println(s"makeGuardsDeterministic($srcLeaf)")
 
    val leafSet = tree.leafSet
    var newGuardLearnerMap = guardLearnerMap
    var updated = false
    var continue = true
 
    while continue do
      var updatedLocal = false
 
      for leaf1 <- leafSet; leaf2 <- leafSet; if leaf1 != leaf2 do
        val guard1 = L.conject(newGuardLearnerMap(srcLeaf, leaf1))(using membership(srcLeaf, leaf1))
        val guard2 = L.conject(newGuardLearnerMap(srcLeaf, leaf2))(using membership(srcLeaf, leaf2))
        val deterministicPred = P.and(guard1, guard2)
        P.witness(deterministicPred) match
          case None => ()
          case Some(cex) =>
            for (leaf, guard) <- Seq((leaf1, guard1), (leaf2, guard2)) do
              if membership(srcLeaf, leaf)(cex) != P.contains(guard, cex) then
                val learner = newGuardLearnerMap((srcLeaf, leaf))
                val newLearner = L.update(learner, cex)(using membership(srcLeaf, leaf))
                newGuardLearnerMap += (srcLeaf, leaf) -> newLearner
            updatedLocal = true
            updated = true
 
      if !updatedLocal then continue = false
 
    (updated, copy(guardLearnerMap = newGuardLearnerMap))
 
  /** Iterates `makeGuardsComplete` and `makeGuardsDeterministic` until the guards are complete and deterministic. */
  private def makeGuardsCompleteAndDeterministic[P](
      srcLeaf: Prefix[A]
  )(using Membership[Seq[A]], Learner[L, A, P], BoolAlg[A, P]): SfaLearner[L, A] =
    println(s"makeGuardsCompleteAndDeterministic($srcLeaf)")
 
    var learner = this
    var isComplete = false
    var isDeterministic = false
 
    while !isComplete || !isDeterministic do
      val result1 = learner.makeGuardsComplete(srcLeaf)
      learner = result1._2
      val result2 = learner.makeGuardsDeterministic(srcLeaf)
      learner = result2._2
 
      isComplete = !result1._1
      isDeterministic = !result2._1
 
    learner
 
  /** Returns the index of the breakpoint in the counterexample. */
  def analyzeCex[P](hypothesis: Sfa[Prefix[A], P], cex: Seq[A])(using mq: Membership[Seq[A]], P: BoolAlg[A, P]): Int =
    var expected = mq(cex)
    var low = 0
    var high = cex.length
 
    while high - low > 1 do
      val mid = (high - low) / 2 + low
      val state = hypothesis.transitions(hypothesis.initialState, cex.slice(0, mid)).get
      val word = state ++ cex.slice(mid, cex.length)
      val actual = mq(word)
      if actual == expected then low = mid
      else high = mid
 
    low
 
  /** Returns a new learner updated with the given counterexample `cex`. */
  def update[P](cex: Seq[A])(using mq: Membership[Seq[A]], L: Learner[L, A, P], P: BoolAlg[A, P]): SfaLearner[L, A] =
    println(s"update($cex)")
 
    val hypothesis = conject[P]()
    val breakpoint = analyzeCex(hypothesis, cex)
 
    val srcLeaf = hypothesis.transitions(hypothesis.initialState, cex.slice(0, breakpoint)).get
    val tgtWord = srcLeaf ++ Seq(cex(breakpoint))
    val tgtLeaf = tree.sift(tgtWord)
 
    val guard = L.conject(guardLearnerMap((srcLeaf, tgtLeaf)))(using membership(srcLeaf, tgtLeaf))
    if P.contains(guard, cex(breakpoint)) then
      val newSuffix = cex.slice(breakpoint + 1, cex.length)
      var newLearner = split(tgtLeaf, tgtWord, newSuffix)
      for leaf <- newLearner.tree.leafSet do newLearner = newLearner.makeGuardsCompleteAndDeterministic(leaf)
      newLearner
    else
      val newTgtLeafGuard =
        L.update(guardLearnerMap((srcLeaf, tgtLeaf)), cex(breakpoint))(using membership(srcLeaf, tgtLeaf))
      val tgtState = hypothesis.transition(srcLeaf, cex(breakpoint)).get
      val newTgtStateGuard =
        L.update(guardLearnerMap((srcLeaf, tgtState)), cex(breakpoint))(using membership(srcLeaf, tgtState))
      val newLearner = copy(guardLearnerMap =
        guardLearnerMap ++ Map((srcLeaf, tgtLeaf) -> newTgtLeafGuard, (srcLeaf, tgtState) -> newTgtStateGuard)
      )
      newLearner.makeGuardsCompleteAndDeterministic(srcLeaf)
 
  /** Returns the hypothesis SFA conjected by the learner. */
  def conject[P]()(using mq: Membership[Seq[A]], L: Learner[L, A, P]): Sfa[Prefix[A], P] =
    val initialState = Seq.empty
    val acceptStateSet = acceptMap.keySet.filter(acceptMap(_))
    val leafSet = tree.leafSet
    val transitionFunction = tree.leafSet.iterator.map: srcLeaf =>
      val guardMap = leafSet.iterator.map: tgtLeaf =>
        val guard = L.conject(guardLearnerMap(srcLeaf, tgtLeaf))(using membership(srcLeaf, tgtLeaf))
        guard -> tgtLeaf
      srcLeaf -> guardMap.toMap
    Sfa(initialState, acceptStateSet, transitionFunction.toMap)
 
object SfaLearner:
 
  /** Returns an empty SFA learner. */
  def empty[L, A, P](using mq: Membership[Seq[A]], L: Learner[L, A, P], P: BoolAlg[A, P]): SfaLearner[L, A] =
    SfaLearner(
      CTree.Leaf(Seq.empty),
      Map(Seq.empty -> mq(Seq.empty)),
      Map((Seq.empty, Seq.empty) -> L.create(using (_ => true)))
    ).makeGuardsCompleteAndDeterministic(Seq.empty)
 
  given learner[L, A, P](using L: Learner[L, A, P], P: BoolAlg[A, P]): Learner[SfaLearner[L, A], Seq[A], Sfa[Seq[A], P]]
  with
    def create(using Membership[Seq[A]]): SfaLearner[L, A] = SfaLearner.empty
    def update(learner: SfaLearner[L, A], cex: Seq[A])(using Membership[Seq[A]]): SfaLearner[L, A] =
      learner.update(cex)
    def conject(learner: SfaLearner[L, A])(using Membership[Seq[A]]): Sfa[Seq[A], P] =
      learner.conject()
 
  /** Creates an equivalence query from the given membership query and finite alphabet. */
  def equivalence[A, P](
      mq: Membership[Seq[A]],
      finiteAlphabet: Set[A],
      minWordLength: Int = 10,
      maxWordLength: Int = 100,
      numWords: Int = 100,
      randomSeed: Long = 0L
  )(using BoolAlg[A, P]): (Sfa[Prefix[A], P]) => Option[Seq[A]] =
    val alphabetIndexedSeq = finiteAlphabet.toIndexedSeq
 
    (sfa: Sfa[Prefix[A], P]) =>
      println(sfa)
      val rand = util.Random(randomSeed)
      util.boundary:
        for i <- 0 until numWords do
          val size = rand.between(minWordLength, maxWordLength + 1)
          var word = Seq.empty[A]
          var state = sfa.initialState
          for j <- 0 until size do
            val char = alphabetIndexedSeq(rand.nextInt(alphabetIndexedSeq.size))
            word :+= char
            state = sfa.transition(state, char).get
            if sfa.acceptStateSet.contains(state) != mq(word) then util.boundary.break(Some(word))
        None
 
val mq = new Membership[Seq[Int]]:
  def apply(word: Seq[Int]): Boolean =
    word.count(_ == 0) % 3 == 0 && word.count(_ == 1) % 2 == 0
val eq = SfaLearner.equivalence[Int, IntervalSet](mq, Set(0, 1, 2, 3))
val sfa = Learner.learn[SfaLearner[IntervalSetLearner, Int], Seq[Int], Sfa[Prefix[Int], IntervalSet]](mq, eq)
println(sfa)

次に、このアルゴリズムの計算量について考察します。

大きさnnの述語をΛ\Lambdaで与えられるlearnerで学習するとして、CmqΛ(n)\mathcal{C}^\Lambda_\mathrm{mq}(n)MQの呼び出し回数、CeqΛ(n)\mathcal{C}^\Lambda_\mathrm{eq}(n)EQの呼び出し回数とします。 さらに、SFA MMの最大の遷移の述語の大きさをB(M)\mathcal{B}(M)と表すことにします。 このとき、MAT\mathrm{MAT}*の計算量について、論文では次のように述べています。

また、ここまでの考察を整理するとSFAに対してもUpdateConjectが与えられるため、SFA自身を遷移の述語の実効的Bool代数にしたSFAも (いわゆる拡張SFA) もまたMAT\mathrm{MAT}*で学習できるということが分かります。 これはなかなか興味深いことなのではないかと思います。

Λ\Lambda*MAT\mathrm{MAT}*の比較

Λ\Lambda*MAT\mathrm{MAT}*はどちらも既存のオートマトン学習のアルゴリズムを元にして、SFAに拡張したものですが、いくつか違いがあります。 次の表でそれらをまとめました。

表1: Λ\Lambda*MAT\mathrm{MAT}*の比較
アルゴリズムデータ構造遷移のBool代数に求めるもの
Λ\Lambda*observation table (LL*)partitioning関数
MAT\mathrm{MAT}*classification tree (KV)learnerの手続き

個人的にはMAT\mathrm{MAT}*の方が、Bool代数に求めるものやアルゴリズムがより洗練された定義になっているように感じています7

あとがき

この記事ではSFAの学習アルゴリズムであるΛ\Lambda*MAT\mathrm{MAT}*について解説しました。

SFAは応用が分かりやすく興味深い対象なのではないかと思います。 BDDを使うことで、最小化や空性の判定など、様々な問題が効率的に解けることも知られています。 オートマトン学習に限らず、そういった方向で学んでみても面白いのではないでしょうか。

正直Λ\Lambda*の計算量の辺りの話は全く自信がありません。 間違っていたらごめんなさい。

また、こうしてSFAのオートマトン学習のアルゴリズムについて調べると、本当に重要なのはアルゴリズムそのものではなく、その上で学習可能な実効的Bool代数の述語のクラスの方が重要なのではないかという気がします。 最近の研究である[Fisman, Frenkel, & Zilles, 2023]では、この学習可能な実効的Bool代数について、より詳細に考察されているように見えます。

長い記事ですが、最後まで目を通していただきありがとうございました。

参考文献

[D'Antoni & Veanes, 2017]

D'Antoni, Loris, and Margus Veanes. "The power of symbolic automata and transducers." Computer Aided Verification: 29th International Conference, CAV 2017, Heidelberg, Germany, July 24-28, 2017, Proceedings, Part I 30. Springer International Publishing, 2017.

https://link.springer.com/chapter/10.1007/978-3-319-63387-9_3

[Drews & D'Antoni, 2017]

Drews, Samuel, and Loris D'Antoni. "Learning symbolic automata." International Conference on Tools and Algorithms for the Construction and Analysis of Systems. Berlin, Heidelberg: Springer Berlin Heidelberg, 2017.

https://link.springer.com/chapter/10.1007/978-3-662-54577-5_10

[Argyros & D'Antoni, 2018]

Argyros, George, and Loris D'Antoni. "The learnability of symbolic automata." Computer Aided Verification: 30th International Conference, CAV 2018, Held as Part of the Federated Logic Conference, FloC 2018, Oxford, UK, July 14-17, 2018, Proceedings, Part I 30. Springer International Publishing, 2018.

https://link.springer.com/chapter/10.1007/978-3-319-96145-3_23

[Isberner & Steffen, 2014]

Isberner, Malte, and Bernhard Steffen. "An abstract framework for counterexample analysis in active automata learning." International Conference on Grammatical Inference. PMLR, 2014.

https://proceedings.mlr.press/v34/isberner14a.pdf

[Fisman, Frenkel, & Zilles, 2023]

Fisman, Dana, Hadar Frenkel, and Sandra Zilles. "Inferring symbolic automata." Logical Methods in Computer Science 19 (2023).

https://lmcs.episciences.org/11224

脚注

  1. 例えば正規表現で\w[0-9]のような文字クラスを利用すると、その文字クラスに含まれる文字の分だけ同じ状態の組の遷移が現れることになります。

  2. 論文ではsaRs \cdot a \in Rですが、アルファベットが有限で遷移先がすべて異なる状態となるときに成り立たないのでsaSRs \cdot a \in S \cup Rが正しいはずです。

  3. 論文では"This operation distributes the old evidence leading out of the amalgamated state between the newly differentiated states, simplifying the constructions in Sect.4." (pp. 179) とあります。 しかし、4章は学習の学習可能性や学習可能な実効的Bool代数の直積・直和もまた学習可能であることの説明で、constructionらしいのは直積・直和の実効的Bool代数を構成している部分くらいしかなく、最後の部分が要領の得ない一文に感じます。 (個人的には3.4章 "Worked Example"の間違いなのではないかと思っています。)

  4. 論文では"PP needs as input a list of sets, provided by gg, with total size at most sg(c)s_g(c) to discover a target partition cCc \in C."とありますが、このtotal sizeというのが厳密に何を意味するか定かではありません。 PPへの入力は集合の列なので、単なる列の長さではなく、要素の集合の大きさの総和なのではないかと疑っているのですが、それだと空集合の列\langle \emptyset \rangleの大きさが00になってしまい、本当にそれでいいのか自信がありません。

  5. 論文では斜体のMATMAT*表記なのですが、あんまりな見た目なのでMAT\mathrm{MAT}*としています。

  6. 論文では根ノードと、それとは異なる葉ノードとしてε\varepsilonを追加して、片側が欠けた木として初期化していますが、なぜそんなことをしているのか意味不明なので、より分かりやすい形にしています。 このため、論文ではsiftT\mathsf{sift}_T\botを返す可能性があるものとなっています。

  7. 後発のアルゴリズムなので当然なのですが。