上一章我们把系统所需要的MongoDB集合设计好了,这一章我们的主要任务是使用.NET Core应用程序连接MongoDB并且封装MongoDB数据仓储和工作单元模式,因为本章内容涵盖的有点多关于仓储和工作单元的使用就放到下一章节中讲解了。仓储模式(Repository )带来的好处是一套代码可以适用于多个类,把常用的CRUD通用方法抽象出来通过接口形式集中管理,从而解除业务逻辑层与数据访问层之间的耦合,使业务逻辑层在存储、访问数据库时无须关心数据的来源及存储方式。工作单元模式(UnitOfWork)它是用来维护一个由已经被业务修改(如增加、删除和更新等)的业务对象组成的列表,跨多个请求的业务,统一管理事务,统一提交从而保障事物一致性的作用。
MongoDB从入门到实战之Docker快速安装MongoDB👉
MongoDB从入门到实战之MongoDB工作常用操作命令👉
MongoDB从入门到实战之.NET Core使用MongoDB开发ToDoList系统(1)-后端项目框架搭建👉
MongoDB从入门到实战之.NET Core使用MongoDB开发ToDoList系统(2)-Swagger框架集成👉
MongoDB从入门到实战之.NET Core使用MongoDB开发ToDoList系统(3)-系统数据集合设计👉
MongoDB从入门到实战之.NET Core使用MongoDB开发ToDoList系统(4)-MongoDB数据仓储和工作单元模式封装👉
欢迎各位看官老爷review,有帮助的别忘了给我个Star哦💖!!!
MongoRepository地址:https://github.com/YSGStudyHards/YyFlight.ToDoList/tree/main/Repository/Repository
说明:
MongoDB单机服务器不支持事务【使用MongoDB事务会报错:Standalone servers do not support transactions】,只有在集群情况下才支持事务,因为博主接下来都是在单机环境下操作,所以无法来演示Mongo事务操作,但是方法都已经是封装好了的,大家可以自己搭建集群实操。
原因:
MongoDB在使用分布式事务时需要进行多节点之间的协调和通信,而单机环境下无法实现这样的分布式协调和通信机制。但是,在MongoDB部署为一个集群(cluster)后,将多个计算机连接为一个整体,通过协调和通信机制实现了分布式事务的正常使用。从数据一致性和可靠性的角度来看,在分布式系统中实现事务处理是至关重要的。而在单机环境下不支持事务,只有在集群情况下才支持事务的设计方式是为了保证数据一致性和可靠性,并且也符合分布式系统的设计思想。
Install-Package MongoDB.Driver
前往appsettings.json文件中配置Mongo数据库信息:
"MongoSettings": { "Connection": "mongodb://root:123456@local:27017/yyflight_todolist?authSource=admin", //MongoDB连接字符串 "DatabaseName": "yyflight_todolist" //MongoDB数据库名称 }
基于MongoDB的最佳实践对于MongoClient最好设置为单例注入,因为在MongoDB.Driver中MongoClient已经被设计为线程安全可以被多线程共享,这样可还以避免反复实例化MongoClient带来的开销,避免在极端情况下的性能低下。
我们这里设计一个MongoConnection类,用于包裹这个MongoClient,然后将其以单例模式注入IoC容器中。
public interface IMongoConnection { public MongoClient MongoDBClient { get; set; } public IMongoDatabase DatabaseName { get; set; } }
public class MongoConnection : IMongoConnection { //基于MongoDB的最佳实践对于MongoClient最好设置为单例注入,因为在MongoDB.Driver中MongoClient已经被设计为线程安全可以被多线程共享,这样可还以避免反复实例化MongoClient带来的开销,避免在极端情况下的性能低下。 //我们这里设计一个MongoConnection类,用于包裹这个MongoClient,然后将其以单例模式注入IoC容器中。 public MongoClient MongoDBClient { get; set; } public IMongoDatabase DatabaseName { get; set; } public MongoConnection(IConfiguration configuration) { MongoDBClient = new MongoClient(configuration["MongoSettings:Connection"]); DatabaseName = MongoDBClient.GetDatabase(configuration["MongoSettings:DatabaseName"]); } }
现在我们将定义MongoDB DBContext上下文类,具体到一个业务对象或需要被持久化的对象,这个上下文类将封装数据库的连接和集合。
该类应负责建立与所需数据库的连接,在建立连接后,该类将在内存中或按请求持有数据库上下文(基于API管道中配置的生命周期管理。)
public interface IMongoContext : IDisposable { ////// 添加命令操作 /// /// 委托 方法接受一个 Func委托作为参数,该委托表示一个需要 IClientSessionHandle 对象作为参数并返回一个异步任务的方法 /// Task AddCommandAsync(Func func); /// /// 提交更改并返回受影响的行数 /// TODO:MongoDB单机服务器不支持事务【使用MongoDB事务会报错:Standalone servers do not support transactions】,只有在集群情况下才支持事务 /// 原因:MongoDB在使用分布式事务时需要进行多节点之间的协调和通信,而单机环境下无法实现这样的分布式协调和通信机制。但是,在MongoDB部署为一个集群(cluster)后,将多个计算机连接为一个整体,通过协调和通信机制实现了分布式事务的正常使用。从数据一致性和可靠性的角度来看,在分布式系统中实现事务处理是至关重要的。而在单机环境下不支持事务,只有在集群情况下才支持事务的设计方式是为了保证数据一致性和可靠性,并且也符合分布式系统的设计思想。 /// /// MongoDB 会话(session)对象 ///Task SaveChangesAsync(IClientSessionHandle session); /// /// 初始化Mongodb会话对象session /// ///Task StartSessionAsync(); /// /// 获取集合数据 /// ////// /// IMongoCollection GetCollection (string name); }
public class MongoContext : IMongoContext { private readonly IMongoDatabase _databaseName; private readonly MongoClient _mongoClient; //这里将 _commands 中的每个元素都定义为一个 Func委托,此委托表示一个需要 IClientSessionHandle 对象作为参数并返回一个异步任务的方法 //每个委托都表示一个MongoDB 会话(session)对象和要执行的命令 private readonly List > _commands = new List >(); public MongoContext(IMongoConnection mongoConnection) { _mongoClient = mongoConnection.MongoDBClient; _databaseName = mongoConnection.DatabaseName; } /// /// 添加命令操作 /// /// 方法接受一个 Func委托作为参数,该委托表示一个需要 IClientSessionHandle 对象作为参数并返回一个异步任务的方法 /// public async Task AddCommandAsync(Func func) { _commands.Add(func); await Task.CompletedTask; } /// /// 提交更改并返回受影响的行数 /// TODO:MongoDB单机服务器不支持事务【使用MongoDB事务会报错:Standalone servers do not support transactions】,只有在集群情况下才支持事务 /// 原因:MongoDB在使用分布式事务时需要进行多节点之间的协调和通信,而单机环境下无法实现这样的分布式协调和通信机制。但是,在MongoDB部署为一个集群(cluster)后,将多个计算机连接为一个整体,通过协调和通信机制实现了分布式事务的正常使用。从数据一致性和可靠性的角度来看,在分布式系统中实现事务处理是至关重要的。而在单机环境下不支持事务,只有在集群情况下才支持事务的设计方式是为了保证数据一致性和可靠性,并且也符合分布式系统的设计思想。 /// /// MongoDB 会话(session)对象 ///public async Task SaveChangesAsync(IClientSessionHandle session) { try { session.StartTransaction();//开始事务 foreach (var command in _commands) { await command(session); //语句实现了对事务中所有操作的异步执行,并等待它们完成。如果没有错误发生,程序会继续执行session.CommitTransactionAsync();方法,将之前进行的所有更改一起提交到MongoDB服务器上,从而实现事务提交。 } await session.CommitTransactionAsync();//提交事务 return _commands.Count; } catch (Exception ex) { await session.AbortTransactionAsync();//回滚事务 return 0; } finally { _commands.Clear();//清空_commands列表中的元素 } } /// /// 初始化Mongodb会话对象session /// ///public async Task StartSessionAsync() { var session = await _mongoClient.StartSessionAsync(); return session; } /// /// 获取MongoDB集合 /// ////// 集合名称 /// public IMongoCollection GetCollection (string name) { return _databaseName.GetCollection (name); } /// /// 释放上下文 /// public void Dispose() { GC.SuppressFinalize(this); } }
Repository(仓储)是DDD(领域驱动设计)中的经典思想,可以归纳为介于实际业务层(领域层)和数据访问层之间的层,能让领域层能在感觉不到数据访问层的情况下,完成与数据库的交互和以往的DAO(数据访问)层相比,Repository层的设计理念更偏向于面向对象,而淡化直接对数据表进行的CRUD操作。
定义一个泛型Repository通用接口,抽象常用的增加,删除,修改,查询等操作方法。
public interface IMongoRepositorywhere T : class, new() { #region 事务操作示例 /// /// 事务添加数据 /// /// MongoDB 会话(session)对象 /// 添加数据 ///Task AddTransactionsAsync(IClientSessionHandle session, T objData); /// /// 事务数据删除 /// /// MongoDB 会话(session)对象 /// objectId ///Task DeleteTransactionsAsync(IClientSessionHandle session, string id); /// /// 事务异步局部更新(仅更新一条记录) /// /// MongoDB 会话(session)对象 /// 过滤器 /// 更新条件 ///Task UpdateTransactionsAsync(IClientSessionHandle session, FilterDefinition filter, UpdateDefinition update); #endregion #region 添加相关操作 /// /// 添加数据 /// /// 添加数据 ///Task AddAsync(T objData); /// /// 批量插入 /// /// 实体集合 ///Task InsertManyAsync(List objDatas); #endregion #region 删除相关操作 /// /// 数据删除 /// /// objectId ///Task DeleteAsync(string id); /// /// 异步删除多条数据 /// /// 删除的条件 ///Task DeleteManyAsync(FilterDefinition filter); #endregion #region 修改相关操作 /// /// 指定对象异步修改一条数据 /// /// 要修改的对象 /// 修改条件 ///Task UpdateAsync(T obj, string id); /// /// 局部更新(仅更新一条记录) /// /// 筛选条件 /// 更新条件 ///x.Id == 1 && x.Age > 18 && x.Gender == 0]]> ///new T{ RealName = "Ray", Gender = 1}]]> ///Task UpdateAsync(Expression > expression, Expression > entity); /// /// 异步局部更新(仅更新一条记录) /// /// 过滤器 /// 更新条件 ///Task UpdateAsync(FilterDefinition filter, UpdateDefinition update); /// /// 异步局部更新(仅更新多条记录) /// /// 筛选条件 /// 更新条件 ///Task UpdateManyAsync(Expression > expression, UpdateDefinition update); /// /// 异步批量修改数据 /// /// 要修改的字段 /// 更新条件 ///Task UpdateManayAsync(Dictionary dic, FilterDefinition filter); #endregion #region 查询统计相关操作 /// /// 通过ID主键获取数据 /// /// objectId ///Task GetByIdAsync(string id); /// /// 获取所有数据 /// ///Task > GetAllAsync(); /// /// 获取记录数 /// /// 筛选条件 ///Task CountAsync(Expression > expression); /// /// 获取记录数 /// /// 过滤器 ///Task CountAsync(FilterDefinition filter); /// /// 判断是否存在 /// /// 条件 ///Task ExistsAsync(Expression > predicate); /// /// 异步查询集合 /// /// 查询条件 /// 要查询的字段,不写时查询全部 /// 要排序的字段 ///Task > FindListAsync(FilterDefinition
filter, string[]? field = null, SortDefinition ? sort = null); /// /// 异步分页查询集合 /// /// 查询条件 /// 当前页 /// 页容量 /// 要查询的字段,不写时查询全部 /// 要排序的字段 ///Task > FindListByPageAsync(FilterDefinition
filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition ? sort = null); #endregion }
public class MongoBaseRepository: IMongoRepository where T : class, new() { protected readonly IMongoContext _context; protected readonly IMongoCollection _dbSet; private readonly string _collectionName; protected MongoBaseRepository(IMongoContext context) { _context = context; _collectionName = typeof(T).GetAttributeValue((TableAttribute m) => m.Name) ?? typeof(T).Name; _dbSet = _context.GetCollection (_collectionName); } #region 事务操作示例 /// /// 事务添加数据 /// /// MongoDB 会话(session)对象 /// 添加数据 ///public async Task AddTransactionsAsync(IClientSessionHandle session, T objData) { await _context.AddCommandAsync(async (session) => await _dbSet.InsertOneAsync(objData)); } /// /// 事务数据删除 /// /// MongoDB 会话(session)对象 /// objectId ///public async Task DeleteTransactionsAsync(IClientSessionHandle session, string id) { await _context.AddCommandAsync((session) => _dbSet.DeleteOneAsync(Builders .Filter.Eq(" _id ", id))); } /// /// 事务异步局部更新(仅更新一条记录) /// /// MongoDB 会话(session)对象 /// 过滤器 /// 更新条件 ///public async Task UpdateTransactionsAsync(IClientSessionHandle session, FilterDefinition filter, UpdateDefinition update) { await _context.AddCommandAsync((session) => _dbSet.UpdateOneAsync(filter, update)); } #endregion #region 添加相关操作 /// /// 添加数据 /// /// 添加数据 ///public async Task AddAsync(T objData) { await _dbSet.InsertOneAsync(objData); } /// /// 批量插入 /// /// 实体集合 ///public async Task InsertManyAsync(List objDatas) { await _dbSet.InsertManyAsync(objDatas); } #endregion #region 删除相关操作 /// /// 数据删除 /// /// objectId ///public async Task DeleteAsync(string id) { await _dbSet.DeleteOneAsync(Builders .Filter.Eq("_id", new ObjectId(id))); } /// /// 异步删除多条数据 /// /// 删除的条件 ///public async Task DeleteManyAsync(FilterDefinition filter) { return await _dbSet.DeleteManyAsync(filter); } #endregion #region 修改相关操作 /// /// 指定对象异步修改一条数据 /// /// 要修改的对象 /// 修改条件 ///public async Task UpdateAsync(T obj, string id) { //修改条件 FilterDefinition filter = Builders .Filter.Eq("_id", new ObjectId(id)); //要修改的字段 var list = new List >(); foreach (var item in obj.GetType().GetProperties()) { if (item.Name.ToLower() == "id") continue; list.Add(Builders .Update.Set(item.Name, item.GetValue(obj))); } var updatefilter = Builders .Update.Combine(list); await _dbSet.UpdateOneAsync(filter, updatefilter); } /// /// 局部更新(仅更新一条记录) /// /// 筛选条件 /// 更新条件 ///x.Id == 1 && x.Age > 18 && x.Gender == 0]]> ///new T{ RealName = "Ray", Gender = 1}]]> ///public async Task UpdateAsync(Expression > expression, Expression > entity) { var fieldList = new List >(); if (entity.Body is MemberInitExpression param) { foreach (var item in param.Bindings) { var propertyName = item.Member.Name; object propertyValue = null; if (item is not MemberAssignment memberAssignment) continue; if (memberAssignment.Expression.NodeType == ExpressionType.Constant) { if (memberAssignment.Expression is ConstantExpression constantExpression) propertyValue = constantExpression.Value; } else { propertyValue = Expression.Lambda(memberAssignment.Expression, null).Compile().DynamicInvoke(); } if (propertyName != "_id") //实体键_id不允许更新 { fieldList.Add(Builders .Update.Set(propertyName, propertyValue)); } } } await _dbSet.UpdateOneAsync(expression, Builders .Update.Combine(fieldList)); } /// /// 异步局部更新(仅更新一条记录) /// /// 过滤器 /// 更新条件 ///public async Task UpdateAsync(FilterDefinition filter, UpdateDefinition update) { await _dbSet.UpdateOneAsync(filter, update); } /// /// 异步局部更新(仅更新多条记录) /// /// 筛选条件 /// 更新条件 ///public async Task UpdateManyAsync(Expression > expression, UpdateDefinition update) { await _dbSet.UpdateManyAsync(expression, update); } /// /// 异步批量修改数据 /// /// 要修改的字段 /// 更新条件 ///public async Task UpdateManayAsync(Dictionary dic, FilterDefinition filter) { T t = new T(); //要修改的字段 var list = new List >(); foreach (var item in t.GetType().GetProperties()) { if (!dic.ContainsKey(item.Name)) continue; var value = dic[item.Name]; list.Add(Builders .Update.Set(item.Name, value)); } var updatefilter = Builders .Update.Combine(list); return await _dbSet.UpdateManyAsync(filter, updatefilter); } #endregion #region 查询统计相关操作 /// /// 通过ID主键获取数据 /// /// objectId ///public async Task GetByIdAsync(string id) { var queryData = await _dbSet.FindAsync(Builders .Filter.Eq("_id", new ObjectId(id))); return queryData.FirstOrDefault(); } /// /// 获取所有数据 /// ///public async Task > GetAllAsync() { var queryAllData = await _dbSet.FindAsync(Builders .Filter.Empty); return queryAllData.ToList(); } /// /// 获取记录数 /// /// 筛选条件 ///public async Task CountAsync(Expression > expression) { return await _dbSet.CountDocumentsAsync(expression); } /// /// 获取记录数 /// /// 过滤器 ///public async Task CountAsync(FilterDefinition filter) { return await _dbSet.CountDocumentsAsync(filter); } /// /// 判断是否存在 /// /// 条件 ///public async Task ExistsAsync(Expression > predicate) { return await Task.FromResult(_dbSet.AsQueryable().Any(predicate)); } /// /// 异步查询集合 /// /// 查询条件 /// 要查询的字段,不写时查询全部 /// 要排序的字段 ///public async Task > FindListAsync(FilterDefinition
filter, string[]? field = null, SortDefinition ? sort = null) { //不指定查询字段 if (field == null || field.Length == 0) { if (sort == null) return await _dbSet.Find(filter).ToListAsync(); return await _dbSet.Find(filter).Sort(sort).ToListAsync(); } //指定查询字段 var fieldList = new List >(); for (int i = 0; i < field.Length; i++) { fieldList.Add(Builders .Projection.Include(field[i].ToString())); } var projection = Builders .Projection.Combine(fieldList); fieldList?.Clear(); //不排序 if (sort == null) return await _dbSet.Find(filter).Project (projection).ToListAsync(); //排序查询 return await _dbSet.Find(filter).Sort(sort).Project (projection).ToListAsync(); } /// /// 异步分页查询集合 /// /// 查询条件 /// 当前页 /// 页容量 /// 要查询的字段,不写时查询全部 /// 要排序的字段 ///public async Task > FindListByPageAsync(FilterDefinition
filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition ? sort = null) { //不指定查询字段 if (field == null || field.Length == 0) { if (sort == null) return await _dbSet.Find(filter).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); //进行排序 return await _dbSet.Find(filter).Sort(sort).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); } //指定查询字段 var fieldList = new List >(); for (int i = 0; i < field.Length; i++) { fieldList.Add(Builders .Projection.Include(field[i].ToString())); } var projection = Builders .Projection.Combine(fieldList); fieldList?.Clear(); //不排序 if (sort == null) return await _dbSet.Find(filter).Project (projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); //排序查询 return await _dbSet.Find(filter).Sort(sort).Project (projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); } #endregion }
工作单元模式是“维护一个被业务事务影响的对象列表,协调变化的写入和并发问题的解决”。具体来说,在C#工作单元模式中,我们通过UnitOfWork对象来管理多个Repository对象,同时UnitOfWork还提供了对事务的支持。对于一组需要用到多个Repository的业务操作,我们可以在UnitOfWork中创建一个事务,并将多个Repository操作放在同一个事务中处理,以保证数据的一致性。当所有Repository操作完成后,再通过UnitOfWork提交事务或者回滚事务。
////// 工作单元接口 /// public interface IUnitOfWork : IDisposable { ////// 提交保存更改 /// /// MongoDB 会话(session)对象 ///Task Commit(IClientSessionHandle session); /// /// 初始化MongoDB会话对象session /// ///Task InitTransaction(); }
////// 工作单元类 /// public class UnitOfWork : IUnitOfWork { private readonly IMongoContext _context; public UnitOfWork(IMongoContext context) { _context = context; } ////// 提交保存更改 /// /// MongoDB 会话(session)对象 ///public async Task Commit(IClientSessionHandle session) { return await _context.SaveChangesAsync(session) > 0; } /// /// 初始化MongoDB会话对象session /// ///public async Task InitTransaction() { return await _context.StartSessionAsync(); } public void Dispose() { _context.Dispose(); } }
//注册数据库基础操作和工作单元 builder.Services.AddSingleton(); builder.Services.AddScoped (); builder.Services.AddScoped (); builder.Services.AddScoped ();
NoSQL – MongoDB Repository Implementation in .NET Core with Unit Testing example