diff --git a/app/src/main/resources/explorer-backend-openapi.json b/app/src/main/resources/explorer-backend-openapi.json index f87fb108..a1fcb107 100644 --- a/app/src/main/resources/explorer-backend-openapi.json +++ b/app/src/main/resources/explorer-backend-openapi.json @@ -3166,6 +3166,138 @@ } } }, + "/addresses/{address}/amount-history-deltas": { + "get": { + "tags": [ + "Addresses" + ], + "operationId": "getAddressesAddressAmount-history-deltas", + "parameters": [ + { + "name": "address", + "in": "path", + "required": true, + "schema": { + "type": "string", + "format": "address" + } + }, + { + "name": "fromTs", + "in": "query", + "required": true, + "schema": { + "type": "integer", + "format": "int64", + "minimum": "0" + } + }, + { + "name": "toTs", + "in": "query", + "required": true, + "schema": { + "type": "integer", + "format": "int64", + "minimum": "0" + } + }, + { + "name": "interval-type", + "in": "query", + "required": true, + "schema": { + "$ref": "#/components/schemas/IntervalType" + } + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AmountHistory" + }, + "example": { + "amountHistory": [ + [ + 1611041396892, + "1" + ] + ] + } + } + } + }, + "400": { + "description": "BadRequest", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/BadRequest" + }, + "example": { + "detail": "Something bad in the request" + } + } + } + }, + "401": { + "description": "Unauthorized", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Unauthorized" + }, + "example": { + "detail": "You shall not pass" + } + } + } + }, + "404": { + "description": "NotFound", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/NotFound" + }, + "example": { + "resource": "wallet-name", + "detail": "wallet-name not found" + } + } + } + }, + "500": { + "description": "InternalServerError", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InternalServerError" + }, + "example": { + "detail": "Ouch" + } + } + } + }, + "503": { + "description": "ServiceUnavailable", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ServiceUnavailable" + }, + "example": { + "detail": "Self clique unsynced" + } + } + } + } + } + } + }, "/infos": { "get": { "tags": [ diff --git a/app/src/main/scala/org/alephium/explorer/api/AddressesEndpoints.scala b/app/src/main/scala/org/alephium/explorer/api/AddressesEndpoints.scala index d0d37cf2..64114fe3 100644 --- a/app/src/main/scala/org/alephium/explorer/api/AddressesEndpoints.scala +++ b/app/src/main/scala/org/alephium/explorer/api/AddressesEndpoints.scala @@ -174,6 +174,14 @@ trait AddressesEndpoints extends BaseEndpoint with QueryParams { .in(intervalTypeQuery) .out(jsonBody[AmountHistory]) + val getAddressAmountHistoryDeltas + : BaseEndpoint[(Address, TimeInterval, IntervalType), AmountHistory] = + addressesEndpoint.get + .in("amount-history-deltas") + .in(timeIntervalQuery) + .in(intervalTypeQuery) + .out(jsonBody[AmountHistory]) + private case class TextCsv() extends CodecFormat { override val mediaType: MediaType = MediaType.TextCsv } diff --git a/app/src/main/scala/org/alephium/explorer/docs/Documentation.scala b/app/src/main/scala/org/alephium/explorer/docs/Documentation.scala index 6d59769f..7d7b19d8 100644 --- a/app/src/main/scala/org/alephium/explorer/docs/Documentation.scala +++ b/app/src/main/scala/org/alephium/explorer/docs/Documentation.scala @@ -58,6 +58,7 @@ trait Documentation exportTransactionsCsvByAddress, getAddressAmountHistoryDEPRECATED, getAddressAmountHistory, + getAddressAmountHistoryDeltas, getInfos, getHeights, listMempoolTransactions, diff --git a/app/src/main/scala/org/alephium/explorer/persistence/queries/TransactionQueries.scala b/app/src/main/scala/org/alephium/explorer/persistence/queries/TransactionQueries.scala index 88101d57..ee2b37b2 100644 --- a/app/src/main/scala/org/alephium/explorer/persistence/queries/TransactionQueries.scala +++ b/app/src/main/scala/org/alephium/explorer/persistence/queries/TransactionQueries.scala @@ -39,6 +39,7 @@ import org.alephium.protocol.ALPH import org.alephium.protocol.model.{Address, BlockHash, TransactionId} import org.alephium.util.{TimeStamp, U256} +// scalastyle:off number.of.methods object TransactionQueries extends StrictLogging { @SuppressWarnings(Array("org.wartremover.warts.PublicInference")) @@ -371,6 +372,26 @@ object TransactionQueries extends StrictLogging { """.asAS[(TimeStamp, Option[U256])] } + def sumAddressOutputsAsDeltas( + address: Address, + from: TimeStamp, + to: TimeStamp, + intervalType: IntervalType + ): DBActionSR[(TimeStamp, Option[U256])] = { + val dateGroup = QueryUtil.dateGroupQuery(intervalType) + sql""" + SELECT + LEAST($to, GREATEST($from, #${QueryUtil.extractEpoch(dateGroup)} - 1)) as ts, + SUM(amount) + FROM outputs + WHERE address = $address + AND main_chain = true + AND block_timestamp >= $from + AND block_timestamp <= $to + GROUP BY ts + """.asAS[(TimeStamp, Option[U256])] + } + def sumAddressOutputsDEPRECATED(address: Address, from: TimeStamp, to: TimeStamp)(implicit ec: ExecutionContext ): DBActionR[U256] = { @@ -422,6 +443,27 @@ object TransactionQueries extends StrictLogging { """.asAS[(TimeStamp, Option[U256])] } + def sumAddressInputsAsDeltas( + address: Address, + from: TimeStamp, + to: TimeStamp, + intervalType: IntervalType + ): DBActionSR[(TimeStamp, Option[U256])] = { + val dateGroup = QueryUtil.dateGroupQuery(intervalType) + + sql""" + SELECT + LEAST($to, GREATEST($from, #${QueryUtil.extractEpoch(dateGroup)} - 1)) as ts, + SUM(output_ref_amount) + FROM inputs + WHERE output_ref_address = $address + AND main_chain = true + AND block_timestamp >= $from + AND block_timestamp <= $to + GROUP BY ts + """.asAS[(TimeStamp, Option[U256])] + } + private def buildTransaction( txHashesTs: ArraySeq[TxByAddressQR], inputs: ArraySeq[InputsFromTxQR], diff --git a/app/src/main/scala/org/alephium/explorer/service/TransactionService.scala b/app/src/main/scala/org/alephium/explorer/service/TransactionService.scala index 22ce6b48..0a8fe809 100644 --- a/app/src/main/scala/org/alephium/explorer/service/TransactionService.scala +++ b/app/src/main/scala/org/alephium/explorer/service/TransactionService.scala @@ -121,6 +121,16 @@ trait TransactionService { ec: ExecutionContext, dc: DatabaseConfig[PostgresProfile] ): Future[ArraySeq[(TimeStamp, BigInteger)]] + + def getAmountHistoryAsDeltas( + address: Address, + from: TimeStamp, + to: TimeStamp, + intervalType: IntervalType + )(implicit + ec: ExecutionContext, + dc: DatabaseConfig[PostgresProfile] + ): Future[ArraySeq[(TimeStamp, BigInteger)]] } object TransactionService extends TransactionService { @@ -310,6 +320,44 @@ object TransactionService extends TransactionService { ) } + def getAmountHistoryAsDeltas( + address: Address, + from: TimeStamp, + to: TimeStamp, + intervalType: IntervalType + )(implicit + ec: ExecutionContext, + dc: DatabaseConfig[PostgresProfile] + ): Future[ArraySeq[(TimeStamp, BigInteger)]] = { + run( + for { + inputs <- sumAddressInputsAsDeltas(address, from, to, intervalType) + outputs <- sumAddressOutputsAsDeltas(address, from, to, intervalType) + } yield { + val ins = inputs.collect { case (ts, Some(u256)) => (ts, u256) }.toMap + val outs = outputs.collect { case (ts, Some(u256)) => (ts, u256) }.toMap + + val timestamps = scala.collection.SortedSet.from(ins.keys ++ outs.keys) + + timestamps + .foldLeft(ArraySeq.empty[(TimeStamp, BigInteger)]) { case (res, ts) => + (ins.get(ts), outs.get(ts)) match { + case (Some(in), Some(out)) => + val diff = out.v.subtract(in.v) + res :+ (ts, diff) + case (Some(in), None) => + // No Output, all inputs are spent + res :+ (ts, in.v.negate) + case (None, Some(out)) => + res :+ (ts, out.v) + case (None, None) => + res + } + } + } + ) + } + def amountHistoryToJsonFlowable(history: Flowable[(BigInteger, TimeStamp)]): Flowable[Buffer] = { history .concatMap { case (diff, to: TimeStamp) => diff --git a/app/src/main/scala/org/alephium/explorer/web/AddressServer.scala b/app/src/main/scala/org/alephium/explorer/web/AddressServer.scala index c5133b42..ebeca57d 100644 --- a/app/src/main/scala/org/alephium/explorer/web/AddressServer.scala +++ b/app/src/main/scala/org/alephium/explorer/web/AddressServer.scala @@ -155,6 +155,23 @@ class AddressServer( }) } } + }), + route(getAddressAmountHistoryDeltas.serverLogic[Future] { + case (address, timeInterval, intervalType) => + validateTimeInterval(timeInterval, intervalType) { + transactionService + .getAmountHistoryAsDeltas( + address, + timeInterval.from, + timeInterval.to, + intervalType + ) + .map { values => + AmountHistory(values.map { case (ts, value) => + TimedAmount(ts, value) + }) + } + } }) ) diff --git a/app/src/test/scala/org/alephium/explorer/service/EmptyTransactionService.scala b/app/src/test/scala/org/alephium/explorer/service/EmptyTransactionService.scala index e33d55e2..a5cb6e59 100644 --- a/app/src/test/scala/org/alephium/explorer/service/EmptyTransactionService.scala +++ b/app/src/test/scala/org/alephium/explorer/service/EmptyTransactionService.scala @@ -125,4 +125,14 @@ trait EmptyTransactionService extends TransactionService { ec: ExecutionContext, dc: DatabaseConfig[PostgresProfile] ): Future[ArraySeq[(TimeStamp, BigInteger)]] = ??? + + def getAmountHistoryAsDeltas( + address: Address, + from: TimeStamp, + to: TimeStamp, + intervalType: IntervalType + )(implicit + ec: ExecutionContext, + dc: DatabaseConfig[PostgresProfile] + ): Future[ArraySeq[(TimeStamp, BigInteger)]] = ??? } diff --git a/app/src/test/scala/org/alephium/explorer/service/TransactionServiceSpec.scala b/app/src/test/scala/org/alephium/explorer/service/TransactionServiceSpec.scala index 00627497..323485f8 100644 --- a/app/src/test/scala/org/alephium/explorer/service/TransactionServiceSpec.scala +++ b/app/src/test/scala/org/alephium/explorer/service/TransactionServiceSpec.scala @@ -490,6 +490,14 @@ class TransactionServiceSpec extends AlephiumActorSpecLike with DatabaseFixtureF history is historyDepracted + val deltas = TransactionService + .getAmountHistoryAsDeltas(address, fromTs, toTs, intervalType) + .futureValue + .map { case (ts, sum) => (ts.millis, sum) } + val deltaResult = deltas.map(_._2).fold(java.math.BigInteger.ZERO)(_ add _) + + deltaResult is history.last._2 + val times = history.map(_._1) // Test that history is always ordered correctly diff --git a/app/src/test/scala/org/alephium/explorer/web/AddressServerSpec.scala b/app/src/test/scala/org/alephium/explorer/web/AddressServerSpec.scala index adf3f769..5b82f8e0 100644 --- a/app/src/test/scala/org/alephium/explorer/web/AddressServerSpec.scala +++ b/app/src/test/scala/org/alephium/explorer/web/AddressServerSpec.scala @@ -131,6 +131,17 @@ class AddressServerSpec() dc: DatabaseConfig[PostgresProfile] ): Future[ArraySeq[(TimeStamp, BigInteger)]] = Future.successful(amountHistory.map { case (ts, bi) => (bi, ts) }) + + override def getAmountHistoryAsDeltas( + address: Address, + from: TimeStamp, + to: TimeStamp, + intervalType: IntervalType + )(implicit + ec: ExecutionContext, + dc: DatabaseConfig[PostgresProfile] + ): Future[ArraySeq[(TimeStamp, BigInteger)]] = + Future.successful(amountHistory.map { case (ts, bi) => (bi, ts) }) } val tokenService = new EmptyTokenService { @@ -340,6 +351,50 @@ class AddressServerSpec() } } + "/addresses/
/amount-history-deltas" should { + val address = addressGen.sample.get + val timestamps = transactions.map(_.timestamp.millis).sorted + val intervalTypes = ArraySeq[IntervalType](IntervalType.Hourly, IntervalType.Daily) + val fromTs = timestamps.head + def maxTimeSpan(intervalType: IntervalType) = intervalType match { + case IntervalType.Hourly => Duration.ofDaysUnsafe(7) + case IntervalType.Daily => Duration.ofDaysUnsafe(365) + case IntervalType.Weekly => Duration.ofDaysUnsafe(365) + } + def getToTs(intervalType: IntervalType) = + fromTs + maxTimeSpan(intervalType).millis + + "return the amount history deltas as json" in { + intervalTypes.foreach { intervalType => + val toTs = getToTs(intervalType) + + Get( + s"/addresses/${address}/amount-history-deltas?fromTs=$fromTs&toTs=$toTs&interval-type=$intervalType" + ) check { response => + response.body is Right( + s"""{"amountHistory":${amountHistory + .map { case (amount, ts) => s"""[${ts.millis},"$amount"]""" } + .mkString("[", ",", "]")}}""" + ) + } + } + } + + "respect the time range and time interval" in { + intervalTypes.foreach { intervalType => + val wrongToTs = getToTs(intervalType) + 1 + + Get( + s"/addresses/${address}/amount-history-deltas?fromTs=$fromTs&toTs=$wrongToTs&interval-type=$intervalType" + ) check { response => + response.body is Left( + s"""{"detail":"Time span cannot be greater than ${maxTimeSpan(intervalType)}"}""" + ) + } + } + } + } + "getTransactionsByAddresses" should { "list transactions for an array of addresses" in { forAll(addressGen) { address => diff --git a/benchmark/src/test/scala/org/alephium/explorer/benchmark/db/DBBenchmark.scala b/benchmark/src/test/scala/org/alephium/explorer/benchmark/db/DBBenchmark.scala index 552ff464..4ba12c4d 100644 --- a/benchmark/src/test/scala/org/alephium/explorer/benchmark/db/DBBenchmark.scala +++ b/benchmark/src/test/scala/org/alephium/explorer/benchmark/db/DBBenchmark.scala @@ -297,7 +297,7 @@ class DBBenchmark { implicit val ec: ExecutionContext = state.config.db.ioExecutionContext implicit val dc: DatabaseConfig[PostgresProfile] = state.config val timestamps = state.blocks.map(_.timestamp) - val from = timestamps.min + val from = timestamps.drop(timestamps.length / 2).min val to = from.plusMillisUnsafe(Duration.ofDaysUnsafe(365L).millis) val intervalType = IntervalType.Daily @@ -307,4 +307,20 @@ class DBBenchmark { val _ = Await.result(res, requestTimeout) } + + @Benchmark + def getAmountHistoryDeltas(state: Address_ReadState): Unit = { + implicit val ec: ExecutionContext = state.config.db.ioExecutionContext + implicit val dc: DatabaseConfig[PostgresProfile] = state.config + val timestamps = state.blocks.map(_.timestamp) + val from = timestamps.drop(timestamps.length / 2).min + val to = from.plusMillisUnsafe(Duration.ofDaysUnsafe(365L).millis) + val intervalType = IntervalType.Daily + + val res = TransactionService + .getAmountHistoryAsDeltas(state.address, from, to, intervalType) + + val _ = + Await.result(res, requestTimeout) + } }