Skip to content

Commit

Permalink
Format
Browse files Browse the repository at this point in the history
  • Loading branch information
jbrooks-stripe committed Jun 20, 2024
1 parent 2af7a40 commit 889fd68
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 51 deletions.
109 changes: 59 additions & 50 deletions online/src/main/scala/ai/chronon/online/FetcherBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,56 +205,60 @@ class FetcherBase(kvStore: KVStore,
val groupByRequestToKvRequest: Seq[(Request, Try[GroupByRequestMeta])] = requests.iterator
.filter(r => r.keys == null || r.keys.values == null || r.keys.values.exists(_ != null))
.map { request =>
val groupByRequestMetaTry: Try[GroupByRequestMeta] = getGroupByServingInfo(request.name)
.map { groupByServingInfo =>
val context =
request.context.getOrElse(Metrics.Context(Metrics.Environment.GroupByFetching, groupByServingInfo.groupBy))
context.increment("group_by_request.count")
var batchKeyBytes: Array[Byte] = null
var streamingKeyBytes: Array[Byte] = null
try {
// The formats of key bytes for batch requests and key bytes for streaming requests may differ based
// on the KVStore implementation, so we encode each distinctly.
batchKeyBytes =
kvStore.createKeyBytes(request.keys, groupByServingInfo, groupByServingInfo.groupByOps.batchDataset)
streamingKeyBytes =
kvStore.createKeyBytes(request.keys, groupByServingInfo, groupByServingInfo.groupByOps.streamingDataset)
} catch {
// TODO: only gets hit in cli path - make this code path just use avro schema to decode keys directly in cli
// TODO: Remove this code block
case ex: Exception =>
val castedKeys = groupByServingInfo.keyChrononSchema.fields.map {
case StructField(name, typ) => name -> ColumnAggregator.castTo(request.keys.getOrElse(name, null), typ)
}.toMap
try {
batchKeyBytes =
kvStore.createKeyBytes(castedKeys, groupByServingInfo, groupByServingInfo.groupByOps.batchDataset)
streamingKeyBytes =
kvStore.createKeyBytes(castedKeys, groupByServingInfo, groupByServingInfo.groupByOps.streamingDataset)
} catch {
case exInner: Exception =>
exInner.addSuppressed(ex)
throw new RuntimeException("Couldn't encode request keys or casted keys", exInner)
}
}
val batchRequest = GetRequest(batchKeyBytes, groupByServingInfo.groupByOps.batchDataset)
val streamingRequestOpt = groupByServingInfo.groupByOps.inferredAccuracy match {
// fetch batch(ir) and streaming(input) and aggregate
case Accuracy.TEMPORAL =>
Some(
GetRequest(streamingKeyBytes,
groupByServingInfo.groupByOps.streamingDataset,
Some(groupByServingInfo.batchEndTsMillis)))
// no further aggregation is required - the value in KvStore is good as is
case Accuracy.SNAPSHOT => None
val groupByRequestMetaTry: Try[GroupByRequestMeta] = getGroupByServingInfo(request.name)
.map { groupByServingInfo =>
val context =
request.context.getOrElse(
Metrics.Context(Metrics.Environment.GroupByFetching, groupByServingInfo.groupBy))
context.increment("group_by_request.count")
var batchKeyBytes: Array[Byte] = null
var streamingKeyBytes: Array[Byte] = null
try {
// The formats of key bytes for batch requests and key bytes for streaming requests may differ based
// on the KVStore implementation, so we encode each distinctly.
batchKeyBytes =
kvStore.createKeyBytes(request.keys, groupByServingInfo, groupByServingInfo.groupByOps.batchDataset)
streamingKeyBytes =
kvStore.createKeyBytes(request.keys, groupByServingInfo, groupByServingInfo.groupByOps.streamingDataset)
} catch {
// TODO: only gets hit in cli path - make this code path just use avro schema to decode keys directly in cli
// TODO: Remove this code block
case ex: Exception =>
val castedKeys = groupByServingInfo.keyChrononSchema.fields.map {
case StructField(name, typ) =>
name -> ColumnAggregator.castTo(request.keys.getOrElse(name, null), typ)
}.toMap
try {
batchKeyBytes =
kvStore.createKeyBytes(castedKeys, groupByServingInfo, groupByServingInfo.groupByOps.batchDataset)
streamingKeyBytes = kvStore.createKeyBytes(castedKeys,
groupByServingInfo,
groupByServingInfo.groupByOps.streamingDataset)
} catch {
case exInner: Exception =>
exInner.addSuppressed(ex)
throw new RuntimeException("Couldn't encode request keys or casted keys", exInner)
}
}
val batchRequest = GetRequest(batchKeyBytes, groupByServingInfo.groupByOps.batchDataset)
val streamingRequestOpt = groupByServingInfo.groupByOps.inferredAccuracy match {
// fetch batch(ir) and streaming(input) and aggregate
case Accuracy.TEMPORAL =>
Some(
GetRequest(streamingKeyBytes,
groupByServingInfo.groupByOps.streamingDataset,
Some(groupByServingInfo.batchEndTsMillis)))
// no further aggregation is required - the value in KvStore is good as is
case Accuracy.SNAPSHOT => None
}
GroupByRequestMeta(groupByServingInfo, batchRequest, streamingRequestOpt, request.atMillis, context)
}
GroupByRequestMeta(groupByServingInfo, batchRequest, streamingRequestOpt, request.atMillis, context)
if (groupByRequestMetaTry.isFailure) {
request.context.foreach(_.increment("group_by_serving_info_failure.count"))
}
if (groupByRequestMetaTry.isFailure) {
request.context.foreach(_.increment("group_by_serving_info_failure.count"))
request -> groupByRequestMetaTry
}
request -> groupByRequestMetaTry
}.toSeq
.toSeq
val allRequests: Seq[GetRequest] = groupByRequestToKvRequest.flatMap {
case (_, Success(GroupByRequestMeta(_, batchRequest, streamingRequestOpt, _, _))) =>
Some(batchRequest) ++ streamingRequestOpt
Expand Down Expand Up @@ -437,7 +441,8 @@ class FetcherBase(kvStore: KVStore,
case Right(keyMissingException) => {
Map(keyMissingException.requestName + "_exception" -> keyMissingException.getMessage)
}
case Left(PrefixedRequest(prefix, groupByRequest)) => parseGroupByResponse(prefix, groupByRequest, responseMap)
case Left(PrefixedRequest(prefix, groupByRequest)) =>
parseGroupByResponse(prefix, groupByRequest, responseMap)
}.toMap
}
joinValuesTry match {
Expand All @@ -457,14 +462,18 @@ class FetcherBase(kvStore: KVStore,
}
}

def parseGroupByResponse(prefix: String, groupByRequest: Request, responseMap: Map[Request, Try[Map[String, AnyRef]]]): Map[String, AnyRef] = {
def parseGroupByResponse(prefix: String,
groupByRequest: Request,
responseMap: Map[Request, Try[Map[String, AnyRef]]]): Map[String, AnyRef] = {
// Group bys with all null keys won't be requested from the KV store and we don't expect a response.
val isRequiredRequest = groupByRequest.keys.values.exists(_ != null) || groupByRequest.keys.isEmpty

val response: Try[Map[String, AnyRef]] = responseMap.get(groupByRequest) match {
case Some(value) => value
case None =>
if (isRequiredRequest) Failure(new IllegalStateException(s"Couldn't find a groupBy response for $groupByRequest in response map")) else Success(null)
if (isRequiredRequest)
Failure(new IllegalStateException(s"Couldn't find a groupBy response for $groupByRequest in response map"))
else Success(null)
}

response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class FetcherBaseTest extends MockitoSugar with Matchers {

@Test
def testParsingGroupByResponse_HappyHase(): Unit = {
val baseFetcher = new FetcherBase((mock[KVStore])
val baseFetcher = new FetcherBase(mock[KVStore])
val request = Request(name = "name", keys = Map("email" -> "email"), atMillis = None, context = None)
val response: Map[Request, Try[Map[String, AnyRef]]] = Map(
request -> Success(Map(
Expand Down

0 comments on commit 889fd68

Please sign in to comment.