跳到主要內容

分散式系統的好幫手-RabbitMQ (上)


在分散式系統架構中,子系統之間應該要遵守低耦合的設計原則,盡可能地減少它們之間的相依性,然而我們要如何才能達到解耦合?
一般來說我們會使用Message Queue 作為分散式系統架構中的一個解耦合手段,簡單說地說一下它的運作方式,一般會有發佈者、接收者這兩種角色,發佈者將Message送到Queue ,接收者可以訂閱Queue,以便接收發佈者所傳送過來的Message,這種設計模式也就是所謂的觀察者模式。

目前市面上已經有很多Message Queue的框架,如ActiveMQ、RabbitMQ和RocketMQ,
而我們今天來簡單的地介紹最知名也最多人使用的RabbitMQ框架,它是由Erlang 語言開發且基於AMQP (Advanced Message Queuing Protocol) ,因此包含訊息、佇列、路由、可靠性、易用性、擴展性、安全性重要特徵,同時RabbitMQ也支援多樣的用戶端程式語言如Python、Ruby、.NET、Java、JavaScript、C、PHP。

現在我們以簡單的程式碼來介紹一下,如何在.NET使用RabbitMQ Client來建立MQ,並且進行發送與收取訊息:

1. 安裝 RabbitMQ.Client
2. 建立Queue後,接著綁定路由規則Exchange與RoutingKey
            ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "test-queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    channel.ExchangeDeclare(exchange: "test-queue-direct", type: ExchangeType.Direct, durable: true, autoDelete: false);
                    channel.QueueBind(queue: "test-queue", exchange: "test-queue.direct", routingKey: "routing-1");
                }
            }
※ durable:表示Queue是否會被存儲在硬碟中
※ exclusive:表示Queue只能被目前建立的連接使用,而且當連接關閉後Queue即被刪除
※ autoDelete:表示當沒有Publisher和Consumer使用Queue時,Queue就會被自動刪除。

3. 使用BasicPublish方法發送訊息至Queue,其路由規則就是根據事先定義好的Exchange與RoutingKey進行發送
            ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    string message = "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "test-queue.direct",
                                         routingKey: "routing-1",
                                         basicProperties: null,
                                         body: body);
                    Console.WriteLine($"Send=>{message}");
                }
            }
4. 建立Consumer來接收Queue的訊息
            ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost"};
            IConnection connection = factory.CreateConnection();
            IModel channel = connection.CreateModel();
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                byte[] body = e.Body;
                string message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"Received:{message}");
            };
            channel.BasicConsume(queue: "test-queue", autoAck: true, consumer: consumer);
※ 一般我們使用Consumer習慣將autoAck為true,若要自己控制可以透過BasicAck和BasicNack這兩個方法來控管。
※ 為了防止Consumer一次接受過多訊息,可以使用basicQos來控管。
※ 若在Consumer的Received Callback含有非同步方法,請將Connection設為DispatchConsumersAsync = true,並且改用AsyncEventingBasicConsumer,以免碰上非預期的錯誤。

參考文獻
[1]  https://www.rabbitmq.com/

留言

這個網誌中的熱門文章

reCAPTCHA v3 的簡單教學

Google於10/29正式發布reCAPTCHA v3 API,該版本中最大的亮點就是透過分析使用者瀏覽網站的行為,以辨識使用者是否為機器人。換句話說,使用者再也不用一直回答問題和不停地點選圖片,所以說以後再也不用聽到客戶抱怨老是點錯圖片,果然科技始於人性啊~ 另外reCAPTCHA v3 的使用方式也非常簡單,只要短短幾個步驟,就能輕鬆地將reCAPTCHA v3運用在專案中。 1. 首先,至  https://www.google.com/recaptcha/admin  替你的網站註冊一個reCAPTCHA。 2. 註冊後,將會得到Site Key和Secret Key,其中Site Key是給前端使用,而Secret Key則是給後端使用。 3. 在前端HTML head的部分加入reCAPTCHA的api.js。 <script src='https://www.google.com/recaptcha/api.js?render=6Lc5vHgUAAAAAKHxlH0FdDJdA2-8yfzpoMDLIWPc'></script> 4. 接著在body的部分呼叫reCAPTCHA API向Google取得token後,再將token送至後端進行驗證,範例如下: <script> grecaptcha.ready(function () { console.log('1. grecaptcha.ready'); console.log('2. grecaptcha.execute("6Lc5vHgUAAAAAKHxlH0FdDJdA2-8yfzpoMDLIWPc", { action: "@Url.Action("VerifyBot", "Account")" })'); grecaptcha.execute('6Lc5vHgUAAAAAKHxlH0FdDJdA2-8yfzpoMDLIWPc', { action: '@...

