如何解决Kafka Connect中的ACL配置不起作用
我为3节点Kafka集群设置了ACL,并且能够通过生产者控制台和使用者控制台发送和接收主题。现在,我想用ACL配置Kafka连接。我尝试使用SASL_PLAINTEXT组合,并且在connect.log文件中它显示以下错误。它没有从源表同步到主题,请在缺少任何配置的地方提供帮助。
错误日志
[2020-10-14 07:24:35,874] ERROR WorkerSourceTask{id=oracle-jdbc-source-mtx_domains_acl5-0} Failed to flush,timed out while waiting for producer to flush outstanding 1
messages (org.apache.kafka.connect.runtime.WorkerSourceTask:448)
[2020-10-14 07:24:35,874] ERROR WorkerSourceTask{id=oracle-jdbc-source-mtx_domains_acl5-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCo
mmitter:116)"
根据以下文件的我的配置。我已经在jaas.conf文件中提到了用户并设置到环境中。
1:zookeeper.properties。
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
zookeeper.set.acl=true
jaasLoginRenew=3600000
2:server.properties
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<server_name>:9092
host.name=server_ip
3:schema-registry.properties
kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.sasl.mechanism=PLAIN
metadataServerUrls=SASL_PLAINTEXT://<server_ip>:9092
zookeeper.set.acl=true
kafkastore.group.id=schema-registry-3
4:connect-avro-distributed.properties
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
5:源连接器脚本
curl -X POST -H "Content-Type: application/json" --data '{ "name":"oracle-jdbc-source-mtx_domains_acl5","config":{ "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max":"1","connection.url":"jdbc:oracle:thin:@<ip>:<port>:<dbname>","connection.user":"<username>","connection.password":"password","numeric.mapping":"best_fit","table.whitelist":"TABLENAME","mode":"timestamp","timestamp.column.name":"CREATED_ON","topic.prefix":"","validate.non.null":"false","transforms":"createKey","transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey","transforms.createKey.fields":"DOMAIN_CODE","sasl.mechanism":"PLAIN","security.protocol":"SASL_PLAINTEXT","producer.sasl.mechanism":"PLAIN","producer.security.protocol":"SASL_PLAINTEXT","producer.request.timeout.ms":50000,"producer.retry.backoff.ms":500,"offset.flush.timeout.ms":50000,"producer.buffer.memory":100,"sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer\";","producer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer\";","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","delete.enabled":"true","key.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://localhost:8081","value.converter":"io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url":"http://localhost:8081" } }' http://localhost:8083/connectors
解决方法
您需要将以下属性添加到connect-distributed.properties
:
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="connect" \
password="connect-secret";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="connect" \
password="connect-secret";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="connect" \
password="connect-secret";
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。