본문 바로가기
머신러닝, 딥러닝/Recommendation System

Matrix Factorization(MF)기반 추천

by Deeppago 2022. 2. 21.

이전 업로드한 메모리 기반 협업 필터링인 사용자 기반 협업 필터링(UBCF)에 이어서 모델 기반 협업 필터링인 MF기반의 협업 필터링의 기본 개념에 대해서 정리해보려고 한다.

구현 코드 : SGD를 사용한 MF 알고리즘

-목차-

1. Matrix Factorization(MF) 의 원리

2. SGD(Stochastic Gradient Descent)를 사용한 MF 알고리즘

3. 모델 기반 추천 시스템의 장단점

4. SGD(Stochastic Gradient Descent)를 사용한 MF 구현


1. Matrix Factorization(MF) 의 원리

우선 MF 모델은 user-item 의 matrix에서 이미 rating이 부여되어 있는 상황을 가정한다. (당연히 sparse한 matrix를 가정한다)  MF의 목적은, 바꿔 말하면 Matrix Complement 이다. 아직 평가를 내리지 않은 user-item의 빈 공간을 Model-based Learning으로 채워넣는 것을 의미한다.

 

유저간, 혹은 아이템간 유사도를 이용하는 Memory-based 방법과 달리, MF는 행렬 인수 분해라는 수학적 방법으로 접근한다. 이는 행렬은 두개의 하위 행렬로 분해가 가능하며, 다시 곱해져서 원래 행렬과 동일한 크기의 단일 행렬이 될 수 있다는 성질에 기인한 것이다.

 

크기 U X M을 가지는 rating matrix R이 있다고 하자. 이 때 R은 각각의 크기가 U X K, M X K인 두 개의 행렬 P와 Q로 분해될 수 있다고 가정해본다. 그리고 다시 P X Q 행렬을 계산하면, 원래의 matrix R와 굉장히 유사하며 크기가 동일한 행렬이 생성된다. 중요한 것은, 행렬이 재생성 되면서 빈공간이 채워진다는 것이다. 이러한 행렬 인수 분해의 원칙은 비평가 항목을 채우기 위함이라는 것을 알 수 있다. 또한 한 가지 알 수 있는 것은, 분해된 행렬 P, Q는 각각 User-latent factor matrix, Item-latent factor matrix라는 각각의 내재적 의미를 나타내는 잠재 행렬도 나타낼 수 있다는 것이다. 이는 사람이 해석하는 잠재의미로도 해석은 가능하지만 기계가 해석하기 위한 행렬, 즉 블랙 박스 모델에 더 가깝다.

 

 


2. SGD(Stochastic Gradient Descent)를 사용한 MF 알고리즘

어떤 도메인에 대해서 사용자와 아이템의 특성을 잘 설명할 수 있는 \(K\)개의 요인이 존재하고, 각 사용자와 아이템의 \(K\)개 요인에 대한 측정값을 알아낼 수 있다면 모든 사용자의 모든 아이템에 대한 예측 평점을 계산할 수 있다. 이것이 MF의 주요한 원리이다.

주어진 (사용자 x 아이템)의 평점행렬인 \(R\)로부터 \(P\)와 \(Q\)를 분해하는 알고리즘을 개념적으로 설명하면 아래와 같다.

  1. 잠재요인의 개수인 \(K\)를 정한다. 
  2. 주어진 \(K\)에 따라 \(P(M\times K)\)와 \(Q(N\times K)\)행렬을 만들고 초기화 한다.
  3. 주어진 \(P\), \(Q\)행렬을 사용해서 예측 평점 \(\hat{R}(=P\times Q^T)\)을 구한다.
  4. \(R\)에 있는 실제 평점에 대해서 예측 평점 \(\hat{R}\)의 예측과 비교해서 오차를 구하고, 이 오차를 줄이기 위해서 \(P\), \(Q\)값을 수정한다.
  5. 전체오차가 미리 정해진 기준값 이하가 되거나 미리 정해진 반복 횟수에 도달할 때까지 3, 4과정을 반복한다.

여기에서 핵심은 4에서 예측 오차를 줄이기 위해서 \(P\), \(Q\)를 어떻게 수정하는가이다. 가장 일반적인 방법은 기계학습에서 많이 사용되는 SGD(Stochastic Gradient Descent)방법을 적용하는 것이다. SGD 방법을 사용해서 \(P\), \(Q\)행렬을 최적화하는 방법을 알아보자.

 

