1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
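| // RabbitListener.cs: base class. It only handles registering with RabbitMQ and listening for messages; each consumer subclass sets its own RouteKey/QueueName and overrides the message handler Process.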
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Test.Listener
{
/// <summary>
/// Base hosted service that connects to RabbitMQ, binds a queue to the "message" topic
/// exchange and hands every received message to <see cref="Process"/>.
/// Subclasses assign <see cref="RouteKey"/> and <see cref="QueueName"/> (before the host
/// calls <see cref="StartAsync"/>) and override <see cref="Process"/>.
/// </summary>
public class RabbitListener : IHostedService
{
    // Name of the topic exchange every listener binds its queue to.
    private const string ExchangeName = "message";

    private readonly IConnection connection;
    private readonly IModel channel;

    /// <summary>Routing key used when binding <see cref="QueueName"/> to the exchange.</summary>
    protected string RouteKey;

    /// <summary>Name of the queue this listener declares and consumes from.</summary>
    protected string QueueName;

    /// <summary>
    /// Opens the RabbitMQ connection and channel using the host, credentials and port
    /// from <paramref name="options"/>.
    /// </summary>
    /// <param name="options">Application configuration carrying the RabbitMQ settings.</param>
    public RabbitListener(IOptions<AppConfiguration> options)
    {
        try
        {
            var factory = new ConnectionFactory()
            {
                // Adjust these to match your own configuration type.
                HostName = options.Value.RabbitHost,
                UserName = options.Value.RabbitUserName,
                Password = options.Value.RabbitPassword,
                Port = options.Value.RabbitPort,
            };
            this.connection = factory.CreateConnection();
            this.channel = connection.CreateModel();
        }
        catch (Exception ex)
        {
            // Construction deliberately stays non-throwing (as before); the failure is
            // reported here and surfaces as an explicit error from Register().
            Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
        }
    }

    /// <summary>Starts consuming by calling <see cref="Register"/>.</summary>
    /// <param name="cancellationToken">Not used; registration is synchronous.</param>
    /// <returns>An already-completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        Register();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Handles one message. The base implementation always throws; subclasses override it.
    /// </summary>
    /// <param name="message">Message body decoded as UTF-8 text.</param>
    /// <returns>
    /// <c>true</c> to acknowledge the message; <c>false</c> to reject it without requeueing.
    /// </returns>
    /// <exception cref="NotImplementedException">Always, in the base class.</exception>
    public virtual bool Process(string message)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Declares the exchange and queue, binds them with <see cref="RouteKey"/> and
    /// attaches a consumer that forwards each delivery to <see cref="Process"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The connection could not be established in the constructor.
    /// </exception>
    public void Register()
    {
        if (channel == null)
        {
            // Previously this surfaced as a NullReferenceException on the first channel call.
            throw new InvalidOperationException(
                "RabbitListener cannot register: the RabbitMQ connection was not established during construction.");
        }

        Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
        channel.ExchangeDeclare(exchange: ExchangeName, type: "topic");
        channel.QueueDeclare(queue: QueueName, exclusive: false);
        channel.QueueBind(queue: QueueName,
            exchange: ExchangeName,
            routingKey: RouteKey);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);

            // Exceptions thrown by Process are not caught here; they propagate to the
            // client library's consumer dispatch.
            if (Process(message))
            {
                channel.BasicAck(ea.DeliveryTag, false);
            }
            else
            {
                // Process reported it cannot handle the message: reject it explicitly
                // (no requeue) instead of leaving the delivery unacknowledged forever.
                channel.BasicReject(ea.DeliveryTag, false);
            }
        };

        // autoAck is left at its default, so deliveries are settled manually above.
        channel.BasicConsume(queue: QueueName, consumer: consumer);
    }

    /// <summary>Closes the RabbitMQ connection, if one was opened.</summary>
    public void DeRegister()
    {
        CloseConnection();
    }

    /// <summary>Closes the RabbitMQ connection when the host shuts down.</summary>
    /// <param name="cancellationToken">Not used; closing is synchronous.</param>
    /// <returns>An already-completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        CloseConnection();
        return Task.CompletedTask;
    }

    // Closes the connection only when it exists and is still open, so shutdown is safe
    // after a failed constructor or a previous DeRegister() call.
    private void CloseConnection()
    {
        if (connection != null && connection.IsOpen)
        {
            connection.Close();
        }
    }
}
}
// An example subclass
using System;
using System.Text;
using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace Test.Listener
{
/// <summary>
/// Example consumer built on <see cref="RabbitListener"/>: it listens on queue
/// "lemonnovelapi.chapter" with routing key "done.task" and parses each message as JSON.
/// </summary>
public class ChapterLister : RabbitListener
{
    private readonly ILogger<RabbitListener> _logger;

    // Process runs as a consumer callback, so services injected directly would not share
    // its scope. Other services are therefore resolved from a scope created on demand
    // through IServiceProvider.
    private readonly IServiceProvider _services;

    /// <summary>Sets the routing key and queue name and stores the dependencies.</summary>
    /// <param name="services">Root provider used to create a scope per message.</param>
    /// <param name="options">RabbitMQ settings forwarded to the base listener.</param>
    /// <param name="logger">Logger used to report processing failures.</param>
    public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options,
        ILogger<RabbitListener> logger) : base(options)
    {
        base.RouteKey = "done.task";
        base.QueueName = "lemonnovelapi.chapter";
        _logger = logger;
        _services = services;
    }

    /// <summary>Parses the message as JSON and processes it inside a fresh DI scope.</summary>
    /// <param name="message">Raw message text received from the queue.</param>
    /// <returns>
    /// <c>true</c> when the message was handled; <c>false</c> when it is not valid JSON,
    /// is a JSON null, or processing threw. A false result tells the base listener the
    /// message was not handled, so it is not acknowledged.
    /// </returns>
    public override bool Process(string message)
    {
        try
        {
            // Parsing happens inside the try so malformed or empty payloads are logged and
            // reported as "not handled" instead of throwing out of the consumer callback.
            var taskMessage = JToken.Parse(message);
            if (taskMessage == null || taskMessage.Type == JTokenType.Null)
            {
                // A JSON null payload parses to a Null token rather than a C# null.
                return false;
            }

            using (var scope = _services.CreateScope())
            {
                // Placeholder: resolve the scoped service that does the real work.
                var xxxService = scope.ServiceProvider.GetRequiredService<XXXXService>();
                return true;
            }
        }
        catch (Exception ex)
        {
            _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}");
            _logger.LogError(-1, ex, "Process fail");
            return false;
        }
    }
}
}
|