• Java使用milo读写OPC UA源代码示例


    Java使用milo读写OPC UA源代码示例

    OPC UA客户端工具UaExpert使用

    OPC UA客户端工具Softing OPC Client使用_推荐使用

    Milo官方源代码地址:
    https://github.com/eclipse/milo.git

    下载源代码

    git clone https://github.com/eclipse/milo.git
    
    • 1

    OPC UA读取测试

    在这里插入图片描述

    修改部分需要连接的OPC UA服务代码

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    读取OPC UA服务的状态和当前时间

    package org.eclipse.milo.examples.client;
    
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    
    import com.google.common.collect.ImmutableList;
    import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
    import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
    import org.eclipse.milo.opcua.stack.core.Identifiers;
    import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
    import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
    import org.eclipse.milo.opcua.stack.core.types.enumerated.ServerState;
    import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class ReadExample implements ClientExample {
    
        public static void main(String[] args) throws Exception {
            ReadExample example = new ReadExample();
    
    //        new ClientExampleRunner(example, true).run(); //启动opcua服务
            new ClientExampleRunner(example, false).run();
        }
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        @Override
        public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
            // synchronous connect
            client.connect().get();
    
            // synchronous read request via VariableNode
            UaVariableNode node = client.getAddressSpace().getVariableNode(Identifiers.Server_ServerStatus_StartTime);
            DataValue value = node.readValue();
    
            logger.info("StartTime======{}", value.getValue().getValue());
    
            // asynchronous read request
            readServerStateAndTime(client).thenAccept(values -> {
                DataValue v0 = values.get(0);
                DataValue v1 = values.get(1);
    
                logger.info("State====={}", ServerState.from((Integer) v0.getValue().getValue()));
                logger.info("CurrentTime===={}", v1.getValue().getValue());
    
                future.complete(client);
            });
        }
    
        private CompletableFuture<List<DataValue>> readServerStateAndTime(OpcUaClient client) {
            List<NodeId> nodeIds = ImmutableList.of(
                Identifiers.Server_ServerStatus_State,
                Identifiers.Server_ServerStatus_CurrentTime);
    
            return client.readValues(0.0, TimestampsToReturn.Both, nodeIds);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    OPC UA测点写入值

    package org.eclipse.milo.examples.client;
    
    import com.google.common.collect.ImmutableList;
    import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
    import org.eclipse.milo.opcua.stack.core.types.builtin.*;
    import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
    import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
    import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
    import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    
    import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.ubyte;
    import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
    
    public class WriteExample  implements ClientExample {
    
        public static void main(String[] args) throws Exception {
            WriteExample example = new WriteExample();
    
            new ClientExampleRunner(example, false).run();
        }
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        @Override
        public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
            //write(client);
            //addNode(client);
    //        writeList(client);
    
            // synchronous connect
            client.connect().get();
    
            //kepserver opcua 写入成功
            List<NodeId> nodeIds = ImmutableList.of(new NodeId(2, "tongdao.tag1.aaa"));
    //        List nodeIds = ImmutableList.of(new NodeId(2, "HelloWorld/ScalarTypes/Int32"));
    //        List nodeIds = ImmutableList.of(new NodeId(5, "abcd"));
    
            for (int i = 0; i < 10; i++) {
                Thread.sleep(500);
                Variant v = new Variant(i);
    
                // don't write status or timestamps
                DataValue dv = new DataValue(v, null, null);
    
                // write asynchronously....
                CompletableFuture<List<StatusCode>> f =
                    client.writeValues(nodeIds, ImmutableList.of(dv));
    
                // ...but block for the results so we write in order
                List<StatusCode> statusCodes = f.get();
                StatusCode status = statusCodes.get(0);
    
                if (status.isGood()) {
                    logger.info("Wrote '{}' to nodeId={}", v, nodeIds.get(0));
                }
            }
    
            future.complete(client);
        }
    
        private void writeList(OpcUaClient client) throws Exception {
            client.connect().get();
    
            List<NodeId> nodeIds = new ArrayList<>();
            nodeIds.add(new NodeId(5, "aaa"));
            nodeIds.add(new NodeId(5, "bbb"));
            nodeIds.add(new NodeId(5, "ccc"));
    
                // don't write status or timestamps
            List<DataValue> dataValues = new ArrayList<>();
            Variant v1 = new Variant(111);
            dataValues.add(new DataValue(v1, null, null));
    
            Variant v2 = new Variant(222);
            dataValues.add(new DataValue(v2, null, null));
    
            Variant v3 = new Variant(333);
            dataValues.add(new DataValue(v3, null, null));
    
                // write asynchronously....
            CompletableFuture<List<StatusCode>> f =
                    client.writeValues(nodeIds, dataValues);
    
            // ...but block for the results so we write in order
            List<StatusCode> statusCodes = f.get();
            statusCodes.forEach(statusCode -> {
                logger.info("Wrote '{}' to nodeId={},{},{}", statusCode.getValue(), statusCode.isGood(),
                        statusCode.isBad(), statusCode.toString());
            });
        }
    
        private void write(OpcUaClient opcUaClient) throws Exception {
            opcUaClient.connect().get();
    
            //写入值
            int v = 1000;
            NodeId nodeId = new NodeId(5,"abcd");
            Variant value = new Variant(v);
            DataValue dataValue = new DataValue(value,null,null);
            StatusCode statusCode = opcUaClient.writeValue(nodeId,dataValue).get();
    
            //打印true 写入成功
            System.out.println(statusCode.isGood());
        }
    
        private void addNode (OpcUaClient opcUaClient) throws Exception {
            opcUaClient.connect().get();
    
            //        List list = new ArrayList<>();
            NodeId nodeId1 = new NodeId(5,"www");
    //        list.add(nodeId1);
    //        //这个没用 不能创建
    //        CompletableFuture rrr = opcUaClient.registerNodes(list);
    //        RegisterNodesResponse rng = rrr.get();
    //        System.out.println(rng.getRegisteredNodeIds().length);
    
    //        DataValue valuenode = opcUaClient.readValue(0.0, TimestampsToReturn.Both, nodeId1).get();
    //        System.out.println("www=====" + valuenode.getValue().getValue().toString());
    
            //https://github.com/eclipse/milo/issues/531
            // milo 有人留言, 也没创建成功
            ObjectAttributes attributes = new ObjectAttributes(
                    uint(0b11111),
                    LocalizedText.english("My Object"),
                    LocalizedText.english("Description of My Object"),
                    uint(0),
                    uint(0),
                    ubyte(0)
            );
    
            ExtensionObject encodedAttributes = ExtensionObject.encode(
                    opcUaClient.getSerializationContext(),
                    attributes
            );
    
    //        ExpandedNodeId parentNodeId = new ExpandedNodeId(Identifiers.Server);
            ExpandedNodeId requestNewNodeID = new ExpandedNodeId(nodeId1.getNamespaceIndex(), null, "www");
    
            QualifiedName browserName = new QualifiedName(0, "testObject");
            NodeClass nodeClass = NodeClass.Object;
    
    //        ExtensionObject extensionObject = ExtensionObject.encode(opcUaClient.getSerializationContext(),
    //                new ByteString("attr".getBytes()), Identifiers.CreateSessionRequest_Encoding_DefaultBinary,
    //                OpcUaDefaultBinaryEncoding.getInstance());
    
    //        ExpandedNodeId typeDe = new ExpandedNodeId(Identifiers.BaseObjectType);
            AddNodesItem add = new AddNodesItem(null, nodeId1, requestNewNodeID, browserName,
                    nodeClass, encodedAttributes, null);
    
            List<AddNodesItem> list = new ArrayList<>();
            list.add(add);
            CompletableFuture<AddNodesResponse> re = opcUaClient.addNodes(list);
            AddNodesResponse addNodesResponse = re.get();
            System.out.println(addNodesResponse);
            //报错
            //status=Bad_ServiceUnsupported, description=The server does not support the requested service.
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165

    milo订阅OPC UA测点数据

    package org.eclipse.milo.examples.client;
    
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
    import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
    import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
    import org.eclipse.milo.opcua.stack.core.AttributeId;
    import org.eclipse.milo.opcua.stack.core.Identifiers;
    import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
    import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
    import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
    import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
    import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
    import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
    import org.eclipse.milo.opcua.stack.core.types.structured.ContentFilter;
    import org.eclipse.milo.opcua.stack.core.types.structured.EventFilter;
    import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
    import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
    import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
    import org.eclipse.milo.opcua.stack.core.types.structured.SimpleAttributeOperand;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import static com.google.common.collect.Lists.newArrayList;
    import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
    
    public class EventSubscriptionExample implements ClientExample {
    
        public static void main(String[] args) throws Exception {
            EventSubscriptionExample example = new EventSubscriptionExample();
    
    //        new ClientExampleRunner(example, true).run();
            new ClientExampleRunner(example, false).run();
        }
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        private final AtomicLong clientHandles = new AtomicLong(1L);
    
        @Override
        public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
            // synchronous connect
            client.connect().get();
    
            // create a subscription and a monitored item
            UaSubscription subscription = client.getSubscriptionManager()
                .createSubscription(1000.0).get();
    
    //        ReadValueId readValueId = new ReadValueId(
    //            Identifiers.Server,
    //            AttributeId.EventNotifier.uid(),
    //            null,
    //            QualifiedName.NULL_VALUE
    //        );
    
            //创建订阅的变量
            NodeId nodeId22 = new NodeId(5,"Counter1");
            ReadValueId readValueId = new ReadValueId(nodeId22,AttributeId.EventNotifier.uid(),null,QualifiedName.NULL_VALUE);
    
            // client handle must be unique per item
            UInteger clientHandle = uint(clientHandles.getAndIncrement());
    
            EventFilter eventFilter = new EventFilter(
                new SimpleAttributeOperand[]{
                    new SimpleAttributeOperand(
                        Identifiers.BaseEventType,
                        new QualifiedName[]{new QualifiedName(0, "EventId")},
                        AttributeId.Value.uid(),
                        null),
                    new SimpleAttributeOperand(
                        Identifiers.BaseEventType,
                        new QualifiedName[]{new QualifiedName(0, "EventType")},
                        AttributeId.Value.uid(),
                        null),
                    new SimpleAttributeOperand(
                        Identifiers.BaseEventType,
                        new QualifiedName[]{new QualifiedName(0, "Severity")},
                        AttributeId.Value.uid(),
                        null),
                    new SimpleAttributeOperand(
                        Identifiers.BaseEventType,
                        new QualifiedName[]{new QualifiedName(0, "Time")},
                        AttributeId.Value.uid(),
                        null),
                    new SimpleAttributeOperand(
                        Identifiers.BaseEventType,
                        new QualifiedName[]{new QualifiedName(0, "Message")},
                        AttributeId.Value.uid(),
                        null)
                },
                new ContentFilter(null)
            );
    
            MonitoringParameters parameters = new MonitoringParameters(
                clientHandle,
                0.0,
                ExtensionObject.encode(client.getSerializationContext(), eventFilter),
                uint(10),
                true
            );
    
            MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
                readValueId,
                MonitoringMode.Reporting,
                parameters
            );
    
            List<UaMonitoredItem> items = subscription
                .createMonitoredItems(TimestampsToReturn.Both, newArrayList(request)).get();
    
            // do something with the value updates
            UaMonitoredItem monitoredItem = items.get(0);
    
            final AtomicInteger eventCount = new AtomicInteger(0);
    
            monitoredItem.setEventConsumer((item, vs) -> {
                logger.info(
                    "Event Received from {}",
                    item.getReadValueId().getNodeId());
    
                for (int i = 0; i < vs.length; i++) {
                    logger.info("\tvariant[{}]: {}", i, vs[i].getValue());
                }
    
                if (eventCount.incrementAndGet() == 3) {
                    future.complete(client);
                }
            });
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    package org.eclipse.milo.examples.client;
    
    public class EventSubscriptionExampleProsys extends EventSubscriptionExample {
    
        public static void main(String[] args) throws Exception {
            EventSubscriptionExampleProsys example = new EventSubscriptionExampleProsys();
    
            new ClientExampleRunner(example, false).run();
        }
    
        @Override
        public String getEndpointUrl() {
            return "opc.tcp://10.211.55.4:53530/OPCUA/SimulationServer";
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    package org.eclipse.milo.examples.client;
    
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.BiConsumer;
    
    import com.google.common.collect.ImmutableList;
    import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
    import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
    import org.eclipse.milo.opcua.sdk.client.api.UaSession;
    import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
    import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
    import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
    import org.eclipse.milo.opcua.stack.core.AttributeId;
    import org.eclipse.milo.opcua.stack.core.UaException;
    import org.eclipse.milo.opcua.stack.core.serialization.SerializationContext;
    import org.eclipse.milo.opcua.stack.core.types.builtin.*;
    import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
    import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
    import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
    import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
    import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
    import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    public class SubscriptionExample implements ClientExample {
    
        public static void main(String[] args) throws Exception {
            SubscriptionExample example = new SubscriptionExample();
    
            new ClientExampleRunner(example).run();
        }
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        @Override
        public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
            // synchronous connect
            client.connect().get();
    
            //成功订阅
            //new UsernameProvider("opcua","opcua")
            testKepServerSub(client);
    
            //成功订阅
    //        testSub(client);
            //https://stackoverflow.com/questions/66247577/how-to-handle-resubscribing-after-server-is-disconnected-or-restarted-with-opc-u
            //重连后, 成功重新订阅  opcua服务重启后,监听器触发,清除订阅,重新订阅成功
    //        testSub22(client);
    
    //        testSub(client);
    
    //        testSub88(client);
            //成功订阅  opcua服务重启后,监听器触发,重新订阅成功(异步订阅可以)
    //        testSub99(client);
    
            //不行, 重连后, 在订阅,却订阅不了(同步订阅阻塞了), 只能做在线离线状态判断
    //        testSub33(client);
    
    //        testSubOne(client);
    
    //        testSubTwo(client);
    
           /* // create a subscription @ 1000ms
            UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
    
            // subscribe to the Value attribute of the server's CurrentTime node
            ReadValueId readValueId = new ReadValueId(
                Identifiers.Server_ServerStatus_CurrentTime,
                AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE
            );
    
            //成功订阅
            NodeId nodeId = new NodeId(5,"Counter1");
            ReadValueId readValueId2 = new ReadValueId(nodeId, AttributeId.Value.uid(),null,null);
    
            // IMPORTANT: client handle must be unique per item within the context of a subscription.
            // You are not required to use the UaSubscription's client handle sequence; it is provided as a convenience.
            // Your application is free to assign client handles by whatever means necessary.
            UInteger clientHandle = subscription.nextClientHandle();
    
            MonitoringParameters parameters = new MonitoringParameters(
                clientHandle,
                1000.0,     // sampling interval
                null,       // filter, null means use default
                uint(10),   // queue size
                true        // discard oldest
            );
    
            MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
                readValueId2,
                MonitoringMode.Reporting,
                parameters
            );
    
            // when creating items in MonitoringMode.Reporting this callback is where each item needs to have its
            // value/event consumer hooked up. The alternative is to create the item in sampling mode, hook up the
            // consumer after the creation call completes, and then change the mode for all items to reporting.
            BiConsumer onItemCreated =
                (item, id) -> item.setValueConsumer(this::onSubscriptionValue);
    
            List items = subscription.createMonitoredItems(
                TimestampsToReturn.Both,
                newArrayList(request),
                onItemCreated
            ).get();
    
            for (UaMonitoredItem item : items) {
                if (item.getStatusCode().isGood()) {
                    logger.info("item created for nodeId={}", item.getReadValueId().getNodeId());
                } else {
                    logger.warn(
                        "failed to create item for nodeId={} (status={})",
                        item.getReadValueId().getNodeId(), item.getStatusCode());
                }
            }*/
    
            // let the example run for 5 seconds then terminate
            Thread.sleep(200000);
            future.complete(client);
        }
    
        private void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
            logger.info(
                "subscription value received: item={}, value={}, {}, {}, {}",
                item.getReadValueId().getNodeId(), value.getValue(),
                    item.getReadValueId().getNodeId().getNamespaceIndex(),
                    item.getReadValueId().getNodeId().getType(),
                    item.getReadValueId().getNodeId().getIdentifier());
        }
    
        private void testKepServerSub(OpcUaClient client) throws Exception {
            logger.info("-------------------------");
            Double publishInterval = 1000.0;
            int queueSize = 10;
            //创建指定发布间隔的订阅对象
            logger.info("&&&&&&&&&&&&&&&&");
            System.out.println(client.connect().isDone());
            logger.info("&&&&&&&&&&&&&&&&");
            System.out.println(client.getSession().isDone());
            ImmutableList<UaSubscription>  aa = client.getSubscriptionManager().getSubscriptions();
            System.out.println(aa.size());
            aa.forEach(ua -> {
                System.out.println(ua.getSubscriptionId());
            });
            //同步订阅
            UaSubscription subscription = client.getSubscriptionManager().createSubscription(publishInterval).get();
            logger.info("^^^^^^^^^^^^^");
            //创建监控项请求集合
            List<MonitoredItemCreateRequest> itemsToCreate = new ArrayList<>();
            Set<String> members = new HashSet<>();
            members.add("tongdao.tag1.aaa");
            members.add("aaa");
            logger.info("########");
            for (String member : members) {
                // KepServer 一个订阅器, 订阅5000点,都没报错
    //        for (int i = 0; i < 5000; i++) {
    //            NodeId nodeId = new NodeId(2, "tongdao.tag1.aaa");
                NodeId nodeId = new NodeId(2, member);
                ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(),null,null);
                //创建监控的参数
                MonitoringParameters parameters = new MonitoringParameters(
                        subscription.nextClientHandle(),    //监控项
                        publishInterval,     // sampling interval
                        null,       // filter, null means use default
                        UInteger.valueOf(queueSize),   // queue size
                        true        // discard oldest
                );
                //创建监控项请求
                //该请求最后用于创建订阅。
                MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                //添加至请求集合
                itemsToCreate.add(request);
            }
            logger.info("**************");
            BiConsumer<UaMonitoredItem, Integer> biConsumer =
                    (item, id) -> item.setValueConsumer(this::onSubscriptionValue);
    
            List<UaMonitoredItem> items = subscription.createMonitoredItems(
                    TimestampsToReturn.Both,
                    itemsToCreate,
                    biConsumer
            ).get();
    
            logger.info("==============");
            for (UaMonitoredItem item : items) {
                if (item.getStatusCode().isGood()) {
                    System.out.println("item created for nodeId={}" + item.getReadValueId().getNodeId());
                } else {
                    System.out.println(
                            "failed to create item for nodeId={}" + item.getReadValueId().getNodeId() + " (status={})" +  item.getStatusCode());
                }
            }
    
        }
    
        //同步订阅
        private void testSub(OpcUaClient client) throws Exception {
            logger.info("-------------------------");
            Double publishInterval = 1000.0;
            int queueSize = 10;
            //创建指定发布间隔的订阅对象
            logger.info("&&&&&&&&&&&&&&&&");
            System.out.println(client.connect().isDone());
            logger.info("&&&&&&&&&&&&&&&&");
            System.out.println(client.getSession().isDone());
            ImmutableList<UaSubscription>  aa = client.getSubscriptionManager().getSubscriptions();
            System.out.println(aa.size());
            aa.forEach(ua -> {
                System.out.println(ua.getSubscriptionId());
            });
            //同步订阅
            UaSubscription subscription = client.getSubscriptionManager().createSubscription(publishInterval).get();
            logger.info("^^^^^^^^^^^^^");
            //创建监控项请求集合
            List<MonitoredItemCreateRequest> itemsToCreate = new ArrayList<>();
            Set<String> members = new HashSet<>();
            members.add("Counter1");
            members.add("aaa");
            members.add("bbb");
            members.add("fff");
            members.add("error");
            logger.info("########");
            for (String member : members) {
                NodeId nodeId = new NodeId(5, member);
                ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(),null,null);
                //创建监控的参数
                MonitoringParameters parameters = new MonitoringParameters(
                        subscription.nextClientHandle(),    //监控项
                        publishInterval,     // sampling interval
                        null,       // filter, null means use default
                        UInteger.valueOf(queueSize),   // queue size
                        true        // discard oldest
                );
                //创建监控项请求
                //该请求最后用于创建订阅。
                MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                //添加至请求集合
                itemsToCreate.add(request);
            }
            logger.info("**************");
            BiConsumer<UaMonitoredItem, Integer> biConsumer =
                    (item, id) -> item.setValueConsumer(this::onSubscriptionValue);
    
            List<UaMonitoredItem> items = subscription.createMonitoredItems(
                    TimestampsToReturn.Both,
                    itemsToCreate,
                    biConsumer
            ).get();
    
            logger.info("==============");
            for (UaMonitoredItem item : items) {
                if (item.getStatusCode().isGood()) {
                    System.out.println("item created for nodeId={}" + item.getReadValueId().getNodeId());
                } else {
                    System.out.println(
                            "failed to create item for nodeId={}" + item.getReadValueId().getNodeId() + " (status={})" +  item.getStatusCode());
                }
            }
    
        }
    
        private void testSub22(OpcUaClient client) throws Exception {
            client.getSubscriptionManager().addSubscriptionListener(new UaSubscriptionManager.SubscriptionListener() {
                @Override
                public void onKeepAlive(UaSubscription subscription, DateTime publishTime) {
    
    
                }
    
                @Override
                public void onStatusChanged(UaSubscription subscription, StatusCode status) {
    
                }
    
                @Override
                public void onPublishFailure(UaException exception) {
    
                }
    
                @Override
                public void onNotificationDataLost(UaSubscription subscription) {
    
                }
    
                @Override
                public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
                    logger.info("oooooooooooooooo{}", statusCode);
                    //一旦连接断开,就清除诸如此类的订阅管理器, 重新订阅
                    client.getSubscriptionManager().clearSubscriptions();
                    try {
                        // 这个可以重新订阅到
                        testSub(client);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            client.addSessionActivityListener(new SessionActivityListener(){
                @Override
                public void onSessionActive(UaSession session) {
                    logger.info("success-----------");
                    System.out.println(client.connect().isDone());
                    System.out.println(client.getSession().isDone());
                }
    
                @Override
                public void onSessionInactive(UaSession session) {
                    logger.info("fail-----------");
                    System.out.println(client.connect().isDone());
                    System.out.println(client.getSession().isDone());
                }
            });
    
        }
    
    
        private void testSub33(OpcUaClient client) throws Exception {
            SessionActivityListener aa = new SessionActivityListener(){
                @Override
                public void onSessionActive(UaSession session) {
                    logger.info("11111111111111{}");
                    try {
                        //不行,重新连接后,在去订阅不了, 只能做在线判断
                        testSub(client);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                }
    
                @Override
                public void onSessionInactive(UaSession session) {
                    logger.info("2222222222");
                    //离线判断
                }
            };
    
            client.addSessionActivityListener(aa);
            aa.onSessionActive(client.getSession().get());
        }
    
        private void testSub99(OpcUaClient client) throws Exception {
            SessionActivityListener aa = new SessionActivityListener(){
                @Override
                public void onSessionActive(UaSession session) {
                    logger.info("11111111111111{}");
                    try {
                        testSub88(client);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                }
    
                @Override
                public void onSessionInactive(UaSession session) {
                    logger.info("2222222222");
                }
            };
    
            client.addSessionActivityListener(aa);
            aa.onSessionActive(client.getSession().get());
        }
    
        //异步订阅
        private void testSub88(OpcUaClient client) {
            Double publishInterval = 1000.0;
            int queueSize = 10;
            //异步订阅
            client.getSubscriptionManager().createSubscription(publishInterval).whenCompleteAsync(new BiConsumer<UaSubscription, Throwable>() {
                @Override
                public void accept(UaSubscription uaSubscription, Throwable throwable) {
                    if (null != throwable) {
                        try {
                            throw throwable;
                        } catch (Throwable throwable1) {
                            throwable1.printStackTrace();
                        }
                    }
                    //创建监控项请求集合
                    List<MonitoredItemCreateRequest> itemsToCreate = new ArrayList<>();
                    Set<String> members = new HashSet<>();
                    members.add("Counter1");
                    members.add("aaa");
                    members.add("bbb");
                    members.add("fff");
                    members.add("error");
                    for (String member : members) {
                        NodeId nodeId = new NodeId(5, member);
                        ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(),null,null);
                        //创建监控的参数
                        MonitoringParameters parameters = new MonitoringParameters(
                                uaSubscription.nextClientHandle(),    //监控项
                                publishInterval,     // sampling interval
                                null,       // filter, null means use default
                                UInteger.valueOf(queueSize),   // queue size
                                true        // discard oldest
                        );
                        //创建监控项请求
                        //该请求最后用于创建订阅。
                        MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                        //添加至请求集合
                        itemsToCreate.add(request);
                    }
    
    
                    uaSubscription.createMonitoredItems(
                            TimestampsToReturn.Both,
                            itemsToCreate,
                            new UaSubscription.ItemCreationCallback() {
                                @Override
                                public void onItemCreated(SerializationContext serializationContext, UaMonitoredItem item, int index) {
                                item.setValueConsumer(new BiConsumer<UaMonitoredItem, DataValue>() {
                                    @Override
                                    public void accept(UaMonitoredItem uaMonitoredItem, DataValue dataValue) {
                                        StatusCode statusCode = uaMonitoredItem.getStatusCode();
                                        logger.info("StatusCode: {}", statusCode);
                                        logger.info("aaa={}", uaMonitoredItem.getReadValueId().getNodeId().getIdentifier());
                                        logger.info(
                                                "subscription value received: item={}, value={}, {}, {}, {}",
                                                item.getReadValueId().getNodeId(), dataValue.getValue(),
                                                item.getReadValueId().getNodeId().getNamespaceIndex(),
                                                item.getReadValueId().getNodeId().getType(),
                                                item.getReadValueId().getNodeId().getIdentifier());
                                    }
                                });
                                }
                            });
    
                }
    
            });
        }
    
        private void testSubOne(OpcUaClient client) throws Exception {
            logger.info("----------------------------------------------");
            Double publishInterval = 1000.0;
            int queueSize = 10;
            //创建指定发布间隔的订阅对象
            logger.info("&&&&&&&&&&&&&&&&");
            System.out.println(client.connect().isDone());
            logger.info("&&&&&&&&&&&&&&&&");
            System.out.println(client.getSession().isDone());
            ImmutableList<UaSubscription>  aa = client.getSubscriptionManager().getSubscriptions();
            System.out.println("uasubscription" + aa.size());
            //同步订阅
            UaSubscription subscription = client.getSubscriptionManager().createSubscription(publishInterval).get();
            System.out.println("uasubscription" + aa.size());
            aa.forEach(ua -> {
                System.out.println(ua.getSubscriptionId());
            });
            logger.info("^^^^^^^^^^^^^");
            //创建监控项请求集合
            List<MonitoredItemCreateRequest> itemsToCreate = new ArrayList<>();
            Set<String> members = new HashSet<>();
            members.add("aaa");
            members.add("bbb");
            members.add("testone");
            logger.info("########");
            for (String member : members) {
                NodeId nodeId = new NodeId(5, member);
                ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(),null,null);
                //创建监控的参数
                MonitoringParameters parameters = new MonitoringParameters(
                        subscription.nextClientHandle(),    //监控项
                        publishInterval,     // sampling interval
                        null,       // filter, null means use default
                        UInteger.valueOf(queueSize),   // queue size
                        true        // discard oldest
                );
                //创建监控项请求
                //该请求最后用于创建订阅。
                MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                //添加至请求集合
                itemsToCreate.add(request);
            }
            logger.info("**************");
            BiConsumer<UaMonitoredItem, Integer> biConsumer =
                    (item, id) -> item.setValueConsumer(this::onSubscriptionValue);
    
            List<UaMonitoredItem> items = subscription.createMonitoredItems(
                    TimestampsToReturn.Both,
                    itemsToCreate,
                    biConsumer
            ).get();
    
            logger.info("==============");
            for (UaMonitoredItem item : items) {
                if (item.getStatusCode().isGood()) {
                    System.out.println("item1111 created for nodeId={}" + item.getReadValueId().getNodeId());
                } else {
                    System.out.println(
                            "failed1111 to create item for nodeId={}" + item.getReadValueId().getNodeId() + " (status={})" +  item.getStatusCode());
                }
            }
    
            ImmutableList<UaSubscription>  bb = client.getSubscriptionManager().getSubscriptions();
            System.out.println("uasubscription" + bb.size());
        }
    
        private void testSubTwo(OpcUaClient client) throws Exception {
            logger.info("-------------------------------------------------------------");
            Double publishInterval = 1000.0;
            int queueSize = 10;
            //创建指定发布间隔的订阅对象
            logger.info("&&&&&&&&&&&&&&&&");
            System.out.println(client.connect().isDone());
            logger.info("&&&&&&&&&&&&&&&&");
            System.out.println(client.getSession().isDone());
            ImmutableList<UaSubscription>  aa = client.getSubscriptionManager().getSubscriptions();
            System.out.println("uasubscription" + aa.size());
            //同步订阅
            UaSubscription subscription = client.getSubscriptionManager().createSubscription(publishInterval).get();
            System.out.println("uasubscription" + aa.size());
            aa.forEach(ua -> {
                System.out.println(ua.getSubscriptionId());
            });
            logger.info("^^^^^^^^^^^^^");
            //创建监控项请求集合
            List<MonitoredItemCreateRequest> itemsToCreate = new ArrayList<>();
            Set<String> members = new HashSet<>();
            members.add("Counter1");
            members.add("fff");
            members.add("testTwo");
            logger.info("########");
            for (String member : members) {
                NodeId nodeId = new NodeId(5, member);
                ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(),null,null);
                //创建监控的参数
                MonitoringParameters parameters = new MonitoringParameters(
                        subscription.nextClientHandle(),    //监控项
                        publishInterval,     // sampling interval
                        null,       // filter, null means use default
                        UInteger.valueOf(queueSize),   // queue size
                        true        // discard oldest
                );
                //创建监控项请求
                //该请求最后用于创建订阅。
                MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                //添加至请求集合
                itemsToCreate.add(request);
            }
            logger.info("**************");
            BiConsumer<UaMonitoredItem, Integer> biConsumer =
                    (item, id) -> item.setValueConsumer(this::onSubscriptionValue22);
    
            List<UaMonitoredItem> items = subscription.createMonitoredItems(
                    TimestampsToReturn.Both,
                    itemsToCreate,
                    biConsumer
            ).get();
    
            logger.info("==============");
            for (UaMonitoredItem item : items) {
                if (item.getStatusCode().isGood()) {
                    System.out.println("item2222 created for nodeId={}" + item.getReadValueId().getNodeId());
                } else {
                    System.out.println(
                            "failed2222 to create item for nodeId={}" + item.getReadValueId().getNodeId() + " (status={})" +  item.getStatusCode());
                }
            }
            ImmutableList<UaSubscription>  bb = client.getSubscriptionManager().getSubscriptions();
            System.out.println("uasubscription" + bb.size());
        }
    
        private void onSubscriptionValue22(UaMonitoredItem item, DataValue value) {
            logger.info(
                    "subscription222222 value received: item={}, value={}, {}, {}, {}",
                    item.getReadValueId().getNodeId(), value.getValue(),
                    item.getReadValueId().getNodeId().getNamespaceIndex(),
                    item.getReadValueId().getNodeId().getType(),
                    item.getReadValueId().getNodeId().getIdentifier());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372
    • 373
    • 374
    • 375
    • 376
    • 377
    • 378
    • 379
    • 380
    • 381
    • 382
    • 383
    • 384
    • 385
    • 386
    • 387
    • 388
    • 389
    • 390
    • 391
    • 392
    • 393
    • 394
    • 395
    • 396
    • 397
    • 398
    • 399
    • 400
    • 401
    • 402
    • 403
    • 404
    • 405
    • 406
    • 407
    • 408
    • 409
    • 410
    • 411
    • 412
    • 413
    • 414
    • 415
    • 416
    • 417
    • 418
    • 419
    • 420
    • 421
    • 422
    • 423
    • 424
    • 425
    • 426
    • 427
    • 428
    • 429
    • 430
    • 431
    • 432
    • 433
    • 434
    • 435
    • 436
    • 437
    • 438
    • 439
    • 440
    • 441
    • 442
    • 443
    • 444
    • 445
    • 446
    • 447
    • 448
    • 449
    • 450
    • 451
    • 452
    • 453
    • 454
    • 455
    • 456
    • 457
    • 458
    • 459
    • 460
    • 461
    • 462
    • 463
    • 464
    • 465
    • 466
    • 467
    • 468
    • 469
    • 470
    • 471
    • 472
    • 473
    • 474
    • 475
    • 476
    • 477
    • 478
    • 479
    • 480
    • 481
    • 482
    • 483
    • 484
    • 485
    • 486
    • 487
    • 488
    • 489
    • 490
    • 491
    • 492
    • 493
    • 494
    • 495
    • 496
    • 497
    • 498
    • 499
    • 500
    • 501
    • 502
    • 503
    • 504
    • 505
    • 506
    • 507
    • 508
    • 509
    • 510
    • 511
    • 512
    • 513
    • 514
    • 515
    • 516
    • 517
    • 518
    • 519
    • 520
    • 521
    • 522
    • 523
    • 524
    • 525
    • 526
    • 527
    • 528
    • 529
    • 530
    • 531
    • 532
    • 533
    • 534
    • 535
    • 536
    • 537
    • 538
    • 539
    • 540
    • 541
    • 542
    • 543
    • 544
    • 545
    • 546
    • 547
    • 548
    • 549
    • 550
    • 551
    • 552
    • 553
    • 554
    • 555
    • 556
    • 557
    • 558
    • 559
    • 560
    • 561
    • 562
    • 563
    • 564
    • 565
    • 566
    • 567
    • 568
    • 569
    • 570
    • 571
    • 572
    • 573
    • 574
    • 575
    • 576
    • 577
    • 578
    • 579
    • 580
    • 581
    package org.eclipse.milo.examples.client;
    
    import com.google.common.collect.ImmutableList;
    import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
    import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
    import org.eclipse.milo.opcua.sdk.client.api.UaSession;
    import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
    import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
    import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
    import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
    import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
    import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
    import org.eclipse.milo.opcua.stack.core.AttributeId;
    import org.eclipse.milo.opcua.stack.core.UaException;
    import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
    import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
    import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
    import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
    import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
    import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
    import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
    import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
    import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
    import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.function.BiConsumer;
    
    import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
    
    /**
     * @author yinjinliang
     * @desc
     * @date 2021/3/16
     */
    public class Subscription {
    
        private static final Logger log = LoggerFactory.getLogger(Subscription.class);
    
        public static void main(String[] args) throws Exception{
            //OpcUaClient opcUaClient = getOpcUaClient("opc.tcp://192.168.3.48:53530/OPCUA/SimulationServer", null, null);
            //OpcUaClient opcUaClient2 = getOpcUaClient("opc.tcp://192.168.3.48:53530/OPCUA/SimulationServer", null, null);
            List<String> members = new ArrayList<>();
            members.add("Counter1");
            members.add("error");
            List<String> members2 = new ArrayList<>();
            members2.add("aaa");
            members2.add("bbb");
            members2.add("error222");
    
    //        UInteger id = testSub(members, opcUaClient);
    //        UInteger id2 = testSub(members2, opcUaClient);
            //testSub(members2, opcUaClient2);
    
            //同一个连接,重复添加监听器,断开重连时,会触发方法,
            // 清除订阅时,第2次会把上一个订阅清除, 然后重新订阅, 就只有members2的数据了
            // 可以只清除指定的订阅器,多个监听器就没有问题
    //        addListener(id, members, opcUaClient);
    //        addListener(id2, members2, opcUaClient);
            //addListener(members2, opcUaClient2);
    
            //注意: 一个连接最多100个订阅器,超过会报错
    //        test();
    //        test();
            testKepServer();
    
            Thread.sleep(200000);
        }
    
        public static void testKepServer() {
            //kepServer opcua设置opcua 最大连接数 10 时 超过10个连接报错
            //status=Bad_TooManySessions, message=The server has reached its maximum number of sessions.
            OpcUaClient client = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
    //        OpcUaClient client2 = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
    //        OpcUaClient client3 = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
    //        OpcUaClient client4 = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
    //        OpcUaClient client5 = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
    //        OpcUaClient client6 = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
    //        OpcUaClient client7 = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
    //        OpcUaClient client8 = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
    //        OpcUaClient client9 = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
    //        OpcUaClient client10 = getOpcUaClient("opc.tcp://192.168.3.56:49320", "opcua", "opcua");
            List<String> members = new ArrayList<>();
            members.add("tongdao.tag1.aaa");
            if (true) {
    //            return;
            }
            Double publishInterval = 1000.0;
            int queueSize = 10;
            //注意: KepServer 一个连接8000个订阅器,都没报错
            for (int i = 0; i < 1000; i++) {
                //创建指定发布间隔的订阅对象
                //同步订阅
                UaSubscription subscription = null;
                try {
                    subscription = client.getSubscriptionManager().createSubscription(publishInterval).get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                //创建监控项请求集合
                List<MonitoredItemCreateRequest> itemsToCreate = new ArrayList<>();
                for (String member : members) {
                    NodeId nodeId = new NodeId(2, member);
                    ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(),null,null);
                    //创建监控的参数
                    MonitoringParameters parameters = new MonitoringParameters(
                            subscription.nextClientHandle(),    //监控项
                            publishInterval,     // sampling interval
                            null,       // filter, null means use default
                            UInteger.valueOf(queueSize),   // queue size
                            true        // discard oldest
                    );
                    //创建监控项请求
                    //该请求最后用于创建订阅。
                    MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                    //添加至请求集合
                    itemsToCreate.add(request);
                }
                BiConsumer<UaMonitoredItem, Integer> biConsumer =
                        (item, id) -> item.setValueConsumer(new BiConsumer<UaMonitoredItem, DataValue>() {
                            @Override
                            public void accept(UaMonitoredItem uaMonitoredItem, DataValue value) {
    //                            log.info(
    //                                    "subscription value received: item={}, value={}, {}, {}, {}",
    //                                    item.getReadValueId().getNodeId(), value.getValue(),
    //                                    item.getReadValueId().getNodeId().getNamespaceIndex(),
    //                                    item.getReadValueId().getNodeId().getType(),
    //                                    item.getReadValueId().getNodeId().getIdentifier());
                                log.info(
                                        "subscription value received: item={}, value={}, t={}, so={}, ser={}",
                                        item.getReadValueId().getNodeId().getIdentifier(),
                                        value.getValue().getValue(),
                                        uaMonitoredItem.getTimestamps().getValue(),
                                        value.getSourceTime().getJavaDate().getTime(),
                                        value.getServerTime().getJavaDate()
                                );
                            }
                        });
    
                List<UaMonitoredItem> items = null;
                try {
                    items = subscription.createMonitoredItems(
                            TimestampsToReturn.Both,
                            itemsToCreate,
                            biConsumer
                    ).get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
    
                for (UaMonitoredItem item : items) {
                    if (item.getStatusCode().isGood()) {
                        System.out.println("item created for nodeId={}" + item.getReadValueId().getNodeId());
                    } else {
                        System.out.println(
                                "failed to create item for nodeId={}" + item.getReadValueId().getNodeId() + " (status={})" +  item.getStatusCode());
                    }
                }
            }
        }
    
        public static void test() {
            OpcUaClient client = getOpcUaClient("opc.tcp://192.168.3.48:53530/OPCUA/SimulationServer", null, null);
            List<String> members = new ArrayList<>();
            members.add("aaa");
    
            Double publishInterval = 1000.0;
            int queueSize = 10;
            //注意: SimulationServer 一个连接最多100个订阅器,超过会报错
            for (int i = 0; i < 1; i++) {
                //创建指定发布间隔的订阅对象
                //同步订阅
                UaSubscription subscription = null;
                try {
                    subscription = client.getSubscriptionManager().createSubscription(publishInterval).get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                //创建监控项请求集合
                List<MonitoredItemCreateRequest> itemsToCreate = new ArrayList<>();
                for (String member : members) {
                    NodeId nodeId = new NodeId(5, member);
                    ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(),null,null);
                    //创建监控的参数
                    MonitoringParameters parameters = new MonitoringParameters(
                            subscription.nextClientHandle(),    //监控项
                            publishInterval,     // sampling interval
                            null,       // filter, null means use default
                            UInteger.valueOf(queueSize),   // queue size
                            true        // discard oldest
                    );
                    //创建监控项请求
                    //该请求最后用于创建订阅。
                    MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                    //添加至请求集合
                    itemsToCreate.add(request);
                }
                BiConsumer<UaMonitoredItem, Integer> biConsumer =
                        (item, id) -> item.setValueConsumer(new BiConsumer<UaMonitoredItem, DataValue>() {
                            @Override
                            public void accept(UaMonitoredItem uaMonitoredItem, DataValue value) {
    //                            log.info(
    //                                    "subscription value received: item={}, value={}, {}, {}, {}",
    //                                    item.getReadValueId().getNodeId(), value.getValue(),
    //                                    item.getReadValueId().getNodeId().getNamespaceIndex(),
    //                                    item.getReadValueId().getNodeId().getType(),
    //                                    item.getReadValueId().getNodeId().getIdentifier());
                                log.info(
                                        "subscription value received: item={}, value={}, t={}, so={}, ser={}",
                                        item.getReadValueId().getNodeId().getIdentifier(),
                                        value.getValue().getValue(),
                                        uaMonitoredItem.getTimestamps().getValue(),
                                        value.getSourceTime().getJavaDate().getTime(),
                                        value.getServerTime().getJavaDate()
                                        );
                            }
                        });
    
                List<UaMonitoredItem> items = null;
                try {
                    items = subscription.createMonitoredItems(
                            TimestampsToReturn.Both,
                            itemsToCreate,
                            biConsumer
                    ).get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
    
                for (UaMonitoredItem item : items) {
                    if (item.getStatusCode().isGood()) {
                        System.out.println("item created for nodeId={}" + item.getReadValueId().getNodeId());
                    } else {
                        System.out.println(
                                "failed to create item for nodeId={}" + item.getReadValueId().getNodeId() + " (status={})" +  item.getStatusCode());
                    }
                }
            }
        }
    
        private static OpcUaClient getOpcUaClient(String endPointUrl, String username, String pwd) {
            OpcUaClient opcUaClient;
            //log.info("初始化OPC UA Client......{}", endPointUrl);
            try {
                IdentityProvider identityProvider;
                if (username != null && pwd != null) {
                    identityProvider = new UsernameProvider(username, pwd);
                } else {
                    identityProvider = new AnonymousProvider();
                }
                opcUaClient = OpcUaClient.create(
                        endPointUrl,
                        endpoints ->
                                endpoints.stream()
                                        .findFirst(),
                        configBuilder ->
                                configBuilder
                                        .setIdentityProvider(identityProvider)
                                        .setRequestTimeout(uint(5000))
                                        .build()
                );
                //log.info("初始化OPC UA Client......成功");
            } catch (Exception e) {
                log.error("初始化OPC UA Client失败, opcua={}, error={}", endPointUrl, e.getMessage());
                return null;
            }
            if (!opcUaClient.getSession().isDone()) {
                try {
                    // synchronous connect
                    opcUaClient.connect().get();
                    log.info("OPC UA Client连接connect成功");
                } catch (Exception e) {
                    log.error("OPC UA Client连接connect失败, opcua={}, error={}", endPointUrl, e.getMessage());
                    opcUaClient.disconnect();
                    return null;
                }
            }
            return opcUaClient;
        }
    
        private static  UInteger testSub(List<String> list, OpcUaClient client) {
            Double publishInterval = 1000.0;
            int queueSize = 10;
            //创建指定发布间隔的订阅对象
            ImmutableList<UaSubscription> aa = client.getSubscriptionManager().getSubscriptions();
            //同步订阅
            UaSubscription subscription = null;
            UInteger idaa = null;
            try {
                subscription = client.getSubscriptionManager().createSubscription(publishInterval).get();
                idaa = subscription.getSubscriptionId();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            //创建监控项请求集合
            List<MonitoredItemCreateRequest> itemsToCreate = new ArrayList<>();
            for (String member : list) {
                NodeId nodeId = new NodeId(5, member);
                ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(),null,null);
                //创建监控的参数
                MonitoringParameters parameters = new MonitoringParameters(
                        subscription.nextClientHandle(),    //监控项
                        publishInterval,     // sampling interval
                        null,       // filter, null means use default
                        UInteger.valueOf(queueSize),   // queue size
                        true        // discard oldest
                );
                //创建监控项请求
                //该请求最后用于创建订阅。
                MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                //添加至请求集合
                itemsToCreate.add(request);
            }
            BiConsumer<UaMonitoredItem, Integer> biConsumer =
                    (item, id) -> item.setValueConsumer(new BiConsumer<UaMonitoredItem, DataValue>() {
                        @Override
                        public void accept(UaMonitoredItem uaMonitoredItem, DataValue value) {
                            log.info(
                                    "subscription value received: item={}, value={}, {}, {}, {}",
                                    item.getReadValueId().getNodeId(), value.getValue(),
                                    item.getReadValueId().getNodeId().getNamespaceIndex(),
                                    item.getReadValueId().getNodeId().getType(),
                                    item.getReadValueId().getNodeId().getIdentifier());
                        }
                    });
    
            List<UaMonitoredItem> items = null;
            try {
                items = subscription.createMonitoredItems(
                        TimestampsToReturn.Both,
                        itemsToCreate,
                        biConsumer
                ).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    
            for (UaMonitoredItem item : items) {
                if (item.getStatusCode().isGood()) {
                    System.out.println("item created for nodeId={}" + item.getReadValueId().getNodeId());
                } else {
                    System.out.println(
                            "failed to create item for nodeId={}" + item.getReadValueId().getNodeId() + " (status={})" +  item.getStatusCode());
                }
            }
            return idaa;
        }
    
        private static void addListener(UInteger id, List<String> list, OpcUaClient client) {
            client.getSubscriptionManager().addSubscriptionListener(new UaSubscriptionManager.SubscriptionListener() {
                @Override
                public void onKeepAlive(UaSubscription subscription, DateTime publishTime) {
    
    
                }
    
                @Override
                public void onStatusChanged(UaSubscription subscription, StatusCode status) {
    
                }
    
                @Override
                public void onPublishFailure(UaException exception) {
    
                }
    
                @Override
                public void onNotificationDataLost(UaSubscription subscription) {
    
                }
    
                @Override
                public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
                    log.info("oooooooooooooooo{}", statusCode);
                    //一旦连接断开,就清除诸如此类的订阅管理器, 重新订阅
    //                client.getSubscriptionManager().clearSubscriptions();
                    //清除指定id的订阅器
                    client.getSubscriptionManager().deleteSubscription(id);
                    try {
                        // 这个可以重新订阅到
                        System.out.println("**********" + list.size());
                        for (String member : list) {
                            System.out.println(member);
                        }
                        testSub(list, client);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            client.addSessionActivityListener(new SessionActivityListener(){
                @Override
                public void onSessionActive(UaSession session) {
                    log.info("success-----------");
                    System.out.println(client.connect().isDone());
                    System.out.println(client.getSession().isDone());
                }
    
                @Override
                public void onSessionInactive(UaSession session) {
                    log.info("fail-----------");
                    System.out.println(client.connect().isDone());
                    System.out.println(client.getSession().isDone());
                }
            });
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372
    • 373
    • 374
    • 375
    • 376
    • 377
    • 378
    • 379
    • 380
    • 381
    • 382
    • 383
    • 384
    • 385
    • 386
    • 387
    • 388
    • 389
    • 390
    • 391
    • 392
    • 393
    • 394
    • 395
    • 396
    • 397
    • 398
    • 399
    • 400
    • 401
    • 402
    • 403
    • 404
    • 405
    • 406
    • 407
    • 408
    • 409
    • 410
    • 411
    • 412
    • 413
    • 414
    • 415
    • 416
    • 417
    • 418
    • 419
    • 420
    • 421
    • 422
    • 423
    • 424
    • 425
    • 426
  • 相关阅读:
    【react基础02】编写函数式组件和类组件
    [羊城杯 2023] web
    懒羊羊闲话1
    java计算机毕业设计考勤管理系统源码+mysql数据库+系统+lw文档+部署
    彻底了解线程池的原理——40行从零开始自己写线程池
    用Python制作我的核酸检测日历
    js中事件委托和事件绑定之间的区别
    Linux内核分析与应用6-系统调用
    C++数据结构(上):模拟实现AVL
    【场景化解决方案】电商平台与钉钉打通,实现客户静默下单催付
  • 原文地址:https://blog.csdn.net/yinjl123456/article/details/126107792