Brief Introduction of a Continuous SQL-stream Sending and Processing System (Part 1: SQLite)

Posted by: admin   Category: MySQL   Fields: Other

Applying SocketPro to various databases for continuous inline request/result batching and real-time stream processing with bi-directional asynchronous data transfer

Introduction

Most client/server database systems support only synchronous communication between a client and a backend database. They rely on blocking sockets and a chatty protocol that requires the client or the server to wait for an acknowledgement before sending the next chunk of data. This wait time, also called latency, ranges from a few tenths of a millisecond on a local area network (LAN) to hundreds of milliseconds on a wide area network (WAN). Large wait times can significantly degrade the quality of an application.
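The cost of a chatty protocol can be estimated with simple arithmetic. The sketch below is not part of SocketPro: the class name, the method names and the sample figures are hypothetical, and the model deliberately ignores bandwidth limits and server-side queuing.

```csharp
using System;

// Hypothetical back-of-the-envelope model; it is not part of SocketPro.
public static class LatencyModel
{
    // Blocking protocol: each request waits one full round trip before the next one starts.
    public static double BlockingMs(int requests, double roundTripMs, double serverMsPerRequest)
    {
        return requests * (roundTripMs + serverMsPerRequest);
    }

    // Streamed requests: sent back to back, so the round trip is paid roughly once.
    public static double StreamedMs(int requests, double roundTripMs, double serverMsPerRequest)
    {
        return roundTripMs + requests * serverMsPerRequest;
    }
}
```

For 1,000 requests with an assumed 100 ms round trip and 0.1 ms of server work per request, this model gives roughly 100 seconds for the blocking protocol and roughly 0.2 seconds for the streamed one.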

Fortunately, UDAParts has developed a powerful and secure communication framework named SocketPro. It is built around continuous inline request/result batching and real-time stream processing, using asynchronous data transfer and parallel computation for better network efficiency, development simplicity, performance and scalability. The framework, with its many other features, is available at https://github.com/udaparts/socketpro.

Further, UDAParts has applied the SocketPro framework to popular open source databases such as SQLite and MySQL, as well as to other databases through ODBC drivers, to support continuous SQL-stream sending and processing. These pre-compiled components and the open source code for the databases are totally free forever to the public.

To reduce learning complexity, this first article uses SQLite as the sample database, and the second article uses MySQL.

Source Codes and Samples

All related source code and samples are located at https://github.com/udaparts/socketpro. After cloning the repository with Git, pay attention to the subdirectory usqlite inside the directory socketpro/samples/module_sample.

The samples are written for the .NET, C/C++, Java and Python development environments, and they can be compiled and run on either Linux or Windows. In case you are not used to C/C++ development, UDAParts also distributes pre-compiled C/C++ test applications, test_ssqlite for the server and test_csqlite for the client, inside the directory socketpro/bin/(win|linux).

In addition, you can figure out how to load a SocketPro service into a server application in your familiar development environment by looking at the tutorial sample all_servers in the directory socketpro/tutorials/(cplusplus|csharp|vbnet|java/src)/all_servers. However, this article uses only the C# code (socketpro/samples/module_sample/usqlite/test_csharp) for explanations.

Before running these sample applications, copy the system libraries inside the directory socketpro/bin into your system directory.

For the SocketPro communication framework itself, you may also refer to its development guide at socketpro/doc/SocketPro development guide.pdf.

Main Function

SocketPro is designed from the ground up to support parallel computation by use of one or more pools of non-blocking sockets. At the client side, each pool may be made of one or more threads, and each thread hosts one or more non-blocking sockets. For a clear demonstration, this sample uses just one pool made of one thread and one socket, as shown in code snippet 1 below.

static void Main(string[] args)
{
    Console.WriteLine("Remote host: ");
    string host = Console.ReadLine();
    CConnectionContext cc = new CConnectionContext
                            (host, 20901, "usqlite_client", "password_for_usqlite");
    using (CSocketPool<CSqlite> spSqlite = new CSocketPool<CSqlite>())
    {
        //start a socket pool with 1 thread hosting 1 non-blocking socket
        if (!spSqlite.StartSocketPool(cc, 1, 1))
        {
            Console.WriteLine("Failed in connecting to remote async sqlite server");
            Console.WriteLine("Press any key to close the application ......");
            Console.Read();
            return;
        }
        CSqlite sqlite = spSqlite.Seek(); //get one async sqlite handler

        //open a global database at server side because an empty string is given
        bool ok = sqlite.Open("", (handler, res, errMsg) =>
        {
            Console.WriteLine("res = {0}, errMsg: {1}", res, errMsg);
        });

        //prepare two test tables, COMPANY and EMPLOYEE
        TestCreateTables(sqlite);

        //a container for receiving all tables data
        List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>> lstRowset = 
                         new List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>>();

        ok = sqlite.BeginTrans(); //start manual transaction

        //test both prepare and query statements
        TestPreparedStatements(sqlite, lstRowset);

        //test both prepare and query statements involved with reading and updating BLOB and large text
        InsertBLOBByPreparedStatement(sqlite, lstRowset);
            
        ok = sqlite.EndTrans(); //end manual transaction
            
        sqlite.WaitAll();

        //display received rowsets
        int index = 0;
        Console.WriteLine();
        Console.WriteLine("+++++ Start rowsets +++");
        foreach (KeyValuePair<CDBColumnInfoArray, CDBVariantArray> it in lstRowset)
        {
            Console.Write("Statement index = {0}", index);
            if (it.Key.Count > 0)
                Console.WriteLine(", rowset with columns = {0}, records = {1}.", 
                                  it.Key.Count, it.Value.Count / it.Key.Count);
            else
                Console.WriteLine(", no rowset received.");
            ++index;
        }
        Console.WriteLine("+++++ End rowsets +++");
        Console.WriteLine();
        Console.WriteLine("Press any key to close the application ......");
        Console.Read();
    }
}
Code snippet 1: Main function for demonstration of use of SocketPro SQL-stream system at client side

Starting one socket pool: Code snippet 1 uses one connection context to start one socket pool that has a single worker thread hosting a single non-blocking socket, which keeps the demonstration clear. You can create multiple pools within one client application if necessary. Afterwards, we get one asynchronous SQLite handler from the pool.

Opening database: We send a request to open a SQLite server database. If the first input is an empty or null string, as in this example, the server opens one instance of its global database, for example usqlite.db. To create your own database, give a non-empty valid string instead. You can also set a callback or lambda expression to track the error message returned from the server, as shown. Note that SocketPro supports only asynchronous data transfer between client and server, so a request can take one or more callbacks for processing returned data. This is completely different from synchronous data transfer. In addition, we create a container that receives all the records of the queried rowsets.

Streaming SQL statements: Keep in mind that SocketPro is designed to stream any number of requests of any type over one non-blocking socket session. We can therefore stream all the SQL statements, as well as other requests, as shown in code snippet 1. All SocketPro SQL-stream services support this feature for the best network efficiency, which significantly improves data access performance. As far as we know, no other technology offers this feature; if you find one, please let us know. Like common database access APIs, SocketPro SQL-stream technology also supports manual transactions, as shown in code snippet 1. The following sections elaborate on the three functions TestCreateTables, TestPreparedStatements and InsertBLOBByPreparedStatement.

Waiting until all processed: Since SocketPro uses asynchronous data transfer by default, it must provide a way to wait until all requests have been sent and all returned results have been processed. The client-side method WaitAll serves this purpose. If you like, you can use this method to convert asynchronous requests into synchronous ones.
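The idea of queuing requests without blocking and then waiting once at the end can be sketched without SocketPro. The class FakeAsyncHandler below is a hypothetical stand-in, not the CSqlite handler: it only counts pending requests and blocks in its own WaitAll until that count drops to zero. Unlike a single SocketPro socket session, this stand-in makes no promise about the order in which callbacks run.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an asynchronous handler; it is NOT the SocketPro CSqlite class.
public sealed class FakeAsyncHandler
{
    private readonly object _lock = new object();
    private int _pending; // requests queued but not yet processed

    // Queue a request and return immediately; the callback runs later on a pool thread.
    public void Send(string request, Action<string> callback)
    {
        lock (_lock) { ++_pending; }
        Task.Run(() =>
        {
            callback("done: " + request);
            lock (_lock)
            {
                --_pending;
                Monitor.PulseAll(_lock); // wake up any thread blocked in WaitAll
            }
        });
    }

    // Block the caller until every queued request has been processed.
    public void WaitAll()
    {
        lock (_lock)
        {
            while (_pending > 0) Monitor.Wait(_lock);
        }
    }
}
```

A caller would invoke Send as many times as needed and then call WaitAll once. Because the callbacks of this stand-in may run concurrently, any state they share needs its own synchronization.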

TestCreateTables

This function sends two SQL DDL statements that create two tables, COMPANY and EMPLOYEE, as shown in code snippet 2 below.

static void TestCreateTables(CSqlite sqlite)
{
    string create_table = "CREATE TABLE COMPANY(ID INT8 PRIMARY KEY NOT NULL,name CHAR(64)NOT NULL," +
                          "ADDRESS varCHAR(256)not null,Income float not null)";
    bool ok = sqlite.Execute(create_table, (handler, res, errMsg, affected, fail_ok, id) =>
    {
        Console.WriteLine("affected = {0}, fails = {1}, oks = {2}, res = {3}, errMsg: {4}, " +
            "last insert id = {5}", affected, (uint)(fail_ok >> 32), (uint)fail_ok, res, errMsg, id);
    });
    create_table = "CREATE TABLE EMPLOYEE(EMPLOYEEID INT8 PRIMARY KEY NOT NULL unique," +
                   "CompanyId INT8 not null,name NCHAR(64)NOT NULL," +
                   "JoinDate DATETIME not null default(datetime('now')),IMAGE BLOB," +
                   "DESCRIPTION NTEXT,Salary real,FOREIGN KEY(CompanyId)REFERENCES COMPANY(id))";
    ok = sqlite.Execute(create_table, (handler, res, errMsg, affected, fail_ok, id) =>
    {
        Console.WriteLine("affected = {0}, fails = {1}, oks = {2}, res = {3}, errMsg: {4}, " +
            "last insert id = {5}", affected, (uint)(fail_ok >> 32), (uint)fail_ok, res, errMsg, id);
    });
    });
}
Code snippet 2: Creating two SQLite tables in streaming by SocketPro SQL-stream technology

You can execute any number of SQL statements in a stream, as shown in code snippet 2. Each request consists of one input SQL statement and one optional callback (or lambda expression) for tracking the expected results. Again, this differs from common database access approaches because SocketPro communicates through asynchronous data transfer.
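The callbacks in code snippet 2 read two counters out of the single value fail_ok: the failure count from the high 32 bits and the success count from the low 32 bits. A minimal sketch of that bit layout follows. It assumes fail_ok is a 64-bit unsigned integer, as the casts in the snippet suggest; the class FailOk and its methods are hypothetical helpers, not SocketPro APIs.

```csharp
using System;

// Hypothetical helper; the bit layout mirrors the casts used in the callbacks of code snippet 2.
public static class FailOk
{
    // High 32 bits: number of statements that failed.
    public static uint Fails(ulong failOk)
    {
        return (uint)(failOk >> 32);
    }

    // Low 32 bits: number of statements that succeeded.
    public static uint Oks(ulong failOk)
    {
        return (uint)(failOk & 0xFFFFFFFFUL);
    }

    // Pack two counters into one value, for illustration and testing only.
    public static ulong Pack(uint fails, uint oks)
    {
        return ((ulong)fails << 32) | oks;
    }
}
```

With this layout, a value packed from 2 failures and 5 successes decodes back to Fails = 2 and Oks = 5.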

TestPreparedStatements

SocketPro SQL-stream technology supports preparing an SQL statement just like common database access APIs. For a SQLite server database, it even supports preparing multiple SQL statements in one shot, as shown in code snippet 3 below.

static void TestPreparedStatements(CSqlite sqlite, 
   List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>> ra)
{
    //a complex SQL statement combined with query and insert prepare statements
    string sql_insert_parameter = "Select datetime('now');" +
        "INSERT OR REPLACE INTO COMPANY(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)";
    bool ok = sqlite.Prepare(sql_insert_parameter, (handler, res, errMsg) =>
    {
        Console.WriteLine("res = {0}, errMsg: {1}", res, errMsg);
    });

    CDBVariantArray vData = new CDBVariantArray();
    vData.Add(1);
    vData.Add("Google Inc.");
    vData.Add("1600 Amphitheatre Parkway, Mountain View, CA 94043, USA");
    vData.Add(66000000000.0);

    vData.Add(2);
    vData.Add("Microsoft Inc.");
    vData.Add("700 Bellevue Way NE- 22nd Floor, Bellevue, WA 98804, USA");
    vData.Add(93600000000.0);

    vData.Add(3);
    vData.Add("Apple Inc.");
    vData.Add("1 Infinite Loop, Cupertino, CA 95014, USA");
    vData.Add(234000000000.0);

    //send three sets of parameterized data in one shot for processing
    ok = sqlite.Execute(vData, (handler, res, errMsg, affected, fail_ok, id) =>
    {
        Console.WriteLine("affected = {0}, fails = {1}, oks = {2}, res = {3}, errMsg: {4}, " +
            "last insert id = {5}", affected, (uint)(fail_ok >> 32), (uint)fail_ok, res, errMsg, id);
    }, (handler, rowData) =>
    {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];
        item.Value.AddRange(rowData);
    }, (handler) =>
    {
        //rowset header meta info comes here
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = new KeyValuePair<CDBColumnInfoArray, 
                                 CDBVariantArray>(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    });
}
Code snippet 3: Sending multiple sets of parameters for processing multiple SQL statements in one shot by SocketPro SQL-stream technology

Note that the sample prepared SQL statement consists of one query and one insert statement. When the function is called, the client expects three sets of records to be returned and three records to be inserted into the table COMPANY. The sample is designed to demonstrate the power of SocketPro SQL-stream technology; in reality, you probably would not prepare a combined SQL statement made of multiple basic SQL statements. If you use a parameterized statement, you are required to send a prepare request first. After filling an array with parameter data as shown in code snippet 3, you can send multiple sets of parameter data from client to server for processing in one single shot. If you have a large amount of data, you can call the method Execute repeatedly without preparing the statement again.
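In code snippet 3, twelve values are added to one flat array for a statement with four placeholders, which makes three parameter sets. The sketch below checks that arithmetic before sending. It assumes the flat array is split by the number of ? placeholders, which is how the snippet reads; the class ParameterSets is a hypothetical helper, and its placeholder count is naive because it ignores string literals and comments.

```csharp
using System;

// Hypothetical helper for sanity-checking a flat parameter array; not a SocketPro API.
public static class ParameterSets
{
    // Naive count: every '?' is treated as a placeholder, even inside a string literal.
    public static int CountPlaceholders(string sql)
    {
        if (sql == null) throw new ArgumentNullException("sql");
        int count = 0;
        foreach (char c in sql)
        {
            if (c == '?') ++count;
        }
        return count;
    }

    // Number of complete parameter sets held by a flat array of valueCount values.
    public static int CountSets(int valueCount, int placeholders)
    {
        if (placeholders <= 0)
            throw new ArgumentOutOfRangeException("placeholders", "The statement has no placeholders.");
        if (valueCount % placeholders != 0)
            throw new ArgumentException("The last parameter set is incomplete.", "valueCount");
        return valueCount / placeholders;
    }
}
```

For the statement in code snippet 3, CountPlaceholders returns 4 and CountSets(12, 4) returns 3. A check like this catches a forgotten value before the request goes out.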

Next, let us look at how to handle returned record sets. Besides its first input, the parameter data array, the method Execute takes three callbacks or lambda expressions as its second, third and fourth inputs. Whenever a record set arrives, the SQLite client handler automatically calls the third callback with the column meta information of the record set. If actual records are available, the second callback is called and you can populate the data into the container ra. At the end, the first callback is called so that you can track the number of affected records and, if successful, the last insert identification number. Taking code snippet 3 as an example, the third callback is called three times and the first callback only once, whereas the number of calls to the second callback depends on both the number of records and the size of one record.
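The container ra collects each rowset as one flat list of values, and code snippet 1 derives the record count by dividing the value count by the column count. Under that same assumption of a flat, record-by-record layout, the values can be regrouped into records as sketched below. RowsetHelper is a hypothetical helper, and IList<object> stands in for the CDBVariantArray type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; IList<object> stands in for the CDBVariantArray used in the article.
public static class RowsetHelper
{
    // Regroup a flat, record-by-record list of values into one array per record.
    public static List<object[]> ToRecords(IList<object> flatValues, int columnCount)
    {
        if (flatValues == null) throw new ArgumentNullException("flatValues");
        if (columnCount <= 0) throw new ArgumentOutOfRangeException("columnCount");
        if (flatValues.Count % columnCount != 0)
            throw new ArgumentException("Value count is not a multiple of the column count.", "flatValues");

        var records = new List<object[]>(flatValues.Count / columnCount);
        for (int start = 0; start < flatValues.Count; start += columnCount)
        {
            var record = new object[columnCount];
            for (int c = 0; c < columnCount; ++c)
            {
                record[c] = flatValues[start + c];
            }
            records.Add(record);
        }
        return records;
    }
}
```

A rowset with no columns, which code snippet 1 reports as "no rowset received", has to be skipped before calling this helper because a column count of zero is rejected.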

InsertBLOBByPreparedStatement

By now, you can see that SocketPro SQL-stream technology provides all the features required for accessing a backend database. Before ending this article, we use one more sample to show how to handle large binary and text objects with SocketPro SQL-stream technology. Usually, it is difficult to access large objects inside databases efficiently. With SocketPro SQL-stream technology, however, it is simple in terms of both development and efficiency, as shown in code snippet 4 below.

Looking through code snippet 4, you will find that it is essentially the same as code snippet 3, only longer. A developer can therefore reuse SocketPro SQL-stream technology to handle all types of database table fields in the same coding style, which eases development.

SocketPro always divides a large binary or text object into chunks first, at both the client and server sides, and then sends these smaller chunks to the other side. At the end, SocketPro reconstructs the original large binary or text object from the collected chunks. This happens silently at run time and reduces the memory footprint.

static void InsertBLOBByPreparedStatement(CSqlite sqlite, 
           List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>> ra)
{
    string wstr = "";
    //prepare junk data for testing
    while (wstr.Length < 128 * 1024)
    {
        wstr += "广告做得不那么夸张的就不说了,看看这三家,都是正儿八经的公立三甲," +
                "附属医院,不是武警,也不是部队,更不是莆田,都在卫生部门直接监管下,照样明目张胆地骗人。";
    }
    string str = "";
    while (str.Length < 256 * 1024)
    {
        str += "The epic takedown of his opponent on an all-important voting day was " +
               "extraordinary even by the standards of the 2016 campaign -- and quickly drew " +
               "a scathing response from Trump.";
    }

    //a complex SQL statement combining one insert and one query prepared statement
    string sqlInsert = "insert or replace into employee(EMPLOYEEID,CompanyId,name," +
        "JoinDate,image,DESCRIPTION,Salary)values(?,?,?,?,?,?,?);select * from employee where employeeid=?";
    bool ok = sqlite.Prepare(sqlInsert, (handler, res, errMsg) =>
    {
        Console.WriteLine("res = {0}, errMsg: {1}", res, errMsg);
    });
    CDBVariantArray vData = new CDBVariantArray();
    using (CScopeUQueue sbBlob = new CScopeUQueue())
    {
        //first set of data
        vData.Add(1);
        vData.Add(1); //google company id
        vData.Add("Ted Cruz");
        vData.Add(DateTime.Now);
        sbBlob.Save(wstr);
        vData.Add(sbBlob.UQueue.GetBuffer());
        vData.Add(wstr);
        vData.Add(254000.0);
        vData.Add(1);

        //second set of data
        vData.Add(2);
        vData.Add(1); //google company id
        vData.Add("Donald Trump");
        vData.Add(DateTime.Now);
        sbBlob.UQueue.SetSize(0);
        sbBlob.Save(str);
        vData.Add(sbBlob.UQueue.GetBuffer());
        vData.Add(str);
        vData.Add(20254000.0);
        vData.Add(2);

        //third set of data
        vData.Add(3);
        vData.Add(2); //Microsoft company id
        vData.Add("Hillary Clinton");
        vData.Add(DateTime.Now);
        sbBlob.Save(wstr);
        vData.Add(sbBlob.UQueue.GetBuffer());
        vData.Add(wstr);
        vData.Add(6254000.0);
        vData.Add(3);
    }
    //send three sets of parameterized data in one shot for processing
    ok = sqlite.Execute(vData, (handler, res, errMsg, affected, fail_ok, id) =>
    {
        Console.WriteLine("affected = {0}, fails = {1}, oks = {2}, res = {3}, errMsg: {4}, " +
            "last insert id = {5}", affected, (uint)(fail_ok >> 32), (uint)fail_ok, res, errMsg, id);
    }, (handler, rowData) =>
    {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];
        item.Value.AddRange(rowData);
    }, (handler) =>
    {
        //rowset header meta info comes here
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = 
        new KeyValuePair<CDBColumnInfoArray, CDBVariantArray>(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    });
}
Code snippet 4: Insert and query tables having multiple large binary and text objects with SocketPro SQL-stream technology
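The chunking of large objects described before code snippet 4 can be illustrated with a small sketch. This is not SocketPro's implementation: the article does not state the chunk size or the wire format, so the class Chunker, its methods and the chunk size chosen by the caller are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical illustration of split-and-reassemble; not SocketPro's actual implementation.
public static class Chunker
{
    // Divide a large object into chunks of at most chunkSize bytes.
    public static List<byte[]> Split(byte[] data, int chunkSize)
    {
        if (data == null) throw new ArgumentNullException("data");
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException("chunkSize");

        var chunks = new List<byte[]>();
        for (int offset = 0; offset < data.Length; offset += chunkSize)
        {
            int size = Math.Min(chunkSize, data.Length - offset);
            var chunk = new byte[size];
            Buffer.BlockCopy(data, offset, chunk, 0, size);
            chunks.Add(chunk);
        }
        return chunks;
    }

    // Rebuild the original object from chunks received in order.
    public static byte[] Join(IEnumerable<byte[]> chunks)
    {
        if (chunks == null) throw new ArgumentNullException("chunks");
        using (var stream = new MemoryStream())
        {
            foreach (byte[] chunk in chunks)
            {
                stream.Write(chunk, 0, chunk.Length);
            }
            return stream.ToArray();
        }
    }
}
```

Splitting 10 bytes with a chunk size of 4 yields chunks of 4, 4 and 2 bytes, and joining them in order restores the original array. Only one chunk has to be held in a send buffer at a time, which is where the memory saving comes from.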

Performance Study

SocketPro SQL-stream technology has excellent performance in database access for both queries and updates. Two performance test projects, cppperf and netperf, are available at socketpro/samples/module_sample/usqlite/DBPerf/. The first is written in C++ and the other in C#. In addition, the MySQL sakila sample database, located in the directory socketpro/samples/module_sample/usqlite/DBPerf, is available for you to play with after you run the sample test_csqlite to create the global SQLite database usqlite.db.

Our performance study shows that it is easy to execute queries at a rate of 12,000 per second per socket connection. For inserts, you can easily reach a rate of about 50,000 inserts per second with SQLite.

Points of Interest

SocketPro SQLite SQL-stream service provides all the required basic client/server database features, and it also delivers the following unique features:

  1. Continuous inline request/result batching and real-time SQL-stream processing for the best network efficiency
  2. Bi-directional asynchronous data transferring between client and server, but all asynchronous requests can be converted into synchronous ones
  3. Superior performance and scalability because of powerful SocketPro communication architecture
  4. Real-time cache for table update, insert and delete. You can set a callback at client side for tracking table record add, delete and update events
  5. All requests are cancelable by executing the method Cancel of class CClientSocket at client side
  6. Both Windows and Linux are supported
  7. Simple development for all supported development languages
  8. Both client and server components are thread-safe. They can be easily reused within your multi-threaded applications with much fewer thread related issues

Posted on 09-03-2018 
