Skip to content

Commit

Permalink
add nestedPoolId and user. Create update function
Browse files Browse the repository at this point in the history
  • Loading branch information
franzns committed Feb 23, 2024
1 parent 4262192 commit 2cca5b7
Show file tree
Hide file tree
Showing 14 changed files with 258 additions and 101 deletions.
15 changes: 10 additions & 5 deletions modules/actions/pool/add-pools-from-subgraph.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { addMissingPoolsFromSubgraph } from './add-pools-from-subgraph';
import { prisma } from '../../../prisma/prisma-client';
import { PrismaPool } from '@prisma/client';
import { VaultPoolFragment as VaultSubgraphPoolFragment } from '../../sources/subgraphs/balancer-v3-vault/generated/types';
import {
SwapFragment,
VaultPoolFragment as VaultSubgraphPoolFragment,
} from '../../sources/subgraphs/balancer-v3-vault/generated/types';
import { TypePoolFragment as PoolSubgraphPoolFragment } from '../../subgraphs/balancer-v3-pools/generated/types';
import { GraphQLClient } from 'graphql-request';

// Mock the module dependencies
jest.mock('@modules/sources/contracts', () => ({
...jest.requireActual('@modules/sources/contracts'),
jest.mock('../../sources/contracts', () => ({
...jest.requireActual('../../sources/contracts'),
fetchErc20Headers: jest.fn().mockResolvedValue({ '2': { name: 'name', symbol: 'symbol' } }),
fetchWeightedPoolData: jest.fn().mockResolvedValue({}),
fetchPoolTokens: jest.fn().mockResolvedValue({}),
Expand All @@ -33,7 +37,8 @@ jest.mock('../../../prisma/prisma-client', () => ({

// describe('syncPools', () => {
// const vaultSubgraphClient = {
// getAllPools: jest.fn().mockResolvedValue([{ id: '1' }, { id: '2' }] as VaultSubgraphPoolFragment[]),
// getAllInitializedPools: jest.fn().mockResolvedValue([{ id: '1' }, { id: '2' }] as VaultSubgraphPoolFragment[]),
// getSwapsSince: jest.fn().mockResolvedValue([{ id: '1' }, { id: '2' }] as SwapFragment[]),
// };
// const poolSubgraphClient = {
// Pools: jest.fn().mockResolvedValue({
Expand All @@ -50,7 +55,7 @@ jest.mock('../../../prisma/prisma-client', () => ({
// });

// it('should fetch pools from vault subgraph', async () => {
// expect(vaultSubgraphClient.getAllPools).toHaveBeenCalled();
// expect(vaultSubgraphClient.getAllInitializedPools).toHaveBeenCalled();
// });

// it('should fetch pools from pools subgraph', async () => {
Expand Down
32 changes: 4 additions & 28 deletions modules/actions/pool/add-pools-from-subgraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
import { V3PoolsSubgraphClient } from '../../subgraphs/balancer-v3-pools';
import { BalancerVaultSubgraphSource } from '../../sources/subgraphs/balancer-v3-vault';
import _ from 'lodash';
import { tokensTransformer } from '../../sources/transformers/tokens-transformer';

type PoolDbEntry = {
pool: Prisma.PrismaPoolCreateInput;
Expand All @@ -26,13 +27,10 @@ type PoolDbEntry = {
export async function addMissingPoolsFromSubgraph(
vaultSubgraphClient: BalancerVaultSubgraphSource,
poolSubgraphClient: V3PoolsSubgraphClient,
// viemClient: ViemClient,
// vaultAddress: string,
chain = 'SEPOLIA' as Chain,
): Promise<string[]> {
// Fetch pools from subgraph
// TODO this needs paging
const vaultSubgraphPools = await vaultSubgraphClient.getAllPools();
const vaultSubgraphPools = await vaultSubgraphClient.getAllInitializedPools();
const { pools: poolSubgraphPools } = await poolSubgraphClient.Pools();

// Find pools missing from the database
Expand All @@ -42,28 +40,7 @@ export async function addMissingPoolsFromSubgraph(

// Store pool tokens and BPT in the tokens table before creating the pools
try {
const allTokens: { address: string; name: string; decimals: number; symbol: string; chain: Chain }[] = [];
missingPools.forEach((pool) => {
allTokens.push({
address: pool.address,
decimals: 18,
name: pool.name,
symbol: pool.symbol,
chain: chain,
});
if (pool.tokens) {
for (const poolToken of pool.tokens) {
allTokens.push({
address: poolToken.address,
decimals: poolToken.decimals,
name: poolToken.name,
symbol: poolToken.symbol,
chain: chain,
});
}
}
});

const allTokens = tokensTransformer(missingPools, chain);
await prisma.prismaToken.createMany({
data: allTokens,
skipDuplicates: true,
Expand Down Expand Up @@ -91,7 +68,7 @@ export async function addMissingPoolsFromSubgraph(
// TODO: Will be great to create all the token data here, including dynamic data
// but for now we can only store static data, because prisma doesn't support nested createMany
// to create dynamic data tabels as well. One solution is to move "dynamicData" to the tokens table
data: poolTokensTransformer(vaultSubgraphPool),
data: poolTokensTransformer(vaultSubgraphPool, chain),
},
},
// placeholder data, will be updated with onchain values
Expand Down Expand Up @@ -124,7 +101,6 @@ export async function addMissingPoolsFromSubgraph(
data: entry.poolTokenDynamicData,
});

// TODO deal with nested pools
await prisma.prismaPoolExpandedTokens.createMany({
skipDuplicates: true,
data: entry.poolExpandedTokens,
Expand Down
101 changes: 101 additions & 0 deletions modules/actions/pool/update-pools-from-subgraph.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { Chain, Prisma, PrismaPoolType } from '@prisma/client';
import { prisma } from '../../../prisma/prisma-client';
import {
poolTransformer,
poolTokensTransformer,
poolTokensDynamicDataTransformer,
poolExpandedTokensTransformer,
} from '../../sources/transformers';
import { V3PoolsSubgraphClient } from '../../subgraphs/balancer-v3-pools';
import { BalancerVaultSubgraphSource } from '../../sources/subgraphs/balancer-v3-vault';
import _ from 'lodash';
import { tokensTransformer } from '../../sources/transformers/tokens-transformer';

type PoolDbEntry = {
pool: Prisma.PrismaPoolCreateInput;
poolTokenDynamicData: Prisma.PrismaPoolTokenDynamicDataCreateManyInput[];
poolExpandedTokens: Prisma.PrismaPoolExpandedTokensCreateManyInput[];
};

/**
* Makes sure that all pools are synced in the database
*
* @param vaultSubgraphClient
* @param poolSubgraphClient
* @param chain
* @returns syncedPools - the pools that were synced
*/
export async function updatePoolsFromSubgraph(
vaultSubgraphClient: BalancerVaultSubgraphSource,
poolSubgraphClient: V3PoolsSubgraphClient,
chain = 'SEPOLIA' as Chain,
) {
// Fetch pools from subgraph
const vaultSubgraphPools = await vaultSubgraphClient.getAllInitializedPools();
const { pools: poolSubgraphPools } = await poolSubgraphClient.Pools();

// Find pools missing from the database
const dbPools = await prisma.prismaPool.findMany({ where: { chain, vaultVersion: 3 } });
const dbPoolIds = new Set(dbPools.map((pool) => pool.id.toLowerCase()));
const presentPools = vaultSubgraphPools.filter((pool) => dbPoolIds.has(pool.id));

// Making sure all tokens are present
try {
const allTokens = tokensTransformer(presentPools, chain);
await prisma.prismaToken.createMany({
data: allTokens,
skipDuplicates: true,
});
} catch (e) {
console.error('Error creating tokens', e);
}

for (const presentPool of presentPools) {
const vaultSubgraphPool = vaultSubgraphPools.find((pool) => pool.id === presentPool.id);
const poolSubgraphPool = poolSubgraphPools.find((pool) => pool.id === presentPool.id);
if (!vaultSubgraphPool || !poolSubgraphPool) {
// That won't happen, but TS doesn't know that
continue;
}

const dbPool = poolTransformer(vaultSubgraphPool, poolSubgraphPool, chain);

await prisma.prismaPool.update({
where: { id_chain: { id: presentPool.id, chain: chain } },
data: {
owner: dbPool.owner,
type: dbPool.type,
typeData: dbPool.typeData,
version: dbPool.version,
},
});

const transformedPoolToken = poolTokensTransformer(vaultSubgraphPool, chain);

for (const poolToken of transformedPoolToken) {
await prisma.prismaPoolToken.update({
where: { id_chain: { id: poolToken.id, chain: chain } },
data: {
nestedPoolId: poolToken.nestedPoolId,
},
});
}

const transformedPoolExpandedTokens = poolExpandedTokensTransformer(vaultSubgraphPool, chain);

for (const poolToken of transformedPoolExpandedTokens) {
await prisma.prismaPoolExpandedTokens.update({
where: {
tokenAddress_poolId_chain: {
chain: chain,
poolId: presentPool.id,
tokenAddress: poolToken.tokenAddress,
},
},
data: {
nestedPoolId: poolToken.nestedPoolId,
},
});
}
}
}
43 changes: 6 additions & 37 deletions modules/actions/swap/add-swaps-from-subgraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { BalancerVaultSubgraphSource } from '../../sources/subgraphs/balancer-v3
import _ from 'lodash';
import moment from 'moment';
import { tokenService } from '../../token/token.service';
import { swapsTransformer } from '../../sources/transformers/swaps-transformer';

type PoolDbEntry = {
pool: Prisma.PrismaPoolCreateInput;
Expand All @@ -20,64 +21,32 @@ type PoolDbEntry = {
*/
export async function syncSwapsFromSubgraph(
vaultSubgraphClient: BalancerVaultSubgraphSource,
// viemClient: ViemClient,
// vaultAddress: string,
chain = 'SEPOLIA' as Chain,
daysToSync = 7,
daysToSync = 30,
): Promise<string[]> {
const poolIds = new Set<string>();
const txs = new Set<string>();
const tokenPrices = await tokenService.getTokenPrices(chain);

// only sync from the latest swap in DB to avoid duplicate work
const lastSwap = await prisma.prismaPoolSwap.findFirst({
orderBy: { timestamp: 'desc' },
where: { chain: chain },
});

//ensure we only query the last daysToSync worth of swaps
const daysToSyncTimestamp = moment().subtract(daysToSync, 'day').unix();
//ensure we only sync the last 48 hours worth of swaps
const timestamp = lastSwap && lastSwap.timestamp > daysToSyncTimestamp ? lastSwap.timestamp : daysToSyncTimestamp;

// TODO use paging
const swaps = await vaultSubgraphClient.getSwapsSince(timestamp);

await prisma.prismaPoolSwap.createMany({
skipDuplicates: true,
data: swaps.map((swap) => {
let valueUSD = 0;
const tokenInPrice = tokenService.getPriceForToken(tokenPrices, swap.tokenIn); // TODO need to get price close to swap timestamp
const tokenOutPrice = tokenService.getPriceForToken(tokenPrices, swap.tokenOut); // TODO need to get price close to swap timestamp

if (tokenInPrice > 0) {
valueUSD = tokenInPrice * parseFloat(swap.tokenAmountIn);
} else {
valueUSD = tokenOutPrice * parseFloat(swap.tokenAmountOut);
}

poolIds.add(swap.pool);
txs.add(swap.transactionHash);

return {
id: swap.id,
chain: chain,
timestamp: parseFloat(swap.blockTimestamp),
poolId: swap.pool,
userAddress: '0x000', //swap.user.id,
tokenIn: swap.tokenIn,
tokenInSym: swap.tokenIn, // TODO add symbol
tokenOut: swap.tokenOut,
tokenOutSym: swap.tokenOut, // TODO add symbol
tokenAmountIn: swap.tokenAmountIn,
tokenAmountOut: swap.tokenAmountOut,
tx: swap.transactionHash,
valueUSD,
};
}),
data: await swapsTransformer(swaps, chain),
});

// Do we need to create batch swaps as well?
// await this.createBatchSwaps(Array.from(txs));

// Remove everything older that daysToSync
await prisma.prismaPoolSwap.deleteMany({
where: {
timestamp: { lt: daysToSyncTimestamp },
Expand Down
2 changes: 1 addition & 1 deletion modules/controllers/jobs-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export function JobsController(tracer?: any) {
// find all missing pools and add them to the DB
const added = await addMissingPoolsFromSubgraph(vaultSubgraphClient, poolSubgraphClient, chain);

// update with latest on-chain data (needed?)
// update with latest on-chain data (needed? this will run on a separate job anyway)
const updated = await updateOnChainDataForPools(vaultAddress, '123', added, viemClient, latestBlock);

return updated;
Expand Down
27 changes: 27 additions & 0 deletions modules/controllers/pools-controller.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import config from '../../config';
import { updateOnchainDataForAllPools } from '../actions/pool/update-on-chain-data';
import { updatePoolsFromSubgraph } from '../actions/pool/update-pools-from-subgraph';
import { syncSwapsFromSubgraph } from '../actions/swap/add-swaps-from-subgraph';
import { updateVolumeAndFees } from '../actions/swap/update-volume-and-fees';
import { chainIdToChain } from '../network/chain-id-to-chain';
import { BalancerVaultSubgraphSource } from '../sources/subgraphs/balancer-v3-vault';
import { getViemClient } from '../sources/viem-client';
import { getPoolsSubgraphClient } from '../subgraphs/balancer-v3-pools';

/**
* Controller responsible for matching job requests to configured job handlers
Expand All @@ -17,6 +19,31 @@ export function PoolsController(tracer?: any) {
// Setup tracing
// ...
return {
async updatePoolsFromSubgraph(chainIds: string[]) {
const updatedPools: string[] = [];
for (const chainId of chainIds) {
const chain = chainIdToChain[chainId];
const {
subgraphs: { balancerV3, balancerPoolsV3 },
} = config[chain];

// Guard against unconfigured chains
if (!balancerV3) {
throw new Error(`Chain not configured: ${chain}`);
}

const vaultSubgraphClient = new BalancerVaultSubgraphSource(balancerV3);
const poolSubgraphClient = getPoolsSubgraphClient(balancerPoolsV3!);

// TODO: add syncing v2 pools as well by splitting the poolService into separate
// actions with extracted configuration

// find all missing pools and add them to the DB
const added = await updatePoolsFromSubgraph(vaultSubgraphClient, poolSubgraphClient, chain);
}

return updatedPools;
},
async updateOnChainDataForAllPools(chainId: string) {
const chain = chainIdToChain[chainId];
const {
Expand Down
2 changes: 1 addition & 1 deletion modules/pool/subgraph-mapper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Chain, PrismaPoolType } from '@prisma/client';
import { Chain, Prisma, PrismaPoolType } from '@prisma/client';
import { BalancerPoolFragment } from '../subgraphs/balancer-subgraph/generated/balancer-subgraph-types';
import { AddressZero } from '@ethersproject/constants';
import { fx, gyro, linear, element, stable } from './pool-data';
Expand Down
14 changes: 3 additions & 11 deletions modules/sources/subgraphs/balancer-v3-vault/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
import { GraphQLClient } from 'graphql-request';
import {
OrderDirection,
Pool_OrderBy,
PoolsQuery,
SwapFragment,
Swap_OrderBy,
VaultPoolFragment,
getSdk,
} from './generated/types';
import { OrderDirection, Pool_OrderBy, SwapFragment, Swap_OrderBy, VaultPoolFragment, getSdk } from './generated/types';

export class BalancerVaultSubgraphSource {
private sdk: ReturnType<typeof getSdk>;
Expand All @@ -20,15 +12,15 @@ export class BalancerVaultSubgraphSource {
this.sdk = getSdk(new GraphQLClient(subgraphUrl));
}

public async getAllPools(): Promise<VaultPoolFragment[]> {
public async getAllInitializedPools(): Promise<VaultPoolFragment[]> {
const limit = 1000;
let hasMore = true;
let id = `0x`;
let pools: VaultPoolFragment[] = [];

while (hasMore) {
const response = await this.sdk.Pools({
where: { id_gt: id },
where: { id_gt: id, isInitialized: true },
orderBy: Pool_OrderBy.Id,
orderDirection: OrderDirection.Asc,
first: limit,
Expand Down
3 changes: 3 additions & 0 deletions modules/sources/subgraphs/balancer-v3-vault/pools.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ fragment VaultPool on Pool {
balance
totalProtocolSwapFee
totalProtocolYieldFee
nestedPool {
id
}
}
rateProviders {
address
Expand Down
Loading

0 comments on commit 2cca5b7

Please sign in to comment.