앞서 설명했던 \(P\), \(Q\)행렬을 좀더 자세히 들여다보자. 예측값 \(\hat{R}\)은 \(P\times Q^T\)이므로 사용자 \(i\)의 아이템 \(j\)에 대한 예측값 \(\hat{r}_{ij}\)는 아래와 같이 계산된다.

\[\hat{r_{ij}} = p_{i}^{T}q_{j} = \sum_{k=1}^{K}p_{ik}q_{kj}\]

 

SGD를 위한 오차 지표인 MSE나 RMSE를 줄이기 위해서는 오차의 제곱을 최소화 해야한다. 또한 다른 기계학습 알고리즘과 마찬가지로 과적합(over-fitting)을 방지하기 위해서 아래와 같이 MSE함수를 정의할 수 있다.

\[e^2_{ij} = (r_{ij} - \sum_{k=1}^{K}p_{ik}q_{kj})^2 + \frac{\beta }{2}\sum_{k=1}^{K}(\left\|P \right\|^2+\left\|Q \right\|^2)\]

 

여기서 한가지 더 고려할 것은 각 사용자와 각 아이템의 경향성(bias)이다. 각 사용자와 각 아이템은 일정한 경향성(평가가 높거나 낮은 경향)이 있다. 그래서 이 경향서을 제외하고 나머지 데이터만을 분석하는 것이 더 정확하다. 그래서 \(\hat{r}_{ij}\)을 경향성을 분리해서 수정하면 아래와 같다.

\[\hat{r}_{ij} = b + bu_i+bd_j+\sum_{k=1}^{K}p_{ik}q_{ki}\]

 

여기서 \(b\)는 전체평균 \(bu_i\)는 사용자 \(i\)의 평균과 전체평균의 편자(사용자 \(i\)의 평가경향) \(bd_j\)는 아이템 \(j\)의 평균과 전체평균의 편자(아이템 \(j\)의 평가경향)을 의미한다. MF에서는 계산할 때마다 오차를 최소화 하도록 평가 경향을 다시 조정한다. \(e^2_{ij}\)에서 \(\hat{r_{ij}}\)부분을 위의 식으로 변환하고 \(bu_i\)와 \(bd_j\)에 대해서 편미분과정을 거치면 아래의 식에따라 \(bu_i\)와 \(bd_j\)가 갱신된다.

