Skip to content

Commit e47c90d

Browse files
Use EventingBasicConsumer in tutorials
Fixes rabbitmq#58 and rabbitmq#59 (while we are at it).
1 parent f283572 commit e47c90d

File tree

11 files changed

+163
-157
lines changed

11 files changed

+163
-157
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
*/.ok
22
*/distribute-*.tar.gz
3+
*.zip
34
erlang/amqp_client*
45
erlang/rabbit_common*
56
python/venv

dotnet-visual-studio/1_Receive/Program.cs

+13-12
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,24 @@ class Program
88
public static void Main()
99
{
1010
var factory = new ConnectionFactory() { HostName = "localhost" };
11-
using( var connection = factory.CreateConnection() )
12-
using( var channel = connection.CreateModel() )
11+
using(var connection = factory.CreateConnection())
12+
using(var channel = connection.CreateModel())
1313
{
14-
channel.QueueDeclare( queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null );
14+
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
1515

16-
var consumer = new QueueingBasicConsumer( channel );
17-
channel.BasicConsume( queue: "hello", noAck: true, consumer: consumer );
16+
Console.WriteLine(" [*] Waiting for messages.");
1817

19-
Console.WriteLine( " [*] Waiting for messages. To exit press CTRL+C" );
20-
while( true )
18+
var consumer = new EventingBasicConsumer(channel);
19+
consumer.Received += (model, ea) =>
2120
{
22-
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
23-
2421
var body = ea.Body;
25-
var message = Encoding.UTF8.GetString( body );
26-
Console.WriteLine( " [x] Received {0}", message );
27-
}
22+
var message = Encoding.UTF8.GetString(body);
23+
Console.WriteLine(" [x] Received {0}", message);
24+
};
25+
channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
26+
27+
Console.WriteLine(" Press [enter] to exit.");
28+
Console.ReadLine();
2829
}
2930
}
3031
}

dotnet-visual-studio/2_Worker/Program.cs

+19-17
Original file line numberDiff line numberDiff line change
@@ -9,31 +9,33 @@ class Program
99
public static void Main()
1010
{
1111
var factory = new ConnectionFactory() { HostName = "localhost" };
12-
using( var connection = factory.CreateConnection() )
13-
using( var channel = connection.CreateModel() )
12+
using(var connection = factory.CreateConnection())
13+
using(var channel = connection.CreateModel())
1414
{
15-
channel.QueueDeclare( queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null );
15+
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
1616

17-
channel.BasicQos( prefetchSize: 0, prefetchCount: 1, global: false );
18-
var consumer = new QueueingBasicConsumer( channel );
19-
channel.BasicConsume( queue: "task_queue", noAck: false, consumer: consumer );
17+
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
2018

21-
Console.WriteLine( " [*] Waiting for messages. To exit press CTRL+C" );
22-
while( true )
23-
{
24-
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
19+
Console.WriteLine(" [*] Waiting for messages.");
2520

21+
var consumer = new EventingBasicConsumer(channel);
22+
consumer.Received += (model, ea) =>
23+
{
2624
var body = ea.Body;
27-
var message = Encoding.UTF8.GetString( body );
28-
Console.WriteLine( " [x] Received {0}", message );
25+
var message = Encoding.UTF8.GetString(body);
26+
Console.WriteLine(" [x] Received {0}", message);
27+
28+
int dots = message.Split('.').Length - 1;
29+
Thread.Sleep(dots * 1000);
2930

30-
int dots = message.Split( '.' ).Length - 1;
31-
Thread.Sleep( dots * 1000 );
31+
Console.WriteLine(" [x] Done");
3232

33-
Console.WriteLine( " [x] Done" );
33+
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
34+
};
35+
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
3436

35-
channel.BasicAck( deliveryTag: ea.DeliveryTag, multiple: false );
36-
}
37+
Console.WriteLine(" Press [enter] to exit.");
38+
Console.ReadLine();
3739
}
3840
}
3941
}

dotnet-visual-studio/3_ReceiveLogs/Program.cs

+13-14
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,25 @@ class Program
88
public static void Main()
99
{
1010
var factory = new ConnectionFactory() { HostName = "localhost" };
11-
using( var connection = factory.CreateConnection() )
12-
using( var channel = connection.CreateModel() )
11+
using(var connection = factory.CreateConnection())
12+
using(var channel = connection.CreateModel())
1313
{
14-
channel.ExchangeDeclare( exchange: "logs", type: "fanout" );
14+
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
1515

1616
var queueName = channel.QueueDeclare().QueueName;
17+
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
1718

18-
channel.QueueBind( queue: queueName, exchange: "logs", routingKey: "" );
19-
var consumer = new QueueingBasicConsumer( channel );
20-
channel.BasicConsume( queue: queueName, noAck: true, consumer: consumer );
21-
22-
Console.WriteLine( " [*] Waiting for logs. To exit press CTRL+C" );
23-
while( true )
19+
var consumer = new EventingBasicConsumer(channel);
20+
consumer.Received += (model, ea) =>
2421
{
25-
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
26-
2722
var body = ea.Body;
28-
var message = Encoding.UTF8.GetString( body );
29-
Console.WriteLine( " [x] {0}", message );
30-
}
23+
var message = Encoding.UTF8.GetString(body);
24+
Console.WriteLine(" [x] {0}", message);
25+
};
26+
channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
27+
28+
Console.WriteLine(" Press [enter] to exit.");
29+
Console.ReadLine();
3130
}
3231
}
3332
}

