[Argus] Improves 'HEAD /assets/{id}' requests latency by implementing caching QN requests #4834

Merged
6 changes: 6 additions & 0 deletions distributor-node/CHANGELOG.md
@@ -1,3 +1,9 @@
##

### 1.3.0

- Adds support for TTL-based caching of the `StorageDataObject` QN entity for `HEAD /assets/{id}` requests. The TTL is configurable via the `limits.queryNodeCacheTTL` config option.

### 1.2.2

- **FIX** `sendExtrinsic`: The send extrinsic function (which is a wrapper around PolkadotJS `tx.signAndSend` function) has been fixed to handle the case when tx has been finalized before the callback registered in `tx.signAndSend` would run.
1 change: 1 addition & 0 deletions distributor-node/config.yml
@@ -26,6 +26,7 @@ limits:
outboundRequestsTimeoutMs: 5000
pendingDownloadTimeoutSec: 3600
maxCachedItemSize: 1G
queryNodeCacheTTL: 60
intervals:
saveCacheState: 60
checkStorageNodeResponseTimes: 60
@@ -0,0 +1,15 @@
## queryNodeCacheTTL Type

`integer`

## queryNodeCacheTTL Constraints

**minimum**: the value of this number must be greater than or equal to: `1`

## queryNodeCacheTTL Default Value

The default value is:

```json
60
```
31 changes: 31 additions & 0 deletions distributor-node/docs/schema/definition-properties-limits.md
@@ -13,6 +13,7 @@
| [pendingDownloadTimeoutSec](#pendingdownloadtimeoutsec) | `integer` | Required | cannot be null | [Distributor node configuration](definition-properties-limits-properties-pendingdownloadtimeoutsec.md "https://joystream.org/schemas/argus/config#/properties/limits/properties/pendingDownloadTimeoutSec") |
| [maxCachedItemSize](#maxcacheditemsize) | `string` | Optional | cannot be null | [Distributor node configuration](definition-properties-limits-properties-maxcacheditemsize.md "https://joystream.org/schemas/argus/config#/properties/limits/properties/maxCachedItemSize") |
| [dataObjectSourceByObjectIdTTL](#dataobjectsourcebyobjectidttl) | `integer` | Optional | cannot be null | [Distributor node configuration](definition-properties-limits-properties-dataobjectsourcebyobjectidttl.md "https://joystream.org/schemas/argus/config#/properties/limits/properties/dataObjectSourceByObjectIdTTL") |
| [queryNodeCacheTTL](#querynodecachettl) | `integer` | Optional | cannot be null | [Distributor node configuration](definition-properties-limits-properties-querynodecachettl.md "https://joystream.org/schemas/argus/config#/properties/limits/properties/queryNodeCacheTTL") |

## storage

@@ -187,3 +188,33 @@ The default value is:
```json
60
```

## queryNodeCacheTTL

TTL (in seconds) for Apollo's InMemoryCache, used to cache data fetched from the query node.

`queryNodeCacheTTL`

* is optional

* Type: `integer`

* cannot be null

* defined in: [Distributor node configuration](definition-properties-limits-properties-querynodecachettl.md "https://joystream.org/schemas/argus/config#/properties/limits/properties/queryNodeCacheTTL")

### queryNodeCacheTTL Type

`integer`

### queryNodeCacheTTL Constraints

**minimum**: the value of this number must be greater than or equal to: `1`

### queryNodeCacheTTL Default Value

The default value is:

```json
60
```
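The constraints documented above (optional integer, minimum `1`, default `60`, expressed in seconds) can be sketched as a small resolver. This is an illustration only: `resolveQueryNodeCacheTtlMs` and the `Limits` type are hypothetical names, not part of this PR, and the PR itself relies on schema validation plus a `|| 60` fallback rather than a helper like this.

```typescript
// Hypothetical shape of the relevant part of the `limits` config section.
type Limits = { queryNodeCacheTTL?: number }

const DEFAULT_QUERY_NODE_CACHE_TTL_SEC = 60 // default value from the schema
const MIN_QUERY_NODE_CACHE_TTL_SEC = 1 // minimum value from the schema

// Hypothetical helper: applies the default, enforces the documented
// constraints, and converts seconds to milliseconds.
function resolveQueryNodeCacheTtlMs(limits: Limits): number {
  const ttlSec = limits.queryNodeCacheTTL ?? DEFAULT_QUERY_NODE_CACHE_TTL_SEC
  if (!Number.isInteger(ttlSec) || ttlSec < MIN_QUERY_NODE_CACHE_TTL_SEC) {
    throw new RangeError(
      `queryNodeCacheTTL must be an integer >= ${MIN_QUERY_NODE_CACHE_TTL_SEC}, got ${ttlSec}`
    )
  }
  return ttlSec * 1000
}
```

With an empty `limits` object the sketch falls back to the 60-second default; a value of `0` is rejected because it is below the documented minimum.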
5 changes: 3 additions & 2 deletions distributor-node/package.json
@@ -1,7 +1,7 @@
{
"name": "@joystream/distributor-cli",
"description": "Joystream distributor node CLI",
"version": "1.2.2",
"version": "1.3.0",
"author": "Joystream contributors",
"bin": {
"joystream-distributor": "./bin/run"
@@ -14,6 +14,7 @@
"@joystream/opentelemetry": "1.0.0",
"@joystream/storage-node-client": "^3.0.0",
"@joystream/types": "^2.0.0",
"@nerdwallet/apollo-cache-policies": "2.10.0",
"@oclif/command": "^1",
"@oclif/config": "^1",
"@oclif/plugin-help": "^3",
@@ -66,12 +67,12 @@
"@types/cors": "^2.8.12",
"@types/express-http-proxy": "^1.6.2",
"@types/inquirer": "^8.1.1",
"@types/mime": "^3.0.1",
"@types/mocha": "^5",
"@types/node": "^14",
"@types/node-cache": "^4.2.5",
"@types/node-cleanup": "^2.1.1",
"@types/send": "^0.17.0",
"@types/mime": "^3.0.1",
"@types/ws": "^5.1.2",
"chai": "^4",
"globby": "^10",
6 changes: 6 additions & 0 deletions distributor-node/src/schemas/configSchema.ts
@@ -165,6 +165,12 @@ export const configSchema: JSONSchema4 = objectSchema({
type: 'integer',
minimum: 1,
},
queryNodeCacheTTL: {
description: `TTL (in seconds) for Apollo's InMemoryCache, used to cache data fetched from the query node.`,
default: 60,
type: 'integer',
minimum: 1,
},
},
required: [
'storage',
5 changes: 3 additions & 2 deletions distributor-node/src/services/content/ContentService.ts
@@ -10,6 +10,7 @@ import { NetworkingService } from '../networking'
import { ContentHash } from '../crypto/ContentHash'
import { PendingDownloadStatusType } from '../networking/PendingDownload'
import { FSP } from './FSPromise'
import { QueryFetchPolicy } from '../networking/query-node/api'

export const DEFAULT_CONTENT_TYPE = 'application/octet-stream'
export const MIME_TYPE_DETECTION_CHUNK_SIZE = 4100
@@ -313,7 +314,7 @@ export class ContentService {
})
}

public async objectStatus(objectId: string): Promise<ObjectStatus> {
public async objectStatus(objectId: string, qnFetchPolicy: QueryFetchPolicy = 'no-cache'): Promise<ObjectStatus> {
const pendingDownload = this.stateCache.getPendingDownload(objectId)

if (!pendingDownload && this.exists(objectId)) {
@@ -324,7 +325,7 @@ export class ContentService {
return { type: ObjectStatusType.PendingDownload, pendingDownload }
}

const objectInfo = await this.networking.dataObjectInfo(objectId)
const objectInfo = await this.networking.dataObjectInfo(objectId, qnFetchPolicy)
if (!objectInfo.exists) {
return { type: ObjectStatusType.NotFound }
}
@@ -219,7 +219,7 @@ export class PublicApiController {

public async assetHead(req: express.Request<AssetRouteParams>, res: express.Response): Promise<void> {
const { objectId } = req.params
const objectStatus = await this.content.objectStatus(objectId)
const objectStatus = await this.content.objectStatus(objectId, 'cache-first')

res.setHeader('timing-allow-origin', '*')
res.setHeader('accept-ranges', 'bytes')
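The effect of threading the fetch policy from `assetHead` down to the query node client can be sketched with a stand-in client. Everything here is a simplified assumption for illustration: `FakeQueryNode` is hypothetical, the calls are synchronous, and there is no TTL. Only the `'no-cache'` default and the `'cache-first'` opt-in mirror the diff.

```typescript
type QueryFetchPolicy = 'cache-first' | 'no-cache'
type ObjectInfo = { exists: boolean }

// Hypothetical stand-in for the query node client; counts upstream requests.
class FakeQueryNode {
  public requests = 0
  private cache = new Map<string, ObjectInfo>()

  dataObjectInfo(objectId: string, fetchPolicy: QueryFetchPolicy): ObjectInfo {
    if (fetchPolicy === 'cache-first') {
      const cached = this.cache.get(objectId)
      if (cached) return cached
    }
    this.requests += 1 // simulated round trip to the query node
    const info: ObjectInfo = { exists: true }
    // 'no-cache' neither reads from nor writes to the cache
    if (fetchPolicy === 'cache-first') this.cache.set(objectId, info)
    return info
  }
}

// Mirrors the signature change: callers that do not opt in keep the uncached behaviour.
function objectStatus(
  qn: FakeQueryNode,
  objectId: string,
  qnFetchPolicy: QueryFetchPolicy = 'no-cache'
): ObjectInfo {
  return qn.dataObjectInfo(objectId, qnFetchPolicy)
}

// Two HEAD-style lookups followed by two default lookups for the same object.
function demoFetchPolicies(): [number, number] {
  const qn = new FakeQueryNode()
  objectStatus(qn, '1', 'cache-first')
  objectStatus(qn, '1', 'cache-first')
  const afterHeadLookups = qn.requests
  objectStatus(qn, '1')
  objectStatus(qn, '1')
  return [afterHeadLookups, qn.requests]
}
```

In this sketch the two `'cache-first'` lookups cost one upstream request, while each default lookup still goes to the query node, which is why only the `HEAD` handler opts in.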
8 changes: 4 additions & 4 deletions distributor-node/src/services/networking/NetworkingService.ts
@@ -1,5 +1,5 @@
import { ReadonlyConfig } from '../../types/config'
import { QueryNodeApi } from './query-node/api'
import { QueryFetchPolicy, QueryNodeApi } from './query-node/api'
import { Logger } from 'winston'
import { LoggingService } from '../logging'
import { StorageNodeApi } from './storage-node/api'
@@ -50,7 +50,7 @@ export class NetworkingService {
this.logging = logging
this.stateCache = stateCache
this.logger = logging.createLogger('NetworkingManager')
this.queryNodeApi = new QueryNodeApi(config.endpoints.queryNode, this.logging)
this.queryNodeApi = new QueryNodeApi(config, this.logging)
void this.checkActiveStorageNodeEndpoints()
// Queues
this.testLatencyQueue = queue({ concurrency: MAX_CONCURRENT_RESPONSE_TIME_CHECKS, autostart: true }).on(
@@ -137,8 +137,8 @@ export class NetworkingService {
return parsed
}

public async dataObjectInfo(objectId: string): Promise<DataObjectInfo> {
const details = await this.queryNodeApi.getDataObjectDetails(objectId)
public async dataObjectInfo(objectId: string, fetchPolicy: QueryFetchPolicy): Promise<DataObjectInfo> {
const details = await this.queryNodeApi.getDataObjectDetails(objectId, fetchPolicy)
let exists = false
let isSupported = false
let isAccepted = false
80 changes: 56 additions & 24 deletions distributor-node/src/services/networking/query-node/api.ts
@@ -1,19 +1,21 @@
import {
ApolloClient,
DocumentNode,
FetchPolicy,
HttpLink,
InMemoryCache,
NormalizedCacheObject,
defaultDataIdFromObject,
from,
split,
} from '@apollo/client/core'
import { onError } from '@apollo/client/link/error'
import { WebSocketLink } from '@apollo/client/link/ws'
import { getMainDefinition } from '@apollo/client/utilities'
import { InvalidationPolicyCache } from '@nerdwallet/apollo-cache-policies'
import fetch from 'cross-fetch'
import { FragmentDefinitionNode } from 'graphql'
import { Logger } from 'winston'
import ws from 'ws'
import { ReadonlyConfig } from '../../../types'
import { LoggingService } from '../../logging'
import {
DataObjectDetailsFragment,
@@ -41,6 +43,8 @@ import { Maybe } from './generated/schema'

const MAX_RESULTS_PER_QUERY = 1000

export type QueryFetchPolicy = Extract<FetchPolicy, 'cache-first' | 'no-cache'>

type PaginationQueryVariables = {
limit: number
lastCursor?: Maybe<string>
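The `QueryFetchPolicy` alias introduced in this hunk narrows a wider union with `Extract`. The sketch below reproduces the idea with a local, abbreviated `FetchPolicy` union (an assumption standing in for the type exported by `@apollo/client/core`) and adds a hypothetical runtime guard, `isQueryFetchPolicy`, that is not part of the PR.

```typescript
// Local stand-in for Apollo's FetchPolicy union, declared here so the example is self-contained.
type FetchPolicy = 'cache-first' | 'network-only' | 'cache-only' | 'no-cache' | 'standby'

// Keeps only the members of FetchPolicy that also appear in the second union.
type QueryFetchPolicy = Extract<FetchPolicy, 'cache-first' | 'no-cache'>

const ALLOWED_POLICIES: readonly string[] = ['cache-first', 'no-cache']

// Hypothetical runtime guard for values arriving as plain strings.
function isQueryFetchPolicy(value: string): value is QueryFetchPolicy {
  return ALLOWED_POLICIES.indexOf(value) !== -1
}

// Compiles: both literals are members of QueryFetchPolicy.
const headPolicy: QueryFetchPolicy = 'cache-first'
const defaultPolicy: QueryFetchPolicy = 'no-cache'
// const rejected: QueryFetchPolicy = 'network-only' // would be a compile-time error
```

Compared with writing the two literals directly, `Extract` keeps the alias tied to the library's union: a literal that is not a member of `FetchPolicy` would simply drop out of the resulting type.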
@@ -57,20 +61,23 @@ type PaginationQueryResult<T = unknown> = {
type CustomVariables<T> = Omit<T, keyof PaginationQueryVariables>

export class QueryNodeApi {
private config: ReadonlyConfig
private apolloClient: ApolloClient<NormalizedCacheObject>
private logger: Logger

public constructor(endpoint: string, logging: LoggingService, exitOnError = false) {
public constructor(config: ReadonlyConfig, logging: LoggingService, exitOnError = false) {
this.config = config

this.logger = logging.createLogger('QueryNodeApi')
const errorLink = onError(({ graphQLErrors, networkError }) => {
const message = networkError?.message || 'Graphql syntax errors found'
this.logger.error('Error when trying to execute a query!', { err: { message, graphQLErrors, networkError } })
exitOnError && process.exit(-1)
})

const queryLink = from([errorLink, new HttpLink({ uri: endpoint, fetch })])
const queryLink = from([errorLink, new HttpLink({ uri: this.config.endpoints.queryNode, fetch })])
const wsLink = new WebSocketLink({
uri: endpoint,
uri: this.config.endpoints.queryNode,
options: {
reconnect: true,
},
@@ -87,13 +94,20 @@ export class QueryNodeApi {

this.apolloClient = new ApolloClient({
link: splitLink,
cache: new InMemoryCache({
dataIdFromObject: (object) => {
// setup cache object id for ProcessorState entity type
if (object.__typename === 'ProcessorState') {
return object.__typename
}
return defaultDataIdFromObject(object)
// Ref: https://www.apollographql.com/docs/react/api/core/ApolloClient/#assumeimmutableresults
assumeImmutableResults: true,
cache: new InvalidationPolicyCache({
typePolicies: {
ProcessorState: {
keyFields: (object) => object.__typename,
},
},
invalidationPolicies: {
types: {
StorageDataObject: {
timeToLive: (this.config.limits.queryNodeCacheTTL || 60) * 1000, // seconds to milliseconds
},
},
},
}),
defaultOptions: { query: { fetchPolicy: 'no-cache', errorPolicy: 'all' } },
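The time-to-live behaviour configured for `StorageDataObject` above can be illustrated with a deliberately simplified cache. This is a stand-in written for this note under stated assumptions, not how `@nerdwallet/apollo-cache-policies` is implemented: `TtlCache`, `cacheFirst` and the injectable `Clock` are hypothetical names.

```typescript
type Clock = () => number

// Simplified TTL cache: an entry is served only while it is younger than ttlMs.
class TtlCache<V> {
  private readonly entries = new Map<string, { value: V; storedAt: number }>()
  private readonly ttlMs: number
  private readonly now: Clock

  constructor(ttlMs: number, now: Clock = () => Date.now()) {
    this.ttlMs = ttlMs
    this.now = now
  }

  get(key: string): V | undefined {
    const entry = this.entries.get(key)
    if (!entry) return undefined
    if (this.now() - entry.storedAt >= this.ttlMs) {
      this.entries.delete(key) // expired: evict and report a miss
      return undefined
    }
    return entry.value
  }

  set(key: string, value: V): void {
    this.entries.set(key, { value, storedAt: this.now() })
  }
}

// 'cache-first' in miniature: serve a live entry, otherwise fetch and store.
function cacheFirst<V>(cache: TtlCache<V>, key: string, fetchFresh: () => V): V {
  const cached = cache.get(key)
  if (cached !== undefined) return cached
  const fresh = fetchFresh()
  cache.set(key, fresh)
  return fresh
}
```

Injecting the clock keeps the expiry logic testable without waiting for real time to pass. With a 60-second TTL, repeated lookups for the same key inside that window are answered from memory, and the first lookup after expiry goes back to the source.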
@@ -107,9 +121,12 @@ export class QueryNodeApi {
>(
query: DocumentNode,
variables: VariablesT,
resultKey: keyof QueryT
resultKey: keyof QueryT,
fetchPolicy: QueryFetchPolicy
): Promise<Required<QueryT>[keyof QueryT] | null> {
return (await this.apolloClient.query<QueryT, VariablesT>({ query, variables })).data[resultKey] || null
return (
(await this.apolloClient.query<QueryT, VariablesT>({ query, variables, fetchPolicy })).data[resultKey] || null
)
}

// Get entities by "non-unique" input and return first result
@@ -172,12 +189,30 @@ export class QueryNodeApi {
})
}

public getDataObjectDetails(objectId: string): Promise<DataObjectDetailsFragment | null> {
return this.uniqueEntityQuery<GetDataObjectDetailsQuery, GetDataObjectDetailsQueryVariables>(
protected readFragment<FragmentT, VariablesT extends Record<string, unknown>>(
id: string,
fragment: DocumentNode
): FragmentT | null {
// Extract the fragment name; the first element of the definitions array is usually the top-level fragment definition
const fragmentName = (fragment.definitions[0] as FragmentDefinitionNode).name.value
if (!fragmentName) {
Member:

Do we want to also check that `fragmentName === "id"`?

Contributor Author:

I checked, and the `id` isn't necessarily the same as the fragment name. `id` refers to the cached object ID (e.g. for the `StorageDataObject` entity type it can be `StorageDataObject:1`), whereas the fragment name refers to the specific fragment of the entity type that we want to read from the cache, e.g. `DataObjectDetails`.

throw new Error('Unable to extract fragment name from provided DocumentNode.')
}
return this.apolloClient.cache.readFragment<FragmentT, VariablesT>({ id, fragment, fragmentName })
}

public async getDataObjectDetails(
objectId: string,
fetchPolicy: QueryFetchPolicy
): Promise<DataObjectDetailsFragment | null> {
const result = await this.uniqueEntityQuery<GetDataObjectDetailsQuery, GetDataObjectDetailsQueryVariables>(
GetDataObjectDetails,
{ id: objectId },
'storageDataObjectByUniqueInput'
'storageDataObjectByUniqueInput',
fetchPolicy
)

return result
}

public getDistributionBucketsWithObjectsByIds(ids: string[]): Promise<DistirubtionBucketWithObjectsFragment[]> {
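The distinction raised in the review thread on `readFragment`, cache object ID versus fragment name, can be shown with plain data. The types below are minimal structural stand-ins for the `graphql` package's `DocumentNode` and `FragmentDefinitionNode`, and `getFragmentName` and `cacheId` are hypothetical helpers written for this illustration. The sketch also checks the definition kind before reading the name, which the code in the diff does not do.

```typescript
// Minimal structural stand-ins for the graphql AST types, declared locally for illustration.
type DefinitionLike = { kind: string; name?: { value: string } }
type DocumentLike = { definitions: readonly DefinitionLike[] }

// Reads the name of the first definition, assuming it is the top-level fragment.
function getFragmentName(fragment: DocumentLike): string {
  const first = fragment.definitions[0]
  if (!first || first.kind !== 'FragmentDefinition' || !first.name) {
    throw new Error('Unable to extract fragment name from provided DocumentNode.')
  }
  return first.name.value
}

// Hypothetical helper: builds a "<typename>:<id>" cache identifier,
// the form used in the review reply (e.g. StorageDataObject:1).
function cacheId(typename: string, id: string): string {
  return `${typename}:${id}`
}

// Hand-built stand-in for a parsed `fragment DataObjectDetails on StorageDataObject { ... }`.
const dataObjectDetailsFragment: DocumentLike = {
  definitions: [{ kind: 'FragmentDefinition', name: { value: 'DataObjectDetails' } }],
}
```

Here `getFragmentName(dataObjectDetailsFragment)` names the selection to read, while `cacheId('StorageDataObject', '1')` names the cached entity it is read from, so comparing the two for equality would not be meaningful.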
@@ -206,13 +241,10 @@ export class QueryNodeApi {

public async getQueryNodeState(): Promise<QueryNodeStateFieldsFragment | null> {
// fetch cached state
const cachedState = this.apolloClient.readFragment<
QueryNodeStateSubscription['stateSubscription'],
QueryNodeStateSubscriptionVariables
>({
id: 'ProcessorState',
fragment: QueryNodeStateFields,
})
const cachedState = this.readFragment<QueryNodeStateFieldsFragment, QueryNodeStateSubscriptionVariables>(
'ProcessorState',
QueryNodeStateFields
)

// If we have the state in cache, return it
if (cachedState) {
11 changes: 10 additions & 1 deletion yarn.lock
@@ -2539,6 +2539,15 @@
resolved "https://registry.yarnpkg.com/@n1ru4l/graphql-live-query/-/graphql-live-query-0.9.0.tgz#defaebdd31f625bee49e6745934f36312532b2bc"
integrity sha512-BTpWy1e+FxN82RnLz4x1+JcEewVdfmUhV1C6/XYD5AjS7PQp9QFF7K8bCD6gzPTr2l+prvqOyVueQhFJxB1vfg==

"@nerdwallet/apollo-cache-policies@2.10.0":
version "2.10.0"
resolved "https://registry.yarnpkg.com/@nerdwallet/apollo-cache-policies/-/apollo-cache-policies-2.10.0.tgz#27c258aaa6479b6c7edd54dc8ea14584e1fdf27c"
integrity sha512-Kh2mzBnC0DBzdqi715qUymnqCEOknnre2CpyA4ZusQ2MYN33P0K7ztVYoFdGyncvf/3wYFdgrhatmT6QvYa+Yw==
dependencies:
graphql "^15.5.0"
lodash "^4.17.21"
uuid "^7.0.3"

"@nestjs/common@8.4.4":
version "8.4.4"
resolved "https://registry.yarnpkg.com/@nestjs/common/-/common-8.4.4.tgz#0914c6c0540b5a344c7c8fd6072faa1a49af1158"
@@ -11163,7 +11172,7 @@ graphql-ws@^5.4.1:
resolved "https://registry.yarnpkg.com/graphql-ws/-/graphql-ws-5.8.2.tgz#800184b1addb20b3010dc06cb70877703a5fff20"
integrity sha512-hYo8kTGzxePFJtMGC7Y4cbypwifMphIJJ7n4TDcVUAfviRwQBnmZAbfZlC+XFwWDUaR7raEDQPxWctpccmE0JQ==

"graphql@^0.11.0 || ^0.12.0 || ^0.13.0 || ^14.0.0", graphql@^14.7.0, graphql@^15.3.0, graphql@^15.4.0, graphql@^15.8.0:
"graphql@^0.11.0 || ^0.12.0 || ^0.13.0 || ^14.0.0", graphql@^14.7.0, graphql@^15.3.0, graphql@^15.4.0, graphql@^15.5.0, graphql@^15.8.0:
version "15.8.0"
resolved "https://registry.yarnpkg.com/graphql/-/graphql-15.8.0.tgz#33410e96b012fa3bdb1091cc99a94769db212b38"
integrity sha512-5gghUc24tP9HRznNpV2+FIoq3xKkj5dTQqf4v0CpdPbFVwFkWoxOM+o+2OC9ZSvjEMTjfmG9QT+gcvggTwW1zw==