\[{bu}'_i = bu_i + \alpha (e_{ij}-\beta bu_i)\]

\[{bd}'_j = bd_j + \alpha (e_{ij}-\beta bd_j)\]

 

또한 오차를 줄여주는 새로운 \({p}'_i\)와 \({q}'_j\)를 구하기 위해 \(p\)와 \(q\)에대해 편미분을 하면 아래와 같다.

\[{p}'_{ik} = p_{ik} + \alpha (2e_{ij}q_{kj}-\beta p_{ik})\]

\[{q}'_{kj} = q_{kj} + \alpha (2e_{ij}p_{ik}-\beta q_{kj})\]

 

이상의 설명을 종합하면 위 \({bu}'_i\), \({bd}'_j\), \({p}'_{ik}\), \({q}'_{kj}\) 네개의 식이 SGD를 사용해서 MF의 \(P\), \(Q\)를 구하기 위한 최종 식이된다.

 


3. 모델 기반 추천 시스템의 장단점

메모리 기반 추천은 모든 데이터를 메모리에 저장하고 있기 때문에 원래 데이터를 충실하게 사용하는 장점이 있지만 대량의 데이터를 다뤄야 하는 상용 사이트에서는 계산시간이 너무 오래 걸린다는 단점이 있다.

이에 비해 모델 기반 추천 방식은 원래 데이터는 모델을 만드는 데만 사용하고 일단 모델이 만들어지면 원래 데이터는 사용하지 않기 때문에 대규모 상용 사이트에서 필요한 빠른 반응이 가능하지만 모델을 만드는 과정에서 많은 계산이 필요하다는 단점이 있다.

일반적으로 메모리 기반 추천은 개별 사용자의 데이터에 집중하는 데 비해, 모델 기반 추천은 전체 사용자의 평가 패턴으로부터 모델을 구성하기 때문에 데이터가 가지고 있는 약한 신호도 더 잘 잡아내는 장점이 있다. 

예를 들어 소수의 사용자가 소수의 영화에 대해서만 특정한 평가 패턴이 있는 경우, 개별 사용자나 개별 아이템에 집중하는 메모리 기반 알고리즘은 이것을 잡아 내기 쉽지 않지만 전체 데이터를 대상으로 모델을 구성하는 모델 기반 추천은 이것을 더 잘 잡아낼 수 있다.

 


4. SGD(Stochastic Gradient Descent)를 사용한 MF 구현

위에서 설명한 알고리즘을 Python으로 구현한 코드이다.

import numpy as np
import pandas as pd
from sklearn.utils import shuffle
import matplotlib.pyplot as plt

r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv('./Data/u.data', names=r_cols,  sep='\t',encoding='latin-1')
ratings = ratings[['user_id', 'movie_id', 'rating']].astype(int)            # timestamp 제거

# train set의 비율 75%, test set의 비율 25%
TRAIN_SIZE = 0.75
# 데이터를 무작위로 섞는다.
ratings = shuffle(ratings, random_state=1)
cutoff = int(TRAIN_SIZE * len(ratings))
ratings_train = ratings.iloc[:cutoff]
ratings_test = ratings.iloc[cutoff:]

# New MF class for training & testing
class NEW_MF():
    def __init__(self, ratings, K, alpha, beta, iterations, verbose=True):
        self.R = np.array(ratings)
        # user_id, item_id를 R의 index와 매핑하기 위한 dictionary 생성 과정
        item_id_index = []
        index_item_id = []
        for i, one_id in enumerate(ratings):
            item_id_index.append([one_id, i])
            index_item_id.append([i, one_id])
        self.item_id_index = dict(item_id_index)
        self.index_item_id = dict(index_item_id)        
        user_id_index = []
        index_user_id = []
        for i, one_id in enumerate(ratings.T):
            user_id_index.append([one_id, i])
            index_user_id.append([i, one_id])
        self.user_id_index = dict(user_id_index)
        self.index_user_id = dict(index_user_id)

        # num_users : 사용자 수, num_items(아이템 수)
        self.num_users, self.num_items = np.shape(self.R)
        # K : 잠재요인(Latent Fector)의 수
        self.K = K
        # alpha : Learning Rate
        self.alpha = alpha
        # beta : 정규화 계수
        self.beta = beta
        # iterations : 반복 횟수
        self.iterations = iterations
        # verbose : 중간 학습과정 출력 여부
        self.verbose = verbose

    # train set의 RMSE 계산
    # 현재의 P와 Q를 가지고 Root Mean Squared Error (RMSE) 계산
    def rmse(self):
        # 평점이 있는(0이 아닌) 요소의 인덱스
        xs, ys = self.R.nonzero()
        self.predictions = []
        self.errors = []
        for x, y in zip(xs, ys):
            # 사용자 x, 아이템 y에 대해서 평점 예측치를 get_prediction()함수를 사용하여 계산
            prediction = self.get_prediction(x, y)
            self.predictions.append(prediction)
            # 계산된 평점과 실제 값간의 오차를 errors리스트에 추가
            self.errors.append(self.R[x, y] - prediction)
        self.predictions = np.array(self.predictions)
        self.errors = np.array(self.errors)
        # RMSE를 계산하여 반환
        return np.sqrt(np.mean(self.errors**2))

    # Ratings for user i and item j
    def get_prediction(self, i, j):
        prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i, :].dot(self.Q[j, :].T)
        return prediction

    # Stochastic gradient descent to get optimized P and Q matrix
    def sgd(self):
        for i, j, r in self.samples:
            prediction = self.get_prediction(i, j)
            e = (r - prediction)

            self.b_u[i] += self.alpha * (e - self.beta * self.b_u[i])
            self.b_d[j] += self.alpha * (e - self.beta * self.b_d[j])

            self.P[i, :] += self.alpha * (e * self.Q[j, :] - self.beta * self.P[i,:])
            self.Q[j, :] += self.alpha * (e * self.P[i, :] - self.beta * self.Q[j,:])

##### >>>>> (3)
    # Test set을 선정
    def set_test(self, ratings_test):
        # 분리된 test set을 넘겨받아 클래스 내부의 test_set을 만드는 함수
        test_set = []
        for i in range(len(ratings_test)):      # test 데이터에 있는 각 데이터에 대해서
            # 현재 사용자의 인덱스를 user_id_index에서 받아온다.
            x = self.user_id_index[ratings_test.iloc[i, 0]]
            # 현재 아이템의 인덱스를 item_id_index에서 받아온다.
            y = self.item_id_index[ratings_test.iloc[i, 1]]
            # 현재 사용자-아이템의 평점
            z = ratings_test.iloc[i, 2]
            # (사용자, 아이템, 평점)을 test_set에 추가
            test_set.append([x, y, z])
            # R을 사용해서 MF 모델을 학습하기 떄문에 test_set의 R을 제거
            self.R[x, y] = 0                    # Setting test set ratings to 0
        self.test_set = test_set
        return test_set                         # Return test set

    # Test set의 RMSE 계산
    def test_rmse(self):
        error = 0
        # test set에 있는 각각의 (사용자, 아이템, 평점)에 대해서 RMSE를 구한다.
        for one_set in self.test_set:
            predicted = self.get_prediction(one_set[0], one_set[1])
            error += pow(one_set[2] - predicted, 2)
        return np.sqrt(error/len(self.test_set))

    # Training 하면서 test set의 정확도를 계산
    def test(self):
        # Initializing user-feature and item-feature matrix
        self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
        self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))

        # Initializing the bias terms
        self.b_u = np.zeros(self.num_users)
        self.b_d = np.zeros(self.num_items)
        self.b = np.mean(self.R[self.R.nonzero()])

        # List of training samples
        rows, columns = self.R.nonzero()
        self.samples = [(i, j, self.R[i,j]) for i, j in zip(rows, columns)]

        # Stochastic gradient descent for given number of iterations
        training_process = []
        for i in range(self.iterations):
            np.random.shuffle(self.samples)
            self.sgd()
            rmse1 = self.rmse()
            rmse2 = self.test_rmse()
            training_process.append((i+1, rmse1, rmse2))
            if self.verbose:
                if (i+1) % 10 == 0:
                    print("Iteration: %d ; Train RMSE = %.4f ; Test RMSE = %.4f" % (i+1, rmse1, rmse2))
        return training_process

    # Ratings for given user_id and item_id
    def get_one_prediction(self, user_id, item_id):
        return self.get_prediction(self.user_id_index[user_id], self.item_id_index[item_id])

    # Full user-movie rating matrix
    def full_prediction(self):
        return self.b + self.b_u[:,np.newaxis] + self.b_d[np.newaxis,:] + self.P.dot(self.Q.T)
        
# 최적의 K값 찾기
results = []
index = []
for K in range(50, 261, 10):
    print('K =', K)
    R_temp = ratings.pivot(index='user_id', columns='movie_id', values='rating').fillna(0)
    mf = NEW_MF(R_temp, K=K, alpha=0.001, beta=0.02, iterations=300, verbose=True)
    test_set = mf.set_test(ratings_test)
    result = mf.test()
    index.append(K)
    results.append(result)
    
# 최적의 iterations 값 찾기
summary = []
for i in range(len(results)):
    RMSE = []
    for result in results[i]:
        RMSE.append(result[2])
    min = np.min(RMSE)
    j = RMSE.index(min)
    summary.append([index[i], j+1, RMSE[j]])
    
# 그래프 그리기
plt.plot(index, [x[2] for x in summary])
plt.ylim(0.89, 0.94)
plt.xlabel('K')
plt.ylabel('RMSE')
plt.show()

# 최적의 K와 iteration으로 모델 학습
R_temp = ratings.pivot(index='user_id', columns='movie_id', values='rating').fillna(0)
mf = NEW_MF(R_temp, K=30, alpha=0.001, beta=0.02, iterations=100, verbose=True)
test_set = mf.set_test(ratings_test)
result = mf.test()

# Printing predictions
print(mf.full_prediction())
print(mf.get_one_prediction(1, 2))

 


참고 자료

https://sungkee-book.tistory.com/12

https://yamalab.tistory.com/92

Python을 이용한 개인화 추천시스템

 

댓글