Reading an Avro Key Value in Java
Or, how to produce and consume Kafka records using Avro serialization in Java.
So far we've seen how to produce and consume simple String records using Java and console tools. In this post, I would like to show you how to send and read Avro messages from Java using the kafka-clients library.
If you're new to Avro, I have written a full post about why you should consider Avro serialization for Kafka messages, so check it out to learn more.
Running a Kafka cluster locally
To test the producers and consumers, let's run a Kafka cluster locally, consisting of one broker, one ZooKeeper and a Schema Registry.
To simplify our job, we will run these servers as Docker containers, using docker-compose.
Don't have docker-compose? Check: how to install docker-compose
I've prepared a docker-compose file which you can grab from Coding Harbour's GitHub:
git clone https://github.com/codingharbour/kafka-docker-compose.git
Once you have the project, navigate to the folder called single-node-avro-kafka and start the Kafka cluster:
docker-compose up -d
The output should look something like this:
$ docker-compose up -d
Starting sna-zookeeper ... done
Starting sna-kafka ... done
Starting sna-schema-registry ... done
Your local Kafka cluster is now ready to be used. By running docker-compose ps, we can see that the Kafka broker is available on port 9092, while the Schema Registry runs on port 8081. Make a note of that, because we'll need it soon.
$ docker-compose ps
        Name                    Command           State                      Ports
----------------------------------------------------------------------------------------------------
sna-kafka             /etc/confluent/docker/run   Up      0.0.0.0:9092->9092/tcp
sna-schema-registry   /etc/confluent/docker/run   Up      0.0.0.0:8081->8081/tcp
sna-zookeeper         /etc/confluent/docker/run   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
Defining the Avro Schema
Let's create a schema for the messages we'll be sending through Kafka. We'll call our message SimpleMessage, and it will have two fields:
- content – a string field, holding the message we want to send, and
- date_time – a human-readable date-time showing when the message was sent
Avro schemas are written in a JSON format, so our SimpleMessage schema will look like this:
{ "type": "record", "name": "SimpleMessage", "namespace": "com.codingharbour.avro", "fields": [ {"name": "content", "type":"cord", "doc": "Bulletin content"}, {"name": "date_time", "type":"string", "medico": "Datetime when the message was generated"} ] }
The schema consists of a couple of elements:
- Type – Describes the data type of the entire schema. Type 'record' means that the schema describes a complex data type, which includes other fields.
- Name – The name of the schema. In our case "SimpleMessage"
- Namespace – Namespace of the schema that qualifies the name. In our case, the namespace is "com.codingharbour.avro"
- List of fields – One or more fields that are in this complex data type
Each field in a schema is a JSON object with multiple attributes:
- name – the name of the field
- type – the data type of the field. Avro supports primitive types like int, string, bytes etc., and complex types like record, enum, etc.
- doc – documentation for the given field
- default – the default value for the field, used by the consumer to populate the value when the field is missing from the message (see the example below)
For more info on Avro data types and schemas, check the Avro spec.
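To illustrate the default attribute, here is a hypothetical evolution of our schema (not used in the rest of this post) that adds a sender field with a default value. A consumer reading with this newer schema can still process older messages that were written without the field, because the deserializer fills in the default:

{
  "type": "record",
  "name": "SimpleMessage",
  "namespace": "com.codingharbour.avro",
  "fields": [
    {"name": "content", "type": "string", "doc": "Message content"},
    {"name": "date_time", "type": "string", "doc": "Datetime when the message was generated"},
    {"name": "sender", "type": "string", "doc": "Who sent the message", "default": "unknown"}
  ]
}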
Schema Registry
As I've mentioned in the previous post, every Avro message contains the schema used to serialize it. But sending thousands or millions of messages per second with the same schema is a huge waste of bandwidth and storage space. That's where the Schema Registry, KafkaAvroSerializer and KafkaAvroDeserializer come into play.
Instead of writing the schema into the message, KafkaAvroSerializer will register the schema with the Schema Registry and write only the schema id into the message. Then, when the Kafka record reaches the consumer, the consumer will use KafkaAvroDeserializer to fetch the schema from the Schema Registry based on the schema id from the message. Once the schema is fetched, the KafkaAvroDeserializer can deserialize the message.
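To make the "schema id inside the message" idea concrete: KafkaAvroSerializer writes values in the Confluent wire format, roughly one magic byte, a 4-byte schema id, and then the Avro binary payload. Below is a minimal sketch of a hypothetical helper (not part of this post's code) that peeks at such a raw value; treat it as an illustration of the format rather than something you would normally need to write:

import java.nio.ByteBuffer;

public class WireFormatPeek {

    //Returns the schema id embedded in a value produced by KafkaAvroSerializer.
    //Byte 0 is a magic byte (0), bytes 1-4 hold the schema id, the rest is Avro binary data.
    public static int schemaIdOf(byte[] serializedValue) {
        ByteBuffer buffer = ByteBuffer.wrap(serializedValue);
        byte magicByte = buffer.get();
        if (magicByte != 0) {
            throw new IllegalArgumentException("Value is not in the Confluent wire format");
        }
        return buffer.getInt(); //the id the deserializer uses to look up the schema
    }
}

In practice the (de)serializers cache schemas by id, so the Schema Registry is only contacted the first time a given id is seen.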
This is why, when using KafkaAvro(De)Serializer in a producer or a consumer, we need to provide the URL of the schema registry. Remember that our Schema Registry runs on port 8081.
Here's a snippet from our producer:
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
If we don't specify the URL, the (de)serializer will complain when we try to send/read a message.
Ok, the next thing is to see how an Avro schema gets translated into a Java object.
Avro record in Java
Note: do not confuse an Avro record with a Kafka record. Each Avro schema describes one or more Avro records. An Avro record is a complex data type in Avro, consisting of other fields, with their own data types (primitive or complex). A Kafka record, on the other hand, consists of a key and a value, and each of them can have separate serialization. Meaning, e.g., that the Kafka key may be one Avro record, while the Kafka value is another Avro record (if we choose to use Avro serialization for both the key and the value).
When it comes to representing an Avro record in Java, the Avro library provides two interfaces: GenericRecord and SpecificRecord. Let's see what the difference is and when to use which.
An instance of a GenericRecord allows us to access the schema fields either by index or by name, as seen below:
GenericRecord record = ... ; //obtain a generic record

//accessing the field by name
record.put("date_time", "2020-01-01 12:45:00");
String dateTime = (String) record.get("date_time");

//accessing the field by index
record.put(0, "this is message number 1");
String content = (String) record.get(0);
Using a GenericRecord is ideal when a schema is not known in advance or when you want to handle multiple schemas with the same code (e.g. in a Kafka connector). The drawback of GenericRecord is the lack of type-safety: its put and get methods work with Object.
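To see what the lack of type-safety means in practice, here is a minimal sketch (assuming the SimpleMessage schema has already been parsed into a Schema object, as shown later in this post). Both problems below compile cleanly and only blow up at runtime:

//schema is the parsed SimpleMessage schema
GenericRecord record = new GenericData.Record(schema);
record.put("content", "Hello world");

//compiles, but throws ClassCastException at runtime, because the field holds a string
Integer content = (Integer) record.get("content");

//compiles, but throws AvroRuntimeException at runtime, because the field name is misspelled
record.put("contnet", "Hello world");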
SpecificRecord is an interface from the Avro library that allows us to use an Avro record as a POJO. This is done by generating a Java class (or classes) from the schema, using the avro-maven-plugin. The generated class will implement the SpecificRecord interface, as seen below.
/* Class generated by avro-maven-plugin */
public class SimpleMessage extends org.apache.avro.specific.SpecificRecordBase
        implements org.apache.avro.specific.SpecificRecord {
    //content removed for brevity
}

//using the SpecificRecord through its actual implementation
SimpleMessage simpleMessage = new SimpleMessage();
simpleMessage.setContent("Hello world");
The drawback of SpecificRecord is that you need to generate a class for each schema you plan to use, in advance, which again means you need the Avro schema in advance to be able to generate the Java class.
Producing Avro messages using GenericRecord
First, we set the properties the producer needs. We specify our brokers, serializers for the key and the value, as well as the URL for the Schema Registry. Then we instantiate the Kafka producer:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

Producer<String, GenericRecord> producer = new KafkaProducer<>(properties);
As you see, we are using the String serializer for the keys and Avro for the values. Notice that the producer expects a GenericRecord as the value of the Kafka record.
The next step is to create an instance of an Avro record based on our schema:
//avro schema
String simpleMessageSchema =
        "{" +
        "  \"type\": \"record\"," +
        "  \"name\": \"SimpleMessage\"," +
        "  \"namespace\": \"com.codingharbour.avro\"," +
        "  \"fields\": [" +
        "    {\"name\": \"content\", \"type\": \"string\", \"doc\": \"Message content\"}," +
        "    {\"name\": \"date_time\", \"type\": \"string\", \"doc\": \"Datetime when the message was generated\"}" +
        "  ]" +
        "}";

//parse the schema
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(simpleMessageSchema);

//prepare the avro record
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("content", "Hello world");
avroRecord.put("date_time", Instant.now().toString());
Here, we specified the schema directly in the code. By parsing the schema we get a Schema object, which we use to instantiate a new GenericRecord. Finally, we set the record's fields by name, using the put method.
The last thing to do is to create a Kafka record with a null key and the Avro record as the value, and write it to a topic called avro-topic:
//prepare the kafka record
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("avro-topic", null, avroRecord);
producer.send(record);

//ensures the record is sent before closing the producer
producer.flush();
producer.close();
Producing Avro messages using SpecificRecord
Another way to produce the same record as above is to use the SpecificRecord interface. We will generate a Java class from the Avro schema using the avro-maven-plugin. We'll add the plugin to our pom.xml:
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.9.2</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
                <stringType>String</stringType>
            </configuration>
        </execution>
    </executions>
</plugin>
The Avro plugin is configured above to generate classes based on the schemas in the src/main/avro folder and to store the classes in target/generated-sources/avro/.
If you check the src/main/avro folder, you will see the Avro schema for our SimpleMessage. It's the same schema we used in the GenericRecord example above. When you execute mvn compile, the SimpleMessage class will be generated in the target folder.
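Besides the no-arg constructor and setters used below, classes generated by the avro-maven-plugin also come with a builder. As a quick sketch (the exact method names follow the generated code, so check your own generated SimpleMessage class), the same record could be created like this:

//building the record instead of calling the setters one by one
SimpleMessage simpleMessage = SimpleMessage.newBuilder()
        .setContent("Hello world")
        .setDateTime(Instant.now().toString())
        .build();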
Then we'll define the properties for the Kafka producer, same as in the GenericRecord example:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put("schema.registry.url", "http://localhost:8081");

Producer<String, SpecificRecord> producer = new KafkaProducer<>(properties);
The only difference compared to the GenericRecord example is the type of the value of the Kafka record, which is now SpecificRecord.
Next, we create an instance of the SimpleMessage:
//create the specific record
SimpleMessage simpleMessage = new SimpleMessage();
simpleMessage.setContent("Hello world");
simpleMessage.setDateTime(Instant.now().toString());
And lastly, we create a Kafka record and write it to the avro-topic topic:
ProducerRecord<String, SpecificRecord> record = new ProducerRecord<>("avro-topic", null, simpleMessage);
producer.send(record);

//ensures the record is sent before closing the producer
producer.flush();
producer.close();
Note that both producers above have written to a topic called avro-topic, so we now have two records to consume. Let's see how we can create the consumers.
Consuming Avro messages using GenericRecord
The consumer that uses GenericRecord does not need the schema nor a Java class generated from the schema. All the data will be obtained by the deserializer from the Schema Registry.
First, we'll create the properties for the consumer and instantiate it:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "generic-record-consumer-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties);
Then we'll subscribe our consumer to the avro-topic topic and start listening for records:
consumer.subscribe(Collections.singleton("avro-topic"));

//poll the records from the topic
while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, GenericRecord> record : records) {
        System.out.println("Message content: " + record.value().get("content"));
        System.out.println("Message time: " + record.value().get("date_time"));
    }
    consumer.commitAsync();
}
Here we get the field values by name, using the Object get(String key) method of GenericRecord.
Consuming Avro messages using SpecificRecord
The last thing to show is how to consume an Avro Kafka record that is automatically converted into the proper Java class generated from the Avro schema.
As before, we'll start by preparing the properties for the consumer and instantiating it:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "specific-record-consumer-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); //ensures records are properly converted

KafkaConsumer<String, SimpleMessage> consumer = new KafkaConsumer<>(properties);
Everything is the same as with the previous consumer, except for one additional property near the bottom. Let's look at it again:
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
This line is necessary if you want your Avro records to be properly converted into the expected Java class (in our case, SimpleMessage). Without it, the deserializer returns GenericRecord instances, and casting them to SimpleMessage would fail at runtime.
Now, all we have to do is subscribe our consumer to the topic and start consuming:
consumer.subscribe(Collections.singleton("avro-topic"));

//poll the records from the topic
while (true) {
    ConsumerRecords<String, SimpleMessage> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, SimpleMessage> record : records) {
        System.out.println("Message content: " + record.value().getContent());  //1
        System.out.println("Message time: " + record.value().getDateTime());    //2
    }
    consumer.commitAsync();
}
You can see above, in the lines marked 1 and 2, how the fields of SimpleMessage are accessed using proper getter methods.
There you have it, two ways to produce and two ways to consume Kafka Avro records. Hope this was helpful.
As always, the code from this blog post is available on Coding Harbour's GitHub repo: https://github.com/codingharbour/kafka-avro
Would you like to learn more about Kafka?
I have created a Kafka mini-course that you can get absolutely free. Sign up below and I will send the lessons directly to your inbox.
Photo credit: @bosco_shots