Brief Introduction of SocketPro High Performance and Scalable Persistent Message Queue

View: 569    Dowload: 0   Comment: 0   Post by: admin   Category: Javascript   Fields: Other

Continuous in-line request/result batching, real-time stream sending/processing, asynchronous data transferring and parallel computation for best performance and scalability


Persistent message queue allows applications running on separate machines/processes to communicate in a failsafe manner. A message queue is a temporary storage location or file from which messages can be saved and read reliably, as and when conditions permit. Unlike sockets and other common channels that require direct connections always exist, persistent message queue enables communication among applications which may not always be connected. There are many persistent message queues implemented in own ways. SocketPro comes with an extremely high performance persistent message queue for you to freely reuse.

Both SocketPro client and server core libraries are internally implemented with persistent message queue. Its client queue is used to back up requests so that all requests can be resent to a server for processing in case the server is not accessible for whatever reasons such as server power-off, server application down, network off and so on. Essentially, client queue is used as a tool for fault auto recovery to increase application stability and reduction of development complexity as shown at the directory of socketPro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp) after cloning source codes from https://github.com/udaparts/socketpro.

This article is focused on SocketPro server side persistent message queue. It is noted that precompiled SocketPro server side library of persistent message queue is completely free to you with open source codes which are extremely simple and understandable. You can also rely on the open source codes to extend them for your complex needs.

Source Codes and Samples

All related source codes and samples are located at https://github.com/udaparts/socketpro. After cloning it into your computer by GIT, pay attention to the subdirectory uasyncqueue inside the directory socketpro/samples/module_sample. You can see these samples are created from .NET, C/C++, Java and Python development environments. They can be compiled and run on either Linux or window platforms. SocketPro comes with a pre-compiled system library uasyncqueue, which is located at directories socketpro/bin/win and socketpro/bin/linux for both Windows and Linux variants, respectively. In addition, you can figure out how to load the SocketPro queue service into a server application with your familiar development environment by looking at tutorial sample all_servers at the directory socketpro/tutorials/(cplusplus|csharp|vbnet|java/src)/all_servers. However, we only use C# client code (socketpro/samples/module_sample/uasyncqueue /test_csahrp) in this article for explanations.

You should distribute these system libraries inside the directory socketpro/bin into your system directory before running these sample applications. In regards to SocketPro communication framework, you may also refer to its development guide documentation at socketpro/doc/SocketPro development guide.pdf.

Main Function

SocketPro is written from bottom to support parallel computation by use of one or more pools of non-blocking sockets. Each of pools may be made of one or more threads, and each of the threads hosts one or more non-blocking sockets at client side. To increase scalability, you can create one or more pools having multiple non-block sockets that are connected to different queue servers so that you can send messages for queuing in parallel style. However, we just use one pool for demonstration clarity here. Further, the pool is only made of one thread and one socket for this sample at client side as shown in the below code snippet 1.

