Skip to content

Commit 2c2a590

Browse files
authored
Fix replication task serialization (#6741)
1 parent 58c4a50 commit 2c2a590

File tree

4 files changed

+26
-1
lines changed

4 files changed

+26
-1
lines changed

Diff for: service/history/execution/mutable_state_builder.go

+6
Original file line numberDiff line numberDiff line change
@@ -1772,6 +1772,11 @@ func (e *mutableStateBuilder) eventsToReplicationTask(
17721772

17731773
// the visibility timestamp will be set in shard context
17741774
replicationTask := &persistence.HistoryReplicationTask{
1775+
WorkflowIdentifier: persistence.WorkflowIdentifier{
1776+
DomainID: e.executionInfo.DomainID,
1777+
WorkflowID: e.executionInfo.WorkflowID,
1778+
RunID: e.executionInfo.RunID,
1779+
},
17751780
TaskData: persistence.TaskData{
17761781
Version: firstEvent.Version,
17771782
},
@@ -1794,6 +1799,7 @@ func (e *mutableStateBuilder) syncActivityToReplicationTask(
17941799
}
17951800

17961801
return convertSyncActivityInfos(
1802+
e.executionInfo,
17971803
e.pendingActivityInfoIDs,
17981804
e.syncActivityTasks,
17991805
)

Diff for: service/history/execution/mutable_state_builder_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -3536,6 +3536,11 @@ func TestCloseTransactionAsMutation(t *testing.T) {
35363536
persistence.HistoryTaskCategoryTimer: nil,
35373537
persistence.HistoryTaskCategoryReplication: []persistence.Task{
35383538
&persistence.HistoryReplicationTask{
3539+
WorkflowIdentifier: persistence.WorkflowIdentifier{
3540+
DomainID: "some-domain-id",
3541+
WorkflowID: "",
3542+
RunID: "",
3543+
},
35393544
FirstEventID: 1,
35403545
NextEventID: 2,
35413546
TaskData: persistence.TaskData{

Diff for: service/history/execution/mutable_state_util.go

+6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func (policy TransactionPolicy) Ptr() *TransactionPolicy {
4646
}
4747

4848
func convertSyncActivityInfos(
49+
executionInfo *persistence.WorkflowExecutionInfo,
4950
activityInfos map[int64]*persistence.ActivityInfo,
5051
inputs map[int64]struct{},
5152
) []persistence.Task {
@@ -55,6 +56,11 @@ func convertSyncActivityInfos(
5556
if ok {
5657
// the visibility timestamp will be set in shard context
5758
outputs = append(outputs, &persistence.SyncActivityTask{
59+
WorkflowIdentifier: persistence.WorkflowIdentifier{
60+
DomainID: executionInfo.DomainID,
61+
WorkflowID: executionInfo.WorkflowID,
62+
RunID: executionInfo.RunID,
63+
},
5864
TaskData: persistence.TaskData{
5965
Version: activityInfo.Version,
6066
},

Diff for: service/history/execution/mutable_state_util_test.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -457,13 +457,21 @@ func TestGetChildExecutionDomainEntry(t *testing.T) {
457457

458458
func TestConvert(t *testing.T) {
459459
t.Run("convertSyncActivityInfos", func(t *testing.T) {
460+
executionInfo := &persistence.WorkflowExecutionInfo{
461+
DomainID: "some-domain-id",
462+
WorkflowID: "some-workflow-id",
463+
RunID: "some-run-id",
464+
}
460465
activityInfos := map[int64]*persistence.ActivityInfo{1: {Version: 1, ScheduleID: 1}}
461466
inputs := map[int64]struct{}{1: {}}
462-
outputs := convertSyncActivityInfos(activityInfos, inputs)
467+
outputs := convertSyncActivityInfos(executionInfo, activityInfos, inputs)
463468
assert.NotNil(t, outputs)
464469
assert.Equal(t, 1, len(outputs))
465470
assert.Equal(t, int64(1), outputs[0].(*persistence.SyncActivityTask).ScheduledID)
466471
assert.Equal(t, int64(1), outputs[0].GetVersion())
472+
assert.Equal(t, executionInfo.DomainID, outputs[0].(*persistence.SyncActivityTask).DomainID)
473+
assert.Equal(t, executionInfo.WorkflowID, outputs[0].(*persistence.SyncActivityTask).WorkflowID)
474+
assert.Equal(t, executionInfo.RunID, outputs[0].(*persistence.SyncActivityTask).RunID)
467475
})
468476
}
469477

0 commit comments

Comments
 (0)