1. Fanout Exchange: 就像是廣播一樣,發送時會將資料送到有綁定Fanout Exchange的Queue中。
例如:發送資料到fanout.exchange,則queue-1和queue-2都會收到資料。
public void FanoutPublisher()
{
ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "queue-1", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "queue-2", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: "fanout.exchange", type: ExchangeType.Fanout, durable: true, autoDelete: false);
channel.QueueBind(queue: "queue-1", exchange: "fanout.exchange", routingKey: null);
channel.QueueBind(queue: "queue-2", exchange: "fanout.exchange", routingKey: null);
string message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "fanout.exchange",
routingKey: null,
basicProperties: null,
body: body);
}
}
}
2. Direct Exchange: 需要綁定routingKey作為路由規則,發送時會依據routingKey發送到特定的Queue。
例如:發送資料到direct.exchange且routingKey為json,這時只有queue-2會收到資料,這是因為它綁定的routingKey為json。
public void DirectPublisher()
{
ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", RequestedHeartbeat = 60, AutomaticRecoveryEnabled = true };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "queue-1", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "queue-2", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: "direct.exchange", type: ExchangeType.Direct, durable: true, autoDelete: false);
channel.QueueBind(queue: "queue-1", exchange: "direct.exchange", routingKey: "xml");
channel.QueueBind(queue: "queue-2", exchange: "direct.exchange", routingKey: "json");
string message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct.exchange",
routingKey: "xml",
basicProperties: null,
body: body);
channel.BasicPublish(exchange: "direct.exchange",
routingKey: "json",
basicProperties: null,
body: body);
}
}
}
3. Topic Exchange: 與Direct Exchange一樣需要綁定routingKey作為路由規則,發送時會依據routingKey發送到特定的Queue,但不同的是Topic Exchange可以用#和*來匹配Queue,其中#代表可以允許有多個單字存在或不存在;*只與允許一個單字存在。 例如:發送資料到topic.exchange且routingKey為data.json,queue-1和queue-2都會收到資料。 倘若發送資料到topic.exchange且routingKey為my.data.json,這時只有queue-2會收到資料,這是因為它綁定的routingKey為#.json可以允許有多個單字存在。
public void TopicPublisher()
{
ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "queue-1", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "queue-2", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: "topic.exchange", type: ExchangeType.Topic, durable: true, autoDelete: false);
channel.QueueBind(queue: "queue-1", exchange: "topic.exchange", routingKey: "*.json");
channel.QueueBind(queue: "queue-2", exchange: "topic.exchange", routingKey: "#.json");
string message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topic.exchange",
routingKey: "data.json",
basicProperties: null,
body: body);
channel.BasicPublish(exchange: "topic.exchange",
routingKey: "my.data.json",
basicProperties: null,
body: body);
}
}
}
4. Header Exchange: 比起上述三種Exchange,Header Exchange是最有彈性的,因為它可以自行定義Header內容作為路由規則,且還可以用x-match做全部或部分的比對。
例如:發送資料到header.exchange且header內容為 { "type", "report" },雖然queue-1和queue-2都有{ "type", "report" },但是queue-1的x-match為all,因此只有queue-2可以收到資料。
public void HeaderPublisher()
{
ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "queue-1", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "queue-2", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(exchange: "header.exchange", type: ExchangeType.Headers, durable: true, autoDelete: false);
Dictionary xmlHeader = new Dictionary{
{ "format", "xml" },
{ "type", "report" },
{ "x-match", "all" }
};
channel.QueueBind(queue: "queue-1", exchange: "header.exchange", routingKey: null, arguments: xmlHeader);
Dictionary jsonHeader = new Dictionary {
{ "format", "json" },
{ "type", "report" },
{ "x-match", "any" }
};
channel.QueueBind(queue: "queue-2", exchange: "header.exchange", routingKey: null, arguments: jsonHeader);
string message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);
Dictionary myHeader = new Dictionary
{
{ "type", "report" }
};
IBasicProperties properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = myHeader;
channel.BasicPublish(exchange: "header.exchange",
routingKey: null,
basicProperties: properties,
body: body);
}
}
}
參考文獻
[1] https://www.rabbitmq.com/
留言
張貼留言