Qラーニング、3つの部分からなるストーリーを味わう

この記事は、確率的環境でエージェントを管理するためのq学習アルゴリズムの実装に関する小さなメモです。 記事の最初の部分は、シミュレーションを実行するための環境を作成することに専念します。nxnフィールドでのミニゲームでは、エージェントは、ランダムに移動する敵からできるだけ遠ざける必要があります。 敵をそれぞれ追い抜くための敵のタスク。 シミュレーションでエージェントが行った移動ごとにポイントが付与されます。 記事の第2部では、q学習アルゴリズムの基礎とその実装について説明します。 3番目の部分では、エージェントによる環境の認識を決定するパラメーターの変更を試みます。 彼のゲームのパフォーマンスに対するこれらのパラメーターの影響を分析しましょう。 特に、最小限の数のサードパーティモジュールを使用することに重点を移しました。 目標は、アルゴリズムの本質に触れること、いわば触れることです。 実装には、「純粋な」Python 3のみを使用します。







パート1.環境の作成



エージェントが動作する環境は、サイズnの2次元フィールドで表されます。 これはどのように見えるかです:







ここで、ユニットはエージェントを表し、デュースで敵を表します。 次に、コードに直接目を向けます。 環境を説明する変数:フィールドサイズ、エージェント、敵のリスト。



環境クラス
class W: def __init__(self,n): self.n=n self.P=P(1,1,n) self.ens=[EN(3,3,n),EN(4,4,n),EN(5,5,n)]
      
      







タイプアクター:エージェントまたは敵。



俳優クラス
 class un: def __init__(self,x,y): self.x = x self.y = y def getxy(self): return self.x, self.y
      
      







エージェントの説明。

エージェントクラス
 class P(un): def __init__(self,x,y,n): self.n=n un.__init__(self,x,y)
      
      







エージェント戦略はまだ空です。



メソッド呼び出しエージェント戦略
  def strtg(self): return 0,0
      
      







シミュレーションの各参加者は、斜めを含む任意の方向に移動できますが、俳優もまったく移動しない場合があります。 敵の例で許可された一連の動き:







対角線に沿った動きのおかげで、敵はどのように離れようとしても、常にエージェントを追い越す機会があります。 これにより、エージェントのトレーニングの程度に関係なく、有限数のシミュレーション移動が提供されます。 バイアスの有効性をチェックしながら、考えられるアクションを書き留めます。 このメソッドは、エージェントの座標の変更を受け入れ、その位置を更新します。



エージェント移動方法
  def move(self): dx,dy=self.strtg() a=self.x+dx b=self.y+dy expr=((0<=a<self.n) and (0<=b<self.n)) if expr: self.x=a self.y=b
      
      







敵の説明。



敵のクラス
 class EN(un): def __init__(self,x,y,n): self.n=n un.__init__(self,x,y)
      
      







変位の許容性の検証で敵を移動します。 このメソッドは、ストロークの有効性の条件が満たされるまでサイクルを終了しません。



敵の移動方法
  def move(self): expr=False while not expr: a=self.x+random.choice([-1,0,1]) b=self.y+random.choice([-1,0,1]) expr=((0<=a<self.n) and (0<=b<self.n)) if expr: self.x=a self.y=b
      
      







ワンステップシミュレーション。 敵とエージェントの位置を更新する



環境メソッド、次のシミュレーションステップを拡張
  def step(self): for i in self.ens: i.move() self.P.move()
      
      







シミュレーションの可視化。



環境メソッド、フィールドの描画およびアクターの位置
  def pr(self): print('\n'*100) px,py=self.P.getxy() self.wmap=list([[0 for i in range(self.n)] for j in range(self.n)]) self.wmap[py][px]=1 for i in self.ens: ex,ey=i.getxy() self.wmap[ey][ex]=2 for i in self.wmap: print(i)
      
      







シミュレーション、主な段階:



1.俳優の座標の登録。

2.シミュレーションの完了条件の達成の検証。

3.シミュレーションの移動を更新するサイクルの開始。

-描画環境。

-シミュレーション状態を更新します。

-俳優の座標の登録。

-シミュレーションの完了条件の達成の検証。

