Skip to content

Commit 14c50a1

Browse files
committed
adding IT test
1 parent 7e2f81d commit 14c50a1

File tree

7 files changed

+226
-4
lines changed

7 files changed

+226
-4
lines changed

sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.dapr.testcontainers.DaprLogLevel;
2121
import io.dapr.workflows.client.DaprWorkflowClient;
2222
import io.dapr.workflows.client.WorkflowInstanceStatus;
23+
import io.dapr.workflows.client.WorkflowRuntimeStatus;
2324
import io.dapr.workflows.runtime.WorkflowRuntime;
2425
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
2526
import org.junit.jupiter.api.BeforeEach;
@@ -117,6 +118,36 @@ public void testWorkflows() throws Exception {
117118
assertEquals(instanceId, workflowOutput.getWorkflowId());
118119
}
119120

121+
@Test
122+
public void testSuspendAndResumeWorkflows() throws Exception {
123+
TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>());
124+
String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload);
125+
workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false);
126+
127+
workflowClient.suspendWorkflow(instanceId, "testing suspend.");
128+
129+
130+
WorkflowInstanceStatus instanceState = workflowClient.getInstanceState(instanceId, false);
131+
assertNotNull(instanceState);
132+
assertEquals(WorkflowRuntimeStatus.SUSPENDED, instanceState.getRuntimeStatus());
133+
134+
workflowClient.resumeWorkflow(instanceId, "testing resume");
135+
136+
instanceState = workflowClient.getInstanceState(instanceId, false);
137+
assertNotNull(instanceState);
138+
assertEquals(WorkflowRuntimeStatus.RUNNING, instanceState.getRuntimeStatus());
139+
140+
workflowClient.raiseEvent(instanceId, "MoveForward", payload);
141+
142+
Duration timeout = Duration.ofSeconds(10);
143+
instanceState = workflowClient.waitForInstanceCompletion(instanceId, timeout, true);
144+
145+
assertNotNull(instanceState);
146+
assertEquals(WorkflowRuntimeStatus.COMPLETED, instanceState.getRuntimeStatus());
147+
148+
}
149+
150+
120151
private TestWorkflowPayload deserialize(String value) throws JsonProcessingException {
121152
return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class);
122153
}

sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,26 @@ public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, NewWorkfl
129129
orchestrationInstanceOptions);
130130
}
131131

