Kafka Integration (w/ Kafka Connect) - How to read from the beginning of a topic?

Hi,

We are currently connecting our self-hosted Kafka cluster to Rockset (using Kafka Connect + the Rockset sink connector).

It does work, but for some reason I only get the latest data into Rockset, even though I set "auto.offset.reset" in Kafka Connect to "earliest". Has anybody else stumbled on this bug? Is there a workaround (other than horrid hacks such as copying the topic to a temporary topic and then copying it back to the original one)?
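For context, the usual worker-wide way to set this is a "consumer."-prefixed entry in the Connect worker properties, which becomes the default for every sink connector's consumer - roughly the following (a sketch; the exact file name depends on the deployment):

# Connect worker properties (e.g. connect-distributed.properties)
consumer.auto.offset.reset=earliest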

As for the self-hosted Kafka: we use Redpanda, but I doubt that's the reason the Rockset connector behaves so strangely…

Best,
Ralph

Hi @ralph -

I'm investigating. Will report back.

–n

Hi @nadine -

Thanks! When I polled the offset of the corresponding consumer group used by Kafka Connect, it was set to the "latest" offset of the topic as soon as the group was established… which is strange, since the Kafka Connect log for the Rockset sink connector/consumer states "auto.offset.reset=earliest"…
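(For reference, the check was along these lines - a sketch; the group name follows Connect's "connect-<connector-name>" convention and the broker address is elided:)

kafka-consumer-groups.sh --bootstrap-server <broker>:9094 \
  --describe --group connect-<connector-name>
# CURRENT-OFFSET showed the topic's log-end offset right away, instead of 0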

Best, Ralph

Hi @ralph,

Try adding the "consumer.override." prefix to "auto.offset.reset" in the connector configuration. You'll also need to set "connector.client.config.override.policy=All" in the worker properties; this overrides the default setting used by the Connect worker. It should look something like this:

{
    "name": "rockset-sink",
    "config":{
      "connector.class": "rockset.RocksetSinkConnector",
      "tasks.max": "5",
      "rockset.task.threads": "1",
      "rockset.batch.size": "1000",
      "topics": "<your-kafka-topics separated by commas>",
      "rockset.integration.key": "<rockset-kafka-integration-key>",
      "rockset.apiserver.url": "https://api.rs2.usw2.rockset.com",
      "format": "json",
      "consumer.override.auto.offset.reset": "earliest"
    }
}
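The matching worker-side entry would be something like this (a sketch; the properties file name depends on how the worker is launched):

# Connect worker properties (e.g. connect-distributed.properties)
connector.client.config.override.policy=All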

More information on how this works can be found here: Worker Configuration Properties | Confluent Documentation.

I hope this helps.

Patrick

Hi Patrick,

Thanks a lot! I tried that as well, but still got the same behavior… strange…

I have attached the logs from Kafka Connect at the end of this message - maybe you can see what is happening there…

Best,
Ralph

Kafka Connect Rockset connector config:
{
    "name": "rockset8",
    "config":{
      "connector.class": "rockset.RocksetSinkConnector",
      "tasks.max": "1",
      "topics": "snacks1",
      "rockset.task.threads": "1",
      "rockset.apiserver.url": "https://api.euc1a1.rockset.com",
      "rockset.integration.key": "kafka://…",
      "format": "JSON",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "consumer.override.auto.offset.reset": "earliest"
    }
}

Kafka Connect logs:

[2022-09-20 10:48:21,944] INFO [Worker clientId=connect-1, groupId=kafka-connect] Connector rockset8 config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,947] INFO [Worker clientId=connect-1, groupId=kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
[2022-09-20 10:48:21,947] INFO [Worker clientId=connect-1, groupId=kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:21,949] INFO [Worker clientId=connect-1, groupId=kafka-connect] Successfully joined group with generation Generation{generationId=98, memberId='connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:21,955] INFO [Worker clientId=connect-1, groupId=kafka-connect] Successfully synced group in generation Generation{generationId=98, memberId='connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:21,955] INFO [Worker clientId=connect-1, groupId=kafka-connect] Joined group at generation 98 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4', leaderUrl='…', offset=391, connectorIds=[rockset8], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,955] INFO [Worker clientId=connect-1, groupId=kafka-connect] Starting connectors and tasks using config offset 391 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,956] INFO [Worker clientId=connect-1, groupId=kafka-connect] Starting connector rockset8 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,956] INFO Creating connector rockset8 of type rockset.RocksetSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:21,956] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig)
[2022-09-20 10:48:21,956] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2022-09-20 10:48:21,957] INFO Instantiated connector rockset8 with version 1.0 of type class rockset.RocksetSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:21,958] INFO Finished creating connector rockset8 (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:21,958] INFO [Worker clientId=connect-1, groupId=kafka-connect] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,959] INFO Starting RocksetSinkConnector (rockset.RocksetSinkConnector)
[2022-09-20 10:48:21,959] INFO RocksetConnectorConfig values:
format = JSON
rockset.apikey = null
rockset.apiserver.url = …
rockset.collection = null
rockset.integration.key = kafka://…@api.euc1a1.rockset.com
rockset.task.threads = 1
rockset.workspace = commons
(rockset.RocksetConnectorConfig)
[2022-09-20 10:48:21,959] INFO Building Rockset connector config. Apiserver: …Number of Threads: 1, Format: JSON (rockset.RocksetConnectorConfig)
[2022-09-20 10:48:21,960] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig)
[2022-09-20 10:48:21,960] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2022-09-20 10:48:22,475] INFO [Worker clientId=connect-1, groupId=kafka-connect] Tasks [rockset8-0] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,476] INFO [Worker clientId=connect-1, groupId=kafka-connect] Handling task config update by restarting tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,477] INFO [Worker clientId=connect-1, groupId=kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
[2022-09-20 10:48:22,477] INFO [Worker clientId=connect-1, groupId=kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,479] INFO [Worker clientId=connect-1, groupId=kafka-connect] Successfully joined group with generation Generation{generationId=99, memberId='connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,486] INFO [Worker clientId=connect-1, groupId=kafka-connect] Successfully synced group in generation Generation{generationId=99, memberId='connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,486] INFO [Worker clientId=connect-1, groupId=kafka-connect] Joined group at generation 99 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4', leaderUrl='…', offset=393, connectorIds=[rockset8], taskIds=[rockset8-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,488] INFO [Worker clientId=connect-1, groupId=kafka-connect] Starting connectors and tasks using config offset 393 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,488] INFO [Worker clientId=connect-1, groupId=kafka-connect] Starting task rockset8-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,489] INFO Creating task rockset8-0 (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,489] INFO ConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig)
[2022-09-20 10:48:22,490] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2022-09-20 10:48:22,490] INFO TaskConfig values:
task.class = class rockset.RocksetSinkTask
(org.apache.kafka.connect.runtime.TaskConfig)
[2022-09-20 10:48:22,491] INFO Instantiated task rockset8-0 with version 0.0.0.0 of type rockset.RocksetSinkTask (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,491] INFO StringConverterConfig values:
converter.encoding = UTF8
converter.type = key
(org.apache.kafka.connect.storage.StringConverterConfig)
[2022-09-20 10:48:22,491] INFO StringConverterConfig values:
converter.encoding = UTF8
converter.type = value
(org.apache.kafka.connect.storage.StringConverterConfig)
[2022-09-20 10:48:22,492] INFO Set up the key converter class org.apache.kafka.connect.storage.StringConverter for task rockset8-0 using the connector config (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,492] INFO Set up the value converter class org.apache.kafka.connect.storage.StringConverter for task rockset8-0 using the connector config (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,492] INFO Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task rockset8-0 using the worker config (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,493] INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,494] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig)
[2022-09-20 10:48:22,494] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2022-09-20 10:48:22,494] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [redpanda…:9094]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = connector-consumer-rockset8-0
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-rockset8
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes =
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-09-20 10:48:22,540] WARN The configuration 'metrics.context.connect.group.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-09-20 10:48:22,540] WARN The configuration 'metrics.context.connect.kafka.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-09-20 10:48:22,540] INFO Kafka version: 6.1.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2022-09-20 10:48:22,540] INFO Kafka commitId: 5496d92defc9bbe4 (org.apache.kafka.common.utils.AppInfoParser)
[2022-09-20 10:48:22,540] INFO Kafka startTimeMs: 1663670902540 (org.apache.kafka.common.utils.AppInfoParser)
[2022-09-20 10:48:22,542] INFO [Worker clientId=connect-1, groupId=kafka-connect] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,544] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Subscribed to topic(s): snacks1 (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-09-20 10:48:22,545] INFO RocksetConnectorConfig values:
format = JSON
rockset.apikey = null
rockset.apiserver.url = …
rockset.collection = null
rockset.integration.key = …
rockset.task.threads = 1
rockset.workspace = commons
(rockset.RocksetConnectorConfig)
[2022-09-20 10:48:22,545] INFO Building Rockset connector config. Apiserver: …Number of Threads: 1, Format: JSON (rockset.RocksetConnectorConfig)
[2022-09-20 10:48:22,546] INFO WorkerSinkTask{id=rockset8-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2022-09-20 10:48:22,565] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Cluster ID: redpanda.d631c108-1733-4b72-a6e1-2154c39f3ea6 (org.apache.kafka.clients.Metadata)
[2022-09-20 10:48:22,565] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Discovered group coordinator …redpanda-0…:9094 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,566] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,585] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,586] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Successfully joined group with generation Generation{generationId=1, memberId='connector-consumer-rockset8-0-1eb78a96-b09a-40c5-b309-019f01a7d98f', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,586] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Finished assignment for group at generation 1: {connector-consumer-rockset8-0-1eb78a96-b09a-40c5-b309-019f01a7d98f=Assignment(partitions=[snacks1-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-09-20 10:48:22,590] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Successfully synced group in generation Generation{generationId=1, memberId='connector-consumer-rockset8-0-1eb78a96-b09a-40c5-b309-019f01a7d98f', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,591] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Notifying assignor about the new Assignment(partitions=[snacks1-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-09-20 10:48:22,591] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Adding newly assigned partitions: snacks1-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-09-20 10:48:22,592] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Found no committed offset for partition snacks1-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-09-20 10:48:22,623] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Resetting offset for partition snacks1-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[redpanda-0…:9094 (id: 0 rack: null)], epoch=absent}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2022-09-20 10:48:32,541] INFO WorkerSinkTask{id=rockset8-0} Committing offsets asynchronously using sequence number 1: {snacks1-0=OffsetAndMetadata{offset=12, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask)

Hi again,

This is the last part of the Kafka Connect log, showing what happens when the connector starts up…

[2022-09-20 10:48:22,592] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Found no committed offset for partition snacks1-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-09-20 10:48:22,623] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Resetting offset for partition snacks1-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[redpanda-0…:9094 (id: 0 rack: null)], epoch=absent}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2022-09-20 10:48:32,541] INFO WorkerSinkTask{id=rockset8-0} Committing offsets asynchronously using sequence number 1: {snacks1-0=OffsetAndMetadata{offset=12, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask)

So basically, everything looks fine when the consumer says "found no committed offset" and resets the offset to 0. But then, in the very next step, the WorkerSinkTask commits the offsets and puts the consumer group at the end of the topic (in this case, that means offset 12).
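(For completeness: if a stale committed offset were the cause, it could be cleared explicitly with something like the sketch below - the group name "connect-rockset8" comes from the logs above, and the connector must be stopped first, since the group has to be inactive for the reset to succeed.)

kafka-consumer-groups.sh --bootstrap-server <broker>:9094 \
  --group connect-rockset8 --topic snacks1 \
  --reset-offsets --to-earliest --execute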

Best,
Ralph

Hi @ralph,

That is interesting and doesn't make sense to me. It looks like you are using a new connector name with each test, which is what I would suggest. You could double-check that there are no offsets stored in the offsets topic, but I am not sure whether Redpanda uses the same offset-storage mechanism. I would recommend following up with the Redpanda team to make sure things work as expected when using the consumer override configuration.

I would also run the console consumer against that topic from the beginning and see what data is actually in there. It's possible that 12 is the earliest available offset if the earlier segments have been truncated.
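Something along these lines (a sketch; broker address and any SASL options elided):

kafka-console-consumer.sh --bootstrap-server <broker>:9094 \
  --topic snacks1 --from-beginning
# If only the most recent records come back, the earlier segments have
# likely been removed by retention and offset 12 really is the beginning.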