-
Notifications
You must be signed in to change notification settings - Fork 329
/
Copy pathimport_handler.go
82 lines (70 loc) · 2.42 KB
/
import_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package gateway
import (
"errors"
"net/http"
"strconv"
"strings"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
kituuid "github.com/rudderlabs/rudder-go-kit/uuid"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
"github.com/rudderlabs/rudder-server/gateway/response"
)
// ImportRequestHandler is an empty struct to capture import specific request handling functionality
type ImportRequestHandler struct {
*Handle
}
// ProcessRequest on ImportRequestHandler splits payload by user and throws them into the webrequestQ and waits for all their responses before returning
func (irh *ImportRequestHandler) ProcessRequest(w *http.ResponseWriter, r *http.Request, _ string, payload []byte, arctx *gwtypes.AuthRequestContext) string {
usersPayload, payloadError := getUsersPayload(payload)
if payloadError != nil {
return payloadError.Error()
}
count := len(usersPayload)
done := make(chan string, count)
for key := range usersPayload {
irh.addToWebRequestQ(w, r, done, "batch", usersPayload[key], arctx)
}
var interimMsgs []string
for index := 0; index < count; index++ {
interimErrorMessage := <-done
interimMsgs = append(interimMsgs, interimErrorMessage)
}
return strings.Join(interimMsgs, "")
}
// getPayloadFromRequest reads the request body and returns event payloads grouped by user id
// for performance see: https://github.com/rudderlabs/rudder-server/pull/2040
func getUsersPayload(requestPayload []byte) (map[string][]byte, error) {
if !gjson.ValidBytes(requestPayload) {
return make(map[string][]byte), errors.New((response.InvalidJSON))
}
result := gjson.GetBytes(requestPayload, "batch")
var (
userCnt = make(map[string]int)
userMap = make(map[string][]byte)
)
result.ForEach(func(_, value gjson.Result) bool {
anonIDFromReq := value.Get("anonymousId").String()
userIDFromReq := value.Get("userId").String()
rudderID, err := kituuid.GetMD5UUID(userIDFromReq + ":" + anonIDFromReq)
if err != nil {
return false
}
uuidStr := rudderID.String()
tempValue, ok := userMap[uuidStr]
if !ok {
userCnt[uuidStr] = 0
userMap[uuidStr] = append([]byte(`{"batch":[`), append([]byte(value.Raw), ']', '}')...)
} else {
path := "batch." + strconv.Itoa(userCnt[uuidStr]+1)
raw, err := sjson.SetRaw(string(tempValue), path, value.Raw)
if err != nil {
return false
}
userCnt[uuidStr]++
userMap[uuidStr] = []byte(raw)
}
return true
})
return userMap, nil
}