dotnet-visual-studio/4_ReceiveLogsDirect/Program.cs

+19-19
Original file line numberDiff line numberDiff line change
@@ -5,43 +5,43 @@
55

66
class Program
77
{
8-
public static void Main( string[] args )
8+
public static void Main(string[] args)
99
{
1010
var factory = new ConnectionFactory() { HostName = "localhost" };
11-
using( var connection = factory.CreateConnection() )
12-
using( var channel = connection.CreateModel() )
11+
using(var connection = factory.CreateConnection())
12+
using(var channel = connection.CreateModel())
1313
{
14-
channel.ExchangeDeclare( exchange: "direct_logs", type: "direct" );
14+
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
1515
var queueName = channel.QueueDeclare().QueueName;
1616

17-
if( args.Length < 1 )
17+
if(args.Length < 1)
1818
{
19-
Console.Error.WriteLine( "Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0] );
20-
Console.WriteLine( " Press [enter] to exit." );
19+
Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]);
20+
Console.WriteLine(" Press [enter] to exit.");
2121
Console.ReadLine();
2222
Environment.ExitCode = 1;
2323
return;
2424
}
2525

26-
foreach( var severity in args )
26+
foreach(var severity in args)
2727
{
28-
channel.QueueBind( queue: queueName, exchange: "direct_logs", routingKey: severity );
28+
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity);
2929
}
3030

31-
Console.WriteLine( " [*] Waiting for messages. To exit press CTRL+C" );
31+
Console.WriteLine(" [*] Waiting for messages.");
3232

33-
var consumer = new QueueingBasicConsumer( channel );
34-
channel.BasicConsume( queue: queueName, noAck: true, consumer: consumer );
35-
36-
while( true )
33+
var consumer = new EventingBasicConsumer(channel);
34+
consumer.Received += (model, ea) =>
3735
{
38-
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
39-
4036
var body = ea.Body;
41-
var message = Encoding.UTF8.GetString( body );
37+
var message = Encoding.UTF8.GetString(body);
4238
var routingKey = ea.RoutingKey;
43-
Console.WriteLine( " [x] Received '{0}':'{1}'", routingKey, message );
44-
}
39+
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
40+
};
41+
channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
42+
43+
Console.WriteLine(" Press [enter] to exit.");
44+
Console.ReadLine();
4545
}
4646
}
4747
}

dotnet-visual-studio/5_ReceiveLogsTopic/Program.cs

+15-15
Original file line numberDiff line numberDiff line change
@@ -5,41 +5,41 @@
55

66
class Program
77
{
8-
public static void Main( string[] args )
8+
public static void Main(string[] args)
99
{
1010
var factory = new ConnectionFactory() { HostName = "localhost" };
11-
using( var connection = factory.CreateConnection() )
12-
using( var channel = connection.CreateModel() )
11+
using(var connection = factory.CreateConnection())
12+
using(var channel = connection.CreateModel())
1313
{
14-
channel.ExchangeDeclare( exchange: "topic_logs", type: "topic" );
14+
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
1515
var queueName = channel.QueueDeclare().QueueName;
1616

17-
if( args.Length < 1 )
17+
if(args.Length < 1)
1818
{
19-
Console.Error.WriteLine( "Usage: {0} [binding_key...]", Environment.GetCommandLineArgs()[0] );
20-
Console.WriteLine( " Press [enter] to exit." );
19+
Console.Error.WriteLine("Usage: {0} [binding_key...]", Environment.GetCommandLineArgs()[0]);
20+
Console.WriteLine(" Press [enter] to exit.");
2121
Console.ReadLine();
2222
Environment.ExitCode = 1;
2323
return;
2424
}
2525

26-
foreach( var bindingKey in args )
26+
foreach(var bindingKey in args)
2727
{
28-
channel.QueueBind( queue: queueName, exchange: "topic_logs", routingKey: bindingKey );
28+
channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: bindingKey);
2929
}
3030

31-
Console.WriteLine( " [*] Waiting for messages. To exit press CTRL+C" );
31+
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
3232

33-
var consumer = new QueueingBasicConsumer( channel );
34-
channel.BasicConsume( queue: queueName, noAck: true, consumer: consumer );
33+
var consumer = new QueueingBasicConsumer(channel);
34+
channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
3535

36-
while( true )
36+
while(true)
3737
{
3838
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
3939
var body = ea.Body;
40-
var message = Encoding.UTF8.GetString( body );
40+
var message = Encoding.UTF8.GetString(body);
4141
var routingKey = ea.RoutingKey;
42-
Console.WriteLine( " [x] Received '{0}':'{1}'", routingKey, message );
42+
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
4343
}
4444
}
4545
}

dotnet/Receive.cs

