Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Heatmaps ingestion #21629

Merged
merged 3 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { LogLevel, PluginLogLevel, PluginsServerConfig, stringToPluginServerMode
import { isDevEnv, isTestEnv, stringToBoolean } from '../utils/env-utils'
import { KAFKAJS_LOG_LEVEL_MAPPING } from './constants'
import {
KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
KAFKA_EVENTS_JSON,
KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
Expand Down Expand Up @@ -104,6 +105,7 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: 1,
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '',
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON,
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
CONVERSION_BUFFER_ENABLED: false,
CONVERSION_BUFFER_ENABLED_TEAMS: '',
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '',
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/config/kafka-topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_se
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
// write performance events to ClickHouse
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
// write heatmap events to ClickHouse
export const KAFKA_CLICKHOUSE_HEATMAP_EVENTS = `${prefix}clickhouse_heatmap_events${suffix}`

// log entries for ingestion into clickhouse
export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
24 changes: 24 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export interface PluginsServerConfig {
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS: boolean // whether to disallow external schemas like protobuf for clickhouse kafka engine
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: string // (advanced) a comma separated list of teams to disable clickhouse external schemas for
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string // (advanced) topic to send events to for clickhouse ingestion
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string // (advanced) topic to send heatmap data to for clickhouse ingestion
REDIS_URL: string
POSTHOG_REDIS_PASSWORD: string
POSTHOG_REDIS_HOST: string
Expand Down Expand Up @@ -1099,3 +1100,26 @@ export type RRWebEvent = Record<string, any> & {
export interface ValueMatcher<T> {
(value: T): boolean
}

export type RawClickhouseHeatmapEvent = {
/**
* session id lets us offer example recordings on high traffic parts of the page,
* and could let us offer more advanced filtering of heatmap data
* we will break the relationship between particular sessions and clicks in aggregating this data
* it should always be treated as an exemplar and not as concrete values
*/
session_id: string
distinct_id: string
viewport_width: number
viewport_height: number
pointer_target_fixed: boolean
current_url: string
// x is the x with resolution applied, the resolution converts high fidelity mouse positions into an NxN grid
x: number
// y is the y with resolution applied, the resolution converts high fidelity mouse positions into an NxN grid
y: number
scale_factor: 16 // in the future we may support other values
timestamp: string
type: string
team_id: number
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { URL } from 'url'

import { PreIngestionEvent, RawClickhouseHeatmapEvent, TimestampFormat } from '../../../types'
import { castTimestampOrNow } from '../../../utils/utils'
import { captureIngestionWarning } from '../utils'
import { EventPipelineRunner } from './runner'

// This represents the scale factor for the heatmap data. Essentially how much we are reducing the resolution by.
const SCALE_FACTOR = 16

type HeatmapDataItem = {
x: number
y: number
target_fixed: boolean
type: string
}

type HeatmapData = Record<string, HeatmapDataItem[]>

export function extractHeatmapDataStep(
runner: EventPipelineRunner,
event: PreIngestionEvent
): Promise<[PreIngestionEvent, Promise<void>[]]> {
const { eventUuid, teamId } = event

let acks: Promise<void>[] = []

try {
const heatmapEvents = extractScrollDepthHeatmapData(event) ?? []

// eslint-disable-next-line @typescript-eslint/no-floating-promises
acks = heatmapEvents.map((rawEvent) => {
return runner.hub.kafkaProducer.produce({
topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC,
key: eventUuid,
value: Buffer.from(JSON.stringify(rawEvent)),
waitForAck: true,
})
})
} catch (e) {
acks.push(
captureIngestionWarning(runner.hub.kafkaProducer, teamId, 'invalid_heatmap_data', {
eventUuid,
})
)
}

// We don't want to ingest this data to the events table
delete event.properties['$heatmap_data']

return Promise.resolve([event, acks])
}

function replacePathInUrl(url: string, newPath: string): string {
const parsedUrl = new URL(url)
parsedUrl.pathname = newPath
return parsedUrl.toString()
}

function extractScrollDepthHeatmapData(event: PreIngestionEvent): RawClickhouseHeatmapEvent[] {
const { teamId, timestamp, properties } = event
const {
$viewport_height,
$viewport_width,
$session_id,
distinct_id,
$prev_pageview_pathname,
$prev_pageview_max_scroll,
$current_url,
$heatmap_data,
} = properties || {}

let heatmapData = $heatmap_data as HeatmapData | null

if ($prev_pageview_pathname && $current_url) {
// We are going to add the scroll depth info derived from the previous pageview to the current pageview's heatmap data
if (!heatmapData) {
heatmapData = {}
}

const previousUrl = replacePathInUrl($current_url, $prev_pageview_pathname)
heatmapData[previousUrl] = heatmapData[previousUrl] || []
heatmapData[previousUrl].push({
x: 0,
y: $prev_pageview_max_scroll,
target_fixed: false,
type: 'scrolldepth',
})
}

let heatmapEvents: RawClickhouseHeatmapEvent[] = []

if (!heatmapData) {
return []
}

Object.entries(heatmapData).forEach(([url, items]) => {
if (Array.isArray(items)) {
heatmapEvents = heatmapEvents.concat(
(items as any[]).map(
(hme: {
x: number
y: number
target_fixed: boolean
type: string
}): RawClickhouseHeatmapEvent => ({
type: hme.type,
x: Math.round(hme.x / SCALE_FACTOR),
y: Math.round(hme.y / SCALE_FACTOR),
pointer_target_fixed: hme.target_fixed,
viewport_height: Math.round($viewport_height / SCALE_FACTOR),
viewport_width: Math.round($viewport_width / SCALE_FACTOR),
current_url: url,
session_id: $session_id,
scale_factor: SCALE_FACTOR,
timestamp: castTimestampOrNow(timestamp ?? null, TimestampFormat.ClickHouse),
team_id: teamId,
distinct_id: distinct_id,
})
)
)
}
})

return heatmapEvents
}
13 changes: 12 additions & 1 deletion plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { normalizeProcessPerson } from '../../../utils/event'
import { status } from '../../../utils/status'
import { captureIngestionWarning, generateEventDeadLetterQueueMessage } from '../utils'
import { createEventStep } from './createEventStep'
import { extractHeatmapDataStep } from './extractHeatmapDataStep'
import {
eventProcessedAndIngestedCounter,
pipelineLastStepCounter,
Expand Down Expand Up @@ -216,9 +217,19 @@ export class EventPipelineRunner {
event.team_id
)

const [preparedEventWithoutHeatmaps, heatmapKafkaAcks] = await this.runStep(
extractHeatmapDataStep,
[this, preparedEvent],
event.team_id
)

if (heatmapKafkaAcks.length > 0) {
kafkaAcks.push(...heatmapKafkaAcks)
}

const [rawClickhouseEvent, eventAck] = await this.runStep(
createEventStep,
[this, preparedEvent, person, processPerson],
[this, preparedEventWithoutHeatmaps, person, processPerson],
event.team_id
)

Expand Down
Loading
Loading