OR Tools를 이용해서 풀어봤던 동일한 2D Bin Packing 문제를 강화학습으로 풀어보겠습니다.
아래와 같은 크기를 갖는 10개의 Item을 10*10의 크기를 갖는 2차원 공간에 강화학습 Agent가 모두 적재하면 성공이라고 생각할 수 있습니다.
def _generate_items(self):
items = []
items.append((1, 9))
items.append((10, 1))
items.append((5, 2))
items.append((4, 4))
items.append((3, 3))
items.append((3, 3))
items.append((3, 3))
items.append((3, 3))
items.append((2, 3))
items.append((2, 3))
return items
Plain Text
복사
State정의
State는 의 정의를 위해서 Bin의 비어있는 칸은 0, 물건이 채워진 칸은 1로 표시 하는 가장 직관적인 방법을 사용했습니다.
10x10 의 크기를 갖는 2차원 배열을 0으로 초기화 한 다음, Item이 놓여질 때 마다 1로 업데이트를 하여 State 정의하였습니다.
예를 들어, 10x10 상자에 (1,9) 크기의 물건 하나가 좌측 상단에 놓인 상태는 다음과 같이 표현할 수 있습니다.
Action정의
Action은 Agent가 취할 수 있는 행동으로 정의 할 수 있습니다.
Bin Packing에서 Agent의 행동은 "다음에 들어올 Item을 어느 좌표에 놓을 것인가?" 로 정의할 수 있는데,
상자가 10x10 크기이므로 놓을 수 있는 좌표의 개수는 총 10 * 10 = 100개 이므로 선택할 수 있는 Action의 수는 100개 입니다.
100개의 좌표를 Action Space로 정의하면 아래와 같이 생각할 수 있습니다.
Action 0 = (0, 0)에 놓기
Action 1 = (0, 1)에 놓기
...
Action 99 = (9, 9)에 놓기
Masking
100개의 모든 행동을 고려하여 학습을 하다 보면
Item을 적재할 수 없는 불가능한 행동도 학습을 하게 되어 강화학습 Agent가 학습하는데 많은 비효율이 발생할 수 있습니다.
이러한 비효율을 극복할 수 있도록 Action Masking 기법을 사용했습니다.
Action Masking이란 현재 State와 놓을 물건의 크기를 바탕으로, 애초에 불가능한 행동은 선택지에서 제외하는 것입니다.
물건을 놓을 수 있는 모든 유효한 좌표를 미리 계산하여, 100개의 Action에 대한 Q-value(미래 보상 예측값)를 출력하면,
유효하지 않은 행동의 Q-value를 아주 작은 값(-무한대)으로 만들어 버립니다.
이렇게 하면 Agent는 자연스럽게 유효한 행동 중에서만 최적의 행동을 선택하게 됩니다.
이 기법 덕분에 불필요한 학습 과정을 크게 줄이고 훨씬 안정적으로 모델을 훈련시킬 수 있었습니다.
전체소스
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import numpy as np
import time
from collections import namedtuple, deque
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
class BinPackingEnv:
def __init__(self, bin_width, bin_height):
self.bin_width = bin_width
self.bin_height = bin_height
self.action_space_size = bin_width * bin_height
self.items_list = self._generate_items()
self.reset()
def _generate_items(self):
items = []
items.append((1, 9))
items.append((10, 1))
items.append((5, 2))
items.append((4, 4))
items.append((3, 3))
items.append((3, 3))
items.append((3, 3))
items.append((3, 3))
items.append((2, 3))
items.append((2, 3))
return items
def reset(self):
self.bin = np.zeros((self.bin_height, self.bin_width))
self.items = list(self.items_list)
#random.shuffle(self.items)
self.current_item_idx = 0
def _print_status(self):
print(self.bin)
if self.current_item_idx < len(self.items):
w, h = self.items[self.current_item_idx]
print(f"다음 물건 크기: {w}x{h}")
def get_state_and_valid_actions(self):
state = torch.from_numpy(self.bin).float().unsqueeze(0)
if self.current_item_idx >= len(self.items):
return state, torch.zeros(self.action_space_size, dtype=torch.bool)
item_w, item_h = self.items[self.current_item_idx]
valid_actions_mask = torch.zeros(self.bin_height, self.bin_width, dtype=torch.bool)
for y in range(self.bin_height - item_h + 1):
for x in range(self.bin_width - item_w + 1):
is_empty = True
for r in range(y, y + item_h):
for c in range(x, x + item_w):
if self.bin[r, c] != 0:
is_empty = False
break
if not is_empty:
break
if is_empty: #가능한 영역이라면 해당 좌표를 1로 업데이트
valid_actions_mask[y, x] = True
return state, valid_actions_mask.flatten()
def _check_if_any_valid_moves_exist(self):
if self.current_item_idx >= len(self.items):
return True # 이미 모든 아이템을 다 놓았으므로 유효하다고 판단
item_w, item_h = self.items[self.current_item_idx]
for y in range(self.bin_height - item_h + 1):
for x in range(self.bin_width - item_w + 1):
is_empty = True
for r in range(y, y + item_h): #range(2, 5)는 2, 3, 4 생성
for c in range(x, x + item_w):
if self.bin[r, c] != 0:
is_empty = False
break
if not is_empty:
break
if is_empty:
return True
return False # 전체를 탐색해도 놓을 공간이 없으면 False 반환
def step(self, action):
item_w, item_h = self.items[self.current_item_idx]
y, x = np.unravel_index(action, (self.bin_height, self.bin_width))
for r in range(item_h):
# 가로(너비) 방향으로 반복
for c in range(item_w):
# (y, x)에서 시작해서, 세로로 r칸, 가로로 c칸 떨어진 위치에 물건이 놓였다는 의미로 1을 표시
self.bin[y + r, x + c] = 1
reward = float(item_w * item_h)
self.current_item_idx += 1
done = False
if not self._check_if_any_valid_moves_exist():
if self.current_item_idx >= len(self.items):
reward += 1
else:
reward -= 1
done = True
if self.current_item_idx >= len(self.items) and not done:
reward += 1
done = True
return reward, done
class DQN(nn.Module):
def __init__(self, h, w, outputs):
super(DQN, self).__init__()
input_size = h * w
self.fc1 = nn.Linear(input_size, 256)
self.fc2 = nn.Linear(256, 256)
self.fc3 = nn.Linear(256, outputs)
def forward(self, x):
# view를 사용하여 1차원 벡터로 만듭니다. ex) [0, 0] 또는 [9, 9]
x = x.view(x.size(0), -1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
class ReplayBuffer:
def __init__(self, capacity):
self.memory = deque([], maxlen=capacity)
def push(self, *args):
self.memory.append(Transition(*args))
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
BIN_SIZE = 10
N_ACTIONS = BIN_SIZE * BIN_SIZE
EPISODES = 40000
LEARNING_RATE = 0.0005
GAMMA = 0.99
BATCH_SIZE = 128
REPLAY_MEMORY_SIZE = 10000
env = BinPackingEnv(BIN_SIZE, BIN_SIZE)
policy_net = DQN(BIN_SIZE, BIN_SIZE, N_ACTIONS).to(device)
target_net = DQN(BIN_SIZE, BIN_SIZE, N_ACTIONS).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)
loss_function = nn.MSELoss()
replay_buffer = ReplayBuffer(REPLAY_MEMORY_SIZE)
epsilon = 0.7
TARGET_UPDATE = 50
def optimize_model():
if len(replay_buffer) < BATCH_SIZE:
return
transitions = replay_buffer.sample(BATCH_SIZE)
batch = Transition(*zip(*transitions))
state_batch = torch.cat(batch.state).to(device)
action_batch = torch.cat(batch.action).to(device)
reward_batch = torch.cat(batch.reward).to(device)
done_batch = torch.cat(batch.done).to(device)
# None이 아닌 다음 상태들만 필터링하여 배치로 만듭니다.
non_final_next_states_list = [s for s in batch.next_state if s is not None]
if len(non_final_next_states_list) > 0:
non_final_next_states = torch.cat(non_final_next_states_list).to(device)
else:
non_final_next_states = torch.empty(0, 1, BIN_SIZE, BIN_SIZE).to(device)
predicted_q = policy_net(state_batch).gather(1, action_batch)
next_state_q_values = torch.zeros(BATCH_SIZE, device=device)
non_final_mask = ~done_batch
# 다음 상태가 있는 경우에만 target_net으로 Q값 계산
if non_final_next_states.size(0) > 0:
next_state_q_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
target_q = reward_batch + (GAMMA * next_state_q_values)
loss = loss_function(predicted_q, target_q.unsqueeze(1))
optimizer.zero_grad()
loss.backward()
optimizer.step()
print("===== 학습 시작 =====")
total_rewards = []
for i_episode in range(EPISODES):
env.reset()
done = False
episode_reward = 0
while not done:
state, valid_actions_mask = env.get_state_and_valid_actions()
if not valid_actions_mask.any():
break
if random.random() < epsilon:
valid_indices = torch.where(valid_actions_mask)[0]
action = random.choice(valid_indices).item()
else:
with torch.no_grad():
q_values = policy_net(state.to(device))
# 유효하지 않은 행동의 Q-값을 -무한대로 만들어 선택되지 않도록 합니다.
q_values = q_values.masked_fill(~valid_actions_mask.to(device), -float('inf'))
action = q_values.max(1)[1].item()
reward, done = env.step(action)
episode_reward += reward
next_state, _ = env.get_state_and_valid_actions()
if done:
next_state = None
action_tensor = torch.tensor([[action]], dtype=torch.long)
reward_tensor = torch.tensor([reward], dtype=torch.float)
done_tensor = torch.tensor([done], dtype=torch.bool)
replay_buffer.push(state, action_tensor, next_state, reward_tensor, done_tensor)
optimize_model()
epsilon = max(0.01, epsilon * 0.99995) # Epsilon 감쇠율 (최소값 0.01 보장)
if i_episode % TARGET_UPDATE == 0:
target_net.load_state_dict(policy_net.state_dict())
total_rewards.append(episode_reward)
if (i_episode + 1) % 100 == 0:
avg_reward = np.mean(total_rewards[-100:])
print(f"에피소드 {i_episode + 1}/{EPISODES} | 최근 100 에피소드 평균 보상: {avg_reward:.2f} | Epsilon: {epsilon:.3f}")
print("\n===== 학습 완료! =====\n")
print("===== 실제 적재 테스트 시작 =====")
env.reset()
env._print_status()
time.sleep(1)
done = False
while not done:
state, valid_actions_mask = env.get_state_and_valid_actions()
if not valid_actions_mask.any():
print("\n실패! 더 이상 물건을 놓을 공간이 없습니다.")
break
with torch.no_grad():
q_values = policy_net(state.to(device))
q_values = q_values.masked_fill(~valid_actions_mask.to(device), -float('inf'))
action = q_values.max(1)[1].item()
print(f"\n-> AI의 선택: {np.unravel_index(action, (BIN_SIZE, BIN_SIZE))} 위치에 놓기")
_, done = env.step(action)
env._print_status()
time.sleep(1)
placed_items = env.current_item_idx
total_items = len(env.items)
print("\n===== 테스트 종료! =====")
print(f"최종 결과: 총 {total_items}개의 물건 중 {placed_items}개를 적재했습니다.")
print("최종 상자 모습:")
print(env.bin)
Python
복사
학습결과는 아래와 같습니다.
0으로 표시된 빈 공간을 최소화 하여 적재된 것을 확인할 수 있습니다.
Using device: cuda
===== 학습 시작 =====
에피소드 100/40000 | 최근 100 에피소드 평균 보상: 65.52 | Epsilon: 0.697
에피소드 200/40000 | 최근 100 에피소드 평균 보상: 66.54 | Epsilon: 0.693
에피소드 300/40000 | 최근 100 에피소드 평균 보상: 69.78 | Epsilon: 0.690
...
에피소드 39800/40000 | 최근 100 에피소드 평균 보상: 86.92 | Epsilon: 0.096
에피소드 39900/40000 | 최근 100 에피소드 평균 보상: 91.20 | Epsilon: 0.095
에피소드 40000/40000 | 최근 100 에피소드 평균 보상: 89.87 | Epsilon: 0.095
===== 학습 완료! =====
===== 실제 적재 테스트 시작 =====
현재 상자 상태:
[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
다음 물건 크기: 1x9
-> AI의 선택: (np.int64(0), np.int64(6)) 위치에 놓기
현재 상자 상태:
[[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
다음 물건 크기: 10x1
-> AI의 선택: (np.int64(9), np.int64(0)) 위치에 놓기
현재 상자 상태:
[[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
다음 물건 크기: 5x2
-> AI의 선택: (np.int64(4), np.int64(1)) 위치에 놓기
현재 상자 상태:
[[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 1. 1. 1. 1. 1. 1. 0. 0. 0.]
[0. 1. 1. 1. 1. 1. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
다음 물건 크기: 4x4
-> AI의 선택: (np.int64(0), np.int64(0)) 위치에 놓기
현재 상자 상태:
[[1. 1. 1. 1. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 1. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 1. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 1. 0. 0. 1. 0. 0. 0.]
[0. 1. 1. 1. 1. 1. 1. 0. 0. 0.]
[0. 1. 1. 1. 1. 1. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
다음 물건 크기: 3x3
-> AI의 선택: (np.int64(0), np.int64(7)) 위치에 놓기
현재 상자 상태:
[[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 0. 0. 0.]
[0. 1. 1. 1. 1. 1. 1. 0. 0. 0.]
[0. 1. 1. 1. 1. 1. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
다음 물건 크기: 3x3
-> AI의 선택: (np.int64(3), np.int64(7)) 위치에 놓기
현재 상자 상태:
[[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
다음 물건 크기: 3x3
-> AI의 선택: (np.int64(6), np.int64(0)) 위치에 놓기
현재 상자 상태:
[[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 0. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 0. 0. 0. 1. 0. 0. 0.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
다음 물건 크기: 3x3
-> AI의 선택: (np.int64(6), np.int64(7)) 위치에 놓기
현재 상자 상태:
[[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 0. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 0. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
다음 물건 크기: 2x3
-> AI의 선택: (np.int64(6), np.int64(4)) 위치에 놓기
현재 상자 상태:
[[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
다음 물건 크기: 2x3
-> AI의 선택: (np.int64(0), np.int64(4)) 위치에 놓기
현재 상자 상태:
[[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
===== 테스트 종료! =====
최종 결과: 총 10개의 물건 중 10개를 적재했습니다.
최종 상자 모습:
[[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 0. 0. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 0. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
Python
복사
Reference:
[1] The Bin Packing Problem by Google, https://developers.google.com/optimization/pack/bin_packing, Creative Commons Attribution 4.0 License