跳到主要內容

分散式系統的好幫手-RabbitMQ (下)


在上次的文章中間單的介紹RabbitMQ 如何發送和接收訊息,今天我們要來介紹一下RabbitMQ的四種 Exchange以及其路由規則。

1. Fanout Exchange: 就像是廣播一樣,發送時會將資料送到有綁定Fanout Exchange的Queue中。 例如:發送資料到fanout.exchange,則queue-1和queue-2都會收到資料。
 
        public void FanoutPublisher()
        {
            ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "queue-1", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "queue-2", durable: false, exclusive: false, autoDelete: false, arguments: null);

                    channel.ExchangeDeclare(exchange: "fanout.exchange", type: ExchangeType.Fanout, durable: true, autoDelete: false);

                    channel.QueueBind(queue: "queue-1", exchange: "fanout.exchange", routingKey: null);
                    channel.QueueBind(queue: "queue-2", exchange: "fanout.exchange", routingKey: null);

                    string message = "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchange: "fanout.exchange",
                                         routingKey: null,
                                         basicProperties: null,
                                         body: body);
                }
            }

        }


2. Direct Exchange: 需要綁定routingKey作為路由規則,發送時會依據routingKey發送到特定的Queue。 例如:發送資料到direct.exchange且routingKey為json,這時只有queue-2會收到資料,這是因為它綁定的routingKey為json。
 
        public void DirectPublisher()
        {
            ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", RequestedHeartbeat = 60, AutomaticRecoveryEnabled = true };
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "queue-1", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "queue-2", durable: false, exclusive: false, autoDelete: false, arguments: null);

                    channel.ExchangeDeclare(exchange: "direct.exchange", type: ExchangeType.Direct, durable: true, autoDelete: false);

                    channel.QueueBind(queue: "queue-1", exchange: "direct.exchange", routingKey: "xml");
                    channel.QueueBind(queue: "queue-2", exchange: "direct.exchange", routingKey: "json");

                    string message = "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchange: "direct.exchange",
                                         routingKey: "xml",
                                         basicProperties: null,
                                         body: body);

                    channel.BasicPublish(exchange: "direct.exchange",
                                routingKey: "json",
                                basicProperties: null,
                                body: body);
                }
            }

        }


3. Topic Exchange: 與Direct Exchange一樣需要綁定routingKey作為路由規則,發送時會依據routingKey發送到特定的Queue,但不同的是Topic Exchange可以用#和*來匹配Queue,其中#代表可以允許有多個單字存在或不存在;*只與允許一個單字存在。 例如:發送資料到topic.exchange且routingKey為data.json,queue-1和queue-2都會收到資料。 倘若發送資料到topic.exchange且routingKey為my.data.json,這時只有queue-2會收到資料,這是因為它綁定的routingKey為#.json可以允許有多個單字存在。
 
        public void TopicPublisher()
        {
            ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "queue-1", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "queue-2", durable: false, exclusive: false, autoDelete: false, arguments: null);

                    channel.ExchangeDeclare(exchange: "topic.exchange", type: ExchangeType.Topic, durable: true, autoDelete: false);

                    channel.QueueBind(queue: "queue-1", exchange: "topic.exchange", routingKey: "*.json");
                    channel.QueueBind(queue: "queue-2", exchange: "topic.exchange", routingKey: "#.json");

                    string message = "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchange: "topic.exchange",
                                         routingKey: "data.json",
                                         basicProperties: null,
                                         body: body);

                    channel.BasicPublish(exchange: "topic.exchange",
                                routingKey: "my.data.json",
                                basicProperties: null,
                                body: body);
                }
            }
        }



4. Header Exchange: 比起上述三種Exchange,Header Exchange是最有彈性的,因為它可以自行定義Header內容作為路由規則,且還可以用x-match做全部或部分的比對。 例如:發送資料到header.exchange且header內容為 { "type", "report" },雖然queue-1和queue-2都有{ "type", "report" },但是queue-1的x-match為all,因此只有queue-2可以收到資料。
 
        public void HeaderPublisher()
        {
            ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "queue-1", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "queue-2", durable: false, exclusive: false, autoDelete: false, arguments: null);

                    channel.ExchangeDeclare(exchange: "header.exchange", type: ExchangeType.Headers, durable: true, autoDelete: false);

                    Dictionary xmlHeader = new Dictionary{
                        { "format", "xml" },
                        { "type", "report" },
                        { "x-match", "all" }
                    };
                    channel.QueueBind(queue: "queue-1", exchange: "header.exchange", routingKey: null, arguments: xmlHeader);

                    Dictionary jsonHeader = new Dictionary {
                        { "format", "json" },
                        { "type", "report" },
                        { "x-match", "any" }
                    };
                    channel.QueueBind(queue: "queue-2", exchange: "header.exchange", routingKey: null, arguments: jsonHeader);

                    string message = "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);


                    Dictionary myHeader = new Dictionary
                    {
                        { "type", "report" }
                    };

                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Headers = myHeader;
                    channel.BasicPublish(exchange: "header.exchange",
                                         routingKey: null,
                                         basicProperties: properties,
                                         body: body);
                }
            }
        }