MongoDB: Save Files Using GridFS

過去我們在使用File System,我們必須自己處理備份、複製、擴充的問題;如今我們可以我們可以使用MongoDB作為File DB,它可以利用Replica和Sharing的機制幫助我們解決備份、複製、動態擴充、分散式儲存、自動平衡、故障回復的問題,且效能優於RDBMS。若真要說MongoDB這類NoSql的缺點就是它不能處理Transaction。 在MongoDB中對大於16MB BSON Document(如:圖片、音頻、影片等)是使用GridFS的方式做儲存。 GridFS是一種在MongoDB中存儲大二進製文件的機制,GridFS 會將文件分割成多個Chunk(預設256 KB),而GridFS使用fs.files和fs.chunks等兩個Collection來存儲檔案資料,其中fs.files負責存放文件的名稱、大小、上傳的時間等資訊,而fs.chunks則是負責存放經過分割後的Chunks,其優點是透過分割儲存的方式能夠快速讀取檔案中任何的片段。 fs.files Collection { "_id" : , // 檔案的Unique ID "filename": data_string, //檔案名稱 "length" : data_number, // 檔案大小 "chunkSize" : data_number, // chunk大小,預設256k "uploadDate" : data_date, // 儲存時間 "md5" : data_string //檔案的md5值 } fs.chunks Collection { "_id" : , // 檔案chunk的Unique ID "files_id" : , //對應檔案的Unique ID "n" : chunk_number, // 檔案chunk的數量 "data" : data_binary, // 以二進為儲存檔案 } 在下面例子,我們將簡單地示範使用.NET MongoDB Driver來存取與操作MongoDB的G...

Upload large files to web api using resumable.js

最近工作剛好遇到大檔案(> 2GB)上傳的需求,大檔案的上傳方式與平常處裡小檔案上傳當方式有些不同,對於大檔案上傳我們需要額外考量三個問題 : 1. 上傳逾時。 2  IIS 不支援超過2BG 的檔案上傳 3. 檔案在上傳過程中中斷。 上述這三個問題可以透過multipart/form-data和chuck 的方式,將檔案分割成無數個chunk,且給予每個chunk編號,如果傳送失敗就重新傳送chunk ,進而實現斷點續傳的功能。 目前市面三個有支援大檔案與斷點續傳的JavaScript Library分別有 Resumable.js 、 Dropzone.js 、jQuery Ajax File Upload,且它們底層皆是使用HTML5 File API,此外它們的作者都有提供Server Side Implementation Sample,而且連TypeScript的定義檔都有,真的都是佛心來著。 接著我們開始介紹如何使用Resumable.js 上傳檔案至Web API(.NET) : Server Side (Web API)的主要流程: 1. 首先,在Web API需要使用MultipartFormDataStreamProvider來取得每一次Resumable.js 所傳過來的chunk,而chunk由FormData和FileData組成,其中FormData記載Resumable.js所定義的chuck欄位,而FileData則存放chunk的檔案內容。 2. 接著從FormData得到這些Resumable.js所定義的chuck欄位resumableFilename、resumableIdentifier、resumableTotalChunks、resumableChunkNumber欄位,並且根據這些欄位我們將chunk儲存在Upload 目錄下。 3. 等待所有chunk都到位,將合併這些chunk並組成完整的檔案。 FileStorageController Class [RoutePrefix("api/FileStorage")] public class FileStorageController : ApiController { ...