Skip to content

Commit b0770ff

Browse files
committed
Run suspending calls within Dispatchers.Default
This addresses a need for off-main-thread invocation of Call.Factory.newCall to support lazy HttpClient initialization.
1 parent 1490e6b commit b0770ff

File tree

4 files changed

+100
-104
lines changed

4 files changed

+100
-104
lines changed

retrofit-mock/src/main/java/retrofit2/mock/BehaviorDelegate.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,9 @@ public <R> T returning(Call<R> call) {
7777

7878
Call<Object> adaptedCall = (Call<Object>) adapted;
7979
Continuation<Object> continuation = (Continuation<Object>) args[args.length - 1];
80-
try {
81-
return adapterInfo.wantsResponse
82-
? KotlinExtensions.awaitResponse(adaptedCall, continuation)
83-
: KotlinExtensions.await(adaptedCall, continuation);
84-
} catch (Exception e) {
85-
return KotlinExtensions.suspendAndThrow(e, continuation);
86-
}
80+
return adapterInfo.wantsResponse
81+
? KotlinExtensions.awaitResponse(adaptedCall, continuation)
82+
: KotlinExtensions.await(adaptedCall, continuation);
8783
});
8884
}
8985

retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt

+33-4
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,15 @@
1515
*/
1616
package retrofit2
1717

18+
import java.io.IOException
19+
import java.lang.reflect.ParameterizedType
20+
import java.lang.reflect.Type
21+
import java.util.concurrent.Executors
22+
import kotlin.coroutines.CoroutineContext
1823
import kotlinx.coroutines.CoroutineDispatcher
1924
import kotlinx.coroutines.GlobalScope
25+
import kotlinx.coroutines.Runnable
26+
import kotlinx.coroutines.asCoroutineDispatcher
2027
import kotlinx.coroutines.async
2128
import kotlinx.coroutines.runBlocking
2229
import kotlinx.coroutines.withContext
@@ -35,10 +42,6 @@ import retrofit2.helpers.ToStringConverterFactory
3542
import retrofit2.http.GET
3643
import retrofit2.http.HEAD
3744
import retrofit2.http.Path
38-
import java.io.IOException
39-
import java.lang.reflect.ParameterizedType
40-
import java.lang.reflect.Type
41-
import kotlin.coroutines.CoroutineContext
4245

4346
class KotlinSuspendTest {
4447
@get:Rule val server = MockWebServer()
@@ -353,6 +356,32 @@ class KotlinSuspendTest {
353356
}
354357
}
355358

359+
@Test fun usesCoroutineContextForCallFactory() {
360+
val okHttpClient = OkHttpClient()
361+
var callFactoryThread: Thread? = null
362+
val outerContextThread: Thread
363+
val retrofit = Retrofit.Builder()
364+
.baseUrl(server.url("/"))
365+
.callFactory {
366+
callFactoryThread = Thread.currentThread()
367+
okHttpClient.newCall(it)
368+
}
369+
.addConverterFactory(ToStringConverterFactory())
370+
.build()
371+
val example = retrofit.create(Service::class.java)
372+
373+
server.enqueue(MockResponse().setBody("Hi"))
374+
375+
runBlocking {
376+
outerContextThread = Thread.currentThread()
377+
example.body()
378+
}
379+
380+
assertThat(callFactoryThread).isNotNull
381+
assertThat(outerContextThread).isNotEqualTo(callFactoryThread)
382+
}
383+
384+
356385
@Suppress("EXPERIMENTAL_OVERRIDE")
357386
private object DirectUnconfinedDispatcher : CoroutineDispatcher() {
358387
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

retrofit/src/main/java/retrofit2/HttpServiceMethod.java

+8-25
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,7 @@ protected Object adapt(Call<ResponseT> call, Object[] args) {
192192
Continuation<Response<ResponseT>> continuation =
193193
(Continuation<Response<ResponseT>>) args[args.length - 1];
194194

195-
// See SuspendForBody for explanation about this try/catch.
196-
try {
197-
return KotlinExtensions.awaitResponse(call, continuation);
198-
} catch (Exception e) {
199-
return KotlinExtensions.suspendAndThrow(e, continuation);
200-
}
195+
return KotlinExtensions.awaitResponse(call, continuation);
201196
}
202197
}
203198

