
Chore.mns no sleep 2 #5624

Draft: wants to merge 51 commits into master
Commits (51)
7ba3477
feat: embedded google pubsub transformer
Sidddddarth Feb 18, 2025
21d505d
feat: add kafka embedded destination transformer
itsmihir Mar 5, 2025
f18f1ae
test: add tests for event failure
itsmihir Mar 5, 2025
cd1e71a
fix: linting
itsmihir Mar 5, 2025
7bfc78b
chore: add check for eventTypeToTopicMap and eventToTopicMap
itsmihir Mar 5, 2025
94ea530
chore: add logger for destination transformer
itsmihir Mar 6, 2025
8dee2dd
Merge branch 'feat.embeddedGooglepbusubTransformer' into feat.embedde…
itsmihir Mar 6, 2025
96c1811
fix: remove source definition type from metadata
itsmihir Mar 6, 2025
41fdf9a
chore: add utils to get topic map
itsmihir Mar 6, 2025
4980c78
Merge branch 'feat.embeddedGooglepbusubTransformer' into feat.embedde…
itsmihir Mar 6, 2025
21c6951
chore: refactor kafka transformer to use utils
itsmihir Mar 6, 2025
2e87dbd
feat: refactor pubsub transformer to use utils.GetTopicMap
itsmihir Mar 6, 2025
7a912da
feat: add check for topic being empty in kafka destination
itsmihir Mar 6, 2025
ff6a412
Merge branch 'feat.embeddedGooglepbusubTransformer' of https://github…
itsmihir Mar 6, 2025
a22a6b7
chore: minor refactor of filterConfigTopics to return bool
itsmihir Mar 6, 2025
19873e5
chore: remove unnecessary pointer dereferencing
itsmihir Mar 6, 2025
121162d
chore: minor refactor move sourceDefinitionType
itsmihir Mar 6, 2025
29616ff
feat: add validation error stats tags
itsmihir Mar 7, 2025
b6388d3
fix: linting
itsmihir Mar 7, 2025
1ada467
chore: refactor pubsub transformer to use findAttributeValue
itsmihir Mar 7, 2025
959dd74
Merge branch 'feat.embeddedGooglepbusubTransformer' of https://github…
itsmihir Mar 7, 2025
9c9b249
chore: add stat tags to kafka destination transformer
itsmihir Mar 7, 2025
4e8982a
chore: add missing test for panic
itsmihir Mar 7, 2025
1cab7fc
chore: remove GetAttributesAsMapOfInterface
itsmihir Mar 7, 2025
df217f0
chore: add tests for pubsub transformer
itsmihir Mar 7, 2025
8bae196
feat: add dot notation support for attributes
itsmihir Mar 10, 2025
c185379
Merge branch 'feat.embeddedGooglepbusubTransformer' of https://github…
itsmihir Mar 10, 2025
2f85520
chore: add tests for attributes metadata
itsmihir Mar 10, 2025
46286e3
fix: linting
itsmihir Mar 10, 2025
5812ae1
Merge branch 'master' of https://github.com/rudderlabs/rudder-server …
itsmihir Mar 10, 2025
a90759a
Merge branch 'feat.embeddedGooglepbusubTransformer' of https://github…
itsmihir Mar 10, 2025
86340ff
fix: linting
itsmihir Mar 10, 2025
cd7dcd6
feat: handle timestamp field for identify and track events
itsmihir Mar 11, 2025
05142a1
fix: linting
itsmihir Mar 11, 2025
8874e60
Merge branch 'feat.embeddedGooglepbusubTransformer' of https://github…
itsmihir Mar 11, 2025
50ee824
feat: set correct timestamp for retl event
itsmihir Mar 11, 2025
a68ebf2
chore: minor test changes
itsmihir Mar 11, 2025
cbf2507
Merge branch 'master' of https://github.com/rudderlabs/rudder-server …
itsmihir Mar 11, 2025
0e0f876
Merge branch 'feat.embeddedGooglepbusubTransformer' of https://github…
itsmihir Mar 11, 2025
cd418e1
chore: fix failing tests
itsmihir Mar 11, 2025
975af21
Merge branch 'feat.embeddedGooglepbusubTransformer' of https://github…
itsmihir Mar 11, 2025
e74be51
chore: add debugging info
lvrach Mar 12, 2025
c1cb1e4
Revert "chore: add debugging info"
lvrach Mar 12, 2025
ffc2d46
log router delivery time
lvrach Mar 12, 2025
81df7ae
Revert "log router delivery time"
lvrach Mar 13, 2025
d414897
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
lvrach Mar 13, 2025
7e42fcb
chore: jobsdb trace context
lvrach Mar 19, 2025
2aad8fc
extra trace span
lvrach Mar 19, 2025
adad92f
remove sleeps
lvrach Mar 19, 2025
4e5fbe3
no work counter for sleep
lvrach Mar 19, 2025
828c3ff
add zipkin and redpanda
lvrach Mar 20, 2025
31 changes: 31 additions & 0 deletions docker-compose.yml
@@ -51,3 +51,34 @@ services:
- ETCD_ADVERTISE_CLIENT_URLS=http://mode-provider:2379
ports:
- "2379:2379"
zipkin:
image: openzipkin/zipkin:latest
restart: unless-stopped
ports:
- "9411:9411"
environment:
- STORAGE_TYPE=mem
redpanda:
image: redpandadata/redpanda:latest
container_name: redpanda
ports:
- "9092:9092" # Kafka API
environment:
- REDPANDA_RPC_SERVER_LISTEN_ADDR=0.0.0.0
- REDPANDA_ADVERTISED_KAFKA_ADDR=localhost:9092
command:
- redpanda
- start
- --smp=1
- --memory=1G
- --reserve-memory=0M
- --overprovisioned
- --node-id=0
- --check=false
- --kafka-addr=0.0.0.0:9092
- --advertise-kafka-addr=localhost:9092
- --pandaproxy-addr=0.0.0.0:8082
- --advertise-pandaproxy-addr=localhost:8082
- --schema-registry-addr=0.0.0.0:8081
- --rpc-addr=0.0.0.0:33145
- --advertise-rpc-addr=localhost:33145
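Usage note (a sketch; service names per the compose file above): the two new services can be started on their own for local tracing and Kafka testing. Zipkin's UI is exposed on the mapped port 9411 and Redpanda serves the Kafka API on 9092.

docker compose up -d zipkin redpanda
# Zipkin trace UI:        http://localhost:9411
# Kafka bootstrap server: localhost:9092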
31 changes: 31 additions & 0 deletions jobsdb/jobsdb.go
@@ -409,6 +409,9 @@ type JobT struct {
LastJobStatus JobStatusT `json:"LastJobStatus"`
Parameters json.RawMessage `json:"Parameters"`
WorkspaceId string `json:"WorkspaceId"`

ctx context.Context
ctxOnce sync.Once
}

func (job *JobT) String() string {
@@ -428,6 +431,34 @@ func (job *JobT) sanitizeJSON() error {
return nil
}

type traceContainer struct {
TraceParent string `json:"traceparent"`
}

func (job *JobT) Ctx() context.Context {
job.ctxOnce.Do(func() {
if job.Parameters == nil {
job.ctx = context.Background()
return
}

var c traceContainer
err := jsonrs.Unmarshal(job.Parameters, &c)
if err != nil {
job.ctx = context.Background()
return
}

if c.TraceParent == "" {
job.ctx = context.Background()
return
}
ctx := stats.InjectTraceParentIntoContext(context.Background(), c.TraceParent)
job.ctx = ctx
})
return job.ctx
}

// The struct fields need to be exposed to JSON package
type dataSetT struct {
JobTable string `json:"job"`
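A minimal consumer sketch (hypothetical, not part of this diff) showing how Ctx() is meant to be used: a producer embeds a W3C traceparent in the job's Parameters JSON, and a later pipeline stage resumes that trace by deriving its context from the job.

// Hypothetical usage; assumes Parameters were written with a "traceparent"
// key, e.g. {"traceparent":"00-<trace-id>-<parent-id>-01", ...}.
func handle(job *jobsdb.JobT) {
	ctx := job.Ctx() // falls back to context.Background() if absent or unparsable
	// derive child spans from ctx using the process-wide tracer
	_ = ctx
}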
@@ -54,6 +54,16 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats, opts ...Opt)
handle.config.maxRetryBackoffInterval = conf.GetReloadableDurationVar(30, time.Second, "Processor.DestinationTransformer.maxRetryBackoffInterval", "Processor.maxRetryBackoffInterval")
handle.config.batchSize = config.GetReloadableIntVar(100, 1, "Processor.DestinationTransformer.batchSize", "Processor.transformBatchSize")

handle.config.maxLoggedEvents = conf.GetReloadableIntVar(10000, 1, "Processor.DestinationTransformer.maxLoggedEvents")

handle.stats.comparisonTime = handle.stat.NewStat("embedded_destination_transform_comparison_time", stats.TimerType)
handle.stats.matchedEvents = handle.stat.NewStat("embedded_destination_transform_matched_events", stats.CountType)
handle.stats.mismatchedEvents = handle.stat.NewStat("embedded_destination_transform_mismatched_events", stats.CountType)

handle.loggedEvents = 0
handle.loggedEventsMu = sync.Mutex{}
handle.loggedFileName = generateLogFileName()

for _, opt := range opts {
opt(handle)
}
@@ -70,15 +80,27 @@ type Client struct {
maxRetryBackoffInterval config.ValueLoader[time.Duration]
timeoutDuration time.Duration
batchSize config.ValueLoader[int]

maxLoggedEvents config.ValueLoader[int]
}
guardConcurrency chan struct{}
conf *config.Config
log logger.Logger
stat stats.Stats
client transformerclient.Client

stats struct {
comparisonTime stats.Timer
matchedEvents stats.Counter
mismatchedEvents stats.Counter
}

loggedEventsMu sync.Mutex
loggedEvents int64
loggedFileName string
}

- func (d *Client) Transform(ctx context.Context, clientEvents []types.TransformerEvent) types.Response {
+ func (d *Client) transform(ctx context.Context, clientEvents []types.TransformerEvent) types.Response {
batchSize := d.config.batchSize.Load()
if len(clientEvents) == 0 {
return types.Response{}
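The comparison stats and capped mismatch logging added above suggest the exported Transform becomes a shadow-comparison wrapper around the unexported embedded path. A hypothetical sketch of such a wrapper follows; remoteTransform and logMismatch are assumed helpers, not part of this diff:

// Hypothetical wrapper, for illustration only.
func (d *Client) Transform(ctx context.Context, events []types.TransformerEvent) types.Response {
	embedded := d.transform(ctx, events)     // new embedded path
	legacy := d.remoteTransform(ctx, events) // assumed call to the legacy transformer service
	start := time.Now()
	if reflect.DeepEqual(embedded, legacy) {
		d.stats.matchedEvents.Count(len(events))
	} else {
		d.stats.mismatchedEvents.Count(len(events))
		d.logMismatch(events, embedded, legacy) // assumed helper, capped by maxLoggedEvents
	}
	d.stats.comparisonTime.Since(start)
	return legacy
}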
@@ -0,0 +1,123 @@
package kafka

import (
"context"
"fmt"
"net/http"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
utils "github.com/rudderlabs/rudder-server/processor/internal/transformer/destination_transformer/embedded"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/utils/misc"
)

var canonicalNames = []string{"KAFKA", "kafka", "Kafka"}

func Transform(_ context.Context, events []types.TransformerEvent) types.Response {
response := types.Response{}
eventTypeToTopicMap := utils.GetTopicMap(events[0].Destination, "eventTypeToTopicMap", true)
eventToTopicMap := utils.GetTopicMap(events[0].Destination, "eventToTopicMap", false)

for _, event := range events {
		event.Metadata.SourceDefinitionType = "" // TODO: currently ignored during JSON marshalling; remove this once we start using it.

if event.Destination.ID != events[0].Destination.ID {
panic("all events must have the same destination")
}

event.Message = utils.UpdateTimestampFieldForRETLEvent(event.Message)
var integrationsObj map[string]interface{}
for _, canonicalName := range canonicalNames {
if inObj, ok := misc.MapLookup(event.Message, "integrations", canonicalName).(map[string]interface{}); ok {
integrationsObj = inObj
break
}
}

var userId string
if id, ok := event.Message["userId"].(string); ok && id != "" {
userId = id
} else if id, ok := event.Message["anonymousId"].(string); ok {
userId = id
}

topic, err := getTopic(event, integrationsObj, eventTypeToTopicMap, eventToTopicMap)
if err != nil {
response.FailedEvents = append(response.FailedEvents, types.TransformerResponse{
Error: err.Error(),
Metadata: event.Metadata,
StatusCode: http.StatusInternalServerError,
StatTags: utils.GetValidationErrorStatTags(event.Destination),
})
continue
}

outputEvent := map[string]interface{}{
"message": utils.GetMessageAsMap(event.Message),
"userId": userId,
"topic": topic,
}

if schemaId, ok := integrationsObj["schemaId"].(string); ok && schemaId != "" {
outputEvent["schemaId"] = schemaId
}

event.Metadata.RudderID = fmt.Sprintf("%s<<>>%s", event.Metadata.RudderID, topic)

response.Events = append(response.Events, types.TransformerResponse{
Output: outputEvent,
Metadata: event.Metadata,
StatusCode: http.StatusOK,
})
}

return response
}

func getTopic(event types.TransformerEvent, integrationsObj map[string]interface{}, eventTypeToTopicMap, eventToTopicMap map[string]string) (string, error) {
if topic, ok := integrationsObj["topic"].(string); ok && topic != "" {
return topic, nil
}

if topic, ok := filterConfigTopics(event.Message, event.Destination, eventTypeToTopicMap, eventToTopicMap); ok && topic != "" {
return topic, nil
}

if topic, ok := event.Destination.Config["topic"].(string); ok && topic != "" {
return topic, nil
}

return "", fmt.Errorf("Topic is required for Kafka destination")
}

func filterConfigTopics(message types.SingularEventT, destination backendconfig.DestinationT, eventTypeToTopicMap, eventToTopicMap map[string]string) (string, bool) {
if destination.Config["enableMultiTopic"] == true {
messageType, ok := message["type"].(string)
if !ok {
return "", false
}

		switch messageType {
		case "identify", "screen", "page", "group", "alias":
			if topic, ok := eventTypeToTopicMap[messageType]; ok {
				return topic, true
			}
		case "track":
			eventName, ok := message["event"].(string)
			if !ok || eventName == "" {
				return "", false
			}
			if topic, ok := eventToTopicMap[eventName]; ok {
				return topic, true
			}
		}
	}
return "", false
}
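For illustration, a hypothetical input exercising the first branch of getTopic: a topic pinned in the integrations object wins over the destination's topic maps and its default Config["topic"] (the payload shape is an assumption based on the lookups above).

// Hypothetical message; getTopic would resolve the topic to "orders"
// and the schemaId would be copied into the output event.
message := map[string]interface{}{
	"type":   "track",
	"event":  "Order Completed",
	"userId": "u-1",
	"integrations": map[string]interface{}{
		"Kafka": map[string]interface{}{
			"topic":    "orders",
			"schemaId": "schema-42",
		},
	},
}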