4.環境を描画します。



環境メソッド、シミュレーションシーケンスの再生
  def play(self): px,py=self.P.getxy() bl=True for i in self.ens: ex,ey=i.getxy() bl=bl and (px,py)!=(ex,ey) iter=0 while bl: time.sleep(1) wr.pr() self.step() px,py=self.P.getxy() bl=True for i in self.ens: ex,ey=i.getxy() bl=bl and (px,py)!=(ex,ey) print((px,py),(ex,ey)) print('___') iter=iter+1 print(iter)
      
      







環境の初期化と起動。



シミュレーションを実行する
 if __name__=="__main__": wr=W(7) wr.play() wr.pr()
      
      







パート2.エージェントトレーニング



q学習法は関数Qの導入に基づいています。これは、現在の状態sのエージェントが可能なアクションの値を反映しており、その中にシミュレーションが配置されています。 または簡単に:







Qsa









この関数は、エージェントが特定の動きで特定のアクションを完了することで受け取ることができる報酬の評価を設定します。 また、エージェントが将来受け取る報酬の評価も含まれます。 学習プロセスは、エージェントの各ターンでの関数Qの値の反復的な改良です。 まず、エージェントがこのターンに受け取る報酬の量を決定する必要があります。 変数に書きましょう rt 。 次に、後続の移動で予想される最大報酬を決定します。







 maxaQst+1a









ここで、エージェントにとって大きな価値があるもの、つまり一時的な報酬または将来の報酬を決定する必要があります。 この問題は、後続の賞のコンポーネント評価における追加要因によって解決されます。 結果として、このステップでエージェントによって予測される関数Qの値は、可能な限り値に近いはずです。







rt+ gamma cdot maxaQst+1a









したがって、エージェントの現在の動きに対する関数Qの予測エラーは次のように記述されます。







 Deltaq=rt+ gamma cdot maxaQst+1aQstat









エージェントのトレーニング率を調整する係数を導入します。 関数Qを繰り返し計算するための式は次の形式になります。







Qst+1at+1=Qstat+ alpha cdot Deltaq









関数Qの反復計算の一般式:







Qst+1at+1=Qstat+ alpha cdotrt+ gamma cdot maxaQst+1aQstat









エージェントの観点から環境の説明を構成する主な機能を選び出します。 パラメータのセットが広いほど、環境の変化に対する応答がより正確になります。 同時に、状態空間のサイズは学習速度に大きく影響する可能性があります。



エージェントメソッド、環境の状態を示す記号
  def get_features(self,x,y): features=[] for i in self.ens: #800-1400 ex,ey=i.getxy() features.append(ex) features.append(ey) features.append(x) features.append(y) return features
      
      







クラスQモデルの作成。 可変ガンマ-後続の賞の貢献度を平準化できる減衰係数。 変数alphaはモデル学習率係数です。 状態変数-モデル状態辞書。

クラスQモデル
 class Q: def __init__(self): self.gamma=0.95 self.alpha=0.05 self.state={}
      
      







このターンのエージェントの状態を取得します。



モデルのQメソッドは、要求に応じて、エージェントの状態を受け取ります
  def get_wp(self,plr): self.plr=plr
      
      







モデルをトレーニングします。



Qモデル法、モデルトレーニングの1段階
  def run_model(self,silent=1): self.plr.prev_state=self.plr.curr_state[:-2]+(self.plr.dx,self.plr.dy) self.plr.curr_state=tuple(self.plr.get_features(self.plr.x,self.plr.y))+( self.plr.dx,self.plr.dy) if not silent: print(self.plr.prev_state) print(self.plr.curr_state) r=self.plr.reward if self.plr.prev_state not in self.state: self.state[self.plr.prev_state]=0 nvec=[] for i in self.plr.actions: cstate=self.plr.curr_state[:-2]+(i[0],i[1]) if cstate not in self.state: self.state[cstate]=0 nvec.append(self.state[cstate]) nvec=max(nvec) self.state[self.plr.prev_state]=self.state[self.plr.prev_state]+self.alpha*(-self.state[self.plr.prev_state]+r+self.gamma*nvec)
      
      







