【論文】Squeeze-and-Excitation Networks

読んだ論文をアウトプットしていく。

[1709.01507] Squeeze-and-Excitation Networks


・Feed-Foward なネットワークでAttention機構を持たせる
・チャネル間の相互関係性を考慮
・Channel-wiseな非線形な相互関係性を学習するために、小さいNNを挟んでるイメージ
・Excitationの2つめの制約である下記でなんでシグモイド採用されてるかわからん。
 " second, it must learn a non-mutually-exclusive relationship as multiple
channels are allowed to be emphasised opposed to onehot
activation"

KerasでCifar10分類、とりあえずCNNのAlexModelで

GPUマシンが使えるようになったので、Kerasで用意されているデータセットの中にcifar10があったので学習・分類してみた。
モデルはcifar10の作成者でもあり、ILSVRC2012優勝者でもあるAlex Krinzhvskyさんの優勝時のモデルがベース。
モデルの構成について深層学習 (機械学習プロフェッショナルシリーズ)にあった表を参考にした。

モデル作成してて感じたのはやっぱりKerasの自由度の低さ。
レイヤーに差し込む層のタイプによってはストライドが設定できないなど、完全な再現は無理だった。
この辺はKerasでオリジナルのレイヤーを作成して対応していく必要があるのかも。
もう少し調べてから改めて実装したい。

とりあえず、モデル学習のためのスクリプトが下記。

from keras.datasets import cifar10
from keras.models import Model
from keras.layers import Flatten, Dense, Input, Dropout
from keras.layers import Convolution2D, MaxPooling2D, BatchNormalization
from keras.optimizers import Adam
from keras.utils import np_utils

def make_network():
    input_shape = (3,32, 32)
    img_input = Input(shape=input_shape)    


    # Block 1
    x = Convolution2D(96, 11, 11, activation='relu', border_mode='same', name='conv1')(img_input)
    x = MaxPooling2D((3, 3), strides=(2, 2), name='pool1')(x)

    # Block 2
    x = BatchNormalization(name='norm1_')(x)
    x = Convolution2D(256, 5, 5, activation='relu', border_mode='same', name='conv2')(x)
    x = MaxPooling2D((3, 3), strides=(2, 2), name='pool2')(x)

    # Block 3
    x = BatchNormalization(name='norm2_')(x)
    x = Convolution2D(384, 3, 3, activation='relu', border_mode='same', name='conv3')(x)
    x = Convolution2D(384, 3, 3, activation='relu', border_mode='same', name='conv4')(x)
    x = Convolution2D(256, 3, 3, activation='relu', border_mode='same', name='conv5')(x)
    x = MaxPooling2D((3, 3), strides=(2, 2), name='pool5')(x)

    #Classification block
    x = Flatten(name='flatten')(x)
    x = Dense(1024, activation='relu', name='fc6')(x)
    x = Dropout(p = 0.5)(x)
    x = Dense(1024, activation='relu', name='fc7')(x)
    x = Dropout(p = 0.5)(x)
    x = Dense(10, activation='softmax', name='fc8')(x)

    model = Model(img_input, x)

    return model
    

def train_model(model, X_train, Y_train, nb_epoch, batch_size):
    adam = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08)
    model.compile(loss='categorical_crossentropy', optimizer=adam, metrics=['accuracy'])

    model.fit(X_train, Y_train,
              batch_size=batch_size,
              nb_epoch=nb_epoch,
              validation_split=0.1,
              verbose=1)



if __name__ == '__main__':
  (X_train, y_train), (X_test, y_test) = cifar10.load_data()
    batch_size = 32
    nb_classes = 10
    nb_epoch = 200
    data_augmentation = True
    
    
    # convert class vectors to binary class matrices
    nb_classes = 10
    
    #きっとone-hotエンコーディング
    Y_train = np_utils.to_categorical(y_train, nb_classes)
    Y_test = np_utils.to_categorical(y_test, nb_classes) 
    
    X_train = X_train.astype('float32')
    X_test = X_test.astype('float32')
    X_train /= 255 
    X_test /= 255  
    
    model = make_network()
    %time train_model(model, X_train, Y_train, nb_epoch = nb_epoch, batch_size = batch_size)
    
    #モデルのセーブ
    model.save('CNN_Trained_for_cifar10.h5')

