C++20协程实例:协程化的IOCP服务端/客户端

C++20协程实例:协程化的IOCP服务端/客户端

VC支持协程已经有一段时间了,之前一直想不明白协程的意义在哪里,前几天拉屎的时候突然灵光一闪:

以下是伪代码:

task server() {    for (;;) {        sock_context s = co_await io.accept();        for (;;) {            auto buf = co_await io.recv(s);            if (!buf.length())                break;            std::cout << buf.data() << std::endl;            int n = co_await io.send(s, "收到!", strlen("收到!") + 1);        }        co_await io.close(s);    }}

如果把IO库对外的接口做成上面这样,那岂不是看起来和最简单的阻塞模型相同的代码结构,但它的内在其实是异步的,用单线程相同的代码就能支撑一堆连接通信。

所以才有了接下来的研究(闲出屁才研究的),好在研究出成品了。

最终我也明白协程的意义了:

  协程化的库越多,C++程序员的门槛会越低,做上层开发的程序员可以不用知道协程的细节,只要知道如何正确使用库即可。

好了,真正介绍协程细节的文章有一大堆,不用我来写,我直接放代码,有兴趣的可以参考我的实现以及那些细节文章自己做:

2021/12/23:我最近使用了一个边缘应用试毒了这个库,一系列修修补补过后,还是很好用的。

2021/12/23:备注:最好不要用lambda函数作为协程函数,它可能会异常,也可能不会,这属于编译器bug带来的玄学。

