Skip to content

Commit

Permalink
Fix/homepage scoring (#251)
Browse files Browse the repository at this point in the history
* Update video relevance manager query

* Remove single video schedules for relevance recalc

* Fix namings

* Reformat query

* Revert "Remove single video schedules for relevance recalc"

This reverts commit 3a685b9

* Adjust logic for single channel recalc

* Unblock video relevance recalc only after processor reaches last exported block

* Increase global recalc interval

* Avoid query if set is empty

* Introduce smaller interval for scheduled channels

* Typo fix

Co-authored-by: Zeeshan Akram <[email protected]>

* Linter

* Add video recalc on channel weight change

---------

Co-authored-by: Zeeshan Akram <[email protected]>
  • Loading branch information
WRadoslaw and zeeshanakram3 authored Dec 6, 2023
1 parent 4c6e3af commit c8b2c2e
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 61 deletions.
6 changes: 3 additions & 3 deletions src/mappings/content/commentsAndReactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ export async function processReactVideoMessage(
existingReaction
)

videoRelevanceManager.scheduleRecalcForVideo(video.id)
videoRelevanceManager.scheduleRecalcForChannel(channelId)

return new MetaprotocolTransactionResultOK()
}
Expand Down Expand Up @@ -497,7 +497,7 @@ export async function processCreateCommentMessage(
// schedule comment counters update
commentCountersManager.scheduleRecalcForComment(comment.parentCommentId)
commentCountersManager.scheduleRecalcForVideo(comment.videoId)
videoRelevanceManager.scheduleRecalcForVideo(comment.videoId)
videoRelevanceManager.scheduleRecalcForChannel(video.channelId)

const event = overlay.getRepository(Event).new({
...genericEventFields(overlay, block, indexInBlock, txHash),
Expand Down Expand Up @@ -661,7 +661,7 @@ export async function processDeleteCommentMessage(
// schedule comment counters update
commentCountersManager.scheduleRecalcForComment(comment.parentCommentId)
commentCountersManager.scheduleRecalcForVideo(comment.videoId)
videoRelevanceManager.scheduleRecalcForVideo(comment.videoId)
videoRelevanceManager.scheduleRecalcForChannel(video.channelId)

// update the comment
comment.text = ''
Expand Down
2 changes: 0 additions & 2 deletions src/mappings/content/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ import {
genericEventFields,
invalidMetadata,
metaprotocolTransactionFailure,
videoRelevanceManager,
} from '../utils'
import {
AsDecoded,
Expand Down Expand Up @@ -582,7 +581,6 @@ export async function processModerateCommentMessage(
// schedule comment counters updates
commentCountersManager.scheduleRecalcForComment(comment.parentCommentId)
commentCountersManager.scheduleRecalcForVideo(comment.videoId)
videoRelevanceManager.scheduleRecalcForVideo(comment.videoId)

comment.text = ''
comment.status = CommentStatus.MODERATED
Expand Down
2 changes: 1 addition & 1 deletion src/mappings/content/video.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export async function processVideoCreatedEvent({
videoRelevance: 0,
})

videoRelevanceManager.scheduleRecalcForVideo(videoId)
videoRelevanceManager.scheduleRecalcForChannel(channelId.toString())

// fetch related channel and owner
const channel = await overlay.getRepository(Channel).getByIdOrFail(channelId.toString())
Expand Down
6 changes: 5 additions & 1 deletion src/mappings/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ import { NextEntityIdManager } from '../utils/NextEntityIdManager'

export const commentCountersManager = new CommentCountersManager()
export const videoRelevanceManager = new VideoRelevanceManager()
// eslint-disable-next-line no-void
void videoRelevanceManager.init({
fullUpdateLoopTime: 1000 * 60 * 60 * 12, // 12 hrs
scheduledUpdateLoopTime: 1000 * 60 * 10, // 10 mins
})
export const migrateCounters = new NextEntityIdManager()
videoRelevanceManager.init(1000 * 60 * 60)

export const JOYSTREAM_SS58_PREFIX = 126

Expand Down
10 changes: 9 additions & 1 deletion src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ async function processEvent<EventName extends EventNames>(
async function afterDbUpdate(em: EntityManager) {
await commentCountersManager.updateVideoCommentsCounters(em)
await commentCountersManager.updateParentRepliesCounters(em)
await videoRelevanceManager.updateVideoRelevanceValue(em)
}

processor.run(new TypeormDatabase({ isolationLevel: 'READ COMMITTED' }), async (ctx) => {
Expand Down Expand Up @@ -314,9 +313,18 @@ processor.run(new TypeormDatabase({ isolationLevel: 'READ COMMITTED' }), async (
}
}
}

if (
!videoRelevanceManager.isVideoRelevanceEnabled &&
block.header.height >= exportBlockNumber
) {
videoRelevanceManager.turnOnVideoRelevanceManager()
}

// Importing exported offchain state
if (block.header.height >= exportBlockNumber && !offchainState.isImported) {
ctx.log.info(`Export block ${exportBlockNumber} reached, importing offchain state...`)
// there is no need to recalc video relevance before orion is synced
await overlay.updateDatabase()
const em = overlay.getEm()
await offchainState.import(em)
Expand Down
2 changes: 2 additions & 0 deletions src/server-extension/resolvers/AdminResolver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ export class AdminResolver {
}
)

await videoRelevanceManager.updateVideoRelevanceValue(em, true)

// Push the result into the results array
results.push({
channelId,
Expand Down
4 changes: 0 additions & 4 deletions src/server-extension/resolvers/AdminResolver/utils.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import { EntityManager, In } from 'typeorm'
import { CommentCountersManager } from '../../../utils/CommentsCountersManager'
import { Comment } from '../../../model'
import { VideoRelevanceManager } from '../../../utils/VideoRelevanceManager'

export async function processCommentsCensorshipStatusUpdate(em: EntityManager, ids: string[]) {
const manager = new CommentCountersManager()
const videoRelevanceManager = new VideoRelevanceManager()
const comments = await em.getRepository(Comment).find({ where: { id: In(ids) } })
comments.forEach((c) => {
manager.scheduleRecalcForComment(c.parentCommentId)
manager.scheduleRecalcForVideo(c.videoId)
videoRelevanceManager.scheduleRecalcForVideo(c.videoId)
})
await manager.updateVideoCommentsCounters(em)
await manager.updateParentRepliesCounters(em)
await videoRelevanceManager.updateVideoRelevanceValue(em)
}
3 changes: 1 addition & 2 deletions src/server-extension/resolvers/VideosResolver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,9 @@ export class VideosResolver {

const tick = await config.get(ConfigVariable.VideoRelevanceViewsTick, em)
if (video.viewsNum % tick === 0) {
videoRelevanceManager.scheduleRecalcForVideo(videoId)
videoRelevanceManager.scheduleRecalcForChannel(video.channelId)
}
await em.save([video, video.channel, newView])
await videoRelevanceManager.updateVideoRelevanceValue(em)
return {
videoId,
viewsNum: video.viewsNum,
Expand Down
135 changes: 88 additions & 47 deletions src/utils/VideoRelevanceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,35 @@ import { globalEm } from './globalEm'
// constant used to parse seconds from creation
export const NEWNESS_SECONDS_DIVIDER = 60 * 60 * 24

type VideoRelevanceManagerLoops = {
fullUpdateLoopTime: number
scheduledUpdateLoopTime: number
}

export class VideoRelevanceManager {
private videosToUpdate: Set<string> = new Set()
private channelsToUpdate: Set<string> = new Set()
private _isVideoRelevanceEnabled = false

public get isVideoRelevanceEnabled(): boolean {
return this._isVideoRelevanceEnabled
}

async init({
fullUpdateLoopTime,
scheduledUpdateLoopTime,
}: VideoRelevanceManagerLoops): Promise<void> {
const em = await globalEm

this.updateScheduledLoop(em, scheduledUpdateLoopTime)
.then(() => {
/* Do nothing */
})
.catch((err) => {
console.error(err)
process.exit(-1)
})

init(intervalMs: number): void {
this.updateLoop(intervalMs)
this.updateFullUpdateLoop(em, fullUpdateLoopTime)
.then(() => {
/* Do nothing */
})
Expand All @@ -19,68 +43,85 @@ export class VideoRelevanceManager {
})
}

scheduleRecalcForVideo(id: string | null | undefined) {
turnOnVideoRelevanceManager() {
this._isVideoRelevanceEnabled = true
}

scheduleRecalcForChannel(id: string | null | undefined) {
if (id) {
this.videosToUpdate.add(id)
this.channelsToUpdate.add(id)
}
}

async updateVideoRelevanceValue(em: EntityManager, forceUpdateAll?: boolean) {
if (this.videosToUpdate.size || forceUpdateAll) {
const [
newnessWeight,
viewsWeight,
commentsWeight,
reactionsWeight,
[joystreamTimestampWeight, ytTimestampWeight] = [7, 3],
defaultChannelWeight,
] = await config.get(ConfigVariable.RelevanceWeights, em)
const channelWeight = defaultChannelWeight ?? 1
await em.query(`
WITH weighted_timestamp AS (
SELECT
"video"."id",
(
if (!this._isVideoRelevanceEnabled || !(this.channelsToUpdate.size || forceUpdateAll)) {
return
}

const [
newnessWeight,
viewsWeight,
commentsWeight,
reactionsWeight,
[joystreamTimestampWeight, ytTimestampWeight] = [7, 3],
defaultChannelWeight,
] = await config.get(ConfigVariable.RelevanceWeights, em)
const channelWeight = defaultChannelWeight ?? 1
const wtEpoch = `((
extract(epoch from video.created_at)*${joystreamTimestampWeight} +
COALESCE(extract(epoch from video.published_before_joystream), extract(epoch from video.created_at))*${ytTimestampWeight}
) / ${joystreamTimestampWeight} + ${ytTimestampWeight} as wtEpoch,
"channel"."channel_weight" as CW
FROM
"video"
INNER JOIN
"channel" ON "video"."channel_id" = "channel"."id"
) / ${joystreamTimestampWeight} + ${ytTimestampWeight})`

await em.query(`
WITH videos_with_weight AS (
SELECT
video.id as videoId,
channel.id as channelId,
(ROUND((
(extract(epoch from now()) - ${wtEpoch})
/ ${NEWNESS_SECONDS_DIVIDER} * ${newnessWeight * -1}
+ (views_num * ${viewsWeight})
+ (comments_count * ${commentsWeight})
+ (reactions_count * ${reactionsWeight}))
* COALESCE(channel.channel_weight, ${channelWeight}),2)) as videoRelevance
FROM video
INNER JOIN channel ON video.channel_id = channel.id
${
forceUpdateAll
? ''
: `WHERE "video"."id" IN (${[...this.videosToUpdate.values()]
: `WHERE video.channel_id in (${[...this.channelsToUpdate.values()]
.map((id) => `'${id}'`)
.join(', ')})`
}
)
UPDATE
"video"
SET
"video_relevance" = ROUND(
(
(extract(epoch from now()) - wtEpoch) / ${NEWNESS_SECONDS_DIVIDER} * ${newnessWeight * -1} +
(views_num * ${viewsWeight}) +
(comments_count * ${commentsWeight}) +
(reactions_count * ${reactionsWeight})
) * COALESCE(CW, ${channelWeight}),
2)
FROM
weighted_timestamp
WHERE
"video".id = weighted_timestamp.id;
ORDER BY video.id),
top_channel_score as (
SELECT
channel.id as channelId,
MAX(videoCte.videoRelevance) as maxChannelRelevance
FROM channel
INNER JOIN videos_with_weight as videoCte on videoCte.channelId = channel.id
GROUP BY channel.id)
UPDATE video
SET video_relevance = COALESCE(topChannelVideo.maxChannelRelevance, 1)
FROM videos_with_weight as videoCte
LEFT JOIN top_channel_score as topChannelVideo on topChannelVideo.channelId = videoCte.channelId and topChannelVideo.maxChannelRelevance = videoCte.videoRelevance
WHERE video.id = videoCte.videoId;
`)
this.videosToUpdate.clear()
this.channelsToUpdate.clear()
}

private async updateScheduledLoop(em: EntityManager, intervalMs: number): Promise<void> {
while (true) {
await this.updateVideoRelevanceValue(em)
await new Promise((resolve) => setTimeout(resolve, intervalMs))
}
}

private async updateLoop(intervalMs: number): Promise<void> {
const em = await globalEm
private async updateFullUpdateLoop(em: EntityManager, intervalMs: number): Promise<void> {
while (true) {
await this.updateVideoRelevanceValue(em, true)
await this.updateVideoRelevanceValue(em)
await new Promise((resolve) => setTimeout(resolve, intervalMs))
}
}
Expand Down

0 comments on commit c8b2c2e

Please sign in to comment.