データサイズが50,000で45,000を学習用、残りを評価用に使い、GPUがGeForce GTX 960、メモリ容量が2GBのマシンでEpoch数を200回にして学習。
かかった時間が13時間半と、自分の予想より結構長かった。
最後にモデルをセーブしておいたので、とりあえずこのモデルの評価はまた次回。

参考

深層学習 (機械学習プロフェッショナルシリーズ)

深層学習 (機械学習プロフェッショナルシリーズ)

確率的勾配法あれこれまとめ

Kerasで選択できる最適化アルゴリズムそれぞれの違いと使い所がいまいちわからんかったので調べてみた。


Incorporating Nesterov Momentum into Adamがアルゴリズムを整理してくれているので理解しやすかった。

とりあえずざっくりと俯瞰した感じだと、いかに効率良く傾斜を降下していくかという課題を解決するっていう大枠からはみ出るものはない。そんで、構築しているモデルの種類やサイズによってベストなアルゴリズムは変わってくるので、突き詰めるのであれば要実験。ただ、上記論文は、NadamかRSMProp使っときゃいいんじゃないっすか、みたいなこと言ってる。なんにしろ2000年代後半以降で進化が進んでいる分野であり、今後もアップデートがあるだろうから追っていきたい。

SGD

まずはオーソドックスな勾配法。


g_t ← \nabla_{\theta_1} f(\theta_{t-1}) \\
\theta_t ← \theta_{t-1} - \eta g_t

後述するMomentum法や、NAGもKerasの中ではSGDメソッドの中でサポートされている。

Mpomentum

勾配に加えてMomentumベクトル mu を加えてパラメータを更新する。


g_t ← \nabla_{\theta_1} f(\theta_{t-1}) \\
m_t ← m_{t-1} + g_t \\
\theta_t ← \theta_{t-1} - \eta g_t

Nesterov's accelerated gradient(NAG)

Momuentum法に対して、勾配計算の段階ですでにMomentumを考慮することで、現在のパラメータ\thetaではなく、次のパラメーターの推定値について計算することで効率よく予測するアルゴリズム。


g_t ← \nabla_{\theta_1} f(\theta_{t-1} - \eta \mu m_{t-1}) \\
m_t ← m_{t-1} + g_t \\
\theta_t ← \theta_{t-1} - \eta g_t

AdaGrad

それぞれ個別のパラメータ\theta_iに対して異なる学習率を適用するアルゴリズム。
学習率の補正にそれまでのパラメータ[\theta_i]の勾配の二乗和を用いるのでL2ノルムベースのアルゴリズムと分類される。


g_t ← \nabla_{\theta_1} f(\theta_{t-1}) \\\\
n_t ← n_{t-1} + g^2_t \\
\theta_t ← \theta_{t-1} - \eta \frac{g_t}{\sqrt{n_t}+\epsilon}

ここでn_tはさきのそれぞれのパラメータの勾配の二乗和のタイムステップtまでのベクトル

RSMProp

これもL2ノルムベース。前述の[n]に対して、勾配の二乗の減衰平均E[g^2_t]を用いる。AdaGradで大きくなりすぎる[n]ことが問題だったがこれで解消。


g_t ← \nabla_{\theta_1} f(\theta_{t-1}) \\
n_t ← \nu n_{t-1} + (1-\nu)g^2_t \\
\theta_t ← \theta_{t-1} - \eta \frac{g_t}{\sqrt{n_t}+\epsilon}

Adam

Momentum法とRMSPropを組み合わせたもの。
勾配の1乗と勾配の2乗、両方使えばいいじゃん的な。


g_t ← \nabla_{\theta_1} f(\theta_{t-1}) \\

m_t ← \mu m_{t-1} + (1-\mu)g_t \\
\hat m_t ← \frac{m_t}{1-\mu^t} \\

n_t ← \nu n_{t-1} + (1-\nu)g^2_t \\
\hat n_t ← \frac{n_t}{1-\nu ^t} \\


\theta_t ← \theta_{t-1} - \eta \frac{g_t}{\sqrt{\hat n_t}+\epsilon}

