• websocket多线程发送消息报错TEXT_PARTIAL_WRITING--自旋锁替换synchronized独占锁的使用案例


    1.背景:

    websocket在使用多线程推送消息时,如果大量消息中存在同一个session的会话的发送多条消息,如果两个线程同时拿到这个session发送消息就会报错

    The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat e for called method
    
    • 1

    原因就是: handlerA和handlerB两个方法有可能同时执行,当A或者B方法遍历到某一个session并且调用sendMessage发送消息的时候,另外一个方法也正好也在使用相同的session发送另外一个消息(同一个session消息发送冲突了,也就是说同一个时刻,多个线程向一个socket写数据冲突了),就会报上面的异常。

    2.解决办法:

    解决方法其实很简单,就是在发送消息的时候加上一把锁,(保证一个session在某个时刻不会被调用多次)

       /**
         * 发送信息给指定用户
         * @param clientId
         * @param message
         * @return
         */
        public  static boolean sendMessageToUser(String clientId, TextMessage message) {
        	WebSocketSession session = socketMap.get(clientId);
        	if(session==null) {
        		return false;
        	}
        	if (!session.isOpen()) {
        		return false;
            }
        	try {
        		synchronized (session) {
        			session.sendMessage(message);
        		}
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
            return true;
        }
        /**
         * 广播消息出去
         * @param message
         * @return
         */
        public static void sendMessageToAll(TextMessage message) {
        	for (WebSocketSession session : socketMap.values()) {
        		if(session==null||!session.isOpen()) {
            		continue;
            	}
        		try {
        			synchronized (session) {
        				session.sendMessage(message);
        			}
        		} catch (IOException e) {
        			e.printStackTrace();
        		}
    		}
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    3.存在问题:

    根据之前的文章Java并发编程–公平锁的实现和使用案例
    不难发现,以下代码块会阻塞所有线程按顺序发送,这样即使多线程调用sendMessageToUser也是单线程的效率的顺序发送,失去了多线程发送的消息意义,所以以上方法可以解决问题,但是本质上并没有提高效率

    synchronized (session) {
     	session.sendMessage(message);
    }
    
    • 1
    • 2
    • 3

    4.自旋锁解决思路:

    只锁同一个session对象,让获取同一个session的线程只能按顺序获取,又一个线程发送消息的动作耗时非常短,可以考虑将独占锁简化为使用CAS的自旋锁
    根据之前的文章Java并发编程–自旋锁的实现和使用
    自旋锁实现锁使用的是对Thread 的null与非空来判断单前线程是否被锁,那我们把session从lock()方法中传入,那可以把锁的对象换成session,从而进行自旋加锁和解锁

    自旋锁实现工具类

    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.atomic.AtomicReference;
    
    @Slf4j
    public class ReentrantSpinLock {
    
        private static AtomicReference<Object> sign = new AtomicReference<>();
      
        public static <T> void lock(T t, boolean reentrantFlag) {
          	// 若可重入标志为true, 且若尝试加锁的对象和已加的锁中的对象相同,可重入,并加锁成功
            if (reentrantFlag && t == sign.get()) {
                return;
            }
            //If the lock is not acquired, it can be spun through CAS
            while (!sign.compareAndSet(null, t)) {
                // DO nothing
                log.info("自旋一会.");
            }
        }
      
        public static <T> void unlock(T t) {
          	// 锁的线程和目前的线程相等时,才允许释放锁
            if (t == sign.get()) {
                    sign.compareAndSet(t, null);
                }
            }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    其中reentrantFlag为某一个线程已经获取到session,但是还需要调用其他的session的方法是否可重入标志位

    5.修改后的自旋锁锁定发送消息代码

       /**
         * 发送信息给指定用户
         * @param clientId
         * @param message
         * @return
         */
        public  static boolean sendMessageToUser(String clientId, TextMessage message) {
        	WebSocketSession session = socketMap.get(clientId);
        	if(session==null) {
        		return false;
        	}
        	if (!session.isOpen()) {
        		return false;
            }
        	try {
        		// 自旋锁保证不同线程的同一个session消息按照顺序发送
                ReentrantSpinLock.lock(session, false);
        		session.sendMessage(message);
        	} catch (IOException e) {
    			e.printStackTrace();
    		} finally {
             	ReentrantSpinLock.unlock(session);
            }
            return true;
        }
        /**
         * 广播消息出去
         * @param message
         * @return
         */
        public static void sendMessageToAll(TextMessage message) {
        	for (WebSocketSession session : socketMap.values()) {
        		if(session==null||!session.isOpen()) {
            		continue;
            	}
        	try {
        		// 自旋锁保证不同线程的同一个session消息按照顺序发送
                ReentrantSpinLock.lock(session, false);
        		session.sendMessage(message);
        	} catch (IOException e) {
    			e.printStackTrace();
    		} finally {
             	ReentrantSpinLock.unlock(session);
            }
    		}
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    参考:
    The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid stat e for called method
    Java并发编程–公平锁的实现和使用案例
    Java并发编程–自旋锁的实现和使用

  • 相关阅读:
    模拟滴答声
    【MySQL】1.索引
    【iOS】—— present和push再学习
    杀毒软件的原理
    嵌入式Linux—FreeType矢量字体
    Canvas简历编辑器-我的剪贴板里究竟有什么数据
    五一“捡钱”,就在这几天!国债逆回购最佳时点来了,如何躺赚6天利息?来看操作攻略
    Workerman开启ssl方法如下
    多核处理器上的内存访问一致性
    渗透测试工程师
  • 原文地址:https://blog.csdn.net/Master_Shifu_/article/details/126009854