Λ*とMAT*によるSymbolic Finite Automataの学習
有限状態オートマトンの一種にSymbolic Finite Automata (SFA)と呼ばれるものがあります。 これは遷移が文字ではなく条件式でラベル付けされるオートマトンで、現実世界でSFAを使うことで効率的に表現できる例があることから、研究が進められています。
SFAに対するオートマトン学習アルゴリズムとして、とが知られています。 どちらも既存のオートマトン学習のアルゴリズムをベースとしてSFAに拡張したものですが、学習に用いるデータ構造の特徴を上手く利用した興味深いものになっています。
この記事ではまずSFAの定義を確認し、そのあとにとについて解説します。
想定読者: オートマトン学習について知識・関心がある方。
SFA
Symbolic Finite Automata (SFA)とは有限状態オートマトンの一種で、遷移が文字ではなく文字についての条件式でラベル付けされるものです。
例えば次のような有限状態オートマトン (DFA)を考えます。 このDFAでは、状態からへ文字で遷移できます。 しかし、遷移が各文字について、合計で4本の遷移が必要となります。
SFAでは同じ状態の組の遷移を1つの遷移で表せます。 今回の例では、という1つの式をラベルに持つ遷移になっています。
このようにSFAを使うことで、複数の同じ状態の組の遷移を効率的に表現できます。 現実の有限状態オートマトンでも同じ状態の組の遷移はよくあることで1、有限状態オートマトンを応用する上で重要だと考えられるため、研究が進められています。
実効的Bool代数
SFAの定義に入る前に、遷移のラベルに用いられる条件式として利用する実効的Bool代数 (Effective Boolean Algebra)について確認します。
定義 (実効的Bool代数): 実効的Bool代数 (Effective Boolean Algebra) はのことで、それぞれ次のような値です。
- は値の定義域です。
- は上のBool結合に閉じた述語からなる集合で、です。
- はの表示関数 (denotation function) で、次のようなものです。
- 、
- 、
- すべてのについて、、、となる。
実効的 (effective) というのは表示関数が効率的に計算できるということを意味しているのかと思います。
実効的Bool代数の例を2つ挙げます。
例 (等価性代数): 任意の集合に対して等価性代数 (equality algebra) とは、 () 形式のBool結合からなる集合を述語とする実効的Bool代数です。 等価性代数ではやのような述語が表現できます。
例 (区間代数): 整数の閉区間の有限の和集合全体からなる集合も、集合演算をBool代数の演算として実効的Bool代数を構成し、これを区間代数 (interval algebra) と呼びます。 否定について閉じているためにやのような端がやになっている区間も含みます。 区間代数ではや (正の整数) のような述語が表現できます。
SFAの定義
ここからはSFAを定義していきます。
定義 (SFA): Symbolic Finite Automaton (SFA) はで、それぞれ次のような値です。
- は実効的Bool代数です。
- は状態の有限集合で、は初期状態、は受理状態の集合です。
- は遷移関数です。
文字とはの要素で、その有限列のことを文字列と呼び、文字列全体の集合をで、空文字列はで表します。 遷移について、を遷移元、を遷移先、を遷移条件と呼びます。 遷移と文字について、のときからでにより遷移可能と言い、と表します。 この関係は文字列に対して推移的に拡張され、と表します。 文字列について、が受理されるとはという初期状態からの遷移が存在しであることを意味します。 SFA の言語とはの受理する文字列全体からなる集合で、と表します。
SFA が決定的 (deterministic) であるとは、遷移元の同じ2つの遷移でのときであることを意味します。 SFA が完全 (complete) であるとは、すべての状態でであることを意味します。 この記事ではこれ以降、とくに言及がなければSFAは決定的かつ完全なものとします。 (実際、任意のSFAは決定的かつ完全なものに変換可能なことが知られています [D'Antoni & Veanes, 2017]。)
次の区間代数上のSFAは決定的かつ完全で、偶数個の負の数を含む文字列を受理します。
[Drews & D'Antoni, 2017]
は2017年にDrewsとD'Antoniによって提案されたSFAに対するオートマトン学習のアルゴリズムです。
はAngluinのをシンボリックに拡張したアルゴリズムで、observation tableは完全でなくても仮説のDFAを構築できることに注目して、まず、部分的 (partial) な仮説のDFAを構築します。 さらに、その遷移関数に現れる文字から完全なSFAになるような述語を生成するpartitioning関数を利用して、部分的なDFAを完全なSFAにすることでSFAを学習します。
observation table
はじめにで用いるobservation tableを導入します。
定義 (observation table): SFA に対するobservation tableとはのことで、それぞれ次のような値です。
- は接頭辞 (prefix) の集合で、は境界 (boundary) の集合です。
- は接尾辞 (suffix) の集合です。
- は観測結果の記録で、文字列についてなら、ならとなるように更新されます。
さらに、observation tableは次の規則を満たします。
- とは互いに素。
- はprefix-closedで、である。
- すべてのでとなるが存在する2。
- である。
observation table とについて、観測表の行をとして定義します。
observation table は、
- すべてのについてとなるが存在するとき、closedといいます。
- すべてのについてならのとき、reducedといいます。
- すべてのについて、もしがでなら、であるとき、consistentといいます。
- すべてのとについてのとき、evidence-closedという。
- closed、reduced、consistent、evidence-closedであるとき、cohensiveといいます。
observtation tableがconhebsiveなとき、次のevidence automaton を構築できます。
- 状態の集合はです。
- 初期状態はで、受理状態の集合はです。
- とについてであれば、遷移が存在します。 ここではとなるとします。 (closedかつreducedなのでそのような接頭辞がただ1つ存在します。)
このようにして構築されたは決定性有限状態オートマトンなのですが、完全 (complete) とは限らないことに注意してください。 ただし、observation tableのに対して、次のような性質を満たします。
補題 (evidence compatibiltiy): cohensiveなobservation table とそれから構築されたevidence automaton およびについて、ならはを受理し、ならはを受理しません。
partitioning関数
次にevidence automatonを完全なSFAへと変換する鍵となるpartitioning関数を定義します。
定義 (partitioning関数): 実効的Bool代数に対するpartitioning関数 (partitioning function) とは関数で、入力のリストは互いに素なの部分集合の列であり、出力のリストは述語の列で、次のような規則を満たします。
- 、
- すべてのでのとき、
- 各と対応するで。
各実効的Bool代数に対して、次のようなpartitioning関数が考えられます。
例 (等価代数のpartitioning関数): 等価代数のpartitioning関数は次のようなものが考えられます。
- でが最大となるを求めます。
- と、各についてとします。 ここでについてです。
例えばであればのようになります。
例 (区間代数のpartitioning関数): 区間代数のpartitioning関数は次のようなものが考えられます。
- として、を第1要素で昇順でソートする。
- 各についてで初期化し、次の処理を行う。
- 各についてと更新する。ここでととする (はの番目の要素で、は番目の要素)。
- についてと更新する (はの1番目の要素)。
- についてと更新する (はの最後の要素)。
partitioning関数があれば、evidence automatonから完全なSFAを得るのはそれほど難しくありません。 evidence automaton と状態について、状態の集合に順序を付けてとしたとき、としてとが考えられます。 そこで、SFAでの遷移をとすると、仮説のSFA が得られます。
この仮説のSFA に対しても、と同様に次のような性質が成り立ちます。
補題 (SFAのevidence compatibiltiy): cohensiveなobservation table とそれから構築された仮説のSFAおよびについて、ならで、ならとなります。
アルゴリズム
ここからはアルゴリズムの詳細を説明します。
まず学習の設定ですがAngluinのと同じように、目的の言語に対して次のMQとEQが与えられるものとします。
- : membership query。なら、そうでないならを返す。
- : equivalence query。ならを、そうでないなら反例となる文字列を返す。
アルゴリズムではまず、Angluinのでobservation tableがconsistentかつclosedとなることを目指したように、でもobservation tableがcohensiveとなることを目指します。 そのために、次の手続きを繰り返し適用します。
- Close: となるが存在しないを探索し、見つかったらそのをからに移動し、適当なでをに追加します。
- EvidenceClose: であるとの組を探索し、見つかったらやその接頭辞をに追加します。
- MakeConsistent: でだがであるとを探索し、見つかったら、であるでをに追加します。
- Distribute: MakeConsistentでをに追加する前に、だがである各で、をに追加します。
このうちDistributeはobservation tableに既にある情報を上手く利用することで実際の計算を効率的に行うための工夫で、学習の正しさには直接関係ないのではないかと思います3。
そして、cohensiveなobservation tableから仮説のSFA を構築してEQに渡します。 反例の文字列が返ってきた場合は、に含まれない反例の接頭辞をすべてに追加します。 EQが反例の文字列を返す理由は2つありますが、次のようにどちらの場合もこの方法で正しく処理されることが分かります。
- 正しい状態が存在しなかった場合: MakeConsistentにより新しい接尾辞がに追加されCloseで新しい状態が見つかる。
- 遷移の述語が間違っていた場合: に反例の接頭辞が含まれているはずなので、次のの計算の際に新しい文字がのいずれかの要素に追加される。
アルゴリズムの全体は次のようになります。 各手続きの疑似コードは (面倒なので) 省略します。 書いてある通りにするだけです。
Algorithm 1 algorithm
function Λ*(MQ, EQ, )
, and
MQ() and MQ()
repeat
if is not closed then
Close()
end if
if is not evidence-closed then
EvidenceClose()
end if
if is not consistent then
MakeConsistent() and Distribute()
end if
Let be the evidence automaton constructed from
Let be the hypothesis SFA constructed from and
if EQ() is a counterexample then
end if
until EQ() = 1
return
end function
Scalaによる実装も示します。 基本的には説明した通りです。
Scalaによる実装
Gist: https://gist.github.com/makenowjust/bfca1fc504e4780a06f4fb3ab2d710ca
// This is an implementation of the Λ* algorithm in Scala 3.
//
// The Λ* algorithm is a learning algorithm for symbolic automata, proposed by
// Samuel Drews and Loris D'Antoni (2017), "Learning Symbolic Automata"
// https://doi.org/10.1007/978-3-662-54577-5_10.
import scala.collection.mutable
/** `BoolAlg` represents an effective Boolean algebra on the domain `D`.
*
* `P` is a type of predicates on the domain `D`.
*/
trait BoolAlg[D, P]:
/** Returns the predicate that is always true. */
def `true`: P
/** Returns the predicate that is always false. */
def `false`: P
/** Returns the predicate: p ∧ q. */
def and(p: P, q: P): P
/** Returns the predicate: p ∨ q. */
def or(p: P, q: P): P
/** Returns the predicate: ¬p. */
def not(p: P): P
/** Checks if the denotation of `p` contains `d`. */
def contains(p: P, d: D): Boolean
/** Computes the partitioning function to `ds`.
*
* This returns the sequence of separating predicates of `ds`.
*/
def partition(ds: Seq[Set[D]]): Seq[P]
/** `Pred` is a concrete representation of predicates on atomic proposition `A`.
*/
enum Pred[+A]:
case True, False
case Atom(a: A)
case And(p: Pred[A], q: Pred[A])
case Or(p: Pred[A], q: Pred[A])
case Not(p: Pred[A])
infix def and[AA >: A](that: Pred[AA]): Pred[AA] = (this, that) match
case (True, q) => q
case (p, True) => p
case (False, _) | (_, False) => False
case (p, q) => And(p, q)
infix def or[AA >: A](that: Pred[AA]): Pred[AA] = (this, that) match
case (True, _) | (_, True) => True
case (p, False) => p
case (False, q) => q
case (p, q) => Or(p, q)
def not: Pred[A] = this match
case True => False
case False => True
case Not(p) => p
case p => Not(p)
/** Checks if the denotation of `p` contains `d`.
*
* `atom` is a function that checks if the atomic proposition contains a data value.
*/
def contains[D](d: D)(atom: (A, D) => Boolean): Boolean = this match
case True => true
case False => false
case Atom(a) => atom(a, d)
case And(p, q) => p.contains(d)(atom) && q.contains(d)(atom)
case Or(p, q) => p.contains(d)(atom) || q.contains(d)(atom)
case Not(p) => !p.contains(d)(atom)
object Pred:
/** `EqualityAlgebra` is an instance of `BoolAlg` for the equality. */
given EqualityAlgebra[A]: BoolAlg[A, Pred[A]] with
def `true`: Pred[A] = Pred.True
def `false`: Pred[A] = Pred.False
def and(p: Pred[A], q: Pred[A]): Pred[A] = Pred.And(p, q)
def or(p: Pred[A], q: Pred[A]): Pred[A] = Pred.Or(p, q)
def not(p: Pred[A]): Pred[A] = Pred.Not(p)
def contains(p: Pred[A], d: A): Boolean = p.contains(d)(_ == _)
def partition(dss: Seq[Set[A]]): Seq[Pred[A]] =
val maxIndex = dss.zipWithIndex.maxBy(_._1.size)._2
val largePred = dss.iterator.zipWithIndex
.filter(_._2 != maxIndex)
.map(_._1)
.flatten
.map(Pred.Atom(_))
.foldLeft(Pred.False)(_ or _)
dss.zipWithIndex.map:
case (ds, i) if i == maxIndex => largePred.not
case (ds, i) =>
ds.iterator.map(Pred.Atom(_)).foldLeft(Pred.False)(_ or _)
/** `Dfa` represents a deterministic finite automaton.
*
* This is used for representing evidence automata (Def. 3).
*/
final case class Dfa[S, A](
initialState: S,
acceptStateSet: Set[S],
transitionFunction: Map[S, Map[A, S]]
):
/** Converts this evidence automaton to an SFA using the partitioning function.
*/
def toSfa[P](using P: BoolAlg[A, P]): Sfa[S, P] =
val transitionFunction = this.transitionFunction.map:
case (state, edgeMap) =>
val nextStateToChars =
edgeMap.toSeq.groupBy(_._2).view.mapValues(_.map(_._1)).toSeq
val preds = P.partition(nextStateToChars.map(_._2.toSet))
val newEdgeMap = nextStateToChars
.zip(preds)
.map:
case ((nextState, _), pred) => (pred, nextState)
.toMap
state -> newEdgeMap
Sfa(initialState, acceptStateSet, transitionFunction.toMap)
/** `Sfa` represents a symbolic finite automaton (Def. 1).
*
* In this implementation, SFAs are assumed to be deterministic and finite.
*/
final case class Sfa[S, P](
initialState: S,
acceptStateSet: Set[S],
transitionFunction: Map[S, Map[P, S]]
):
def transition[A](state: S, char: A)(using P: BoolAlg[A, P]): Option[S] =
val edgeMap = transitionFunction(state)
edgeMap.find((p, _) => P.contains(p, char)).map(_._2)
/** `Alphabet` represents an alphabet of characters. */
trait Alphabet[A]:
/** Returns the arbitrary character. */
def arbChar: A
object Alphabet:
/** Creates an instance of `Alphabet` with the given character. */
def apply[A](char: A): Alphabet[A] = new Alphabet[A]:
def arbChar = char
/** `Oracle` represents an oracle that provides membership and equivalence queries. */
trait Oracle[A]:
/** Checks if the given word is in the target language. */
def membershipQuery(word: Seq[A]): Boolean
/** Checks if the given SFA is equivalent to the target language.
*
* This returns a counterexample if the given SFA is not equivalent to the target language.
*/
def equivalenceQuery[P](sfa: Sfa[?, P])(using BoolAlg[A, P]): Option[Seq[A]]
object Oracle:
/** Creates an instance of `Oracle` with the given function. */
def fromFunction[A](
finiteAlphabet: Set[A],
minWordLength: Int = 10,
maxWordLength: Int = 100,
numWords: Int = 100,
randomSeed: Long = 0L
)(f: Seq[A] => Boolean): Oracle[A] =
val alphabetIndexedSeq = finiteAlphabet.toIndexedSeq
new Oracle[A]:
def membershipQuery(word: Seq[A]): Boolean = f(word)
def equivalenceQuery[P](sfa: Sfa[?, P])(using BoolAlg[A, P]): Option[Seq[A]] =
val rand = util.Random(randomSeed)
util.boundary:
for i <- 0 until numWords do
val size = rand.between(minWordLength, maxWordLength + 1)
var word = Seq.empty[A]
var state = sfa.initialState
for j <- 0 until size do
val char = alphabetIndexedSeq(rand.nextInt(alphabetIndexedSeq.size))
word :+= char
state = sfa.transition(state, char).get
if sfa.acceptStateSet.contains(state) != f(word) then
util.boundary.break(Some(word))
None
/** `Prefix` is a prefix of a word. */
type Prefix[A] = Seq[A]
/** `Suffix` is a suffix of a word. */
type Suffix[A] = Seq[A]
/** `Sig` is a signature of a prefix.
*
* A signature is a sequence of booleans; for a prefix `s`, `sig(i)` is true
* if `s ++ suffices(i)` is in the target language, otherwise false.
*/
type Sig = Seq[Boolean]
/** `ObservationTable` represents an observation table (Def. 2).
*
* In this implementation, an observation table `(S, R, E, f)` is represented as
* four values `prefixSet, boundarySet, suffices` and `rowMap`, where:
*
* - `prefixSet` is corresponding to `S`,
* - `boundarySet` is corresponding to `R`,
* - `suffices` is corresponding to `E`, and
* - `rowMap` is corresponding to `f`.
*/
final case class ObservationTable[A](
prefixSet: Set[Prefix[A]],
boundarySet: Set[Prefix[A]],
suffices: Seq[Suffix[A]],
rowMap: Map[Prefix[A], Sig]
):
/** Returns a row of the given prefix.
*
* A row value is the signature of the prefix, and it returns the pre-computed value.
*/
private def row(prefix: Prefix[A]): Sig = rowMap(prefix)
/** Computes the signature of the given prefix. */
private def sig(prefix: Prefix[A])(using O: Oracle[A]): Sig =
suffices.map(suffix => O.membershipQuery(prefix ++ suffix))
/** Returns the set of prefixes and boundaries. */
private def prefixAndBoundarySet: Set[Prefix[A]] =
prefixSet ++ boundarySet
/** Returns the set of extensions of the given prefix.
*
* An extension is a word that is obtained by appending a character to the prefix.
*/
private def extensionSet(prefix: Prefix[A]): Set[Prefix[A]] =
prefixSet.filter(b => prefix.size + 1 == b.size && b.startsWith(prefix)) ++
boundarySet.filter(b => prefix.size + 1 == b.size && b.startsWith(prefix))
/** Finds and returns an unclosed boundary if it exists. */
def findUnclosedBoundary(): Option[Prefix[A]] =
val candidates = boundarySet.filter: boundary =>
!prefixSet.exists(row(_) == row(boundary))
if candidates.nonEmpty then Some(candidates.minBy(_.size))
else None
/** Does the "close" operation (§3.3) with the given prefix. */
def close(prefix: Prefix[A])(using A: Alphabet[A], O: Oracle[A]): ObservationTable[A] =
val newPrefixSet = prefixSet + prefix
val newBoundary = prefix :+ A.arbChar
val newBoundarySet = boundarySet - prefix + newBoundary
val newRowMap =
if rowMap.contains(newBoundary) then rowMap
else rowMap + (newBoundary -> sig(newBoundary))
ObservationTable(newPrefixSet, newBoundarySet, suffices, newRowMap)
/** Finds and return a sequence of evidence-unclosed words if it exists. */
def findEvidenceUnclosedWord(): Seq[Seq[A]] =
val prefixAndBoundarySet = this.prefixAndBoundarySet
prefixSet.iterator
.flatMap: prefix =>
suffices.iterator
.map(prefix ++ _)
.filterNot(prefixAndBoundarySet)
.toSeq
/** Does the "evidence-close" operation (§3.3) with the given word. */
def evidenceClose(word: Seq[A])(using O: Oracle[A]): ObservationTable[A] =
if prefixSet.contains(word) then this
else
var newBoundarySet = boundarySet
var newRowMap = rowMap
for newBoundary <- word.inits; if !boundarySet.contains(newBoundary) do
newBoundarySet += newBoundary
newRowMap += newBoundary -> sig(newBoundary)
ObservationTable(prefixSet, newBoundarySet, suffices, newRowMap)
/** Finds and returns an inconsistent suffix if it exists. */
def findInconsistentSuffix(): Option[Suffix[A]] =
val prefixAndBoundarySet = this.prefixAndBoundarySet
val triple = (
for
prefix1 <- prefixAndBoundarySet.iterator
ext1 <- extensionSet(prefix1).iterator
char = ext1.last
prefix2 <- prefixAndBoundarySet.iterator
ext2 = prefix2 :+ char
if prefixAndBoundarySet.contains(ext2)
if row(prefix1) == row(prefix2)
if row(ext1) != row(ext2)
yield (prefix1, prefix2, char)
).nextOption()
triple.map: (prefix1, prefix2, char) =>
val ext1 = prefix1 :+ char
val ext2 = prefix2 :+ char
val index = row(ext1)
.zip(row(ext2))
.zipWithIndex
.find:
case ((b1, b2), i) => b1 != b2
.map(_._2)
.get
char +: suffices(index)
/** Does the "make-consistent" and "distribute" operations (§3.3) with the given suffix. */
def makeConsistentAndDistribute(newSuffix: Suffix[A])(using O: Oracle[A]): ObservationTable[A] =
import O.{membershipQuery => MQ}
val prefixAndBoundarySet = this.prefixAndBoundarySet
var boundarySetToAdd = Set.empty[Prefix[A]]
// First, the learner do "distribute" operation.
for
prefix1 <- prefixAndBoundarySet
prefix2 <- prefixAndBoundarySet
if row(prefix1) == row(prefix2)
if MQ(prefix1 ++ newSuffix) != MQ(prefix2 ++ newSuffix)
do
for
char <- extensionSet(prefix1).map(_.last)
if !prefixSet.contains(prefix2 :+ char)
do boundarySetToAdd += prefix2 :+ char
for
char <- extensionSet(prefix2).map(_.last)
if !prefixSet.contains(prefix1 :+ char)
do boundarySetToAdd += prefix1 :+ char
var newRowMap = rowMap
val newBoundarySet = boundarySet ++ boundarySetToAdd
for newBoundary <- boundarySetToAdd; if !rowMap.contains(newBoundary) do
newRowMap += newBoundary -> sig(newBoundary)
// Next, the learner do "make-consistent" operation.
val newSuffices = suffices :+ newSuffix
for prefix <- newRowMap.keys do newRowMap += prefix -> (newRowMap(prefix) :+ MQ(prefix ++ newSuffix))
ObservationTable(prefixSet, newBoundarySet, newSuffices, newRowMap)
/** Processes the given counterexample. */
def process(cex: Seq[A])(using O: Oracle[A]): ObservationTable[A] =
val prefixAndBoundarySet = this.prefixAndBoundarySet
var newBoundarySet = boundarySet
var newRowMap = rowMap
for boundary <- cex.inits; if !prefixAndBoundarySet.contains(boundary) do
newBoundarySet += boundary
newRowMap += boundary -> sig(boundary)
ObservationTable(prefixSet, newBoundarySet, suffices, newRowMap)
/** Builds an evidence automaton from the observation table (§3.1). */
def buildEvidence(): Dfa[Prefix[A], A] =
val prefixAndBoundarySet = this.prefixAndBoundarySet
val rowToPrefix = rowMap.view.filterKeys(prefixSet).map(_.swap).toMap
val transitionFunction = mutable.Map.empty[Prefix[A], Map[A, Prefix[A]]]
for
prefix0 <- prefixAndBoundarySet
ext0 <- extensionSet(prefix0)
do
val char = ext0.last
val prefix = rowToPrefix(row(prefix0))
val boundary = rowToPrefix(row(ext0))
if !transitionFunction.contains(prefix) then
transitionFunction(prefix) = Map.empty
transitionFunction(prefix) += char -> boundary
val initialState = rowToPrefix(rowMap(Seq.empty))
val acceptSet = prefixSet.map(row).filter(_.head).map(rowToPrefix)
Dfa(initialState, acceptSet, transitionFunction.toMap)
object ObservationTable:
/** Creates an empty observation table. */
def empty[A](using A: Alphabet[A], O: Oracle[A]): ObservationTable[A] =
import O.{membershipQuery => MQ}
val prefixSet = Set(Seq.empty[A])
val boundarySet = Set(Seq(A.arbChar))
val suffices = Seq(Seq.empty)
val rowMap = Map(
Seq.empty -> Seq(MQ(Seq.empty)),
Seq(A.arbChar) -> Seq(MQ(Seq(A.arbChar)))
)
ObservationTable(prefixSet, boundarySet, suffices, rowMap)
/** `Learner` provides an implementation of the Λ* algorithm. */
object Learner:
/** Infers an SFA from the given alphabet and oracle using the Λ* algorithm. */
def learn[A, P](
alphabet: Alphabet[A],
oracle: Oracle[A],
)(using P: BoolAlg[A, P]): Sfa[Prefix[A], P] =
import oracle.{equivalenceQuery => EQ}
given Alphabet[A] = alphabet
given Oracle[A] = oracle
var obs = ObservationTable.empty
var result = Option.empty[Sfa[Prefix[A], P]]
while result.isEmpty do
util.boundary:
println(s"obs = $obs")
for boundary <- obs.findUnclosedBoundary() do
println(s"close($boundary)")
obs = obs.close(boundary)
util.boundary.break()
val words = obs.findEvidenceUnclosedWord()
if words.nonEmpty then
for word <- words do
println(s"evidenceClose($word)")
obs = obs.evidenceClose(word)
util.boundary.break()
for suffix <- obs.findInconsistentSuffix() do
println(s"makeConsistentAndDistribute($suffix)")
obs = obs.makeConsistentAndDistribute(suffix)
util.boundary.break()
val dfa = obs.buildEvidence()
val sfa = dfa.toSfa
println(s"EQ($sfa)")
EQ(sfa) match
case Some(cex) =>
println(s"process($cex)")
obs = obs.process(cex)
case None =>
result = Some(sfa)
result.get
val alphabet = Alphabet(0)
val oracle = Oracle.fromFunction(Set(0, 1, 2, 3)): word =>
word.count(_ == 0) % 3 == 0 && word.count(_ == 1) % 2 == 0
val sfa = Learner.learn(alphabet, oracle)(using Pred.EqualityAlgebra[Int])
println(sfa)
直感的にはは、でアルファベットのすべての文字に対して遷移を求めていたものを、最初の1文字以外は反例やこれまでの観察に基づいて遷移を求めるようにしたものだと考えられます。 そのため、と同様に次のことが言えます。
定理 (の最小性): が停止したなら、最小の状態数のSFAが求まる。
の計算量
の計算量は、partitioning関数やEQの返す反例の質に依存しています。 反例の質とは何でしょうか?
例えば、図3のSFA (負の数が偶数個含む文字列を受理する) を学習することを考えてみてください。 の数という述語が上手く学習できず、のような間違った区間で学習していたとします。
ここで、返ってきた反例がであれば、正しく述語を修正できます。 しかし反例がであれば、述語をとしか修正できず、学習があまり進みません。 さらに、このあとに返ってくる反例が、、…と2番目の値が減っていくのであれば、正しく学習するために1000回以上のEQを呼び出さなければいけません。
このように、では反例の質やpartitioning関数の特徴についても考えなければ計算量について議論できません。 ですが、そのままでは議論が難しいので、まず -learnability について考えます。 ただし、ここでは簡潔な説明のみにします。 詳細な説明は論文[Drews & D'Antoni, 2017]を参照してください。
適当な実効的Bool代数とその上のpartitioning関数を考えます。 をを分割する述語のリストの集合 (つまりの返り値の集合) とします。 をへの入力とその返り値、目的の分割の3つを受け取って、より詳細なへの入力を返す関数 (ジェネレータ) の集合とします。
ジェネレータは目的の分割とpartitioning関数に対して、とおいてと繰り返し呼び出すことで、最終的にとなるようなが求められる関数と考えられます。
つまり、ジェネレータはpartitioning関数の特徴や反例の質により、述語の学習がどのように進むかを抽象化しています。 例えば図4の場合、でのへの入力はのようになっていて、ここで反例としてが返ってきたことはが、反例としてが返ってきたことはが相当します。
それでは-learnabilityを定義します。
定義 (-learnability): 実効的Bool代数とpartitioning関数、ジェネレータが与えられたとき次のような暗黙の関数が存在する場合、組を -learnable と呼びます。
- 分割について、大きさ4が高々のリストをに入力することで目的の分割が得られる。
を呼び出す毎に入力のリストは大きくなっていくので、の定義は「高々回を呼び出すことで目的の分割が得られれる」とも言い換えられます。
論文ではこのあと-learnableな実効的Bool代数のクラスを分類したり、2つの-learnableな実行的Bool代数とpartitioning関数から、その直積や直和に対応するpartitioning関数やが計算できる (つまり直積や直和も-learnableである) ことを示しています。 これらも興味深い話題です。 詳細は論文は参照してください。
論文ではを使って、次のような定理を述べています。 ただし赤文字の下線はこの記事の筆者によるもので、後で説明するように、筆者はこの定理に疑問的です。
定理 (SFAの学習可能性): 実効的Bool代数上のSFA が状態で表せる場合、学習するために必要なequivalence queryの回数はで上から抑えられる (?)。 ここでは状態の遷移先の状態に対する述語の分割で、はの遷移先に対する反例の質を反映した関数です。
の疑問点
疑問1: Distributeは必要なのか
上で説明した通り、手続きDistributeはアルゴリズムの正しさを保証するために必要なものではないと考えています。 実装で実験した限りでもDistributeの有無で学習が停止しなくなる、といったことはありませんでした。
アルゴリズムでは、Closeでobservation tableがclosedになり、EvidenceCloseでevidence-closedになり、MakeConsistentでconsistentになります。 reducedかどうかはobservation tableの更新の仕方で保証されているので、これらの手続きのみでアルゴリズムは正しく動作するはずです。
計算量的にも全ての状態の組をループする必要があり、そこまで効率的なわけではないので、この処理を省いた方が効率的なこともあるかもしれません。 ただ、既知の観察を上手く使えなくなってしまうので、equivalence queryの回数などに微妙な影響があるのではないかと思います。
疑問2: equivalence queryの上界
定理 (SFAの学習可能性) ではで抑えれらると述べています。 上界なので間違ってはいないと思うのですが、過剰に多く見積っているように思えます。 とくにというのがどこから出てきたのかよく分かりません。
ではequivalence queryは、次の2つの目的で利用されます。
- 新しい状態を発見する。
- 遷移の述語をより詳細にする。
このうち1の目的で利用されるのは高々回です。 そして2の目的で利用されるのが高々回のはずです。 よってequivalence queryの回数は高々回というのが妥当な気がします。
さらにobservation tableで各状態は1文字は遷移が存在する (observation tableの定義の3の条件) ので、に渡されるリストも空ではなく1文字は存在する状態から学習が始まります。 よって、実際には2の目的で利用されるのは高々となり、equivalence queryの回数は高々回と考えられそうです。
それではというのはどこから出てきたのでしょうか? 仮説の1つとして、membership queryの回数と間違えたのではないかと個人的に予想しています。
個の状態を区別するためには、接尾辞の集合も程度の大きさになる可能性があります。 よって、DFAの時点で、状態のDFAを学習するために回のmembership queryが必要となります。 SFAの場合はそれに加えて述語を学習するために個の接頭辞がに追加されているため、回のmembership queryが必要となりそうです。
とはいえ、これは反例の文字列の長さについての考慮などが抜けていて、あまり良い見積りではないように感じます。
[Argyros & D'Antoni, 2018]
5は2018年にArgyrosとD'Antoniによって提案されたSFAに対するオートマトン学習のアルゴリズムです。
ではKearns-Vazirani (KV) のアルゴリズムで利用されるclassification treeを使います。 classification treeを構築したあとに、各遷移の述語がSFA全体で完全・決定的となるように学習していきます。 classification treeはobservation treeと異なり遷移先の状態が必ず求められるので、どのような文字である状態からある状態に遷移するかは分からなくても、ある文字である状態からある状態に遷移するかは分かります。 つまり、では各遷移に対して、
- ある文字である状態からある状態に遷移するか→membership query
- その述語がSFA全体で完全・決定的か→equivalence query
という通常のオートマトン学習でのMAT (Minimally Adequate Teacher) に相当する情報を使って、述語を学習していきます。 これがと呼ばれる所以です。
classification tree
まずclassification treeの定義を確認します。
定義 (classification tree): アルファベット上のclassification tree は2分木で、それぞれ
- はノードの集合で、
- は葉ノードの集合、
- は木の辺です。
classification treeにはどのノードからも遷移が存在しない根ノードがただ1つ存在します。 木の辺はの下にある葉ノードについて、学習対象の言語のmembership query MQについて、となることを意図しています。
classification tree に対して、木を辿って対応する葉ノードを取得する関数を定義します。
ここで、、、です。 さらに、とします。
classification tree の葉ノードを新しいノードと葉ノードで置き換えた木をとします (図5)。
空文字列の根ノードのみからなるclassification treeをとします6。 このからはじめてで更新していくと、常に葉ノードについてとなります。
アルファベットが有限の大きさであれば、classification tree に対して、状態の集合をで遷移関数をとしてDFAが得られます。
一方で、SFAではアルファベットは有限の大きさとは限らず、これだけでは構築できません。 ただし、実質的な遷移関数であるが既に手に入っているので、ここから遷移の述語が構築できればSFAが得られることになります。
SFAの構築
実効的Bool代数とclassification tree 、2つの葉ノードについて、をからの遷移の述語のlearnerとします。 このlearnerには次のような手続きが与えられています。
- : 学習中のからの遷移の述語を返す。
- : learner を反例となる文字で正しくなるように更新する (つまり更新後にとなるようにする)。
ここでについては次のように定義されます。
例えば、等価代数に対するlearnerは次のように与えられます。
例 (等価代数のlearner): 等価代数のlearner は2つ組です。 はからに遷移できる反例の文字の部分集合で、はからに遷移しない反例の文字の部分集合です。 learnerはで初期化されて、各手続きは次のようになります。
- : ならを、ならを返します。
- : ならをに、そうでないならをに更新します。
各葉ノードの組に対してlearner を初期化して、各葉ノードについて次の処理を繰り返します。
- 完全化: とします。 ここでであれば、構築されたSFAでからの遷移はすべての文字について存在しますが、そうでなければ何かしら抜けている文字があることになります。 そのような文字がある場合、としてを呼び出し、を更新します。 この処理をが無くなるまで繰り返します。
- 決定化: 異なる葉ノードについて、とします。 ここでなら構築されたSFAでからの遷移は決定的ですが、そうでなければある文字の遷移先が少なくともとの2つ存在することになります。 がある場合、についてではない場合、を呼び出し、を更新します。 この処理をが無くなるまで繰り返します。
この処理を行なったあとにとして遷移関数を作ると、SFAは完全かつ決定的になります。 このSFAをの仮説のSFA と呼ぶことにします。
反例の処理
仮説のSFA が得られたら、それをequivalence query に渡して目的の言語になっているか確認します。 ここで、反例の文字列が返ってきた場合にどうすればいいでしょうか?
ではまず[Isberner & Steffen, 2014]と同様に二分探索を利用して、反例の原因となる遷移を行なったインデックスを特定します。 反例の原因となる遷移を行なったインデックスは、をの先頭から番目までの部分文字列、を番目の文字、を番目から末尾までの部分文字列として、としたとき、となるインデックスのこととします。
Keanrs-Vaziraniであれば、このようなインデックスが見つかったら、としてclassification treeを更新することで学習を進められました。 しかしSFAの学習の場合、これは新しい状態を発見することに相当するので、述語を詳細にするためには別の処理をしなければいけません。
まず、としてを確認します。 そして、次の処理を行います。
- なら、遷移は正しく行なわれていたので、新しい状態を追加するためにでclassification treeを更新し、この影響を受けるlearnerを初期化し直す。
- なら、遷移の述語に不十分な点があるので、とをUpdateで更新する。
こうすることで正しく学習を進められます。
アルゴリズム
最後に、ここまで説明してきたを疑似コードで示します。 引数のMQはmembership query、EQはequivalence queryで、は実効的Bool代数のlearnerの初期化や操作を与えるものです。
Algorithm 2 algorithm
function MAT*(MQ, EQ, )
InitializeClassificationTree()
InitializeGuardLearners()
GetSFAModel()
while EQ() do
EQ()
ProcessCounterexample()
GetSFAModel()
end while
return
end function
アルゴリズム中に現れる手続きはそれぞれ、次のようなものです。 これらは、ここまでで解説してきた内容と対応しています。
- : 初期化されたclassification tree を返す。
- : の各遷移に対してlearnerを初期化して返す。
- : の各遷移のlearnerを更新して、完全かつ決定的な仮説のSFAを返す。
- : 反例の文字列を処理して、classification treeと各遷移のlearnerを更新する。
このアルゴリズムをScalaで実装したものも示します。
Scalaによる実装
Gist: https://gist.github.com/makenowjust/53ed1b8e066952df9d4578d18d20097e
// This is an implementation of the MAT* algorithm in Scala 3.
//
// The MAT* algorithm is a learning algorithm for symbolic finite automata, proposed by
// George Argyros and Loris D'Antoni, "The Learnability of Symbolic Automata"
// https://doi.org/10.1007/978-3-319-96145-3_23.
import scala.annotation.tailrec
import scala.util.hashing.MurmurHash3
/** `BoolAlg` represents an effective Boolean algebra on the domain `D`.
*
* `P` is a type of predicates on the domain `D`.
*/
trait BoolAlg[D, P]:
/** Returns the predicate that is always true. */
def `true`: P
/** Returns the predicate that is always false. */
def `false`: P
/** Returns the predicate: `p ∧ q`. */
def and(p: P, q: P): P
/** Returns the predicate: `p ∨ q`. */
def or(p: P, q: P): P
/** Returns the predicate: `¬p`. */
def not(p: P): P
/** Checks if the denotation of `p` contains `d`. */
def contains(p: P, d: D): Boolean
/** Returns a witness data of the predicate `p` if it exists. */
def witness(p: P): Option[D]
/** `Interval` is a closed interval `[left, right]` of integers. */
final class Interval(val left: Int, val right: Int):
/** Checks if the interval contains the given integer `n`. */
def contains(n: Int): Boolean = left <= n && n <= right
/** Checks if the interval overlaps with the given interval `that`. */
def overlaps(that: Interval): Boolean =
val l = math.max(left, that.left)
val r = math.min(right, that.right)
l <= r
/** Checks if the interval is contiguous with the given interval `that`. */
def contiguous(that: Interval): Boolean =
val l = math.max(left, that.left)
val r = math.min(right, that.right)
l == r + 1
/** Computes the union of the interval with the given interval `that` if it can be represented as a single interval.
*/
def unionOf(that: Interval): Option[Interval] =
Option.when(overlaps(that) || contiguous(that)):
val l = math.min(left, that.left)
val r = math.max(right, that.right)
Interval(l, r)
override def equals(that: Any): Boolean = that match
case that: Interval => left == that.left && right == that.right
case _ => false
override def hashCode(): Int =
var hash = "Interval".##
hash = MurmurHash3.mix(hash, left.##)
hash = MurmurHash3.mix(hash, right.##)
MurmurHash3.finalizeHash(hash, 2)
override def toString(): String = s"Interval($left, $right)"
object Interval:
/** Returns a new interval `[left, right]`. If `left > right`, an exception is thrown. */
def apply(left: Int, right: Int): Interval =
require(left <= right, s"Invalid interval: $left > $right")
new Interval(left, right)
/** `IntervalSet` is a set of intervals. */
final class IntervalSet(val intervals: IndexedSeq[Interval]):
/** Check if the interval set is empty. */
def isEmpty: Boolean = intervals.isEmpty
/** Checks if the interval set contains the given integer `n`. */
def contains(n: Int): Boolean =
val index = intervals.search(Interval(n, n))(Ordering.by(_.right)).insertionPoint
index < intervals.length && intervals(index).contains(n)
/** Computes the union of the interval set with the given interval set `that`. */
infix def union(that: IntervalSet): IntervalSet =
IntervalSet.from(intervals ++ that.intervals)
/** Computes the intersection of the interval set with the given interval set `that`. */
infix def intersect(that: IntervalSet): IntervalSet =
(complement union that.complement).complement
/** Computes the complement of the interval set. */
def complement: IntervalSet =
if intervals.isEmpty then IntervalSet.universal
else
val complemented = IndexedSeq.newBuilder[Interval]
if intervals.head.left != Int.MinValue then complemented += Interval(Int.MinValue, intervals.head.left - 1)
for i <- 0 until intervals.length - 1 do
val left = intervals(i).right
val right = intervals(i + 1).left
complemented += Interval(left + 1, right - 1)
if intervals.last.right != Int.MaxValue then complemented += Interval(intervals.last.right + 1, Int.MaxValue)
new IntervalSet(complemented.result())
override def equals(that: Any): Boolean = that match
case that: IntervalSet => intervals == that.intervals
case _ => false
override def hashCode(): Int =
val hash = "IntervalSet".##
MurmurHash3.mixLast(hash, intervals.##)
override def toString(): String = intervals.mkString("IntervalSet(", ", ", ")")
object IntervalSet:
/** The empty interval set. */
val empty: IntervalSet = new IntervalSet(IndexedSeq.empty)
/** The universal interval set. */
val universal: IntervalSet = new IntervalSet(IndexedSeq(Interval(Int.MinValue, Int.MaxValue)))
/** Returns a new interval set from the given intervals. */
def apply(intervals: Interval*): IntervalSet = from(intervals)
/** Returns a new interval set from the given iterator of intervals. */
def from(intervals: IterableOnce[Interval]): IntervalSet =
val sorted = intervals.iterator.toSeq.sortBy(i => (i.left, i.right))
if sorted.isEmpty then empty
else
val merged = IndexedSeq.newBuilder[Interval]
var current = sorted.head
for interval <- sorted.tail do
current.unionOf(interval) match
case Some(nextCurrent) => current = nextCurrent
case None =>
merged += current
current = interval
merged += current
new IntervalSet(merged.result())
given boolAlg: BoolAlg[Int, IntervalSet] with
def `true`: IntervalSet = IntervalSet.universal
def `false`: IntervalSet = IntervalSet.empty
def and(p: IntervalSet, q: IntervalSet): IntervalSet = p intersect q
def or(p: IntervalSet, q: IntervalSet): IntervalSet = p union q
def not(p: IntervalSet): IntervalSet = p.complement
def contains(p: IntervalSet, d: Int): Boolean = p.contains(d)
def witness(p: IntervalSet): Option[Int] =
Option.when(!p.isEmpty)(p.intervals.head.left)
/** `Membership` represents a membership query. */
trait Membership[A]:
/** Checks if the given input is a member of the target language. */
def apply(a: A): Boolean
/** `Learner` represents a Boolean algebra learner.
*
* This trait takes three type parameters:
*
* - `L` is a type of learner instance.
* - `A` is a type of input data.
* - `H` is a type of hypothesis model.
*/
trait Learner[L, A, H]:
/** Returns a new learner instance. */
def create(using Membership[A]): L
/** Returns a learner updated with the given cex (counterexample). */
def update(learner: L, cex: A)(using Membership[A]): L
/** Returns the hypothesis model conjected by the learner. */
def conject(learner: L)(using Membership[A]): H
object Learner:
/** Learns a hypothesis model from the given membership query and equivalence query. */
def learn[L, A, H](mq: Membership[A], eq: (H) => Option[A])(using L: Learner[L, A, H]): H =
given Membership[A] = mq
var learner = L.create(using mq)
var cex: Option[A] = eq(L.conject(learner))
while cex.isDefined do
learner = L.update(learner, cex.get)
cex = eq(L.conject(learner))
L.conject(learner)
/** `IntervalSetLearner` is a learner for the `IntervalSet` Boolean algebra. */
final case class IntervalSetLearner(posExampleSet: Set[Int], negExampleSet: Set[Int]):
/** Returns a new learner updated with the given counterexample `cex`. */
def update(cex: Int)(using mq: Membership[Int]): IntervalSetLearner =
if mq(cex) then copy(posExampleSet = posExampleSet + cex)
else copy(negExampleSet = negExampleSet + cex)
/** Returns the hypothesis model conjected by the learner. */
def conject(): IntervalSet =
if posExampleSet.isEmpty then IntervalSet.empty
else if negExampleSet.isEmpty then IntervalSet.universal
else if posExampleSet.size < negExampleSet.size then
posExampleSet.foldLeft(IntervalSet.empty)((l, r) => l union IntervalSet(Interval(r, r)))
else negExampleSet.foldLeft(IntervalSet.empty)((l, r) => l union IntervalSet(Interval(r, r))).complement
object IntervalSetLearner:
/** The empty interval set learner. */
val empty: IntervalSetLearner = IntervalSetLearner(Set.empty, Set.empty)
given learner: Learner[IntervalSetLearner, Int, IntervalSet] with
def create(using Membership[Int]): IntervalSetLearner = IntervalSetLearner.empty
def update(learner: IntervalSetLearner, cex: Int)(using mq: Membership[Int]): IntervalSetLearner =
learner.update(cex)
def conject(learner: IntervalSetLearner)(using Membership[Int]): IntervalSet =
learner.conject()
/** `Sfa` represents a symbolic finite automaton.
*
* In this implementation, SFAs are assumed to be deterministic and finite.
*/
final case class Sfa[S, P](
initialState: S,
acceptStateSet: Set[S],
transitionFunction: Map[S, Map[P, S]]
):
/** Computes the next state from the given state and input data. */
def transition[A](state: S, char: A)(using P: BoolAlg[A, P]): Option[S] =
val edgeMap = transitionFunction(state)
edgeMap.find((p, _) => P.contains(p, char)).map(_._2)
/** Computes the next state from the given state and word. */
def transitions[A](state: S, word: Seq[A])(using P: BoolAlg[A, P]): Option[S] =
word.foldLeft(Option(state))((state, char) => state.flatMap(transition(_, char)))
/** `Prefix` is a prefix of a word. */
type Prefix[A] = Seq[A]
/** `Suffix` is a suffix of a word. */
type Suffix[A] = Seq[A]
/** `CTree` is a classification tree. */
enum CTree[A]:
case Leaf(prefix: Prefix[A])
case Node(suffix: Suffix[A], trueBranch: CTree[A], falseBranch: CTree[A])
/** Returns the set of leaf nodes. */
def leafSet: Set[Prefix[A]] = this match
case Leaf(prefix) => Set(prefix)
case Node(suffix, trueBranch, falseBranch) =>
trueBranch.leafSet ++ falseBranch.leafSet
/** Computes the leaf node that the given word belongs to. */
@tailrec
final def sift(word: Prefix[A])(using mq: Membership[Seq[A]]): Seq[A] = this match
case Leaf(prefix) => prefix
case Node(suffix, trueBranch, falseBranch) =>
val branch =
if mq(word ++ suffix) then trueBranch
else falseBranch
branch.sift(word)
/** Returns a new classification tree by splitting the leaf node with given values. */
final def split(oldLeaf: Prefix[A], newLeaf: Prefix[A], newSuffix: Suffix[A])(using
mq: Membership[Seq[A]]
): CTree[A] =
this match
case Leaf(leaf) =>
assert(leaf == oldLeaf, s"Invalid prefix: $leaf != $oldLeaf")
if mq(oldLeaf ++ newSuffix) then Node(newSuffix, Leaf(oldLeaf), Leaf(newLeaf))
else Node(newSuffix, Leaf(newLeaf), Leaf(oldLeaf))
case Node(suffix, trueBranch, falseBranch) =>
if mq(oldLeaf ++ suffix) then Node(suffix, trueBranch.split(oldLeaf, newLeaf, newSuffix), falseBranch)
else Node(suffix, trueBranch, falseBranch.split(oldLeaf, newLeaf, newSuffix))
/** `SfaLearner` is a learner for symbolic finite automata. */
final case class SfaLearner[L, A](
tree: CTree[A],
acceptMap: Map[Prefix[A], Boolean],
guardLearnerMap: Map[(Prefix[A], Prefix[A]), L]
):
/** Returns a membership query for the learner of the given transition. */
private def membership(leaf1: Prefix[A], leaf2: Prefix[A])(using Membership[Seq[A]]): Membership[A] =
new Membership[A]:
def apply(a: A): Boolean = tree.sift(leaf1 ++ Seq(a)) == leaf2
/** Splits the classification tree and updates the learner. */
private def split[P](oldLeaf: Prefix[A], newLeaf: Prefix[A], newSuffix: Suffix[A])(using
mq: Membership[Seq[A]],
L: Learner[L, A, P]
): SfaLearner[L, A] =
println(s"split($oldLeaf, $newLeaf, $newSuffix)")
val newTree = tree.split(oldLeaf, newLeaf, newSuffix)
val newAcceptMap = acceptMap ++ Map(newLeaf -> mq(newLeaf))
val newLeafPairs = newTree.leafSet.iterator.flatMap(leaf => Iterator((newLeaf, leaf), (leaf, newLeaf)))
val oldLeafPairs = tree.leafSet.iterator.map(leaf => (leaf, oldLeaf))
val newGuardLearnerMap = guardLearnerMap ++ (newLeafPairs ++ oldLeafPairs).map((leaf1, leaf2) =>
(leaf1, leaf2) -> L.create(using membership(leaf1, leaf2))
)
SfaLearner(newTree, newAcceptMap, newGuardLearnerMap)
/** Make the guards complete for the given source leaf. */
private def makeGuardsComplete[P](
srcLeaf: Prefix[A]
)(using mq: Membership[Seq[A]], L: Learner[L, A, P], P: BoolAlg[A, P]): (Boolean, SfaLearner[L, A]) =
println(s"makeGuardsComplete($srcLeaf)")
val leafSet = tree.leafSet
var newGuardLearnerMap = guardLearnerMap
var updated = false
var continue = true
while continue do
val guards = leafSet.map: leaf =>
L.conject(newGuardLearnerMap(srcLeaf, leaf))(using membership(srcLeaf, leaf))
val completePred = P.not(guards.foldLeft(P.`false`)(P.or(_, _)))
P.witness(completePred) match
case None => continue = false
case Some(cex) =>
val tgtLeaf = tree.sift(srcLeaf ++ Seq(cex))
val learner = newGuardLearnerMap((srcLeaf, tgtLeaf))
val newLearner = L.update(learner, cex)(using membership(srcLeaf, tgtLeaf))
newGuardLearnerMap += (srcLeaf, tgtLeaf) -> newLearner
updated = true
(updated, copy(guardLearnerMap = newGuardLearnerMap))
/** Make the guards deterministic for the given source leaf. */
private def makeGuardsDeterministic[P](
srcLeaf: Prefix[A]
)(using mq: Membership[Seq[A]], L: Learner[L, A, P], P: BoolAlg[A, P]): (Boolean, SfaLearner[L, A]) =
println(s"makeGuardsDeterministic($srcLeaf)")
val leafSet = tree.leafSet
var newGuardLearnerMap = guardLearnerMap
var updated = false
var continue = true
while continue do
var updatedLocal = false
for leaf1 <- leafSet; leaf2 <- leafSet; if leaf1 != leaf2 do
val guard1 = L.conject(newGuardLearnerMap(srcLeaf, leaf1))(using membership(srcLeaf, leaf1))
val guard2 = L.conject(newGuardLearnerMap(srcLeaf, leaf2))(using membership(srcLeaf, leaf2))
val deterministicPred = P.and(guard1, guard2)
P.witness(deterministicPred) match
case None => ()
case Some(cex) =>
for (leaf, guard) <- Seq((leaf1, guard1), (leaf2, guard2)) do
if membership(srcLeaf, leaf)(cex) != P.contains(guard, cex) then
val learner = newGuardLearnerMap((srcLeaf, leaf))
val newLearner = L.update(learner, cex)(using membership(srcLeaf, leaf))
newGuardLearnerMap += (srcLeaf, leaf) -> newLearner
updatedLocal = true
updated = true
if !updatedLocal then continue = false
(updated, copy(guardLearnerMap = newGuardLearnerMap))
/** Iterates `makeGuardsComplete` and `makeGuardsDeterministic` until the guards are complete and deterministic. */
private def makeGuardsCompleteAndDeterministic[P](
srcLeaf: Prefix[A]
)(using Membership[Seq[A]], Learner[L, A, P], BoolAlg[A, P]): SfaLearner[L, A] =
println(s"makeGuardsCompleteAndDeterministic($srcLeaf)")
var learner = this
var isComplete = false
var isDeterministic = false
while !isComplete || !isDeterministic do
val result1 = learner.makeGuardsComplete(srcLeaf)
learner = result1._2
val result2 = learner.makeGuardsDeterministic(srcLeaf)
learner = result2._2
isComplete = !result1._1
isDeterministic = !result2._1
learner
/** Returns the index of the breakpoint in the counterexample. */
def analyzeCex[P](hypothesis: Sfa[Prefix[A], P], cex: Seq[A])(using mq: Membership[Seq[A]], P: BoolAlg[A, P]): Int =
var expected = mq(cex)
var low = 0
var high = cex.length
while high - low > 1 do
val mid = (high - low) / 2 + low
val state = hypothesis.transitions(hypothesis.initialState, cex.slice(0, mid)).get
val word = state ++ cex.slice(mid, cex.length)
val actual = mq(word)
if actual == expected then low = mid
else high = mid
low
/** Returns a new learner updated with the given counterexample `cex`. */
def update[P](cex: Seq[A])(using mq: Membership[Seq[A]], L: Learner[L, A, P], P: BoolAlg[A, P]): SfaLearner[L, A] =
println(s"update($cex)")
val hypothesis = conject[P]()
val breakpoint = analyzeCex(hypothesis, cex)
val srcLeaf = hypothesis.transitions(hypothesis.initialState, cex.slice(0, breakpoint)).get
val tgtWord = srcLeaf ++ Seq(cex(breakpoint))
val tgtLeaf = tree.sift(tgtWord)
val guard = L.conject(guardLearnerMap((srcLeaf, tgtLeaf)))(using membership(srcLeaf, tgtLeaf))
if P.contains(guard, cex(breakpoint)) then
val newSuffix = cex.slice(breakpoint + 1, cex.length)
var newLearner = split(tgtLeaf, tgtWord, newSuffix)
for leaf <- newLearner.tree.leafSet do newLearner = newLearner.makeGuardsCompleteAndDeterministic(leaf)
newLearner
else
val newTgtLeafGuard =
L.update(guardLearnerMap((srcLeaf, tgtLeaf)), cex(breakpoint))(using membership(srcLeaf, tgtLeaf))
val tgtState = hypothesis.transition(srcLeaf, cex(breakpoint)).get
val newTgtStateGuard =
L.update(guardLearnerMap((srcLeaf, tgtState)), cex(breakpoint))(using membership(srcLeaf, tgtState))
val newLearner = copy(guardLearnerMap =
guardLearnerMap ++ Map((srcLeaf, tgtLeaf) -> newTgtLeafGuard, (srcLeaf, tgtState) -> newTgtStateGuard)
)
newLearner.makeGuardsCompleteAndDeterministic(srcLeaf)
/** Returns the hypothesis SFA conjected by the learner. */
def conject[P]()(using mq: Membership[Seq[A]], L: Learner[L, A, P]): Sfa[Prefix[A], P] =
val initialState = Seq.empty
val acceptStateSet = acceptMap.keySet.filter(acceptMap(_))
val leafSet = tree.leafSet
val transitionFunction = tree.leafSet.iterator.map: srcLeaf =>
val guardMap = leafSet.iterator.map: tgtLeaf =>
val guard = L.conject(guardLearnerMap(srcLeaf, tgtLeaf))(using membership(srcLeaf, tgtLeaf))
guard -> tgtLeaf
srcLeaf -> guardMap.toMap
Sfa(initialState, acceptStateSet, transitionFunction.toMap)
object SfaLearner:
/** Returns an empty SFA learner. */
def empty[L, A, P](using mq: Membership[Seq[A]], L: Learner[L, A, P], P: BoolAlg[A, P]): SfaLearner[L, A] =
SfaLearner(
CTree.Leaf(Seq.empty),
Map(Seq.empty -> mq(Seq.empty)),
Map((Seq.empty, Seq.empty) -> L.create(using (_ => true)))
).makeGuardsCompleteAndDeterministic(Seq.empty)
given learner[L, A, P](using L: Learner[L, A, P], P: BoolAlg[A, P]): Learner[SfaLearner[L, A], Seq[A], Sfa[Seq[A], P]]
with
def create(using Membership[Seq[A]]): SfaLearner[L, A] = SfaLearner.empty
def update(learner: SfaLearner[L, A], cex: Seq[A])(using Membership[Seq[A]]): SfaLearner[L, A] =
learner.update(cex)
def conject(learner: SfaLearner[L, A])(using Membership[Seq[A]]): Sfa[Seq[A], P] =
learner.conject()
/** Creates an equivalence query from the given membership query and finite alphabet. */
def equivalence[A, P](
mq: Membership[Seq[A]],
finiteAlphabet: Set[A],
minWordLength: Int = 10,
maxWordLength: Int = 100,
numWords: Int = 100,
randomSeed: Long = 0L
)(using BoolAlg[A, P]): (Sfa[Prefix[A], P]) => Option[Seq[A]] =
val alphabetIndexedSeq = finiteAlphabet.toIndexedSeq
(sfa: Sfa[Prefix[A], P]) =>
println(sfa)
val rand = util.Random(randomSeed)
util.boundary:
for i <- 0 until numWords do
val size = rand.between(minWordLength, maxWordLength + 1)
var word = Seq.empty[A]
var state = sfa.initialState
for j <- 0 until size do
val char = alphabetIndexedSeq(rand.nextInt(alphabetIndexedSeq.size))
word :+= char
state = sfa.transition(state, char).get
if sfa.acceptStateSet.contains(state) != mq(word) then util.boundary.break(Some(word))
None
val mq = new Membership[Seq[Int]]:
def apply(word: Seq[Int]): Boolean =
word.count(_ == 0) % 3 == 0 && word.count(_ == 1) % 2 == 0
val eq = SfaLearner.equivalence[Int, IntervalSet](mq, Set(0, 1, 2, 3))
val sfa = Learner.learn[SfaLearner[IntervalSetLearner, Int], Seq[Int], Sfa[Prefix[Int], IntervalSet]](mq, eq)
println(sfa)
次に、このアルゴリズムの計算量について考察します。
大きさの述語をで与えられるlearnerで学習するとして、をMQの呼び出し回数、をEQの呼び出し回数とします。 さらに、SFA の最大の遷移の述語の大きさをと表すことにします。 このとき、の計算量について、論文では次のように述べています。
定理: SFA とのlearnerを与えるについて、として、は高々回のmembership queryの呼び出しと、回のequivalence queryの呼び出しでを学習できる。 ここではequivalence queryの返す反例の文字列の長さの最大値とする。
また、ここまでの考察を整理するとSFAに対してもUpdateやConjectが与えられるため、SFA自身を遷移の述語の実効的Bool代数にしたSFAも (いわゆる拡張SFA) もまたで学習できるということが分かります。 これはなかなか興味深いことなのではないかと思います。
との比較
とはどちらも既存のオートマトン学習のアルゴリズムを元にして、SFAに拡張したものですが、いくつか違いがあります。 次の表でそれらをまとめました。
アルゴリズム | データ構造 | 遷移のBool代数に求めるもの |
---|---|---|
observation table () | partitioning関数 | |
classification tree (KV) | learnerの手続き |
個人的にはの方が、Bool代数に求めるものやアルゴリズムがより洗練された定義になっているように感じています7。
あとがき
この記事ではSFAの学習アルゴリズムであるとについて解説しました。
SFAは応用が分かりやすく興味深い対象なのではないかと思います。 BDDを使うことで、最小化や空性の判定など、様々な問題が効率的に解けることも知られています。 オートマトン学習に限らず、そういった方向で学んでみても面白いのではないでしょうか。
正直の計算量の辺りの話は全く自信がありません。 間違っていたらごめんなさい。
また、こうしてSFAのオートマトン学習のアルゴリズムについて調べると、本当に重要なのはアルゴリズムそのものではなく、その上で学習可能な実効的Bool代数の述語のクラスの方が重要なのではないかという気がします。 最近の研究である[Fisman, Frenkel, & Zilles, 2023]では、この学習可能な実効的Bool代数について、より詳細に考察されているように見えます。
長い記事ですが、最後まで目を通していただきありがとうございました。
参考文献
- [D'Antoni & Veanes, 2017]
D'Antoni, Loris, and Margus Veanes. "The power of symbolic automata and transducers." Computer Aided Verification: 29th International Conference, CAV 2017, Heidelberg, Germany, July 24-28, 2017, Proceedings, Part I 30. Springer International Publishing, 2017.
https://link.springer.com/chapter/10.1007/978-3-319-63387-9_3
- [Drews & D'Antoni, 2017]
Drews, Samuel, and Loris D'Antoni. "Learning symbolic automata." International Conference on Tools and Algorithms for the Construction and Analysis of Systems. Berlin, Heidelberg: Springer Berlin Heidelberg, 2017.
https://link.springer.com/chapter/10.1007/978-3-662-54577-5_10
- [Argyros & D'Antoni, 2018]
Argyros, George, and Loris D'Antoni. "The learnability of symbolic automata." Computer Aided Verification: 30th International Conference, CAV 2018, Held as Part of the Federated Logic Conference, FloC 2018, Oxford, UK, July 14-17, 2018, Proceedings, Part I 30. Springer International Publishing, 2018.
https://link.springer.com/chapter/10.1007/978-3-319-96145-3_23
- [Isberner & Steffen, 2014]
Isberner, Malte, and Bernhard Steffen. "An abstract framework for counterexample analysis in active automata learning." International Conference on Grammatical Inference. PMLR, 2014.
- [Fisman, Frenkel, & Zilles, 2023]
Fisman, Dana, Hadar Frenkel, and Sandra Zilles. "Inferring symbolic automata." Logical Methods in Computer Science 19 (2023).
脚注
-
例えば正規表現で
\w
や[0-9]
のような文字クラスを利用すると、その文字クラスに含まれる文字の分だけ同じ状態の組の遷移が現れることになります。 ↩ -
論文ではですが、アルファベットが有限で遷移先がすべて異なる状態となるときに成り立たないのでが正しいはずです。 ↩
-
論文では"This operation distributes the old evidence leading out of the amalgamated state between the newly differentiated states, simplifying the constructions in Sect.4." (pp. 179) とあります。 しかし、4章は学習の学習可能性や学習可能な実効的Bool代数の直積・直和もまた学習可能であることの説明で、constructionらしいのは直積・直和の実効的Bool代数を構成している部分くらいしかなく、最後の部分が要領の得ない一文に感じます。 (個人的には3.4章 "Worked Example"の間違いなのではないかと思っています。) ↩
-
論文では" needs as input a list of sets, provided by , with total size at most to discover a target partition ."とありますが、このtotal sizeというのが厳密に何を意味するか定かではありません。 への入力は集合の列なので、単なる列の長さではなく、要素の集合の大きさの総和なのではないかと疑っているのですが、それだと空集合の列の大きさがになってしまい、本当にそれでいいのか自信がありません。 ↩
-
論文では斜体の表記なのですが、あんまりな見た目なのでとしています。 ↩
-
論文では根ノードと、それとは異なる葉ノードとしてを追加して、片側が欠けた木として初期化していますが、なぜそんなことをしているのか意味不明なので、より分かりやすい形にしています。 このため、論文ではがを返す可能性があるものとなっています。 ↩
-
後発のアルゴリズムなので当然なのですが。 ↩