Hi Patrick,
thanks a lot! Tried that as well, but still got the same behavior… strange…
I have attached the logs from Kafka Connect to the end of this message - maybe you can see what is happening there…
Best,
Ralph
–
Kafka Connect Rockset connector config:
{
“name”: “rockset8”,
“config”:{
“connector.class”: “rockset.RocksetSinkConnector”,
“tasks.max”: “1”,
“topics”: “snacks1”,
“rockset.task.threads”: “1”,
“rockset.apiserver.url”: “https://api.euc1a1.rockset.com”,
“rockset.integration.key”: “kafka://…”,
“format”: “JSON”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“key.converter.schemas.enable”: “false”,
“value.converter.schemas.enable”: “false”,
“consumer.override.auto.offset.reset”: “earliest”
}
}
–
Kafka Connect logs:
[2022-09-20 10:48:21,944] INFO [Worker clientId=connect-1, groupId=kafka-connect] Connector rockset8 config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,947] INFO [Worker clientId=connect-1, groupId=kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
[2022-09-20 10:48:21,947] INFO [Worker clientId=connect-1, groupId=kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:21,949] INFO [Worker clientId=connect-1, groupId=kafka-connect] Successfully joined group with generation Generation{generationId=98, memberId=‘connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4’, protocol=‘sessioned’} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:21,955] INFO [Worker clientId=connect-1, groupId=kafka-connect] Successfully synced group in generation Generation{generationId=98, memberId=‘connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4’, protocol=‘sessioned’} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:21,955] INFO [Worker clientId=connect-1, groupId=kafka-connect] Joined group at generation 98 with protocol version 2 and got assignment: Assignment{error=0, leader=‘connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4’, leaderUrl=‘…’, offset=391, connectorIds=[rockset8], taskIds=, revokedConnectorIds=, revokedTaskIds=, delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,955] INFO [Worker clientId=connect-1, groupId=kafka-connect] Starting connectors and tasks using config offset 391 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,956] INFO [Worker clientId=connect-1, groupId=kafka-connect] Starting connector rockset8 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,956] INFO Creating connector rockset8 of type rockset.RocksetSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:21,956] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig)
[2022-09-20 10:48:21,956] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2022-09-20 10:48:21,957] INFO Instantiated connector rockset8 with version 1.0 of type class rockset.RocksetSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:21,958] INFO Finished creating connector rockset8 (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:21,958] INFO [Worker clientId=connect-1, groupId=kafka-connect] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:21,959] INFO Starting RocksetSinkConnector (rockset.RocksetSinkConnector)
[2022-09-20 10:48:21,959] INFO RocksetConnectorConfig values:
format = JSON
rockset.apikey = null
rockset.apiserver.url = …
rockset.collection = null
rockset.integration.key = kafka://d1KsokagFZPFJol6ggLyFCA2AbPOlSpwy85yHOsjBvxbRJ6TQTPFAPzuknhyUikO@api.euc1a1.rockset.com
rockset.task.threads = 1
rockset.workspace = commons
(rockset.RocksetConnectorConfig)
[2022-09-20 10:48:21,959] INFO Building Rockset connector config. Apiserver: …Number of Threads: 1, Format: JSON (rockset.RocksetConnectorConfig)
[2022-09-20 10:48:21,960] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig)
[2022-09-20 10:48:21,960] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2022-09-20 10:48:22,475] INFO [Worker clientId=connect-1, groupId=kafka-connect] Tasks [rockset8-0] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,476] INFO [Worker clientId=connect-1, groupId=kafka-connect] Handling task config update by restarting tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,477] INFO [Worker clientId=connect-1, groupId=kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
[2022-09-20 10:48:22,477] INFO [Worker clientId=connect-1, groupId=kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,479] INFO [Worker clientId=connect-1, groupId=kafka-connect] Successfully joined group with generation Generation{generationId=99, memberId=‘connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4’, protocol=‘sessioned’} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,486] INFO [Worker clientId=connect-1, groupId=kafka-connect] Successfully synced group in generation Generation{generationId=99, memberId=‘connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4’, protocol=‘sessioned’} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,486] INFO [Worker clientId=connect-1, groupId=kafka-connect] Joined group at generation 99 with protocol version 2 and got assignment: Assignment{error=0, leader=‘connect-1-c39e245b-4d6e-4c54-8d60-0b449a781fd4’, leaderUrl=‘…’, offset=393, connectorIds=[rockset8], taskIds=[rockset8-0], revokedConnectorIds=, revokedTaskIds=, delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,488] INFO [Worker clientId=connect-1, groupId=kafka-connect] Starting connectors and tasks using config offset 393 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,488] INFO [Worker clientId=connect-1, groupId=kafka-connect] Starting task rockset8-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,489] INFO Creating task rockset8-0 (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,489] INFO ConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig)
[2022-09-20 10:48:22,490] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2022-09-20 10:48:22,490] INFO TaskConfig values:
task.class = class rockset.RocksetSinkTask
(org.apache.kafka.connect.runtime.TaskConfig)
[2022-09-20 10:48:22,491] INFO Instantiated task rockset8-0 with version 0.0.0.0 of type rockset.RocksetSinkTask (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,491] INFO StringConverterConfig values:
converter.encoding = UTF8
converter.type = key
(org.apache.kafka.connect.storage.StringConverterConfig)
[2022-09-20 10:48:22,491] INFO StringConverterConfig values:
converter.encoding = UTF8
converter.type = value
(org.apache.kafka.connect.storage.StringConverterConfig)
[2022-09-20 10:48:22,492] INFO Set up the key converter class org.apache.kafka.connect.storage.StringConverter for task rockset8-0 using the connector config (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,492] INFO Set up the value converter class org.apache.kafka.connect.storage.StringConverter for task rockset8-0 using the connector config (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,492] INFO Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task rockset8-0 using the worker config (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,493] INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker)
[2022-09-20 10:48:22,494] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig)
[2022-09-20 10:48:22,494] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = rockset.RocksetSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = rockset8
predicates =
tasks.max = 1
topics = [snacks1]
topics.regex =
transforms =
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2022-09-20 10:48:22,494] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [redpanda…:9094]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = connector-consumer-rockset8-0
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-rockset8
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes =
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-09-20 10:48:22,540] WARN The configuration ‘metrics.context.connect.group.id’ was supplied but isn’t a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-09-20 10:48:22,540] WARN The configuration ‘metrics.context.connect.kafka.cluster.id’ was supplied but isn’t a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-09-20 10:48:22,540] INFO Kafka version: 6.1.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2022-09-20 10:48:22,540] INFO Kafka commitId: 5496d92defc9bbe4 (org.apache.kafka.common.utils.AppInfoParser)
[2022-09-20 10:48:22,540] INFO Kafka startTimeMs: 1663670902540 (org.apache.kafka.common.utils.AppInfoParser)
[2022-09-20 10:48:22,542] INFO [Worker clientId=connect-1, groupId=kafka-connect] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-09-20 10:48:22,544] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Subscribed to topic(s): snacks1 (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-09-20 10:48:22,545] INFO RocksetConnectorConfig values:
format = JSON
rockset.apikey = null
rockset.apiserver.url = …
rockset.collection = null
rockset.integration.key = …
rockset.task.threads = 1
rockset.workspace = commons
(rockset.RocksetConnectorConfig)
[2022-09-20 10:48:22,545] INFO Building Rockset connector config. Apiserver: …Number of Threads: 1, Format: JSON (rockset.RocksetConnectorConfig)
[2022-09-20 10:48:22,546] INFO WorkerSinkTask{id=rockset8-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2022-09-20 10:48:22,565] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Cluster ID: redpanda.d631c108-1733-4b72-a6e1-2154c39f3ea6 (org.apache.kafka.clients.Metadata)
[2022-09-20 10:48:22,565] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Discovered group coordinator …redpanda-0…:9094 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,566] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,585] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,586] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Successfully joined group with generation Generation{generationId=1, memberId=‘connector-consumer-rockset8-0-1eb78a96-b09a-40c5-b309-019f01a7d98f’, protocol=‘range’} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,586] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Finished assignment for group at generation 1: {connector-consumer-rockset8-0-1eb78a96-b09a-40c5-b309-019f01a7d98f=Assignment(partitions=[snacks1-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-09-20 10:48:22,590] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Successfully synced group in generation Generation{generationId=1, memberId=‘connector-consumer-rockset8-0-1eb78a96-b09a-40c5-b309-019f01a7d98f’, protocol=‘range’} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2022-09-20 10:48:22,591] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Notifying assignor about the new Assignment(partitions=[snacks1-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-09-20 10:48:22,591] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Adding newly assigned partitions: snacks1-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-09-20 10:48:22,592] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Found no committed offset for partition snacks1-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-09-20 10:48:22,623] INFO [Consumer clientId=connector-consumer-rockset8-0, groupId=connect-rockset8] Resetting offset for partition snacks1-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[redpanda-0…:9094 (id: 0 rack: null)], epoch=absent}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2022-09-20 10:48:32,541] INFO WorkerSinkTask{id=rockset8-0} Committing offsets asynchronously using sequence number 1: {snacks1-0=OffsetAndMetadata{offset=12, leaderEpoch=null, metadata=‘’}} (org.apache.kafka.connect.runtime.WorkerSinkTask)