Skip to content

Commit

Permalink
refactor(group-analytics): Add project field to group type (#25600)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
Twixes and github-actions[bot] authored Oct 31, 2024
1 parent 968aede commit 12ff477
Show file tree
Hide file tree
Showing 21 changed files with 207 additions and 89 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0504_add_dead_clicks_setting
posthog: 0505_grouptypemapping_project
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
10 changes: 5 additions & 5 deletions plugin-server/functional_tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,14 @@ export const fetchGroups = async (teamId: number) => {
return queryResult.data.map((group) => ({ ...group, group_properties: JSON.parse(group.group_properties) }))
}

export const createGroupType = async (teamId: number, index: number, groupType: string) => {
export const createGroupType = async (teamId: number, projectId: number, index: number, groupType: string) => {
await postgres.query(
PostgresUse.COMMON_WRITE,
`
INSERT INTO posthog_grouptypemapping (team_id, group_type, group_type_index)
VALUES ($1, $2, $3)
INSERT INTO posthog_grouptypemapping (team_id, project_id, group_type, group_type_index)
VALUES ($1, $2, $3, $4)
`,
[teamId, groupType, index],
[teamId, projectId, groupType, index],
'insertGroupType'
)
}
Expand Down Expand Up @@ -455,7 +455,7 @@ export const createOrganizationRaw = async (organizationProperties = {}) => {

await postgres.query(
PostgresUse.COMMON_WRITE,
`INSERT into posthog_organization
`INSERT into posthog_organization
(${keys})
VALUES (${values})
`,
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/functional_tests/webhooks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ test.concurrent(`webhooks: fires slack webhook`, async () => {
})
const teamId = await createTeam(organizationId, `http://localhost:${server.address()?.port}`)
const user = await createUser(teamId, new UUIDT().toString())
await createGroupType(teamId, 0, 'organization')
await createGroupType(teamId, teamId, 0, 'organization')
await createGroup(teamId, 0, 'TestWebhookOrg', { name: 'test-webhooks' })
const action = await createAction({
team_id: teamId,
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ export interface RawOrganization {
/** Usable Team model. */
export interface Team {
id: number
project_id: number
uuid: string
organization_id: string
name: string
Expand Down
20 changes: 12 additions & 8 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1342,14 +1342,18 @@ export class DB {
}

public async getTeamsInOrganizationsWithRootPluginAccess(): Promise<Team[]> {
return (
await this.postgres.query(
PostgresUse.COMMON_READ,
'SELECT * from posthog_team WHERE organization_id = (SELECT id from posthog_organization WHERE plugins_access_level = $1)',
[OrganizationPluginsAccessLevel.ROOT],
'getTeamsInOrganizationsWithRootPluginAccess'
)
).rows as Team[]
const selectResult = await this.postgres.query<Team>(
PostgresUse.COMMON_READ,
'SELECT * from posthog_team WHERE organization_id = (SELECT id from posthog_organization WHERE plugins_access_level = $1)',
[OrganizationPluginsAccessLevel.ROOT],
'getTeamsInOrganizationsWithRootPluginAccess'
)
for (const row of selectResult.rows) {
// pg returns int8 as a string, since it can be larger than JS's max safe integer,
// but this is not a problem for project_id, which is a long long way from that limit.
row.project_id = parseInt(row.project_id as unknown as string)
}
return selectResult.rows
}

public async addOrUpdatePublicJob(
Expand Down
20 changes: 13 additions & 7 deletions plugin-server/src/worker/ingestion/group-type-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,19 @@ export class GroupTypeManager {
}
}

public async fetchGroupTypeIndex(teamId: TeamId, groupType: string): Promise<GroupTypeIndex | null> {
public async fetchGroupTypeIndex(
teamId: TeamId,
projectId: TeamId,
groupType: string
): Promise<GroupTypeIndex | null> {
const groupTypes = await this.fetchGroupTypes(teamId)

if (groupType in groupTypes) {
return groupTypes[groupType]
} else {
const [groupTypeIndex, isInsert] = await this.insertGroupType(
teamId,
projectId,
groupType,
Object.keys(groupTypes).length
)
Expand All @@ -70,6 +75,7 @@ export class GroupTypeManager {

public async insertGroupType(
teamId: TeamId,
projectId: TeamId,
groupType: string,
index: number
): Promise<[GroupTypeIndex | null, boolean]> {
Expand All @@ -81,21 +87,21 @@ export class GroupTypeManager {
PostgresUse.COMMON_WRITE,
`
WITH insert_result AS (
INSERT INTO posthog_grouptypemapping (team_id, group_type, group_type_index)
VALUES ($1, $2, $3)
INSERT INTO posthog_grouptypemapping (team_id, project_id, group_type, group_type_index)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
RETURNING group_type_index
)
SELECT group_type_index, 1 AS is_insert FROM insert_result
SELECT group_type_index, 1 AS is_insert FROM insert_result
UNION
SELECT group_type_index, 0 AS is_insert FROM posthog_grouptypemapping WHERE team_id = $1 AND group_type = $2;
SELECT group_type_index, 0 AS is_insert FROM posthog_grouptypemapping WHERE team_id = $1 AND group_type = $3;
`,
[teamId, groupType, index],
[teamId, projectId, groupType, index],
'insertGroupType'
)

if (insertGroupTypeResult.rows.length == 0) {
return await this.insertGroupType(teamId, groupType, index + 1)
return await this.insertGroupType(teamId, projectId, groupType, index + 1)
}

const { group_type_index, is_insert } = insertGroupTypeResult.rows[0]
Expand Down
3 changes: 2 additions & 1 deletion plugin-server/src/worker/ingestion/groups.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import { GroupTypeManager } from './group-type-manager'

export async function addGroupProperties(
teamId: TeamId,
projectId: TeamId,
properties: Properties,
groupTypeManager: GroupTypeManager
): Promise<Properties> {
for (const [groupType, groupIdentifier] of Object.entries(properties.$groups || {})) {
const columnIndex = await groupTypeManager.fetchGroupTypeIndex(teamId, groupType)
const columnIndex = await groupTypeManager.fetchGroupTypeIndex(teamId, projectId, groupType)
if (columnIndex !== null) {
// :TODO: Update event column instead
properties[`$group_${columnIndex}`] = groupIdentifier
Expand Down
20 changes: 15 additions & 5 deletions plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,12 @@ export class EventsProcessor {

if (this.pluginsServer.SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP === false) {
try {
await this.groupAndFirstEventManager.updateGroupsAndFirstEvent(team.id, event, properties)
await this.groupAndFirstEventManager.updateGroupsAndFirstEvent(
team.id,
team.project_id,
event,
properties
)
} catch (err) {
Sentry.captureException(err, { tags: { team_id: team.id } })
status.warn('⚠️', 'Failed to update property definitions for an event', {
Expand All @@ -168,10 +173,10 @@ export class EventsProcessor {

if (processPerson) {
// Adds group_0 etc values to properties
properties = await addGroupProperties(team.id, properties, this.groupTypeManager)
properties = await addGroupProperties(team.id, team.project_id, properties, this.groupTypeManager)

if (event === '$groupidentify') {
await this.upsertGroup(team.id, properties, timestamp)
await this.upsertGroup(team.id, team.project_id, properties, timestamp)
}
}

Expand Down Expand Up @@ -278,13 +283,18 @@ export class EventsProcessor {
return [rawEvent, ack]
}

private async upsertGroup(teamId: number, properties: Properties, timestamp: DateTime): Promise<void> {
private async upsertGroup(
teamId: number,
projectId: number,
properties: Properties,
timestamp: DateTime
): Promise<void> {
if (!properties['$group_type'] || !properties['$group_key']) {
return
}

const { $group_type: groupType, $group_key: groupKey, $group_set: groupPropertiesToSet } = properties
const groupTypeIndex = await this.groupTypeManager.fetchGroupTypeIndex(teamId, groupType)
const groupTypeIndex = await this.groupTypeManager.fetchGroupTypeIndex(teamId, projectId, groupType)

if (groupTypeIndex !== null) {
await upsertGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ export class GroupAndFirstEventManager {
this.groupTypeManager = groupTypeManager
}

public async updateGroupsAndFirstEvent(teamId: number, event: string, properties: Properties): Promise<void> {
public async updateGroupsAndFirstEvent(
teamId: number,
projectId: number,
event: string,
properties: Properties
): Promise<void> {
if (EVENTS_WITHOUT_EVENT_DEFINITION.includes(event)) {
return
}
Expand Down Expand Up @@ -56,7 +61,9 @@ export class GroupAndFirstEventManager {
const { $group_type: groupType, $group_set: groupPropertiesToSet } = properties
if (groupType != null && groupPropertiesToSet != null) {
// This "fetch" is side-effecty, it inserts a group-type and assigns an index if one isn't found
const groupPromise = this.groupTypeManager.fetchGroupTypeIndex(teamId, groupType).then(() => {})
const groupPromise = this.groupTypeManager
.fetchGroupTypeIndex(teamId, projectId, groupType)
.then(() => {})
promises.push(groupPromise)
}
}
Expand Down
18 changes: 16 additions & 2 deletions plugin-server/src/worker/ingestion/team-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ export async function fetchTeam(client: PostgresRouter, teamId: Team['id']): Pro
`
SELECT
id,
project_id,
uuid,
organization_id,
name,
Expand All @@ -172,7 +173,13 @@ export async function fetchTeam(client: PostgresRouter, teamId: Team['id']): Pro
[teamId],
'fetchTeam'
)
return selectResult.rows[0] ?? null
if (selectResult.rows.length === 0) {
return null
}
// pg returns int8 as a string, since it can be larger than JS's max safe integer,
// but this is not a problem for project_id, which is a long long way from that limit.
selectResult.rows[0].project_id = parseInt(selectResult.rows[0].project_id as unknown as string)
return selectResult.rows[0]
}

export async function fetchTeamByToken(client: PostgresRouter, token: string): Promise<Team | null> {
Expand All @@ -181,6 +188,7 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P
`
SELECT
id,
project_id,
uuid,
organization_id,
name,
Expand All @@ -199,7 +207,13 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P
[token],
'fetchTeamByToken'
)
return selectResult.rows[0] ?? null
if (selectResult.rows.length === 0) {
return null
}
// pg returns int8 as a string, since it can be larger than JS's max safe integer,
// but this is not a problem for project_id, which is a long long way from that limit.
selectResult.rows[0].project_id = parseInt(selectResult.rows[0].project_id as unknown as string)
return selectResult.rows[0]
}

export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise<Record<string, TeamIDWithConfig>> {
Expand Down
18 changes: 10 additions & 8 deletions plugin-server/tests/helpers/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,16 @@ export async function createUserTeamAndOrganization(
}

export async function getTeams(hub: Hub): Promise<Team[]> {
return (
await hub.db.postgres.query(
PostgresUse.COMMON_READ,
'SELECT * FROM posthog_team ORDER BY id',
undefined,
'fetchAllTeams'
)
).rows
const selectResult = await hub.db.postgres.query<Team>(
PostgresUse.COMMON_READ,
'SELECT * FROM posthog_team ORDER BY id',
undefined,
'fetchAllTeams'
)
for (const row of selectResult.rows) {
row.project_id = parseInt(row.project_id as unknown as string)
}
return selectResult.rows
}

export async function getFirstTeam(hub: Hub): Promise<Team> {
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/tests/main/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ describe('DB', () => {
anonymize_ips: false,
api_token: 'token1',
id: teamId,
project_id: teamId,
ingested_event: true,
name: 'TEST PROJECT',
organization_id: organizationId,
Expand Down Expand Up @@ -884,6 +885,7 @@ describe('DB', () => {
anonymize_ips: false,
api_token: 'token2',
id: teamId,
project_id: teamId,
ingested_event: true,
name: 'TEST PROJECT',
organization_id: organizationId,
Expand Down
Loading

0 comments on commit 12ff477

Please sign in to comment.