+12-13
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,22 @@ class Receive
88
public static void Main()
99
{
1010
var factory = new ConnectionFactory() { HostName = "localhost" };
11-
using( var connection = factory.CreateConnection() )
12-
using( var channel = connection.CreateModel() )
11+
using(var connection = factory.CreateConnection())
12+
using(var channel = connection.CreateModel())
1313
{
14-
channel.QueueDeclare( queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null );
14+
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
1515

16-
var consumer = new QueueingBasicConsumer( channel );
17-
channel.BasicConsume( queue: "hello", noAck: true, consumer: consumer );
18-
19-
Console.WriteLine( " [*] Waiting for messages. To exit press CTRL+C" );
20-
while( true )
16+
var consumer = new EventingBasicConsumer(channel);
17+
consumer.Received += (model, ea) =>
2118
{
22-
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
23-
2419
var body = ea.Body;
25-
var message = Encoding.UTF8.GetString( body );
26-
Console.WriteLine( " [x] Received {0}", message );
27-
}
20+
var message = Encoding.UTF8.GetString(body);
21+
Console.WriteLine(" [x] Received {0}", message);
22+
};
23+
channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
24+
25+
Console.WriteLine(" Press [enter] to exit.");
26+
Console.ReadLine();
2827
}
2928
}
3029
}

dotnet/ReceiveLogs.cs

+15-14
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,27 @@ class ReceiveLogs
88
public static void Main()
99
{
1010
var factory = new ConnectionFactory() { HostName = "localhost" };
11-
using( var connection = factory.CreateConnection() )
12-
using( var channel = connection.CreateModel() )
11+
using(var connection = factory.CreateConnection())
12+
using(var channel = connection.CreateModel())
1313
{
14-
channel.ExchangeDeclare( exchange: "logs", type: "fanout" );
14+
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
1515

1616
var queueName = channel.QueueDeclare().QueueName;
17+
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
1718

18-
channel.QueueBind( queue: queueName, exchange: "logs", routingKey: "" );
19-
var consumer = new QueueingBasicConsumer( channel );
20-
channel.BasicConsume( queue: queueName, noAck: true, consumer: consumer );
21-
22-
Console.WriteLine( " [*] Waiting for logs. To exit press CTRL+C" );
23-
while( true )
19+
Console.WriteLine(" [*] Waiting for logs.");
20+
21+
var consumer = new EventingBasicConsumer(channel);
22+
consumer.Received += (model, ea) =>
2423
{
25-
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
26-
2724
var body = ea.Body;
28-
var message = Encoding.UTF8.GetString( body );
29-
Console.WriteLine( " [x] {0}", message );
30-
}
25+
var message = Encoding.UTF8.GetString(body);
26+
Console.WriteLine(" [x] {0}", message);
27+
};
28+
channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
29+
30+
Console.WriteLine(" Press [enter] to exit.");
31+
Console.ReadLine();
3132
}
3233
}
3334
}

dotnet/ReceiveLogsDirect.cs

+18-18
Original file line numberDiff line numberDiff line change
@@ -8,40 +8,40 @@ class ReceiveLogsDirect
88
public static void Main( string[] args )
99
{
1010
var factory = new ConnectionFactory() { HostName = "localhost" };
11-
using( var connection = factory.CreateConnection() )
12-
using( var channel = connection.CreateModel() )
11+
using(var connection = factory.CreateConnection())
12+
using(var channel = connection.CreateModel())
1313
{
14-
channel.ExchangeDeclare( exchange: "direct_logs", type: "direct" );
14+
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
1515
var queueName = channel.QueueDeclare().QueueName;
1616

17-
if( args.Length < 1 )
17+
if(args.Length < 1)
1818
{
19-
Console.Error.WriteLine( "Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0] );
20-
Console.WriteLine( " Press [enter] to exit." );
19+
Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]);
20+
Console.WriteLine(" Press [enter] to exit.");
2121
Console.ReadLine();
2222
Environment.ExitCode = 1;
2323
return;
2424
}
2525

26-
foreach( var severity in args )
26+
foreach(var severity in args)
2727
{
28-
channel.QueueBind( queue: queueName, exchange: "direct_logs", routingKey: severity );
28+
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity);
2929
}
3030

31-
Console.WriteLine( " [*] Waiting for messages. To exit press CTRL+C" );
31+
Console.WriteLine(" [*] Waiting for messages.");
3232

33-
var consumer = new QueueingBasicConsumer( channel );
34-
channel.BasicConsume( queue: queueName, noAck: true, consumer: consumer );
35-
36-
while( true )
33+
var consumer = new EventingBasicConsumer(channel);
34+
consumer.Received += (model, ea) =>
3735
{
38-
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
39-
4036
var body = ea.Body;
41-
var message = Encoding.UTF8.GetString( body );
37+
var message = Encoding.UTF8.GetString(body);
4238
var routingKey = ea.RoutingKey;
43-
Console.WriteLine( " [x] Received '{0}':'{1}'", routingKey, message );
44-
}
39+
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
40+
};
41+
channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
42+
43+
Console.WriteLine(" Press [enter] to exit.");
44+
Console.ReadLine();
4545
}
4646
}
4747
}

0 commit comments

Comments
 (0)