Search

강화학습 이용해서 2D Bin Packing Problem 풀어보기

일단 가장 쉬운 REINFORCE로 해보기로 결정했다.
Environment 구현에서는 step Function 정의가 핵심이다.
step Function을 정의하기 위해서는 아래 개념의 정의가 필요하다.
action
state, statePrime
전체소스
# @title 기본 제목 텍스트 import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F import random import numpy as np import time from collections import namedtuple, deque # =================================================================== # 1. 환경 (Environment) - 'step' 메서드 수정 # =================================================================== class BinPackingEnv: def __init__(self, bin_width, bin_height): self.bin_width = bin_width self.bin_height = bin_height self.action_space_size = bin_width * bin_height self.items_list = self._generate_items() self.reset() def _generate_items(self): items = [] items.append((1, 9)) items.append((10, 1)) items.append((5, 2)) items.append((4, 4)) items.append((3, 3)) items.append((3, 3)) items.append((3, 3)) items.append((3, 3)) items.append((2, 3)) items.append((2, 3)) return items def reset(self): self.bin = np.zeros((self.bin_height, self.bin_width)) self.items = list(self.items_list) random.shuffle(self.items) self.current_item_idx = 0 def _print_status(self): print("\n현재 상자 상태:") print(self.bin) if self.current_item_idx < len(self.items): w, h = self.items[self.current_item_idx] print(f"다음 물건 크기: {w}x{h}") def get_state_and_valid_actions(self): state = torch.from_numpy(self.bin).float().unsqueeze(0).unsqueeze(0) if self.current_item_idx >= len(self.items): return state, torch.zeros(self.action_space_size, dtype=torch.bool) item_w, item_h = self.items[self.current_item_idx] valid_actions_mask = torch.zeros(self.bin_height, self.bin_width, dtype=torch.bool) for y in range(self.bin_height - item_h + 1): for x in range(self.bin_width - item_w + 1): if np.sum(self.bin[y:y + item_h, x:x + item_w]) == 0: valid_actions_mask[y, x] = True return state, valid_actions_mask.flatten() def _check_if_any_valid_moves_exist(self): """다음 아이템을 놓을 수 있는 유효한 공간이 하나라도 있는지 확인""" if self.current_item_idx >= len(self.items): return True # 이미 모든 아이템을 다 놓았으므로 유효하다고 판단 item_w, item_h = self.items[self.current_item_idx] for y in range(self.bin_height - item_h + 1): for x in range(self.bin_width - item_w + 1): if np.sum(self.bin[y:y + item_h, x:x + item_w]) == 0: return True # 놓을 수 있는 공간을 하나라도 찾으면 즉시 True 반환 return False # 전체를 탐색해도 놓을 공간이 없으면 False 반환 def step(self, action): item_w, item_h = self.items[self.current_item_idx] y, x = np.unravel_index(action, (self.bin_height, self.bin_width)) self.bin[y:y + item_h, x:x + item_w] = 1 reward = float(item_w * item_h) / 10.0 # 보상 스케일링 (학습 안정화에 도움) self.current_item_idx += 1 done = False # << 변경: 보상 설계 (Reward Shaping) 적용 >> # 다음 아이템을 놓을 곳이 있는지 확인 if not self._check_if_any_valid_moves_exist(): # 모든 아이템을 성공적으로 다 넣었는지 확인 if self.current_item_idx >= len(self.items): reward += 100 # 성공 보너스 else: reward -= 100 # 아이템이 남았는데 놓을 곳이 없으면 실패 패널티 done = True # 모든 아이템을 다 넣었지만, 위 조건에서 걸러지지 않은 경우(마지막 아이템을 딱 맞게 넣은 경우) if self.current_item_idx >= len(self.items) and not done: reward += 100 done = True return reward, done # =================================================================== # 2. 신경망 (DQN) - 구조 변경 # =================================================================== class DQN(nn.Module): # << 변경: 신경망 구조를 더 깊게 만듦 >> def __init__(self, h, w, outputs): super(DQN, self).__init__() self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1) self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1) self.conv3 = nn.Conv2d(64, 64, kernel_size=3, padding=1) # 레이어 추가 self.head = nn.Linear(h * w * 64, outputs) # 최종 레이어 입력 크기 변경 def forward(self, x): x = F.relu(self.conv1(x)) x = F.relu(self.conv2(x)) x = F.relu(self.conv3(x)) # 추가된 레이어 통과 return self.head(x.view(x.size(0), -1)) # =================================================================== # 3. 리플레이 버퍼 (Replay Buffer) - 변경 없음 # =================================================================== Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done')) class ReplayBuffer: def __init__(self, capacity): self.memory = deque([], maxlen=capacity) def push(self, *args): self.memory.append(Transition(*args)) def sample(self, batch_size): return random.sample(self.memory, batch_size) def __len__(self): return len(self.memory) # --- 하이퍼파라미터 설정 (변경) --- BIN_SIZE = 10 N_ACTIONS = BIN_SIZE * BIN_SIZE EPISODES = 15000 # << 변경: 학습 시간 증가 LEARNING_RATE = 0.0005 # << 변경: 학습률 약간 낮춰 안정성 증가 GAMMA = 0.99 # << 변경: 미래 보상 가치 증가 BATCH_SIZE = 128 # << 변경: 배치 크기 증가 REPLAY_MEMORY_SIZE = 10000 # --- 학습 준비 --- env = BinPackingEnv(BIN_SIZE, BIN_SIZE) policy_net = DQN(BIN_SIZE, BIN_SIZE, N_ACTIONS) target_net = DQN(BIN_SIZE, BIN_SIZE, N_ACTIONS) target_net.load_state_dict(policy_net.state_dict()) target_net.eval() optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE) loss_function = nn.MSELoss() replay_buffer = ReplayBuffer(REPLAY_MEMORY_SIZE) epsilon = 0.9 TARGET_UPDATE = 100 # =================================================================== # 4. 최적화 함수 (Optimize Function) - 변경 없음 # =================================================================== def optimize_model(): if len(replay_buffer) < BATCH_SIZE: return transitions = replay_buffer.sample(BATCH_SIZE) batch = Transition(*zip(*transitions)) state_batch = torch.cat(batch.state) action_batch = torch.cat(batch.action) reward_batch = torch.cat(batch.reward) done_batch = torch.cat(batch.done) non_final_next_states = torch.cat([s for s in batch.next_state if s is not None]) predicted_q = policy_net(state_batch).gather(1, action_batch) next_state_q_values = torch.zeros(BATCH_SIZE) non_final_mask = ~done_batch # 종료되지 않은 상태들만 True # 다음 상태가 있는 경우에만 target_net으로 Q값 계산 if non_final_next_states.size(0) > 0: next_state_q_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach() target_q = reward_batch + (GAMMA * next_state_q_values) loss = loss_function(predicted_q, target_q.unsqueeze(1)) optimizer.zero_grad() loss.backward() optimizer.step() # =================================================================== # 5. 학습 루프 (Training Loop) - Epsilon 감쇠율 수정 # =================================================================== print("===== 학습 시작 =====") total_rewards = [] for i_episode in range(EPISODES): env.reset() done = False episode_reward = 0 while not done: state, valid_actions_mask = env.get_state_and_valid_actions() if not valid_actions_mask.any(): break if random.random() < epsilon: valid_indices = torch.where(valid_actions_mask)[0] action = random.choice(valid_indices).item() else: with torch.no_grad(): q_values = policy_net(state) q_values = q_values.masked_fill(~valid_actions_mask, -float('inf')) action = q_values.max(1)[1].item() reward, done = env.step(action) episode_reward += reward next_state, _ = env.get_state_and_valid_actions() if done: next_state = None action_tensor = torch.tensor([[action]], dtype=torch.long) reward_tensor = torch.tensor([reward], dtype=torch.float) done_tensor = torch.tensor([done], dtype=torch.bool) replay_buffer.push(state, action_tensor, next_state, reward_tensor, done_tensor) optimize_model() if epsilon > 0.05: epsilon *= 0.9995 # << 변경: Epsilon 감쇠율 완만하게 조정 if i_episode % TARGET_UPDATE == 0: target_net.load_state_dict(policy_net.state_dict()) total_rewards.append(episode_reward) if (i_episode + 1) % 100 == 0: avg_reward = np.mean(total_rewards[-100:]) print(f"에피소드 {i_episode + 1}/{EPISODES} | 최근 100 에피소드 평균 보상: {avg_reward:.2f} | Epsilon: {epsilon:.3f}") print("\n===== 학습 완료! =====\n") # =================================================================== # 6. 학습된 모델 테스트 - 변경 없음 # =================================================================== print("===== 실제 적재 테스트 시작 =====") env.reset() env._print_status() time.sleep(1) done = False while not done: state, valid_actions_mask = env.get_state_and_valid_actions() if not valid_actions_mask.any(): print("\n실패! 더 이상 물건을 놓을 공간이 없습니다.") break with torch.no_grad(): q_values = policy_net(state) q_values = q_values.masked_fill(~valid_actions_mask, -float('inf')) action = q_values.max(1)[1].item() print(f"\n-> AI의 선택: {np.unravel_index(action, (BIN_SIZE, BIN_SIZE))} 위치에 놓기") _, done = env.step(action) env._print_status() time.sleep(1) placed_items = env.current_item_idx total_items = len(env.items) print("\n===== 테스트 종료! =====") print(f"최종 결과: 총 {total_items}개의 물건 중 {placed_items}개를 적재했습니다.") print("최종 상자 모습:") print(env.bin)
Python
복사
Reference:
[1] The Bin Packing Problem by Google, https://developers.google.com/optimization/pack/bin_packing, Creative Commons Attribution 4.0 License