Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ordered concurrency #31

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"typescript": "^5.5.4"
},
"dependencies": {
"@supercharge/promise-pool": "^3.2.0",
"@types/showdown": "^2.0.6",
"axios": "^1.7.2",
"dotenv": "^16.4.5",
Expand Down
79 changes: 61 additions & 18 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ dotenv.config()
import { AxiosError } from 'axios'
import lineByLine from 'n-readlines'
import { exit } from 'node:process'
import { PromisePool } from '@supercharge/promise-pool'
import 'reflect-metadata'
import { Entity, entities } from './Entities'
import { handleDirectChats } from './handlers/directChats'
import { handleRoomMemberships } from './handlers/handleRoomMemberships'
import { handle as handleMessage } from './handlers/messages'
import { handle as handleMessage, RcMessage } from './handlers/messages'
import { handlePinnedMessages } from './handlers/pinnedMessages'
import { handle as handleRoom } from './handlers/rooms'
import { handle as handleUser } from './handlers/users'
Expand All @@ -17,32 +18,74 @@ import { whoami } from './helpers/synapse'

log.info('rocketchat2matrix starts.')

/**
 * Reads a file line by line, parses each line to JSON and yields the JSON object.
 * This is needed because lineByLine isn't implemented as an iterator.
 * Blank lines are skipped: lineByLine yields them as empty Buffers, which are
 * truthy, and `JSON.parse('')` would otherwise throw a SyntaxError.
 * @param path The path of the file
 * @throws SyntaxError if a non-blank line is not valid JSON
 */
async function* jsonIterator(path: string) {
  const rl = new lineByLine(path)
  let line: false | Buffer
  // rl.next() returns false at EOF; any Buffer (even empty) is truthy
  while ((line = rl.next())) {
    const text = line.toString()
    if (text.trim().length === 0) {
      continue // skip blank lines instead of crashing on JSON.parse('')
    }
    yield JSON.parse(text)
  }
}

/**
* Reads a file line by line and handles the lines parsed to JSON according to the expected type
* @param entity The Entity with it's file name and type definitions
*/
async function loadRcExport(entity: Entity) {
const rl = new lineByLine(`./inputs/${entities[entity].filename}`)
const concurrency = parseInt(process.env.CONCURRENCY_LIMIT || '50')
let messagesPerRoom: Map<string, RcMessage[]> = new Map()
const messageBatchSize = parseInt(process.env.MESSAGE_BATCH_SIZE || '1000000')
const jsonItems = jsonIterator(`./inputs/${entities[entity].filename}`)

let line: false | Buffer
while ((line = rl.next())) {
const item = JSON.parse(line.toString())
switch (entity) {
case Entity.Users:
await handleUser(item)
break
switch (entity) {
case Entity.Users:
await PromisePool.withConcurrency(concurrency)
.for(jsonItems)
.process((item) => handleUser(item))
break

case Entity.Rooms:
await handleRoom(item)
break
case Entity.Rooms:
await PromisePool.withConcurrency(concurrency)
.for(jsonItems)
.process((item) => handleRoom(item))
break

case Entity.Messages:
await handleMessage(item)
break
case Entity.Messages:
let i = 0
for await (const item of jsonItems) {
if (messagesPerRoom.has(item.rid)) {
messagesPerRoom.get(item.rid)?.push(item)
} else {
messagesPerRoom.set(item.rid, [item])
}
if (i % messageBatchSize === 0) {
await PromisePool.withConcurrency(concurrency)
.for(messagesPerRoom.values())
.process(async (room) => {
for (const item of room) {
await handleMessage(item)
}
})
messagesPerRoom = new Map()
}
i++
}
// handle messages again for the last (incomplete) batch
await PromisePool.withConcurrency(concurrency)
.for(messagesPerRoom.values())
.process(async (room) => {
for (const item of room) {
await handleMessage(item)
}
})
break

default:
throw new Error(`Unhandled Entity: ${entity}`)
}
default:
throw new Error(`Unhandled Entity: ${entity}`)
}
}

Expand Down