#pragma once#include <WinSock2.h>#include <MSWSock.h>#include <ws2tcpip.h>#pragma comment(lib, "ws2_32.lib")#include <coroutine>#include <string>#include <functional>#include <thread>#include "logger.hpp"#include <random>/*** 最近花了点时间学习了一下C++20协程,初步改造实现了IOCP协程化的网络IO库* 此前基于回调分发的机制,由于上层协议解析所需的各种上下文,导致这个库是模板化的,* 现在有了协程,上层协议上下文已经可以在协程函数中实现,消除了模板化,也变得易于维护了一丢丢。* 但目前协程还有多少坑是未知的,是好是坏还得再看。* 使用协程,就意味着,这个库几乎完全失去了多线程的能力,* 要维护好一个内部是多线程,外皮是协程的IO库,我承认我没那个脑子。* 我个人当前的状态是不考虑过度设计,只追求上层代码优雅简洁,10几万并发对我而言已经满足了。* 如果这还不够用,那就意味着该放弃协程了,协程不是完全没有损耗的,根据我的测试,协程相比回调函数分发的方式,有15%左右的性能损耗。*/#pragma warning(push)#pragma warning(disable:4996)namespace aqx{    static int init_winsock() {        WSADATA wd;        return WSAStartup(MAKEWORD(2, 2), &wd);    }    static aqx::log nlog;#ifndef _nf#define _nf ((size_t)-1)#endif#ifndef __AQX_TIME_HPP#define __AQX_NOW_FUNC    using clock64_t = long long;    template<typename period = std::milli>    clock64_t now() {        const clock64_t _Freq = _Query_perf_frequency();        const clock64_t _Ctr = _Query_perf_counter();        const clock64_t _Whole = (_Ctr / _Freq) * period::den;        const clock64_t _Part = (_Ctr % _Freq) * period::den / _Freq;        return _Whole + _Part;    }#endif    /**     * 操作码与状态码定义    */    struct net_status {        static constexpr unsigned int s_accept = 0x01;        static constexpr unsigned int s_connect = 0x02;        static constexpr unsigned int s_read = 0x04;        static constexpr unsigned int s_write = 0x08;        static constexpr unsigned int s_close = 0x10;        static constexpr unsigned int s_exec = 0x20;        static constexpr unsigned int t_activated = 0x40;        static constexpr unsigned int t_acceptor = 0x0100;        static constexpr unsigned int t_connector = 0x0200;        static constexpr unsigned int t_await_undo = 0x0400;        static constexpr unsigned int t_await_accept = 0x010000;        static constexpr unsigned int t_await_connect = 0x020000;        static constexpr unsigned int t_await_read = 0x040000;        static constexpr unsigned int t_await_write = 0x080000;        static constexpr unsigned int t_await_close = 0x100000;        static constexpr unsigned int t_await = 0xFF0000;    };    /** net_base 主要负责衔接操作系统    * 不考虑过度设计,写得比较辣鸡,能用就行。    */    class net_base {    public:        net_base() {            fd = INVALID_SOCKET;            hIocp = NULL;            AcceptEx = NULL;            ConnectEx = NULL;            DisconnectEx = NULL;            StreamCapacity = 1440;            Timeout = 0;            DataBacklog = 0;            WorkerThreadId = 0;        }        static bool sockaddr_from_string(sockaddr_in& _Addr, const std::string& _Dest) {            _Addr.sin_addr.S_un.S_addr = INADDR_NONE;            size_t pos = _Dest.find(":");            if(pos == _nf) {                nlog("%s->错误目标地址:(%s)\n", __FUNCTION__, _Dest.data());                return false;            }            auto strip = _Dest.substr(0, pos);            auto strport = _Dest.substr(pos + 1);            strport.erase(strport.find_last_not_of("\r\n\t ") + 1);            strport.erase(0, strport.find_first_not_of("\r\n\t "));            unsigned short port = (unsigned short)atoi(strport.c_str());            if (!port) {                nlog("%s->目标端口号错误:(%s)\n", __FUNCTION__, _Dest.data());                return false;            }                        strip.erase(strip.find_last_not_of("\r\n\t ") + 1);            strip.erase(0, strip.find_first_not_of("\r\n\t "));            auto it = std::find_if(strip.begin(), strip.end(), [](char c)->bool {                return ((c < '0' || c > '9') && (c != '.'));                });            _Addr.sin_family = AF_INET;            _Addr.sin_port = htons(port);            if (it != strip.end()) {                hostent* host = gethostbyname(strip.c_str());                if (!host) {                    nlog("%s->错误的目标域名:(%s)\n", __FUNCTION__, _Dest.data());                    return false;                }                _Addr.sin_addr = *(in_addr*)(host->h_addr_list[0]);            }            else {                _Addr.sin_addr.S_un.S_addr = inet_addr(strip.c_str());            }            if (_Addr.sin_addr.S_un.S_addr == INADDR_NONE) {                nlog("%s->错误的目标地址:(%s)\n", __FUNCTION__, _Dest.data());                return false;            }            return true;        }        static void sockaddr_any(sockaddr_in& _Addr, unsigned short _Port) {            _Addr.sin_family = AF_INET;            _Addr.sin_port = htons(_Port);            _Addr.sin_addr.S_un.S_addr = INADDR_ANY;        }        static void sockaddr_local(sockaddr_in& _Addr, unsigned short _Port) {            _Addr.sin_family = AF_INET;            _Addr.sin_port = htons(_Port);            _Addr.sin_addr.S_un.S_addr = INADDR_LOOPBACK;        }        static void* getmswsfunc(SOCKET s, GUID guid) {            DWORD dwBytes;            void* lpResult = nullptr;            WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,                sizeof(guid), &lpResult, sizeof(lpResult), &dwBytes, NULL, NULL);            return lpResult;        }        static std::string sockaddr_to_string(const sockaddr_in &_Addr) {            char buf[256];            sprintf(buf, "%d.%d.%d.%d:%d", _Addr.sin_addr.S_un.S_un_b.s_b1,                _Addr.sin_addr.S_un.S_un_b.s_b2,                _Addr.sin_addr.S_un.S_un_b.s_b3,                _Addr.sin_addr.S_un.S_un_b.s_b4,                htons(_Addr.sin_port));            std::string _Result = buf;            return _Result;        }    private:        int init(int _StreamCapacity, int _DataBacklog, int _Timeout) {            if (fd != INVALID_SOCKET) {                return 0;            }            auto reterr = [this](int n) {                if (fd != INVALID_SOCKET) {                    closesocket(fd);                    fd = INVALID_SOCKET;                }                return n;            };            StreamCapacity = _StreamCapacity;            Timeout = _Timeout;            if (Timeout < 0) {                nlog("%s->Timeout必须>=0", __FUNCTION__);                return reterr(-1);            }            DataBacklog = _DataBacklog;            fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);            if (fd == INVALID_SOCKET) {                nlog("%s->创建套接字失败:%d", __FUNCTION__, WSAGetLastError());                return reterr(-1);            }            ConnectEx = (LPFN_CONNECTEX)getmswsfunc(fd, WSAID_CONNECTEX);            if (!ConnectEx) {                nlog("%s->获取 ConnectEx 地址失败,错误号:%d", __FUNCTION__, WSAGetLastError());                return reterr(-2);            }            AcceptEx = (LPFN_ACCEPTEX)getmswsfunc(fd, WSAID_ACCEPTEX);            if (!AcceptEx) {                nlog("%s->获取 AcceptEx 函数失败,错误号:%d", __FUNCTION__, WSAGetLastError());                return reterr(-3);            }                        // 我已经不止一次做过DisconnectEx的测试,最终结论都是DisconnectEx并不能提高并发连接数。            // DisconnectEx 在想象中会更快是因为用IOCP队列锁去换系统全局锁带来了性能提升。            // 还有一种方法是开一个线程搞个表去阻塞调用DisconnectEx,完事之后直接AcceptEx,也就最终把全局内核锁完全转嫁成你自己的锁了。            // DisconnectEx首先是不同的操作系统行为不一致,真正保险的做法只能在对方关闭连接时,调用DisconnectEx来复用。            // 对于IOCP来说,也就是在WSARecv或者WSASend 从 GetQueuedCompletionStatus 返回之后,第2个参数transferred == 0时            // 同时它受到TCP TIME_WAIT状态的影响            // 系统存在大量TIME_WAIT套接字时,最终得到的效果是,用了更多内存,去换来了更少的并发连接数。            /*DisconnectEx = (LPFN_DISCONNECTEX)getmswsfunc(fd, WSAID_DISCONNECTEX);            if (!DisconnectEx) {                nlog("%s->获取 DisconnectEx 函数失败,错误号:%d", __FUNCTION__, WSAGetLastError());                return reterr(-4);            }*/            hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);            if (!hIocp) {                nlog("%s->创建完成端口失败,错误号:%d", __FUNCTION__, GetLastError());                return reterr(-5);            }            CreateIoCompletionPort((HANDLE)fd, hIocp, 0, 0);            return 0;        }        void close() {            if (fd != INVALID_SOCKET) {                closesocket(fd);                fd = INVALID_SOCKET;            }            if (hIocp) {                CloseHandle(hIocp);                hIocp = NULL;            }        }        BOOL Accept(SOCKET s, char* _Data, LPOVERLAPPED _Overlapped) {            DWORD _Received = 0;            return AcceptEx(fd, s, _Data, 0, sizeof(SOCKADDR_IN) << 1, sizeof(SOCKADDR_IN) << 1, &_Received, _Overlapped);        }        BOOL Connect(SOCKET s, sockaddr* _Addr, int _AddrLen, LPOVERLAPPED _Overlapped) {            DWORD _Sent = 0;            return ConnectEx(s, _Addr, _AddrLen, nullptr, 0, &_Sent, _Overlapped);        }        /*BOOL Disconnect(SOCKET s, LPOVERLAPPED _Overlapped) {            return DisconnectEx(s, _Overlapped, TF_REUSE_SOCKET, 0);        }*/        /* 使用了C++11的条件变量与互斥锁实现了同步消息来保证多线程安全IO处理,本质上只是多线程Output        * 因为完成端口未实现同步消息机制,所以这种操作无论如何都至少要涉及到两个锁(一个IOCP锁,一个其他锁):        * 1、采用动态new delete,这种方式最坏的情况要经过那把系统全局的大锁,不可取。        * 2、采用一个我们自己的锁对象,当前使用的方式。        * 3、每个套接字上下文拥有一个独立的锁对象,总觉得在这种了不起就才10几万并发IO的场景,锁竞争带来的性能损耗不该发展到这一步。        */        int SafeIOMessage(DWORD dwNumberOfBytesTransferred, ULONG_PTR dwCompletionKey) {            std::unique_lock<std::mutex> lock(safeIO.mtx);            safeIO.cv.wait(lock, [this]() {                return (safeIO.s & 1);            });            if (safeIO.s == -1)                return -1;            safeIO.s = 0;            PostQueuedCompletionStatus(hIocp, dwNumberOfBytesTransferred, dwCompletionKey, 0);            safeIO.cv.wait(lock, [this]() {                return (safeIO.s & 3);            });            if (safeIO.s == -1)                return -1;            int _Result = safeIO.result;            safeIO.s = 1;            safeIO.cv.notify_all();            return _Result;        }        void InitSafeIO() {            std::lock_guard<std::mutex> lg(safeIO.mtx);            safeIO.s = 1;        }        void ExitSafeIO() {            std::lock_guard<std::mutex> lg(safeIO.mtx);            safeIO.s = -1;            safeIO.cv.notify_all();        }        void SafeIOResult(int _Result) {            // 理论上来说,IOCP工作者线程不需要在此处加锁,实际情况未知,我个人是以悲观的态度对待这个问题            std::lock_guard<std::mutex> lg(safeIO.mtx);            safeIO.result = _Result;            safeIO.s = 2;            safeIO.cv.notify_all();        }    private:        friend class sock;        friend class netio;        friend class coio;        SOCKET fd;        HANDLE hIocp;        LPFN_ACCEPTEX AcceptEx;        LPFN_CONNECTEX ConnectEx;        LPFN_DISCONNECTEX DisconnectEx;        int StreamCapacity;        int Timeout;        int DataBacklog;        DWORD WorkerThreadId;        struct safeio_send_struct {            sock* s;            void* buf;            int len;        };        struct SAFEIO {            std::mutex mtx;            std::condition_variable cv;            int s = -1;            int result = 0;        }safeIO;            };    /*直接继承一个std::string来作为套接字的各种缓冲区*/    class sock_buffer : public std::string {    public:        using _Basetype = std::string;        using _Basetype::_Basetype;        void preset_length(size_t _Length) {            // 直接在二进制层面去搞VC的std::string结构,修改std::string::length()的返回值            // 这么做的好处是,免去了std::string::resize()的拷贝问题。            // 注意这段代码仅适用于VC,G++的std::string结构和VC不一样。            struct __stlstr {                const char str[0x10];                size_t len;            };            if (this->capacity() < _Length)                this->reserve(_Length);            ((__stlstr*)this)->len = _Length;        }    };    /**    * 协程task    */    template<typename _Ty>    struct net_task_t {        struct promise_type;        using _Hty = std::coroutine_handle<promise_type>;        struct promise_type {            net_task_t get_return_object() { return { _Hty::from_promise(*this) }; }            // initial_suspend 里返回return std::suspend_always{};表示协程初始化成功之后就挂起            // 这里就挂起,是为了给set_sock留出操作的时间,否则一个空函数协程,会在创建完之后直接就销毁。            auto initial_suspend() { return std::suspend_always{}; }            auto final_suspend() noexcept {                 s->on_destroy_coroutine();                 return std::suspend_never{};             }            void unhandled_exception() { std::terminate(); }            void return_void() { }            _Ty* s = nullptr;        };        _Hty _Handle;        void resume() { _Handle.resume(); }        void destroy() { _Handle.destroy(); }        void set_sock(_Ty* _s) { _Handle.promise().s = _s; }    };    /**套接字上下文*/    class sock {        // 这是扩展OVERLAPPED结构        struct binding {            OVERLAPPED ol;            int opt;            sock* s;        };        /**        * 返回给协程recv的对象类型        */        class sock_data {            sock_data(sock* _s) : s(_s) {}        public:            char* data() { return s->ibuf.data(); }            void erase(size_t _Count) { s->ibuf.erase(0, _Count); }            size_t length() { return s->ibuf.length(); }            void clear() { s->ibuf.clear(); }        private:            friend class sock;            sock* s;        };        /**返回给协程connect和accept的对象类型        * 用于异步send与close,        * 其他线程也可以利用这个对象通信,已经处理了线程安全问题,但不太效率,因为使用了全局锁。        */        class asyncsock {        public:            /**            * send 是未加锁的发送数据            * 没有多线程需求时,send是安全的            */            int send(void* data, int len) {                if (s->v->WorkerThreadId != GetCurrentThreadId()) {                    return s->safe_send(data, len);                }                else {                    return s->send(data, len);                }            }            int send(const void* data, int len) {                if (s->v->WorkerThreadId != GetCurrentThreadId()) {                    return s->safe_send(data, len);                }                else {                    return s->send(data, len);                }            }            void close() {                if (s->v->WorkerThreadId != GetCurrentThreadId()) {                    s->safe_close();                }                else {                    s->close();                }            }            bool isactivated() { return s->isactivated(); }            operator bool() {                return (s != nullptr);            }            sockaddr_in& getsockaddr() {                return s->getsockaddr();            }            // 响应超时,这是用来给客户端发送心跳包的            // 心跳机制是基于操作系统函数 RegisterWaitForSingleObject实现的            // 会基于netio::init传入的Timeout参数的2/3的频率发送消息            // 也就是说,Timeout并不是一个绝对准确的数值,这就是为了要给客户端留出发心跳包的切入点的代价。            // 例如Timeout设置为6000, 真正超时的客户端,将会再4000-8000ms后被检查出来            void ontimeout(void(*proc)(asyncsock)) {                if (!s)                    return;                s->ontimeout = proc;            }                    private:            bool operator<(const asyncsock& as) const{                return (size_t)s < (size_t)as.s;            }            friend typename std::less<asyncsock>;        private:            friend class netio;            friend class coio;            friend class sock;            sock* s = nullptr;        };                struct recv_awaitable {            recv_awaitable(sock* s) : data(s) { }            // 当编译器自动将await_ready以及await_suspend优化为inline时,协程态引发异常            // 使await_ready强制noline时,没有异常。            __declspec(noinline) bool await_ready() {                // 我当前的vs版本是: vs 2022 17.0.1                // 这里发现一个编译bug,只要await_ready与await_suspend同时被inline优化                // 最后从流程态切换回协程态时,会获取 __coro_frame_ptr.__resume_address 做为recv_awaitable对象来使用                // 紧接着就会引发异常                // 最终我发现,目前vc的协程与lambda函数之间存在bug,                // 使用lambda作为协程函数时,如果此lambda函数inline,就可能会有各种指针错误。                // 我已向vs社区报告过此问题,得到的答复时考虑中,也不知道何时修复。                if (data.s->st & net_status::t_await_undo) {                    data.s->ibuf.clear();                    data.s->st &= (~net_status::t_await_undo);                    return true;                }                return false;            }            void await_suspend(std::coroutine_handle<> handle) { }            sock_data await_resume() const {                 return data;             }            sock_data data;        };        struct sock_awaitable {            sock_awaitable(sock* _s) { s.s = _s; }            __declspec(noinline) bool await_ready() {                if (s.s->st & net_status::t_await_undo) {                    s.s->st &= (~net_status::t_await_undo);                    return true;                }                return false;            }            void await_suspend(std::coroutine_handle<> handle) { }            sock::asyncsock await_resume() { return s; }            sock::asyncsock s;        };        struct close_awaitable {            close_awaitable(bool _IsSuspend) : IsSuspend(_IsSuspend) { }            __declspec(noinline) bool await_ready() { return (IsSuspend == false); }            void await_suspend(std::coroutine_handle<> handle) { }            void await_resume() { }            bool IsSuspend;        };        struct send_awaitable {            send_awaitable(sock* _s) : s(_s) {}            __declspec(noinline) bool await_ready() {                if (s->st & net_status::t_await_undo) {                    s->st &= (~net_status::t_await_undo);                    return true;                }                return false;            }            void await_suspend(std::coroutine_handle<> handle) { }            int await_resume() { return s->syncsendlen; }            sock* s;        };    public:        using opcode = net_status;        sock(net_base* _v) {            fd = INVALID_SOCKET;            v = _v;            st = 0;            ontimeout = nullptr;            memset(&input.ol, 0, sizeof(input.ol));            memset(&output.ol, 0, sizeof(output.ol));                        if (v->Timeout)                output.ol.hEvent = input.ol.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);            else                output.ol.hEvent = input.ol.hEvent = NULL;            output.s = input.s = this;            output.opt = opcode::s_write;            ibuf.reserve(v->StreamCapacity);            obuf.reserve(v->StreamCapacity);        }        ~sock() {            close();            if (!output.ol.hEvent)                return;            CloseHandle(output.ol.hEvent);            output.ol.hEvent = output.ol.hEvent = NULL;            if (st & opcode::t_await)                 co.destroy();        }        void on_destroy_coroutine() {            close();            st &= (~opcode::t_connector);        }        bool isactivated() {            return ((st & opcode::t_activated) != 0);        }        int send(void* data, int len) {            if (!len)                return len;            int n = (int)(obuf.capacity() - obuf.length());            if (n >= len && !obacklog.length()) {                obuf.append((char*)data, len);            }            else {                if (v->DataBacklog != 0 && obacklog.length() + len > v->DataBacklog) {                    //积压值超过限制                    close();                    return -1;                }                obacklog.append((char*)data, len);            }            return (write() == 0) ? len : -1;        }        int send(const void* data, int len) {            return send((void*)data, len);        }        int safe_send(void* data, int len) {            net_base::safeio_send_struct param = { this, data, len };            return v->SafeIOMessage(opcode::s_write, (ULONG_PTR)&param);        }        int safe_send(const void* data, int len) {            net_base::safeio_send_struct param = { this, (void*)data, len };            return v->SafeIOMessage(opcode::s_write, (ULONG_PTR)&param);        }        int safe_close() {            return v->SafeIOMessage(opcode::s_close, (ULONG_PTR)this);        }        void close() {            if (INVALID_SOCKET == fd)                return;            ontimeout = nullptr;            closesocket(fd);            fd = INVALID_SOCKET;            st &= ~opcode::t_activated;            st |= opcode::s_close;            set_timer(false);            ibuf.clear();            if (obacklog.capacity() <= 0x0F)                return;            sock_buffer tmp;            obacklog.swap(tmp);        }        sockaddr_in& getsockaddr() { return sa; }    private:        int initfd() {            if (INVALID_SOCKET != fd) {                return 0;            }                            fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);            if (INVALID_SOCKET == fd) {                nlog("%s->创建套接字失败,错误号:%d", __FUNCTION__, WSAGetLastError());                return -1;            }            LINGER linger = { 1, 0 };            setsockopt(fd, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));            int b = 1;            setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&b, sizeof(b));            CreateIoCompletionPort((HANDLE)fd, v->hIocp, 0, 0);            return 0;        }        int bindlocal() {            sockaddr_in local;            local.sin_family = AF_INET;            local.sin_addr.S_un.S_addr = INADDR_ANY;            local.sin_port = 0;            if (SOCKET_ERROR == bind(fd, (LPSOCKADDR)&local, sizeof(local))) {                nlog("%s->绑定本地端口失败,错误号:%d", __FUNCTION__, WSAGetLastError());                return -1;            }            return 0;        }        bool set_dest(const std::string& _Dest) {            return net_base::sockaddr_from_string(sa, _Dest);        }        void set_timer(bool _Enable) {            if (_Enable) {                if (hTimer)                    return;                RegisterWaitForSingleObject(&hTimer, output.ol.hEvent, [](void* Param, BOOLEAN TimerOrWaitFired) {                    if (!TimerOrWaitFired)                        return;                    sock* p = (sock*)Param;                    PostQueuedCompletionStatus(p->v->hIocp, 0, (ULONG_PTR)p, nullptr);                }, this, (ULONG)v->Timeout * 2 / 3, WT_EXECUTEDEFAULT);            }            else {                if (!hTimer)                    return;                std::ignore = UnregisterWaitEx(hTimer, NULL);                hTimer = NULL;            }        }        int nat() {            sockaddr_in _Addr;            int _AddrLen = sizeof(_Addr);            if (-1 == getsockname(fd, (sockaddr*)&_Addr, &_AddrLen))                return -1;            SOCKET fdNat = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);            LINGER linger = { 1, 0 };            setsockopt(fdNat, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));            CreateIoCompletionPort((HANDLE)fdNat, v->hIocp, 0, 0);            if (-1 == bind(fdNat, (sockaddr*)&_Addr, sizeof(_Addr))) {                closesocket(fdNat);                return -1;            }            close();            fd = fdNat;            return connect();        }        int accept() {            if (((st & 0xFF) | opcode::s_close) != opcode::s_close) {                nlog("%s->当前套接字未断开连接!", __FUNCTION__);                return -1;            }            if (initfd())                return -1;            DWORD _Received = 0;            input.opt = opcode::s_accept;            st &= (~opcode::s_close);            st |= opcode::s_accept;            if (!v->Accept(fd, ibuf.data(), &input.ol)) {                int _Error = WSAGetLastError();                if (_Error != ERROR_IO_PENDING) {                    st &= (~opcode::s_accept);                    nlog("%s->AcceptEx失败, 错误号:", __FUNCTION__, WSAGetLastError());                    return -1;                }            }            return 0;        }        int connect() {            if (((st & 0xFF) | opcode::s_close) != opcode::s_close) {                nlog("%s->当前套接字未断开连接!", __FUNCTION__);                return -1;            }            if (INVALID_SOCKET == fd) {                if (initfd())                    return -1;                if (bindlocal())                    return -1;            }            input.opt = opcode::s_connect;            st &= (~opcode::s_close);            st |= opcode::s_connect;            if (!v->Connect(fd, (sockaddr*)&sa, sizeof(sa), &input.ol)) {                int _Error = WSAGetLastError();                if (_Error != ERROR_IO_PENDING) {                    nlog("%s->ConnectEx失败, 错误号:", __FUNCTION__, WSAGetLastError());                    return -1;                }            }            return 0;        }        int write() {            if (!(st & opcode::t_activated)) {                return -1;            }            if (st & (opcode::s_write | opcode::s_close | opcode::s_accept | opcode::s_connect))                return 0;            if (obacklog.size()) {                size_t rl = obuf.capacity() - obuf.length();                if (rl > obacklog.length())                    rl = obacklog.length();                if (rl) {                    obuf.append(obacklog.data(), rl);                    obacklog.erase(0, rl);                }            }            WSABUF buf = { (ULONG)(obuf.length()), obuf.data() };            if (!buf.len)                return 0;            st |= opcode::s_write;            DWORD _Sent = 0;            if (SOCKET_ERROR == WSASend(fd, &buf, 1, &_Sent, 0, &(output.ol), NULL)) {                int _Error = WSAGetLastError();                if (WSA_IO_PENDING != _Error) {                    st &= (~opcode::s_write);                    return -1;                }            }            return 0;        }        int read() {            if (!(st & opcode::t_activated)) {                return -1;            }            if (st & (opcode::s_read | opcode::s_close | opcode::s_accept | opcode::s_connect))                return 0;            WSABUF buf = {                (ULONG)(ibuf.capacity() - ibuf.length()),                ibuf.data() + ibuf.length()            };            if ((int)buf.len <= 0) {                return -1;            }            DWORD _Received = 0;            DWORD _Flags = 0;            st |= opcode::s_read;            input.opt = opcode::s_read;            if (SOCKET_ERROR == WSARecv(fd, &buf, 1, &_Received, &_Flags, &(input.ol), NULL)) {                int _Error = WSAGetLastError();                if (WSA_IO_PENDING != _Error) {                    st &= ~(opcode::s_read);                    return -1;                }            }            return 0;        }    private:        friend class coio;        friend class netio;        SOCKET fd;        sockaddr_in sa;        net_base* v;        int st;        binding input, output;        sock_buffer ibuf, obuf, obacklog;        HANDLE hTimer;        aqx::clock64_t rtime;        net_task_t<sock> co;        void (*ontimeout)(asyncsock);        int syncsendlen;    };    // coio是传参给协程函数的操作对象    class coio {        coio(sock* _s) : s(_s) {}    public:        using asyncsock = sock::asyncsock;        using sock_awaitable = sock::sock_awaitable;        using close_awaitable = sock::close_awaitable;        using send_awaitable = sock::send_awaitable;        using recv_awaitable = sock::recv_awaitable;        struct nat_awaitable {            nat_awaitable(bool _ret) : ret(_ret) {  }            __declspec(noinline) bool await_ready() { return (ret == false); }            void await_suspend(std::coroutine_handle<> handle) { }            bool await_resume() { return ret; }            bool ret;        };        coio() : s(nullptr) {}        sock_awaitable connect(const std::string& _Dest) {            if (!s->set_dest(_Dest)) {                // 设置目标地址失败时,撤销等待                s->st |= net_status::t_await_undo;                return sock_awaitable(s);            }            // 我使用的协程initial_suspend中是不挂起的,             // 所以一个套接字的首次connect操作基本都是由其他线程引发的            // 而且很可能在await_suspend之前,IOCP队列就已经完成            if (GetCurrentThreadId() == s->v->WorkerThreadId) {                if (s->connect()) {                    // 连接失败时,撤销等待。                    s->st |= net_status::t_await_undo;                    return sock_awaitable(s);                }            }            else {                // 因此,不是IOCP队列线程引发的connect就发送到IOCP队列去处理                PostQueuedCompletionStatus(s->v->hIocp, net_status::s_connect, (ULONG_PTR)s, 0);            }            s->st |= net_status::t_await_connect;            return sock_awaitable(s);        }        sock_awaitable accept() {            // 首次accept虽然也是其他线程调用的(一般是main线程)            // 但首次accept时,IOCP工作线程尚未启动,因此可以无视掉connect的那个问题。            s->st |= ((!s->accept()) ? net_status::t_await_accept : net_status::t_await_undo);            return sock_awaitable(s);        }        /**        * 以下几个成员函数中的参数asyncsock _s应该等同于私有成员s,除非强行在外部使用syncio对象        * 使用参数而不是私有成员的原因是防止在尚未连接前调用IO操作。        * 私有成员s将专用于accept与connect        */        close_awaitable close(asyncsock _s) {            _s.s->close();            if ((_s.s->st & 0xFF) == net_status::s_close) {                // 如果套接字上已经没有任何IO事件,就让awaitable直接唤醒协程                // 通常这才是正常状态,但如果有其他线程异步send时,可能就会有未决IO存在了。                return close_awaitable(false);            }            _s.s->st |= net_status::t_await_close;            return close_awaitable(true);        }        send_awaitable send(asyncsock _s, void *buf, int len) {            _s.s->syncsendlen = _s.send(buf, len);            _s.s->st |= ((_s.s->syncsendlen >= 0) ? net_status::t_await_write : net_status::t_await_undo);            return sock::send_awaitable(_s.s);        }        send_awaitable send(asyncsock _s, const void* buf, int len) {            _s.s->syncsendlen = _s.send(buf, len);            _s.s->st |= ((_s.s->syncsendlen >= 0) ? net_status::t_await_write : net_status::t_await_undo);            return sock::send_awaitable(_s.s);        }        recv_awaitable recv(asyncsock _s) {            int n = _s.s->read();            if (n < 0) {                _s.s->st |= net_status::t_await_undo;            }            else {                _s.s->st |= net_status::t_await_read;            }            return recv_awaitable(_s.s);        }        nat_awaitable nat(asyncsock _s, const std::string& _Dest) {            if ((_s.s->st & 0xFF) != net_status::t_activated) {                // nat之前必须保证所有未决IO都已经返回,与打洞服务器保持正常连接状态,否则就是失败。                // 到这里失败时,依旧与打洞服务器保持着正常连接。                return nat_awaitable(false);            }            sockaddr_in sa = _s.s->sa;            if (!_s.s->set_dest(_Dest)) {                // 设置目标地址失败                // 到这里失败时,依旧与打洞服务器保持着正常连接。                _s.s->sa = sa;                return nat_awaitable(false);            }            if (_s.s->nat()) {                // 到这一步失败时,与打洞服务器的连接就有可能会断掉                // nat失败时,本就应该直接close();                 // 都失败了,我想不出还要跟打洞服务器继续苟合的理由。                // 如果所有状态全都对,还失败,可能就是双方正好属于无法穿透的NAT类型环境下。                // 我对此研究不多,业界内真正懂行的也不多,资料更是少得可怜,我只知道TCP NAT在代码上的表现为:                //     1、与打洞服务器保持连接的这个套接字设置了SO_REUSEADDR,确保这个套接字绑定的本地端口可复用。                //          在这个库里我全都设置了可复用,但主要目的是为了缓解TIME_WAIT,并不是为了穿透。                //     2、双方通过打洞服务器沟通好各自的远端地址                //     3、双方都创建一个新的套接字,并将该套接字绑定到本地与打洞服务器进行连接的那个地址(getsockname可以获得)                //          到第 3 步处理好之后,与打洞服务器连接的那个套接字,已经废了,无法再进行通信,此时应该把它close掉。                //     4、最后双方都connect对方的地址。                _s.s->sa = sa;                return nat_awaitable(false);            }            s->st |= net_status::t_await_connect;            return nat_awaitable(true);        }        bool valid() {            return (s != nullptr);        }        operator bool () {            return valid();        }    private:        friend class netio;        sock* s;    };    /**    * 可以简单把netio看成是一个容器的作用    * 它主要用于对接net_base,创建线程,处理IO事件。    */    class netio {        struct IOCP_STATUS {            DWORD transferred;            SIZE_T key;            typename sock::binding* pb;            BOOL ok;        };    public:        /**listener 只是一种简单的参数包装,只是为了方便构造而已        * 构造参数:        * _Dest 要监听的地址和端口,格式为:"a.b.c.d:port"        * _ListenBacklog 系统函数listen的第2个参数        * _MaxClients 最多同时接受的客户端数量        */        class listener {        public:            listener() {                max_clients = 0;                listen_backlog = 0;                addr.sin_addr.S_un.S_addr = INADDR_NONE;            }            listener(const std::string& _Dest, int _ListenBacklog, size_t _MaxClients) {                max_clients = _MaxClients;                listen_backlog = _ListenBacklog;                net_base::sockaddr_from_string(addr, _Dest);            }        private:            friend class netio;            sockaddr_in addr;            int listen_backlog;            size_t max_clients;        };        using asyncsock = sock::asyncsock;        using sock_data = sock::sock_data;        using opcode = net_status;        using task = net_task_t<sock>;        int init(int _StreamCapacity = 1440, int _DataBacklog = 0, int _Timeout = 0) {            std::lock_guard<std::mutex> lg(mtx);            return nwb.init(_StreamCapacity, _DataBacklog, _Timeout);        }        int server(const std::function<task(coio)> &_func, const listener &param) {            std::lock_guard<std::mutex> lg(mtx);            if (thd.joinable()) {                nlog("%s->netio已启动, 请勿重复调用!", __FUNCTION__);                return 0;            }            if (nwb.fd == INVALID_SOCKET)                return -1;            cofunc = _func;            if (param.addr.sin_addr.S_un.S_addr != INADDR_NONE) {                if (SOCKET_ERROR == bind(nwb.fd, (SOCKADDR*)&param.addr, sizeof(SOCKADDR))) {                    nlog("%s->绑定端口失败,错误号:%d", __FUNCTION__, WSAGetLastError());                    nwb.close();                    return -1;                }                if (SOCKET_ERROR == ::listen(nwb.fd, param.listen_backlog)) {                    nlog("%s->监听失败,错误号:%d", __FUNCTION__, WSAGetLastError());                    nwb.close();                    return -1;                }                for (int i = 0; i < param.max_clients; i++) {                    sock* psock = new sock(&nwb);                    a_list.push_back(psock);                    psock->st |= opcode::t_acceptor;                    psock->co = cofunc(coio(psock));                    psock->co.set_sock(psock);                    psock->co.resume();                }            }            __start();            return 0;        }        // client是一次性的,专用于客户端        // 让它返回asyncsock对象的理由是为了给脚本语言预留的        // 例如可以使用lua去实现类似node.js的那种connect之后不管连没连上就先得到对象去绑定事件的机制。        asyncsock client(const std::function<task(coio)>& _func) {            std::lock_guard<std::mutex> lg(mtx);            coio io;            asyncsock ret;            if (!thd.joinable()) {                // 如果线程未启动,尝试启动线程,这之后如果要回收资源,是需要stop和release的                if (nwb.fd == INVALID_SOCKET)                    return ret;                __start();            }            io.s = get_connector();            ret.s = io.s;            io.s->co = _func(io);            io.s->co.set_sock(io.s);            io.s->co.resume();            return ret;        }        void exec(const std::function<void()>& _func) {            if (!thd.joinable()) {                // 如果线程未启动,尝试启动线程,这之后如果要回收资源,是需要stop和release的                if (nwb.fd == INVALID_SOCKET)                    return;                __start();            }            nwb.SafeIOMessage(opcode::s_exec, (ULONG_PTR)&_func);        }        void stop() {            std::lock_guard<std::mutex> lg(mtx);            if (thd.joinable()) {                PostQueuedCompletionStatus(nwb.hIocp, -1, 0, 0);                thd.join();            }        }        void release() {            std::lock_guard<std::mutex> lg(mtx);            if (thd.joinable()) {                nlog("%s->nio正在运行,请先stop", __FUNCTION__);                return;            }            for (auto p : a_list) {                delete p;            }            a_list.clear();            for (auto p : c_list) {                delete p;            }            c_list.clear();            nwb.close();        }    private:        sock* get_connector() {            sock* psock = nullptr;                        for (auto v : c_list) {                if ((v->st & opcode::t_connector) == 0 && ((v->st & 0xFF)| opcode::s_close) == opcode::s_close) {                    psock = v;                    break;                }            }            if (!psock) {                psock = new sock(&nwb);                c_list.push_back(psock);            }            psock->st |= opcode::t_connector;            return psock;        }        void on_connect(sock& s) {            s.ibuf.clear();            s.obuf.clear();            s.obacklog.clear();            s.rtime = aqx::now();            if (nwb.Timeout != 0)                s.set_timer(true);            s.st |= opcode::t_activated;        }                void on_accept(sock &s) {            // 懒得去调用GetAcceptExSockAddrs,有硬编码可用#ifndef _WIN64            s.sa = *(sockaddr_in*)(s.ibuf.data() + 0x26);#else            s.sa = *(sockaddr_in*)(s.ibuf.data() + 0x20);#endif            on_connect(s);        }                bool on_resume(sock& s) {            if (s.st & opcode::t_await) {                // 清除所有协程等待标志                s.st &= (~opcode::t_await);                // 唤醒协程                s.co.resume();                return true;            }            return false;        }        void on_close(sock& s) {            if ((s.st & 0xFF) == opcode::s_close) {                s.st &= ~opcode::s_close;                on_resume(s);            }        }        bool error_resume(sock &s) {            int st = s.st & opcode::t_await;            switch (st) {            case opcode::t_await_accept:            case opcode::t_await_connect:            case opcode::t_await_close:                s.st &= (~opcode::t_await);                s.co.resume();                return true;            case opcode::t_await_read:                s.ibuf.clear();                s.st &= (~opcode::t_await);                s.co.resume();                return true;            case opcode::t_await_write:                s.syncsendlen = -1;                s.st &= (~opcode::t_await);                s.co.resume();                return true;            default:                break;            }            return false;        }        void on_reset(sock &s) {            if ((s.st & 0xFF) == opcode::s_close) {                s.st &= ~opcode::s_close;                if (s.st & opcode::t_acceptor) {                    // 如果服务端协程不在一个循环里,协程返回自动销毁后就会这样                    // 此时的挽救措施就是创建一个新的协程                    s.co = cofunc(coio(&s));                }            }        }        void on_completion(IOCP_STATUS& st) {            sock& s = *(st.pb->s);            int op = st.pb->opt;            s.st &= (~op);            if (s.st & opcode::s_close)                op = 0;            //nlog("on_completion:%I64X, %d", &s, op);            switch (op) {            case 0:                break;            case opcode::s_accept:                on_accept(s);                break;            case opcode::s_connect:                if (!st.ok && WSAGetLastError() == 1225) {                    // 出现这种错误,一般是由于服务端没有在监听指定端口,直接被操作系统拒绝了。                    op = 0;                    break;                }                on_connect(s);                break;            case opcode::s_read:                if (!st.transferred) {                    op = 0;                    break;                }                s.rtime = aqx::now();                s.ibuf.preset_length(s.ibuf.length() + st.transferred);                break;            case opcode::s_write:                if (!st.transferred) {                    op = 0;                    break;                }                s.rtime = aqx::now();                s.obuf.erase(0, st.transferred);                if (s.obuf.length() || s.obacklog.length()) {                    if (s.write()) {                        op = 0;                        break;                    }                }                // write操作可能是非协程发起的,协程很可能挂起在recv,因此需要判断一下。                if (!(s.st & opcode::t_await_write))                    return;                break;            }                        //nlog("on_completion2:%I64X, %d", &s, op);            if (!op) {                if (error_resume(s))                    return;                // 只有当协程被销毁时,error_resume才会返回false                s.close();                on_reset(s);                return;            }                        on_resume(s);            if (s.st & opcode::s_close)                return on_close(s);        }        void on_msgtimeout(sock *psock) {            if (aqx::now() - psock->rtime >= nwb.Timeout && (psock->st & opcode::t_activated)) {                psock->close();                if (error_resume(*psock))                    return;                on_reset(*psock);                return;            }            if (psock->ontimeout != nullptr) {                asyncsock as;                as.s = psock;                psock->ontimeout(as);            }        }        void on_msgconnect(sock* psock) {            if (psock->connect()) {                psock->close();                if (error_resume(*psock))                    return;                on_reset(*psock);            }        }        void on_msgwrite(net_base::safeio_send_struct* pss) {            nwb.SafeIOResult(pss->s->send(pss->buf, pss->len));        }        void on_msgclose(sock* psock) {            psock->close();            nwb.SafeIOResult(0);        }        void __start() {            thd = std::thread([this]() {                nwb.WorkerThreadId = GetCurrentThreadId();                srand((unsigned int)aqx::now() + nwb.WorkerThreadId);                nwb.InitSafeIO();                IOCP_STATUS st = { 0,0,0,0 };                //nlog("netio::worker->I/O工作线程 %d 开始!", nwb.WorkerThreadId);                                for (;;) {                    st.ok = GetQueuedCompletionStatus(nwb.hIocp,                        &(st.transferred),                        &(st.key),                        (OVERLAPPED**)&(st.pb),                        INFINITE);                    if (!st.pb) {                        if (st.transferred == -1)                             break;                                                switch (st.transferred) {                        case 0:                            on_msgtimeout((sock*)st.key);                            break;                        case opcode::s_connect:                            on_msgconnect((sock*)st.key);                            break;                        case opcode::s_write:                             on_msgwrite((net_base::safeio_send_struct*)st.key);                            break;                        case opcode::s_close:                            on_msgclose((sock*)st.key);                            break;                        case opcode::s_exec:                            (*((std::function<void()>*)st.key))();                            nwb.SafeIOResult(0);                            break;                        }                        continue;                    }                    on_completion(st);                }                                nwb.ExitSafeIO();                nwb.WorkerThreadId = 0;                //nlog("netio::worker->I/O工作线程 %d 已停止!", nwb.WorkerThreadId);            });        }            private:        net_base nwb;        std::list<sock*> a_list;        std::list<sock*> c_list;        std::function<task(coio)> cofunc;        std::thread thd;        std::mutex mtx;    };}#pragma warning(pop)

