Read Kafka topic from beginning in KStream
My Spring Boot project has an application that demonstrates the Kafka Streams API. I am able to consume all the messages in the topic customer
using the command
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer --from-beginning
What is the equivalent in the Kafka Streams API to consume all messages from the beginning with a KStream or KTable? I tried
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
properties.put("auto.offset.reset", "earliest");
Neither worked. I also created a test case that consumes with a plain KafkaConsumer
instead of Streams, and it didn't work either. The code is uploaded to GitHub for reference. Any help would be great.
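For reference (this is not from the original post), in Kafka Streams the reset policy is passed through the StreamsConfig properties, and it only takes effect when the application.id (which is used as the consumer group) has no committed offsets yet; an existing group resumes from its last commit regardless of the setting. A minimal sketch, assuming a local broker and a hypothetical application ID:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class ReadFromBeginning {
    public static void main(String[] args) {
        Properties props = new Properties();
        // A fresh application.id means a fresh consumer group, so "earliest" is honored.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-reader-v1"); // hypothetical name
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> customers = builder.stream("customer");
        customers.foreach((key, value) -> System.out.println(key + " -> " + value));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // cleanUp() removes local state only; committed offsets of an existing
        // application.id can be reset separately with the kafka-streams-application-reset tool.
        streams.cleanUp();
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}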
See also questions close to this topic
- Regular network connection failure of a Kafka broker in a 3-broker cluster in OpenShift
For reasons unknown and often several times a week in Production and Test, we cannot communicate with a Kafka broker, and this message repeats in the log:
WARN Connection to node nnnn could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Strangely, this in turn prevents Kafka from working (we cannot produce or consume).
OpenShift doesn't recognize that it's not working, and neither does Kafka.
I am about to add a liveness probe to the YAML to restart the Pod if a command in the broker container is not executed, but naturally we'd like to find the root cause.
If I run the curl url:hostport command from another broker or ZooKeeper node, I get a reply from all the other brokers and ZooKeepers. Yet curl to the Kafka node that has "failed" returns "Could not resolve host ...", even though I can go into OpenShift and use the terminal on it. I cannot find any other errors in the logs.
I don't know if this is a Kafka or OpenShift/Kubernetes issue.
If anyone else has had this and resolved it, I'd be grateful for some pointers.
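Not from the original question, but a minimal sketch of the kind of check that could back such a liveness probe, using the Kafka AdminClient; the address and timeouts are assumptions and should point at the broker's own advertised listener:

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerLivenessCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical address; use the broker's own advertised listener.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
        try (AdminClient admin = AdminClient.create(props)) {
            // Exits non-zero if the cluster metadata cannot be fetched within the timeout,
            // which is what a liveness probe needs.
            int brokers = admin.describeCluster().nodes().get(10, TimeUnit.SECONDS).size();
            System.out.println("Reachable brokers: " + brokers);
        } catch (Exception e) {
            System.err.println("Broker check failed: " + e.getMessage());
            System.exit(1);
        }
    }
}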
- Kafka - error when producing from command line (character ('<' (code 60)): expected a valid value)
I spun up Kafka in Docker on my laptop (with docker-compose).
After that, I created a new Kafka topic with:
kafka-topics --zookeeper localhost:2181 --create --topic simple --replication-factor 1 --partitions 1
(I have not created a schema in the Schema Registry yet.)
Now I am trying to produce, based on step 3 of this example: https://docs.confluent.io/4.0.0/quickstart.html
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic simple \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Entering value:
{"f1": "value1"}
Error:
{"f1": "value1"} org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]} Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005 at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191) at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218) at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307) at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:299) at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:294) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:100) at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79) at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:166) at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:59) at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
How to solve this?
Could it be that the Kafka cluster uses SSL and the error is bogus? Thanks.
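Not part of the original question, but since the stack trace shows the Avro serializer choking on an HTML response ('<') from the Schema Registry endpoint, a small check like the following shows what that endpoint actually returns; the URL is an assumption and should match whatever schema.registry.url the producer is using:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SchemaRegistryCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical registry address; replace with the real schema.registry.url.
        URL url = new URL("http://localhost:8081/subjects");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Accept", "application/vnd.schemaregistry.v1+json");
        System.out.println("HTTP " + conn.getResponseCode());
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            // A healthy registry answers with JSON (e.g. []); an HTML page here would
            // explain the "Unexpected character ('<')" in the exception.
            in.lines().forEach(System.out::println);
        }
    }
}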
- Kafka inside Docker - how to read/write to a topic from command line?
I have Kafka running inside Docker with SSL enabled.
docker ps:
CONTAINER ID   IMAGE                                     COMMAND                  CREATED       STATUS       PORTS                                        NAMES
b8f6b1c573a1   nginx:1.13.9-alpine                       "nginx -g 'daemon of…"   9 hours ago   Up 9 hours   80/tcp, 0.0.0.0:8081->443/tcp                ng
761ce6ee2960   confluentinc/cp-schema-registry:4.0.0     "/etc/confluent/dock…"   9 hours ago   Up 9 hours   0.0.0.0:8080->8080/tcp, 8081/tcp             sr
16d7b81dfbc8   confluentinc/cp-kafka:4.0.0               "/etc/confluent/dock…"   9 hours ago   Up 9 hours   0.0.0.0:9092-9093->9092-9093/tcp             k1
9be579992536   confluentinc/cp-zookeeper:4.0.0           "/etc/confluent/dock…"   9 hours ago   Up 9 hours   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp   zk
How to write to a topic from command line?
Tried (topic 'test' exists):

kafka-console-producer --broker-list kafka:9093 --topic test
# [2018-04-23 17:55:14,325] ERROR Error when sending message to topic test with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
# org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. (60s timeout)

kafka-console-producer --broker-list kafka:9092 --topic test
>aa
# [2018-04-23 18:00:59,443] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
# [2018-04-23 18:00:59,444] ERROR Error when sending message to topic test with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
# org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

kafka-console-producer --broker-list localhost:9092 --topic test   dnk306@9801a7a5b33d
>aa
# [2018-04-23 21:52:47,056] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
# [2018-04-23 21:52:47,056] ERROR Error when sending message to topic test with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
# org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
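This is not from the question itself, but as a rough sketch of the client-side SSL settings a producer (or a --producer.config file for the console tools) typically needs against an SSL listener; the listener address, paths, and passwords are all placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class SslProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9093"); // assumed SSL listener
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put("security.protocol", "SSL");
        // Placeholder truststore/keystore; these must match the broker's certificates.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/client.keystore.jks");
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "changeit");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "key", "aa"));
            producer.flush();
        }
    }
}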
- Message pre-processing (topic - topic) - Kafka Connect API vs. Streams vs Kafka Consumer?
We need to do some pre-processing on every message (decrypt/re-encrypt with different keys) from one topic into another one.
I've been looking into using Kafka Connect since it provides a lot of good things out of the box (config management, offset storage, error handling, etc.).
But it also feels like I'd end up implementing a SourceConnector and a SinkConnector just to move data between two topics, and neither of those interfaces is meant to do Topic A -> (Connector) -> Topic B. Is this the right approach? Should I just use a SinkConnector alone and have my SourceTask.put() do all the logic to write to Kafka?
The other options are KafkaConsumer/Producer or Streams, but those would then need their own instance(s) to run the logic, not to mention offset, retry, and error handling.
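For comparison (not from the question), a topic-to-topic transform is roughly a one-liner in Kafka Streams, with offsets and retries handled by the Streams runtime; a minimal sketch, where the topic names are placeholders and decrypt/reEncrypt stand in for the real crypto logic:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ReEncryptPipeline {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "re-encrypt-pipeline"); // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Offsets, rebalancing, and retries are handled by the Streams runtime.
        builder.<byte[], byte[]>stream("topic-a")
               .mapValues(value -> reEncrypt(decrypt(value)))
               .to("topic-b");

        new KafkaStreams(builder.build(), props).start();
    }

    // Hypothetical placeholders for the real decrypt/re-encrypt logic.
    private static byte[] decrypt(byte[] ciphertext) { return ciphertext; }
    private static byte[] reEncrypt(byte[] plaintext) { return plaintext; }
}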
- Polling with auto commit enabled in Kafka
When auto commit is enabled in Kafka and polling is triggered, is the (maximum present) offset committed before or after reading the new records?
Here we can assume that the auto commit interval has elapsed, so this poll event triggers an auto commit.
Let me know if the question is not clear.
Thanks.
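Not part of the question, but for reference these are the relevant consumer settings; as I understand it (worth verifying against the docs for your client version), the auto commit performed inside poll() covers the offsets of records returned by earlier poll() calls, i.e. it happens before the new records are fetched:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "auto-commit-demo"); // hypothetical group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("customer"));
            while (true) {
                // If the auto commit interval has elapsed, this call first commits the
                // offsets of the records returned by the previous poll, then fetches new ones.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
            }
        }
    }
}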
- Rewind Offset Spark Structured Streaming from Kafka
I am using Spark Structured Streaming (2.2.1) to consume a topic from Kafka (0.10).
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", fromKafkaServers)
  .option("subscribe", topicName)
  .option("startingOffset", "earliest")
  .load()
My checkpoint location is set to an external HDFS dir. In some cases, I would like to restart the streaming application and consume data from the beginning. However, even though I delete all the checkpoint data from the HDFS dir and resubmit the jar, Spark is still able to find my last consumed offset and resume from there. Where else do the offsets live? I suspect it is related to the Kafka consumer ID. However, I am unable to set group.id with Spark Structured Streaming per the Spark docs, and it seems like all applications subscribing to the same topic get assigned to one consumer group. What if I would like to have two independent streaming jobs running that subscribe to the same topic?
- Using Kafka Streams GlobalKTable to invert many-to-many relationship
I have a compacted Kafka topic that is a stream of entities, so it holds the latest representation of each entity, and the entities are in a many-to-many relationship that I'd like to invert.
An example would be a topic of Author objects where the topic key is the Author.id (AAA) and the value is an array of Book identifier values:
"AAA" -> {"books": [456]}
When an Author writes a new Book with an ID of 333, a new event with the same key is written to the stream with the updated list of books:
"AAA" -> {"books": [456, 333]}
It is also possible that a Book had multiple Authors, so that same Book identifier could appear in another event:
"BBB" -> {"books": [333, 555]}
I'd like to invert this using Kafka Streams into a stream of Books -> [Author], so the above events would result in something like:
456 -> {"authors": ["AAA"]}
333 -> {"authors": ["AAA", "BBB"]}
555 -> {"authors": ["BBB"]}
When I start my app up again, I want the state to be restored such that if I read in another Author record it will invert the relationship appropriately. So this:
"CCC" -> {"books": [555]}
would know that "BBB" was also an Author and would emit the updated event:
555 -> {"authors": ["BBB", "CCC"]}
I've been eyeing the GlobalKTable, which reads in the full topic state locally, but I can't figure out how to get it to invert the relationship and aggregate the values together. If I could, I think I could join that GlobalKTable with a stream of the events and get the full list of Authors for each Book.
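Not from the question, but as a rough sketch of one way to do the inversion with a regular aggregation instead of a GlobalKTable: flat-map each Author update into (bookId, authorId) pairs and aggregate them per book. Topic names and the comma-separated value format are assumptions standing in for the JSON above, and this sketch ignores removals (a book disappearing from an author's list would need the previous state to emit retractions):

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

public class InvertAuthorsToBooks {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "books-by-author-inverter"); // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // "authors" topic: key = authorId, value = comma-separated book ids
        // (a stand-in for the {"books": [...]} JSON in the question).
        KTable<String, String> authorsByBook = builder.<String, String>stream("authors")
            // One (bookId, authorId) pair per book in the author's list.
            .<String, String>flatMap((authorId, bookList) -> {
                List<KeyValue<String, String>> pairs = new ArrayList<>();
                for (String bookId : bookList.split(",")) {
                    pairs.add(KeyValue.pair(bookId.trim(), authorId));
                }
                return pairs;
            })
            .groupByKey()
            // Accumulate the set of authors seen so far for each book.
            .aggregate(() -> "", (bookId, authorId, agg) ->
                agg.isEmpty() ? authorId : (agg.contains(authorId) ? agg : agg + "," + authorId));

        // Emit the changelog downstream, keyed by bookId.
        authorsByBook.toStream().to("books");

        new KafkaStreams(builder.build(), props).start();
    }
}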
- Avro Schema - Can I Use a Custom Type
How can I use a custom type in my Avro Schemas?
I have the following example with the type CustomType. I've defined it in the fields section, however when I use it within subtitle it cannot validate it, saying CustomType is not valid. Any ideas?
{ "type": "record", "name": "TestSchema", "namespace": "com.test", "fields": [ { "name": "title", "type": { "type": "record", "name": "CustomType", "fields": [ { "name": "value", "type": "string" }, { "name": "secondLine", "type": "string" } ] } }, { "name": "subtitle", "type": "CustomType" } ] }
- UNKNOWN_PRODUCER_ID when using Apache Kafka Streams (Scala)
I am running 3 instances of a service that I wrote using:
- Scala 2.11.12
- kafkaStreams 1.1.0
- kafkaStreamsScala 0.2.1 (by lightbend)
The service uses Kafka Streams with the following topology (high level); a rough code sketch follows the list:
- InputTopic
- Parse to known Type
- Drop messages that the parsing failed on
- Split every single message into 6 new messages
- On each message run: map.groupByKey.reduce (with a local store).toStream.to
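The question's code is Scala; this is only a rough Java sketch of that topology shape, with placeholder topic names and placeholder parse/split/reduce logic:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class TopologySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_service_name"); // name taken from the log line
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("InputTopic")
            // "Parse to known type" / "drop failed parses": a null/empty check stands in here.
            .filter((key, value) -> value != null && !value.isEmpty())
            // "Split every message into 6 new messages" (placeholder split).
            .flatMapValues(value -> Arrays.asList(value, value, value, value, value, value))
            // The map step from the topology (identity in this sketch).
            .map((key, value) -> KeyValue.pair(key, value))
            .groupByKey()
            // Reduce with a local state store, then write the changelog downstream.
            .reduce((agg, next) -> agg + "|" + next)
            .toStream()
            .to("OutputTopic");

        new KafkaStreams(builder.build(), props).start();
    }
}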
Everything works as expected, but I can't get rid of a WARN message that keeps showing up:
15:46:00.065 [kafka-producer-network-thread | my_service_name-1ca232ff-5a9c-407c-a3a0-9f198c6d1fa4-StreamThread-1-0_0-producer] [WARN ] [o.a.k.c.p.i.Sender] - [Producer clientId=my_service_name-1ca232ff-5a9c-407c-a3a0-9f198c6d1fa4-StreamThread-1-0_0-producer, transactionalId=my_service_name-0_0] Got error produce response with correlation id 28 on topic-partition my_service_name-state_store_1-repartition-1, retrying (2 attempts left). Error: UNKNOWN_PRODUCER_ID
As you can see, I get those errors from the INTERNAL topics that Kafka Streams manages. It seems like some kind of retention period on the producer metadata in the internal topics, or some kind of producer ID reset.
I couldn't find anything regarding this issue, only a description of the error itself from here:
ERROR: UNKNOWN_PRODUCER_ID
CODE: 59
RETRIABLE: False
DESCRIPTION: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producer id are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
Hope you can help, thanks.
Edit: It seems that the WARN message does not pop up on version 1.0.1 of Kafka Streams.