일단 가장 쉬운 REINFORCE로 해보기로 결정했다.
Environment 구현에서는 step Function 정의가 핵심이다.
step Function을 정의하기 위해서는 아래 개념의 정의가 필요하다.
action
state, statePrime
전체소스
# @title 기본 제목 텍스트
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import numpy as np
import time
from collections import namedtuple, deque
# ===================================================================
# 1. 환경 (Environment) - 'step' 메서드 수정
# ===================================================================
class BinPackingEnv:
def __init__(self, bin_width, bin_height):
self.bin_width = bin_width
self.bin_height = bin_height
self.action_space_size = bin_width * bin_height
self.items_list = self._generate_items()
self.reset()
def _generate_items(self):
items = []
items.append((1, 9))
items.append((10, 1))
items.append((5, 2))
items.append((4, 4))
items.append((3, 3))
items.append((3, 3))
items.append((3, 3))
items.append((3, 3))
items.append((2, 3))
items.append((2, 3))
return items
def reset(self):
self.bin = np.zeros((self.bin_height, self.bin_width))
self.items = list(self.items_list)
random.shuffle(self.items)
self.current_item_idx = 0
def _print_status(self):
print("\n현재 상자 상태:")
print(self.bin)
if self.current_item_idx < len(self.items):
w, h = self.items[self.current_item_idx]
print(f"다음 물건 크기: {w}x{h}")
def get_state_and_valid_actions(self):
state = torch.from_numpy(self.bin).float().unsqueeze(0).unsqueeze(0)
if self.current_item_idx >= len(self.items):
return state, torch.zeros(self.action_space_size, dtype=torch.bool)
item_w, item_h = self.items[self.current_item_idx]
valid_actions_mask = torch.zeros(self.bin_height, self.bin_width, dtype=torch.bool)
for y in range(self.bin_height - item_h + 1):
for x in range(self.bin_width - item_w + 1):
if np.sum(self.bin[y:y + item_h, x:x + item_w]) == 0:
valid_actions_mask[y, x] = True
return state, valid_actions_mask.flatten()
def _check_if_any_valid_moves_exist(self):
"""다음 아이템을 놓을 수 있는 유효한 공간이 하나라도 있는지 확인"""
if self.current_item_idx >= len(self.items):
return True # 이미 모든 아이템을 다 놓았으므로 유효하다고 판단
item_w, item_h = self.items[self.current_item_idx]
for y in range(self.bin_height - item_h + 1):
for x in range(self.bin_width - item_w + 1):
if np.sum(self.bin[y:y + item_h, x:x + item_w]) == 0:
return True # 놓을 수 있는 공간을 하나라도 찾으면 즉시 True 반환
return False # 전체를 탐색해도 놓을 공간이 없으면 False 반환
def step(self, action):
item_w, item_h = self.items[self.current_item_idx]
y, x = np.unravel_index(action, (self.bin_height, self.bin_width))
self.bin[y:y + item_h, x:x + item_w] = 1
reward = float(item_w * item_h) / 10.0 # 보상 스케일링 (학습 안정화에 도움)
self.current_item_idx += 1
done = False
# << 변경: 보상 설계 (Reward Shaping) 적용 >>
# 다음 아이템을 놓을 곳이 있는지 확인
if not self._check_if_any_valid_moves_exist():
# 모든 아이템을 성공적으로 다 넣었는지 확인
if self.current_item_idx >= len(self.items):
reward += 100 # 성공 보너스
else:
reward -= 100 # 아이템이 남았는데 놓을 곳이 없으면 실패 패널티
done = True
# 모든 아이템을 다 넣었지만, 위 조건에서 걸러지지 않은 경우(마지막 아이템을 딱 맞게 넣은 경우)
if self.current_item_idx >= len(self.items) and not done:
reward += 100
done = True
return reward, done
# ===================================================================
# 2. 신경망 (DQN) - 구조 변경
# ===================================================================
class DQN(nn.Module):
# << 변경: 신경망 구조를 더 깊게 만듦 >>
def __init__(self, h, w, outputs):
super(DQN, self).__init__()
self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
self.conv3 = nn.Conv2d(64, 64, kernel_size=3, padding=1) # 레이어 추가
self.head = nn.Linear(h * w * 64, outputs) # 최종 레이어 입력 크기 변경
def forward(self, x):
x = F.relu(self.conv1(x))
x = F.relu(self.conv2(x))
x = F.relu(self.conv3(x)) # 추가된 레이어 통과
return self.head(x.view(x.size(0), -1))
# ===================================================================
# 3. 리플레이 버퍼 (Replay Buffer) - 변경 없음
# ===================================================================
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
class ReplayBuffer:
def __init__(self, capacity):
self.memory = deque([], maxlen=capacity)
def push(self, *args):
self.memory.append(Transition(*args))
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
# --- 하이퍼파라미터 설정 (변경) ---
BIN_SIZE = 10
N_ACTIONS = BIN_SIZE * BIN_SIZE
EPISODES = 15000 # << 변경: 학습 시간 증가
LEARNING_RATE = 0.0005 # << 변경: 학습률 약간 낮춰 안정성 증가
GAMMA = 0.99 # << 변경: 미래 보상 가치 증가
BATCH_SIZE = 128 # << 변경: 배치 크기 증가
REPLAY_MEMORY_SIZE = 10000
# --- 학습 준비 ---
env = BinPackingEnv(BIN_SIZE, BIN_SIZE)
policy_net = DQN(BIN_SIZE, BIN_SIZE, N_ACTIONS)
target_net = DQN(BIN_SIZE, BIN_SIZE, N_ACTIONS)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)
loss_function = nn.MSELoss()
replay_buffer = ReplayBuffer(REPLAY_MEMORY_SIZE)
epsilon = 0.9
TARGET_UPDATE = 100
# ===================================================================
# 4. 최적화 함수 (Optimize Function) - 변경 없음
# ===================================================================
def optimize_model():
if len(replay_buffer) < BATCH_SIZE:
return
transitions = replay_buffer.sample(BATCH_SIZE)
batch = Transition(*zip(*transitions))
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
done_batch = torch.cat(batch.done)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
predicted_q = policy_net(state_batch).gather(1, action_batch)
next_state_q_values = torch.zeros(BATCH_SIZE)
non_final_mask = ~done_batch # 종료되지 않은 상태들만 True
# 다음 상태가 있는 경우에만 target_net으로 Q값 계산
if non_final_next_states.size(0) > 0:
next_state_q_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
target_q = reward_batch + (GAMMA * next_state_q_values)
loss = loss_function(predicted_q, target_q.unsqueeze(1))
optimizer.zero_grad()
loss.backward()
optimizer.step()
# ===================================================================
# 5. 학습 루프 (Training Loop) - Epsilon 감쇠율 수정
# ===================================================================
print("===== 학습 시작 =====")
total_rewards = []
for i_episode in range(EPISODES):
env.reset()
done = False
episode_reward = 0
while not done:
state, valid_actions_mask = env.get_state_and_valid_actions()
if not valid_actions_mask.any():
break
if random.random() < epsilon:
valid_indices = torch.where(valid_actions_mask)[0]
action = random.choice(valid_indices).item()
else:
with torch.no_grad():
q_values = policy_net(state)
q_values = q_values.masked_fill(~valid_actions_mask, -float('inf'))
action = q_values.max(1)[1].item()
reward, done = env.step(action)
episode_reward += reward
next_state, _ = env.get_state_and_valid_actions()
if done:
next_state = None
action_tensor = torch.tensor([[action]], dtype=torch.long)
reward_tensor = torch.tensor([reward], dtype=torch.float)
done_tensor = torch.tensor([done], dtype=torch.bool)
replay_buffer.push(state, action_tensor, next_state, reward_tensor, done_tensor)
optimize_model()
if epsilon > 0.05:
epsilon *= 0.9995 # << 변경: Epsilon 감쇠율 완만하게 조정
if i_episode % TARGET_UPDATE == 0:
target_net.load_state_dict(policy_net.state_dict())
total_rewards.append(episode_reward)
if (i_episode + 1) % 100 == 0:
avg_reward = np.mean(total_rewards[-100:])
print(f"에피소드 {i_episode + 1}/{EPISODES} | 최근 100 에피소드 평균 보상: {avg_reward:.2f} | Epsilon: {epsilon:.3f}")
print("\n===== 학습 완료! =====\n")
# ===================================================================
# 6. 학습된 모델 테스트 - 변경 없음
# ===================================================================
print("===== 실제 적재 테스트 시작 =====")
env.reset()
env._print_status()
time.sleep(1)
done = False
while not done:
state, valid_actions_mask = env.get_state_and_valid_actions()
if not valid_actions_mask.any():
print("\n실패! 더 이상 물건을 놓을 공간이 없습니다.")
break
with torch.no_grad():
q_values = policy_net(state)
q_values = q_values.masked_fill(~valid_actions_mask, -float('inf'))
action = q_values.max(1)[1].item()
print(f"\n-> AI의 선택: {np.unravel_index(action, (BIN_SIZE, BIN_SIZE))} 위치에 놓기")
_, done = env.step(action)
env._print_status()
time.sleep(1)
placed_items = env.current_item_idx
total_items = len(env.items)
print("\n===== 테스트 종료! =====")
print(f"최종 결과: 총 {total_items}개의 물건 중 {placed_items}개를 적재했습니다.")
print("최종 상자 모습:")
print(env.bin)
Python
복사
Reference:
[1] The Bin Packing Problem by Google, https://developers.google.com/optimization/pack/bin_packing, Creative Commons Attribution 4.0 License