2024年2月20日发(作者:)
ime = () def interv(self): return () - ime def timenowstr(self): tmptnow = () # print("%s.%s"%(me("%Y-%m-%d %H:%M:%S"), (".%3f"%((tmptnow)))) return me("%Y-%m-%d %H:%M:", ime(tmptnow)) + ("%.3f"%(tmptnow % 60))sockList = []#('ulimit -n 20000')def testconnectServer(): for i in range(clientTestNumber): try: #print("connet index = %d"%(i)) tmpTcpCliSock = socket(AF_INET,SOCK_STREAM) t(addr) #print(data) (tmpTcpCliSock) except Exception as e: print("err->%d:%s"%(i, e)) breakdef tSendDataToSerser(): for i in range(len(sockList)): try: strbytes = bytes("abcdef", encoding = "utf8") sockList[i].send(strbytes) data = sockList[i].recv(2048) except Exception as e: print("err->%d:%s"%(i, e))def freeConnectServer(): for node in sockList: try: () except Exception as e: print(e)printdbg(str(clientTestNumber))tmptimec = timepy()#print(wstr())testconnectServer()print('connect finish:%f'%(()))()tSendDataToSerser()print('send finish:%f'%(()))()freeConnectServer()print('free finish:%f'%(()))# while True:# (1)()测试结果:
clientTestNumber = oneThreadConnectNumber * threadNumberNeeddef printdbg(pstr = ""): dt_ms = ().strftime('%Y-%m-%d %H:%M:%S.%f') print("[%s][%s:%s:%s]%s"%(dt_ms, sys._getframe().f_back.f__filename,
sys._getframe().f_back.f__name, sys._getframe().f_back.f_lineno, pstr))class timepy: def __init__(self): ime = () def timenow(self): return () def begin(self): ime = () def interv(self): return () - ime def timenowstr(self): tmptnow = () # print("%s.%s"%(me("%Y-%m-%d %H:%M:%S"), (".%3f"%((tmptnow)))) return me("%Y-%m-%d %H:%M:", ime(tmptnow)) + ("%.3f"%(tmptnow % 60))sockListLock = ()sockList = []#('ulimit -n 20000')def testconnectServer(): tmpSockListBuf = [] for i in range(oneThreadConnectNumber): try: #print("connet index = %d"%(i)) tmpTcpCliSock = socket(AF_INET,SOCK_STREAM) t(addr) #print(data) (tmpTcpCliSock) except Exception as e: print("err->%d:%s"%(i, e)) break e() (tmpSockListBuf) e()def tSendDataToSerser(threadindex): for i in range(len(sockList[threadindex])): try: strbytes = bytes("abcdef", encoding = "utf8") sockList[threadindex][i].send(strbytes) data = sockList[threadindex][i].recv(2048) except Exception as e: print("err->%d:%s"%(i, e))def freeConnectServer(threadindex): for node in sockList[threadindex]: try: () except Exception as e: print(e)runthreadlist = []tmptimec = timepy()printdbg('total number : %s;thread num:%d'%(str(clientTestNumber), threadNumberNeed))for i in range(threadNumberNeed): ((target = testconnectServer))for thnode in runthreadlist: ()for thnode in runthreadlist: ()print('connect finish:%f'%(()))()
if(hcinf->bev) { bufferevent_set_timeouts(hcinf->bev, NULL, NULL); bufferevent_enable (hcinf->bev, EV_READ); bufferevent_free(hcinf->bev); hcinf->bev = NULL; }}void libClientEventWork(){ evthread_use_pthreads(); base = event_base_new(); struct event basetimeout; if(base == NULL) { logwerr("err"); return; } event_assign(&basetimeout, base, -1, EV_PERSIST, clientTimeout_cb, (void*) &basetimeout); struct timeval tv; _sec = 20; _usec = 0; event_add(&basetimeout, &tv); event_base_dispatch(base); event_base_free(base); logwdbg("finished");}void libClientCommunicatWork(){ int tmpFlag, i; std::string tmpStr; initHostClientInform(); backClientRunflag = 1; timerc respontTask, hb_reconTime; while(backClientRunflag) { usleep(10 * 1000); msgWrkCls::instance()->rcvmsg(tmpStr, msgTypeIdHostCli); if(() > 5000 || tmpStr == "1") { (); }
if(hb_() < 20) { continue; } hb_(); for(i = 0; i < HOST_CLIENT_MAX_NUMBER; i++) {
if(!(hcpcWorkList[i].bev != NULL && hcpcWorkList[i].servIp != "" && hcpcWorkList[i].servPort != "")) { continue; } logwdbg("%p,%s,%s", hcpcWorkList[i].bev, hcpcWorkList[i].servIp.c_str(), hcpcWorkList[i].servPort.c_str()); hcpcWorkList[i].sendHeartbeatToServer(); // hcpcWorkList[i].sendOnlineDevsToServer(); // hcpcWorkList[i].getTableInformCom(&tabHostWrkRegister); // hcpcWorkList[i].getTableInformCom(&tabHostWrkParm); } // logwdbg("-->"); for(i = 0; i < HOST_CLIENT_MAX_NUMBER; i++) { if(!(hcpcWorkList[i].bev == NULL && hcpcWorkList[i].servIp != "" && hcpcWorkList[i].servPort != ""))
if(!(hcpcWorkList[i].bev == NULL && hcpcWorkList[i].servIp != "" && hcpcWorkList[i].servPort != "")) { continue; } hcpcWorkList[i].bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);//没有添加BEV_OPT_THREADSAFE if(hcpcWorkList[i].bev == NULL) { logwerr("err"); continue; } struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server__family = AF_INET; server__port = htons(atoi(hcpcWorkList[i].servPort.c_str())); inet_aton(hcpcWorkList[i].servIp.c_str(), &server__addr); tmpFlag = bufferevent_socket_connect(hcpcWorkList[i].bev, (struct sockaddr *)&server_addr, sizeof(server_addr)); logwdbg("%s:%s,%d", hcpcWorkList[i].servIp.c_str(), hcpcWorkList[i].servPort.c_str(), tmpFlag); if(tmpFlag < 0) { logwerr("err"); continue; } hcpcWorkList[i].initParseClass(); bufferevent_setcb(hcpcWorkList[i].bev, server_msg_cb, NULL, event_cb, (void *)&hcpcWorkList[i]); bufferevent_enable(hcpcWorkList[i].bev, EV_READ | EV_PERSIST); struct timeval tv = {120, 0}; bufferevent_set_timeouts(hcpcWorkList[i].bev, &tv, NULL); } }}void beginHostClientWork(){ hostClientThead = std::thread(libClientEventWork); hostClientConnectThead = std::thread(libClientCommunicatWork);}void exitHostClientWork(){ backClientRunflag = 0; // for(int i = 0; i < HOST_CLIENT_MAX_NUMBER; i++) // { // freeConnectBuffEvent(&clientBackList[i]); // } if(base) { event_base_loopbreak(base); event_base_loopexit(base, NULL); } (); ();}void server_msg_cb(struct bufferevent *bev, void *arg){ static char msg[16 * 1024]; size_t len = bufferevent_read(bev, msg, sizeof(msg)); if(len < sizeof(msg)) { msg[len] = '0'; } logwdbg("recv[%ld]", len); // logwdbg("recv[%ld]:%s", len, msg); parseHostClientClass *tmpClient = (parseHostClientClass *)arg;


发布评论