賞を獲得する方法。 エージェントには、シミュレーションの継続に対する報酬が与えられます。 このペナルティは、敵との衝突に対して認められます。



環境法、賞のサイズ決定
  def get_reward(self,end_bool): if end_bool: self.P.reward=1 else: self.P.reward=-1
      
      







エージェント戦略。 モデル状態ディクショナリから最適な値を選択します。 epsクラスがエージェントクラスに追加され、移動を選択するときにランダム性の要素が導入されました。 これは、この状態で関連する可能なアクションを調査するために行われます。



エージェント方式、次の移動の選択
  def strtg(self): if random.random()<self.eps: act=random.choice(self.actions) else: name1=tuple(self.get_features(self.x,self.y)) best=[(0,0),float('-inf')] for i in self.actions: namea=name1+(i[0],i[1]) if namea not in self.QM.state: self.QM.state[namea]=0 if best[1]<self.QM.state[namea]: best=[i,self.QM.state[namea]] act=best[0] return act
      
      







シミュレーションの状態に関する情報の完全な転送(エージェントの絶対座標、敵の絶対座標)により、エージェントが何を学んだかを示します。



アルゴリズムの結果(GIF〜1Mb)




パート3.エージェントの環境の状態を反映するパラメーターを変更します



まず、エージェントが環境に関する情報をまったく受け取らない状況を考えます。



機能1
  def get_features(self,x,y): features=[] return features
      
      







このようなアルゴリズムの操作の結果、エージェントはシミュレーションごとに平均40〜75ポイントを獲得します。 トレーニングスケジュール:







データエージェントに追加します。 まず第一に、敵がどれだけ遠くにあるかを知る必要があります。 ユークリッド距離を使用してこれを計算します。 また、エージェントがどの程度接近したかを把握することも重要です。



機能2
  def get_features(self,x,y): features=[] for i in self.ens: ex,ey=i.getxy() dx=abs(x-ex) dy=abs(y-ey) l=hypot(dx,dy) features.append(l) to_brdr=min(x,y,self.n-1-x,self.n-1-y) features.append(to_brdr) return features
      
      







この情報を考慮すると、エージェントのスコアがシミュレーションごとに20ポイント低くなり、平均スコアは60〜100ポイントの範囲になります。 環境の変化への対応が改善されたという事実にもかかわらず、私たちはまだ必要なデータの大部分を失っています。 そのため、エージェントは、敵との距離を空けるために、またはエッジの端から離れるためにどの方向に移動するかをまだ知りません。 トレーニングスケジュール:







次のステップは、敵が敵のどちら側にいるのか、エージェントがフィールドの端にいるかどうかをエージェントに追加することです。



機能3
  def get_features(self,x,y): features=[] for i in self.ens: ex,ey=i.getxy() features.append(x-ex) features.append(y-ey) # if near wall x & y. if x==0: features.append(-1) elif x==self.n-1: features.append(1) else: features.append(0) if y==0: features.append(-1) elif y==self.n-1: features.append(1) else: features.append(0) return features
      
      







このような変数のセットは、ポイントの平均数をすぐに400〜800に大幅に増やします。 トレーニングスケジュール:







最後に、アルゴリズムに環境に関する利用可能なすべての情報を提供します。 つまり、エージェントと敵の絶対座標。



機能4
  def get_features(self,x,y): features=[] for i in self.ens: ex,ey=i.getxy() features.append(ex) features.append(ey) features.append(x) features.append(y) return features
      
      







このパラメーターセットにより、エージェントはテストシミュレーションで800〜1400ポイントを獲得できます。 トレーニングスケジュール:







そこで、q-learningアルゴリズムの動作を調べました。 エージェントが環境を知覚する原因となるパラメーターをひねりながら、情報を適切に転送し、環境を説明するときにすべての側面を考慮すると、その行動の効果にどのように影響するかが明確にわかりました。 ただし、環境の完全な説明には欠点があります。 つまり、エージェントに送信される情報量の増加に伴う状態空間の爆発的な成長。 この問題を回避するために、たとえばDQNアルゴリズムなど、状態空間を近似するアルゴリズムが開発されました。



便利なリンク:



habrahabr.ru/post/274597

ai.berkeley.edu/reinforcement.html