这个库我已经去除了各种耦合,除了日志库,aqx::log我自己写的一个简单的格式化日志库:

logger.hpp#pragma once#include <iostream>#include <string>#include <time.h>#include <stdarg.h>#include <mutex>#include <vector>//aqx::log不与aqx其他库耦合#if defined(_WIN32) || defined(_WIN64)#ifndef _WINDOWS_#include <WinSock2.h>#endif#define __aqxlog_getpid GetCurrentProcessId#define __aqxlog_gettid GetCurrentThreadId#include <io.h>#else#if defined(__linux__)#include <unistd.h>#include <sys/syscall.h>#define __aqxlog_getpid getpid#define __aqxlog_gettid() syscall(__NR_gettid)#endif#endif#pragma warning(push)#pragma warning(disable:4996)namespace aqx {    class log {    private:        struct _format_texts {            std::string time;            std::string type;            std::string pid;            std::string tid;        };    public:        static constexpr auto hs_time{ static_cast<int>(1) };        static constexpr auto hs_type{ static_cast<int>(2) };        static constexpr auto hs_pid{ static_cast<int>(4) };        static constexpr auto hs_tid{ static_cast<int>(8) };        log() {            _stdout_fp = stdout;            fp = stdout;            _fmtts = { "%Y/%m/%d %H:%M:%S ", "{%s} ",  "[%d] ",  "(%d) " };            head_style = log::hs_time;            head_presize = _gethps();            _EnableInfo = true;            _EnableError = false;            _EnableDebug = false;            _EnableWarn = false;            _DefType = "info";            s.reserve(0x1000);        }        ~log() {            if (fp != _stdout_fp)                fclose(fp);        }        void enable(const std::string_view& _Type, bool _Enable) {            std::lock_guard<std::mutex> lg(_Mtx);            if (_Type == "info")                _EnableInfo = _Enable;            else if (_Type == "error")                _EnableError = _Enable;            else if (_Type == "debug")                _EnableDebug = _Enable;            else if (_Type == "warn")                _EnableWarn = _Enable;        }        void seths(int hs) {            std::lock_guard<std::mutex> lg(_Mtx);            head_style = hs;            head_presize = _gethps();        }        void sethfmt(int _Style, const char* _Fmt) {            std::lock_guard<std::mutex> lg(_Mtx);            switch (_Style) {            case hs_time:                _fmtts.time = _Fmt;                break;            case hs_type:                _fmtts.type = _Fmt;                break;            case hs_pid:                _fmtts.pid = _Fmt;                break;            case hs_tid:                _fmtts.tid = _Fmt;                break;            }            head_presize = _gethps();        }        bool setvfs(const char* _FileName, bool _PutStdout = false) {            std::lock_guard<std::mutex> lg(_Mtx);            FILE* _tmp = fopen(_FileName, "ab");            if (!_tmp)                return false;            if (fp != _stdout_fp)                fclose(fp);            fp = _tmp;            PutStdout = _PutStdout;            return true;        }        log& info(const char* _Fmt, ...) {            std::lock_guard<std::mutex> lg(_Mtx);            if (!_EnableInfo)                return *this;            va_list vl;            va_start(vl, _Fmt);            _build("info", _Fmt, vl);            va_end(vl);            _putlog();            return *this;        }        log& debug(const char* _Fmt, ...) {            std::lock_guard<std::mutex> lg(_Mtx);            if (!_EnableDebug)                return *this;            va_list vl;            va_start(vl, _Fmt);            _build("info", _Fmt, vl);            va_end(vl);            _putlog();            return *this;        }        log& error(const char* _Fmt, ...) {            std::lock_guard<std::mutex> lg(_Mtx);            if (!_EnableError)                return *this;            va_list vl;            va_start(vl, _Fmt);            _build("info", _Fmt, vl);            va_end(vl);            _putlog();            return *this;        }        log& warn(const char* _Fmt, ...) {            std::lock_guard<std::mutex> lg(_Mtx);            if (!_EnableWarn)                return *this;            va_list vl;            va_start(vl, _Fmt);            _build("info", _Fmt, vl);            va_end(vl);            _putlog();            return *this;        }        log& operator()(const char* _Fmt, ...) {            std::lock_guard<std::mutex> lg(_Mtx);            if (!_EnableInfo)                return *this;            va_list vl;            va_start(vl, _Fmt);            _build(_DefType.c_str(), _Fmt, vl);            va_end(vl);            _putlog();            return *this;        }    private:        void _putlog() {            fputs(s.data(), fp);            if (fp != _stdout_fp) {                //fflush(fp);                if (PutStdout)                    fputs(s.data(), _stdout_fp);            }        }        size_t _build(const char* _Type, const char* _Fmt, va_list vl) {            s.clear();            size_t n = vsnprintf(nullptr, 0, _Fmt, vl);            if (n <= 0)                return _build_head(_Type);            if (n >= s.capacity()) {                s.clear();                s.reserve(n + head_presize);            }            size_t _Pos = _build_head(_Type);            char* p = (char*)s.data();            _Pos += vsnprintf(p + _Pos, s.capacity(), _Fmt, vl);            char c = p[_Pos - 1];#ifdef _WINDOWS_            if (c != '\r' && c != '\n') {                p[_Pos++] = '\r';                p[_Pos++] = '\n';                p[_Pos] = '\0';            }#else            if (c != '\r' && c != '\n') {                p[_Pos++] = '\n';                p[_Pos] = '\0';            }#endif            return _Pos;        }        size_t _build_time(size_t _Pos) {            if (!(head_style & log::hs_time))                return _Pos;            time_t t = time(NULL);            auto _Tm = localtime(&t);            _Pos += strftime((char*)s.data() + _Pos, head_presize, _fmtts.time.c_str(), _Tm);            return _Pos;        }        size_t _build_type(size_t _Pos, const char* _Type) {            if (!(head_style & log::hs_type))                return _Pos;            _Pos += sprintf((char*)s.data() + _Pos, _fmtts.type.c_str(), _Type);            return _Pos;        }        size_t _build_pid(size_t _Pos) {            if (!(head_style & log::hs_pid))                return _Pos;            auto _Pid = __aqxlog_getpid();            _Pos += sprintf((char*)s.data() + _Pos, _fmtts.pid.c_str(), _Pid);            return _Pos;        }        size_t _build_tid(size_t _Pos) {            if (!(head_style & log::hs_tid))                return _Pos;            auto _Tid = __aqxlog_gettid();            _Pos += sprintf((char*)s.data() + _Pos, _fmtts.tid.c_str(), _Tid);            return _Pos;        }        size_t _build_head(const char* _Type) {            return _build_tid(_build_pid(_build_type(_build_time(0), _Type)));        }        size_t _gethps() {            size_t _Result = 3;            if (head_style & log::hs_time)                _Result += ((_fmtts.time.length() << 1) + 30);            if (head_style & log::hs_type)                _Result += ((_fmtts.pid.length() << 1) + 12);            if (head_style & log::hs_pid)                _Result += ((_fmtts.pid.length() << 1) + 20);            if (head_style & log::hs_tid)                _Result += ((_fmtts.pid.length() << 1) + 20);            return _Result;        }    private:        std::vector<char> s;        FILE* fp;        _format_texts _fmtts;        int head_style;        size_t head_presize;        bool PutStdout;        FILE* _stdout_fp;        std::mutex _Mtx;        std::string _DefType;        bool _EnableInfo;        bool _EnableDebug;        bool _EnableError;        bool _EnableWarn;    };}static aqx::log logger;#pragma warning(pop)

