1. 判空逻辑,如果为空,抛异常,下面代码来自kafka client:
Assert.notNull(queue, () -> "No cache found for " + txIdPrefix);
Assert的另一个用法,来自springframework:org.springframework.boot.context.config.StandardConfigDataLocationResolver#validateConfigName
- private void validateConfigName(String name) {
- Assert.state(!name.contains("*"), () -> "Config name '" + name + "' cannot contain '*'");
- }
这个是校验参数是否合法的用法,如果 !name.contains("*") 返回的是false,就会抛出异常,代码如下:
- public static void state(boolean expression, Supplier
messageSupplier) { - if (!expression) {
- throw new IllegalStateException(nullSafeGet(messageSupplier));
- }
- }
2. 本地cache设计,一下代码来自kafka client:
- private final Map
>> cache = new ConcurrentHashMap<>(); -
- protected BlockingQueue
> getCache(String txIdPrefix) { - if (txIdPrefix == null) {
- return null;
- }
- // 下面这个方法是针对每个txIdPrefix,都创建一个LinkedBlockingQueue,并缓存起来,这里第二个参数用到了Supplier函数式接口
- return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());
- }
3. kafka consumer单线程的控制逻辑CAS:
- /**
- * Acquire the light lock and ensure that the consumer hasn't been closed.
- * @throws IllegalStateException If the consumer has been closed
- */
- private void acquireAndEnsureOpen() {
- acquire();
- if (this.closed) {
- release();
- throw new IllegalStateException("This consumer has already been closed.");
- }
- }
- // currentThread holds the threadId of the current thread accessing KafkaConsumer
- // and is used to prevent multi-threaded access
-
- /**
- * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
- * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
- * supported).
- * @throws ConcurrentModificationException if another thread already has the lock
- */
- private final AtomicInteger refcount = new AtomicInteger(0);
- private static final long NO_CURRENT_THREAD = -1L;
- private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
-
-
- private void acquire() {
- long threadId = Thread.currentThread().getId();
- if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
- throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
- refcount.incrementAndGet();
- }
- /**
- * Release the light lock protecting the consumer from multi-threaded access.
- */
- private void release() {
- if (refcount.decrementAndGet() == 0)
- currentThread.set(NO_CURRENT_THREAD);
- }
解释一下以上逻辑:
首先是acquire 方法尝试获取锁,在acquire方法里,先判断 threadId是否为 currentThread,如果不是(第一次判断的时候,currentThread = -1,所以肯定是不相等的),就尝试把 currentThread 设置为当前线程的 threadId,设置成功就不会抛异常,表示获取锁成功,否则抛异常。
如果加锁成功,此时再来一个线程,会满足 threadId != currentThread.get(),但是后面的那个CAS的判断就不满足,因为CAS是当currentThread是期望值:NO_CURRENT_THREAD的时候,才去把currentThread设置为当前线程id,现在期望值是上一个线程的id,所以会抛异常,就阻止了当前线程执行
restful api 路径定义
当需要定义一个restful api 接口的时候,一般最好追寻按模块定义,比如用户的创建,一般用:
/user/create 而非直接使用 /create,这样做有很多好处,比如springsecurity 对某一类路径做一些权限控制的时候,这样写的作用就非常好了
- public static MultiValueMap
getParameters(HttpServletRequest request) { - Map
parameterMap = request.getParameterMap(); - MultiValueMap
parameters = new LinkedMultiValueMap<>(parameterMap.size()); - parameterMap.forEach((key, values) -> {
- if (values.length > 0) {
- for (String value : values) {
- parameters.add(key, value);
- }
- }
- });
- return parameters;
- }