プログラムの完全なコード:



1.環境の基本モデル(パート1から)
 import random import time class W: def __init__(self,n): self.n=n self.P=P(1,1,n) self.ens=[EN(3,3,n),EN(4,4,n),EN(5,5,n)] def step(self): for i in self.ens: i.move() self.P.move() def pr(self): print('\n'*100) px,py=self.P.getxy() self.wmap=list([[0 for i in range(self.n)] for j in range(self.n)]) self.wmap[py][px]=1 for i in self.ens: ex,ey=i.getxy() self.wmap[ey][ex]=2 for i in self.wmap: print(i) def play(self): px,py=self.P.getxy() bl=True for i in self.ens: ex,ey=i.getxy() bl=bl and (px,py)!=(ex,ey) iter=0 while bl: time.sleep(1) wr.pr() self.step() px,py=self.P.getxy() bl=True for i in self.ens: ex,ey=i.getxy() bl=bl and (px,py)!=(ex,ey) print((px,py),(ex,ey)) print('___') iter=iter+1 print(iter) class un: def __init__(self,x,y): self.x = x self.y = y def getxy(self): return self.x, self.y class P(un): def __init__(self,x,y,n): self.n=n un.__init__(self,x,y) def strtg(self): return 0,0 def move(self): dx,dy=self.strtg() a=self.x+dx b=self.y+dy expr=((0<=a<self.n) and (0<=b<self.n)) if expr: self.x=a self.y=b class EN(un): def __init__(self,x,y,n): self.n=n un.__init__(self,x,y) def move(self): expr=False while not expr: a=self.x+random.choice([-1,0,1]) b=self.y+random.choice([-1,0,1]) expr=((0<=a<self.n) and (0<=b<self.n)) if expr: self.x=a self.y=b if __name__=="__main__": wr=W(7) wr.play() wr.pr()
      
      