最后是测试代码:客户端和服务端放在一起了,要分离就从nio.init后面的几个地方分离一下。

// main.cpp#include <iostream>#include <aqx/netio.hpp>int main(){    aqx::init_winsock();    aqx::netio nio;    nio.init(1440, 0x10000);    // 一个简单的echo服务器例子:    nio.server([](aqx::coio io)->aqx::netio::task {        // 服务端始终应该放在一个死循环里,否则兜底逻辑会反复创建新协程。        for (;;) {            // io.accept会返回一个可用于异步send和close的对象            auto s = co_await io.accept();            logger("客户端连入:%s", aqx::net_base::sockaddr_to_string(s.getsockaddr()));            for (;;) {                auto buf = co_await io.recv(s);                if (!buf.length()) {                    logger("断开连接!");                    break;                }                puts(buf.data());                buf.clear();                // 异步发送,协程不会在这里挂起                s.send("收到!", 5);                            }            co_await io.close(s);            logger("已关闭!");        }    }, aqx::netio::listener("0.0.0.0:55554", 100, 100));    // 我已经懒到让客户端和服务端都放在一起了,要分自己分    auto sock1 = nio.client([](aqx::coio io)->aqx::netio::task {        // 客户端只有需要自动重连,才放在循环里处理        for (;;) {            auto s = co_await io.connect("127.0.0.1:55554");            if (!s) {                co_await io.close(s);                continue;            }            for (;;) {                auto buf = co_await io.recv(s);                if (!buf.length()) {                    break;                }                puts(buf.data());                buf.clear();            }                        co_await io.close(s);        }           });    // 我已经懒到让客户端和服务端都放在一起了,要分自己分    auto sock2 = nio.client([](aqx::coio io)->aqx::netio::task {        // 客户端只有需要自动重连,才放在循环里处理        for (;;) {            auto s = co_await io.connect("127.0.0.1:55554");            if (!s) {                co_await io.close(s);                continue;            }            for (;;) {                auto buf = co_await io.recv(s);                if (!buf.length()) {                    break;                }                puts(buf.data());                buf.clear();            }            co_await io.close(s);        }    });        std::string str;    for (;;) {        std::cin >> str;        if (str == "exit")            break;        std::string sd = "sock1:";        sd += str;        sock1.safe_send(sd.data(), (int)sd.length() + 1);        sd = "sock2:";        sd += str;        sock2.safe_send(sd.data(), (int)sd.length() + 1);    }    nio.stop();    nio.release();}

