Skip to content

Commit afb63da

Browse files
committed
changes to analytics - nullable slot/unit, DateHour struct, separate analytics struct for update/get, test changes
1 parent 456acc0 commit afb63da

File tree

6 files changed

+96
-115
lines changed

6 files changed

+96
-115
lines changed

primitives/src/analytics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ pub struct AnalyticsQuery {
8080
pub segment_by_channel: Option<String>,
8181
}
8282

83-
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Display)]
83+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Display, Hash, Eq)]
8484
#[serde(untagged, into = "String", from = "String")]
8585
pub enum OperatingSystem {
8686
Linux,

primitives/src/campaign.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ mod campaign_id {
2929
use thiserror::Error;
3030
use uuid::Uuid;
3131

32-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
32+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
3333
/// an Id of 16 bytes, (de)serialized as a `0x` prefixed hex
3434
/// In this implementation of the `CampaignId` the value is generated from a `Uuid::new_v4().to_simple()`
3535
pub struct CampaignId([u8; 16]);

primitives/src/sentry.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ use crate::{
55
validator::{ApproveState, Heartbeat, MessageTypes, NewState, Type as MessageType},
66
Address, Balances, BigNum, CampaignId, Channel, ChannelId, UnifiedNum, ValidatorId, IPFS,
77
};
8-
use chrono::{DateTime, Utc};
8+
use bytes::BytesMut;
9+
use chrono::{Date, DateTime, Datelike, TimeZone, Timelike, Utc};
10+
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
911
use serde::{Deserialize, Serialize};
10-
use std::{collections::HashMap, fmt, hash::Hash};
12+
use std::{collections::HashMap, error::Error, fmt, hash::Hash};
1113

1214
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1315
#[serde(rename_all = "camelCase")]
@@ -137,10 +139,10 @@ pub enum Event {
137139
},
138140
}
139141

140-
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
142+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)]
141143
#[serde(rename_all = "camelCase")]
142144
pub struct UpdateAnalytics {
143-
pub time: DateTime<Utc>,
145+
pub time: DateHour,
144146
pub campaign_id: CampaignId,
145147
pub ad_unit: Option<IPFS>,
146148
pub ad_slot: Option<IPFS>,
@@ -158,7 +160,7 @@ pub struct UpdateAnalytics {
158160
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
159161
#[serde(rename_all = "camelCase")]
160162
pub struct Analytics {
161-
pub time: DateTime<Utc>,
163+
pub time: DateHour,
162164
pub campaign_id: CampaignId,
163165
pub ad_unit: Option<IPFS>,
164166
pub ad_slot: Option<IPFS>,
@@ -173,29 +175,34 @@ pub struct Analytics {
173175
pub payout_count: i32,
174176
}
175177

178+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Hash, Eq)]
176179
pub struct DateHour {
177-
date: Date,
178-
hour: u64,
180+
pub date: DateTime<Utc>,
181+
pub hour: u32,
179182
}
180183

181-
impl <'a> FromSql<'a> for DateHour {
184+
impl<'a> FromSql<'a> for DateHour {
182185
fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
183-
let str_slice = <&str as FromSql>::from_sql(ty, raw)?;
184-
185-
Ok(json.0)
186+
let date_time = <DateTime<Utc> as FromSql>::from_sql(ty, raw)?;
187+
assert_eq!(date_time.time().minute(), 0);
188+
assert_eq!(date_time.time().second(), 0);
189+
assert_eq!(date_time.time().nanosecond(), 0);
190+
Ok(Self {
191+
date: date_time,
192+
hour: date_time.hour(),
193+
})
186194
}
195+
accepts!(TIMESTAMPTZ);
187196
}
188197

189198
impl ToSql for DateHour {
190-
fn to_sql(
191-
&self,
192-
ty: &Type,
193-
w: &mut BytesMut,
194-
) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
195-
self.to_string().to_sql(ty, w)
199+
fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
200+
Utc.ymd(self.date.year(), self.date.month(), self.date.day())
201+
.and_hms(self.hour, 0, 0)
202+
.to_sql(ty, w)
196203
}
197204

198-
accepts!(TEXT, VARCHAR);
205+
accepts!(TIMESTAMPTZ);
199206
to_sql_checked!();
200207
}
201208

