Ranking Metrics with Spark

Short Getting Started guide on Ranking Metrics in Spark
Spark
RecSys
Scala
ML
Published

February 7, 2019

Introduction

In a previous post we trained an ALS model with Spark; this time we’ll focus on the evaluation side. We’ll discuss why RMSE is the wrong choice when you don’t have explicit ratings and rely on implicit feedback. Then we’ll zoom in on the NDCG metric: what it measures, why graded relevance matters, and how to compute it with Spark. Spark 2.4 (the latest at the time of writing) has RankingMetrics only in the legacy RDD-based MLlib API, so we’ll roll our own DataFrame-based solution and see how checking the execution plan can be helpful even in simple situations.

What makes a ranking “good”?

Before we look at any metric, let’s think about what we actually want from a ranking model. A model gives us a relevance score for a given set of items, and we sort items by that score in descending order to get the list shown to the user. Here we assume that the model gives a higher score for more relevant items.

Now let’s say we have two different models that score the same five candidate items. After sorting each model’s scores, we end up with these two lists of item IDs:

list A:   [42, 7, 99, 3,  55]
list B:   [7,  3, 99, 42, 55]

Which one is better? You can’t tell from the IDs alone, but you can observe how the user interacted with the items. For example, in an online shop we could have logged that the user clicked item 7 and ignored everything else. That’s strong evidence that list B is the better ranking: it put the clicked item at the top, while list A placed it at position 2, behind something the user scrolled past.

Now imagine that in addition to the click, we also know the user added item 3 to the cart. Adding to the cart is a stronger engagement signal than a click, so really both lists were suboptimal: item 3 should probably have been first. That’s the essence of graded relevance: not just “relevant vs. not”, but “how relevant, on what scale”.

Why does the order matter so much? Because in practice you only ever present the top few items:

  • Search results show ~10-15 links above the fold, and most users never scroll past the first page.
  • A “recommended for you” widget has maybe 5 to 10 tiles.
  • A push notification has room for exactly one item.

If the most relevant item is at position 20, for the user it might as well not exist. So we don’t just want relevant items in our result set, we want them at the top, and we want the very best ones before the merely good ones.

Where does relevance come from?

To talk about “best items at the top” we need a concrete relevance score per (user, item) pair. There are a few common sources:

  • Explicit ratings. 0-5 stars, thumbs-up, a scale of “perfect / good / fair / bad”. Directly interpretable, but rare outside a few domains such as movies, books and surveys, and even in those domains explicit ratings have become uncommon.
  • Implicit events, binarized. Did the user click / purchase / book, yes or no. Easiest signal to get, but throws away a lot of information.
  • Implicit events, graded. The weighted sum of events we used as \(r_{ui}\) in the ALS post, something like 1 * click + 2 * pdp_view + 5 * cart_add + 10 * purchase. Stronger actions get more weight, so a purchased item ends up with much higher relevance than a mere click.
  • Watch or listen time. A song played to completion five times is more relevant than one skipped after 10 seconds.
  • Manual grades. For search quality, it’s common to have human annotators label a handful of (query, item) pairs on a fixed scale.
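To make the graded option concrete, here is a minimal sketch of turning raw events into a relevance number with plain Scala collections. The weights are the illustrative ones quoted above; the `Event` case class and the `GradedRelevance` object are hypothetical names made up for this example, not something from the ALS post.

```scala
object GradedRelevance {
  // Event weights as quoted in the text; illustrative, not tuned values.
  val weights: Map[String, Double] = Map(
    "click"    -> 1.0,
    "pdp_view" -> 2.0,
    "cart_add" -> 5.0,
    "purchase" -> 10.0
  )

  // Hypothetical event record: one row per logged interaction.
  final case class Event(userId: String, itemId: Int, kind: String)

  // Sum the weights of all events per (user, item) pair.
  // Assumption: unknown event kinds contribute 0.
  def relevance(events: Seq[Event]): Map[(String, Int), Double] =
    events
      .groupBy(e => (e.userId, e.itemId))
      .map { case (key, es) => key -> es.map(e => weights.getOrElse(e.kind, 0.0)).sum }
}
```

With a click on item 7 and a click plus a cart add on item 3, item 3 ends up with the higher relevance, which matches the intuition from the shop example.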

Once relevance is a number rather than a flag, “good ranking” has a natural shape: for a given query, list items in descending relevance order. The more the model’s order agrees with that, especially at the top, the better it is.

Why RMSE is the wrong tool

RMSE shows, point by point, how close the predicted scores are to the true scores. That’s fine when you directly collect those scores. But in a ranked list only the order matters; the absolute values are irrelevant, even when you have explicit scores.

An example. Say the user has five items with true relevance \(4, 3, 2, 1, 0\) (call them items \(A, B, C, D, E\)). Model 1 predicts \(4.6, 2.4, 2, 1, 0\); model 2 predicts \(3.4, 3.6, 2, 1, 0\). The errors are \(\pm 0.6\) on the top two items in both cases, just with the signs swapped, so both models have identical RMSE \(\approx 0.38\). But model 1 ranks \(A, B, C, D, E\), exactly the truth, while model 2 ranks \(B, A, C, D, E\) and gets the top item wrong. If your UI shows only the top result, model 1 surfaces the best item and model 2 buries it at position 2. RMSE is equally happy with both.
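The example is easy to reproduce in a few lines of plain Scala. This is only a sketch of the arithmetic above; `RmseVsRanking` and its helpers are names invented for the illustration.

```scala
object RmseVsRanking {
  val items  = Seq("A", "B", "C", "D", "E")
  val truth  = Seq(4.0, 3.0, 2.0, 1.0, 0.0)
  val model1 = Seq(4.6, 2.4, 2.0, 1.0, 0.0)
  val model2 = Seq(3.4, 3.6, 2.0, 1.0, 0.0)

  // Root mean squared error between two equally long score vectors.
  def rmse(pred: Seq[Double], actual: Seq[Double]): Double = {
    val squared = pred.zip(actual).map { case (p, a) => (p - a) * (p - a) }
    math.sqrt(squared.sum / squared.size)
  }

  // Item labels sorted by predicted score, highest first.
  def ranking(pred: Seq[Double]): Seq[String] =
    items.zip(pred).sortWith(_._2 > _._2).map(_._1)
}
```

Calling `rmse` on either model against `truth` gives the same value, while `ranking(model1)` and `ranking(model2)` disagree on the first position.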

For implicit data the mismatch is worse still, and we already bumped into it in the previous post. There’s no true “rating” to regress on: predictions live roughly in \([0, 1]\) as preference scores, while the training signal was play counts or weighted event sums in whatever range they happen to fall. Running RegressionEvaluator on those two columns gives a huge, meaningless number that has nothing to do with whether your top-10 is any good.

What we want is a metric that takes the model’s ranking, compares it against the ideal ranking derived from relevance, and strongly rewards getting the top positions right. That’s what ranking metrics are for, and NDCG is one of the most common ones.

NDCG, piece by piece

NDCG stands for Normalized Discounted Cumulative Gain. Let’s dig into it piece-by-piece.

Gain is the relevance of the item shown at position \(i\). It is typically calculated in one of two ways:

  • linear gain: it’s just the raw relevance, \(g_i = rel_i\);
  • exponential gain: it grows with how good the item is, \(g_i = 2^{rel_i} - 1\).

Exponential gain strongly amplifies the reward for top-quality items: an item with relevance 5 gets gain 31, while a grade 4 gets 15. Linear gain treats adjacent grades as just one point apart.

Cumulative means we sum those gains over the list.

Discounted means each gain is shrunk by a factor that grows with position, so an item near the top of the list contributes more than one further down: \[ DCG_k = \sum_{i=1}^{k} \frac{g_i}{\log_2(i + 1)}. \] The \(\log_2(i + 1)\) denominator is the widely used discount, a choice that is somewhat arbitrary but so well-established that nobody really argues about it.

Normalized means we divide by the DCG of the best (ideal) possible ranking, items sorted by their true relevance, called IDCG: \[ NDCG_k = \frac{DCG_k}{IDCG_k}. \] That brings the metric into \([0, 1]\) range, with \(1.0\) being a perfect list.
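Before moving to Spark, the four pieces can be put together for a single list in plain Scala. This is a minimal sketch rather than a reference implementation: the `NdcgSketch` object is a made-up name, any gain type other than "exponential" falls back to linear, and returning 0.0 when IDCG is zero is an assumption of this example.

```scala
object NdcgSketch {
  // Gain of a single item; anything other than "exponential" is treated as linear here.
  def gain(rel: Double, gainType: String): Double =
    if (gainType == "exponential") math.pow(2.0, rel) - 1.0 else rel

  private def log2(x: Double): Double = math.log(x) / math.log(2.0)

  // DCG@k over relevances listed in the order the items were shown (k <= 0 means no cutoff).
  def dcg(relevances: Seq[Double], gainType: String, k: Int = 0): Double = {
    val shown = if (k > 0) relevances.take(k) else relevances
    shown.zipWithIndex.map { case (rel, idx) =>
      gain(rel, gainType) / log2(idx + 2.0) // idx is 0-based, so position i = idx + 1
    }.sum
  }

  // NDCG@k: DCG of the shown order divided by the DCG of the ideal (descending) order.
  def ndcg(relevances: Seq[Double], gainType: String, k: Int = 0): Double = {
    val ideal = dcg(relevances.sortWith(_ > _), gainType, k)
    if (ideal == 0.0) 0.0 else dcg(relevances, gainType, k) / ideal // assumption: 0.0 when nothing is relevant
  }
}
```

A list already in ideal order, such as `Seq(4.0, 3.0, 2.0, 1.0, 0.0)`, scores 1.0; swapping the first two items pushes the score below 1.0.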

A few practical notes before we go to code:

  • With binary relevance, both gain formulas give the same NDCG, since \(2^{1} - 1 = 1\). The choice between exponential and linear only matters with multi-level relevance grades.
  • NDCG is a ratio, so the logarithm base cancels out. You’ll see implementations using \(\log_2\), \(\log_e\) or \(\log_{10}\), and they all produce the same number.
  • Ties in either predicted scores or ground truth relevance introduce ambiguity; you need a deterministic tiebreaker if you want reproducible metrics run-to-run.
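The first two notes are easy to check numerically. The sketch below uses a hypothetical `NdcgInvariants` helper with a configurable logarithm base and gain function; it has no cutoff and assumes at least one item with non-zero relevance, so the denominator is never zero.

```scala
object NdcgInvariants {
  // NDCG with a configurable logarithm base and gain function (no cutoff, for brevity).
  // Assumes at least one item has non-zero gain, otherwise the ratio is NaN.
  def ndcg(relevances: Seq[Double], base: Double, gain: Double => Double): Double = {
    def dcg(rels: Seq[Double]): Double =
      rels.zipWithIndex.map { case (rel, idx) =>
        gain(rel) / (math.log(idx + 2.0) / math.log(base)) // log_base(position + 1)
      }.sum
    dcg(relevances) / dcg(relevances.sortWith(_ > _))
  }

  val linear: Double => Double      = rel => rel
  val exponential: Double => Double = rel => math.pow(2.0, rel) - 1.0
}
```

For any list, changing `base` from 2 to `math.E` or 10 leaves the result unchanged up to floating-point noise, and for a 0/1 relevance list `linear` and `exponential` agree.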

Implementing NDCG using DataFrame API

Spark 2.4 ships org.apache.spark.mllib.evaluation.RankingMetrics in the legacy RDD-based MLlib package, which covers a handful of ranking metrics including NDCG. But there are no ranking metrics in the DataFrame-based ML package. So let’s implement it ourselves. The plan looks straightforward:

  1. For each query, rank items by true relevance (desc); this is the ideal order.
  2. For each query, rank items by model prediction (desc); this is the actual order.
  3. Compute gain at each position using that item’s true relevance.
  4. Sum gains per query: DCG (predicted order) and IDCG (ideal order).
  5. Divide.

Here’s the most natural way to write that:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def computeNDCG(df: DataFrame, gainType: String, k: Int = 0): DataFrame = {
  // Tiebreaker, sort by itemId ensures deterministic ordering for tied scores.
  val idealWindow = Window.partitionBy("queryId")
    .orderBy(col("relevance").desc, col("itemId").asc)
  val predictedWindow = Window.partitionBy("queryId")
    .orderBy(col("prediction").desc, col("itemId").asc)

  val ranked = df
    .withColumn("rank_ideal",     row_number().over(idealWindow))
    .withColumn("rank_predicted", row_number().over(predictedWindow))

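  def gainExpr(rankCol: String) = gainType match {
    case "exponential" => (pow(lit(2), col("relevance")) - 1) / log2(col(rankCol) + 1)
    case "linear"      => col("relevance") / log2(col(rankCol) + 1)
    // Fail with a readable message instead of a bare MatchError.
    case other         => throw new IllegalArgumentException(s"Unknown gainType: $other")
  }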

  def applyK(frame: DataFrame, rankCol: String) =
    if (k > 0) frame.where(col(rankCol) <= k) else frame

  // IDCG: gains when items are in ideal order.
  val idcg = applyK(ranked, "rank_ideal")
    .withColumn("ideal_gain", gainExpr("rank_ideal"))
    .groupBy("queryId").agg(sum("ideal_gain").as("IDCG"))

  // DCG: gains in model's predicted order, using each item's true relevance.
  val dcg = applyK(ranked, "rank_predicted")
    .withColumn("pred_gain", gainExpr("rank_predicted"))
    .groupBy("queryId").agg(sum("pred_gain").as("DCG"))

  // NDCG: combining two components into ratio
  idcg.join(dcg, "queryId")
    .withColumn("NDCG", col("DCG") / col("IDCG"))
    .select("queryId", "IDCG", "DCG", "NDCG")
    .orderBy("queryId")
}

It reads the same way the definition does: compute IDCG, compute DCG, join, divide.

Take a small toy dataset with two queries: in q1 the model gets the order badly wrong, while in q2 it misplaces only one item, ranking a grade-1 item above one of the two tied grade-2 items:

case class Rating(queryId: String, itemId: Int, relevance: Double, prediction: Double)

val ratings = Seq(
  Rating("q1", 1, 4.0, 0.2),
  Rating("q1", 2, 3.0, 0.4),
  Rating("q1", 3, 2.0, 0.5),
  Rating("q1", 4, 1.0, 0.3),
  Rating("q1", 5, 0.0, 0.1),

  Rating("q2", 1, 2.0, 0.3),
  Rating("q2", 2, 2.0, 0.5),
  Rating("q2", 3, 1.0, 0.4),
  Rating("q2", 4, 0.0, 0.2)
)

Running with exponential gain:

computeNDCG(ratings.toDF(), "exponential").show(false)
+-------+------------------+------------------+------------------+
|queryId|IDCG              |DCG               |NDCG              |
+-------+------------------+------------------+------------------+
|q1     |21.347184833073598|14.376656646101099|0.6734685045602393|
|q2     |5.392789260714372 |5.130929753571458 |0.9514426589871553|
+-------+------------------+------------------+------------------+

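Query 1 scores 0.67: the poor ordering of high-relevance items is properly penalised. Query 2 is close to 1.0: the only mistake is that item 3 (relevance 1) is ranked above item 1 (relevance 2). The order of the two tied grade-2 items among themselves costs nothing, since swapping items of equal relevance leaves DCG unchanged. This is the behaviour we wanted.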
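As a sanity check, the same numbers can be recomputed without Spark. The sketch below copies the toy rows into plain tuples and applies the same ordering rules as the window specs (score descending, then itemId ascending); `ToyCheck` is a name invented for this cross-check.

```scala
object ToyCheck {
  // (itemId, relevance, prediction) rows copied from the toy dataset.
  val q1 = Seq((1, 4.0, 0.2), (2, 3.0, 0.4), (3, 2.0, 0.5), (4, 1.0, 0.3), (5, 0.0, 0.1))
  val q2 = Seq((1, 2.0, 0.3), (2, 2.0, 0.5), (3, 1.0, 0.4), (4, 0.0, 0.2))

  // DCG with exponential gain over relevances in display order.
  def dcg(relevances: Seq[Double]): Double =
    relevances.zipWithIndex.map { case (rel, idx) =>
      (math.pow(2.0, rel) - 1.0) / (math.log(idx + 2.0) / math.log(2.0))
    }.sum

  def ndcg(rows: Seq[(Int, Double, Double)]): Double = {
    // Predicted order: prediction descending, itemId ascending as tiebreaker.
    val predicted = rows
      .sortWith((a, b) => a._3 > b._3 || (a._3 == b._3 && a._1 < b._1))
      .map(_._2)
    // Ideal order: relevance descending, itemId ascending as tiebreaker.
    val ideal = rows
      .sortWith((a, b) => a._2 > b._2 || (a._2 == b._2 && a._1 < b._1))
      .map(_._2)
    dcg(predicted) / dcg(ideal)
  }
}
```

`ToyCheck.ndcg(ToyCheck.q1)` and `ToyCheck.ndcg(ToyCheck.q2)` should agree with the NDCG column of the table up to floating-point rounding.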

A story of one optimization

The code is correct and works according to the math equations. Usually people rely on the query engine to compute the result in the most efficient way. What we have here is that both branches of the computation derive from the same ranked DataFrame, partitioned the same way, grouped the same way. Looks like it should be low-hanging fruit for the optimizer to do its job?

Let’s see:

computeNDCG(ratings.toDF(), "exponential", k = 3).explain(true)

The physical plan looks like this (I trimmed some noise):

*(4) Project [queryId, IDCG, DCG, NDCG]
+- *(4) BroadcastHashJoin [queryId], [queryId], Inner, BuildRight
   :- *(4) HashAggregate(keys=[queryId], functions=[sum(ideal_gain)])
   :  +- ... Filter (rank_ideal <= 3)
   :     +- Window [row_number() ... relevance DESC]
   :        +- *(1) Sort [queryId ASC, relevance DESC, itemId ASC]
   :           +- Exchange hashpartitioning(queryId, 200)          // <- SHUFFLE 1
   :              +- LocalTableScan [queryId, itemId, relevance]
   +- BroadcastExchange
      +- *(3) HashAggregate(keys=[queryId], functions=[sum(pred_gain)])
         +- ... Filter (rank_predicted <= 3)
            +- Window [row_number() ... prediction DESC]
               +- *(2) Sort [queryId ASC, prediction DESC, itemId ASC]
                  +- Exchange hashpartitioning(queryId, 200)       // <- SHUFFLE 2
                     +- LocalTableScan [queryId, itemId, relevance, prediction]

Two things jump out:

  1. The source data is scanned twice. Catalyst does not recognise that the two branches come from the same DataFrame. It plants two independent LocalTableScan nodes. With a real source (Parquet, a Hive table) that becomes two full reads.
  2. Two shuffles plus a broadcast join. Each branch has its own Exchange hashpartitioning(queryId), introduced by the window’s partitionBy and then reused by the groupBy, and the two aggregates are joined at the end.

Four stages, two full shuffles, one broadcast join, two scans. For such a simple operation.

Why doesn’t Catalyst just fuse the two groupBy(queryId).agg(sum(...)) into one? Because there’s no optimizer rule that looks at A.groupBy(k).agg(f(a)) JOIN B.groupBy(k).agg(f(b)) where A and B come from the same source, and rewrites it into groupBy(k).agg(f(a), f(b)). By the time the optimizer sees the join, each side is already its own plan subtree, with its own scan, shuffle and aggregate. The shared origin is lost.

The cost is small on toy data. On a real evaluation set with millions of (user, item) pairs and a non-trivial source, it’s two reads of the raw data plus a second shuffle. It’s worth fixing.
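If you want to keep an eye on this in a test, one crude option is to count shuffle exchanges in the plan text. The helper below is a hypothetical sketch: it works on a plain string, which I assume you would obtain with something like `df.queryExecution.executedPlan.toString`, and it relies on the textual layout of the plan, which is not a stable contract between Spark versions.

```scala
object PlanCheck {
  // Count shuffle exchanges in a physical plan rendered as text.
  // Matches "Exchange hashpartitioning(...)" lines but not "BroadcastExchange".
  // Assumption: the plan prints one operator per line, as in the output above.
  def countShuffles(planText: String): Int =
    planText.split("\n").count(_.contains("Exchange hashpartitioning"))
}
```

On the plan shown above this returns 2, one per branch.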

Fusing it back together

The trick is to notice that every row in ranked already carries both rank_ideal and rank_predicted. Instead of splitting into two branches and joining, we compute both gains on every row and sum them in a single groupBy. Cutoffs at \(k\) become when(rank <= k, gain).otherwise(0.0) conditions:

def computeNDCG(df: DataFrame, gainType: String, k: Int = 0): DataFrame = {
  val idealWindow = Window.partitionBy("queryId")
    .orderBy(col("relevance").desc, col("itemId").asc)
  val predictedWindow = Window.partitionBy("queryId")
    .orderBy(col("prediction").desc, col("itemId").asc)

  val ranked = df
    .withColumn("rank_ideal",     row_number().over(idealWindow))
    .withColumn("rank_predicted", row_number().over(predictedWindow))

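  def gainAt(rankCol: String) = gainType match {
    case "exponential" => (pow(lit(2), col("relevance")) - 1) / log2(col(rankCol) + 1)
    case "linear"      => col("relevance") / log2(col(rankCol) + 1)
    // Fail with a readable message instead of a bare MatchError.
    case other         => throw new IllegalArgumentException(s"Unknown gainType: $other")
  }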

  // Zero out gains for positions beyond the cutoff (if any).
  def condGain(rankCol: String) = {
    val gain = gainAt(rankCol)
    if (k > 0) when(col(rankCol) <= k, gain).otherwise(lit(0.0))
    else gain
  }

  ranked
    .withColumn("ideal_gain", condGain("rank_ideal"))
    .withColumn("pred_gain",  condGain("rank_predicted"))
    .groupBy("queryId")
    .agg(
      sum("ideal_gain").as("IDCG"),
      sum("pred_gain").as("DCG")
    )
    .withColumn("NDCG", col("DCG") / col("IDCG"))
    .select("queryId", "IDCG", "DCG", "NDCG")
    .orderBy("queryId")
}

Results are identical to the naive version; we’re just rearranging the same arithmetic. Now the physical plan:

*(3) HashAggregate(keys=[queryId], functions=[sum(ideal_gain), sum(pred_gain)])
+- ... Project [queryId, CASE WHEN rank_ideal <= 3 ... , CASE WHEN rank_predicted <= 3 ...]
   +- Window [row_number() ... prediction DESC]
      +- *(2) Sort [queryId ASC, prediction DESC, itemId ASC]
         +- Window [row_number() ... relevance DESC]
            +- *(1) Sort [queryId ASC, relevance DESC, itemId ASC]
               +- Exchange hashpartitioning(queryId, 200)          // <- SHUFFLE (only one!)
                  +- LocalTableScan [queryId, itemId, relevance, prediction]

Three stages, one shuffle, one scan, no join.

The two window sorts are unavoidable: ideal order uses relevance DESC, predicted order uses prediction DESC, and the data has to be sorted both ways to compute row_number(). But since both windows partition by the same key (queryId), only one shuffle is needed to co-locate the data, and the second window re-sorts within existing partitions without another exchange.
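The reason the rewrite is safe is that filtering rows past the cutoff and then summing gives the same total as keeping every row and letting the rows past the cutoff contribute zero. A tiny sketch on plain collections, with made-up (rank, gain) pairs and a hypothetical `CutoffEquivalence` object:

```scala
object CutoffEquivalence {
  // (rank, gain) pairs for one query; the numbers are made up for illustration.
  val rows = Seq((1, 3.0), (2, 0.63), (3, 1.5), (4, 0.0), (5, 0.2))

  // Naive branch: drop rows beyond the cutoff, then sum.
  def filteredSum(k: Int): Double =
    rows.filter { case (rank, _) => rank <= k }.map(_._2).sum

  // Fused version: keep every row, but rows beyond the cutoff contribute 0.0.
  def conditionalSum(k: Int): Double =
    rows.map { case (rank, gain) => if (rank <= k) gain else 0.0 }.sum
}
```

For every cutoff from 1 to 5 both functions return the same sum, which is why the single groupBy can replace the two filtered branches.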

Instead of a Conclusion

A few things worth taking away from this.

Don’t judge a ranker with RMSE. For lists, use a ranking metric. NDCG is a safe default, though there are many others, like MAP@K and Precision@K.

Watch out for the query plan. Built-in optimizers are good, but chances are that if you do something more complicated than selecting a set of columns, there will be something that doesn’t work the way you expect. Always worth having a look. The annoying part is that the “natural” way to write the code is also the slower one, and Catalyst can’t save you.

And this wraps up the post. Let me know if you want to know more about using Spark for ranking.