2021-04-12-Mon-StudyKR

» studyKR

RL

MDP를 모를 때 밸류 평가하기

Temproal Difference

몬테카를로는 업데이트를 하려면 에피소드가 끝날 때까지 기다려야 한다
    업데이트를 할려면 리턴이 필요한데 리턴은 에피소드가 끝나기전까지는 알 수 없다.

추측을 추측으로 업데이트하자
목표 Target = TD Target

exp

Temporal Difference 학습 알고리즘

1. 테이블을 초기화, 각 상태에 대해 그 값을 0으로 초기화
2. 에이전트를 So에 놔두고 경험을 쌓게함
MC와 다른점 2개 --> 업데이트 수식과 시점
Gt = Rt+1 + gamma*V(St+1)

Monte-Carlo vs Temporal Difference

학습 시점

MDP를 모르는 상황에서 임의의 정책함수가 주어졌을 때 그 가치 함수를 학습하는 방법
학습 시점
    MC: 에피소드가 끝나고 리턴이 정해져야 비로소 되돌아보며 학습
    TD: 한 스텝이 끄날 때마다 바로바로 테이블 값 업데이트

    Episode MDP: 종료상태가 있어 에이전트의 경험이 에피소드 단위로 나뉘어짐
    Non-Episode MDP: 종료 상태 없이, 하나의 에피소드가 무한히 이어지는 MDP

    -> MC는 Episode MDP에만 적용되지만, TD는 둘다 적용 가능.

편향성

MC는 여러 개의 샘플이 모일수록 그 평균은 실제 가치에 수렴하게 되므로, 편향되어 있지 않다.
TD는 TD Target과 현재 추측치 사이 차이를 줄여주는 방향으로 업데이트 하므로, 편향되어 있음
    실제 가치에 다가가리라는 보장이 없음.
    * 벨만 기대 방정식 = Vpi(St+1) vs TD = V(St+1) --> Vpi이길 바라는 값

분산

MC는 리턴을 통해 업데이트가 되므로, 분산이 큼
TD는 한 샘플마다 업데이트가 가능하기 때문에, 분산이 작음

Intermediate of MC and TD

N Step TD

꼭 한 스텝만 가서 평가해야 할 이유가 없으므로, n step 만큼 가서 가치 판단
--> A3C 논문: for loop를 돌면서 특정 구간의 맨 마지막 스텝에서는 가치 함수의 평가 값인 V(s)를 넣고, 
    그 이후에는 감마를 곱해가면서 실제 보상 값을 더해주는 방식

MDP를 모를 때 Coding

import random
import numpy as np

class GridWorld():
    def __init__(self):
        self.x = 0
        self.y = 0
    
    def step(self, a):
        if a == 0:
            self.move_right()
        elif a== 1:
            self.move_left()
        elif a== 2:
            self.move_up()
        elif a==3:
            self.move_down()
        
        reward = -1
        done = self.is_done()
        return (self.x, self.y), reward, done
        def move_right(self):
        self.y += 1  
        if self.y > 3:
            self.y = 3
      
    def move_left(self):
        self.y -= 1
        if self.y < 0:
            self.y = 0
      
    def move_up(self):
        self.x -= 1
        if self.x < 0:
            self.x = 0
  
    def move_down(self):
        self.x += 1
        if self.x > 3:
            self.x = 3

    def is_done(self):
        if self.x == 3 and self.y == 3:
            return True
        else :
            return False

    def get_state(self):
        return (self.x, self.y)
      
    def reset(self):
        self.x = 0
        self.y = 0
        return (self.x, self.y)

class Agent():
    def __init__(self):
        pass        

    def select_action(self):
        coin = random.random()
        if coin < 0.25:
            action = 0
        elif coin < 0.5:
            action = 1
        elif coin < 0.75:
            action = 2
        else:
            action = 3
        return action

MC

def main():
    env = GridWorld()
    agent = Agent()
    data = [[0,0,0,0],[0,0,0,0],[0,0,0,0],[0,0,0,0]]
    gamma = 1.0
    reward = -1
    alpha = 0.001

    for k in range(50000): #5만번의 에피소드
        done = False
        history = []

        while not done:
            action = agent.select_action()
            (x,y), reward, done = env.step(action)
            history.append((x,y,reward))
        env.reset()
        # 매 에피소드가 끝나고 바로 데이터를 이용해 테이블 업데이트
        cum_reward = 0
        for transition in history[::-1]: # 방문했던 상태들을 뒤에서부터 보며 차례차례 리턴을 계산
            x, y, reward = transition
            data[x][y] = data[x][y] + alpha*(cum_reward-data[x][y])
            cum_reward = reward + gamma*cum_reward  
        # 데이터 출력    
    for row in data:
        print(row)

if __name__ == '__main__':
    main()

TD

def main():
    env = GridWorld()
    agent = Agent()
    data = [[0,0,0,0],[0,0,0,0],[0,0,0,0],[0,0,0,0]]
    gamma = 1.0
    reward = -1
    alpha = 0.01 # MC에 비해 큰 값

    for k in range(50000):
        done = False
        while not done:
            x, y = env.get_state()
            action = agent.select_action()
            (x_prime, y_prime), reward, done = env.step(action)
            x_prime, y_prime = env.get_state()
            data[x][y] = data[x][y] + alpha*(reward+gamma*data[x_prime][y_prime]-data[x][y]) # 한 step 후 바로 테이블에 데이터 업데이트
        env.reset()
            
    for row in data:
        print(row)

if __name__ == '__main__':
    main()