Blog for the Open Source JavaHotel project

Thursday, September 29, 2016

IBM InfoSphere Streams, Big SQL and HBase

Introduction
Big SQL can run over HBase. IBM InfoSphere Streams does not have any means to load data directly into Big SQL. Although it is possible to use the general-purpose Database Toolkit (DB2), running an INSERT statement for bulk data loading is very inefficient.
Another idea is to define a Big SQL table as an HBase table (CREATE HBASE TABLE) and load data directly into the underlying HBase table. But HBase, unlike Hive, is a schemaless database: everything is stored as a sequence of bytes. So the content of the HBase table has to be stored in a format supported by Big SQL.
IBM InfoSphere Streams provides an HBase toolkit, but the HBASEPut operator cannot be used directly. For instance, a string value stored by the HBASEPut operator is not a valid CHAR column in terms of Big SQL; it has to be pre-formatted into the proper binary format beforehand.
The solution is to develop an additional Streams operator that encodes all SPL types into a binary (blob) format, so that the resulting binary stream can be consumed by the HBASEPut operator.
Instead of (example):
stream<rstring key, tuple<rstring title, rstring author_fname,
   rstring author_lname, rstring year, rstring rating> bookData> toHBASE =
   Functor(bookStream)
  {
   output
    toHBASE : key = title + ":" + year, bookData = bookStream ;
  }

  // Now put it into HBASE.  We don't specify a columnQualifier and the attribute
  // given by valueAttrName is a tuple, so it treats the attribute names in that 
  // tuple as columnQualifiers, and the attribute values as the values.
  () as putsink = HBASEPut(toHBASE)
  {
   param
    rowAttrName : "key" ;
    tableName : "streamsSample_books" ;
    staticColumnFamily : "all" ;
    valueAttrName : "bookData" ;
  }

use
stream<rstring key, tuple<rstring title, rstring author_fname,
   rstring author_lname, rstring year, rstring rating> bookData> toEncode =
   Functor(bookStream)
  {
   output
    toEncode : key = title + ":" + year, bookData = bookStream ;
  }

 stream<blob key, tuple<blob title, blob author_fname,
    blob author_lname, blob year, blob rating> bookData> toHBASE =
    HBaseEncode(toEncode)
  {
  }

  // Now put it into HBASE.  We don't specify a columnQualifier and the attribute
  // given by valueAttrName is a tuple, so it treats the attribute names in that 
  // tuple as columnQualifiers, and the attribute values as the values.
  () as putsink = HBASEPut(toHBASE)
  {
   param
    rowAttrName : "key" ;
    tableName : "streamsSample_books" ;
    staticColumnFamily : "all" ;
    valueAttrName : "bookData" ;
  }
Solution
The HBaseEncode operator is to be implemented as a Java operator. Before developing it, I decided to create a simple Java project that encodes a subset of Java types into the binary format accepted by Big SQL.
The Big SQL HBase data format is described here, so it looked like a simple coding task. Unfortunately, the description is not accurate, and I got bogged down in unexpected problems. For instance, a NULL value is marked by the byte 0x01, not 0x00. The source code of the Hive SerDe is not very useful either, because the Big SQL encoding diverges from it in many points.
So I ended up loading data through the Big SQL INSERT command and analyzing the binary content of the underlying HBase table, trying to guess the proper binary format.
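
To make the discovered quirks concrete, here is a minimal toy sketch, not the actual ToBIGSQL code: only the 0x01 NULL marker comes from the experiments described above, while the 0x00 non-NULL marker and the plain UTF-8 payload are simplifying assumptions made for illustration.

  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import java.nio.charset.StandardCharsets;

  // Toy encoder for a single nullable string cell. Only the 0x01 NULL
  // marker comes from the experiments above; the 0x00 non-NULL marker
  // and the plain UTF-8 payload are assumptions for illustration.
  public class ToyCellEncoder {

    static byte[] encodeNullableString(String value) throws IOException {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      if (value == null) {
        out.write(0x01); // NULL is marked by 0x01, not 0x00
      } else {
        out.write(0x00); // assumed marker for a non-NULL value
        out.write(value.getBytes(StandardCharsets.UTF_8));
      }
      return out.toByteArray();
    }
  }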
Java project
The Java project is available here. It consists of several subprojects.
HBaseBigSql (Javadoc) will be used by the Streams operator directly. It does not have any dependencies. The supported Big SQL types are specified by the enum BIGSQLTYPE. The most important class is ToBIGSQL, which contains the result of the painstaking process of revealing the HBase binary format for all Big SQL types.
The HBasePutGet (Javadoc) subproject contains several supporting classes for putting data into an HBase table. It depends on the HBase client.
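
Roughly speaking, such a helper wraps a plain HBase client put. The sketch below is not the project's actual code; the table name and column family are borrowed from the SPL sample above, and the encoded payload is just a placeholder.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  // A bare HBase client put, roughly what the HBasePutGet helpers wrap.
  // In real use the value bytes would already be in the Big SQL format.
  public class PutSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf);
           Table table = conn.getTable(TableName.valueOf("streamsSample_books"))) {
        Put put = new Put(Bytes.toBytes("Wuthering Heights:1847"));
        byte[] encodedTitle = Bytes.toBytes("Wuthering Heights"); // placeholder
        put.addColumn(Bytes.toBytes("all"), Bytes.toBytes("title"), encodedTitle);
        table.put(put);
      }
    }
  }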
TestH contains the JUnit tests. The test scenario is very simple: CREATE HBASE TABLE, load data into the underlying HBase table, read it back through a Big SQL SELECT statement, and compare the results to check whether the data stored in HBase equals the data retrieved by Big SQL.
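
In outline, the round trip looks like the sketch below. This is not code copied from TestH: the JDBC URL, the credentials and the DDL are assumptions in the style of the BigInsights documentation.

  import static org.junit.Assert.assertEquals;

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  import org.junit.Test;

  // Round-trip sketch: create a Big SQL HBase table, write a row directly
  // into HBase (e.g. with the HBasePutGet helpers), then read it back
  // through Big SQL. Host, port, credentials and DDL are assumed here.
  public class RoundTripSketch {

    private static final String URL = "jdbc:db2://bigsql-host:51000/bigsql"; // assumed

    @Test
    public void stringSurvivesRoundTrip() throws Exception {
      try (Connection c = DriverManager.getConnection(URL, "biadmin", "password");
           Statement s = c.createStatement()) {
        s.execute("CREATE HBASE TABLE books (id VARCHAR(40), title VARCHAR(100)) "
            + "COLUMN MAPPING (key MAPPED BY (id), all:title MAPPED BY (title))");
        // ... put the row 'Wuthering Heights:1847' directly into HBase here ...
        ResultSet rs = s.executeQuery(
            "SELECT title FROM books WHERE id = 'Wuthering Heights:1847'");
        rs.next();
        assertEquals("Wuthering Heights", rs.getString(1));
      }
    }
  }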
Possible extensions
  • The solution only writes data to HBase in Big SQL format. The opposite task, coding methods that read and decode data from an HBase table, remains to be done.
  • Compound indexes and columns are not supported.
Next step
Develop IBM InfoSphere Streams HBaseEncode operator.