AdaMax

AdamのL2ノルムを拡張し無限にするとよりシンプルなアルゴリズムとなる、らしい。


g_t ← \nabla_{\theta_1} f(\theta_{t-1}) \\

m_t ← \mu m_{t-1} + (1-\mu)g_t \\
\hat m_t ← \frac{m_t}{1-\mu^t} \\

n_t ← max(\mu n_{t-1}, |g_t|) \\

\theta_t ← \theta_{t-1} - \eta \frac{g_t}{\sqrt{n_t}+\epsilon}

Nadam

Adamと違ってNAGとRMSPropを組み合わせたもの。


g_t ← \nabla_{\theta_1} f(\theta_{t-1}) \\
\hat g_t ← \frac{g_t}{1-\prod_{i=1}^{t} \mu_i}\\


m_t ← \mu m_{t-1} + (1-\mu)g_t \\
\hat m_t ← \frac{m_t}{1-\prod_{i=1}^{t} \mu_i} \\

n_t ← \nu n_{t-1} + (1-\nu)g^2_t \\
\hat n_t ← \frac{n_t}{1-\nu ^t} \\

\bar \mu_t ← (1- \mu_t)\hat g_t + \mu_{t+1} \hat \mu_t \\

\theta_t ← \theta_{t-1} - \eta \frac{\bar \mu_t}{\sqrt{\hat n_t}+\epsilon}

CODE COMPLETE 第2版 を読んで

『CODE COMPLETE 第2版 上』、『CODE COMPLETE 第2版 下』を読み終えました。

CODE COMPLETE 第2版 上 完全なプログラミングを目指して

CODE COMPLETE 第2版 上 完全なプログラミングを目指して

CODE COMPLETE 第2版 下 完全なプログラミングを目指して

CODE COMPLETE 第2版 下 完全なプログラミングを目指して

正直、自分は大規模開発に携わったこともないし、もしかしたらこれから携わることもないかもしれない。そもそも、職業プログラマですらないかもしれない。けどプログラミングを道具として使う立場として、読んで理解できる部分は多かったです。

特に第7部の”ソフトウェア職人気質とは”では情報工学を学ばず、独学でやってきた自分がチームでプログラミングを行う際に注意すべきだろうなと思っていた、レイアウトやステートメントの仕方、変数の名付け方などが体型だって書いてあり、迷った際に帰ってくる部分ができたのがありがたかったです。

次の職務でどの程度コンストラクションに気をつけなければいけないかまだ検討が付かない段階ですので、他の多くの読者の方々とは得られたものが違ったかも知れません。
自分の業務の中でのプログラミングとの付き合い方がわかってくるであろう、半年後、一年後にもう一度読み直してみようと思います。

CodingBatでアルゴリズム100本ノック

これまでの自分のプログラミングは機械学習周辺に偏っていて、コンピューターサイエンスやったことある人間が通ってくる基本的なアルゴリズムについての知識が足りていないとの指摘を受けたので、その部分を埋めるためにCodingBatにチャレンジしてみた。

Coding Bat
http://codingbat.com/

簡単なアルゴリズムをひたすら書かせるプログラミングサイトで、初めの一歩として楽しくできた。
自分のコードはGitHubに載せておくことにする。

次はもう少し難しいアルゴリズムを練習するために、積読になっているこの本を練習する予定。

世界で闘うプログラミング力を鍛える150問 トップIT企業のプログラマになるための本

世界で闘うプログラミング力を鍛える150問 トップIT企業のプログラマになるための本

Scikit-learnのpipeleine.Pipelineが便利

分析する際に、次元圧縮→分類のような流れで行う場合には、scikit-learnのPipelineが便利。特にハイパーパラメーターを探すときには手続が煩雑になることもありますが、まとめて分類器としててGridSearchCVに突っ込むだけで良いのでめんどくさいこと考えずに済みますね。

今回はScikit-learnのサンプルデータの中から、the digits datasetのデータをロードし、PCAでの次元圧縮からSVMでの分類をPipelineでまとめて実行する手順を確認してみました。

コード

まずはデータを読み込み、訓練データとテストデータに分割。
また手書き文字のデータがどんなものかを描き出します。

import numpy as np