132+
/**
133+
* Suspend the workflow associated with the provided instance id.
134+
*
135+
* @param workflowInstanceId Workflow instance id to suspend.
136+
* @param reason reason for suspending the workflow instance.
137+
*/
138+
public void suspendWorkflow(String workflowInstanceId, @Nullable String reason) {
139+
this.innerClient.suspendInstance(workflowInstanceId, reason);
140+
}
141+
142+
/**
143+
* Resume the workflow associated with the provided instance id.
144+
*
145+
* @param workflowInstanceId Workflow instance id to resume.
146+
* @param reason reason for resuming the workflow instance.
147+
*/
148+
public void resumeWorkflow(String workflowInstanceId, @Nullable String reason) {
149+
this.innerClient.resumeInstance(workflowInstanceId, reason);
150+
}
151+
132152
/**
133153
* Terminates the workflow associated with the provided instance id.
134154
*

sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,17 @@ public void raiseEvent() {
217217
expectedEventName, expectedEventPayload);
218218
}
219219

220+
@Test
221+
public void suspendResumeInstance() {
222+
String expectedArgument = "TestWorkflowInstanceId";
223+
client.suspendWorkflow(expectedArgument, "suspending workflow instance");
224+
client.resumeWorkflow(expectedArgument, "resuming workflow instance");
225+
verify(mockInnerClient, times(1)).suspendInstance(expectedArgument,
226+
"suspending workflow instance");
227+
verify(mockInnerClient, times(1)).resumeInstance(expectedArgument,
228+
"resuming workflow instance");
229+
}
230+
220231
@Test
221232
public void purgeInstance() {
222233
String expectedArgument = "TestWorkflowInstanceId";

spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.dapr.springboot.examples.wfp.fanoutin.Result;
2525
import io.dapr.springboot.examples.wfp.remoteendpoint.Payload;
2626
import io.dapr.springboot.examples.wfp.remoteendpoint.RemoteEndpointWorkflow;
27+
import io.dapr.springboot.examples.wfp.suspendresume.SuspendResumeWorkflow;
2728
import io.dapr.workflows.client.DaprWorkflowClient;
2829
import io.dapr.workflows.client.WorkflowInstanceStatus;
2930
import org.slf4j.Logger;
@@ -153,4 +154,39 @@ public Payload remoteEndpoint(@RequestBody Payload payload)
153154
return workflowInstanceStatus.readOutputAs(Payload.class);
154155
}
155156

157+
@PostMapping("wfp/suspendresume")
158+
public String suspendResume(@RequestParam("orderId") String orderId) {
159+
String instanceId = daprWorkflowClient.scheduleNewWorkflow(SuspendResumeWorkflow.class);
160+
logger.info("Workflow instance " + instanceId + " started");
161+
ordersToApprove.put(orderId, instanceId);
162+
return instanceId;
163+
}
164+
165+
@PostMapping("wfp/suspendresume-suspend")
166+
public String suspendResumeExecuteSuspend(@RequestParam("orderId") String orderId) {
167+
String instanceId = ordersToApprove.get(orderId);
168+
daprWorkflowClient.suspendWorkflow(instanceId, "testing suspend");
169+
WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(instanceId, false);
170+
return instanceState.getRuntimeStatus().name();
171+
}
172+
173+
@PostMapping("wfp/suspendresume-resume")
174+
public String suspendResumeExecuteResume(@RequestParam("orderId") String orderId) {
175+
String instanceId = ordersToApprove.get(orderId);
176+
daprWorkflowClient.resumeWorkflow(instanceId, "testing resume");
177+
WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(instanceId, false);
178+
return instanceState.getRuntimeStatus().name();
179+
}
180+
181+
182+
@PostMapping("wfp/suspendresume-continue")
183+
public Decision suspendResumeContinue(@RequestParam("orderId") String orderId, @RequestParam("decision") Boolean decision)
184+
throws TimeoutException {
185+
String instanceId = ordersToApprove.get(orderId);
186+
logger.info("Workflow instance " + instanceId + " continue");
187+
daprWorkflowClient.raiseEvent(instanceId, "Approval", decision);
188+
WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient
189+
.waitForInstanceCompletion(instanceId, null, true);
190+
return workflowInstanceStatus.readOutputAs(Decision.class);
191+
}
156192
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2023 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.springboot.examples.wfp.suspendresume;
15+
16+
import io.dapr.workflows.WorkflowActivity;
17+
import io.dapr.workflows.WorkflowActivityContext;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
import org.springframework.stereotype.Component;
21+
22+
import java.util.concurrent.TimeUnit;
23+
24+
@Component
25+
public class PerformTaskActivity implements WorkflowActivity {
26+
@Override
27+
public Object run(WorkflowActivityContext ctx) {
28+
Logger logger = LoggerFactory.getLogger(PerformTaskActivity.class);
29+
logger.info("Starting Activity: " + ctx.getName());
30+
31+
logger.info("Running activity...");
32+
//Sleeping for 5 seconds to simulate long running operation
33+
try {
34+
TimeUnit.SECONDS.sleep(5);
35+
} catch (InterruptedException e) {
36+
throw new RuntimeException(e);
37+
}
38+
39+
logger.info("Completing activity...");
40+
41+
return "OK";
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2023 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.springboot.examples.wfp.suspendresume;
15+
16+
import io.dapr.springboot.examples.wfp.externalevent.Decision;
17+
import io.dapr.workflows.Workflow;
18+
import io.dapr.workflows.WorkflowStub;
19+
import org.springframework.stereotype.Component;
20+
21+
@Component
22+
public class SuspendResumeWorkflow implements Workflow {
23+
@Override
24+
public WorkflowStub create() {
25+
return ctx -> {
26+
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
27+
28+
ctx.callActivity(PerformTaskActivity.class.getName(), String.class).await();
29+
30+
ctx.getLogger().info("Waiting for approval...");
31+
Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
32+
33+
ctx.getLogger().info("approval-event arrived");
34+
35+
ctx.callActivity(PerformTaskActivity.class.getName(), String.class).await();
36+
37+
ctx.complete(new Decision(approved));
38+
};
39+
}
40+
}

spring-boot-examples/workflows/src/test/java/io/dapr/springboot/examples/wfp/WorkflowPatternsAppTests.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313

1414
package io.dapr.springboot.examples.wfp;
1515

16-
import io.dapr.client.DaprClient;
1716
import io.dapr.springboot.DaprAutoConfiguration;
1817
import io.dapr.springboot.examples.wfp.continueasnew.CleanUpLog;
1918
import io.dapr.springboot.examples.wfp.remoteendpoint.Payload;
19+
import io.dapr.workflows.client.WorkflowRuntimeStatus;
2020
import io.github.microcks.testcontainers.MicrocksContainersEnsemble;
2121
import io.restassured.RestAssured;
2222
import io.restassured.http.ContentType;
@@ -32,15 +32,13 @@
3232
import static org.hamcrest.CoreMatchers.containsString;
3333
import static org.hamcrest.Matchers.equalTo;
3434
import static org.junit.jupiter.api.Assertions.assertEquals;
35+
import static org.junit.jupiter.api.Assertions.assertNotNull;
3536

3637
@SpringBootTest(classes = {TestWorkflowPatternsApplication.class, DaprTestContainersConfig.class,
3738
DaprAutoConfiguration.class, },
3839
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
3940
class WorkflowPatternsAppTests {
4041

41-
@Autowired
42-
private DaprClient daprClient;
43-
4442
@Autowired
4543
private MicrocksContainersEnsemble ensemble;
4644

@@ -160,4 +158,47 @@ void testRemoteEndpoint() {
160158
.getServiceInvocationsCount("API Payload Processor", "1.0.0"));
161159
}
162160

161+
@Test
162+
void testSuspendResume() {
163+
164+
String instanceId = given()
165+
.queryParam("orderId", "123")
166+
.when()
167+
.post("/wfp/suspendresume")
168+
.then()
169+
.statusCode(200).extract().asString();
170+
171+
assertNotNull(instanceId);
172+
173+
// The workflow is waiting on an event, let's suspend the workflow
174+
String state = given()
175+
.queryParam("orderId", "123")
176+
.when()
177+
.post("/wfp/suspendresume-suspend")
178+
.then()
179+
.statusCode(200).extract().asString();
180+
181+
assertEquals(WorkflowRuntimeStatus.SUSPENDED.name(), state);
182+
183+
// The let's resume the suspended workflow and check the state
184+
state = given()
185+
.queryParam("orderId", "123")
186+
.when()
187+
.post("/wfp/suspendresume-resume")
188+
.then()
189+
.statusCode(200).extract().asString();
190+
191+
assertEquals(WorkflowRuntimeStatus.RUNNING.name(), state);
192+
193+
// Now complete the workflow by sending an event
194+
given()
195+
.queryParam("orderId", "123")
196+
.queryParam("decision", false)
197+
.when()
198+
.post("/wfp/suspendresume-continue")
199+
.then()
200+
.statusCode(200).body("approved", equalTo(false));
201+
202+
}
203+
163204
}

0 commit comments

Comments
 (0)