static void Main(string[] args) {
    Console.WriteLine("Remote host: "); string host = Console.ReadLine();
    CConnectionContext cc = new CConnectionContext
                            (host, 20901, "async_queue_client", "pwd_for_async_queue");
    using (CSocketPool<CAsyncQueue> spAq = new CSocketPool<CAsyncQueue>()) {
        //spAq.QueueName = "aq_backup"; //uncomment for message no loss
                                        //by use of local message queue
        if (!spAq.StartSocketPool(cc, 1, 1)) {
            Console.WriteLine("Failed in connecting to remote async queue server,
                               and press any key to close the application ......");
            Console.Read(); return;
        CAsyncQueue aq = spAq.Seek(); //CAsyncQueue aq = spAq.SeekByQueue();
        //Optionally, you can enqueue messages with transaction style
        //by calling the methods StartQueueTrans and EndQueueTrans in pair
        aq.StartQueueTrans(TEST_QUEUE_KEY, (errCode) => {
            //error code could be one of CAsyncQueue.QUEUE_OK,
            //CAsyncQueue.QUEUE_TRANS_ALREADY_STARTED, ......
        //test manual message batching
        using (CScopeUQueue sb = new CScopeUQueue()) {
            CUQueue q = sb.UQueue;
            CAsyncQueue.BatchMessage(idMessage3, "Hello", "World", q);
            CAsyncQueue.BatchMessage(idMessage4, true, 234.456, "MyTestWhatever", q);
            aq.EnqueueBatch(TEST_QUEUE_KEY, q, (res) => {
                System.Diagnostics.Debug.Assert(res == 2);
        TestDequeue(aq); aq.WaitAll();
        //get a queue message count and queue file size with default option oMemoryCached
        aq.FlushQueue(TEST_QUEUE_KEY, (messageCount, fileSize) => {
            Console.WriteLine("Total message count={0},
                              queue file size={1}", messageCount, fileSize);
        aq.GetKeys((keys) => {
        aq.CloseQueue(TEST_QUEUE_KEY, (errCode) => {
            //error code could be one of CAsyncQueue.QUEUE_OK,
            //CAsyncQueue.QUEUE_TRANS_ALREADY_STARTED, ......
        Console.WriteLine("Press any key to close the application ......"); Console.Read();
Code snippet 1: Main function for demonstration of use of SocketPro persistent message queue at client side

Starting one socket pool: The above code snippet 1 starts one socket pool which only has one worker thread that only hosts one non-blocking socket (if (!spAq.StartSocketPool(cc, 1, 1))) for demonstration clarity by use of one instance of connection context. It is noted that you can create multiple pools within one client application if necessary. Afterwards, we get one asynchronous CAsyncQueue handler (CAsyncQueue aq = spAq.Seek();).

Streaming message: We can send individual messages onto a server for saving in stream style without batching at client side (TestEnqueue(aq);). We are going to talk with details in a new section TestEnqueue.

Manual message batching: When there are many small messages to be sent for saving, these small messages will require very much CPU costs at both client and server sides because of thread synchronization, function processing, SocketPro internal inline batching as well as others. To reduce these costs, we can batch these small messages into one bigger chunk, and send them as one larger unit to server for saving (using (CScopeUQueue sb = new CScopeUQueue()) { ....}). This is a way to improve message en-queue performance, but it also increases latency because it requires a time interval, which is usually more than 1 millisecond, for collecting an enough number of small messages before manual batching. Also, it requires more codes. It is NOTrecommended with SocketPro as long as either performance of streaming message queues meets your needs or message sizes are not very small.

Saving message in transaction style: SocketPro persistent message queue supports saving messages in transaction style. To use this feature, you have to call the methods StartQueueTrans and EndQueueTrans in pairs as shown in the above Code snippet 1. It is noted that the total size of batched messages shouldn’t be over four gig bytes.

Reading messages in a queue file from multiple consumers: Certainly, you can read messages from a queue (TestDequeue(aq);). We’ll elaborate it more in the coming section TestDequeue in detail. It is noted that one SocketPro queue supports message writing from multiple providers and message reading from multiple consumers simultaneously at the same time. Just for your information, many other queue implementations don’t support multiple consumers on one queue file.

Scalability: A client is able to create a pool that has multiple sockets connected to different server queue machines. A client is able to use the pool method Seek or SeekByQueue, and distribute messages onto different servers for saving. Don’t be fooled by this sample code because the demonstration is designed for clarity and beginner.

No message loss: Message saving requires transferring messages from client or provider to a message queue server. The server and network may be down for many possible reasons. Therefore, messages could be lost without your care by extra coding. You can prevent it with SocketPro easily by use of client or local message queue for backing up these messages before putting them on wire (spAq.QueueName = "aq_backup";). In case a server or network is down, SocketPro can resend messages that are backed up in a local or client message queue file when a queue server application is re-accessible.

Other functionalities: SocketPro persistent message queue provides other methods to check the count of messages, the size of a queue file and keys to different message queues as well as closing a queue as shown at the end of the above code snippet 1.


The function, an example for en-queuing messages, is simple as shown in the below code snippet 2.

static bool TestEnqueue(CAsyncQueue aq) {
        bool ok = true; Console.WriteLine("Going to enqueue 1024 messages ......");
        for (int n = 0; n < 1024; ++n) {
            string str = n + " Object test";
            ushort idMessage;
            switch (n % 3) {
                case 0:
                    idMessage = idMessage0;
                case 1:
                    idMessage = idMessage1;
                    idMessage = idMessage2;
            //en-queue two unicode strings and one int
            ok = aq.Enqueue(TEST_QUEUE_KEY, idMessage, "SampleName", str, n);
            if (!ok) break;
        return ok;
Code snippet 2: Sample code for sending 1024 message queues to a server for saving

As shown at the above code snippet 2, we can continuously send individual messages (aq.Enqueue) in streaming style. You can see that it is really easy to en-queue messages with SocketPro.


The below code snippet 3 is a demonstration for de-queuing messages in batch.

static void TestDequeue(CAsyncQueue aq) {
        //prepare callback for parsing messages dequeued from server side
        aq.ResultReturned += (sender, idReq, q) => {
            bool processed = true;
            switch (idReq) {
                case idMessage0:
                case idMessage1:
                case idMessage2:
                    Console.Write("message id={0}", idReq);
                        string name, str; int index;
                        //parse a dequeued message which should be the same 
                        //as the above enqueued message (two unicode strings and one int)
                        q.Load(out name).Load(out str).Load(out index);
                        Console.WriteLine(", name={0}, str={1}, index={2}", name, str, index);
                case idMessage3: {
                        string s1, s2;
                        q.Load(out s1).Load(out s2);
                        Console.WriteLine("{0} {1}", s1, s2);
                case idMessage4: {
                        bool b; double dbl; string s;
                        q.Load(out b).Load(out dbl).Load(out s);
                        Console.WriteLine("b= {0}, d= {1}, s= {2}", b, dbl, s);
                    processed = false;
            return processed;
        //prepare a callback for processing returned result of dequeue request
        CAsyncQueue.DDequeue d = (messageCount, fileSize, messages, bytes) => {
            Console.WriteLine("Total message count={0}, queue file size={1}, 
            messages dequeued={2}, message bytes dequeued={3}", messageCount, fileSize, messages, bytes);
            if (messageCount > 0) {
                //there are more messages left at server queue, we re-send a request to dequeue
                aq.Dequeue(TEST_QUEUE_KEY, aq.LastDequeueCallback);
        Console.WriteLine("Going to dequeue messages ......");
        bool ok = aq.Dequeue(TEST_QUEUE_KEY, d);
        //optionally, add one extra to improve processing concurrency at both client 
        //and server sides for better performance and through-output
        ok = aq.Dequeue(TEST_QUEUE_KEY, d);
Code snippet 3: Sample code for de-queuing messages in batch

The callback (aq.ResultReturned += (sender, idReq, q) => { .....};) in code snippet 3 is used to parse messages that come from remote message queue file. The codes of line 48 through 57 are used to parse messages that originated from the previous Figure 2. The codes (case idMessage0: case idMessage1: case idMessage2:) are used to parse manual batched messages (using (CScopeUQueue sb = new CScopeUQueue()) { ....}) that originated in the previous code snippet 1. As hinted by comment (//prepare a callback for processing returned result of dequeue request), the callback (CAsyncQueue.DDequeue d = ......) is used to monitor key message queue data like message count (messages to be de-queued), server queue file size, messages transferred by the below call Dequeue, and message size in bytes. Inside the callback, it is necessary to call the method Dequeue recursively if there is a message remaining in a server queue file (if (messageCount > 0)).

After preparing the previous two callbacks, we finally call the method Dequeue for sending a request to server for reading messages in batch. Optionally, we can call the method Dequeue one or two times more so that it can increase de-queuing throughput or performance because client side message parsing and server message reading can have better concurrency in processing.

Performance Study

SocketPro is written from the beginning to support streaming requests by use of non-block sockets and inner algorithms for the best network and code efficiency. You can refer to the short article of the file socketpro/doc/sq_kafka_perf.pdf for performance study result. The performance study samples are located in the directory socketpro/samples/qperf.

Our results show that SocketPro queue is significantly faster than Kafka, especially when writing high volume of small messages.

Highlights of SocketPro Persistent Message Queu

Recently, Kafka queue is most popular for its performance and scalability. Therefore, it is worth comparing SocketPro persistent message queue with Kafka, which may highlight SocketPro persistent message queue advantages.

  1. SocketPro persistent message queue has no complex configuration settings for you to understand and configure. Contrarily, Kafka requires you to understand many configuration settings ahead.
  2. SocketPro persistent message queue supports manual transaction for better stability, but Kafka doesn’t.
  3. A queue file can be sharable among multiple consumers at the same time with SocketPro queue, but Kafka is not capable to do so.
  4. SocketPro queue can guarantee no message loss as long as you turn on local or client message queue, but you cannot do so with Kafka.
  5. SocketPro queue supports message availability to notify all connected consumers in real-time fashion for the shortest latency. Its latency is always equal to 1.5 times of network latency and could be as low as 0.3 ms on local network area. Kafka’s lowest latency is 1 ms at best after you must configure a setting specifically for it.
  6. SocketPro queue is significantly faster than Kafka especially in high volume of small message writing.
  7. You can selectively en-queue a portion of messages at your will with SocketPro, but you are forced to en-queue all messages with Kafka. Further, you can integrate message queue with SocketPro other features such as online message bus, local message queue, client server communication, and so on.
  8. Both client and server codes of SocketPro persistent message queue are extremely simple, you can easily extend and modify them for your complex needs. It is not so easy for you to do so with Kafka.
  9. You can embed SocketPro queue within your application system with much simpler distribution and low dependency. It is not so easy for you to do so with Kafka.

Brief Introduction of SocketPro High Performance and Scalable Persistent Message Queue

Continuous in-line request/result batching, real-time stream sending/processing, asynchronous data transferring and parallel computation for best performance and scalability

Posted on 06-03-2018 


To comment you must be logged in members.

Files with category

  • Mini Youtube Using ReactJS

    Mini Youtube Using ReactJS

    View: 169    Download: 3   Comment: 0

    Category: Javascript     Fields: none

    This is one the best starter for ReactJS. MiniYoutube as the name suggests is a youtube like website developed using reactJS and youtube API. This project actually let's you search , play and list youtube videos. Do check it out and start learning...

  • AngularJS and REST API

    AngularJS and REST API

    View: 315    Download: 0   Comment: 0

    Category: Javascript     Fields: Other

    This is a tutorial for those interested in a quick introduction to AngularJS and REST API. We will build the familiar Periodic Table of the Elements found in every chemistry textbook, and allow the user to select a Chemical Element by clicking on...

  • Collective Intelligence, Recommending Items Based on Similar Users' Taste

    Collective Intelligence, Recommending Items Based on Similar Users' Taste

    View: 250    Download: 0   Comment: 0

    Category: Javascript     Fields: Other

    Using Collaborative Filtering to find people who share tastes, and for making automatic recommendations based on things that other people like.

  • Think Like a Bird for Better Parallel Programming

    Think Like a Bird for Better Parallel Programming

    View: 226    Download: 0   Comment: 0

    Category: Javascript     Fields: Other

    Coding an application to run in parallel is hard, right? I mean, it must be hard or we’d see parallel programs everywhere. All we'd see are slick parallel apps that use every available core effortlessly. Instead multi-threaded apps are the exception...

  • Getting Started with the Bing Search APIs

    Getting Started with the Bing Search APIs

    View: 249    Download: 0   Comment: 0

    Category: Javascript     Fields: Other

    Bing Search API is a set of REST interfaces that find web pages, news, images, videos, entities, related searches, spelling corrections, and more in response to queries from any programming language that can generate a web request. Applications that...

  • Brief Introduction of SocketPro High Performance and Scalable Persistent Message Queue

    Brief Introduction of SocketPro High Performance and Scalable Persistent Message Queue

    View: 569    Download: 0   Comment: 0

    Category: Javascript     Fields: Other

    Continuous in-line request/result batching, real-time stream sending/processing, asynchronous data transferring and parallel computation for best performance and scalability

  • Iteration Over Java Collections with High Performance

    Iteration Over Java Collections with High Performance

    View: 219    Download: 0   Comment: 0

    Category: Javascript     Fields: Other

    Java developers usually deal with Collections such as ArrayList, HashSet, Java 8 come with lambda and streaming API helps us to easily work with Collections. In most cases, we work with few thousands of items and performance isn't a concern. But in...

  • SR2JLIB - A Symbolic Regression Library for Java

    SR2JLIB - A Symbolic Regression Library for Java

    View: 215    Download: 0   Comment: 0

    Category: Javascript     Fields: Other

    Grammar-Guided Genetic Programming library featuring: multi-threading, just-in-time compilation of individuals, dynamic class loading, and JNI interfacing with C/C++ code

File suggestion for you
File top downloads
Codetitle - library source code to share, download the file to the community
Copyright © 2018. All rights reserved. codetitle Develope by Vinagon .Ltd