Skip to content

Commit 117ba4b

Browse files
committed
fix(buffer): live counter
1 parent 2720d32 commit 117ba4b

File tree

6 files changed

+59
-27
lines changed

6 files changed

+59
-27
lines changed

apps/api/src/controllers/live.controller.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import { getSuperJson } from '@openpanel/common';
66
import type { IServiceEvent, Notification } from '@openpanel/db';
77
import {
88
TABLE_NAMES,
9+
eventBuffer,
910
getEvents,
10-
getLiveVisitors,
1111
getProfileByIdCached,
1212
transformMinimalEvent,
1313
} from '@openpanel/db';
@@ -82,20 +82,20 @@ export function wsVisitors(
8282
if (channel === 'event:received') {
8383
const event = getSuperJson<IServiceEvent>(message);
8484
if (event?.projectId === params.projectId) {
85-
getLiveVisitors(params.projectId).then((count) => {
85+
eventBuffer.getActiveVisitorCount(params.projectId).then((count) => {
8686
connection.socket.send(String(count));
8787
});
8888
}
8989
}
9090
};
9191
const pmessage = (pattern: string, channel: string, message: string) => {
92-
if (!message.startsWith('live:visitors:')) {
92+
if (!message.startsWith('live:visitor:')) {
9393
return null;
9494
}
9595

9696
const [projectId] = getLiveEventInfo(message);
9797
if (projectId && projectId === params.projectId) {
98-
getLiveVisitors(params.projectId).then((count) => {
98+
eventBuffer.getActiveVisitorCount(params.projectId).then((count) => {
9999
connection.socket.send(String(count));
100100
});
101101
}

apps/dashboard/src/components/overview/live-counter/index.tsx

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import withSuspense from '@/hocs/with-suspense';
22

3-
import { getLiveVisitors } from '@openpanel/db';
3+
import { eventBuffer } from '@openpanel/db';
44

55
import type { LiveCounterProps } from './live-counter';
66
import LiveCounter from './live-counter';
77

88
async function ServerLiveCounter(props: Omit<LiveCounterProps, 'data'>) {
9-
const count = await getLiveVisitors(props.projectId);
9+
const count = await eventBuffer.getActiveVisitorCount(props.projectId);
1010
return <LiveCounter data={count} {...props} />;
1111
}
1212

apps/dashboard/src/hooks/useWS.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export default function useWS<T>(
3939
onMessage(event) {
4040
try {
4141
const data = getSuperJson<T>(event.data);
42-
if (data) {
42+
if (data !== null && data !== undefined) {
4343
debouncedOnMessage(data);
4444
}
4545
} catch (error) {

packages/db/src/buffers/event-buffer-redis.ts

+47-2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ export class EventBuffer extends BaseBuffer {
5454
)
5555
: 1000;
5656

57+
private activeVisitorsExpiration = 60 * 5; // 5 minutes
58+
5759
private sessionEvents = ['screen_view', 'session_end'];
5860

5961
// LIST - Stores events without sessions
@@ -246,8 +248,11 @@ return "OK"
246248
}
247249

248250
if (event.profile_id) {
249-
multi.sadd(`live:visitors:${event.project_id}`, event.profile_id);
250-
multi.expire(`live:visitors:${event.project_id}`, 60 * 5); // 5 minutes
251+
this.incrementActiveVisitorCount(
252+
multi,
253+
event.project_id,
254+
event.profile_id,
255+
);
251256
}
252257

253258
if (!_multi) {
@@ -689,4 +694,44 @@ return "OK"
689694
);
690695
return count;
691696
}
697+
698+
private async incrementActiveVisitorCount(
699+
multi: ReturnType<Redis['multi']>,
700+
projectId: string,
701+
profileId: string,
702+
) {
703+
// Add/update visitor with current timestamp as score
704+
const now = Date.now();
705+
const zsetKey = `live:visitors:${projectId}`;
706+
return (
707+
multi
708+
// To keep the count
709+
.zadd(zsetKey, now, profileId)
710+
// To trigger the expiration listener
711+
.set(
712+
`live:visitor:${projectId}:${profileId}`,
713+
'1',
714+
'EX',
715+
this.activeVisitorsExpiration,
716+
)
717+
);
718+
}
719+
720+
public async getActiveVisitorCount(projectId: string): Promise<number> {
721+
const redis = getRedisCache();
722+
const zsetKey = `live:visitors:${projectId}`;
723+
const cutoff = Date.now() - this.activeVisitorsExpiration * 1000;
724+
725+
const multi = redis.multi();
726+
multi
727+
.zremrangebyscore(zsetKey, '-inf', cutoff)
728+
.zcount(zsetKey, cutoff, '+inf');
729+
730+
const [, count] = (await multi.exec()) as [
731+
[Error | null, any],
732+
[Error | null, number],
733+
];
734+
735+
return count[1] || 0;
736+
}
692737
}

packages/db/src/buffers/index.ts

+3-12
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,7 @@
1-
import { BotBuffer as BotBufferPsql } from './bot-buffer-psql';
21
import { BotBuffer as BotBufferRedis } from './bot-buffer-redis';
3-
import { EventBuffer as EventBufferPsql } from './event-buffer-psql';
42
import { EventBuffer as EventBufferRedis } from './event-buffer-redis';
5-
import { ProfileBuffer as ProfileBufferPsql } from './profile-buffer-psql';
63
import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer-redis';
74

8-
export const eventBuffer = process.env.USE_NEW_BUFFER
9-
? new EventBufferRedis()
10-
: new EventBufferPsql();
11-
export const profileBuffer = process.env.USE_NEW_BUFFER
12-
? new ProfileBufferRedis()
13-
: new ProfileBufferPsql();
14-
export const botBuffer = process.env.USE_NEW_BUFFER
15-
? new BotBufferRedis()
16-
: new BotBufferPsql();
5+
export const eventBuffer = new EventBufferRedis();
6+
export const profileBuffer = new ProfileBufferRedis();
7+
export const botBuffer = new BotBufferRedis();

packages/db/src/services/event.service.ts

+2-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { mergeDeepRight, omit, uniq } from 'ramda';
1+
import { mergeDeepRight, uniq } from 'ramda';
22
import { escape } from 'sqlstring';
33
import { v4 as uuid } from 'uuid';
44

55
import { toDots } from '@openpanel/common';
6-
import { cacheable, getRedisCache } from '@openpanel/redis';
6+
import { cacheable } from '@openpanel/redis';
77
import type { IChartEventFilter } from '@openpanel/validation';
88

99
import { botBuffer, eventBuffer } from '../buffers';
@@ -234,10 +234,6 @@ export function transformMinimalEvent(
234234
};
235235
}
236236

237-
export async function getLiveVisitors(projectId: string) {
238-
return getRedisCache().scard(`live:visitors:${projectId}`);
239-
}
240-
241237
export async function getEvents(
242238
sql: string,
243239
options: GetEventsOptions = {},

0 commit comments

Comments
 (0)