最近公司项目比较多,多项目多市场并行开发,需要服务支持灰度发布,以便进行生产验证,避免出现问题。在项目中消息中间件和分布式Task是必不可少的,接口调用的灰度转发可以通过网关实现,因此主要需要对这两个中间件做灰度转发控制
所谓灰度服务,是指一个新的功能,用一个额外的灰度实例发布。对于加了灰度用户的名单可以看到这个新的功能,而原有的功能不受影响。这样可以让少部分人体验新功能,确认没有问题后,再逐步放更多的灰度用户使用,直至全量。因此生产环境在灰度期间会有正常服务和灰度服务并行,一般会持续两周左右
灰度服务期间,用户流量如下
这里只能容纳一个灰度服务 ,但实际多项目并行时,会出现很个灰度的情况,而一些较为通用的公共服务则不需要灰度,那么生成实际会更加复杂,如下图
消息队列的灰度
公司项目中主要用到消息队列有两个 Kafka
和Pulsar
,对于kafka
应该都很熟悉了,比较主流的消息队列。Pulsar
是一个新兴的消息队列,比Kafka
更好扩展和迁移。
灰度方案
消息队列的转发方案,是对正常服务的topic同时创建一些灰度topic。举个例子,这里有一个topic名叫 test-msg。我要对它做灰度
假设同时有两个项目要上,比如高级订单和夜盘交易。为了不互相影响,创建了两个topic分别是test-msg-v1和test-msg-v2。如果想让用户B使用高级订单的功能则用v1,想让用户C使用夜盘交易的功能则用v2,而用户A则看不到高级订单和夜盘交易的功能,消息仍然在正常服务上。这样同时灰度,互不影响。
代码实现
消息的转发主要在两个端,生产端和消费端。代码实现是怎么考虑无侵入式的实现我们的功能。我这里主要用cglib的动态代理实现。自定义一个注解,在spring服务启动的时候,织入转发逻辑,实现无感的灰度实现。
定义注解
以消费者为例,首先自定义一个注解,用来标识这个是一个灰度的消费者(生产者同理),代码如下
package com.webull.st.order.platform.receiver.savings.consumer.test;
import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
@KafkaListener
public @interface CanaryKafkaListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "containerFactory")
String containerFactory() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics() default {};
@AliasFor(annotation = KafkaListener.class, attribute = "topicPattern")
String topicPattern() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "topicPartitions")
TopicPartition[] topicPartitions() default {};
@AliasFor(annotation = KafkaListener.class, attribute = "containerGroup")
String containerGroup() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "errorHandler")
String errorHandler() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "groupId")
String groupId() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "idIsGroup")
boolean idIsGroup() default true;
@AliasFor(annotation = KafkaListener.class, attribute = "clientIdPrefix")
String clientIdPrefix() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "beanRef")
String beanRef() default "__listener";
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "autoStartup")
String autoStartup() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "properties")
String[] properties() default {};
}
这里只是覆盖了原来@KafkaListener注解的一些属性,最重要的是下面这个类
package com.webull.st.order.platform.receiver.savings.consumer.test;
import com.webull.inst.framework.common.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cglib.proxy.Enhancer;
import org.springframework.cglib.proxy.MethodInterceptor;
import org.springframework.cglib.proxy.MethodProxy;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
/**
* @author : lushunjian
* @createDate : 2023-10-18 09:42:34
* @Description :
**/
@Slf4j
@Component
public class TestProcessor implements BeanPostProcessor {
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
// 找bean的过程,通过注解去找被CanaryKafkaListener注解的方法
// 这里为了简单写死的beanName=orderPlaceConsumer
if("myConsumer".equals(beanName)){
log.info("postProcessAfterInitialization start beanName:{},bean:{}",beanName,bean.getClass().getName());
// 创建增强器
Enhancer enhancer = new Enhancer();
// 设置需要增强的类的对象
enhancer.setSuperclass(bean.getClass());
// 设置回调方法
enhancer.setCallback(new MethodInterceptor() {
@Override
public Object intercept(Object o, Method method, Object[] args, MethodProxy methodProxy)
throws Throwable {
log.info("do orderPlaceConsumer before....method:{}", method.getName());
if(method.getName().equals("consume")){
List list = null;
for(Object arg : args) {
if(arg instanceof List){
list = (List) arg;
}
}
// 这里list就是收到的消息 List<ConsumerRecord<String,String>>
log.info("do orderPlaceConsumer before....list:{}", list);
// 拿到ConsumerRecord后,通过ConsumerRecord.key可以拿到userId(前提是生产端消息的key是userId,不然没法通过user做灰度转发,当然也可根据业务场景使用业务主键)
// 再调用原来的消费方法之前,加入转发策略,如果用户在灰度名单里就转发
// kafkaSender.producer("test-msg-v1","msg");
// return
// 调用原方法
Object result = methodProxy.invokeSuper(o, args);
log.info("do orderPlaceConsumer after....");
return result;
}else {
return methodProxy.invokeSuper(o, args);
}
}
});
log.info("postProcessAfterInitialization end beanName:{},bean:{}",beanName,bean.getClass().getName());
return enhancer.create();
}
return bean;
}
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if("myTestService".equals(beanName)){
log.info("postProcessAfterInitialization start beanName:{},bean:{}",beanName,bean.getClass().getName());
// 创建增强器
Enhancer enhancer = new Enhancer();
// 设置需要增强的类的对象
enhancer.setSuperclass(bean.getClass());
// 设置回调方法
enhancer.setCallback(new MethodInterceptor() {
@Override
public Object intercept(Object o, Method method, Object[] args, MethodProxy methodProxy)
throws Throwable {
if(method.getName().equals("test")){
List list = null;
String str = null;
for(Object arg : args) {
if(arg instanceof List){
list = (List) arg;
}
if(arg instanceof String){
str = (String) arg;
}
}
log.info("do test before....list:{},str:{}", JsonUtils.toJson(list),str);
// 调用原方法
Object result = methodProxy.invokeSuper(o, args);
log.info("do test after....");
return result;
}else {
return methodProxy.invokeSuper(o, args);
}
}
});
log.info("postProcessAfterInitialization end beanName:{},bean:{}",beanName,bean.getClass().getName());
return enhancer.create();
}
return bean;
}
}
然后就是具体使用的地方了
package com.webull.st.order.platform.receiver.savings.consumer.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.Acknowledgment;
import java.util.List;
/**
* @author : lushunjian
* @createDate : 2023-10-21 13:00:30
* @Description :
**/
public class MyConsumer {
@CanaryKafkaListener(topics = {"#{'${fix.execution.topic}'.split(',')}"}, containerFactory = "fixMsgContainerFactory")
public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
}
}
如果是 Pulsar 的话,也是同理的,需要覆盖Pulsar的消费者注解 @PulsarConsumer。其他代码和 Kafka类似了
这样在实际过程中如果我们要对 Kafka和Pulsar 做灰度,那么就是使用自定义的注解 @CanaryKafkaListener
和 @CanaryPulsarConsumer
注解即可。当然这里用户的灰度转发策略和topic之间的关系还需要用一个数据库表去维护,这里省略了