• [华为北向网管NCE开发教程(6)消息订阅


    1.作用

    之前介绍的都是我们向网管NCE发起请求获取数据,消息订阅则反过来,是网管NCE系统给我们推送信息。其原理和MQ,JMS这些差不多,这里不过多累述。

    2.场景

    所支持订阅的场景有如下,以告警通知为例,当我们订阅告警通知以后,如果NCE网管有告警通知产生以后,就会给订阅的人发送一个通知(也就是实时告警推送)。那么我们就可以接收到如下的通知。

    2024-06-06 00:09:30c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160140.0Z, X.733::EventType=securityAlarm, emsTime=20240605160142.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784751, isClearable=true, affectedTPList=21}
    2024-06-06 00:09:36c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160147.0Z, X.733::EventType=securityAlarm, emsTime=20240605160149.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784752, isClearable=true, affectedTPList=21}
    2024-06-06 00:09:43c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160155.0Z, X.733::EventType=securityAlarm, emsTime=20240605160156.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784753, isClearable=true, affectedTPList=21}
    2024-06-06 00:09:50c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160202.0Z, X.733::EventType=securityAlarm, emsTime=20240605160203.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784755, isClearable=true, affectedTPList=21}
    2024-06-06 00:10:01c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160213.0Z, X.733::EventType=securityAlarm, emsTime=20240605160214.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784756, isClearable=true, affectedTPList=21}
    

    同理,如果我们订阅了文件传输状态通知,当存在文件传输完成的时候会收到如下通知,通知信息中包含了,文件传输完成后,文件的存储地址。

    2024-06-06 10:15:26c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786334, fileName=pm/sdh/0605-0606/3145740.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
    2024-06-06 10:15:39c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786335, fileName=pm/sdh/0605-0606/3145734.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
    2024-06-06 10:15:42c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786336, fileName=pm/sdh/0605-0606/3145739.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
    
    通知类型说明
    NT_ALARM告警通知
    NT_ALARM_UPDATED告警更新通知
    NT_TCA性能越限告警通知
    NT_OBJECT_CREATION对象创建通知
    NT_OBJECT_DELETION对象删除通知
    NT_ATTRIBUTE_VALUE_CHANGE属性改变通知
    NT_STATE_CHANGE状态改变通知
    NT_ROUTE_CHANGE路由改变通知
    NT_PROTECTION_SWITCH保护倒换通知
    NT_FILE_TRANSFER_STATUS文件传输状态通知
    NT_EPROTECTION_SWITCH设备保护倒换通知事件
    NT_ASON_RESOURCE_CHANGE智能资源改变通知
    NT_PRBSTEST_STATUS伪随机码测试状态通知
    NT_WDMPROTECTION_SWITCH波分保护倒换通知
    NT_ATMPROTECTION_SWITCH ATM保护倒换通知
    NT_RPRPROTECTION_SWITCH RPR保护组倒换通知事件格式
    NT_IPPROTECTION_SWITCH Tunnel保护组倒换通知事件格式

    3.如何开订阅(SpringBoot为例)

    3.1登录NCE

    3.1.1CorbaLoginReq

    配置文件的登录参数如下

    huawei: 
      nce: 
        login: 
          corba:
            host: 127.0.0.1
            port: 12001
            userName: 111111
            passWord: 111111
    

    配置文件参数注入Spring Bean

    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    import lombok.Data;
    
    @Data
    @SpringBootConfiguration
    @ConfigurationProperties(prefix = "huawei.nce.login.corba")
    public class CorbaLoginReq {
    	
    	private String host;
    	
    	private String port;
    	
    	private String userName;
    	
    	private String passWord;
    }
    

    3.1.2CorbaLoginRes

    登录返回参数

    import org.omg.DynamicAny.DynAnyFactory;
    
    import lombok.Data;
    import mtnm.tmforum.org.emsSession.EmsSession_I;
    
    @Data
    public class CorbaLoginRes {
    	private org.omg.CORBA.ORB orb;
    	private org.omg.PortableServer.POA rootPOA ;
    	private EmsSession_I emsSession;
    	private DynAnyFactory dynAnyFactory;
    }
    

    3.1.3TANmsSession_IImpl

    import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
    import mtnm.tmforum.org.session.Session_I;
    /**
     * NmsSession_IPOA for EMS(NCE) invoking. 
     * @author
     *
     */
    public class TANmsSession_IImpl extends NmsSession_IPOA {
    	public void eventLossCleared(String endTime) {
    		log("TANmsSession_IImpl.eventLossCleared(String endTime) is invoked by EMS(NCE).");
    		log("endTime:"+endTime);
    	}
    	public void eventLossOccurred(String startTime, String notificationId) {
    		log("TANmsSession_IImpl.eventLossOccurred(String startTime, String notificationId) is invoked by EMS.");
    		log("startTime:"+startTime+", notificationId:"+notificationId);
    	}
    	public Session_I associatedSession() {
    		log("TANmsSession_IImpl.associatedSession() is invoked by EMS(NCE).");
    		return null;
    	}
    	public void endSession() {
    		log("TANmsSession_IImpl.endSession() is invoked by EMS(NCE).");
    	}
    	public void ping() {
    		log("TANmsSession_IImpl.ping() is invoked by EMS(NCE).");
    	}
    	private static void log(String str){
    		System.out.println(str);
    	}
    }
    

    3.1.4BaseCorbaService

    public interface BaseCorbaService {
    
    	/**
    	 * @description:登录华为nce-corba
    	 * @author:hutao
    	 * @mail:hutao1@epri.sgcc.com.cn
    	 * @date:2024年3月1日 下午4:19:59
    	 */
    	CorbaLoginRes login();
    	
    	/**
    	 * @description:清空登录
    	 * @author:hutao
    	 * @mail:hutao1@epri.sgcc.com.cn
    	 * @date:2024年6月7日 下午3:24:02
    	 */
    	void clearLogin();
    }
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.omg.CosNaming.NameComponent;
    import org.omg.DynamicAny.DynAnyFactory;
    import org.omg.DynamicAny.DynAnyFactoryHelper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import com.collect.sdh.module.corba.entity.CorbaLoginReq;
    import com.collect.sdh.module.corba.entity.CorbaLoginRes;
    import com.collect.sdh.module.corba.entity.TANmsSession_IImpl;
    import com.collect.sdh.module.corba.service.BaseCorbaService;
    
    import mtnm.tmforum.org.common.Common_IHolder;
    import mtnm.tmforum.org.emsMgr.EMSMgr_I;
    import mtnm.tmforum.org.emsMgr.EMSMgr_IHelper;
    import mtnm.tmforum.org.emsSession.EmsSession_I;
    import mtnm.tmforum.org.emsSession.EmsSession_IHolder;
    import mtnm.tmforum.org.emsSession.EmsSession_IPackage.managerNames_THolder;
    import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_I;
    import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_IHelper;
    import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_I;
    import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_IHelper;
    import mtnm.tmforum.org.equipment.EquipmentOrHolderIterator_IHolder;
    import mtnm.tmforum.org.equipment.EquipmentOrHolderList_THolder;
    import mtnm.tmforum.org.equipment.EquipmentOrHolder_T;
    import mtnm.tmforum.org.equipment.ObjectAdditionalInfoList_THolder;
    import mtnm.tmforum.org.equipment.ObjectAdditionalInfo_T;
    import mtnm.tmforum.org.equipment.PhysicalLocationInfoList_THolder;
    import mtnm.tmforum.org.equipment.PhysicalLocationInfo_T;
    import mtnm.tmforum.org.globaldefs.NameAndStringValue_T;
    import mtnm.tmforum.org.globaldefs.NamingAttributesIterator_IHolder;
    import mtnm.tmforum.org.globaldefs.NamingAttributesList_THolder;
    import mtnm.tmforum.org.globaldefs.ProcessingFailureException;
    import mtnm.tmforum.org.managedElement.ManagedElementIterator_IHolder;
    import mtnm.tmforum.org.managedElement.ManagedElementList_THolder;
    import mtnm.tmforum.org.managedElement.ManagedElement_T;
    import mtnm.tmforum.org.managedElement.ManagedElement_THolder;
    import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_I;
    import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_IHelper;
    import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_I;
    import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_IHelper;
    import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetwork_T;
    import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkIterator_IHolder;
    import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkList_THolder;
    import mtnm.tmforum.org.nmsSession.NmsSession_I;
    import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
    import mtnm.tmforum.org.subnetworkConnection.CCIterator_IHolder;
    import mtnm.tmforum.org.subnetworkConnection.CrossConnectList_THolder;
    import mtnm.tmforum.org.subnetworkConnection.CrossConnect_T;
    import mtnm.tmforum.org.subnetworkConnection.Route_THolder;
    import mtnm.tmforum.org.subnetworkConnection.SNCIterator_IHolder;
    import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnectionList_THolder;
    import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_T;
    import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_THolder;
    import mtnm.tmforum.org.terminationPoint.TerminationPointIterator_IHolder;
    import mtnm.tmforum.org.terminationPoint.TerminationPointList_THolder;
    import mtnm.tmforum.org.terminationPoint.TerminationPoint_T;
    import mtnm.tmforum.org.topologicalLink.TopologicalLinkIterator_IHolder;
    import mtnm.tmforum.org.topologicalLink.TopologicalLinkList_THolder;
    import mtnm.tmforum.org.topologicalLink.TopologicalLink_T;
    
    @Service
    public class BaseCorbaServiceImpl implements BaseCorbaService {
    
    	@Autowired
    	private CorbaLoginReq loginReq;
    	
    	private CorbaLoginRes login;
    	
    	/**
    	 * @description:清空登录
    	 * @author:hutao
    	 * @mail:hutao1@epri.sgcc.com.cn
    	 * @date:2024年6月7日 下午3:24:02
    	 */
    	@Override
    	public void clearLogin() {
    		login = null;
    	}
    	
    	/**
    	 * @description:登录华为nce-corba
    	 * @author:hutao
    	 * @mail:hutao1@epri.sgcc.com.cn
    	 * @date:2024年3月1日 下午4:19:59
    	 */
    	@Override
    	public CorbaLoginRes login() {
    		if(login != null) {
    			/*本应该检测登录是否可用,如果可用,则返回登录信息,不可用则重新登录,(不知道是否可以使用emsSession.ping()来判断)
    			  但是没找到华为有这个接口,因此如果出现不可抗力因素导致登录无效,例如网络中断
    			  则通过com.collect.sdh.module.test.TestCorbaController.cleanLogin()清空登录
    			*/	
    			return login;
    		}
    		try {
    			login = new CorbaLoginRes();
    			String[] argv = new String[2];
    			argv[0] = "-ORBInitRef";
    			argv[1] = "NameService=corbaloc::" + loginReq.getHost() + ":" + loginReq.getPort() + "/NameService";
    			org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(argv, null);
    			org.omg.PortableServer.POA rootPOA = org.omg.PortableServer.POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
    			rootPOA.the_POAManager().activate();
    			DynAnyFactory dynAnyFactory = DynAnyFactoryHelper.narrow(orb.resolve_initial_references("DynAnyFactory"));
    			org.omg.CosNaming.NamingContextExt nc = org.omg.CosNaming.NamingContextExtHelper.narrow(orb.resolve_initial_references("NameService"));
    			org.omg.CosNaming.NameComponent[] name;
    			name = new NameComponent[5];
    			name[0] = new NameComponent("TMF_MTNM", "Class");
    			name[1] = new NameComponent("HUAWEI", "Vendor");
    			name[2] = new NameComponent("Huawei/NCE", "EmsInstance");
    			name[3] = new NameComponent("2.0", "Version");
    			name[4] = new NameComponent("Huawei/NCE", "EmsSessionFactory_I");
    			EmsSessionFactory_I emsSessionFactory = EmsSessionFactory_IHelper.narrow(nc.resolve(name));
    			NmsSession_IPOA pNmsSessionServant = new TANmsSession_IImpl();
    			NmsSession_I nmsSession = pNmsSessionServant._this(orb);
    			EmsSession_IHolder emsSessionInterfaceHolder = new EmsSession_IHolder();
    			emsSessionFactory.getEmsSession(loginReq.getUserName(), loginReq.getPassWord(), nmsSession, emsSessionInterfaceHolder);
    			EmsSession_I emsSession = emsSessionInterfaceHolder.value;
    			login.setDynAnyFactory(dynAnyFactory);
    			login.setOrb(orb);
    			login.setRootPOA(rootPOA);
    			login.setEmsSession(emsSession);
    			return login;
    		} catch (Exception e) {
    			e.printStackTrace();
    			return null;
    		}
    	}
    }
    

    3.2定制通知

    3.2.1ConsumerNotice

    需要实现接口:org.omg.CosNotifyComm.StructuredPushConsumerPOA

    import java.util.HashMap;
    import java.util.Map;
    
    import org.omg.CosEventComm.Disconnected;
    import org.omg.CosNotification.EventType;
    import org.omg.CosNotification.StructuredEvent;
    import org.omg.CosNotifyComm.InvalidEventType;
    import org.omg.CosNotifyComm.StructuredPushConsumerPOA;
    import org.springframework.util.ObjectUtils;
    
    import com.collect.sdh.module.corba.entity.CorbaLoginRes;
    import com.collect.sdh.utils.AnyUtil;
    
    import lombok.extern.log4j.Log4j2;
    
    /**
     * @description:消费通知
     * @author:hutao
     * @mail:hutao1@epri.sgcc.com.cn
     * @date:2024年5月7日 上午10:57:26
     */
    @Log4j2
    public class ConsumerNotice extends StructuredPushConsumerPOA{
    
    	private CorbaLoginRes loginRes;
    	
    	public ConsumerNotice(CorbaLoginRes loginRes) {
    		super();
    		this.loginRes = loginRes;
    	}
    
    	private static Map<String, String> noticeTypes = new HashMap<>();
    	
    	static {
    		noticeTypes.put("NT_ALARM", "告警通知");
    		noticeTypes.put("NT_ALARM_UPDATED", "告警更新通知");
    		noticeTypes.put("NT_TCA", "性能越限告警通知");
    		noticeTypes.put("NT_OBJECT_CREATION", "对象创建通知");
    		noticeTypes.put("NT_OBJECT_DELETION", "对象删除通知");
    		noticeTypes.put("NT_ATTRIBUTE_VALUE_CHANGE", "属性改变通知");
    		noticeTypes.put("NT_STATE_CHANGE", "状态改变通知");
    		noticeTypes.put("NT_ROUTE_CHANGE", "路由改变通知");
    		noticeTypes.put("NT_PROTECTION_SWITCH", "保护倒换通知");
    		noticeTypes.put("NT_FILE_TRANSFER_STATUS", "文件传输状态通知");
    		noticeTypes.put("NT_EPROTECTION_SWITCH", "设备保护倒换通知事件");
    		noticeTypes.put("NT_ASON_RESOURCE_CHANGE", "智能资源改变通知");
    		noticeTypes.put("NT_PRBSTEST_STATUS", "伪随机码测试状态通知");
    		noticeTypes.put("NT_WDMPROTECTION_SWITCH", "波分保护倒换通知");
    		noticeTypes.put("NT_ATMPROTECTION_SWITCH", "ATM保护倒换通知");
    		noticeTypes.put("NT_RPRPROTECTION_SWITCH", "RPR保护组倒换通知事件格式");
    		noticeTypes.put("NT_IPPROTECTION_SWITCH", "Tunnel保护组倒换通知事件格式");
    	}
    	
    	@Override
    	public void disconnect_structured_push_consumer() {
    		log.info("Consumer disconnect_structured_push_consumer");
    	}
    
    	@Override
    	public void push_structured_event(StructuredEvent event) throws Disconnected {
    		String eventType = event.header.fixed_header.event_type.type_name;
    		Map<String, Object> eventData = new HashMap<>(event.filterable_data.length);
    		for (int i = 0; i < event.filterable_data.length; i++) {
    			if (!ObjectUtils.isEmpty(event.filterable_data[i])) {
    				eventData.put(event.filterable_data[i].name, AnyUtil.parseAny( event.filterable_data[i].value, loginRes.getDynAnyFactory()));
    			}
    		}
    		log.info("收到事件通知:{}<{}>,通知参数:{}",eventType, noticeTypes.get(eventType), eventData);
    	}
    
    	@Override
    	public void offer_change(EventType[] arg0, EventType[] arg1) throws InvalidEventType {
    		
    	}
    
    }
    

    3.2.2AnyUtil

    用于解析返回的信息。

    import org.omg.CORBA.Any;
    import org.omg.CORBA.TCKind;
    import org.omg.DynamicAny.DynAnyFactory;
    import org.omg.DynamicAny.DynArray;
    import org.omg.DynamicAny.DynEnum;
    import org.omg.DynamicAny.DynSequence;
    import org.omg.DynamicAny.DynStruct;
    import org.omg.DynamicAny.DynUnion;
    
    /**
     * @description:org.omg.DynamicAny格式化工具
     * @author:hutao
     * @mail:hutao1@epri.sgcc.com.cn
     * @date:2024年5月7日 上午11:33:17
     */
    public class AnyUtil {
    	
    	/**
    	 * @description:格式化数据
    	 * @author:hutao
    	 * @mail:hutao1@epri.sgcc.com.cn
    	 * @date:2024年5月7日 上午11:34:17
    	 */
        public static String parseAny(Any any, DynAnyFactory factory){
    		if( null==any ){
    			return null;
    		}
    		StringBuilder result = new StringBuilder();
    		try {
    			switch (any.type().kind().value()) {
    			case TCKind._tk_char:
    				result.append(any.extract_char());break;
    			case TCKind._tk_null:
    				break;
    			case TCKind._tk_boolean:
    				result.append(any.extract_boolean());
    				break;
    			case TCKind._tk_short:
    				result.append(any.extract_short());
    				break;
    			case TCKind._tk_long:
    				result.append(any.extract_long());
    				break;
    			case TCKind._tk_double:
    				result.append(any.extract_double());
    				break;
    			case TCKind._tk_float:
    				result.append(any.extract_float());
    				break;
    			case TCKind._tk_octet:
    				result.append(any.extract_octet());
    				break;
    			case TCKind._tk_ulong:
    				result.append(any.extract_ulong());
    				break;
    			case TCKind._tk_string:
    				result.append(any.extract_string());
    				break;
    			case TCKind._tk_enum:
    			{
    				DynEnum dynEnum = (DynEnum) factory.create_dyn_any(any);
    				result.append(dynEnum.get_as_string());
    				break;
    			}
    			case TCKind._tk_any:
    			{
    				any=factory.create_dyn_any(any).get_any();
    				result.append(any);
    				break;
    			}
    			case TCKind._tk_objref:
    			{
    				result.append(any.extract_Object());
    				break;
    			}
    			case TCKind._tk_struct:
    			case TCKind._tk_except:
    			{
    				DynStruct dynstruct = (DynStruct) factory.create_dyn_any(any);
    				org.omg.DynamicAny.NameValuePair[] members = dynstruct.get_members();
    				result.append("{");
    				for (int i = 0; i < members.length; i++) {
    					if(i>0){
    						result.append(" ");
    					}
    					result.append(members[i].id).append(" ").append(parseAny(members[i].value, factory));
    				}
    				result.append("}");
    				break;
    			}
    			case TCKind._tk_union:
    				DynUnion dynunion = (DynUnion) factory.create_dyn_any(any);
    				result.append(dynunion.member_name()).append(" ");
    				result.append(parseAny(dynunion.member().to_any(), factory));
    				break;
    			case TCKind._tk_sequence:
    				DynSequence dynseq = (DynSequence) factory.create_dyn_any(any);
    				Any[] contents = dynseq.get_elements();
    				result.append("{");
    				for (int i = 0; i < contents.length; i++){
    					result.append(parseAny(contents[i], factory));
    				}
    				result.append("}");
    				break;
    			case TCKind._tk_array:
    				DynArray dynarray = (DynArray) factory.create_dyn_any(any);
    				Any[] arrayContents = dynarray.get_elements();
    				result.append("{");
    				for (int i = 0; i < arrayContents.length; i++){
    					result.append(parseAny(arrayContents[i], factory)).append("");
    				}
    				result.append("}");
    				break;
    			default:
    				result.append(any.type().kind().value());
    			}
    		} catch (Exception ex) {
    			ex.printStackTrace();
    		}
    		return new String(result.toString().getBytes(StandardCharsets.ISO_8859_1));
    	}
    }
    

    3.3订阅通知

    SubscribeNotice 实现 Runnable,即订阅的时候,另起一个线程来订阅。该线程负责订阅。

    3.3.1SubscribeNotice

    import org.omg.CORBA.IntHolder;
    import org.omg.CORBA.Object;
    import org.omg.CosNotifyChannelAdmin.ClientType;
    import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
    import org.omg.CosNotifyChannelAdmin.EventChannelHolder;
    import org.omg.CosNotifyChannelAdmin.ProxySupplier;
    import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplier;
    import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierHelper;
    import org.omg.CosNotifyComm.StructuredPushConsumerHelper;
    import org.springframework.util.ObjectUtils;
    
    import com.collect.sdh.module.corba.entity.CorbaLoginRes;
    import com.collect.sdh.utils.JsonUtils;
    
    import lombok.extern.log4j.Log4j2;
    
    /**
     * @description:订阅消费通知
     * @author:hutao
     * @mail:hutao1@epri.sgcc.com.cn
     * @date:2024年5月7日 上午9:33:18
     */
    @Log4j2
    public class SubscribeNotice implements Runnable{
    
    	/**
    	 * 登录corba成功后的参数
    	 */
    	private CorbaLoginRes loginRes;
    	
    	/**
    	 * 记录订阅通知的通道ID的存储文件地址
    	 */
    	private String poxyIdPath;
    	
    	public SubscribeNotice(CorbaLoginRes loginRes, String poxyIdPath) {
    		super();
    		this.loginRes = loginRes;
    		this.poxyIdPath = poxyIdPath;
    	}
    
    	@Override
    	public void run() {
    		try {
    			//获取通道
    			IntHolder poxyId = new IntHolder();
    			poxyId.value = getPoxyId(poxyIdPath);
    			EventChannelHolder eventChannel = new EventChannelHolder();
    			loginRes.getEmsSession().getEventChannel(eventChannel);
    			//ConsumerNotice extends StructuredPushConsumerPOA 为消费者
    			ConsumerNotice consumerNotice = new ConsumerNotice(loginRes);
    			ConsumerAdmin defaultConsumerAdmin = eventChannel.value.default_consumer_admin();
    			//连接通道,如果发现通道已经打开,则先关闭之前的通道(已经打开的通道即使不可以,北向接口并未释放该接口的资源,但是会限制连接通道(数量 < 3))
    			try {
    				if (poxyId.value > 0){
    					log.info("释放旧的消费通道:{}", poxyId.value);
    					ProxySupplier oldSupplier = defaultConsumerAdmin.get_proxy_supplier(poxyId.value);
    					assert (oldSupplier != null);
    					StructuredProxyPushSupplier myOldPoxy = StructuredProxyPushSupplierHelper.narrow(oldSupplier);
    					myOldPoxy.disconnect_structured_push_supplier();
    				}
    			}catch (Exception e) {
    				e.printStackTrace();
    			}
    			ProxySupplier tmpSupplier = defaultConsumerAdmin.obtain_notification_push_supplier(ClientType.STRUCTURED_EVENT, poxyId);
    			StructuredProxyPushSupplier proxyPushSupplier = StructuredProxyPushSupplierHelper.narrow(tmpSupplier);
    			Object servant = loginRes.getRootPOA().servant_to_reference(consumerNotice);
    			proxyPushSupplier.connect_structured_push_consumer(StructuredPushConsumerHelper.narrow(servant));
    			savePoxyId(poxyIdPath, poxyId.value);
    			log.info("保存此次的消费通道:{}", poxyId.value);
    			loginRes.getOrb().run();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	/**
    	 * @description:获取已经连接的消费通道ID
    	 * @author:hutao
    	 * @mail:hutao1@epri.sgcc.com.cn
    	 * @date:2024年5月7日 上午10:40:57
    	 */
    	public int getPoxyId(String path) {
    		int poxyId = -1;
    		//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库
    		String str = JsonUtils.readStringFromSystemPath(path);
    		if(!ObjectUtils.isEmpty(str)) {
    			poxyId = Integer.parseInt(str);
    		}
    		return poxyId;
    	}
    	
    	/**
    	 * @description:保存已经连接的消费通道ID
    	 * @author:hutao
    	 * @mail:hutao1@epri.sgcc.com.cn
    	 * @date:2024年5月7日 上午10:41:33
    	 */
    	public void savePoxyId(String path, int poxyId) {
    			//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库
    		JsonUtils.writeStringToSystemPath(path, String.valueOf(poxyId));
    	}
    }
    

    3.3.2JsonUtils

    为了保证代码完整性,如果你完全抄上面的代码,这里提供了代码需要的两个文件操作示例

    
     public static String readStringFromSystemPath(String path) {
        	String data = "";
        	try {
        		InputStream inputStream = new FileInputStream(path);
        		byte[] bdata = FileCopyUtils.copyToByteArray(inputStream);
        		data = new String(bdata, StandardCharsets.UTF_8);
    		} catch (FileNotFoundException e) {
    			log.info("文件不存在,文件地址:{}", path);
    		} catch (Exception e) {
    			log.info("读取文件失败,文件地址:{},失败原因:{}", path,e.getMessage());
    		} 
    		return data;
        }
    
     public static void writeStringToSystemPath(String filePath, String str) {
    		Writer write = null;
    		try {
    			File file = new File(filePath);
    			if(file.exists()) {
    				file.delete();
    			}
    			if (!file.getParentFile().exists()) {
    				file.getParentFile().mkdirs();
    			}
    			if(file.createNewFile()) {
    				write = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8);
    				write.write(str);
    				write.flush();
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			if(write !=null ) {
    				try {
    					write.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    

    3.4启动订阅

    这里我们使用SpringBoot启动的时候启动订阅,即实现ApplicationRunner,然后使用线程池的单线程来启动上面我们编写的线程。

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    
    import com.collect.sdh.module.corba.entity.CorbaLoginRes;
    import com.collect.sdh.module.corba.service.BaseCorbaService;
    
    /**
     * @description:启动订阅corba的消费
     * @author:hutao
     * @mail:hutao1@epri.sgcc.com.cn
     * @date:2024年5月7日 下午4:18:16
     */
    @Component
    public class SubscribeRunner implements ApplicationRunner  {
        
    	@Value(value = "${file-save-path}")
    	private String poxyIdPath;
    	
    	@Autowired
    	private BaseCorbaService baseCorbaService;
    
    	@Override
    	public void run(ApplicationArguments args) throws Exception {
    		poxyIdPath = poxyIdPath + "poxyId";
    		CorbaLoginRes login = baseCorbaService.login();
    		ExecutorService executor = Executors.newSingleThreadExecutor();
    		executor.submit(new SubscribeNotice(login, poxyIdPath));
    	}
    }
    

    4.效果展示

    在这里插入图片描述

  • 相关阅读:
    软考高级职称哪个好考?明确给你答案
    聊一聊什么是缓存击穿、雪崩、穿透?如何解决?
    AI人工智能小程序系统开发
    Docker学习笔记
    第一章 操作系统概述
    git的使用(详细教程)通过命令行操作及vscode插件
    SpringBoot_11_整合MyBatis
    农业4.0中麦田的精确杂草检测:实现技术、方法和研究挑战的综述
    基于vue和node.js的志愿者招募网站设计
    Django容易被遗忘却无比重要的框架默认文件介绍及使用方法
  • 原文地址:https://blog.csdn.net/m0_37892044/article/details/139675374