@@ -665,7 +672,7 @@ pub mod campaign_create {
665672

666673
#[cfg(feature = "postgres")]
667674
mod postgres {
668-
use super::{Analytics, UpdateAnalytics, MessageResponse, ValidatorMessage};
675+
use super::{Analytics, MessageResponse, ValidatorMessage};
669676
use crate::{
670677
sentry::EventAggregate,
671678
validator::{messages::Type as MessageType, MessageTypes},

primitives/src/unified_num.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use std::{
5959
FromStr,
6060
Serialize,
6161
Deserialize,
62+
Hash,
6263
)]
6364
#[serde(into = "String", try_from = "String")]
6465
pub struct UnifiedNum(u64);

sentry/src/analytics.rs

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ use crate::{
55
use chrono::{Timelike, Utc};
66
use primitives::{
77
analytics::OperatingSystem,
8-
sentry::{Event, Analytics, UpdateAnalytics},
8+
sentry::{DateHour, Event, UpdateAnalytics},
99
Address, Campaign, UnifiedNum,
1010
};
11+
use std::collections::HashSet;
1112

1213
/// Validator fees will not be included in analytics
1314
pub async fn record(
@@ -16,14 +17,22 @@ pub async fn record(
1617
session: &Session,
1718
events_with_payouts: Vec<(Event, Address, UnifiedNum)>,
1819
) -> Result<(), PoolError> {
19-
let os_name = session.os.as_ref().map(|os| OperatingSystem::map_os(os)).unwrap_or_default();
20+
let os_name = session
21+
.os
22+
.as_ref()
23+
.map(|os| OperatingSystem::map_os(os))
24+
.unwrap_or_default();
2025
let time = {
2126
let full_utc = Utc::now();
2227

2328
// leave only the Hour portion and erase the minutes & seconds
24-
full_utc.date().and_hms(full_utc.hour(), 0, 0)
29+
DateHour {
30+
date: full_utc.date().and_hms(0, 0, 0), // TODO: Fix
31+
hour: full_utc.hour(),
32+
}
2533
};
2634

35+
let mut analytics_set: HashSet<UpdateAnalytics> = HashSet::new();
2736
for (event, _payout_addr, payout_amount) in events_with_payouts {
2837
let event_type = event.to_string();
2938
let (publisher, ad_unit, referrer, ad_slot, ad_slot_type) = {
@@ -62,7 +71,7 @@ pub async fn record(
6271
};
6372

6473
// DB: Insert or Update all events
65-
let event_for_db = UpdateAnalytics {
74+
let mut analytics = UpdateAnalytics {
6675
campaign_id: campaign.id,
6776
time,
6877
ad_unit,
@@ -77,18 +86,28 @@ pub async fn record(
7786
amount_to_add: payout_amount,
7887
count_to_add: 1,
7988
};
80-
81-
insert_analytics(pool, &event_for_db).await?;
89+
// TODO: tidy up this operation
90+
match analytics_set.get(&analytics) {
91+
Some(a) => {
92+
analytics.amount_to_add += &a.amount_to_add;
93+
analytics.count_to_add = a.count_to_add + 1;
94+
let _ = &analytics_set.replace(a.to_owned());
95+
}
96+
None => {
97+
let _ = &analytics_set.insert(analytics);
98+
}
99+
}
100+
}
101+
for a in analytics_set.iter() {
102+
insert_analytics(pool, a).await?;
82103
}
83-
84104
Ok(())
85105
}
86106

87107
#[cfg(test)]
88108
mod test {
89109
use super::*;
90110
use primitives::{
91-
analytics::OperatingSystem,
92111
util::tests::prep_db::{ADDRESSES, DUMMY_CAMPAIGN},
93112
UnifiedNum,
94113
};
@@ -146,53 +165,23 @@ mod test {
146165
.await
147166
.expect("should record");
148167

149-
let query_click_event = Analytics {
150-
time: Utc::now(),
151-
campaign_id: DUMMY_CAMPAIGN.id,
152-
ad_unit: None,
153-
ad_slot: None,
154-
ad_slot_type: None,
155-
advertiser: campaign.creator,
156-
publisher: ADDRESSES["leader"],
157-
hostname: None,
158-
country: None,
159-
os_name: OperatingSystem::Other,
160-
event_type: "CLICK".to_string(),
161-
payout_amount: Default::default(),
162-
payout_count: 1,
163-
};
164-
165-
let query_impression_event = Analytics {
166-
time: Utc::now(),
167-
campaign_id: DUMMY_CAMPAIGN.id,
168-
ad_unit: None,
169-
ad_slot: None,
170-
ad_slot_type: None,
171-
advertiser: campaign.creator,
172-
publisher: ADDRESSES["leader"],
173-
hostname: None,
174-
country: None,
175-
os_name: OperatingSystem::Other,
176-
event_type: "IMPRESSION".to_string(),
177-
payout_amount: Default::default(),
178-
payout_count: 1,
179-
};
180-
181-
let click_analytics = find_analytics(&database.pool, &query_click_event)
168+
let analytics = find_analytics(&database.pool)
182169
.await
183170
.expect("should find analytics");
184-
let impression_analytics =
185-
find_analytics(&database.pool, &query_impression_event)
186-
.await
187-
.expect("should find analytics");
188-
assert_eq!(click_analytics.event_type, "CLICK".to_string());
171+
let click_analytics = analytics
172+
.iter()
173+
.find(|a| a.event_type == "CLICK")
174+
.expect("There should be a click event");
175+
let impression_analytics = analytics
176+
.iter()
177+
.find(|a| a.event_type == "IMPRESSION")
178+
.expect("There should be an impression event");
189179
assert_eq!(
190180
click_analytics.payout_amount,
191181
UnifiedNum::from_u64(1_000_000)
192182
);
193183
assert_eq!(click_analytics.payout_count, 1);
194184

195-
assert_eq!(impression_analytics.event_type, "IMPRESSION".to_string());
196185
assert_eq!(
197186
impression_analytics.payout_amount,
198187
UnifiedNum::from_u64(1_000_000)
@@ -203,13 +192,17 @@ mod test {
203192
.await
204193
.expect("should record");
205194

206-
let click_analytics = find_analytics(&database.pool, &query_click_event)
195+
let analytics = find_analytics(&database.pool)
207196
.await
208197
.expect("should find analytics");
209-
let impression_analytics =
210-
find_analytics(&database.pool, &query_impression_event)
211-
.await
212-
.expect("should find analytics");
198+
let click_analytics = analytics
199+
.iter()
200+
.find(|a| a.event_type == "CLICK")
201+
.expect("There should be a click event");
202+
let impression_analytics = analytics
203+
.iter()
204+
.find(|a| a.event_type == "IMPRESSION")
205+
.expect("There should be an impression event");
213206
assert_eq!(
214207
click_analytics.payout_amount,
215208
UnifiedNum::from_u64(2_000_000)

sentry/src/db/analytics.rs

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use crate::{epoch, Auth};
22
use chrono::Utc;
33
use primitives::{
44
analytics::{AnalyticsData, AnalyticsQuery, ANALYTICS_QUERY_LIMIT},
5-
sentry::{AdvancedAnalyticsResponse, ChannelReport, UpdateAnalytics, Analytics, PublisherReport},
5+
sentry::{
6+
AdvancedAnalyticsResponse, Analytics, ChannelReport, PublisherReport, UpdateAnalytics,
7+
},
68
ChannelId, ValidatorId,
79
};
810
use redis::{aio::MultiplexedConnection, cmd};
@@ -274,45 +276,16 @@ pub async fn insert_analytics(
274276
Ok(event_analytics)
275277
}
276278

277-
pub async fn find_analytics(
278-
pool: &DbPool,
279-
event: &Analytics,
280-
) -> Result<Analytics, PoolError> {
279+
// Currently used for testing only
280+
pub async fn find_analytics(pool: &DbPool) -> Result<Vec<Analytics>, PoolError> {
281281
let client = pool.get().await?;
282282

283-
let query = "SELECT campaign_id, time, ad_unit, ad_slot, ad_slot_type, advertiser, publisher, hostname, country, os, event_type, payout_amount, payout_count
284-
FROM analytics WHERE campaign_id = $1 AND time = date_trunc('hour', cast($2 as timestamp with time zone)) AND ad_unit = $3 AND ad_slot = $4 AND ad_slot_type = $5 AND advertiser = $6 AND publisher = $7 AND hostname = $8 AND country = $9 AND os = $10 AND event_type = $11";
285-
283+
let query = "SELECT * FROM analytics";
286284
let stmt = client.prepare(query).await?;
287-
let ad_unit = match event.ad_unit {
288-
Some(ipfs) => ipfs.to_string(),
289-
None => "".to_string(),
290-
};
291285

292-
let ad_slot = match event.ad_slot {
293-
Some(ipfs) => ipfs.to_string(),
294-
None => "".to_string(),
295-
};
296-
let row = client
297-
.query_one(
298-
&stmt,
299-
&[
300-
&event.campaign_id,
301-
&event.time,
302-
&ad_unit,
303-
&ad_slot,
304-
&event.ad_slot_type.as_ref().unwrap_or(&"".to_string()),
305-
&event.advertiser,
306-
&event.publisher,
307-
&event.hostname.as_ref().unwrap_or(&"".to_string()),
308-
&event.country.as_ref().unwrap_or(&"".to_string()),
309-
&event.os_name.to_string(),
310-
&event.event_type,
311-
],
312-
)
313-
.await?;
286+
let rows = client.query(&stmt, &[]).await?;
314287

315-
let event_analytics = Analytics::from(&row);
288+
let event_analytics: Vec<Analytics> = rows.iter().map(Analytics::from).collect();
316289
Ok(event_analytics)
317290
}
318291

@@ -321,6 +294,7 @@ mod test {
321294
use super::*;
322295
use primitives::{
323296
analytics::OperatingSystem,
297+
sentry::DateHour,
324298
util::tests::prep_db::{ADDRESSES, DUMMY_CAMPAIGN},
325299
UnifiedNum, IPFS,
326300
};
@@ -338,7 +312,10 @@ mod test {
338312
.expect("Migrations should succeed");
339313
{
340314
let analytics = UpdateAnalytics {
341-
time: Utc.ymd(2021, 2, 1).and_hms(7, 0, 0),
315+
time: DateHour {
316+
date: Utc.ymd(2021, 2, 1).and_hms(1, 0, 0),
317+
hour: 1,
318+
},
342319
campaign_id: DUMMY_CAMPAIGN.id,
343320
ad_unit: Some(
344321
IPFS::try_from("Qmasg8FrbuSQpjFu3kRnZF9beg8rEBFrqgi1uXDRwCbX5f")
@@ -359,13 +336,13 @@ mod test {
359336
count_to_add: 1,
360337
};
361338

362-
let insert_res =
363-
insert_analytics(&database.clone(), &analytics.clone())
364-
.await
365-
.expect("Should insert");
339+
let insert_res = insert_analytics(&database.clone(), &analytics.clone())
340+
.await
341+
.expect("Should insert");
366342

367343
assert_eq!(insert_res.campaign_id, analytics.campaign_id);
368-
assert_eq!(insert_res.time.date(), analytics.time.date());
344+
assert_eq!(insert_res.time.date, analytics.time.date);
345+
assert_eq!(insert_res.time.hour, analytics.time.hour);
369346
assert_eq!(insert_res.ad_unit, analytics.ad_unit);
370347
assert_eq!(insert_res.ad_slot, analytics.ad_slot);
371348
assert_eq!(insert_res.ad_slot_type, analytics.ad_slot_type);
@@ -386,7 +363,10 @@ mod test {
386363
}
387364
{
388365
let analytics_with_empty_fields = UpdateAnalytics {
389-
time: Utc.ymd(2021, 2, 1).and_hms(7, 0, 0),
366+
time: DateHour {
367+
date: Utc.ymd(2021, 2, 1).and_hms(1, 0, 0),
368+
hour: 1,
369+
},
390370
campaign_id: DUMMY_CAMPAIGN.id,
391371
ad_unit: None,
392372
ad_slot: None,

0 commit comments

Comments
 (0)