博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spring-kafka学习笔记
阅读量:7069 次
发布时间:2019-06-28

本文共 2760 字,大约阅读时间需要 9 分钟。

hot3.png

1. 简要说明

是Spring下的一个项目。

官方有详细的和,以下所写的内容也是基于此。没有使用Producer,所以暂时只有Consumer部分的代码,以后会补全。

2. consumer

使用了@KafkaListener注解。

2.1. pom.xml

引入以下依赖

org.springframework.kafka
spring-kafka
1.2.0.RELEASE

2.2. 配置类

@Configuration@EnableKafkapublic class KafkaConfig {		@Value("${kafka.bootstrap.servers}")	private String kafkaBootstrapServers;	@Value("${session.timeout.ms}")	private Integer sessionTimeoutMs;	@Value("${enable.auto.commit}")	private boolean enableAutoCommit;	@Value("${auto.commit.interval.ms}")	private Integer autoCommitIntervalMs;	@Value("${auto.offset.reset}")	private String autoOffsetReset;	@Value("${group.id}")	private String groupId;		@Bean	KafkaListenerContainerFactory
> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory
factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory
consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map
consumerConfigs() { Map
props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public KafkaConsumer consumer() { return new KafkaConsumer(); } }

2.3. 消费处理

public class KafkaConsumer { 	@KafkaListeners({@KafkaListener(topics="topic1"), @KafkaListener(topics="topic2")})	public void listen(ConsumerRecord
msg) { // 消费到数据后的处理逻辑 }}

2.4. @KafkaListener和@KafkaListners

@KafkaListeners是@KafkaListener的Container Annotation,这也是jdk8的新特性之一,注解可以重复标注。

@KafkaListeners({@KafkaListener(topics="topic1"), @KafkaListener(topics="topic2")})public void listen(ConsumerRecord
msg) {}等同于@KafkaListener(topics="topic1")@KafkaListener(topics="topic2")public void listen(ConsumerRecord
msg) {}

2.5. @KafkaListener使用小结

在注解上可以方便地进行各种配置,但是如果要消费的topic个数不定,用@KafkaListener就很难优雅解决。

注解要求必须在compile-time就能确定值,可以移步查看更加详细的解释。

3. producer

to be continued...

 

 

 

 

转载于:https://my.oschina.net/u/3149614/blog/884597

你可能感兴趣的文章
Java泛型
查看>>
【转载】bat命令学习
查看>>
BOOTMGR is missing解决方案
查看>>
vtigercrm 也可以支持中文用户名
查看>>
智能家居:互联网变革的前夕
查看>>
getSuperclass与getGenericSuperclass的区别。
查看>>
ubuntu 解压命令
查看>>
我的友情链接
查看>>
如何有效的提高代码重复性,代码简洁--经典之谈
查看>>
Cacti的安装并整合nagios
查看>>
仓央嘉措最美的几首诗
查看>>
我的友情链接
查看>>
服务器网卡丢包
查看>>
bzoj 2245: [SDOI2011]工作安排
查看>>
在LINUX下面建立GPRS无线MODEM拨号
查看>>
Photoshop制作一个质感的卡通小公仔
查看>>
VS2010编译错误之mt.exe : general error c101008d
查看>>
Vue CLI 3开发中屏蔽烦人的EsLint错误
查看>>
Percona XtraDB Cluster 集群环境建立与验证指南
查看>>
沣西大数据产业模式初探
查看>>