• 在.NET Framework中使用RocketMQ(阿里云版)实战【第二章】


    章节
    第一章:https://www.cnblogs.com/kimiliucn/p/17662052.html
    第二章:https://www.cnblogs.com/kimiliucn/p/17667200.html


    cover(1).png

    作者:西瓜程序猿
    主页传送门:https://www.cnblogs.com/kimiliucn/


    上一章节主要介绍了RocketMQ基本介绍和前期准备,以及如何创建生产者。那这一章节主要介绍一下消费端的实现、如何发布消费端,以及遇到的坑怎么去解决。


    如果不知道怎么选,或者不知道怎么买云消息队列RocketMQ(阿里云版)?可以联系我[西瓜程序猿],如果需要特价购买可以通过下面地址访问:

    活动地址:https://www.aliyun.com/activity?userCode=tkq1f513

    image.png


    四、消费端实现

    4.1-创建消费者

    4.1.1-创建Windows服务项目

    (1)右击解决方案,然后依次点击【添加】——>【新建项目】,然后选择【 Windows 服务(.NET Framework) 】,点击下一步。

    注意:Windows服务只有在.NET Framework版本中才有了,在跨平台中使用Worker Service。

    image.png
    (2)修改项目名称,项目名称[西瓜程序猿]写的是【RocketMQ.Consumer】,然后框架选择的是【.NET Farmework 4.8】,这个可以根据自己的需要填写和选择,然后点击【创建】。
    image.png
    创建好的目录如下:【Program.cs】是主程序的入口,【Service1.cs】是服务的入口,可以创建多个,然后在Prodrams.cs中配置就好了。
    image.png
    (3)【Service1】服务名称可以重命名修改,此处我重命名为【RocketMQConsumerService】, Program.cs文件中也相对应的也要进行修改。
    image.png
    image.png
    (4)然后我们就可以在【RocketMQConsumerService】中写业务逻辑代码了,有很多种方式可以定位到要写的具体代码文件,先列举两种常用的。
    方法一:在【program.cs】文件中,找到这个类,按键盘上的F12可以直接进入查看文件。
    image.png
    方法二:直接右击,然后点击【查看代码】。
    image.png
    业务代码写到这里面:
    image.png
    到这一步消费者服务就创建好了,然后就写具体的业务代码就行了。注意:服务必须至少重写 OnStart 和 OnStop 才有用。


    4.1.2-项目依赖配置

    (1)在使用Visual Studio(VS)开发.NET的应用程序和类库时,默认的目标平台为“Any CPU”。但是.NET SDK仅支持Windows 64-bit操作系统,所以需要自行设置。先右击【RocketMQ.Consumer】项目,然后点击【属性】。
    image.png
    (2)点击左侧选项的【生成】,然后将目标平台改为【x64】。
    image.png

    (3)将资源包【ONSClient4CPP】文件夹里面所有的文件,复制到【bin/Debug】目录下。
    资源包:
    image.png
    项目:
    image.png


    4.1.3-配置日志(log4net)

    (1)为了方便测试,先介绍一下如何使用log4net做日志记录,当日志启动时和停止时我们记录一下。我们在项目目录下新建一个文件夹【LogConfig】,然后再创建一个文件为【log4net.config】。
    image.png
    (2)【log4net.config】内容如下。

    
    <configuration>
    	<configSections>
    		<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
    	configSections>
    
    	<system.web>
    		<compilation debug="true" targetFramework="4.5.2" />
    		<httpRuntime targetFramework="4.5.2" />
    	system.web>
    	<log4net>
    		
    		
    		
    		<appender name="ErrorAppender" type="log4net.Appender.RollingFileAppender">
    			
    			<file value="log/error/error_" />
    			
    			<appendToFile value="true"/>
    			
    			<rollingStyle value="Date"/>
    			
    			<datePattern value="yyyy-MM-dd'.log'"/>
    			
    			<staticLogFileName value="false"/>
    			
    			<param name="MaxSizeRollBackups" value="100"/>
    			
    			<maximumFileSize value="50MB" />
    			
    			<layout type="log4net.Layout.PatternLayout">
    				
    				
    				
    
    				
    				
    				<conversionPattern value="%n==========
                                      %n【日志级别】%-5level
                                      %n【记录时间】%date
                                      %n【执行时间】[%r]毫秒
                                      %n【出错文件】%F
                                      %n【出错行号】%L
                                      %n【出错的类】%logger 属性[%property{NDC}]
                                      %n【错误描述】%message
                                      %n【错误详情】%newline"/>
    			layout>
    			<filter type="log4net.Filter.LevelRangeFilter,log4net">
    				<levelMin value="ERROR" />
    				<levelMax value="FATAL" />
    			filter>
    		appender>
    
    		
    		
    		
    		<appender name="DebugAppender" type="log4net.Appender.RollingFileAppender">
    			
    			<file value="log/debug/debug_" />
    			
    			<appendToFile value="true"/>
    			
    			<rollingStyle value="Date"/>
    			
    			<datePattern value="yyyy-MM-dd'.log'"/>
    			
    			<staticLogFileName value="false"/>
    			
    			<param name="MaxSizeRollBackups" value="100"/>
    			
    			<maximumFileSize value="50MB" />
    			
    			<layout type="log4net.Layout.PatternLayout">
    				
    				
    				
    
    				
    				
    				<conversionPattern value="%n==========
                                      %n【日志级别】%-2level
                                      %n【记录时间】%date
                                      %n【执行时间】[%r]毫秒
                                      %n【debug文件】%F
                                      %n【debug行号】%L
                                      %n【debug类】%logger 属性[%property{NDC}]
                                      %n【debug描述】%message"/>
    			layout>
    			<filter type="log4net.Filter.LevelRangeFilter,log4net">
    				<levelMin value="DEBUG" />
    				<levelMax value="WARN" />
    			filter>
    		appender>
    
    
    		
    		
    		
    		<appender name="INFOAppender" type="log4net.Appender.RollingFileAppender">
    			
    			<file value="log/info/info_" />
    			
    			<appendToFile value="true"/>
    			
    			<rollingStyle value="Date"/>
    			
    			<datePattern value="yyyy-MM-dd'.log'"/>
    			
    			<staticLogFileName value="false"/>
    			
    			<param name="MaxSizeRollBackups" value="100"/>
    			
    			<maximumFileSize value="50MB" />
    			
    			<layout type="log4net.Layout.PatternLayout">
    				
    				
    				
    
    				
    				
    				<conversionPattern value="%n==========
                                      %n【日志级别】%-2level
                                      %n【记录时间】%date
                                      %n【执行时间】[%r]毫秒
                                      %n【info文件】%F
                                      %n【info行号】%L
                                      %n【info类】%logger 属性[%property{NDC}]
                                      %n【info描述】%message"/>
    			layout>
    			<filter type="log4net.Filter.LevelRangeFilter,log4net">
    				<levelMin value="INFO" />
    				<levelMax value="WARN" />
    			filter>
    		appender>
    
    		
    		<root>
    			
    			<level value="ALL" />
    			<appender-ref ref="DebugAppender" />
    			<appender-ref ref="ErrorAppender" />
    			<appender-ref ref="INFOAppender" />
    		root>
    	log4net>
    configuration>
    

    image.png(3)并且右击【log4net.config】文件,点击【属性】,然后将[复制到输出目录]设置为【始终复制】。
    image.png
    (4)然后安装log4net。在项目目录中右击【引用】,然后点击【管理NuGet程序包】
    image.png
    (5)然后点击浏览,搜索【log4net】,右侧点击安装。
    image.png
    (6)重要:然后配置【AssemblyInfo.cs 】文件,如果不配置,是输出不了日志的。
    image.png
    添加到底部即可:(如果你的【log4net.config】文件路径和我的不一样,记得修改成跟自己配置路径一样的)。
    image.png
    代码:

    [assembly: log4net.Config.XmlConfigurator(ConfigFileExtension = "config", ConfigFile = "LogConfig/log4net.config", Watch = true)]
    

    (7)在服务启动方法【OnStart】中,配置启动log4net。
    image.png
    代码:

             XmlConfigurator.Configure(new System.IO.FileInfo("LogConfig/log4net.config"));
    

    (8)然后就可以使用log4net了,首先在Windows服务中获得log4net的实例。
    image.png
    代码:

    private static readonly log4net.ILog logger = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
    

    4.2-配置连接信息

    (1)然后右击【RocketMQ.Consumer】项目下,点击【引用】,然后将【RocketMQ.Core】项目勾选上确定。
    image.png
    (2)然后将前期准备的基本信息放在配置文件中。在【App.config】文件进行配置。
    image.png
    代码:

    
    <add key="ons_access_key" value="xxx" />
    
    <add key="ons_secret_key" value="xxx" />
    
    <add key="ons_topic" value="XG_CXY_Test" />
    
    <add key="ons_groupId" value="XG_CXY_Group_Test" />
    
    <add key="ons_name_srv" value="xxx-xxx-xxx-xxx.rmq.aliyuncs.com:8080" />
    
    <add key="ons_client_code" value="XG_CXY_Consumer_Develop" />
    

    (3)然后创建一个【Config】文件夹,写一个获得【ConfigSetting】配置文件的帮助类。
    image.png
    代码:

        /// 
        /// 配置文件
        /// 
        public class ConfigGeter
        {
            private static T TryGetValueFromConfig<T>(Func<string, T> parseFunc, Func defaultTValueFunc, [CallerMemberName] string key = "", string supressKey = "")
            {
                try
                {
                    if (!string.IsNullOrWhiteSpace(supressKey))
                    {
                        key = supressKey;
                    }
    
                    var node = ConfigurationManager.AppSettings[key];
                    return !string.IsNullOrEmpty(node) ? parseFunc(node) : defaultTValueFunc();
                }
                catch (Exception ex)
                {
                    return default(T);
                }
            }
    
            #region 消息队列:RocketMQ
            /// 
            /// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例用户名。
            /// 
            public static string ons_access_key
            {
                get
                {
                    return TryGetValueFromConfig(_ => _, () => string.Empty);
                }
            }
    
            /// 
            /// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例密码。
            /// 
            public static string ons_secret_key
            {
                get
                {
                    return TryGetValueFromConfig(_ => _, () => string.Empty);
                }
            }
    
            /// 
            ///  您在云消息队列 RocketMQ 版控制台创建的Topic。
            /// 
            public static string ons_topic
            {
                get
                {
                    return TryGetValueFromConfig(_ => _, () => string.Empty);
                }
            }
    
            /// 
            /// 设置为您在云消息队列 RocketMQ 版控制台创建的Group ID。
            /// 
            public static string ons_groupId
            {
                get
                {
                    return TryGetValueFromConfig(_ => _, () => string.Empty);
                }
            }
    
            /// 
            /// 设置为您从云消息队列 RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            /// 
            public static string ons_name_srv
            {
                get
                {
                    return TryGetValueFromConfig(_ => _, () => string.Empty);
                }
            }
    
            /// 
            /// 消息来源(生产者/消费端客户端编码)
            /// 
            public static string ons_client_code
            {
                get
                {
                    return TryGetValueFromConfig(_ => _, () => string.Empty);
                }
            }
            #endregion
        }
    

    4.3-封装核心代码

    (1)新建一个【ConsumerStartup】文件,这个类继承自【MessageListener】类,然后实现consume方法,这个方法主要是消费者具体要执行的任务。
    image.png
    代码:

    /// 
        /// 消费端启动
        /// 
        public class ConsumerStartup : MessageListener
        {
            private readonly static ILog logger = LogManager.GetLogger(typeof(ConsumerStartup));
            private readonly static ConsumerManager manager = new ConsumerManager();
            private readonly string _consumerClientCode;
            private readonly string _ons_groupId;
    
            /// 
            /// 构造函数
            /// 
            /// 消费者客户端Code
            /// 消费者消费的分组
            public ConsumerStartup(string consumerClientCode, string ons_groupId)
            {
                _consumerClientCode = consumerClientCode;
                _ons_groupId = ons_groupId;
            }
    
            ~ConsumerStartup()
            {
            }
    
            /// 
            /// 消费者任务
            /// 
            /// 
            /// 
            /// 
            public override ons.Action consume(Message value, ConsumeContext context)
            {
                Console.WriteLine("【消费者任务】:消费者消息进来了...");
                logger.Info($"【消费者任务】:消费者消息进来了...");
    
                string topic = value.getTopic();
                string business_id = value.getKey();
                string message_id = value.getMsgID();
                string msg_tag = value.getTag();
                byte[] bytes = Encoding.Default.GetBytes(value.getBody());
                string msg_body = Encoding.Default.GetString(bytes);
                if (string.IsNullOrEmpty(msg_body))
                {
                    return ons.Action.CommitMessage;
                };
    
                string log_body = $"本次消费的消息:【消费序列:{value.getQueueOffset()}】【消息key:{business_id}】【消息ID:{message_id}】【Tag:{msg_tag}】";
                Console.WriteLine(log_body);
                logger.Info(log_body);
                logger.Info($"【消费内容】:{msg_body}");
    
                int status = 1;
                string error_msg = "";
                long sys_msg_id = 0;
                QueueOnsCommonModel consumerModel = null;
    
                try
                {
                    //调度到具体的消费者
                    consumerModel = JsonUtility.DeserializeJSON(msg_body);
                    if (consumerModel != null)
                    {
                        logger.Info($"【消费者任务】:真正开始执行了(消息key:{consumerModel.MessageId})");
                        if (!long.TryParse(consumerModel.MessageId, out sys_msg_id))
                        {
                            logger.Info("sys_msg_id 转换失败!");
                        }
    
                        manager.ExecuteConsumer(consumerModel.Tag, consumerModel.EventType, consumerModel);
    
                        logger.Info($"【消费者任务】:执行完成了(消息key:{consumerModel.MessageId})");
                    }
                    else
                    {
                        status = 2;
                        error_msg = "【调度到具体的消费者】解析消息body内容为空,无法进行消费";
                        logger.Error($"【调度到具体的消费者】解析消息body内容为空,无法进行消费");
                    }
                }
                catch (Exception ex)
                {
                    logger.Error($"【消费者任务】:发生异常了:{ex.Message}", ex);
                    status = 2;
                    error_msg = ex.Message;
                }
    
                return ons.Action.CommitMessage;
            }
        }
    

    4.4-启动消费者

    在【RocketMQConsumerService.cs】文件OnStart方法中创建生产者,主要就是从配置文件中获得配置信息,然后调用【QueueOnsProducer.CreatePushConsumer】方法创建消息队列生产者,通过调用【QueueOnsProducer.SetPushConsumer】方法来设置生产者,最后通过调用【QueueOnsProducer.StartPushConsumer】方法来启动生产者。
    image.png
    代码:

    //创建消费者
                string ons_access_key = ConfigSetting.ons_access_key;
                string ons_secret_key = ConfigSetting.ons_secret_key;
                string ons_topic = ConfigSetting.ons_topic;
                string ons_groupId = ConfigSetting.ons_groupId;
                string ons_name_srv = ConfigSetting.ons_name_srv;
                string ons_client_code = ConfigSetting.ons_client_code;
                QueueOnsProducer.CreatePushConsumer(new ONSPropertyConfigModel()
                {
                    AccessKey = ons_access_key,
                    SecretKey = ons_secret_key,
                    Topics = ons_topic,
                    GroupId = ons_groupId,
                    NAMESRV_ADDR = ons_name_srv,
                    OnsClientCode = ons_client_code,
                });
                //设置消费者
                QueueOnsProducer.SetPushConsumer(new ConsumerStartup(ons_client_code, ons_groupId), "*");
                //启动消费者
                QueueOnsProducer.StartPushConsumer();
    

    4.5-接收消费消息

    我们如果要创建一个具体消费者去消费某一条消息,需要先创建一个类,然后实现【IConsumerMsg】接口中的【Consume】方法。需要在这个方法上面标注两个特性,也可以是一个(意味着满足一个条件即可),一个是【ConsumerTag】Tag标签,表示要消费哪个生产的Tag标签,一个是【EventType】,表示要消费哪个生产的事件类型。如果有多个不同的消费者,就按照上面的方式创建多个即可。[西瓜程序猿]这边创建一个名为【SampleConsumer】的类作为例子。
    image.png
    代码:

     /// 
        /// 消费者Sample
        /// 
        [ConsumerTag(QueueTagConsts.XG_Blog_Sample_Tag)]
        [EventType(QueueOnsEventType.RocketMQ_TEST)]
        public class SampleConsumer :  IConsumerMsg
        {
            private readonly static ILog logger = LogManager.GetLogger(typeof(SampleConsumer));
    
            public void Consume(QueueOnsCommonModel model)
            {
                logger.Info($"【西瓜程序猿-消费者Sample】:测试消费者进来了");
                if (model != null)
                {
                    Console.WriteLine("tag:" + model.Tag);
                    Console.WriteLine("body" + model.Body);
                }
                Console.WriteLine("【西瓜程序猿-消费者Sample】消费成功了!");
            }
        }
    

    五、发布消费端

    然后来介绍一下如何部署消费端。之前看评论区说使用NSSM部署安装Window服务更方便,后面我也试了一下确实还挺好用,但是针对目前这个程序始终运行不起来(各位大佬如果有更好的方法和建议可以在评论区提出来哈),所以这次还是用之前的方法来介绍如何部署Windows服务了。

    5.1-服务基本配置

    (1)点击我们的服务【RocketMQConsumerService.cs】,然后右击点击【添加安装程序】。
    image.png

    (2)然后可以看到下面多出来了一个文件,就是安装程序。
    image.png
    image.png

    (3)然后可以修改基本信息,服务组件中的【服务名称】【服务描述】等等。我们右击【serviceInstall1】点击属性,然后进行修改。
    image.png
    image.png

    (4)然后点击【serviceProcessInstall1】右击属性,进行修改。
    image.png
    image.png


    5.2-服务运行与发布

    当我们直接按F5或者其他方式直接运行项目时,会提示:"无法从命令行或调试程序启动服务。必须首先安装 Windows服务(使用installutil.exe),然后用ServerExplorer、Windows服务管理工具或 NET START命令启动它。"。不是这样运行的,跟着下面步骤来操作运行与发布Windows服务吧。
    image.png
    前提注意:如果你设置的目标平台是x64,打开的目录会不一样,不然导致服务运行不起来。可以右击项目名,点击【属性】——>【生成】——>【目标平台】查看。
    image.png

    如果不是x64版本,复制这个地址:

    C:\Windows\Microsoft.NET\Framework\v4.0.30319

    如果是x64版本,复制这个地址:

    C:\Windows\Microsoft.NET\Framework64\v4.0.30319

    不然会报类似这种错误:在初始化安装时发生异常: System.BadImageFormatException: 未能加载文件或程序集...
    (1)然后我们把上面的地址(根据自己的环境选择)添加到环境变量中。点击【控制面板】——>【系统和安全】
    image.png

    (2)然后点击【系统】
    image.png

    (3)点击【高级系统设置】
    image.png

    (4)点击【环境变量】
    image.png

    (5)在【系统变量】中找到Path,然后点击【编辑】。
    image.png

    (6)然后点击【新建】,然后把我们拷贝的目录复制到这里。然后点击确认即可。
    image.png

    (7)测试是否配置成功,输入这个命令查看一下【InstallUtil】,如果是下面这样的内容说明成功了。
    image.png

    (8)然后编辑解决方案和项目。
    image.png

    (9)以管理员身份运行cmd命令,然后安装服务。

    InstallUtil 项目启动执行文件全路径

    西瓜程序猿的例子:

    InstallUtil D:\项目演示临时保存\MyDemoService\MyDemoService\bin\Debug\MyDemoService.exe

    image.png

    (10)出现这个说明安装成功了。
    image.png

    (11)打开服务管理器,找到要启动的服务,然后右击启动服务。
    image.png

    (12)如果要卸载服务,可以运行这个命令:

    InstallUtil /u 项目启动执行文件全路径

    西瓜程序猿的例子:

    InstallUtil /u D:\项目演示临时保存\MyDemoService\MyDemoService\bin\Debug\MyDemoService.exe

    image.png


    5.3-常见命令

    1、安装服务:InstallUtil 项目启动执行文件全路径
    2、启动服务:net start 服务名
    3、停止服务:net stop 服务名
    4、卸载服务:InstallUtil /u 项目启动执行文件全路径


    5.4-测试消费消息

    (1)首先可以先看一下日志,看一下这个消费者服务是否启动成功了。
    image.png
    (2)然后再日志里面记录下消费的消费,在根据消息Key或者消息ID在阿里云后台查询一下这一条消息的【消息轨迹】,如果提示消费成功就说明确实已经进行消费了。
    image.png
    最后,还有可能会出现消息生产失败、消息消费失败等场景,大佬们可以根据实际情况进行设计和跳转哈。


    六、防踩坑指南

    5.1:ons.ONSClient4CPPPINVOKE的类型初始值设定项引发异常

    异常详情:

    “ons.ONSClient4CPPPINVOKE”的类型初始值设定项引发异常。

    解决方案:
    第一步:在使用Visual Studio(VS)开发.NET的应用程序和类库时,默认的目标平台为“Any CPU”。但是.NET SDK仅支持Windows 64-bit操作系统,所以需要自行设置。先右击【RocketMQ.Producer】项目,然后点击【属性】,点击左侧选项的【生成】,然后将目标平台改为【x64】。
    image.png
    第二步:将资源包【ONSClient4CPP】文件夹里面所有的文件,复制到【bin】目录下。
    image.png


    5.2:Topic Route does not exist

    异常详情:

    Topic Route does not exist, Topic:XG_CXY_Test exception:msg: No route info of this topic, ,error:-1,in file <..\src\producer\DefaultMQProducer.cpp> line:581
    See https://github.com/alibaba/ons/issues/7 for further details.”

    异常截图:
    image.png
    解决方案:
    这个问题一般是没有链接上RocketMQ,检查一下配置文件中信息是否与RocketMQ信息一致。尤其是[ons_name_srv] RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”切记不要加"http://或者https😕/"。我这边也经常遇到这个问题,因为只设置了【VPC专有网络】访问,也就是说在本地访问不了RocketMQ,只能在服务器内网访问。


    5.3:官方文档仅作为参考

    吐槽一下云消息队列RocketMQ(阿里云版)的官方文档,看下来对.NET开发同学不太友好,仅指的是老版本。看文档做开发的时候看了很多有时候发现下载下来的案例代码和文档上线写的代码不一致。然后文档上面看的是.NET SDK示例代码,出现的却是别的 C++语言代码。
    image.png


    我是西瓜程序猿,感谢大家的阅读和支持。编写不易,如果对大家有帮助,用您发财的小手点个赞和关注呗,非常感谢!有问题欢迎联系我一起学习与探讨~


    上一章节:https://www.cnblogs.com/kimiliucn/p/17662052.html

    原文链接:https://www.cnblogs.com/kimiliucn/p/17667200.html


    __EOF__

  • 本文作者: 西瓜程序猿
  • 本文链接: https://www.cnblogs.com/kimiliucn/p/17667200.html
  • 关于博主: 评论和私信会在第一时间回复。或者直接私信我。
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。
  • 相关阅读:
    采购信息记录(PIR)创建/修改的正确姿势
    【开发心得】记录一次自定义starter的实现
    清空flowable的表定义的相关表
    vue和react项目中实现 px 转 vm
    《Linux 内核设计与实现》13. 虚拟文件系统
    【Python 实战基础】Pandas 如何统计某个数据列的空值个数
    vue/html input 读取 json数据
    卧式铣床升降台主传动系统设计(说明书+翻译及原文+cad图纸+proe三维图纸)
    ZFS了解
    漏电断路器
  • 原文地址:https://www.cnblogs.com/kimiliucn/p/17667200.html