diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala
index 6e9d0ace45..1661db2e28 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala
@@ -13,7 +13,7 @@ import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, I
 import org.apache.spark.mllib.linalg
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseMatrix}
 import org.apache.spark.sql.functions.{col, collect_list, sum, udf, _}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset}
 
 import java.text.SimpleDateFormat
@@ -106,8 +106,29 @@ class SAR(override val uid: String) extends Estimator[SARModel]
       (0 to numItems.value).map(i => map.getOrElse(i, 0.0).toFloat).toArray
     })
 
-    dataset
-      .withColumn(C.AffinityCol, (dataset.columns.contains(getTimeCol), dataset.columns.contains(getRatingCol)) match {
+    val userColType = dataset.schema(getUserCol).dataType
+    val itemColType = dataset.schema(getItemCol).dataType
+
+    // Normalize ID column types: string columns are cast to int, numeric columns keep their type.
+    val castedDataset = (userColType, itemColType) match {
+      case (StringType, StringType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (StringType, _) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+      case (_, StringType) =>
+        dataset.withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (IntegerType, IntegerType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (LongType, LongType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("long"))
+          .withColumn(getItemCol, col(getItemCol).cast("long"))
+      case _ => dataset
+    }
+
+    castedDataset
+      .withColumn(C.AffinityCol, (castedDataset.columns.contains(getTimeCol), castedDataset.columns.contains(getRatingCol)) match {
         case (true, true) => blendWeights(timeDecay(col(getTimeCol)), col(getRatingCol))
         case (true, false) => timeDecay(col(getTimeCol))
         case (false, true) => col(getRatingCol)
@@ -197,7 +218,28 @@ class SAR(override val uid: String) extends Estimator[SARModel]
       })
     })
 
-    dataset
+    val userColType = dataset.schema(getUserCol).dataType
+    val itemColType = dataset.schema(getItemCol).dataType
+
+    // Same normalization as in the affinity computation above.
+    val castedDataset = (userColType, itemColType) match {
+      case (StringType, StringType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (StringType, _) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+      case (_, StringType) =>
+        dataset.withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (IntegerType, IntegerType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (LongType, LongType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("long"))
+          .withColumn(getItemCol, col(getItemCol).cast("long"))
+      case _ => dataset
+    }
+
+    castedDataset
       .select(col(getItemCol), col(getUserCol))
       .groupBy(getItemCol).agg(collect_list(getUserCol) as "collect_list")
       .withColumn(C.FeaturesCol, createItemFeaturesVector(col("collect_list")))
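The cast block above is duplicated verbatim in both hunks, and its `(IntegerType, IntegerType)` and `(LongType, LongType)` arms are no-ops (each casts a column to the type it already has). A possible follow-up, sketched here rather than proposed as part of this diff: fold the logic into one private helper on `SAR`. The helper name `castIdColumns` is hypothetical; `getUserCol`, `getItemCol`, `col`, and `StringType` come from the surrounding file.

```scala
  // Behavior-equivalent to the match above: string ID columns are cast to int,
  // numeric ID columns pass through unchanged (the int->int and long->long casts
  // in the match are no-ops anyway).
  private def castIdColumns(dataset: Dataset[_]): DataFrame =
    Seq(getUserCol, getItemCol).foldLeft(dataset.toDF) { (df, c) =>
      df.schema(c).dataType match {
        case StringType => df.withColumn(c, col(c).cast("int"))
        case _ => df
      }
    }
```

Both call sites could then open with `val castedDataset = castIdColumns(dataset)`, which would keep the two code paths from drifting apart.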
diff --git a/core/src/test/python/synapsemltest/recommendation/test_ranking.py b/core/src/test/python/synapsemltest/recommendation/test_ranking.py
index d2d439c374..88f015d0d4 100644
--- a/core/src/test/python/synapsemltest/recommendation/test_ranking.py
+++ b/core/src/test/python/synapsemltest/recommendation/test_ranking.py
@@ -67,10 +67,94 @@
     .cache()
 )
 
+ratings_with_strings = (
+    spark.createDataFrame(
+        [
+            ("user0", "item1", 4, 4),
+            ("user0", "item3", 1, 1),
+            ("user0", "item4", 5, 5),
+            ("user0", "item5", 3, 3),
+            ("user0", "item7", 3, 3),
+            ("user0", "item9", 3, 3),
+            ("user0", "item10", 3, 3),
+            ("user1", "item1", 4, 4),
+            ("user1", "item2", 5, 5),
+            ("user1", "item3", 1, 1),
+            ("user1", "item6", 4, 4),
+            ("user1", "item7", 5, 5),
+            ("user1", "item8", 1, 1),
+            ("user1", "item10", 3, 3),
+            ("user2", "item1", 4, 4),
+            ("user2", "item2", 1, 1),
+            ("user2", "item3", 1, 1),
+            ("user2", "item4", 5, 5),
+            ("user2", "item5", 3, 3),
+            ("user2", "item6", 4, 4),
+            ("user2", "item8", 1, 1),
+            ("user2", "item9", 5, 5),
+            ("user2", "item10", 3, 3),
+            ("user3", "item2", 5, 5),
+            ("user3", "item3", 1, 1),
+            ("user3", "item4", 5, 5),
+            ("user3", "item5", 3, 3),
+            ("user3", "item6", 4, 4),
+            ("user3", "item7", 5, 5),
+            ("user3", "item8", 1, 1),
+            ("user3", "item9", 5, 5),
+            ("user3", "item10", 3, 3),
+        ],
+        ["originalCustomerID", "newCategoryID", "rating", "notTime"],
+    )
+    .coalesce(1)
+    .cache()
+)
+
+ratings_with_integers = (
+    spark.createDataFrame(
+        [
+            (0, 1, 4, 4),
+            (0, 3, 1, 1),
+            (0, 4, 5, 5),
+            (0, 5, 3, 3),
+            (0, 7, 3, 3),
+            (0, 9, 3, 3),
+            (0, 10, 3, 3),
+            (1, 1, 4, 4),
+            (1, 2, 5, 5),
+            (1, 3, 1, 1),
+            (1, 6, 4, 4),
+            (1, 7, 5, 5),
+            (1, 8, 1, 1),
+            (1, 10, 3, 3),
+            (2, 1, 4, 4),
+            (2, 2, 1, 1),
+            (2, 3, 1, 1),
+            (2, 4, 5, 5),
+            (2, 5, 3, 3),
+            (2, 6, 4, 4),
+            (2, 8, 1, 1),
+            (2, 9, 5, 5),
+            (2, 10, 3, 3),
+            (3, 2, 5, 5),
+            (3, 3, 1, 1),
+            (3, 4, 5, 5),
+            (3, 5, 3, 3),
+            (3, 6, 4, 4),
+            (3, 7, 5, 5),
+            (3, 8, 1, 1),
+            (3, 9, 5, 5),
+            (3, 10, 3, 3),
+        ],
+        ["originalCustomerID", "newCategoryID", "rating", "notTime"],
+    )
+    .coalesce(1)
+    .cache()
+)
+
 
 class RankingSpec(unittest.TestCase):
     @staticmethod
-    def adapter_evaluator(algo):
+    def adapter_evaluator(algo, data):
         recommendation_indexer = RecommendationIndexer(
             userInputCol=USER_ID,
             userOutputCol=USER_ID_INDEX,
@@ -80,7 +164,7 @@ def adapter_evaluator(algo):
         adapter = RankingAdapter(mode="allUsers", k=5, recommender=algo)
 
         pipeline = Pipeline(stages=[recommendation_indexer, adapter])
-        output = pipeline.fit(ratings).transform(ratings)
+        output = pipeline.fit(data).transform(data)
         print(str(output.take(1)) + "\n")
 
         metrics = ["ndcgAt", "fcp", "mrr"]
@@ -91,13 +175,21 @@ def adapter_evaluator(algo):
                 + str(RankingEvaluator(k=3, metricName=metric).evaluate(output)),
             )
 
-    # def test_adapter_evaluator_als(self):
-    #     als = ALS(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
-    #     self.adapter_evaluator(als)
-    #
-    # def test_adapter_evaluator_sar(self):
-    #     sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
-    #     self.adapter_evaluator(sar)
+    def test_adapter_evaluator_als(self):
+        als = ALS(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(als, ratings)
+
+    def test_adapter_evaluator_sar(self):
+        sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(sar, ratings)
+
+    def test_adapter_evaluator_sar_with_strings(self):
+        sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(sar, ratings_with_strings)
+
+    def test_adapter_evaluator_sar_with_integers(self):
+        sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(sar, ratings_with_integers)
 
     def test_all_tiny(self):
         customer_index = StringIndexer(inputCol=USER_ID, outputCol=USER_ID_INDEX)
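Both new Python fixtures run through `RecommendationIndexer` before SAR, so even the string IDs in `ratings_with_strings` reach the estimator as numeric indices. That routing matters because the cast introduced in SAR.scala is Spark SQL's lenient string-to-int cast, which silently turns non-numeric values into null rather than failing. A self-contained sketch of that behavior (illustrative only; assumes nothing beyond a local Spark session, and is not part of this diff):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CastDemo extends App {
  val spark = SparkSession.builder.master("local[1]").appName("cast-demo").getOrCreate()
  import spark.implicits._

  // Numeric strings survive a cast to int; anything else becomes null.
  Seq("1", "42", "user0").toDF("id")
    .withColumn("idAsInt", col("id").cast("int"))
    .show()
  // id = "1"     -> idAsInt = 1
  // id = "42"    -> idAsInt = 42
  // id = "user0" -> idAsInt = null
  spark.stop()
}
```

IDs such as "user0" therefore have to be indexed (or otherwise mapped to numbers) before they reach the new cast path; handing them to SAR raw would produce null IDs.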
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala
index 501652f68b..e3abbb0423 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala
@@ -106,6 +106,128 @@ class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] {
   test("tlc test userpred jac3 userid only")(
     SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "jaccard", simJac3, userAff, userpredJac3))
 
+  test("SAR with String User Column") {
+    val stringUserCol = "stringUserId"
+    val stringItemCol = "stringItemId"
+
+    val stringRatings: DataFrame = spark
+      .createDataFrame(Seq(
+        ("user1", "item1", 2),
+        ("user1", "item3", 1),
+        ("user1", "item4", 5),
+        ("user2", "item1", 4),
+        ("user2", "item2", 5),
+        ("user2", "item3", 1),
+        ("user3", "item1", 4),
+        ("user3", "item3", 1),
+        ("user3", "item4", 5)
+      ))
+      .toDF(stringUserCol, stringItemCol, ratingCol)
+      .dropDuplicates()
+      .cache()
+
+    val stringRecommendationIndexer: RecommendationIndexer = new RecommendationIndexer()
+      .setUserInputCol(stringUserCol)
+      .setUserOutputCol(userColIndex)
+      .setItemInputCol(stringItemCol)
+      .setItemOutputCol(itemColIndex)
+      .setRatingCol(ratingCol)
+
+    val transformedStringDf: DataFrame = stringRecommendationIndexer.fit(stringRatings)
+      .transform(stringRatings).cache()
+
+    val algo = new SAR()
+      .setUserCol(stringRecommendationIndexer.getUserOutputCol)
+      .setItemCol(stringRecommendationIndexer.getItemOutputCol)
+      .setRatingCol(ratingCol)
+      .setSupportThreshold(1)
+      .setSimilarityFunction("jaccard")
+      .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy")
+
+    val adapter: RankingAdapter = new RankingAdapter()
+      .setK(5)
+      .setRecommender(algo)
+
+    val recopipeline = new Pipeline()
+      .setStages(Array(stringRecommendationIndexer, adapter))
+      .fit(stringRatings)
+
+    val output = recopipeline.transform(stringRatings)
+
+    val evaluator: RankingEvaluator = new RankingEvaluator()
+      .setK(5)
+      .setNItems(10)
+
+    assert(evaluator.setMetricName("ndcgAt").evaluate(output) > 0.0)
+    assert(evaluator.setMetricName("fcp").evaluate(output) > 0.0)
+    assert(evaluator.setMetricName("mrr").evaluate(output) > 0.0)
+
+    val users: DataFrame = spark
+      .createDataFrame(Seq(("user1", "item1"), ("user2", "item2")))
+      .toDF(stringUserCol, stringItemCol)
+
+    val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel
+      .asInstanceOf[SARModel].recommendForUserSubset(users, 10)
+    assert(recs.count == 2)
+  }
+
"item1", 4), + (3, "item3", 1), + (3, "item4", 5), + ("user4", "item1", 3), + ("user4", "item2", 2), + ("user4", "item3", 4) + )) + .toDF(mixedUserCol, mixedItemCol, ratingCol) + .dropDuplicates() + .cache() + + val algo = new SAR() + .setUserCol(mixedUserCol) + .setItemCol(mixedItemCol) + .setRatingCol(ratingCol) + .setSupportThreshold(1) + .setSimilarityFunction("jaccard") + .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy") + + val adapter: RankingAdapter = new RankingAdapter() + .setK(5) + .setRecommender(algo) + + val recopipeline = new Pipeline() + .setStages(Array(adapter)) + .fit(mixedRatings) + + val output = recopipeline.transform(mixedRatings) + + val evaluator: RankingEvaluator = new RankingEvaluator() + .setK(5) + .setNItems(10) + + assert(evaluator.setMetricName("ndcgAt").evaluate(output) > 0.0) + assert(evaluator.setMetricName("fcp").evaluate(output) > 0.0) + assert(evaluator.setMetricName("mrr").evaluate(output) > 0.0) + + val users: DataFrame = spark + .createDataFrame(Seq((1, "item1"), (2, "item2"), ("user4", "item3"))) + .toDF(mixedUserCol, mixedItemCol) + + val recs = recopipeline.stages(0).asInstanceOf[RankingAdapterModel].getRecommenderModel + .asInstanceOf[SARModel].recommendForUserSubset(users, 10) + assert(recs.count == 3) + } } class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] {