Andy Coates edited this page Oct 23, 2019 · 14 revisions

This doc contains answers to frequently asked questions. Answers are broken down into the following sections:

Basics

Why isn’t my KSQL query returning data?

Robin Moffatt has written an excellent blog post to help you figure out why. You can find it here.

Aggregations

Why does KSQL output two events for each input event when aggregating a table?

Inspired by this SO question

Example

-- Given:
-- Create Trades table:
create table Trades(TradeId string, AccountId string, Amount double) with (KAFKA_TOPIC = 'TradeHistory', VALUE_FORMAT = 'JSON', PARTITIONS=1, KEY='TradeId');

-- Run this in a different console to see the results of the aggregation:
-- Before version 5.4:
select AccountId, count(*) as Count, sum(Amount) as Total from Trades group by AccountId;
-- From version 5.4 onwards:
select AccountId, count(*) as Count, sum(Amount) as Total from Trades group by AccountId EMIT CHANGES;

-- When we:
INSERT INTO Trades (TradeId, AccountId, Amount) VALUES ('t1', 'acc1', 106.0);

-- Then the above select window outputs:
-- AccountId | Count | Total
   acc1 | 1 | 106.0 

-- When we:
INSERT INTO Trades (TradeId, AccountId, Amount) VALUES ('t1', 'acc1', 107.0);

-- Then the above select window may output:
-- AccountId | Count | Total
   acc1 | 0 | 0.0
   acc1 | 1 | 107.0

If you run this example locally, you may see the two output rows shown above when you insert the update to trade t1, or you may see only the last row. So why can the second insert result in two output events rather than one?

Answer

When KSQL sees an update to an existing row in a table, it internally emits a change data capture (CDC) event that contains both the old and the new value. Aggregations handle this by first undoing the old value and then applying the new value.

So, in the example above, the second insert actually updates an existing row in the Trades table, so KSQL first undoes the old value. This decrements the COUNT by 1 to 0 and subtracts the old Amount of 106.0 from the SUM, leaving 0.0. KSQL then applies the new row value, which increments the COUNT by 1 to 1 and adds the new Amount of 107.0 to the SUM, giving 107.0.

If a third insert were made for the same TradeId and AccountId, the same pattern would be seen: first the old value would be removed, taking the count and sum to zero, and then the new row would be added.

Why does KSQL do this? Well, to help understand what might at first look like strange behaviour, let's consider what happens when we add a few more rows:

-- When we insert a row with a different TradeId, but the same AccountId:
INSERT INTO Trades (TradeId, AccountId, Amount) VALUES ('t2', 'acc1', 10.0);

-- Then the select window outputs:
-- A single row, as the above is an insert of a new row, so there is nothing to undo
-- COUNT is now 2, as two source table rows contribute to the aggregate
-- SUM is now 117.0, the sum of the two source trades' Amount values
-- AccountId | Count | Total
   acc1 | 2 | 117.0 

-- When we update the new trade to reference a different AccountId:
INSERT INTO Trades (TradeId, AccountId, Amount) VALUES ('t2', 'acc3', 10.0);

-- Then the above select window outputs:
-- First, KSQL undoes the old value for TradeId t2:
--   This drops the count of trades against acc1 to a single trade
--   And drops the sum of Amount for acc1 to the amount of that single trade
-- AccountId | Count | Total
   acc1 | 1 | 107.0
-- Then it applies the new value to the aggregate for the new AccountId:
--   This outputs a new row with AccountId acc3
--   With a single trade contributing to the aggregate, so COUNT of 1 and SUM of 10.0
-- AccountId | Count | Total
   acc3 | 1 | 10.0

Hopefully, the above example makes the behaviour seem a little less strange. Undoing the old value is important to ensure the aggregate values are correct. The behaviour only seems strange when the old and new values both affect the same aggregate row.
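To make the undo-then-apply behaviour concrete, here is a minimal Python sketch that models the example query's COUNT(*) and SUM(Amount) grouped by AccountId over the Trades table. It is a simplified model under stated assumptions, not KSQL's actual implementation: the class `TableAggregate` and its `upsert` method are hypothetical names, and the model ignores buffering, so every row the aggregation emits is returned.

```python
from collections import defaultdict


class TableAggregate:
    """Hypothetical model of:
    SELECT AccountId, COUNT(*), SUM(Amount) FROM Trades GROUP BY AccountId
    It mimics only the undo-then-apply semantics; it is not KSQL code.
    """

    def __init__(self):
        # Current rows of the Trades table: TradeId -> (AccountId, Amount)
        self.source = {}
        # Aggregate state per AccountId: [count, total]
        self.agg = defaultdict(lambda: [0, 0.0])

    def upsert(self, trade_id, account_id, amount):
        """Insert or update a Trades row; return the aggregate rows emitted."""
        emitted = []
        old = self.source.get(trade_id)
        if old is not None:
            # Undo: remove the old row's contribution from its aggregate row.
            old_account, old_amount = old
            state = self.agg[old_account]
            state[0] -= 1
            state[1] -= old_amount
            emitted.append((old_account, state[0], state[1]))
        # Apply: add the new row's contribution to its aggregate row.
        state = self.agg[account_id]
        state[0] += 1
        state[1] += amount
        emitted.append((account_id, state[0], state[1]))
        self.source[trade_id] = (account_id, amount)
        return emitted


trades = TableAggregate()
print(trades.upsert("t1", "acc1", 106.0))  # [('acc1', 1, 106.0)]
print(trades.upsert("t1", "acc1", 107.0))  # [('acc1', 0, 0.0), ('acc1', 1, 107.0)]
print(trades.upsert("t2", "acc1", 10.0))   # [('acc1', 2, 117.0)]
print(trades.upsert("t2", "acc3", 10.0))   # [('acc1', 1, 107.0), ('acc3', 1, 10.0)]
```

The four calls replay the inserts from the example, in order, and produce the same rows as the select window. The undo step is what keeps acc1 correct when t2 moves to acc3: without it, acc1 would still count t2.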

By default, KSQL is configured to buffer results for up to 2 seconds, or 10MB of data, before flushing them to Kafka. This is why you may see a slight delay in the output when inserting values in this example. If both output rows are buffered together, KSQL suppresses the first result, which is why you often do not see the intermediate row. The configurations commit.interval.ms and cache.max.bytes.buffering, which are set to 2 seconds and 10MB respectively, can be used to tune this behaviour. Setting either of them to zero will cause KSQL to always output all intermediate results.
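The effect of buffering can be sketched the same way. The hypothetical `flush_with_cache` function below assumes that all rows passed to it were emitted between two flushes, and models the cache as keeping only the latest row per AccountId. It illustrates why the intermediate row can disappear; it is not the Kafka Streams record cache implementation.

```python
def flush_with_cache(emitted, cache_enabled=True):
    """Hypothetical model of a per-key cache in front of the output topic.

    emitted: aggregate rows (AccountId, count, total) produced between flushes.
    With caching, a later row for the same key overwrites the earlier one,
    so only the latest row per key is forwarded on flush. Without caching,
    every intermediate row is forwarded.
    """
    if not cache_enabled:
        return list(emitted)
    latest = {}
    for row in emitted:
        latest[row[0]] = row  # keyed by AccountId; later rows win
    return list(latest.values())


# Rows emitted by the update to trade t1 in the example above.
rows = [("acc1", 0, 0.0), ("acc1", 1, 107.0)]
print(flush_with_cache(rows))                       # [('acc1', 1, 107.0)]
print(flush_with_cache(rows, cache_enabled=False))  # both rows
```

Because both rows share the key acc1, the `acc1 | 0 | 0.0` row is overwritten before the flush. Rows for different keys, such as the acc1 and acc3 rows produced when t2 moves account, both survive. The `cache_enabled=False` case approximates setting the buffering configuration to zero.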

We have a GitHub issue to enhance KSQL to use Kafka Streams' suppression functionality, which would give users better control over how results are materialized and avoid the intermediate output when the old and new values affect the same aggregate row.

Joins

Coming soon...

Pull Queries

Why do my pull queries not include a ROWTIME column?

Example

-- Given an example source:
CREATE STREAM INPUT (ID INT) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON', PARTITIONS=1);

-- Given an example aggregate table:
CREATE TABLE OUTPUT AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ID;

-- Given a push query running in a different window:
SELECT * FROM OUTPUT EMIT CHANGES LIMIT 2;

-- When we inject some data:
INSERT INTO INPUT (ID) VALUES (42);
INSERT INTO INPUT (ID) VALUES (42);

-- Then you'll see the rows output by the push query, and they include ROWTIME:
+---------------+-------+-------+
|ROWTIME        |ROWKEY |COUNT  |
+---------------+-------+-------+
|1571769443063  |42     |1      |
|1571769543463  |42     |2      |

-- However, if you then issue a pull query, no ROWTIME is returned:
SELECT * FROM OUTPUT WHERE ROWKEY='42';
 ROWKEY STRING KEY | COUNT BIGINT 
 42                | 2   

You may be left wondering why the pull query does not return ROWTIME.

Answer

The ROWTIME that is output for a push query is the event time of the event that caused the state of the table to change and a message to be output to OUTPUT, i.e. the ROWTIME we see is the ROWTIME from the rows we inserted into the INPUT stream.

However, when we issue a pull query we are asking for the current state of the row in the aggregate table. There is no source event, like we have with push queries, from which to derive a ROWTIME. Hence, pull query results do not include a ROWTIME column, by default.

If you require a ROWTIME, you first need to decide which ROWTIME you need. Remember, the table is potentially aggregating multiple source rows into a single row, so which source row's ROWTIME do you want? Once you have decided, you can capture the appropriate ROWTIME within the aggregate query's definition. Note that ROWTIME is a reserved column name, so you'll need to give it another name:

-- We can change the OUTPUT table definition to include the ROWTIME we need:
-- Note: you'll need to terminate the old query and drop the old OUTPUT table definition first
CREATE TABLE OUTPUT AS SELECT MAX(ROWTIME) AS MAX_TIME, COUNT(1) AS COUNT FROM INPUT GROUP BY ID;

-- Inject some data:
INSERT INTO INPUT (ID) VALUES (42);
INSERT INTO INPUT (ID) VALUES (42);

-- And then pull queries will return this:
SELECT * FROM OUTPUT WHERE ROWKEY='42';
 ROWKEY STRING KEY | MAX_TIME BIGINT | COUNT BIGINT 
 42                | 1571772033364   | 2  
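The difference between push and pull results can also be modelled in a few lines of Python. In this hypothetical sketch, `aggregate_input` processes (rowtime, id) input events. Each push row carries the ROWTIME of the event that produced it, while the stored table row holds only aggregate values: COUNT plus a MAX_TIME column mirroring the MAX(ROWTIME) choice above. This is an illustration under those assumptions, not how KSQL stores state.

```python
def aggregate_input(events):
    """Hypothetical model of COUNT(1) and MAX(ROWTIME) grouped by ID.

    events: list of (rowtime, id) tuples from the INPUT stream.
    Returns (push_rows, table): push_rows mimics a push query's output,
    and table mimics the current state a pull query would read.
    """
    push_rows = []
    table = {}  # id -> {"COUNT": ..., "MAX_TIME": ...}
    for rowtime, key in events:
        state = table.setdefault(key, {"COUNT": 0, "MAX_TIME": None})
        state["COUNT"] += 1
        if state["MAX_TIME"] is None or rowtime > state["MAX_TIME"]:
            state["MAX_TIME"] = rowtime
        # A push row takes its ROWTIME from the event that changed the table.
        push_rows.append({"ROWTIME": rowtime, "ROWKEY": key, "COUNT": state["COUNT"]})
    return push_rows, table


push, table = aggregate_input([(1571769443063, "42"), (1571769543463, "42")])
print(push)           # two rows, each with its own ROWTIME
print(table["42"])    # {'COUNT': 2, 'MAX_TIME': 1571769543463}
```

The table entry has no ROWTIME of its own: two events contributed to it, so a timestamp is only available if you aggregate one explicitly, as MAX_TIME does here.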