Skip to content
  • Categories
  • Recent
  • Tags
  • Popular
  • Users
  • Groups
  • Search
  • Get Qt Extensions
  • Unsolved
Collapse
Brand Logo
  1. Home
  2. Qt Development
  3. General and Desktop
  4. QTcpServer, QTcpSocket and LOST packets!
QtWS25 Last Chance

QTcpServer, QTcpSocket and LOST packets!

Scheduled Pinned Locked Moved Solved General and Desktop
qtcpsocketqtcpserver
9 Posts 2 Posters 872 Views
  • Oldest to Newest
  • Newest to Oldest
  • Most Votes
Reply
  • Reply as topic
Log in to reply
This topic has been deleted. Only users with topic management privileges can see it.
  • D Offline
    D Offline
    Dariusz
    wrote on 20 May 2021, 09:08 last edited by Dariusz
    #1

    Hey

    This was a fun week of screams, I've finally narrowed it down to... well I have no idea what to be honest. So maybe some1 can enlighten me here.
    I have a server that creates connection in worker thread, I also have a client-socket spawner that makes 400 sockets in 40 threads and bombards the server with connections.

    Now the good news is that server accepts all 400 connections, the bad news is server mises 10-20% of messages from those connections! ; O !!

    Here is the server :

    
    #include "QThread"
    #include "QApplication"
    #include "QTcpServer"
    #include "QTcpSocket"
    #include "QHostAddress"
    #include "QWidget"
    
    class myServerSocket : public QTcpSocket {
    Q_OBJECT
    public:
        myServerSocket(QThread *workerThread);
        ~myServerSocket();
    public Q_SLOTS:
        void handleAvailableData();
    Q_SIGNALS:
        void newData(myServerSocket *senderPtr, int dataSize);
    };
    myServerSocket::myServerSocket(QThread *workerThread) : QTcpSocket(nullptr) {
        moveToThread(workerThread);
        connect(this, &QTcpSocket::readyRead, this, &myServerSocket::handleAvailableData);
    }
    
    myServerSocket::~myServerSocket() {
    
    }
    
    
    void myServerSocket::handleAvailableData() {
        auto data = readAll();
        Q_EMIT newData(this, data.size());
    
        if (bytesAvailable() > 0) {
            QMetaObject::invokeMethod(this, [this]() { handleAvailableData(); }, Qt::QueuedConnection);
        }
    }
    
    class myServer : public QTcpServer {
    Q_OBJECT
        QThread *mClientWorker;
        std::atomic<unsigned int> mConCount;
    protected:
        void incomingConnection(qintptr handle) override;
    
    public:
        myServer();
        ~myServer();
    
    public Q_SLOTS:
        void handleItemData(myServerSocket *client, int dataSize);
        void handleItemDeleted(QObject *itemPtr);
    
    };
    
    myServer::myServer() {
        mClientWorker = new QThread();
        mClientWorker->setObjectName("clientWorker");
        mClientWorker->start();
        listen(QHostAddress("localhost"), 40000);
    }
    
    myServer::~myServer() {
    
    }
    
    void myServer::handleItemData(myServerSocket *client, int dataSize) {
        qDebug() << QThread::currentThread << "Got data : " << dataSize << client;
    }
    
    void myServer::incomingConnection(qintptr handle) {
        auto myClient = new myServerSocket(mClientWorker);
        myClient->setSocketOption(QAbstractSocket::KeepAliveOption, true);
        QMetaObject::invokeMethod(myClient, [&, cl = myClient, ptr = handle]() { cl->setSocketDescriptor(ptr); }, Qt::QueuedConnection);
        mConCount++;
        qDebug() << "Got new con! : " << mConCount;
        connect(myClient, &myServerSocket::newData, this, &myServer::handleItemData, Qt::DirectConnection); // we want to emit in client thread for now
        connect(myClient, &myServerSocket::destroyed, this, &myServer::handleItemDeleted, Qt::QueuedConnection);
        connect(myClient, &myServerSocket::disconnected, myClient, &myServerSocket::deleteLater, Qt::QueuedConnection);
    }
    void myServer::handleItemDeleted(QObject *itemPtr) {
        mConCount--;
        qDebug() << "Losing client : " << mConCount;
    }
    
    
    int main(int argc, char *argv[]) {
        auto app = QApplication(argc, argv);
        myServer s;
        QWidget w;
        w.show();
        auto ax = app.exec();
    
        return ax;
    }
    

    Here is Client bombardier :

    
    #include <QThreadPool>
    #include <QtConcurrent/QtConcurrent>
    #include "QThread"
    #include "QApplication"
    #include "QTcpServer"
    #include "QTcpSocket"
    #include "QHostAddress"
    
    class myClient : public QTcpSocket {
    Q_OBJECT
        QString ip;
        unsigned int port;
        unsigned int clientId;
        unsigned int workerId;
    public:
        myClient(const QString &i, const unsigned int p, const unsigned int cId, const unsigned int wId, QObject *parent);
        ~myClient();
        void sendData();
    
    };
    myClient::myClient(const QString &i, const unsigned int p, const unsigned int cId, const unsigned int wId, QObject *parent) : QTcpSocket(parent) {
        ip = i;
        port = p;
        clientId = cId;
        workerId = wId;
    }
    
    myClient::~myClient() {
    
    }
    void myClient::sendData() {
        QMetaObject::invokeMethod(this, [this]() { /// send to client thread.
            connectToHost(ip, port);
            waitForConnected();
            qDebug() << QThread::currentThread() << clientId << workerId << "Connecting";
            if (state() == QTcpSocket::ConnectedState) {
                QByteArray data;
                data.resize(724);
                auto w = write(data);
                flush();
                qDebug() << clientId << workerId << "I send : " << w << " Data";
            } else {
                qWarning() << clientId << workerId << "Could not send data as I didnt connect...";
            }
        }, Qt::QueuedConnection);
    }
    
    
    int main(int argc, char *argv[]) {
        auto app = QApplication(argc, argv);
    
    
        std::atomic<unsigned int> maxProcess = 40;
        const unsigned int maxClient = 10;
        const unsigned int aliveTime = 5 * 1000;// 5 seconds;
        QThreadPool pool;
        pool.setMaxThreadCount(maxProcess);
        std::atomic<unsigned int> currentCount;
        while (true) {
    #pragma  omp parallel for
            for (int ca = 0; ca < maxProcess; ca++) {
                if (maxProcess > currentCount) {
                    QtConcurrent::run(&pool, [&]() {
                                          currentCount++;
                                          QThread *myWorker = new QThread();
                                          myWorker->setObjectName("Worker id : " + QString::number(currentCount));
                                          QObject *parent = new QObject();
                                          myWorker->start();
    
                                          std::vector<myClient *> clients(maxClient);
                                          for (int x = 0; x < maxClient; ++x) {
                                              auto c = new myClient("localhost", 40000, x, currentCount, parent);
                                              clients[x] = c;
                                          }
                                          parent->moveToThread(myWorker);
                                          app.processEvents();
                                          QThread::msleep(500);
                                          app.processEvents();;
    #pragma  omp parallel for
                                          for (int x = 0; x < maxClient; ++x) {
                                              clients[x]->sendData();
                                          }
                                          unsigned int wait = 0;
                                          while (true) {
                                              QThread::msleep(1);
                                              app.processEvents();
                                              wait++;
                                              if (wait > aliveTime) {
                                                  break;
                                              }
                                          }
                                          for (auto &client:clients) {
                                              client->deleteLater();
                                          }
                                          parent->deleteLater();
                                          myWorker->quit();
                                          myWorker->wait();
                                          myWorker->deleteLater();
                                          currentCount--;
                                      }
                    );
                }
            }
            app.processEvents();
        }
    
        auto ax = app.exec();
    
        return ax;
    }
    

    The question is... how do I not lose the messages? :D I have a "massive" app that relies on a similar system and over there out of 1200 connections/messages I get around ... 20 messages through...! Painful week, can any1 shed information on where I'm doing a mistake here?

    I've also tried delaying message send on the client side, say send 3s after connection, still the same issue! :- (((

    The way I test it is I run the server/client until clients start to disconnect, then I pause the app, and run a search over the output. Usually searching for got new c will yeld 400 results, but searching for Got data will be less than 400, thus I have message loss.

    TIA!

    Uuuu a side note, when I run serverClients in "main" thread, and not my worker thread, I did not lose messages, but in worker thread I lose them ! :- (

    1 Reply Last reply
    0
    • D Offline
      D Offline
      Dariusz
      wrote on 20 May 2021, 11:36 last edited by
      #6

      I may have a breakthrough... need to test more... but it looks like settings

          connect(myClient, &myServerSocket::newData, this, &myServer::handleItemData, Qt::DirectConnection); // we want to emit in client thread for now
          connect(myClient, &myServerSocket::destroyed, this, &myServer::handleItemDeleted, Qt::QueuedConnection);
          connect(myClient, &myServerSocket::disconnected, myClient, &myServerSocket::deleteLater, Qt::QueuedConnection);
      

      Before setting socketDescriptor... fixed the "lost" packages... need to test more, but maybe ?

      Hmmm, does it matter on which thread I create those connections?

      1 Reply Last reply
      0
      • VRoninV Offline
        VRoninV Offline
        VRonin
        wrote on 20 May 2021, 10:11 last edited by VRonin
        #2

        How do you check you are losing messages?

        P.S.

        • #pragma omp parallel for should nowadays be replaced with std::for_each(std::execution::par
        • you are leaking objects (parent for example)
        • You are making a mess with threads, mixing concurrent with QThread for no real reason tbh.
        • The repository attached to this example might be of help to you. It looks like you are overcomplicating things. You can also check the commonlib branch of the repo for a more advanced example

        "La mort n'est rien, mais vivre vaincu et sans gloire, c'est mourir tous les jours"
        ~Napoleon Bonaparte

        On a crusade to banish setIndexWidget() from the holy land of Qt

        D 1 Reply Last reply 20 May 2021, 10:48
        4
        • VRoninV VRonin
          20 May 2021, 10:11

          How do you check you are losing messages?

          P.S.

          • #pragma omp parallel for should nowadays be replaced with std::for_each(std::execution::par
          • you are leaking objects (parent for example)
          • You are making a mess with threads, mixing concurrent with QThread for no real reason tbh.
          • The repository attached to this example might be of help to you. It looks like you are overcomplicating things. You can also check the commonlib branch of the repo for a more advanced example
          D Offline
          D Offline
          Dariusz
          wrote on 20 May 2021, 10:48 last edited by
          #3

          @VRonin When I see disconnections start(as all 400 die at the same time) I pause app and run a search string over the output log.

          I then can see that I have 400 connections & 400> messages. So I know I lost packets as I should have 400 mesages/400 connections.

          #pragma omp parallel for should nowadays be replaced with std::for_each(std::execution::par woa! Will dig in to that, thanks!

          you are leaking objects (parent for example)
          Parent gets deleted in client at the end. So no leak there.

               for (auto &client:clients) {
                                                    client->deleteLater();
                                                }
                                                parent->deleteLater(); /// < here
                                                myWorker->quit();
                                                myWorker->wait();
                                                myWorker->deleteLater();
          

          You are making a mess with threads, mixing concurrent with QThread for no real reason tbh.
          In client ? Yeah... its a bit of a "hacky" method. But I added

          #pragma  omp parallel for
                  for (int ca = 0; ca < maxProcess; ca++) {
                      if (maxProcess > currentCount) {
          

          5 min before posting the topic and I didn't realize I can drop concurrent, still should not cause "real" issue, as there are no warning from Qt and in my other app I'm loosing messages without the concurent bananza.
          The repository attached to this example might be of help to you. It looks like you are overcomplicating things. You can also check the commonlib branch of the repo for a more advanced example
          Sweet thanks, will check it out!

          Again, odd thing is that if I run all my serverClients in main thread = no packet loss, if I use my worker thred = packet loss... Its as if worker thread is somewhat leaking data.

          TIA!

          VRoninV 1 Reply Last reply 20 May 2021, 10:53
          0
          • D Dariusz
            20 May 2021, 10:48

            @VRonin When I see disconnections start(as all 400 die at the same time) I pause app and run a search string over the output log.

            I then can see that I have 400 connections & 400> messages. So I know I lost packets as I should have 400 mesages/400 connections.

            #pragma omp parallel for should nowadays be replaced with std::for_each(std::execution::par woa! Will dig in to that, thanks!

            you are leaking objects (parent for example)
            Parent gets deleted in client at the end. So no leak there.

                 for (auto &client:clients) {
                                                      client->deleteLater();
                                                  }
                                                  parent->deleteLater(); /// < here
                                                  myWorker->quit();
                                                  myWorker->wait();
                                                  myWorker->deleteLater();
            

            You are making a mess with threads, mixing concurrent with QThread for no real reason tbh.
            In client ? Yeah... its a bit of a "hacky" method. But I added

            #pragma  omp parallel for
                    for (int ca = 0; ca < maxProcess; ca++) {
                        if (maxProcess > currentCount) {
            

            5 min before posting the topic and I didn't realize I can drop concurrent, still should not cause "real" issue, as there are no warning from Qt and in my other app I'm loosing messages without the concurent bananza.
            The repository attached to this example might be of help to you. It looks like you are overcomplicating things. You can also check the commonlib branch of the repo for a more advanced example
            Sweet thanks, will check it out!

            Again, odd thing is that if I run all my serverClients in main thread = no packet loss, if I use my worker thred = packet loss... Its as if worker thread is somewhat leaking data.

            TIA!

            VRoninV Offline
            VRoninV Offline
            VRonin
            wrote on 20 May 2021, 10:53 last edited by
            #4

            @Dariusz said in QTcpServer, QTcpSocket and LOST packets!:

            400 mesages

            By "messages" you mean qDebug() << QThread::currentThread << "Got data : " << dataSize << client;?

            "La mort n'est rien, mais vivre vaincu et sans gloire, c'est mourir tous les jours"
            ~Napoleon Bonaparte

            On a crusade to banish setIndexWidget() from the holy land of Qt

            D 1 Reply Last reply 20 May 2021, 10:57
            0
            • VRoninV VRonin
              20 May 2021, 10:53

              @Dariusz said in QTcpServer, QTcpSocket and LOST packets!:

              400 mesages

              By "messages" you mean qDebug() << QThread::currentThread << "Got data : " << dataSize << client;?

              D Offline
              D Offline
              Dariusz
              wrote on 20 May 2021, 10:57 last edited by Dariusz
              #5

              @VRonin said in QTcpServer, QTcpSocket and LOST packets!:

              @Dariusz said in QTcpServer, QTcpSocket and LOST packets!:

              400 mesages

              By "messages" you mean qDebug() << QThread::currentThread << "Got data : " << dataSize << client;?

              Yep I run search in my ide over console log for "Got Data"... I kinda expect them to get printed... like... yea... if that does not print then I have a problem :D
              aaabc136-7300-41ca-8447-d1b2be4602d5-image.png

              clientWorker here is the client socket worker I move it in to. While in it there is packet loss.

              1 Reply Last reply
              0
              • D Offline
                D Offline
                Dariusz
                wrote on 20 May 2021, 11:36 last edited by
                #6

                I may have a breakthrough... need to test more... but it looks like settings

                    connect(myClient, &myServerSocket::newData, this, &myServer::handleItemData, Qt::DirectConnection); // we want to emit in client thread for now
                    connect(myClient, &myServerSocket::destroyed, this, &myServer::handleItemDeleted, Qt::QueuedConnection);
                    connect(myClient, &myServerSocket::disconnected, myClient, &myServerSocket::deleteLater, Qt::QueuedConnection);
                

                Before setting socketDescriptor... fixed the "lost" packages... need to test more, but maybe ?

                Hmmm, does it matter on which thread I create those connections?

                1 Reply Last reply
                0
                • D Offline
                  D Offline
                  Dariusz
                  wrote on 20 May 2021, 12:20 last edited by
                  #7

                  Applied the same rule to my big app. I'm now setting the socket descriptor last in configuring my objects. It appears to work great! No lost packets on 10k connections spam! yay!

                  1 Reply Last reply
                  1
                  • VRoninV Offline
                    VRoninV Offline
                    VRonin
                    wrote on 20 May 2021, 12:22 last edited by
                    #8

                    Maybe I'm missing something here but you have a server with a single thread handling 10k connections, it might create bottlenecks...

                    "La mort n'est rien, mais vivre vaincu et sans gloire, c'est mourir tous les jours"
                    ~Napoleon Bonaparte

                    On a crusade to banish setIndexWidget() from the holy land of Qt

                    D 1 Reply Last reply 20 May 2021, 12:55
                    0
                    • VRoninV VRonin
                      20 May 2021, 12:22

                      Maybe I'm missing something here but you have a server with a single thread handling 10k connections, it might create bottlenecks...

                      D Offline
                      D Offline
                      Dariusz
                      wrote on 20 May 2021, 12:55 last edited by
                      #9

                      @VRonin yes thats right. But the example here is simplified to the bone to track down that evil issue...
                      Currently my server in release mode with profiler enable pushes around 3000 connections in 592ms, so 6k per second +/-... I've no idea if thats good or bad, But looking at profiler >
                      319d7357-9211-4008-ad08-dde32da2cfc0-image.png
                      The top green are new connection handlers, all it does is grab a handle > pushes it to vector. Its about 0.001 ms
                      the second row is the creation of objects for each handle, which is done in worker and the "tall" one are all the pragma omp helping multithread different parts of app.
                      Seems pretty fast given that in that time >

                      1. accept connections,
                      2. push them to monitor for display to show that there are new connections - this is batched too
                      3. Set connections details from packed I received
                      4. Validate connection credentials
                        some more work.

                      Yeah, I'm amazed o.o, but I have no idea if thats good/bad performance-wise. Learning networking here lol. Possibly 360k Connections /validations/ displaying per minute ? I can't spawn enough test connections per second to test how much can I push lol.

                      1 Reply Last reply
                      0

                      1/9

                      20 May 2021, 09:08

                      • Login

                      • Login or register to search.
                      1 out of 9
                      • First post
                        1/9
                        Last post
                      0
                      • Categories
                      • Recent
                      • Tags
                      • Popular
                      • Users
                      • Groups
                      • Search
                      • Get Qt Extensions
                      • Unsolved