Skip to content

Commit 5268fac

Browse files
committed
analytics improvements:
- primitives - sentry - DateHour - sentry - analytics - tests improvements - sentry - db - analytics - query & test improvements
1 parent 966af1a commit 5268fac

File tree

4 files changed

+348
-234
lines changed

4 files changed

+348
-234
lines changed

primitives/src/ipfs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ impl Url {
9595
mod postgres {
9696
use super::IPFS;
9797
use bytes::BytesMut;
98-
use tokio_postgres::types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
9998
use std::error::Error;
99+
use tokio_postgres::types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
100100

101101
impl ToSql for IPFS {
102102
fn to_sql(

primitives/src/sentry.rs

Lines changed: 193 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@ use crate::{
55
validator::{ApproveState, Heartbeat, MessageTypes, NewState, Type as MessageType},
66
Address, Balances, BigNum, CampaignId, Channel, ChannelId, UnifiedNum, ValidatorId, IPFS,
77
};
8-
use bytes::BytesMut;
9-
use chrono::{Date, DateTime, Datelike, TimeZone, Timelike, Utc};
10-
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
11-
use serde::{Deserialize, Serialize};
12-
use std::{collections::HashMap, error::Error, fmt, hash::Hash};
8+
use chrono::{Date, DateTime, NaiveDate, TimeZone, Timelike, Utc};
9+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
10+
use std::{collections::HashMap, fmt, hash::Hash};
11+
use thiserror::Error;
1312

1413
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1514
#[serde(rename_all = "camelCase")]
@@ -120,7 +119,7 @@ pub mod message {
120119
}
121120
}
122121

123-
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
122+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
124123
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
125124
pub enum Event {
126125
#[serde(rename_all = "camelCase")]
@@ -139,10 +138,39 @@ pub enum Event {
139138
},
140139
}
141140

142-
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash, Eq)]
141+
impl Event {
142+
pub fn is_click_event(&self) -> bool {
143+
matches!(self, Event::Click { .. })
144+
}
145+
146+
pub fn is_impression_event(&self) -> bool {
147+
matches!(self, Event::Impression { .. })
148+
}
149+
150+
pub fn as_str(&self) -> &str {
151+
self.as_ref()
152+
}
153+
}
154+
155+
impl AsRef<str> for Event {
156+
fn as_ref(&self) -> &str {
157+
match *self {
158+
Event::Impression { .. } => "IMPRESSION",
159+
Event::Click { .. } => "CLICK",
160+
}
161+
}
162+
}
163+
164+
impl fmt::Display for Event {
165+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166+
f.write_str(self.as_ref())
167+
}
168+
}
169+
170+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
143171
#[serde(rename_all = "camelCase")]
144172
pub struct UpdateAnalytics {
145-
pub time: DateHour,
173+
pub time: DateHour<Utc>,
146174
pub campaign_id: CampaignId,
147175
pub ad_unit: Option<IPFS>,
148176
pub ad_slot: Option<IPFS>,
@@ -160,7 +188,7 @@ pub struct UpdateAnalytics {
160188
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
161189
#[serde(rename_all = "camelCase")]
162190
pub struct Analytics {
163-
pub time: DateHour,
191+
pub time: DateHour<Utc>,
164192
pub campaign_id: CampaignId,
165193
pub ad_unit: Option<IPFS>,
166194
pub ad_slot: Option<IPFS>,
@@ -172,66 +200,117 @@ pub struct Analytics {
172200
pub os_name: OperatingSystem,
173201
pub event_type: String,
174202
pub payout_amount: UnifiedNum,
175-
pub payout_count: i32,
176-
}
177-
178-
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Hash, Eq)]
179-
pub struct DateHour {
180-
pub date: DateTime<Utc>,
203+
pub payout_count: u32,
204+
}
205+
206+
#[derive(Debug, Error, PartialEq, Eq)]
207+
#[error("Minutes ({minutes}), seconds ({seconds}) & nanoseconds ({nanoseconds}) should all be set to 0 (zero)")]
208+
pub struct DateHourError {
209+
pub minutes: u32,
210+
pub seconds: u32,
211+
pub nanoseconds: u32,
212+
}
213+
214+
#[derive(Clone, PartialEq, Eq, Hash)]
215+
/// [`DateHour`] holds the date and hour (only).
216+
/// It uses [`chrono::DateTime`] when serializing and deserializing.
217+
/// When serializing it always sets minutes and seconds to `0` (zero).
218+
/// When deserializing the minutes and seconds should always be set to `0` (zero),
219+
/// otherwise an error will be returned.
220+
pub struct DateHour<Tz: TimeZone> {
221+
pub date: Date<Tz>,
222+
/// hour is in the range of `0 - 23`
181223
pub hour: u32,
182224
}
183225

184-
impl<'a> FromSql<'a> for DateHour {
185-
fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
186-
let date_time = <DateTime<Utc> as FromSql>::from_sql(ty, raw)?;
187-
assert_eq!(date_time.time().minute(), 0);
188-
assert_eq!(date_time.time().second(), 0);
189-
assert_eq!(date_time.time().nanosecond(), 0);
190-
Ok(Self {
191-
date: date_time,
192-
hour: date_time.hour(),
193-
})
226+
impl DateHour<Utc> {
227+
/// # Panics
228+
///
229+
/// When wrong inputs have been passed, i.e. for year, month, day or hour.
230+
pub fn from_ymdh(year: i32, month: u32, day: u32, hour: u32) -> Self {
231+
Self::from_ymdh_opt(year, month, day, hour).expect("Valid Date with hour")
194232
}
195-
accepts!(TIMESTAMPTZ);
196-
}
197233

198-
impl ToSql for DateHour {
199-
fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
200-
Utc.ymd(self.date.year(), self.date.month(), self.date.day())
201-
.and_hms(self.hour, 0, 0)
202-
.to_sql(ty, w)
234+
/// Makes a new [`DateHour`] from year, month, day and hour.
235+
///
236+
/// Returns `None` on invalid year, month, day or hour.
237+
///
238+
/// See [`chrono::NaiveDate::from_ymd_opt()`] & [`chrono::NaiveTime::from_hms_opt()`] for details
239+
pub fn from_ymdh_opt(year: i32, month: u32, day: u32, hour: u32) -> Option<Self> {
240+
if hour >= 24 {
241+
return None;
242+
}
243+
244+
let date = NaiveDate::from_ymd_opt(year, month, day)?;
245+
Some(Self {
246+
date: Date::from_utc(date, Utc),
247+
hour,
248+
})
203249
}
204250

205-
accepts!(TIMESTAMPTZ);
206-
to_sql_checked!();
207-
}
251+
pub fn now() -> Self {
252+
let datetime = Utc::now();
208253

209-
impl Event {
210-
pub fn is_click_event(&self) -> bool {
211-
matches!(self, Event::Click { .. })
254+
Self {
255+
date: datetime.date(),
256+
hour: datetime.hour(),
257+
}
212258
}
259+
}
213260

214-
pub fn is_impression_event(&self) -> bool {
215-
matches!(self, Event::Impression { .. })
261+
/// Manually implement [`Copy`] as it requires a where clause for the [`TimeZone::Offset`]
262+
impl<Tz: TimeZone> Copy for DateHour<Tz> where Tz::Offset: Copy {}
263+
264+
impl<Tz: TimeZone> DateHour<Tz> {
265+
/// Creates a [`DateTime`] with minutes, seconds, nanoseconds set to `0` (zero)
266+
pub fn to_datetime(&self) -> DateTime<Tz> {
267+
self.date.and_hms(self.hour, 0, 0)
216268
}
269+
}
217270

218-
pub fn as_str(&self) -> &str {
219-
self.as_ref()
271+
impl<Tz: TimeZone> fmt::Debug for DateHour<Tz> {
272+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273+
self.to_datetime().fmt(f)
220274
}
221275
}
222276

223-
impl AsRef<str> for Event {
224-
fn as_ref(&self) -> &str {
225-
match *self {
226-
Event::Impression { .. } => "IMPRESSION",
227-
Event::Click { .. } => "CLICK",
277+
impl<Tz: TimeZone> TryFrom<DateTime<Tz>> for DateHour<Tz> {
278+
type Error = DateHourError;
279+
280+
fn try_from(datetime: DateTime<Tz>) -> Result<Self, Self::Error> {
281+
let time = datetime.time();
282+
283+
match (time.minute(), time.second(), time.nanosecond()) {
284+
(0, 0, 0) => Ok(Self {
285+
date: datetime.date(),
286+
hour: datetime.hour(),
287+
}),
288+
_ => Err(DateHourError {
289+
minutes: datetime.minute(),
290+
seconds: datetime.second(),
291+
nanoseconds: datetime.nanosecond(),
292+
}),
228293
}
229294
}
230295
}
231296

232-
impl fmt::Display for Event {
233-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
234-
f.write_str(self.as_ref())
297+
impl Serialize for DateHour<Utc> {
298+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
299+
where
300+
S: Serializer,
301+
{
302+
self.to_datetime().serialize(serializer)
303+
}
304+
}
305+
306+
impl<'de> Deserialize<'de> for DateHour<Utc> {
307+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
308+
where
309+
D: Deserializer<'de>,
310+
{
311+
let datetime = <DateTime<Utc>>::deserialize(deserializer)?;
312+
313+
Self::try_from(datetime).map_err(serde::de::Error::custom)
235314
}
236315
}
237316

@@ -672,15 +751,16 @@ pub mod campaign_create {
672751

673752
#[cfg(feature = "postgres")]
674753
mod postgres {
675-
use super::{Analytics, MessageResponse, ValidatorMessage};
754+
use super::{Analytics, DateHour, MessageResponse, ValidatorMessage};
676755
use crate::{
677756
sentry::EventAggregate,
678757
validator::{messages::Type as MessageType, MessageTypes},
679758
};
680759
use bytes::BytesMut;
760+
use chrono::{DateTime, Timelike, Utc};
681761
use serde::Deserialize;
682762
use tokio_postgres::{
683-
types::{accepts, to_sql_checked, IsNull, Json, ToSql, Type},
763+
types::{accepts, to_sql_checked, FromSql, IsNull, Json, ToSql, Type},
684764
Error, Row,
685765
};
686766

@@ -735,30 +815,40 @@ mod postgres {
735815
}
736816

737817
impl From<&Row> for Analytics {
818+
/// # Panics
819+
///
820+
/// When a field is missing in [`Row`] or if the [`Analytics`] `ad_unit` or `ad_slot` [`crate::IPFS`] is wrong.
738821
fn from(row: &Row) -> Self {
739-
let ad_slot_type: String = row.get("ad_slot_type");
740-
let ad_slot_type = match ad_slot_type.len() {
741-
0 => None,
742-
_ => Some(ad_slot_type),
743-
};
744-
745-
let hostname: String = row.get("hostname");
746-
let hostname = match hostname.len() {
747-
0 => None,
748-
_ => Some(hostname),
749-
};
750-
751-
let country: String = row.get("country");
752-
let country = match country.len() {
753-
0 => None,
754-
_ => Some(country),
755-
};
822+
let ad_slot_type = row
823+
.get::<_, Option<String>>("ad_slot_type")
824+
.filter(|string| !string.is_empty());
825+
let hostname = row
826+
.get::<_, Option<String>>("hostname")
827+
.filter(|string| !string.is_empty());
828+
let country = row
829+
.get::<_, Option<String>>("country")
830+
.filter(|string| !string.is_empty());
831+
832+
let ad_unit = row.get::<_, Option<String>>("ad_unit").and_then(|string| {
833+
if !string.is_empty() {
834+
Some(string.parse().expect("Valid IPFS"))
835+
} else {
836+
None
837+
}
838+
});
839+
let ad_slot = row.get::<_, Option<String>>("ad_slot").and_then(|string| {
840+
if !string.is_empty() {
841+
Some(string.parse().expect("Valid IPFS"))
842+
} else {
843+
None
844+
}
845+
});
756846

757847
Self {
758848
campaign_id: row.get("campaign_id"),
759849
time: row.get("time"),
760-
ad_unit: row.try_get("ad_unit").ok(),
761-
ad_slot: row.try_get("ad_slot").ok(),
850+
ad_unit,
851+
ad_slot,
762852
ad_slot_type,
763853
advertiser: row.get("advertiser"),
764854
publisher: row.get("publisher"),
@@ -767,10 +857,41 @@ mod postgres {
767857
os_name: row.get("os"),
768858
event_type: row.get("event_type"),
769859
payout_amount: row.get("payout_amount"),
770-
payout_count: row.get("payout_count"),
860+
payout_count: row.get::<_, i32>("payout_count").unsigned_abs(),
771861
}
772862
}
773863
}
864+
865+
impl<'a> FromSql<'a> for DateHour<Utc> {
866+
fn from_sql(
867+
ty: &Type,
868+
raw: &'a [u8],
869+
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
870+
let datetime = <DateTime<Utc> as FromSql>::from_sql(ty, raw)?;
871+
assert_eq!(datetime.time().minute(), 0);
872+
assert_eq!(datetime.time().second(), 0);
873+
assert_eq!(datetime.time().nanosecond(), 0);
874+
875+
Ok(Self {
876+
date: datetime.date(),
877+
hour: datetime.hour(),
878+
})
879+
}
880+
accepts!(TIMESTAMPTZ);
881+
}
882+
883+
impl ToSql for DateHour<Utc> {
884+
fn to_sql(
885+
&self,
886+
ty: &Type,
887+
w: &mut BytesMut,
888+
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
889+
self.date.and_hms(self.hour, 0, 0).to_sql(ty, w)
890+
}
891+
892+
accepts!(TIMESTAMPTZ);
893+
to_sql_checked!();
894+
}
774895
}
775896

776897
#[cfg(test)]

0 commit comments

Comments
 (0)