from sklearn import svm
from sklearn.decomposition import PCA

from sklearn import datasets
from sklearn.grid_search import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.cross_validation import train_test_split
from sklearn import metrics

from matplotlib import pyplot as plt
from matplotlib import cm
%matplotlib inline

digits = datasets.load_digits()
X = digits.data
y = digits.target
print 'Number of data = {0}, Dimension = {1}'.format(X.shape[0],X.shape[1])

p = np.random.random_integers(0, len(digits.data), 25)
for index, (data, label) in enumerate(np.array(zip(digits.data, digits.target))[p]):
    plt.subplot(5, 5, index + 1)
    plt.axis('off')
    plt.imshow(data.reshape(8, 8), cmap=plt.cm.gray_r, interpolation='nearest')
    plt.title('%i' % label)
plt.show()


#Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

Number of data = 1797, Dimension = 64

f:id:tkzs:20160626092429p:plain


続いて、PCAで次元削減、SVMでの分類という2つの手順を一つのPipelineで分類器としてまとめます。
今回グリッドサーチでパラメータ探索するためにリスト形式で候補を入れてます。
複数の関数にまたがってパラメーターを指定することになるため、ハイフン二つの後にパラメーター名を指定することで、コントラクタに渡すようになってます。

estimators = [('pca', PCA()),
              ('svm', svm.SVC())]


parameters = {"pca__n_components" : range(2, 6),
              "svm__kernel" : ["linear", "poly", "rbf", "sigmoid"],
              'svm__C': np.logspace(0, 2, 10).tolist(),
              "svm__gamma": np.logspace(-3, 0, 10).tolist()}

pl = Pipeline(estimators)

最後にplを一つの分類器としてみなし、そのままGridSearchCVに突っ込みます。

clf = GridSearchCV(pl, parameters, n_jobs=-1)
clf.fit(X_train, y_train)
print 'Best_estimator = {0}'.format(clf.best_estimator_.get_params())

#予測
Predict = clf.predict(X_test)

n_samples = len(digits.data) 
expected = digits.target[n_samples * -4 / 10:] 
predicted = clf.predict(digits.data[n_samples * -4 / 10:]) 


print("Classification report for classifier %s:\n%s\n"
      % (clf, metrics.classification_report(expected, predicted)))
print("Confusion matrix:\n%s" % metrics.confusion_matrix(expected, predicted))

Best_estimator = {'svm__max_iter': -1, 'svm__coef0': 0.0, 'svm': SVC(C=1.6681005372, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape=None, degree=3, gamma=0.01, kernel='rbf',
  max_iter=-1, probability=False, random_state=None, shrinking=True,
  tol=0.001, verbose=False), 'svm__random_state': None, 'pca__copy': True, 'svm__degree': 3, 'pca__n_components': 5, 'svm__gamma': 0.01, 'svm__shrinking': True, 'pca__whiten': False, 'svm__tol': 0.001, 'svm__verbose': False, 'svm__C': 1.6681005372000588, 'steps': [('pca', PCA(copy=True, n_components=5, whiten=False)), ('svm', SVC(C=1.6681005372, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape=None, degree=3, gamma=0.01, kernel='rbf',
  max_iter=-1, probability=False, random_state=None, shrinking=True,
  tol=0.001, verbose=False))], 'svm__probability': False, 'svm__class_weight': None, 'pca': PCA(copy=True, n_components=5, whiten=False), 'svm__decision_function_shape': None, 'svm__kernel': 'rbf', 'svm__cache_size': 200}
