在分散式系統架構中,子系統之間應該要遵守低耦合的設計原則,盡可能地減少它們之間的相依性,然而我們要如何才能達到解耦合?
一般來說我們會使用Message Queue 作為分散式系統架構中的一個解耦合手段,簡單說地說一下它的運作方式,一般會有發佈者、接收者這兩種角色,發佈者將Message送到Queue ,接收者可以訂閱Queue,以便接收發佈者所傳送過來的Message,這種設計模式也就是所謂的觀察者模式。
目前市面上已經有很多Message Queue的框架,如ActiveMQ、RabbitMQ和RocketMQ,
而我們今天來簡單的地介紹最知名也最多人使用的RabbitMQ框架,它是由Erlang 語言開發且基於AMQP (Advanced Message Queuing Protocol) ,因此包含訊息、佇列、路由、可靠性、易用性、擴展性、安全性重要特徵,同時RabbitMQ也支援多樣的用戶端程式語言如Python、Ruby、.NET、Java、JavaScript、C、PHP。
現在我們以簡單的程式碼來介紹一下,如何在.NET使用RabbitMQ Client來建立MQ,並且進行發送與收取訊息:
1. 安裝 RabbitMQ.Client。
2. 建立Queue後,接著綁定路由規則Exchange與RoutingKey。
ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "test-queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: "test-queue-direct", type: ExchangeType.Direct, durable: true, autoDelete: false);
channel.QueueBind(queue: "test-queue", exchange: "test-queue.direct", routingKey: "routing-1");
}
}
※ durable:表示Queue是否會被存儲在硬碟中。
※ exclusive:表示Queue只能被目前建立的連接使用,而且當連接關閉後Queue即被刪除。
※ autoDelete:表示當沒有Publisher和Consumer使用Queue時,Queue就會被自動刪除。
3. 使用BasicPublish方法發送訊息至Queue,其路由規則就是根據事先定義好的Exchange與RoutingKey進行發送。※ autoDelete:表示當沒有Publisher和Consumer使用Queue時,Queue就會被自動刪除。
ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
string message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "test-queue.direct",
routingKey: "routing-1",
basicProperties: null,
body: body);
Console.WriteLine($"Send=>{message}");
}
}
4. 建立Consumer來接收Queue的訊息。 ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost"};
IConnection connection = factory.CreateConnection();
IModel channel = connection.CreateModel();
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
byte[] body = e.Body;
string message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received:{message}");
};
channel.BasicConsume(queue: "test-queue", autoAck: true, consumer: consumer);
※ 一般我們使用Consumer習慣將autoAck為true,若要自己控制可以透過BasicAck和BasicNack這兩個方法來控管。※ 為了防止Consumer一次接受過多訊息,可以使用basicQos來控管。
※ 若在Consumer的Received Callback含有非同步方法,請將Connection設為DispatchConsumersAsync = true,並且改用AsyncEventingBasicConsumer,以免碰上非預期的錯誤。
[1] https://www.rabbitmq.com/
留言
張貼留言