场景:web端调用A服务,A服务调用B服务
A服务将web端通过header中的token解析,获取到user信息,A调用B服务时需要将token或者用户信息传递给B服务
暴力解决的方案为:直接把user作为一个对象传递就可以了,但是这种做法导致每个接口都需要传递user对象,非常麻烦
我们的做法是将token或者user信息统一封装在feign的header中传递给B服务,如果没有开启hystrix,这种方式一点问题也没有,但是开启了hytrix后,出现了大问题!!
package xxx.xxx.xxx;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
@Configuration
public class FeignBasicAuthRequestInterceptor implements RequestInterceptor {
private static final Logger log = LoggerFactory.getLogger(FeignBasicAuthRequestInterceptor.class);
@Override
public void apply(RequestTemplate requestTemplate) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
Enumeration<String> headerNames = request.getHeaderNames();
if (headerNames != null) {
while (headerNames.hasMoreElements()) {
String name = headerNames.nextElement();
String values = request.getHeader(name);
if(name.equals("Authorization")){
log.info(values);
}
requestTemplate.header(name, values);
}
}
}
}
我们定义一个FeignInterceptor实现RequestInterceptor,在通过feign调用的时候,会自动进入此拦截器,在此拦截器中,将token或者user信息放置到header中
开启hystrix后,Hystrix内部提供了两种模式执行逻辑:信号量、线程池。但是,spring cloud官方是不推荐信号量这种方式的,我们只能采用线程池这种方式。
线程池这种方式,是开启了另外一个线程(线程隔离)来进行feign调用的,主线程中的request是传递不到新开启线程中的,所以,我们要重写HystrixConcurrencyStrategy(并发策略)
package xxx.xxx.xxx;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 重写并发策略
*
* @author xxx
* @Date 2022/6/27
**/
@Slf4j
@Configuration
public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private HystrixConcurrencyStrategy delegate;
/**
* 无参构造
*/
public FeignHystrixConcurrencyStrategy() {
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof FeignHystrixConcurrencyStrategy) {
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy);
//重置HystrixPlugins再重新填充 否则会报 Another strategy was already registered.
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
} catch (Exception e) {
log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
}
}
/**
* 日志记录
* @param eventNotifier
* @param metricsPublisher
* @param propertiesStrategy
*/
private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (log.isDebugEnabled()) {
log.debug("Current Hystrix plugins configuration is [" + "concurrencyStrategy ["
+ this.delegate + "]," + "eventNotifier [" + eventNotifier + "]," + "metricPublisher ["
+ metricsPublisher + "]," + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
}
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return new WrappedCallable<>(callable, requestAttributes);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime,
unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}
static class WrappedCallable<T> implements Callable<T> {
private final Callable<T> target;
private final RequestAttributes requestAttributes;
public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
this.target = target;
this.requestAttributes = requestAttributes;
}
@Override
public T call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
return target.call();
} finally {
RequestContextHolder.resetRequestAttributes();
}
}
}
}
测试:
场景:openfeign接口写在服务提供方即服务B中,服务A引入服务B的feign接口模块
1.在服务调用方即服务A配置这两个文件,可以实现header透传
2.在服务提供方即服务B配置这两个文件,可以实现header透传
3.服务调用方即服务A配置第二步,服务提供方配置第一步,也可以实现header透传
3.服务调用方即服务A配置第一步,服务提供方配置第二步,也可以实现header透传