2. 500 + 1500のシミュレーションでエージェントトレーニングを行う環境モデル
 import random import time from math import hypot,pi,cos,sin,sqrt,exp import plot_epoch class Q: def __init__(self): self.gamma=0.95 self.alpha=0.05 self.state={} def get_wp(self,plr): self.plr=plr def run_model(self,silent=1): self.plr.prev_state=self.plr.curr_state[:-2]+(self.plr.dx,self.plr.dy) self.plr.curr_state=tuple(self.plr.get_features(self.plr.x,self.plr.y))+(self.plr.dx,self.plr.dy) if not silent: print(self.plr.prev_state) print(self.plr.curr_state) r=self.plr.reward if self.plr.prev_state not in self.state: self.state[self.plr.prev_state]=0 nvec=[] for i in self.plr.actions: cstate=self.plr.curr_state[:-2]+(i[0],i[1]) if cstate not in self.state: self.state[cstate]=0 nvec.append(self.state[cstate]) nvec=max(nvec) self.state[self.plr.prev_state]=self.state[self.plr.prev_state]+self.alpha*( -self.state[self.plr.prev_state]+r+self.gamma*nvec) class un: def __init__(self,x,y): self.x = x self.y = y self.actions=[(0,0),(-1,-1),(0,-1),(1,-1),(-1,0), (1,0),(-1,1),(0,1),(1,1)] def getxy(self): return self.x, self.y class P(un): def __init__(self,x,y,n,ens,QM,wrld): self.wrld=wrld self.QM=QM self.ens=ens self.n=n self.dx=0 self.dy=0 self.eps=0.95 self.prev_state=tuple(self.get_features(x,y))+(self.dx,self.dy) self.curr_state=tuple(self.get_features(x,y))+(self.dx,self.dy) un.__init__(self,x,y) def get_features(self,x,y): features=[] # for i in self.ens: #80-100 # ex,ey=i.getxy() # dx=abs(x-ex) # dy=abs(y-ey) # l=hypot(dx,dy) # features.append(l) # to_brdr=min(x,y,self.n-1-x,self.n-1-y) # features.append(to_brdr) for i in self.ens: #800-1400 ex,ey=i.getxy() features.append(ex) features.append(ey) features.append(x) features.append(y) # for i in self.ens: #800-1400 # ex,ey=i.getxy() # features.append(x-ex) # features.append(y-ey) # features.append(self.n-1-x) # features.append(self.n-1-y) # for i in self.ens: #400-800 # ex,ey=i.getxy() # features.append(x-ex) # features.append(y-ey) # # if near wall x & y. # if x==0: # features.append(-1) # elif x==self.n-1: # features.append(1) # else: # features.append(0) # if y==0: # features.append(-1) # elif y==self.n-1: # features.append(1) # else: # features.append(0) # features=[] #40-80 return features def strtg(self): if random.random()<self.eps: act=random.choice(self.actions) else: name1=tuple(self.get_features(self.x,self.y)) best=[(0,0),float('-inf')] for i in self.actions: namea=name1+(i[0],i[1]) if namea not in self.QM.state: self.QM.state[namea]=0 if best[1]<self.QM.state[namea]: best=[i,self.QM.state[namea]] act=best[0] return act def move(self): self.dx,self.dy=self.strtg() a=self.x+self.dx b=self.y+self.dy expr=((0<=a<self.n) and (0<=b<self.n)) if expr: self.x=a self.y=b class EN(un): def __init__(self,x,y,n): self.n=n un.__init__(self,x,y) def move(self): expr=False cou=0 while not expr: act=random.choice(self.actions) a=self.x+act[0] b=self.y+act[1] expr=((0<=a<self.n) and (0<=b<self.n)) if expr: self.x=a self.y=b class W: def __init__(self,n,QModel): self.ens=[EN(n-2,n-2,n)]#,EN(n-2,n-1,n),EN(n-1,n-2,n),EN(n-1,n-1,n)] self.P=P(1,1,n,self.ens,QModel,self) self.n=n self.QM=QModel self.QM.get_wp(self.P) def step(self): self.P.move() for i in self.ens: i.move() def pr(self,silent=1): """print map""" #print('\n'*100) px,py=self.P.getxy() self.wmap=list([[0 for i in range(self.n)] for j in range(self.n)]) self.wmap[py][px]=1 for i in self.ens: ex,ey=i.getxy() self.wmap[ey][ex]=2 if not silent: for i in self.wmap: print(i) def is_finished(self): px,py=self.P.getxy() end_bool=True for i in self.ens: ex,ey=i.getxy() end_bool=end_bool and ((px,py)!=(ex,ey)) return end_bool def get_reward(self,end_bool): if end_bool: self.P.reward=1 else: self.P.reward=-1 def play(self,silent=1,silent_run=1): end_bool=self.is_finished() iter=0 while end_bool: self.pr(silent) self.step() end_bool=self.is_finished() self.get_reward(end_bool) if silent_run: self.QM.run_model(silent) if not silent: print('___') time.sleep(0.1) iter=iter+1 return iter QModel=Q() plot=plot_epoch.epoch_graph() for i in range(500): wr=W(5,QModel) wr.P.eps=0.90 iter=wr.play(1) wr.pr(1) plot.plt_virt_game(W,QModel) for i in range(1500): wr=W(5,QModel) #print(len(QModel.state)) wr.P.eps=0.2 iter=wr.play(1) wr.pr(1) plot.plt_virt_game(W,QModel) plot.plot_graph() print('___') for i in range(10): wr=W(5,QModel) wr.P.eps=0.0 iter=wr.play(0) wr.pr(0)
      
      







3.チャート出力モジュール(plot_epoch.py​​)。
 import matplotlib.pyplot as plt class epoch_graph: def __init__(self): self.it=0 self.iter=[] self.number=[] self.iter_aver=[] def plt_append(self,iter): self.it=self.it+1 self.iter.append(iter) self.number.append(self.it) if len(self.iter)>100: self.iter_aver.append(sum(self.iter[-100:])/100) else: self.iter_aver.append(sum(self.iter)/len(self.iter)) def plt_virt_game(self,W,QModel): wr=W(5,QModel) wr.P.eps=0.0 iter=wr.play(1,0) self.plt_append(iter) def plot_graph(self): plt.plot(self.number,self.iter_aver) plt.xlabel('n_epoch') plt.ylabel('aver. score') plt.show()
      
      








All Articles