Discussion:
Using CDC Feature to Stream C* to Kafka (Design Proposal)
Joy Gao
2018-09-06 17:53:21 UTC
Hi all,

We are fairly new to Cassandra. We began looking into the CDC feature
introduced in 3.0. As we spent more time looking into it, the complexity
began to add up (e.g. duplicated mutations based on RF, out-of-order
mutations, mutations that do not contain the full row of data, etc.).
These limitations have already been mentioned in the discussion thread
in CASSANDRA-8844, so we understand the design decisions around them.
However, we do not want to push this complexity onto every downstream
consumer, where each has to handle deduping/ordering/read-before-write
to get the full row; instead we want to solve it earlier in the
pipeline, so the change messages are deduped/ordered/complete by the
time they arrive in Kafka. Dedupe can be solved with a cache, and
ordering can be solved since mutations have timestamps, but the one we
have the most trouble with is not having the full row of data.
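For concreteness, the dedupe-via-cache and ordering-via-timestamp steps
we have in mind look roughly like this (a minimal sketch; the mutation
id and field names are made up, not anything from the CDC log format):

```python
from collections import OrderedDict

class MutationDeduper:
    """Drops mutations already seen (the same mutation arrives once per
    replica, so up to RF copies). Keyed by a mutation id; bounded LRU so
    memory stays flat."""

    def __init__(self, capacity=100_000):
        self.capacity = capacity
        self.seen = OrderedDict()

    def is_duplicate(self, mutation_id):
        if mutation_id in self.seen:
            self.seen.move_to_end(mutation_id)  # refresh LRU position
            return True
        self.seen[mutation_id] = True
        if len(self.seen) > self.capacity:
            self.seen.popitem(last=False)  # evict oldest entry
        return False

def order_mutations(mutations):
    """Sort a batch of change events by their write timestamp."""
    return sorted(mutations, key=lambda m: m["timestamp"])
```

The cache has to be sized against the window in which replica copies of
the same mutation can arrive, which is one of the tuning knobs we are
still thinking about.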

We had a couple of discussions with folks at other companies who are
working on applying the CDC feature to their real-time data pipelines.
At a high level, the common feedback we gathered is to use a stateful
processing approach: maintain a separate db to which mutations are
applied, which then allows them to construct the "before" and "after"
data without having to query the original Cassandra db on each
mutation. The downside of this is the operational overhead of having to
maintain this intermediary db for CDC.

We have an unconventional idea (inspired by DSE Advanced Replication)
that eliminates some of the operational overhead, with the tradeoff of
increased code complexity and memory pressure. The high-level idea is a
stateless processing approach where a process on each C* node parses
mutations from the CDC logs and queries the local node to get the
"after" data, which avoids network hops and thus makes reading the full
row of data more efficient. We essentially treat the mutations in the
CDC log as change notifications. To solve dedupe/ordering, only the
primary node for each token range sends the data to Kafka, but data is
reconciled with peer nodes to prevent data loss.
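A toy sketch of the "only the primary replica for each token range
publishes" rule (the ring layout, toy token space, and hashing here are
simplified stand-ins for Cassandra's actual partitioner and replica
placement, which are far more involved):

```python
import bisect
import hashlib

class Ring:
    """Toy token ring: each node owns the range ending at its token.
    Only illustrates the "publish iff I am the primary" check."""

    def __init__(self, node_tokens):
        # node_tokens: {node_name: token}, tokens in a toy space [0, 100)
        self.tokens = sorted(node_tokens.values())
        self.node_by_token = {t: n for n, t in node_tokens.items()}

    def token_for(self, partition_key):
        digest = hashlib.md5(partition_key.encode()).hexdigest()
        return int(digest, 16) % 100  # toy token space

    def primary_for(self, partition_key):
        t = self.token_for(partition_key)
        # first node whose token >= t owns the key; wrap around the ring
        i = bisect.bisect_left(self.tokens, t) % len(self.tokens)
        return self.node_by_token[self.tokens[i]]

def should_publish(ring, local_node, partition_key):
    """Called by each node's CDC processor; True on exactly one node."""
    return ring.primary_for(partition_key) == local_node
```

Every node sees the mutation in its own CDC log, but only the node for
which `should_publish` returns True sends it to Kafka; the others keep
it around for reconciliation in case the primary fails.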

We have a WIP design doc
<https://wepayinc.box.com/s/fmdtw0idajyfa23hosf7x4ustdhb0ura> that goes
over this idea in detail.

We haven't sorted out all the edge cases yet, but would love to get
some feedback from the community on the general feasibility of this
approach. Any ideas/concerns/questions would be helpful to us. Thanks!

Joy
Jonathan Haddad
2018-09-09 13:08:36 UTC
I'll be honest, I'm having a hard time wrapping my head around an
architecture where you use CDC to push data into Kafka. I've worked on
plenty of systems that use Kafka as a means of communication, and one of
the consumers is a process that stores data in Cassandra. That's pretty
normal. Sending Cassandra mutations to Kafka, on the other hand, feels
backwards and for 99% of teams, more work than it's worth.

There may be some use cases for it.. but I'm not sure what they are. It
might help if you shared the use cases where the extra complexity is
required. When is writing to Cassandra, which then dedupes and writes
to Kafka, a preferred design to using Kafka first and simply writing to
Cassandra?

If the answer is "because it's fun to solve hard problems" that's OK too!

Jon
--
Jon Haddad
http://www.rustyrazorblade.com
twitter: rustyrazorblade
Dinesh Joshi
2018-09-10 07:58:07 UTC
From my reading of the proposal, it brings functionality similar to a MySQL binlog-to-Kafka connector. This is useful for many applications that want to be notified when certain (or any) rows change in the database, primarily for an event-driven application architecture.

Implementing this in the database layer means there is a standard approach to getting a change notification stream. Downstream subscribers can then decide which notifications to act on.

LinkedIn's Databus is similar in functionality - https://github.com/linkedin/databus - however it is for heterogeneous datastores.
Interesting idea. I did go over the proposal briefly. I concur with Jon that adding more use cases would clarify this feature's potential.

Dinesh
Rahul Singh
2018-09-10 13:02:26 UTC
Not everyone has it their way like Frank Sinatra. For various reasons, folks need the changes in Cassandra duplicated to a topic for further processing - especially if the new system owner doesn’t own the whole platform.

There are various ways to do this but you have to deal with the consequences.

1. Kafka Connect using Landoop's source connector, which does “allow filtering” on tables and sends changes to a Kafka topic. You can then process them using Kafka Streams, a Kafka Connect sink, or the Kafka Consumer API.

2. CDC to Kafka - especially if the CDC is coming from commit logs, you may see duplicates from nodes.

3. Triggers to Kafka - this is the only way I know of today to send once-only messages to Kafka for every mutation that Cassandra receives. This could be problematic because you may lose a message to Kafka - because you only get it once.

Ideally you’ll want to do what Jon suggested and source the event from Kafka for all subsequent processes, rather than process in Cassandra and then create the event in Kafka.

Rahul Singh
Chief Executive Officer
m 202.905.2818

Anant Corporation
1010 Wisconsin Ave NW, Suite 250
Washington, D.C. 20007

We build and manage digital business technology platforms.
Dinesh
Rahul Singh
2018-09-10 13:08:21 UTC
In response to mimicking Advanced Replication in DSE: I understand the goal. Although DSE Advanced Replication does one-way replication, those are use cases with limited value to me because ultimately it’s still a master-slave design.

I’m working on a prototype for two-way replication between clusters or databases regardless of DB tech - and every variation I can get to comes down to some implementation of the Calvin protocol, which basically verifies the change in either cluster, sequences it according to impact on the underlying data, and then schedules the mutation in a predictable manner on both clusters/DBs.

All that means is that I need to sequence the change before it happens so I can predictably ensure it’s scheduled for write/mutation. So I’m back to square one: having a definitive queue/ledger separate from the individual commit log of the cluster.


DuyHai Doan
2018-09-10 21:21:03 UTC
Also, using Calvin means having to implement a distributed monotonic
sequence as a primitive, which is not trivial at all ...
Rahul Singh
2018-09-11 14:01:17 UTC
You know what they say: Go big or go home.

Right now the candidates are Cassandra itself (embedded or on the side, not on the actual data clusters), ZooKeeper (yuck), Kafka (which needs ZooKeeper, yuck), or S3 (outside service dependency, so no go).

Jeff, those are great patterns, esp. the second one. Have used it several times. Cassandra is a great place to store data in transport.


Rahul
Joy Gao
2018-09-12 03:38:52 UTC
Thank you all for the feedback so far.

The immediate use case for us is setting up a real-time streaming data
pipeline from C* to our Data Warehouse (BigQuery), where other teams can
access the data for reporting/analytics/ad-hoc query. We already do this
with MySQL
<https://wecode.wepay.com/posts/streaming-databases-in-realtime-with-mysql-debezium-kafka>,
where we stream the MySQL Binlog via Debezium <https://debezium.io>'s MySQL
Connector to Kafka, and then use a BigQuery Sink Connector to stream data
to BigQuery.

Re Jon's comment about why not write to Kafka first: in some cases that
may be ideal, but one potential concern we have with writing to Kafka
first is not having "read-after-write" consistency. The data could be
written to Kafka but not yet consumed by C*. If the web service issues
a (quorum) read immediately after the (quorum) write, the data being
returned could still be outdated if the consumer has not caught up.
Having the web service interact with C* directly solves this problem
for us (we could add a cache before writing to Kafka, but that adds
additional operational complexity to the architecture; alternatively,
we could write to Kafka and C* transactionally, but distributed
transactions are slow).
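To make the concern concrete, here is a toy model of the
write-to-Kafka-first path (all names are ours, purely illustrative, not
any real client API): a read issued before the consumer catches up
observes stale data.

```python
from collections import deque

class KafkaFirstPipeline:
    """Toy model: writes append to a log (the Kafka topic); a consumer
    applies them to the store (Cassandra) asynchronously. Reads go to
    the store, so reads can race the consumer."""

    def __init__(self):
        self.log = deque()   # stands in for the Kafka topic
        self.store = {}      # stands in for Cassandra

    def write(self, key, value):
        self.log.append((key, value))  # durable in Kafka, not yet in C*

    def read(self, key):
        return self.store.get(key)

    def consumer_poll(self):
        while self.log:  # consumer catches up on the backlog
            k, v = self.log.popleft()
            self.store[k] = v

p = KafkaFirstPipeline()
p.write("balance", 100)
stale = p.read("balance")   # consumer hasn't run yet: stale (None)
p.consumer_poll()
fresh = p.read("balance")   # now the write is visible
```

Writing to C* first and deriving the Kafka stream from CDC closes this
window for the web service's own reads.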

Having the ability to stream its data to other systems could make C*
more flexible and more easily integrated into a larger data ecosystem.
As Dinesh mentioned, implementing this in the database layer means
there is a standard approach to getting a change notification stream
(unlike triggers, which are ad-hoc and customized). Aside from
replication, the change events could be used for updating
Elasticsearch, generating derived views (e.g. for reporting), sending
to an audit service, sending to a notification service, and in our
case, streaming to our data warehouse for analytics. (One article that
goes over database streaming is Martin Kleppmann's Turning the Database
Inside Out with Apache Samza
<https://www.confluent.io/blog/turning-the-database-inside-out-with-apache-samza/>,
which seems relevant here.) For reference, turning the database into a
stream of change events is pretty common in SQL databases (e.g. the
MySQL binlog, the Postgres WAL) and in NoSQL databases that have a
primary-replica setup (e.g. the MongoDB oplog). Recently CockroachDB
introduced a CDC feature as well (and they have masterless replication
too).
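As an illustration of that fan-out (the sink names here are made up,
purely to show the pattern of one standard change stream feeding many
subscribers, each deciding which events to act on):

```python
class ChangeEventBus:
    """Toy fan-out: one change stream, many downstream subscribers.
    The bus stays generic; each subscriber filters for itself."""

    def __init__(self):
        self.subscribers = []  # list of (predicate, handler) pairs

    def subscribe(self, predicate, handler):
        self.subscribers.append((predicate, handler))

    def publish(self, event):
        for predicate, handler in self.subscribers:
            if predicate(event):
                handler(event)

bus = ChangeEventBus()
audit_log, search_updates = [], []
# the audit sink wants every change; the search indexer only "users"
bus.subscribe(lambda e: True, audit_log.append)
bus.subscribe(lambda e: e["table"] == "users", search_updates.append)
bus.publish({"table": "users", "op": "UPDATE"})
bus.publish({"table": "payments", "op": "INSERT"})
```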

Hope that answers the question. That said, dedupe/ordering/getting the
full row of data via C* CDC is a hard problem, but it may be worth
solving for the reasons mentioned above. Our proposal is a user-level
approach to solving these problems. Maybe the more sensible thing to do
is to build it into C* itself, but that's a much bigger discussion. If
anyone is building a streaming pipeline for C*, we'd be interested in
hearing their approaches as well.
Joy Gao
2018-09-12 03:56:46 UTC
Re Rahul: "Although DSE advanced replication does one way, those are
use cases with limited value to me because ultimately it’s still a
master slave design."

Completely agree. I'm not familiar with the Calvin protocol, but it
sounds interesting (reading time...).
DuyHai Doan
2018-09-12 09:18:22 UTC
The biggest problem in getting CDC working correctly in C* is
deduplication.

Having a process read incoming mutations from the commit log is not
that hard; having to dedup them across N replicas is much harder.

The idea is: why don't we generate the CDC event directly on the
coordinator side? Indeed, the coordinator is the single source of truth
for each mutation request. As soon as the coordinator receives one
acknowledgement from any replica, the mutation can be considered
"durable" and safely sent downstream to the CDC processor. This
approach would require changing the write path on the coordinator side
and may impact performance (if writing to the CDC downstream is
blocking or too slow).
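A rough sketch of what I mean (all names are made up; the real write
path with hints, timeouts and failure handling is far more involved):
the coordinator emits exactly one CDC event per mutation, on the first
replica ack.

```python
class Coordinator:
    """Toy coordinator: forwards a mutation to replicas and emits a
    single CDC event on the first acknowledgement, so the downstream
    stream sees no per-replica duplicates."""

    def __init__(self, replicas, cdc_sink):
        self.replicas = replicas  # callables: replica(mutation) -> bool (ack)
        self.cdc_sink = cdc_sink  # callable: cdc_sink(mutation)

    def write(self, mutation):
        acks = 0
        emitted = False
        for replica in self.replicas:
            if replica(mutation):
                acks += 1
                if not emitted:
                    # first ack: durable on at least one replica,
                    # safe to send downstream exactly once
                    self.cdc_sink(mutation)
                    emitted = True
        return acks
```

If writing to the CDC sink blocks, this sits on the write hot path,
which is exactly the performance concern above.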

My 2 cents
Post by Joy Gao
Re Rahul: "Although DSE advanced replication does one way, those are use
cases with limited value to me because ultimately it’s still a master slave
design."
Completely agree. I'm not familiar with Calvin protocol, but that sounds
interesting (reading time...).
Post by Joy Gao
Thank you all for the feedback so far.
The immediate use case for us is setting up a real-time streaming data
pipeline from C* to our Data Warehouse (BigQuery), where other teams can
access the data for reporting/analytics/ad-hoc query. We already do this
with MySQL
<https://wecode.wepay.com/posts/streaming-databases-in-realtime-with-mysql-debezium-kafka>,
where we stream the MySQL Binlog via Debezium <https://debezium.io>'s
MySQL Connector to Kafka, and then use a BigQuery Sink Connector to stream
data to BigQuery.
Re Jon's comment about why not write to Kafka first? In some cases that
may be ideal; but one potential concern we have with writing to Kafka first
is not having "read-after-write" consistency. The data could be written to
Kafka but not yet consumed by C*. If the web service issues a (quorum)
read immediately after the (quorum) write, the data being returned
could still be outdated if the consumer has not caught up. Having the web
service interact with C* directly solves this problem for us (we could add
a cache before writing to Kafka, but that adds additional operational
complexity to the architecture; alternatively, we could write to Kafka and
C* transactionally, but distributed transactions are slow).
Having the ability to stream its data to other systems could make C* more
flexible and more easily integrated into a larger data ecosystem. As Dinesh
has mentioned, implementing this in the database layer means there is a
standard approach to getting a change notification stream (unlike triggers,
which are ad-hoc and customized). Aside from replication, the change events
could be used for updating Elasticsearch, generating derived views (e.g.
for reporting), sending to an audit service, sending to a notification
service, and in our case, streaming to our data warehouse for analytics.
(One article that goes over database streaming is Martin Kleppmann's Turning
the Database Inside Out with Apache Samza
<https://www.confluent.io/blog/turning-the-database-inside-out-with-apache-samza/>,
which seems relevant here.) For reference, turning the database into a
stream of change events is pretty common in SQL databases (e.g. MySQL
binlog, Postgres WAL) and in NoSQL databases with a primary-replica setup
(e.g. MongoDB's oplog). Recently CockroachDB introduced a CDC feature as well
(and they have masterless replication too).
Hope that answers the question. That said, dedupe/ordering/getting the full
row of data via C* CDC is a hard problem, but it may be worth solving for
the reasons mentioned above. Our proposal is a user-side approach to these
problems. Maybe the more sensible thing to do is to build it into C*
itself, but that's a much bigger discussion. If anyone is building a
streaming pipeline for C*, we'd be interested in hearing their approaches
as well.
Post by Rahul Singh
You know what they say: Go big or go home.
Right now the candidates are Cassandra itself (but embedded or on the side,
not on the actual data clusters), ZooKeeper (yuck), Kafka (which needs
ZooKeeper, yuck), and S3 (outside service dependency, so no go).
Jeff, those are great patterns, especially the second one. I have used it
several times. Cassandra is a great place to store data in transport.
Rahul
Also, using Calvin means having to implement a distributed monotonic
sequence as a primitive, which is not trivial at all...
On Mon, Sep 10, 2018 at 3:08 PM, Rahul Singh <
Post by Rahul Singh
In response to mimicking Advanced replication in DSE. I understand the
goal. Although DSE advanced replication does one way, those are use cases
with limited value to me because ultimately it’s still a master slave
design.
I’m working on a prototype for two-way replication between clusters or
databases regardless of DB tech, and every variation I can get to comes
down to some implementation of the Calvin protocol, which basically
verifies the change in either cluster, sequences it according to its impact
on the underlying data, and then schedules the mutation in a predictable
manner on both clusters/DBs.
All that means is that I need to sequence the change before it happens so I
can predictably ensure it’s scheduled for write/mutation. So I’m back to
square one: having a definitive queue/ledger separate from the individual
commit log of the cluster.
Rahul Singh
Chief Executive Officer
m 202.905.2818
Anant Corporation
1010 Wisconsin Ave NW, Suite 250
Washington, D.C. 20007
We build and manage digital business technology platforms.
There may be some use cases for it... but I'm not sure what they are.
It might help if you shared the use cases where the extra complexity is
required? When is writing to Cassandra, which then dedupes and writes to
Kafka, a preferred design over using Kafka and simply writing to Cassandra?
From my reading of the proposal, it seems to bring functionality similar
to MySQL's binlog-to-Kafka connector. This is useful for many applications
that want to be notified when certain (or any) rows change in the database,
primarily for an event-driven application architecture.
Implementing this in the database layer means there is a standard
approach to getting a change notification stream. Downstream subscribers
can then decide which notifications to act on.
LinkedIn's Databus is similar in functionality -
https://github.com/linkedin/databus - however, it is for heterogeneous
datastores.
Post by Joy Gao
We have a WIP design doc
<https://wepayinc.box.com/s/fmdtw0idajyfa23hosf7x4ustdhb0ura> that
goes over this idea in detail.
We haven't sorted out all the edge cases yet, but would love to get some
feedback from the community on the general feasibility of this approach.
Any ideas/concerns/questions would be helpful to us. Thanks!
Jay Zhuang
2018-09-12 17:08:12 UTC
Permalink
We have a similar use case: Streamific, the Ingestion Service for Hadoop
Big Data at Uber Engineering <https://eng.uber.com/streamific/>. We had
this data ingestion pipeline built on MySQL/Schemaless
<https://eng.uber.com/schemaless-part-one/> before using Cassandra. For
Cassandra, we used to double-write to Cassandra/Kafka and are moving to CDC
(as dual writes have their own issues). Here is one of the use cases we
open-sourced: Introducing AthenaX, Uber Engineering’s Open Source Streaming
Analytics Platform <https://eng.uber.com/athenax/>. For most of our use
cases, we cannot put Kafka before Cassandra because of our consistency
requirements.
We're having the same challenges
<https://github.com/ngcc/ngcc2017/blob/master/CassandraDataIngestion.pdf>
with CDC, and here is what we currently do for dedup and full updates (not
perfect; we're still working on improving it):

Deduplication: currently we de-dup the data in the Kafka consumer instead
of the producer, which means there are 3 (the RF number) copies of the data
in Kafka. We're working on dedup with the cache as mentioned before (also
in the PPT
<https://github.com/ngcc/ngcc2017/blob/master/CassandraDataIngestion.pdf>),
but we also want to make sure the downstream consumer is able to handle
duplicated data, as the cache won't de-dup 100% of the data (also, in our
case, the cache layer has a lower SLA).

Full row update: MySQL provides the full row in the binlog. The Cassandra
commitlog only has the updated fields, but the downstream consumer has all
the historical data, and the merge could happen there: Hudi: Uber
Engineering’s Incremental Processing Framework on Hadoop
<https://eng.uber.com/hoodie/>; it's also open-sourced here
<https://uber.github.io/hudi/index.html>.

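The downstream merge described above can be sketched as a cell-level last-write-wins fold, in the spirit of Cassandra's read-time reconciliation. The data shapes here are illustrative only:

```python
def merge_mutation(row, mutation):
    """Fold a partial mutation into the accumulated full row.

    `row` and `mutation` map column -> (value, write_timestamp);
    a cell wins only if its timestamp is newer, the same
    last-write-wins rule Cassandra applies when reconciling reads.
    """
    merged = dict(row)
    for col, (value, ts) in mutation.items():
        if col not in merged or ts > merged[col][1]:
            merged[col] = (value, ts)
    return merged

row = {"name": ("alice", 100), "email": ("a@x.com", 100)}
update = {"email": ("alice@x.com", 200)}  # partial mutation: one field
stale = {"email": ("old@x.com", 50)}      # late, out-of-order copy

row = merge_mutation(row, update)
row = merge_mutation(row, stale)          # ignored: older timestamp
# row["email"] == ("alice@x.com", 200)
```

Because each cell carries its own timestamp, out-of-order mutations converge to the same full row regardless of arrival order, which is what makes this merge safe to run downstream.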
Just FYI, Elasticsearch is also another consumer of the Kafka topic: Databook:
Turning Big Data into Knowledge with Metadata at Uber
<https://eng.uber.com/databook/>. And we open-sourced the data auditing
system for the pipeline: Introducing Chaperone: How Uber Engineering Audits
Kafka End-to-End <https://eng.uber.com/chaperone/>.
We're also exploring cache invalidation with CDC; currently, the update lag
(10 seconds) is the blocking issue for that.
Jeff Jirsa
2018-09-10 21:27:00 UTC
Permalink
Post by Jonathan Haddad
I'll be honest, I'm having a hard time wrapping my head around an
architecture where you use CDC to push data into Kafka. I've worked on
plenty of systems that use Kafka as a means of communication, and one of
the consumers is a process that stores data in Cassandra. That's pretty
normal. Sending Cassandra mutations to Kafka, on the other hand, feels
backwards and for 99% of teams, more work than it's worth.
There may be some use cases for it... but I'm not sure what they are. It
might help if you shared the use cases where the extra complexity is
required? When is writing to Cassandra, which then dedupes and writes to
Kafka, a preferred design over using Kafka and simply writing to Cassandra?
Somewhat contrived, but:

1) Sending all mutations to an audit service to look for suspicious
activity (e.g. looking for someone doing something malicious in an app with
direct db access).
2) General-purpose composable pipelines (job A writes Spark -> Cassandra,
job B takes Cassandra -> MySQL/Hadoop/whatever via Kafka CDC).

Agree that it seems less common, but I'm sure there's a real use case for
it somewhere.