Big Data SQL Quick Start. Big Data SQL over Kafka – Part 23

Big Data SQL 3.2 brings a few interesting features. Among them, one of the most interesting is the ability to read Kafka. Before drilling down into the details, I'd like to explain in a nutshell what Kafka is.

What is Kafka?

You can find the full documentation for Kafka here, but in a nutshell, it's a distributed, fault-tolerant messaging system. It allows you to connect many systems in an organized fashion. Instead of connecting each system peer to peer:

you may land all your messages company-wide on one system and consume them from there, like this:

Kafka is a kind of data hub, where you land the messages and serve them afterwards.

More technical details.

I'd like to introduce a few key Kafka terms.

1) Kafka Broker. This is the Kafka service, which you run on each server and which serves all read and write requests.

2) Kafka Producer. The process that writes data to Kafka.

3) Kafka Consumer. The process that reads data from Kafka.

4) Message. The name speaks for itself; I'll just add that each message has a key and a value. Unlike the key in a NoSQL database, Kafka's key is not indexed. It serves application purposes (you may put some application logic in the key) and administrative purposes (all messages with the same key go to the same partition).

5) Topic. Messages are organized into topics. Database folks would compare a topic to a table.

6) Partition. It's good practice to divide a topic into partitions for performance and maintenance purposes. Messages with the same key go to the same partition. If a key is absent, messages are distributed in round-robin fashion.

7) Offset. The offset is the position of each message within its partition. The offset is indexed, which allows you to quickly access a particular message. (A small command-line sketch after this list shows these pieces in action.)
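To make these terms concrete, here is a minimal command-line sketch; the topic name, hosts, and key/value pairs are illustrative only. It writes two keyed messages and then reads them back together with their keys:

$   printf 'user1:click-a\nuser2:click-b\n' | kafka-console-producer --broker-list bds2:9092 --topic demo_topic --property parse.key=true --property key.separator=:

$   kafka-console-consumer --zookeeper bds1:2181 --topic demo_topic --from-beginning --property print.key=true

Because all messages with the same key land in the same partition, a consumer reading that partition sees them in the order they were produced.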

When does Kafka delete data?

One of the basic Kafka concepts is retention: Kafka does not keep data forever, nor does it wait for all consumers to read a message before deleting it. Instead, the Kafka administrator configures a retention policy for each topic: either the amount of time to store messages before deleting them, or the amount of data to store before older messages are purged. Two parameters control this: log.retention.ms and log.retention.bytes.

log.retention.bytes sets the amount of data to retain in the log for each topic partition. This is a per-partition limit: multiply it by the number of partitions to get the total data retained for the topic.
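As a sketch, the per-topic overrides (retention.ms and retention.bytes) can be set with the standard kafka-configs tool; the topic name and values below are illustrative only:

$   kafka-configs --zookeeper bds1:2181 --alter --entity-type topics --entity-name json_clickstream --add-config retention.ms=604800000,retention.bytes=1073741824

Here messages older than 7 days (604800000 ms) or beyond roughly 1GB per partition (1073741824 bytes) become eligible for deletion, whichever limit is hit first.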

How to query Kafka data with Big Data SQL?

To query Kafka data you need to create a Hive table first. Let me show an end-to-end example. I have a JSON file:

$   cat web_clicks.json
{ click_date: "38041", click_time: "67786", date: "2004-02-26", am_pm: "PM", shift: "second", sub_shift: "evening", item_sk: "396439", web_page: "646"}
{ click_date: "38041", click_time: "41831", date: "2004-02-26", am_pm: "AM", shift: "first", sub_shift: "morning", item_sk: "90714", web_page: "804"}
{ click_date: "38041", click_time: "60334", date: "2004-02-26", am_pm: "PM", shift: "second", sub_shift: "afternoon", item_sk: "151944", web_page: "867"}
{ click_date: "38041", click_time: "53225", date: "2004-02-26", am_pm: "PM", shift: "first", sub_shift: "afternoon", item_sk: "175796", web_page: "563"}
{ click_date: "38041", click_time: "47515", date: "2004-02-26", am_pm: "PM", shift: "first", sub_shift: "afternoon", item_sk: "186943", web_page: "777"}
{ click_date: "38041", click_time: "73633", date: "2004-02-26", am_pm: "PM", shift: "second", sub_shift: "evening", item_sk: "118004", web_page: "647"}
{ click_date: "38041", click_time: "43133", date: "2004-02-26", am_pm: "AM", shift: "first", sub_shift: "morning", item_sk: "148210", web_page: "930"}
{ click_date: "38041", click_time: "80675", date: "2004-02-26", am_pm: "PM", shift: "second", sub_shift: "evening", item_sk: "380306", web_page: "484"}
{ click_date: "38041", click_time: "21847", date: "2004-02-26", am_pm: "AM", shift: "third", sub_shift: "morning", item_sk: "55425", web_page: "95"}
{ click_date: "38041", click_time: "35131", date: "2004-02-26", am_pm: "AM", shift: "first", sub_shift: "morning", item_sk: "185071", web_page: "118"}

and I'm going to load it into Kafka with the standard Kafka tool "kafka-console-producer":

$   cat web_clicks.json|kafka-console-producer --broker-list bds2:9092,bds3:9092,bds4:9092,bds5:9092,bds6:9092 --topic json_clickstream

To check that the messages have appeared in the topic, you may use the following command:

$   kafka-console-consumer --zookeeper bds1:2181,bds2:2181,bds3:2181 --topic json_clickstream --from-beginning

After I've loaded this file into the Kafka topic, I create a table in Hive.

Make sure that you have oracle-kafka.jar and kafka-clients*.jar in your hive.aux.jars.path.
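A minimal hive-site.xml sketch is shown below; the jar locations are just examples and will differ on your cluster:

<property>
  <name>hive.aux.jars.path</name>
  <!-- example paths only; point these at the actual jar locations on your cluster -->
  <value>file:///opt/oracle/kafka/jlib/oracle-kafka.jar,file:///opt/cloudera/parcels/KAFKA/lib/kafka/libs/kafka-clients.jar</value>
</property>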

After this you may run the following DDL in Hive:

hive> CREATE EXTERNAL TABLE json_web_clicks_kafka
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties(
'oracle.kafka.table.key.type'='long',
'oracle.kafka.table.value.type'='string',
'oracle.kafka.bootstrap.servers'='bds2:9092,bds3:9092,bds4:9092,bds5:9092,bds6:9092',
'oracle.kafka.table.topics'='json_clickstream'
);
hive> describe json_web_clicks_kafka;
hive> select * from json_web_clicks_kafka limit 1;

As soon as the Hive table has been created, I create an ORACLE_HIVE external table in Oracle:

SQL> CREATE TABLE json_web_clicks_kafka (
topic varchar2(50),
partitionid integer,
VALUE  varchar2(4000),
offset integer,
timestamp timestamp, 
timestamptype integer
)
ORGANIZATION EXTERNAL
(TYPE ORACLE_HIVE DEFAULT DIRECTORY DEFAULT_DIR
   ACCESS PARAMETERS
      (
       com.oracle.bigdata.cluster=CLUSTER
       com.oracle.bigdata.tablename=default.json_web_clicks_kafka
      )
) 
PARALLEL 
REJECT LIMIT UNLIMITED;

Here you also have to keep in mind that you need to add oracle-kafka.jar and kafka-clients*.jar to your bigdata.properties file on the database side and on the Hadoop side. I have a dedicated blog post about how to do this here.

Now we are ready to query:

SQL> SELECT * FROM json_web_clicks_kafka
WHERE 
ROWNUM<3;

json_clickstream	209	{ click_date: "38041", click_time: "43213"..."}	0	26-JUL-17 05.55.51.762000 PM	1
json_clickstream	209	{ click_date: "38041", click_time: "74669"... }	1	26-JUL-17 05.55.51.762000 PM	1

Oracle 12c provides powerful capabilities for working with JSON, such as the dot notation. It allows us to easily query JSON data as a structure:

SELECT t.value.click_date,
       t.value.click_time
  FROM json_web_clicks_kafka t
 WHERE ROWNUM < 3;

38041	40629
38041	48699

Working with AVRO messages.

In many cases, customers use Avro as a flexible, self-describing format for exchanging messages through Kafka. We certainly support it, and do so in a very easy and flexible way.

I have a topic which contains Avro messages, and I define a Hive table over it:

CREATE EXTERNAL TABLE web_sales_kafka
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties(
'oracle.kafka.table.key.type'='long',
'oracle.kafka.table.value.type'='avro',
'oracle.kafka.table.value.schema'='{"type":"record","name":"avro_table","namespace":"default","fields":
[{"name":"ws_sold_date_sk","type":["null","long"],"default":null},
{"name":"ws_sold_time_sk","type":["null","long"],"default":null},
{"name":"ws_ship_date_sk","type":["null","long"],"default":null},
{"name":"ws_item_sk","type":["null","long"],"default":null},
{"name":"ws_bill_customer_sk","type":["null","long"],"default":null},
{"name":"ws_bill_cdemo_sk","type":["null","long"],"default":null},
{"name":"ws_bill_hdemo_sk","type":["null","long"],"default":null},
{"name":"ws_bill_addr_sk","type":["null","long"],"default":null},
{"name":"ws_ship_customer_sk","type":["null","long"],"default":null}
]}',
'oracle.kafka.bootstrap.servers'='bds2:9092',
'oracle.kafka.table.topics'='web_sales_avro'
);
describe web_sales_kafka;
select * from web_sales_kafka limit 1;

Here I define 'oracle.kafka.table.value.type'='avro' and also specify 'oracle.kafka.table.value.schema'. After this the messages have structure.

In a similar way I define a table in Oracle RDBMS:

SQL> CREATE TABLE WEB_SALES_KAFKA_AVRO
   (  "WS_SOLD_DATE_SK" NUMBER, 
  "WS_SOLD_TIME_SK" NUMBER, 
  "WS_SHIP_DATE_SK" NUMBER, 
  "WS_ITEM_SK" NUMBER, 
  "WS_BILL_CUSTOMER_SK" NUMBER, 
  "WS_BILL_CDEMO_SK" NUMBER, 
  "WS_BILL_HDEMO_SK" NUMBER, 
  "WS_BILL_ADDR_SK" NUMBER, 
  "WS_SHIP_CUSTOMER_SK" NUMBER
  topic varchar2(50),
  partitionid integer,
  KEY NUMBER,
  offset integer,
  timestamp timestamp, 
  timestamptype INTEGER
   ) 
   ORGANIZATION EXTERNAL 
    ( TYPE ORACLE_HIVE
      DEFAULT DIRECTORY "DEFAULT_DIR"
      ACCESS PARAMETERS
      ( com.oracle.bigdata.tablename: web_sales_kafka
          )     
    )
   REJECT LIMIT UNLIMITED ;

And we are good to query the data!
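As a quick sanity check (the column choice is arbitrary), a query like this should return rows with the Avro fields already exposed as relational columns:

SQL> SELECT ws_item_sk, ws_sold_date_sk FROM WEB_SALES_KAFKA_AVRO WHERE ROWNUM < 3;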

Performance considerations.

1) Number of Partitions.

This is the most important thing to keep in mind. There is a nice article about how to choose the right number of partitions. For Big Data SQL purposes, I'd recommend using a number of partitions slightly greater than the number of CPU cores in your Big Data SQL cluster.
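If an existing topic has too few partitions, you can increase the partition count with the standard kafka-topics tool; the topic name, hosts, and the target of 12 partitions below are illustrative only (note that existing keyed messages are not redistributed):

$   kafka-topics --alter --zookeeper bds1:2181,bds2:2181,bds3:2181 --topic json_clickstream --partitions 12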

2) Query fewer columns

Use the column pruning feature. In other words, list only the necessary columns in your SELECT and WHERE clauses. Here is an example.

I've created a dummy PL/SQL function which does nothing. But since PL/SQL can't be offloaded to the cell side, all the data will be moved to the database side:

SQL> create or replace function fnull(input number) return number is
Result number;
begin
Result:=input;
return(Result);
end fnull;
/

After this I ran a query which requires only one column and checked how much data was returned to the DB side:

SQL> SELECT MIN(fnull(WS_SOLD_DATE_SK))
     FROM WEB_SALES_KAFKA_AVRO;

“cell interconnect bytes returned by XT smart scan” 5741.81MB

Then I repeated the same test case with 10 columns:

SQL> SELECT MIN(fnull(WS_SOLD_DATE_SK)),
       MIN(fnull(WS_SOLD_TIME_SK)),
       MIN(fnull(WS_SHIP_DATE_SK)),
       MIN(fnull(WS_ITEM_SK)),
       MIN(fnull(WS_BILL_CUSTOMER_SK)),
       MIN(fnull(WS_BILL_CDEMO_SK)),
       MIN(fnull(WS_BILL_HDEMO_SK)),
       MIN(fnull(WS_BILL_ADDR_SK)),
       MIN(fnull(WS_SHIP_CUSTOMER_SK)),
       MIN(fnull(WS_SHIP_CDEMO_SK))
  FROM WEB_SALES_KAFKA_AVRO;

“cell interconnect bytes returned by XT smart scan” 32193.98 MB

Hopefully, this test case clearly shows that you should select only the columns you actually need.

3) Indexes

There are no indexes other than the offset. Don't let the fact that you have a key column mislead you: it's not indexed. Only the offset allows quick random access.
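For illustration only (the partition and offset values are made up, and whether this predicate is pushed down to the Kafka reader depends on the handler version), a point lookup by offset against the table defined earlier looks like this:

SQL> SELECT t.value FROM json_web_clicks_kafka t WHERE t.partitionid = 0 AND t.offset = 100;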

4) Warm up your data

If you plan to read the same data many times, you should warm it up first by running a "select *" type of query.

Kafka relies on the Linux filesystem cache, so to read the same dataset quickly on subsequent runs, you have to read it once first to populate the cache.

Here is an example:

- I clean up the Linux filesystem cache

dcli -C "sync; echo 3 > /proc/sys/vm/drop_caches"

- I run the first query:

SELECT COUNT(1) FROM WEB_RETURNS_JSON_KAFKA t

It took 278 seconds.

- The second and third runs took only 92 seconds.

5) Use a bigger replication factor

Here is an example. I have two tables: one created over a Kafka topic with replication factor = 1, the second created over a Kafka topic with replication factor = 3.
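For reference, the two underlying topics could have been created like this (the topic names, hosts, and partition counts are illustrative only):

$   kafka-topics --create --zookeeper bds1:2181,bds2:2181,bds3:2181 --replication-factor 1 --partitions 100 --topic json_clickstream_rf1
$   kafka-topics --create --zookeeper bds1:2181,bds2:2181,bds3:2181 --replication-factor 3 --partitions 100 --topic json_clickstream_rf3

First, the query over the RF = 1 table: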

SELECT COUNT(1) FROM JSON_KAFKA_RF1 t

This query took 278 seconds for the first run and 92 seconds for the next runs.

SELECT COUNT(1) FROM JSON_KAFKA_RF3 t

This query took 279 seconds for the first run, but 34 seconds for the next runs.

6) Compression considerations

Kafka supports different types of compression. If you store the data in JSON or XML format, the compression ratio can be significant. Here is an example of the numbers you could see:

Data format and compression type    Size of the data, GB
JSON on HDFS, uncompressed          273.1
JSON in Kafka, uncompressed         286.191
JSON in Kafka, Snappy               180.706
JSON in Kafka, GZIP                 52.2649
AVRO in Kafka, uncompressed         252.975
AVRO in Kafka, Snappy               158.117
AVRO in Kafka, GZIP                 54.49

This feature may save some space on disk, but taking into account that Kafka is primarily used as a temporary store (one week or one month of data, say), I'm not sure it makes much sense. Also, you will pay some performance penalty when querying this data (and burn more CPU).

I’ve run a query like:

SQL> select count(1) from ...

and got the following results:

Type of compression    Elapsed time, sec
uncompressed           76
snappy                 80
gzip                   92

So, uncompressed is the leader; GZIP and Snappy are slower (not dramatically, but slower). Taking this into account, as well as the fact that Kafka is a temporary store, I wouldn't recommend using compression without an exceptional need.
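For reference, compression is configured on the producer side; with the console producer it is just a flag (the topic name below is illustrative):

$   cat web_clicks.json|kafka-console-producer --broker-list bds2:9092,bds3:9092 --topic json_clickstream_gzip --compression-codec gzip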

7) Parallelize your processing.

If for some reason you are using a small number of partitions, you can use the Hive table property "oracle.kafka.partition.chunk.size" to increase parallelism. This parameter defines the size of an input split: if you set it to 1MB and your topic contains 4MB in total, the topic will be processed with 4 parallel threads.

Here is the test case:

- Drop Kafka topic

$   kafka-topics --delete --zookeeper cfclbv3870:2181,cfclbv3871:2181,cfclbv3872:2181 --topic store_sales

- Create again with only one partition

$   kafka-topics --create --zookeeper cfclbv3870:2181,cfclbv3871:2181,cfclbv3872:2181 --replication-factor 3 --partitions 1 --topic store_sales

- Check it

$   kafka-topics --describe --zookeeper cfclbv3870:2181,cfclbv3871:2181,cfclbv3872:2181 --topic store_sales
...
Topic:store_sales       PartitionCount:1        ReplicationFactor:3     Configs:
      Topic: store_sales      Partition: 0    Leader: 79      Replicas: 79,76,77      Isr: 79,76,77
...

- Check the size of input file:

$   du -h store_sales.dat
19G     store_sales.dat

- Load data to the Kafka topic

$   cat store_sales.dat|kafka-console-producer --broker-list cfclbv3870.us2.oraclecloud.com:9092,cfclbv3871.us2.oraclecloud.com:9092,cfclbv3872.us2.oraclecloud.com:9092,cfclbv3873.us2.oraclecloud.com:9092,cfclbv3874.us2.oraclecloud.com:9092 --topic store_sales  --request-timeout-ms 30000  --batch-size 1000000

- Create Hive External table

hive> CREATE EXTERNAL TABLE store_sales_kafka
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties(
'oracle.kafka.table.key.type'='long',
'oracle.kafka.table.value.type'='string',
'oracle.kafka.bootstrap.servers'='cfclbv3870:9092,cfclbv3871:9092,cfclbv3872:9092,cfclbv3873:9092,cfclbv3874:9092',
'oracle.kafka.table.topics'='store_sales'
);

- Create Oracle external table

SQL> CREATE TABLE STORE_SALES_KAFKA
   (	TOPIC VARCHAR2(50), 
      PARTITIONID NUMBER, 
      VALUE VARCHAR2(4000), 
      OFFSET NUMBER, 
      TIMESTAMP TIMESTAMP, 
      TIMESTAMPTYPE NUMBER
   ) 
   ORGANIZATION EXTERNAL 
    ( TYPE ORACLE_HIVE
      DEFAULT DIRECTORY DEFAULT_DIR
      ACCESS PARAMETERS
      ( com.oracle.bigdata.tablename=default.store_sales_kafka
    )     
    )
   REJECT LIMIT UNLIMITED 
  PARALLEL ;

- Run test query

SQL> SELECT COUNT(1) FROM store_sales_kafka;

It took 142 seconds.

- Re-create the Hive external table with the 'oracle.kafka.partition.chunk.size' parameter set to 1MB

hive> CREATE EXTERNAL TABLE store_sales_kafka
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties(
'oracle.kafka.chop.partition'='true',
'oracle.kafka.partition.chunk.size'='1048576',
'oracle.kafka.table.key.type'='long',
'oracle.kafka.table.value.type'='string',
'oracle.kafka.bootstrap.servers'='cfclbv3870:9092,cfclbv3871:9092,cfclbv3872:9092,cfclbv3873:9092,cfclbv3874:9092',
'oracle.kafka.table.topics'='store_sales'
);

- Run query again:

SQL> SELECT COUNT(1) FROM store_sales_kafka;

Now it took only 7 seconds.

A 1MB split is quite small; for big topics we recommend using 256MB.
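For example, 256MB expressed in bytes is 268435456, so the table property would be set like this (whether the handler honours an ALTER on an existing table is an assumption; re-creating the table as above is the safe path):

hive> ALTER TABLE store_sales_kafka SET TBLPROPERTIES ('oracle.kafka.partition.chunk.size'='268435456');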

8) Querying small topics.

Sometimes you need to query really small topics (a few hundred messages, for example), but very frequently. In this case, it makes sense to create the topic with fewer partitions.

Here is the test case example:

- Create topic with 1000 partitions

$   kafka-topics --create --zookeeper cfclbv3870:2181,cfclbv3871:2181,cfclbv3872:2181 --replication-factor 3 --partitions 1000 --topic small_topic

- Load only one message there

$   echo "test"|kafka-console-producer --broker-list cfclbv3870.us2.oraclecloud.com:9092,cfclbv3871.us2.oraclecloud.com:9092,cfclbv3872.us2.oraclecloud.com:9092,cfclbv3873.us2.oraclecloud.com:9092,cfclbv3874.us2.oraclecloud.com:9092 --topic small_topic

- Create hive external table

hive> CREATE EXTERNAL TABLE small_topic_kafka
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties(
'oracle.kafka.table.key.type'='long',
'oracle.kafka.table.value.type'='string',
'oracle.kafka.bootstrap.servers'='cfclbv3870:9092,cfclbv3871:9092,cfclbv3872:9092,cfclbv3873:9092,cfclbv3874:9092',
'oracle.kafka.table.topics'='small_topic'
);

- Create Oracle external table

SQL> CREATE TABLE small_topic_kafka (
topic varchar2(50),
partitionid integer,
VALUE varchar2(4000),
offset integer,
timestamp timestamp,
timestamptype integer
)
ORGANIZATION EXTERNAL
(TYPE ORACLE_HIVE DEFAULT DIRECTORY DEFAULT_DIR
ACCESS PARAMETERS
(
com.oracle.bigdata.tablename=default.small_topic_kafka
)
)
PARALLEL
REJECT LIMIT UNLIMITED;

- Query all rows from it

SQL> SELECT * FROM small_topic_kafka

It took 6 seconds.

- Create a topic with only one partition, put only one message there, and run the same SQL query over it

$   kafka-topics --create --zookeeper cfclbv3870:2181,cfclbv3871:2181,cfclbv3872:2181 --replication-factor 3 --partitions 1 --topic small_topic
  
$   echo "test"|kafka-console-producer --broker-list cfclbv3870.us2.oraclecloud.com:9092,cfclbv3871.us2.oraclecloud.com:9092,cfclbv3872.us2.oraclecloud.com:9092,cfclbv3873.us2.oraclecloud.com:9092,cfclbv3874.us2.oraclecloud.com:9092 --topic small_topic
SQL> SELECT * FROM small_topic_kafka

Now it takes only 0.5 seconds.

9) Type of data in Kafka messages.

You have a few options for storing data in Kafka messages, and of course you want pushdown processing. Big Data SQL supports pushdown operations only for JSON. This means that everything you can expose through JSON will be pushed down to the cell side and processed there.

Example

- A query which can be pushed down to the cell side (JSON):

SQL> SELECT COUNT(1) FROM WEB_RETURN_JSON_KAFKA t
WHERE
t.VALUE.after.WR_ORDER_NUMBER=233183247;

- A query which cannot be pushed down to the cell side (XML):

SQL> SELECT COUNT(1)
  FROM WEB_RETURNS_XML_KAFKA t
 WHERE 
XMLTYPE(t.value).EXTRACT('/operation/col[@name="WR_ORDER_NUMBER"]/after/text()')
 .getNumberVal() = 233183247;

If the amount of data is not significant, you can use Big Data SQL to process it directly. If we are talking about big data volumes, you can process it once and convert it into a different file format on HDFS with a Hive query:

hive> select xpath_int(value,'/operation/col[@name="WR_ORDER_NUMBER"]/after/text()') from WEB_RETURNS_XML_KAFKA limit 1 ;
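A minimal sketch of such a one-off conversion, reusing the xpath extraction above and an illustrative target table name, is a Hive CTAS into Parquet:

hive> CREATE TABLE web_returns_parquet STORED AS PARQUET AS SELECT xpath_int(value,'/operation/col[@name="WR_ORDER_NUMBER"]/after/text()') AS wr_order_number FROM WEB_RETURNS_XML_KAFKA;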

10) JSON vs AVRO format in the Kafka topics

Continuing from the previous point, you may wonder which semi-structured format to use. The answer is easy: use whatever your data source produces; there is no significant performance difference between Avro and JSON. For example, a query like:

SQL> SELECT COUNT(1) FROM WEB_RETURNS_avro_kafka t
WHERE
t.WR_ORDER_NUMBER=233183247;

will complete in 112 seconds for JSON and in 105 seconds for Avro.

The JSON topic takes 286.33 GB and the Avro topic takes 202.568 GB. There is some difference, but not enough to be worth converting the original format.

How to bring data from OLTP databases into Kafka? Use GoldenGate!

Oracle GoldenGate is a well-known product for capturing commit logs on the database side and delivering the changes to a target system. The good news is that Kafka can play the role of the target system. I'll skip the detailed explanation of this feature, because it's already explained in great detail here.