Classification report for classifier GridSearchCV(cv=None, error_score='raise',
       estimator=Pipeline(steps=[('pca', PCA(copy=True, n_components=None, whiten=False)), ('svm', SVC(C=1.0, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape=None, degree=3, gamma='auto', kernel='rbf',
  max_iter=-1, probability=False, random_state=None, shrinking=True,
  tol=0.001, verbose=False))]),
       fit_params={}, iid=True, n_jobs=-1,
       param_grid={'pca__n_components': [2, 3, 4, 5], 'svm__C': [1.0, 1.6681005372000588, 2.7825594022071245, 4.641588833612778, 7.742636826811269, 12.91549665014884, 21.544346900318832, 35.93813663804626, 59.94842503189409, 100.0], 'svm__gamma': [0.001, 0.0021544346900318843, 0.004641588833612777, 0.01, 0.021544346900318832, 0.046415888336127774, 0.1, 0.21544346900318823, 0.46415888336127775, 1.0]},
       pre_dispatch='2*n_jobs', refit=True, scoring=None, verbose=0):
             precision    recall  f1-score   support

          0       1.00      0.96      0.98        71
          1       0.99      0.97      0.98        73
          2       0.94      0.93      0.94        71
          3       0.95      0.96      0.95        74
          4       0.96      0.96      0.96        74
          5       0.97      1.00      0.99        71
          6       0.99      1.00      0.99        74
          7       0.93      0.92      0.92        72
          8       0.84      0.85      0.85        68
          9       0.90      0.92      0.91        71

avg / total       0.95      0.95      0.95       719


Confusion matrix:
[[68  0  0  0  2  0  1  0  0  0]
 [ 0 71  0  0  0  0  0  1  0  1]
 [ 0  0 66  1  0  0  0  0  2  2]
 [ 0  0  0 71  0  0  0  2  0  1]
 [ 0  0  0  0 71  0  0  0  3  0]
 [ 0  0  0  0  0 71  0  0  0  0]
 [ 0  0  0  0  0  0 74  0  0  0]
 [ 0  1  0  0  0  0  0 66  5  0]
 [ 0  0  4  0  1  0  0  2 58  3]
 [ 0  0  0  3  0  2  0  0  1 65]]

Spark MLlibの協調フィルタリングを活用したMovie Recommendation

Sparkを触る機会が増えてきてるので、知識の棚卸しを兼ねてMLlib使ってレコメンデーションシステムを実装してみました。SparkSamit2014などMLlibのチュートリアル的に色々使われているSparkのMovie Recommendationですが、edXのIntroduction to Big Data with Apache Sparが内容的にも良さそうだったので、題材にしながら実装しました。本講座はSpark 1.3.1での実装ですが少し古すぎるので、1.6.1で使える機能は使う形でコード変えてます。

おおまかな手順

①データの準備
元データを訓練、評価、テストデータにそれぞれ分割
②評価数500以上の映画の中から平均評価点が高いものを表示
③協調フィルタリングの実装
④訓練データに自分をuserID"0"として加え、好きな映画を評価
⑤自分の評価をもとに、アルゴリズムに映画を推薦させる

コード

まずはデータを読み込んで分割、眺めてみます。
使うデータはUserID::MovieID::Rating::Timestampで構成されるmovieデータセットと、MovieID::Title::Genresで構成されるRatingデータセットがあります。

numPartitions = 2

ratingFileName = "ratings.txt"
rawRatings = sc.textFile(ratingFileName, numPartitions)

moviesFileName = "movies.txt"
rawMovies = sc.textFile(moviesFileName, numPartitions)

def get_ratings_tuple(entry):
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    items = entry.split('::')
    return int(items[0]), items[1]

ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()

print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount)
print 'Ratings: %s' % ratingsRDD.take(3)
print 'Movies: %s' % moviesRDD.take(3)

There are 1000209 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 661, 3.0), (1, 914, 3.0)]
Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]

どのような映画が人気か調べるため500以上のReviewがある中で、人気の高いものを20個選び出します。
MovieIDと評価の数、平均評価点を含むRDDを作成し、それとmovieRDDをjoinさせることで、(平均評価点、映画のタイトル、評価数)を持つタプルを生成します。
その後、評価数が500個以上あるタイトルのみ選び出し、平均評価点でソートしたものを求めます。
評価点と映画のタイトルのアルファベット順でのソートのためにsortFunction()という関数を作ってます。

def getCountsAndAverages(IDandRatingsTuple):
    aggr_result = (IDandRatingsTuple[0], (len(IDandRatingsTuple[1]), float(sum(IDandRatingsTuple[1])) / len(IDandRatingsTuple[1])))
    return aggr_result


movieNameWithAvgRatingsRDD = (ratingsRDD
                          .map(lambda x:(x[1], x[2]))
                          .groupByKey()
                          .map(getCountsAndAverages)
                          .join(moviesRDD)
                          .map(lambda x:(x[1][0][1], x[1][1], x[0])))


