第一步、启动zookeeper server和kafka server
启动zookeeper server:bin/zookeeper-server-start.sh config/zookeeper.properties
启动两个kafka server:bin/kafka-server-start.sh config/server-1.properties;
bin/kafka-server-start.sh config/server.properties
zookeeper会选举一个作为leader,另外一个作为slave
第二步、创建一个maven项目
这一篇中修改了Spring Boot的版本为2.0.0,pom.xml如下:
<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.0.0.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.4.RELEASE</version></dependency>
第三步、kafka配置
@Configuration@EnableKafkapublicclassKafkaConfig {/* --------------producer configuration-----------------**/@Beanpublic Map<String, Object>producerConfigs() { Map<String, Object> props =new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093"); props.put(ProducerConfig.RETRIES_CONFIG,0); props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); props.put(ProducerConfig.LINGER_MS_CONFIG,1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props; }@Beanpublic ProducerFactory<String, String>producerFactory() {returnnew DefaultKafkaProducerFactory<>(producerConfigs()); }/* --------------consumer configuration-----------------**/@Beanpublic Map<String, Object>consumerConfigs() { Map<String, Object> props =new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093"); props.put(ConsumerConfig.GROUP_ID_CONFIG,"0"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,100); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props; }@Bean ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory());return factory; }@Beanpublic ConsumerFactory<String, String>consumerFactory() {returnnew DefaultKafkaConsumerFactory<>(consumerConfigs()); }@Bean//消息监听器public MyListenermyListener() {returnnew MyListener(); }/* --------------kafka template configuration-----------------**/@Beanpublic KafkaTemplate<String,String>kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate =new KafkaTemplate<>(producerFactory());return kafkaTemplate; } }
第四步、topic的配置
自动创建的topic分区数是1,复制因子是0
@Configuration@EnableKafkapublicclassTopicConfig {@Beanpublic KafkaAdminkafkaAdmin() { Map<String, Object> configs =new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");returnnew KafkaAdmin(configs); }@Beanpublic NewTopicfoo() { /第一个是参数是topic名字,第二个参数是分区个数,第三个是topic的复制因子个数//当broker个数为1个时会创建topic失败,//提示:replication factor: 2 larger than available brokers: 1//只有在集群中才能使用kafka的备份功能returnnew NewTopic("foo",10, (short)2); }@Beanpublic NewTopicbar() {returnnew NewTopic("bar",10, (short)2); }@Beanpublic NewTopictopic1(){returnnew NewTopic("topic1",10, (short)2); }@Beanpublic NewTopictopic2(){returnnew NewTopic("topic2",10, (short)2); } }
第五步、使用@KafkaListener注解
topicPartitions和topics、topicPattern不能同时使用
publicclass MyListener { @KafkaListener(id ="myContainer1",//id是消费者监听容器 topicPartitions =//配置topic和分区:监听两个topic,分别为topic1、topic2,topic1只接收分区0,3的消息,//topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为5 { @TopicPartition(topic ="topic1", partitions = {"0","3" }), @TopicPartition(topic ="topic2", partitions ="0", partitionOffsets = @PartitionOffset(partition ="1", initialOffset ="4")) })publicvoidlisten(ConsumerRecord<?, ?> record) { System.out.println("topic" + record.topic()); System.out.println("key:" + record.key()); System.out.println("value:"+record.value()); } @KafkaListener(id ="myContainer2",topics = {"foo","bar"})publicvoidlisten2(ConsumerRecord<?, ?> record){ System.out.println("topic:" + record.topic()); System.out.println("key:" + record.key()); System.out.println("value:"+record.value()); } }
第六步、创建发送消息的接口
@RestControllerpublicclassKafkaController {privatefinalstatic Logger logger = LoggerFactory.getLogger(KafkaController.class);@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@RequestMapping(value ="/{topic}/send",method = RequestMethod.GET)publicvoidsendMeessageTotopic1(@PathVariable String topic,@RequestParam(value ="partition",defaultValue ="0")int partition) { logger.info("start send message to {}",topic); kafkaTemplate.send(topic,partition,"你","好"); } }
第七步、启动程序、调用接口
消息监听器只监听订阅的topic的特定分区的消息
源码:https://github.com/NapWells/java_framework_learn/tree/master/springkafka2
热门文章
- VPN美国 | 4月13日18.8M/S|免费VPN/Clash/V2ray/Shadowrocket/SSR免费节点链接地址
- 开宠物店赚钱的美容店(开宠物店赚钱的美容店叫什么)
- VPN美国 | 4月25日23M/S|免费VPN/SSR/V2ray/Clash/Shadowrocket免费节点链接地址
- VPN美国 | 4月16日21.6M/S|免费VPN/Clash/V2ray/Shadowrocket/SSR免费节点链接地址
- C#使用RestClient调用Web API_C#教程
- 动物疫病防控工作总结(动物疫病防疫工作)
- 动物疫苗接种注意事项包括哪些(动物疫苗注射工作注意事项)
- VPN美国 | 3月23日22.1M/S|免费VPN/V2ray/Shadowrocket/SSR/Clash免费节点链接地址
- 宠物美容师要培训多久(宠物美容师要培训多久才能考证)
- 网店卖宠物用品赚钱吗(网店卖宠物用品赚钱吗现在)