12
12
* Created by eduard on 06.10.2017.
13
13
*/
14
14
public class API {
15
- private Channel _channel ;
15
+ private com . rabbitmq . client . Channel _channel ;
16
16
17
17
private final String _queue ;
18
18
@@ -28,12 +28,18 @@ public API(String host, int port, String user, String pass, String virtualHost,
28
28
29
29
public API (String host , int port , String user , String pass , String virtualHost , String queue , int durable , String exchange ) throws Exception {
30
30
ConnectionFactory factory = new ConnectionFactory ();
31
- factory .setHost (host );
32
- factory .setPort (port );
33
- factory .setUsername (user );
34
- factory .setPassword (pass );
35
- factory .setVirtualHost (virtualHost );
36
- //factory.setAutomaticRecoveryEnabled(true);
31
+
32
+ if (host .toLowerCase ().startsWith ("amqp://" )) {
33
+ // we got URI connection string
34
+ factory .setUri (host );
35
+ } else {
36
+ factory .setHost (host );
37
+ factory .setPort (port );
38
+ factory .setUsername (user );
39
+ factory .setPassword (pass );
40
+ factory .setVirtualHost (virtualHost );
41
+ //factory.setAutomaticRecoveryEnabled(true);
42
+ }
37
43
38
44
_connection = factory .newConnection ();
39
45
_channel = _connection .createChannel ();
@@ -68,19 +74,19 @@ public API(String host, int port, String user, String pass, String virtualHost,
68
74
_exchange = exchange != null ? exchange : "" ;
69
75
}
70
76
71
- public void sendMessage (byte [] msg , String correlationId , String messageId ) throws Exception {
72
- sendMessageToQueue (_queue , msg , correlationId , messageId );
77
+ public void sendMessageId (byte [] msg , String correlationId , String messageId ) throws Exception {
78
+ sendMessageToQueueId (_queue , msg , correlationId , messageId );
73
79
}
74
80
75
81
public void sendMessage (byte [] msg ) throws Exception {
76
82
sendMessageToQueue (_queue , msg );
77
83
}
78
84
79
85
public void sendMessageToQueue (String queue , byte [] msg ) throws Exception {
80
- sendMessageToQueue (queue , msg , null , null );
86
+ sendMessageToQueueId (queue , msg , null , null );
81
87
}
82
88
83
- public void sendMessageToQueue (String queue , byte [] msg , String correlationId , String messageId ) throws Exception {
89
+ public void sendMessageToQueueId (String queue , byte [] msg , String correlationId , String messageId ) throws Exception {
84
90
AMQP .BasicProperties props = createProperties (correlationId , messageId );
85
91
_channel .basicPublish (_exchange , queue , props , msg );
86
92
}
@@ -163,7 +169,7 @@ private AMQP.BasicProperties createProperties(String correlationId, String messa
163
169
String contentType = ContentType ;
164
170
String contentEncoding = null ;
165
171
HashMap <String , Object > headers = null ;
166
- Integer deliveryMode = null ;
172
+ Integer deliveryMode = Integer . valueOf ( 2 ) ;
167
173
Integer priority = null ;
168
174
//String correlationId= null;
169
175
String replyTo = null ;
@@ -174,6 +180,7 @@ private AMQP.BasicProperties createProperties(String correlationId, String messa
174
180
String userId = null ;
175
181
String appId = null ;
176
182
String clusterId = null ;
183
+
177
184
AMQP .BasicProperties props = new AMQP .BasicProperties (contentType , contentEncoding , headers , deliveryMode , priority , correlationId , replyTo , expiration , messageId , timestamp , type , userId , appId , clusterId );
178
185
return props ;
179
186
}
0 commit comments