print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3)



def sortFunction(tuple):
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)

movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .filter(lambda x: (x[2] > 500))
                                    .sortBy(sortFunction, False))

print 'Movies with highest ratings:'
print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(10):
    print ratingsTuple


movieNameWithAvgRatingsRDD: [(3.49618320610687, u'Great Mouse Detective, The (1986)', 2048), (3.7871690427698574, u'Moonstruck (1987)', 3072), (2.7294117647058824, u'Waiting to Exhale (1995)', 4)]


Movies with highest ratings:
(average rating, movie name, number of reviews)
(5.0, u'Ulysses (Ulisse) (1954)', 3172)
(5.0, u'Song of Freedom (1936)', 3382)
(5.0, u'Smashing Time (1967)', 3233)
(5.0, u'Schlafes Bruder (Brother of Sleep) (1995)', 989)
(5.0, u'One Little Indian (1973)', 3607)
(5.0, u'Lured (1947)', 3656)
(5.0, u'Gate of Heavenly Peace, The (1995)', 787)
(5.0, u'Follow the Bitch (1998)', 1830)
(5.0, u'Bittersweet Motel (2000)', 3881)
(5.0, u'Baby, The (1973)', 3280)

次に協調フィルタリングを使用したレコメンデーションを実施します。
MLlibにはALSを使ったライブラリがあるのでそのまんま活用します。

ひとまずモデル構築のために訓練、評価、テストデータにデータセットを分割します。

trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0)

print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)

validationForPredictRDD = validationRDD.map(lambda x: (x[0], x[1]))
print validationForPredictRDD.take(3)

actualReformattedRDD = validationRDD.map(lambda x: ((x[0], x[1]), x[2]))
print actualReformattedRDD.take(3)


Training: 600364, validation: 199815, test: 200030

[(1, 661, 3.0), (1, 914, 3.0), (1, 1197, 3.0)]
[(1, 3408, 4.0), (1, 2355, 5.0), (1, 938, 4.0)]
[(1, 1193, 5.0), (1, 1287, 5.0), (1, 2804, 5.0)]
[(1, 3408), (1, 2355), (1, 938)]
[((1, 3408), 4.0), ((1, 2355), 5.0), ((1, 938), 4.0)]

モデル構築ですが、グリッドサーチによってより良いパラメーターを探します。
またモデルの評価にはMLlibに用意されている指標の中で平均二乗誤差の平方根(RMSE)を使います。

from pyspark.mllib.recommendation import ALS
from pyspark.mllib.evaluation import RegressionMetrics

seed = 5L
iterations = [5,7,10]
regularizationParameter = 0.1
ranks = [4, 8, 12]
RMSEs = [0, 0, 0, 0, 0, 0, 0, 0, 0]
err = 0
tolerance = 0.03

minRMSE = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    for iteration in iterations:
        model = ALS.train(trainingRDD,
                          rank,
                          seed=seed,
                          iteration,
                          lambda_=regularizationParameter)
        predictedRatingsRDD = model.predictAll(validationForPredictRDD)
        predictedReformattedRDD = predictedRatingsRDD.map(lambda x: ((x[0], x[1]), x[2]))
    
        predictionAndObservations = (predictedReformattedRDD
                                     .join(actualReformattedRDD)
                                     .map(lambda x: x[1]))
    
        metrics = RegressionMetrics(predictionAndObservations)
        RMSE = metrics.rootMeanSquaredError
        RMSEs[err] = RMSE
        err += 1
        
        print 'For rank %s and itereation %s, the RMSE is %s' % (rank, iteration, RMSE)
        if RMSE < minRMSE:
            minRMSE = RMSE
            bestIteretioin = iteretaion
            bestRank = rank

print 'The best model was trained with rank %s and iteratin %s'  % (bestRank, bestIteretion)
            