@@ -226,25 +221,13 @@ protected Object adapt(Call<ResponseT> call, Object[] args) {
226221
//noinspection unchecked Checked by reflection inside RequestFactory.
227222
Continuation<ResponseT> continuation = (Continuation<ResponseT>) args[args.length - 1];
228223

229-
// Calls to OkHttp Call.enqueue() like those inside await and awaitNullable can sometimes
230-
// invoke the supplied callback with an exception before the invoking stack frame can return.
231-
// Coroutines will intercept the subsequent invocation of the Continuation and throw the
232-
// exception synchronously. A Java Proxy cannot throw checked exceptions without them being
233-
// declared on the interface method. To avoid the synchronous checked exception being wrapped
234-
// in an UndeclaredThrowableException, it is intercepted and supplied to a helper which will
235-
// force suspension to occur so that it can be instead delivered to the continuation to
236-
// bypass this restriction.
237-
try {
238-
if (isUnit) {
239-
//noinspection unchecked Checked by isUnit
240-
return KotlinExtensions.awaitUnit((Call<Unit>) call, (Continuation<Unit>) continuation);
241-
} else if (isNullable) {
242-
return KotlinExtensions.awaitNullable(call, continuation);
243-
} else {
244-
return KotlinExtensions.await(call, continuation);
245-
}
246-
} catch (Exception e) {
247-
return KotlinExtensions.suspendAndThrow(e, continuation);
224+
if (isUnit) {
225+
//noinspection unchecked Checked by isUnit
226+
return KotlinExtensions.awaitUnit((Call<Unit>) call, (Continuation<Unit>) continuation);
227+
} else if (isNullable) {
228+
return KotlinExtensions.await(call, continuation);
229+
} else {
230+
return KotlinExtensions.await(call, continuation);
248231
}
249232
}
250233
}

retrofit/src/main/java/retrofit2/KotlinExtensions.kt

+56-68
Original file line numberDiff line numberDiff line change
@@ -20,67 +20,70 @@ package retrofit2
2020

2121
import kotlinx.coroutines.Dispatchers
2222
import kotlinx.coroutines.suspendCancellableCoroutine
23-
import java.lang.reflect.ParameterizedType
24-
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
25-
import kotlin.coroutines.intrinsics.intercepted
26-
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
23+
import kotlinx.coroutines.withContext
2724
import kotlin.coroutines.resume
2825
import kotlin.coroutines.resumeWithException
2926

3027
inline fun <reified T: Any> Retrofit.create(): T = create(T::class.java)
3128

