2016-08-18

Getting started with Aiven Kafka

Apache Kafka is a high-throughput publish-subscribe message broker, perfect for providing messaging for microservices. Aiven offers a fully managed Kafka-as-a-Service in Amazon Web Services, Google Compute Engine, DigitalOcean and UpCloud (Microsoft Azure support is coming during 2016 Q3!).

Apache Kafka


Apache Kafka is a popular open-source publish-subscribe message broker. Kafka is distributed by design and offers scalability, high-throughput and fault-tolerance. Kafka excels in streaming data workloads, ingesting and serving hundreds of megabytes per second from thousands of clients.

Apache Kafka deployment example



Apache Kafka was originally developed by LinkedIn and open sourced in 2011.

Kafka is often used as a central piece in analytics, telemetry and log aggregation workloads, where it is used to capture and distribute event data at very high data rates. It can act as a communications hub for microservices for distributing work over a large cluster of nodes.

At Aiven, we use Kafka as a message bus between our cluster nodes as well as for delivering telemetry, statistics and logging data. Kafka's message delivery and fault-tolerance guarantees allow us to simplify and decouple service components.

What is Aiven Kafka?


Aiven Kafka is our fully managed Kafka service. We take care of the deployment and operational burden of running your own Kafka service, and make sure your cluster stays available, healthy and always up-to-date. We ensure your data remains safe by encrypting it both in transit and at rest on disk.

We offer multiple different plan types with different cluster sizing and capacity, and charge only based on your actual use on an hourly basis. Custom plans for deployments that are larger or have specific needs can also be requested. Aiven also makes it possible to migrate between the plans with no downtime to address changes in your requirements.

Below, I'll walk you through setting up and running your first Aiven Kafka service.


Getting started with Aiven Kafka


Creating an Aiven Kafka service is easy: just select the correct service type from the drop-down menu in the new service creation dialog. You can choose between three- and five-node cluster plans with the storage sizing of your choice. A larger node count allows for higher throughput or a larger replication factor for mission-critical data. If unsure, pick a three-node cluster; you can always change the selected plan at a later time.



All Aiven services are offered over SSL-encrypted connections for your protection. With Kafka, you're also required to perform client authentication with the service certificates we provide. You can find and download these keys and certificates in the connection parameters section of the service details page: a client key and certificate, plus the CA certificate you can use to verify the Aiven endpoint. Store these locally; we'll refer back to them in the code examples below (ca.crt, client.crt, client.key).




Finally, you can create the topics you'd like to use under the topics tab on the service details page. In Kafka terms, topics are logical channels that you send messages to and read them from. Topics themselves are divided into one or more partitions. Partitions can be used to handle larger read/write rates, but do note that Kafka's ordering guarantees only hold within a single partition.

When creating a topic, you can select the number of partitions, the number of replicas and how many hours messages are retained in the Kafka log before deletion. You can also increase the number of partitions at a later time.
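To illustrate why ordering only holds within a partition: Kafka's default partitioner assigns each keyed message to a partition by hashing its key, so all messages sharing a key land in the same partition and keep their relative order. Here's a minimal sketch of that idea; note that real Kafka clients use murmur2 hashing, and the CRC32-based helper below is just a stand-in for illustration:

```python
# Sketch of key-based partition assignment. Kafka's default partitioner
# uses murmur2 hashing; zlib.crc32 here is only a stand-in.
import zlib

NUM_PARTITIONS = 3  # matches the partition count chosen when creating the topic

def partition_for(key: bytes) -> int:
    # The same key always hashes to the same partition, so messages
    # sharing a key keep their relative order.
    return zlib.crc32(key) % NUM_PARTITIONS

# Each key maps deterministically to one of the topic's partitions:
assignments = {key: partition_for(key) for key in (b"user-1", b"user-2", b"user-3")}
```

Messages sent without a key are spread across partitions for load balancing, which is why cross-key ordering is not guaranteed.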



That's it! The service is up and running and ready to capture and distribute your messages. The Aiven team takes care of the operational burden of your cluster and ensures it remains available at all times. To use the service, we've included code examples in Python and Node.js below. Just make sure to replace the value of bootstrap_servers with the service URL from the service details page, and verify that the SSL settings point to the key and certificate files downloaded earlier.

Accessing Aiven Kafka in Python


Producing messages - the Kafka term for sending them:

# Requires the kafka-python package (pip install kafka-python)
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705",
    security_protocol="SSL",
    ssl_cafile="ca.crt",
    ssl_certfile="client.crt",
    ssl_keyfile="client.key",
)

for i in range(1, 4):
    message = "message number {}".format(i)
    print("Sending: {}".format(message))
    producer.send("demo-topic", message.encode("utf-8"))

# Wait for all messages to be sent
producer.flush()

Consuming, i.e. receiving, the same messages:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "demo-topic",
    bootstrap_servers="getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705",
    client_id="demo-client-1",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile="ca.crt",
    ssl_certfile="client.crt",
    ssl_keyfile="client.key",
)

for msg in consumer:
    print("Received: {}".format(msg.value))

Output from the producer above:

$ python kafka-producer.py
Sending: message number 1
Sending: message number 2
Sending: message number 3

And the consuming side:

$ python kafka-consumer.py
Received: message number 1
Received: message number 2
Received: message number 3
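Kafka messages are plain bytes, which is why the producer example encodes its strings to UTF-8. For structured data, a common pattern is JSON serialization; kafka-python lets you pass a value_serializer callable to KafkaProducer (and a value_deserializer to KafkaConsumer) so the encoding happens automatically. A small sketch, where the helper names are our own:

```python
import json

# Hypothetical helpers: convert values to UTF-8 JSON bytes and back.
def serialize(value):
    return json.dumps(value).encode("utf-8")

def deserialize(raw):
    return json.loads(raw.decode("utf-8"))

# These would be passed to the clients, e.g.:
#   KafkaProducer(value_serializer=serialize, ...)
#   KafkaConsumer(value_deserializer=deserialize, ...)
event = {"type": "signup", "user": "demo"}
roundtripped = deserialize(serialize(event))
```

With serializers configured, producer.send() accepts plain Python objects and the consumer yields them back deserialized.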


Accessing Aiven Kafka in Node.js


Here's a Node.js example utilizing the node-rdkafka module:

var Kafka = require('node-rdkafka');

var producer = new Kafka.Producer({
    'metadata.broker.list': 'getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705',
    'security.protocol': 'ssl',
    'ssl.key.location': 'client.key',
    'ssl.certificate.location': 'client.crt',
    'ssl.ca.location': 'ca.crt',
    'dr_cb': true
});

producer.connect();

producer.on('ready', function() {
    var topic = producer.Topic('demo-topic', {'request.required.acks': 1});
    producer.produce({
        message: new Buffer('Hello world!'),
        topic: topic,
    }, function(err) {
        if (err) {
            console.log('Failed to send message', err);
        } else {
            console.log('Message sent successfully');
        }
    });
});

And the consuming side:

var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': 'getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705',
    'group.id': 'demo-group',
    'security.protocol': 'ssl',
    'ssl.key.location': 'client.key',
    'ssl.certificate.location': 'client.crt',
    'ssl.ca.location': 'ca.crt',
});

var stream = consumer.getReadStream('demo-topic');

stream.on('data', function(data) {
    console.log('Got message:', data.message.toString());
});



Trying Aiven is free, no credit card required


Remember that trying Aiven is free: you will receive US$10 worth of free credits at sign-up which you can use to try any of our service plans. The offer works for all of our services: PostgreSQL, Redis, InfluxDB, Grafana, Elasticsearch and Kafka!

Go to https://aiven.io/ to get started!


Cheers,

    Team Aiven

8 comments:

  1. confused about: client_id="demo-client-1", group_id="demo-group" in the consumer (python version), what to fill in there?

  2. The group_id defines the group your consumer belongs to. So say you have consumers a, b and c consuming a topic called mytopic, the group defines that its members should process an equal subset (amount of partitions) of the topic. So assuming you create 3 partitions and have the consumers (clients) a, b and c, each of them would get a single partition assigned to them.

    The client_id specifies the name of the client as it'll occur on Kafka logs, often people set it to the name of their container or VM.

    I've attached copypasted definitions of the arguments from kafka-python below:

    client_id (str): a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: 'kafka-python-{version}'

    group_id (str or None): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: 'kafka-python-default-group'

  3. Thanks for explanation.

    So, for this example I could just leave it as is, it does not depend on my provisioned service.

    How to tell the consumer from which topic to fetch the messages?
    Running the kafka-consumer.py gives:

    Traceback (most recent call last):
      File "kafka-consumer.py", line 11, in <module>
        for msg in consumer:
      File "/usr/local/lib/python2.7/site-packages/six.py", line 558, in next
        return type(self).__next__(self)
      File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 850, in __next__
        return next(self._iterator)
      File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 771, in _message_generator
        assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
    AssertionError: No topic subscription or manual partition assignment


  4. Seems to be a mistake in the example, KafkaConsumer takes an argument called topics which can be either a string or a list of strings.

    So in the case of the example it should be an additional keyword parameter topics='demo-topic' for the initialization of KafkaConsumer. I'll get it fixed in the actual text ASAP.

  5. ah yes, with the following it works:

    consumer = KafkaConsumer("demo-topic",
    bootstrap_servers="getting-started-with-kafka.htn-aiven-demo.aivencloud.com:17705",
    client_id="demo-client-1",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile="ca.crt",
    ssl_certfile="client.crt",
    ssl_keyfile="client.key",
    )

    Thanks Hannu for your help.

  6. Hi,

    Can you provide similar instructions for Java clients? The mechanism is different, as the cert/key files cannot be supplied directly, and as far as I can tell, the authentication requires creating a keystore and a truststore for the server.

  7. Hi,

    Keystore/truststore management commands as well as examples of Java producer and consumer can be found in our support article: https://support.aiven.io/hc/en-us/articles/213574085-Getting-started-with-Aiven-Kafka. I hope this helps!

  8. Thanks Heikki! This is very useful.

