Surging集成SuperSocket预发布版本2.0的实践与问题
笔记哥 /
04-14 /
43点赞 /
0评论 /
438阅读
# 一、概述
周末在家试着扩展SuperSocket,因为之前都是只支持.net framework, 后面出现支持.NET CORE 的SuperSocket 2.0 ,然后集成进来和dotnetty 做下对比,dotnetty 有多强,我压测可以支持20w/s, 然后客户提供的服务器,通过外网压测网关,把上行速度50MB带宽的网络跑满了,引擎主机CPU只是在15%左右,完全没有跑满。然后再试试国人开发的SuperSocket看下性能怎么样。


木舟 (Kayak) 是什么?
木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
凯亚物联网平台: 密码:123456)(木舟物联网有人取了,准备改名原神凯亚,凡是交托于他的任务,总能得到解决)
链路跟踪Skywalking V8:
surging 微服务引擎开源地址: 会移动到microsurging进行维护)
# 二、集成SuperSocket
作为去中心化的微服务引擎,相关的引擎组件,中间件都可以替换,就比如核心的RPC组件dotnetty 都可以替换成其它组件,下面介绍如何进行替换
**创建服务端消息监听SuperSocketServerMessageListener,需要继承IMessageListener,代码如下:**
```csharp
public class SuperSocketServerMessageListener : IMessageListener, IDisposable
{
public event ReceivedDelegate Received;
private readonly ILogger _logger;
private readonly ITransportMessageDecoder _transportMessageDecoder;
private readonly ITransportMessageEncoder _transportMessageEncoder;
private readonly IServiceEngineLifetime _serviceEngineLifetime;
public SuperSocketServerMessageListener(ILogger logger, ITransportMessageCodecFactory codecFactory, IServiceEngineLifetime serviceEngineLifetime)
{
_logger = logger;
_transportMessageEncoder = codecFactory.GetEncoder();
_transportMessageDecoder = codecFactory.GetDecoder();
_serviceEngineLifetime = serviceEngineLifetime;
}
public async Task StartAsync(EndPoint endPoint)
{
_serviceEngineLifetime.ServiceEngineStarted.Register(async () =>
{
try
{
var ipEndPoint = endPoint as IPEndPoint;
var host = SuperSocketHostBuilder.Create()
.UsePackageHandler( (s, p) =>
{
Task.Run(async () =>
{
var sender = new SuperSocketServerMessageSender(_transportMessageEncoder, s);
await OnReceived(sender, p);
});
return ValueTask.CompletedTask;
})
.ConfigureSuperSocket(options =>
{
options.Name = "Echo Server";
options.Logger = _logger;
options.AddListener(new ListenOptions
{
Ip = ipEndPoint.Address.ToString(),
Port = ipEndPoint.Port,
}
);
})
.ConfigureLogging((hostCtx, loggingBuilder) =>
{
loggingBuilder.AddConsole();
})
.Build();
await host.RunAsync();
}
catch (Exception ex)
{
_logger.LogError($"SuperSocket服务主机启动失败,监听地址:{endPoint}。 ");
}
});
}
public async Task OnReceived(IMessageSender sender, TransportMessage message)
{
if (Received == null)
return;
await Received(sender, message);
}
public void Dispose()
{
}
}
```
**创建客户端消息监听SuperSocketTransportClientFactory,需要继承ITransportClientFactory,代码如下:**
```csharp
internal class SuperSocketTransportClientFactory : ITransportClientFactory, IDisposable
{
private readonly ITransportMessageEncoder _transportMessageEncoder;
private readonly ITransportMessageDecoder _transportMessageDecoder;
private readonly ILogger _logger;
private readonly IServiceExecutor _serviceExecutor;
private readonly IHealthCheckService _healthCheckService;
private readonly ConcurrentDictionary>> _clients = new ConcurrentDictionary>>();
public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger logger)
: this(codecFactory, healthCheckService, logger, null)
{
}
public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger logger, IServiceExecutor serviceExecutor)
{
_transportMessageEncoder = codecFactory.GetEncoder();
_transportMessageDecoder = codecFactory.GetDecoder();
_logger = logger;
_serviceExecutor = serviceExecutor;
_healthCheckService = healthCheckService;
}
public async Task CreateClientAsync(EndPoint endPoint)
{
var key = endPoint;
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug($"准备为服务端地址:{key}创建客户端。");
try
{
return await _clients.GetOrAdd(key
, k => new Lazy>(async () =>
{
//客户端对象
var client = new EasyClient(new TransportMessagePipelineFilter()).AsClient();
var messageListener = new MessageListener();
var messageSender = new SuperSocketMessageClientSender(_transportMessageEncoder, client);
await client.ConnectAsync(endPoint);
client.PackageHandler += async (sender, package) =>
{
await messageListener.OnReceived(messageSender, package);
};
client.StartReceive();
//创建客户端
var transportClient = new TransportClient(messageSender, messageListener, _logger, _serviceExecutor);
return transportClient;
}
)).Value;//返回实例
}
catch
{
//移除
_clients.TryRemove(key, out var value);
var ipEndPoint = endPoint as IPEndPoint;
//标记这个地址是失败的请求
if (ipEndPoint != null)
await _healthCheckService.MarkFailure(new IpAddressModel(ipEndPoint.Address.ToString(), ipEndPoint.Port));
throw;
}
}
public void Dispose()
{
foreach (var client in _clients.Values)
{
(client as IDisposable)?.Dispose();
}
}
}
```
**注册初始化SuperSocket引擎模块,需要继承EnginePartModule, 代码如下:**
```csharp
public class SuperSocketModule : EnginePartModule
{
public override void Initialize(AppModuleContext context)
{
base.Initialize(context);
}
///
/// Inject dependent third-party components
///
///
protected override void RegisterBuilder(ContainerBuilderWrapper builder)
{
base.RegisterBuilder(builder);
builder.Register(provider =>
{
IServiceExecutor serviceExecutor = null;
if (provider.IsRegistered(typeof(IServiceExecutor)))
serviceExecutor = provider.Resolve();
return new SuperSocketTransportClientFactory(provider.Resolve(),
provider.Resolve(),
provider.Resolve>(),
serviceExecutor);
}).As(typeof(ITransportClientFactory)).SingleInstance();
if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.Tcp ||
AppConfig.ServerOptions.Protocol == CommunicationProtocol.None)
{
RegisterDefaultProtocol(builder);
}
}
private void RegisterDefaultProtocol(ContainerBuilderWrapper builder)
{
builder.Register(provider =>
{
return new SuperSocketServerMessageListener(provider.Resolve>(),
provider.Resolve(),
provider.Resolve());
}).SingleInstance();
builder.Register(provider =>
{
var serviceExecutor = provider.ResolveKeyed(CommunicationProtocol.Tcp.ToString());
var messageListener = provider.Resolve();
return new DefaultServiceHost(async endPoint =>
{
await messageListener.StartAsync(endPoint);
return messageListener;
}, serviceExecutor);
}).As();
}
}
```
**客户端服务端消息发送,需要继承IMessageSender, 代码如下:**
```csharp
public abstract class SuperSocketMessageSender
{
private readonly ITransportMessageEncoder _transportMessageEncoder;
protected SuperSocketMessageSender(ITransportMessageEncoder transportMessageEncoder)
{
_transportMessageEncoder = transportMessageEncoder;
}
protected byte[] GetByteBuffer(TransportMessage message)
{
var data = _transportMessageEncoder.Encode(message).ToList();
data.AddRange(Encoding.UTF8.GetBytes("\r\n"));
//var buffer = PooledByteBufferAllocator.Default.Buffer();
return data.ToArray();
}
}
public class SuperSocketMessageClientSender : SuperSocketMessageSender, IMessageSender
{
private readonly IEasyClient _client;
public SuperSocketMessageClientSender(ITransportMessageEncoder transportMessageEncoder, IEasyClient client) : base(transportMessageEncoder)
{
_client = client;
}
///
/// 发送消息。
///
/// 消息内容。
/// 一个任务。
public async Task SendAsync(TransportMessage message)
{
var buffer = GetByteBuffer(message);
await _client.SendAsync(buffer);
}
///
/// 发送消息并清空缓冲区。
///
/// 消息内容。
/// 一个任务。
public async Task SendAndFlushAsync(TransportMessage message)
{
var buffer = GetByteBuffer(message);
await _client.SendAsync(buffer);
// _client.StartReceive();
//var p= await _client.ReceiveAsync();
}
}
#region Implementation of IMessageSender
public class SuperSocketServerMessageSender : SuperSocketMessageSender, IMessageSender
{
private readonly IAppSession _session;
public SuperSocketServerMessageSender(ITransportMessageEncoder transportMessageEncoder, IAppSession session) : base(transportMessageEncoder)
{
_session = session;
}
///
/// 发送消息。
///
/// 消息内容。
/// 一个任务。
public async Task SendAsync(TransportMessage message)
{
var buffer = GetByteBuffer(message);
await _session.SendAsync(buffer);
}
///
/// 发送消息并清空缓冲区。
///
/// 消息内容。
/// 一个任务。
public async Task SendAndFlushAsync(TransportMessage message)
{
var buffer = GetByteBuffer(message);
await _session.SendAsync(buffer);
}
}
#endregion
```
**SuperSocket过滤器,需要继承TerminatorPipelineFilter<TransportMessage>, 代码如下:**
```csharp
public class TransportMessagePipelineFilter : TerminatorPipelineFilter
{
private readonly ITransportMessageDecoder _transportMessageDecoder;
public TransportMessagePipelineFilter() : base(new[] { (byte)'\r', (byte)'\n' })
{
_transportMessageDecoder = ServiceLocator.GetService().GetDecoder();
}
public override TransportMessage Filter(ref SequenceReader bufferStream)
{
try
{
var bytes = bufferStream.Sequence.Slice(0, bufferStream.Length - 2).ToArray();
var transportMessage = _transportMessageDecoder.Decode(bytes);
return transportMessage;
}
finally
{
bufferStream.Advance(bufferStream.Length);
}
}
}
```
# 三、如何加载**SuperSocket引擎组件**
**第一种方式:**
去掉Surging.Core.DotNetty引用,添加Surging.Core.SuperSocket