For rank 4 and itereation 5, the RMSE is 0.903719946201
For rank 4 and itereation 7, the RMSE is 0.893408395534
For rank 4 and itereation 10, the RMSE is 0.886260195446
For rank 8 and itereation 5, the RMSE is 0.89365207233
For rank 8 and itereation 7, the RMSE is 0.883901283207
For rank 8 and itereation 10, the RMSE is 0.876701840863
For rank 12 and itereation 5, the RMSE is 0.887127524585
For rank 12 and itereation 7, the RMSE is 0.87863327159
For rank 12 and itereation 10, the RMSE is 0.872532683651
The best model was trained with rank 12 and iteratin 10

テストデータでチェックします。RMSEに問題ないので過学習は起こしてなさそうです。

bestModel = ALS.train(trainingRDD,
                      bestRank,
                      seed=seed,
                      iterations=bestIteretion,
                      lambda_=regularizationParameter)

testForPredictingRDD = testRDD.map(lambda x: (x[0], x[1]))
testReformattedRDD = testRDD.map(lambda x: ((x[0], x[1]), x[2]))

predictedTestRDD = bestModel.predictAll(testForPredictingRDD)
predictedTestReformattedRDD = predictedTestRDD.map(lambda x: ((x[0], x[1]), x[2]))

predictionAndObservationsTest = (predictedTestReformattedRDD
                             .join(testReformattedRDD)
                             .map(lambda x: x[1]))

metrics = RegressionMetrics(predictionAndObservationsTest)
testRMSE = metrics.rootMeanSquaredError

print 'The model had a RMSE on the test set of %s' % testRMSE


The model had a RMSE on the test set of 0.87447554868


最後に自分の好きな映画を評価し、userID"0"としてデータに追加。その評価に基づいて映画を推薦してもらいます。
movieRDDから自分の好きな映画を取り出して評価して加え、モデルの訓練に使用します。その後、userID"0"向けの予測を得て、評価点の高いものから表示するようにします。

myUserID = 0
myRatedMovies = [(myUserID, 1, 5), #Toy Story
                 (myUserID, 648, 3), # Mission Impossible
                 (myUserID, 1580, 4), # Men In Black
                 (myUserID, 1097, 3), # ET
                 (myUserID, 3247, 5)] #Sister Act

myRatingsRDD = sc.parallelize(myRatedMovies)
trainingWithMyRatingsRDD = trainingRDD.union(myRatingsRDD)

myRatingsModel = ALS.train(trainingWithMyRatingsRDD,
                           bestRank, 
                           seed=seed,
                           iterations=bestIteretion,
                           lambda_=regularizationParameter)


myUnratedMoviesRDD = (moviesRDD
                      .filter(lambda x: x[0] not in [x[1] for x in myRatedMovies])
                      .map(lambda x: (myUserID, x[0])))

predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)
predictedRDD = predictedRatingsRDD.map(lambda x: (x[1], x[2]))

movieCountsRDD = (ratingsRDD
                  .map(lambda x:(x[1], x[2]))
                  .groupByKey()
                  .map(getCountsAndAverages)
                  .map(lambda x: (x[0], x[1][0])))


#Marge PredictedRDD and CountsRDD
predictedWithCountsRDD  = (predictedRDD
                           .join(movieCountsRDD))


ratingsWithNamesRDD = (predictedWithCountsRDD
                       .filter(lambda x: x[1][1] > 75)
                       .join(moviesRDD)
                       .map(lambda x: (x[1][0][0], x[1][1], x[1][0][1])))

predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(10, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
        '\n'.join(map(str, predictedHighestRatedMovies)))

My highest rated movies as predicted (for movies with more than 75 reviews):
(4.74482593848827, u'Sound of Music, The (1965)', 882)
(4.580669496447569, u'Mary Poppins (1964)', 1011)
(4.486424714752521, u'Beauty and the Beast (1991)', 1060)
(4.478042748281928, u'Mulan (1998)', 490)
(4.477453571213953, u'Toy Story 2 (1999)', 1585)
(4.439390718632932, u'Fantasia 2000 (1999)', 453)
(4.405894101045507, u'FairyTale: A True Story (1997)', 87)
(4.4030583744108425, u"Singin' in the Rain (1952)", 751)
(4.390333274084924, u'Top Hat (1935)', 251)
(4.347757079374581, u'Gone with the Wind (1939)', 1156)

ベタな映画を評価してるので、ベタな映画が推薦されてますね笑