參考文獻
[1]  https://www.rabbitmq.com/

留言

這個網誌中的熱門文章

reCAPTCHA v3 的簡單教學

Google於10/29正式發布reCAPTCHA v3 API,該版本中最大的亮點就是透過分析使用者瀏覽網站的行為,以辨識使用者是否為機器人。換句話說,使用者再也不用一直回答問題和不停地點選圖片,所以說以後再也不用聽到客戶抱怨老是點錯圖片,果然科技始於人性啊~ 另外reCAPTCHA v3 的使用方式也非常簡單,只要短短幾個步驟,就能輕鬆地將reCAPTCHA v3運用在專案中。 1. 首先,至  https://www.google.com/recaptcha/admin  替你的網站註冊一個reCAPTCHA。 2. 註冊後,將會得到Site Key和Secret Key,其中Site Key是給前端使用,而Secret Key則是給後端使用。 3. 在前端HTML head的部分加入reCAPTCHA的api.js。 <script src='https://www.google.com/recaptcha/api.js?render=6Lc5vHgUAAAAAKHxlH0FdDJdA2-8yfzpoMDLIWPc'></script> 4. 接著在body的部分呼叫reCAPTCHA API向Google取得token後,再將token送至後端進行驗證,範例如下: <script> grecaptcha.ready(function () { console.log('1. grecaptcha.ready'); console.log('2. grecaptcha.execute("6Lc5vHgUAAAAAKHxlH0FdDJdA2-8yfzpoMDLIWPc", { action: "@Url.Action("VerifyBot", "Account")" })'); grecaptcha.execute('6Lc5vHgUAAAAAKHxlH0FdDJdA2-8yfzpoMDLIWPc', { action: '@

MongoDB: Save Files Using GridFS

過去我們在使用File System,我們必須自己處理備份、複製、擴充的問題;如今我們可以我們可以使用MongoDB作為File DB,它可以利用Replica和Sharing的機制幫助我們解決備份、複製、動態擴充、分散式儲存、自動平衡、故障回復的問題,且效能優於RDBMS。若真要說MongoDB這類NoSql的缺點就是它不能處理Transaction。 在MongoDB中對大於16MB BSON Document(如:圖片、音頻、影片等)是使用GridFS的方式做儲存。 GridFS是一種在MongoDB中存儲大二進製文件的機制,GridFS 會將文件分割成多個Chunk(預設256 KB),而GridFS使用fs.files和fs.chunks等兩個Collection來存儲檔案資料,其中fs.files負責存放文件的名稱、大小、上傳的時間等資訊,而fs.chunks則是負責存放經過分割後的Chunks,其優點是透過分割儲存的方式能夠快速讀取檔案中任何的片段。 fs.files Collection { "_id" : , // 檔案的Unique ID "filename": data_string, //檔案名稱 "length" : data_number, // 檔案大小 "chunkSize" : data_number, // chunk大小,預設256k "uploadDate" : data_date, // 儲存時間 "md5" : data_string //檔案的md5值 } fs.chunks Collection { "_id" : , // 檔案chunk的Unique ID "files_id" : , //對應檔案的Unique ID "n" : chunk_number, // 檔案chunk的數量 "data" : data_binary, // 以二進為儲存檔案 } 在下面例子,我們將簡單地示範使用.NET MongoDB Driver來存取與操作MongoDB的G

Upload large files to web api using resumable.js

最近工作剛好遇到大檔案(> 2GB)上傳的需求,大檔案的上傳方式與平常處裡小檔案上傳當方式有些不同,對於大檔案上傳我們需要額外考量三個問題 : 1. 上傳逾時。 2  IIS 不支援超過2BG 的檔案上傳 3. 檔案在上傳過程中中斷。 上述這三個問題可以透過multipart/form-data和chuck 的方式,將檔案分割成無數個chunk,且給予每個chunk編號,如果傳送失敗就重新傳送chunk ,進而實現斷點續傳的功能。 目前市面三個有支援大檔案與斷點續傳的JavaScript Library分別有 Resumable.js 、 Dropzone.js 、jQuery Ajax File Upload,且它們底層皆是使用HTML5 File API,此外它們的作者都有提供Server Side Implementation Sample,而且連TypeScript的定義檔都有,真的都是佛心來著。 接著我們開始介紹如何使用Resumable.js 上傳檔案至Web API(.NET) : Server Side (Web API)的主要流程: 1. 首先,在Web API需要使用MultipartFormDataStreamProvider來取得每一次Resumable.js 所傳過來的chunk,而chunk由FormData和FileData組成,其中FormData記載Resumable.js所定義的chuck欄位,而FileData則存放chunk的檔案內容。 2. 接著從FormData得到這些Resumable.js所定義的chuck欄位resumableFilename、resumableIdentifier、resumableTotalChunks、resumableChunkNumber欄位,並且根據這些欄位我們將chunk儲存在Upload 目錄下。 3. 等待所有chunk都到位,將合併這些chunk並組成完整的檔案。 FileStorageController Class [RoutePrefix("api/FileStorage")] public class FileStorageController : ApiController {