第二种方式
添加Surging.Core.DotNetty,Surging.Core.SuperSocket这两个应用,在surgingSettings.json配置文件中把Packages的using列表中的DotNettyModule改成SuperSocketModule

## 四、结果
可能是预发布版本,在测试当中还是有些问题,kayka物联网平台换成SuperSocket还是会发生错误,暂时没有条件进行压测,可能是因为预发布版本,作者还需要完善,等正式版之后再压测做对比吧
本文来自投稿,不代表本站立场,如若转载,请注明出处:http//www.knowhub.vip/share/2/2236
- 热门的技术博文分享
- 1 . ESP实现Web服务器
- 2 . 从零到一:打造高效的金仓社区 API 集成到 MCP 服务方案
- 3 . 使用C#构建一个同时问多个LLM并总结的小工具
- 4 . .NET 原生驾驭 AI 新基建实战系列Milvus ── 大规模 AI 应用的向量数据库首选
- 5 . 在Avalonia/C#中使用依赖注入过程记录
- 6 . [设计模式/Java] 设计模式之工厂方法模式
- 7 . 5. RabbitMQ 消息队列中 Exchanges(交换机) 的详细说明
- 8 . SQL 中的各种连接 JOIN 的区别总结!
- 9 . JavaScript 中防抖和节流的多种实现方式及应用场景
- 10 . SaltStack 远程命令执行中文乱码问题
- 11 . 推荐10个 DeepSeek 神级提示词,建议搜藏起来使用
- 12 . C#基础:枚举、数组、类型、函数等解析
- 13 . VMware平台的Ubuntu部署完全分布式Hadoop环境
- 14 . C# 多项目打包时如何将项目引用转为包依赖
- 15 . Chrome 135 版本开发者工具(DevTools)更新内容
- 16 . 从零创建npm依赖,只需执行一条命令
- 17 . 关于 Newtonsoft.Json 和 System.Text.Json 混用导致的的序列化不识别的问题
- 18 . 大模型微调实战之训练数据集准备的艺术与科学
- 19 . Windows快速安装MongoDB之Mongo实战
- 20 . 探索 C# 14 新功能:实用特性为编程带来便利