Skip to content

Commit 80ef562

Browse files
committed
feat: timeout xmtp messages
1 parent 50e1314 commit 80ef562

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

apps/api/src/chat/services/broadcast.consumer.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { JobNonRetriableError } from '@app/common/errors/job-non-retriable-error
22
import { wait } from '@app/common/utils/async.utils'
33
import { XmtpLib } from '@app/definitions/integration-definitions/xmtp/xmtp.lib'
44
import { getWalletName } from '@app/definitions/utils/address.utils'
5-
import { sendXmtpMessage } from '@chainjet/tools/dist/messages'
65
import { InjectQueue, Process, Processor } from '@nestjs/bull'
76
import { Logger } from '@nestjs/common'
87
import { Interval } from '@nestjs/schedule'
@@ -108,7 +107,12 @@ export class BroadcastConsumer {
108107
continue
109108
}
110109
try {
111-
const message = await sendXmtpMessage(client, sendTo, campaign.message + '\n\n' + unsubscribeMessage)
110+
const message = await XmtpLib.sendDirectMessageWithTimeout(
111+
client,
112+
sendTo,
113+
campaign.message + '\n\n' + unsubscribeMessage,
114+
10 * 1000,
115+
)
112116
await this.campaignMessageService.createOne({
113117
campaign: campaign._id,
114118
address: contact.address,

libs/definitions/src/integration-definitions/xmtp/xmtp.lib.ts

+27-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { Client, Conversation } from '@xmtp/xmtp-js'
1+
import { sendXmtpMessage } from '@chainjet/tools/dist/messages'
2+
import { Client, Conversation, DecodedMessage } from '@xmtp/xmtp-js'
23
import { isAddress } from 'ethers/lib/utils'
34

45
const clientsCache = new Map<string, Client>()
@@ -48,4 +49,29 @@ export const XmtpLib = {
4849
}
4950
return xmtpMessage
5051
},
52+
53+
sendDirectMessageWithTimeout: async (
54+
client: Client,
55+
sendTo: string,
56+
message: string,
57+
timeoutMs: number,
58+
): Promise<DecodedMessage> => {
59+
let timeoutHandle: NodeJS.Timeout | undefined
60+
61+
const timeoutPromise = new Promise<DecodedMessage>((_, reject) => {
62+
timeoutHandle = setTimeout(() => {
63+
reject(new Error('Message sending timed out'))
64+
}, timeoutMs)
65+
})
66+
67+
const sendMessagePromise: Promise<DecodedMessage> = sendXmtpMessage(client, sendTo, message)
68+
69+
try {
70+
return await Promise.race([sendMessagePromise, timeoutPromise])
71+
} catch (error) {
72+
throw error
73+
} finally {
74+
clearTimeout(timeoutHandle)
75+
}
76+
},
5177
}

0 commit comments

Comments
 (0)