我还是稍微负责一点,既然发现了编译bug,还是跟踪一下吧。

如果 recv_awaitable::await_ready()是inline时,流程态remuse切换到 协程态 时,会经过以下流程
00007FF723AF6000 mov r11,rsp
00007FF723AF6003 mov qword ptr [r11+10h],rbx
00007FF723AF6007 mov qword ptr [r11+18h],rsi
00007FF723AF600B mov qword ptr [r11+20h],rdi
00007FF723AF600F mov qword ptr [r11+8],rcx
00007FF723AF6013 push r12
00007FF723AF6015 push r14
00007FF723AF6017 push r15
00007FF723AF6019 sub rsp,90h
00007FF723AF6020 mov rax,qword ptr [__security_cookie (07FF723AFA008h)]
00007FF723AF6027 xor rax,rsp
00007FF723AF602A mov qword ptr [rsp+80h],rax
00007FF723AF6032 mov rdi,rcx
00007FF723AF6035 mov qword ptr [rsp+50h],rcx
00007FF723AF603A movzx eax,word ptr [rdi+2Ch]
00007FF723AF603E mov word ptr [rsp+48h],ax
00007FF723AF6043 inc ax
00007FF723AF6046 cmp ax,0Ah
00007FF723AF604A ja `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+463h (07FF723AF6463h)
00007FF723AF6050 movsx rax,ax
00007FF723AF6054 lea rdx,[__ImageBase (07FF723AF0000h)]
00007FF723AF605B mov ecx,dword ptr [rdx+rax*4+6494h]
00007FF723AF6062 add rcx,rdx
00007FF723AF6065 jmp rcx
00007FF723AF6067 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
00007FF723AF6069 xor r15d,r15d
00007FF723AF606C mov dword ptr [rdi+1B0h],r15d
00007FF723AF6073 mov r12d,10000h
00007FF723AF6079 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+2E0h (07FF723AF62E0h)
00007FF723AF607E jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
00007FF723AF6080 jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+82h (07FF723AF6082h)
}
}, aqx::netio::listener("0.0.0.0:55554", 100, 100));
00007FF723AF6082 cmp word ptr [rdi+0Ah],0
00007FF723AF6087 je `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+464h (07FF723AF6464h)
00007FF723AF608D mov edx,1B4h
00007FF723AF6092 mov rcx,rdi
00007FF723AF6095 call operator delete (07FF723AF5504h)
00007FF723AF609A jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+464h (07FF723AF6464h)
00007FF723AF609F xor r15d,r15d
00007FF723AF60A2 mov r12d,10000h
00007FF723AF60A8 mov rdx,qword ptr [__coro_frame_ptr] ******************************************** 在这里获取了__coro_frame_ptr.__resume_address
00007FF723AF60AD jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+26Bh (07FF723AF626Bh)
00007FF723AF60B2 xor r15d,r15d
00007FF723AF60B5 mov r12d,10000h
00007FF723AF60BB jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+2C4h (07FF723AF62C4h)
00007FF723AF60C0 xor r15d,r15d
00007FF723AF60C3 mov r12d,10000h

---------------------------------------------------------------------------------------------------

00007FF723AF60A8 mov rdx,qword ptr [__coro_frame_ptr]
它直接拷贝了协程帧结构下 offset=0的__resume_address

00007FF723AF60AD jmp `main'::`2'::<lambda_1>$_ResumeCoro$1::operator()+26Bh (07FF723AF626Bh)
紧接着直接跳转过去,就将rdx当作recv_awaitable去进行操作

---------------------------------------------------------------------------------------------------
这个问题我敢肯定100%是编译器bug,导致这个问题的原因,一定不是简简单单的内联因素,绝对会有更深层次的编译逻辑导致此bug,但那是微软的问题。

免责声明:本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕。
相关文章
返回顶部