3229
suspend fun <T : Any> Call<T>.await(): T {
33-
return suspendCancellableCoroutine { continuation ->
34-
continuation.invokeOnCancellation {
35-
cancel()
36-
}
37-
enqueue(object : Callback<T> {
38-
override fun onResponse(call: Call<T>, response: Response<T>) {
39-
if (response.isSuccessful) {
40-
val body = response.body()
41-
if (body == null) {
42-
val invocation = call.request().tag(Invocation::class.java)!!
43-
val method = invocation.method()
44-
val e = KotlinNullPointerException("Response from " +
30+
// TODO: a better solution for off-main-thread call factories than this.
31+
return withContext(Dispatchers.Default) {
32+
suspendCancellableCoroutine { continuation ->
33+
continuation.invokeOnCancellation {
34+
cancel()
35+
}
36+
enqueue(object : Callback<T> {
37+
override fun onResponse(call: Call<T>, response: Response<T>) {
38+
if (response.isSuccessful) {
39+
val body = response.body()
40+
if (body == null) {
41+
val invocation = call.request().tag(Invocation::class.java)!!
42+
val method = invocation.method()
43+
val e = KotlinNullPointerException("Response from " +
4544
method.declaringClass.name +
4645
'.' +
4746
method.name +
4847
" was null but response body type was declared as non-null")
49-
continuation.resumeWithException(e)
48+
continuation.resumeWithException(e)
49+
} else {
50+
continuation.resume(body)
51+
}
5052
} else {
51-
continuation.resume(body)
53+
continuation.resumeWithException(HttpException(response))
5254
}
53-
} else {
54-
continuation.resumeWithException(HttpException(response))
5555
}
56-
}
5756

58-
override fun onFailure(call: Call<T>, t: Throwable) {
59-
continuation.resumeWithException(t)
60-
}
61-
})
57+
override fun onFailure(call: Call<T>, t: Throwable) {
58+
continuation.resumeWithException(t)
59+
}
60+
})
61+
}
6262
}
6363
}
6464

6565
@JvmName("awaitNullable")
6666
suspend fun <T : Any> Call<T?>.await(): T? {
67-
return suspendCancellableCoroutine { continuation ->
68-
continuation.invokeOnCancellation {
69-
cancel()
70-
}
71-
enqueue(object : Callback<T?> {
72-
override fun onResponse(call: Call<T?>, response: Response<T?>) {
73-
if (response.isSuccessful) {
74-
continuation.resume(response.body())
75-
} else {
76-
continuation.resumeWithException(HttpException(response))
77-
}
67+
// TODO: a better solution for off-main-thread call factories than this.
68+
return withContext(Dispatchers.Default) {
69+
suspendCancellableCoroutine { continuation ->
70+
continuation.invokeOnCancellation {
71+
cancel()
7872
}
73+
enqueue(object : Callback<T?> {
74+
override fun onResponse(call: Call<T?>, response: Response<T?>) {
75+
if (response.isSuccessful) {
76+
continuation.resume(response.body())
77+
} else {
78+
continuation.resumeWithException(HttpException(response))
79+
}
80+
}
7981

80-
override fun onFailure(call: Call<T?>, t: Throwable) {
81-
continuation.resumeWithException(t)
82-
}
83-
})
82+
override fun onFailure(call: Call<T?>, t: Throwable) {
83+
continuation.resumeWithException(t)
84+
}
85+
})
86+
}
8487
}
8588
}
8689

@@ -91,36 +94,21 @@ suspend fun Call<Unit>.await() {
9194
}
9295

9396
suspend fun <T> Call<T>.awaitResponse(): Response<T> {
94-
return suspendCancellableCoroutine { continuation ->
95-
continuation.invokeOnCancellation {
96-
cancel()
97-
}
98-
enqueue(object : Callback<T> {
99-
override fun onResponse(call: Call<T>, response: Response<T>) {
100-
continuation.resume(response)
97+
// TODO: a better solution for off-main-thread call factories than this.
98+
return withContext(Dispatchers.Default) {
99+
suspendCancellableCoroutine { continuation ->
100+
continuation.invokeOnCancellation {
101+
cancel()
101102
}
103+
enqueue(object : Callback<T> {
104+
override fun onResponse(call: Call<T>, response: Response<T>) {
105+
continuation.resume(response)
106+
}
102107

103-
override fun onFailure(call: Call<T>, t: Throwable) {
104-
continuation.resumeWithException(t)
105-
}
106-
})
107-
}
108-
}
109-
110-
/**
111-
* Force the calling coroutine to suspend before throwing [this].
112-
*
113-
* This is needed when a checked exception is synchronously caught in a [java.lang.reflect.Proxy]
114-
* invocation to avoid being wrapped in [java.lang.reflect.UndeclaredThrowableException].
115-
*
116-
* The implementation is derived from:
117-
* https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349
118-
*/
119-
internal suspend fun Exception.suspendAndThrow(): Nothing {
120-
suspendCoroutineUninterceptedOrReturn<Nothing> { continuation ->
121-
Dispatchers.Default.dispatch(continuation.context) {
122-
continuation.intercepted().resumeWithException(this@suspendAndThrow)
108+
override fun onFailure(call: Call<T>, t: Throwable) {
109+
continuation.resumeWithException(t)
110+
}
111+
})
123112
}
124-
COROUTINE_SUSPENDED
125113
}
126114
}

0 commit comments

Comments
 (0)