Changeset 69 for osgVisual/src


Ignore:
Timestamp:
Jul 20, 2010, 10:44:13 AM (14 years ago)
Author:
Torben Dannhauer
Message:

Network sync:
now works:

  • serialization of transport container
  • transport via ENet UDP
  • de-serialization of the transport container.
  • transport of viewmatrix and framenumber to the slave.

ToDo?: apply viewmatrix on slave still do not work.

Location:
osgVisual/src
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • osgVisual/src/cluster/dataIO_clusterENet.cpp

    r67 r69  
    2323        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet constructed" << std::endl;
    2424        serverToConnect = "unknown";
     25        hardSync = false;       // integrate into init()
     26        port = 12345;   // integrate into init()
    2527}
    2628
     
    3234
    3335
    34 void dataIO_clusterENet::init( osg::ArgumentParser& arguments_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool compressionEnabled_, bool asAscii_ )
     36void dataIO_clusterENet::init( osg::ArgumentParser& arguments_, osgViewer::Viewer* viewer_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool compressionEnabled_, bool asAscii_ )
    3537{
    3638        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet init();" << std::endl;
    3739       
     40        // Store viewer
     41        viewer = viewer_;
     42
    3843        // Configure the clustermode
    3944        clusterMode = clusterMode_;
     
    4146        // store sendContainer
    4247        sendContainer = sendContainer_;
     48
     49        // Configure Compression and instantiate read/write-options
     50        std::string readOptionString = "";
     51        std::string writeOptionString = "";
     52        if(asAscii_)
     53        {
     54                readOptionString = "Ascii";
     55                writeOptionString = "Ascii";
     56        }
     57        if (compressionEnabled_)
     58                writeOptionString+=" Compressor=zlib";
     59        readOptions = new osgDB::Options( readOptionString.c_str() );
     60        writeOptions = new osgDB::Options( writeOptionString.c_str() );
     61
     62        // Get ReaderWriter
     63        rw = osgDB::Registry::instance()->getReaderWriterForExtension("osgb"); 
    4364       
    4465        // create ENet implementation object.
    45         enet_impl = new osgVisual::dataIO_clusterENet_implementation();
     66        enet_impl = new osgVisual::dataIO_clusterENet_implementation(receivedTransportContainer);
    4667
    4768        // initialize ENet implementation
    4869        if(clusterMode == MASTER)
    4970        {
     71                std::cout << "Init dataIO_cluster_ENet as Server on port " << port << std::endl;
    5072                enet_impl->init(dataIO_clusterENet_implementation::SERVER, port);
    5173
     
    81103                {
    82104                        initialized = false;
    83                         std::cout << "Failed to establish connection to server " << serverToConnect << std::endl;
     105                        std::cout << "Finally failed to establish connection to server " << serverToConnect << std::endl;
     106                        exit(-1);
    84107                }
    85108        }       // IF SLAVE END
     
    95118bool dataIO_clusterENet::sendTO_OBJvaluesToSlaves()
    96119{
    97         OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
    98 
     120        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
     121
     122        if(sendContainer.valid())
     123        {
     124                // Pack FrameID & Viewmatrix
     125                sendContainer->setFrameID(viewer->getFrameStamp()->getFrameNumber());
     126                sendContainer->setViewMatrix(viewer->getCamera()->getViewMatrix());
     127
     128                // Writing node to stream
     129                std::stringstream myOstream;
     130                if ( rw )
     131                {
     132                        osgDB::ReaderWriter::WriteResult wr = rw->writeObject( *sendContainer.get(), myOstream, writeOptions );
     133                        if (wr.success() )                     
     134                        {
     135                                // Send Data via ENet:
     136                                OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterUDP::sendTO_OBJvaluesToSlaves() - Bytes to send: " << myOstream.str().length() << std::endl;
     137                                //sOSG_NOTIFY( osg::ALWAYS ) << "Send: " << myOstream.str() << std::endl;
     138                                ENetPacket * packet = enet_packet_create (myOstream.str().c_str(),
     139                                                                                                  myOstream.str().size(),
     140                                                                                                  ENET_PACKET_FLAG_RELIABLE);
     141               
     142                                // Send data
     143                                enet_impl->sendPacket( packet, 0, 0, true);
     144                        }
     145                        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Save failed: " << wr.message() << std::endl;
     146                }
     147                else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Unable to get readerWriter for osgb" << std::endl;
     148        }
     149        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Invalid transportContainer" << std::endl;
     150
     151
     152        enet_impl->processEvents();     // As Master: process events AFTER doing anything to have up to have the "sent" commands in queue.
    99153        return true;
    100154}
     
    103157bool dataIO_clusterENet::readTO_OBJvaluesFromMaster()
    104158{
    105         OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
     159        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
     160        enet_impl->processEvents();     // As Slave: process events BEFORE doing anything to have up to date values.
     161
     162        int bytes_received = receivedTransportContainer.size();
     163        if (bytes_received > 0 )
     164        {
     165                OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Bytes received: " << bytes_received << std::endl;
     166                //OSG_NOTIFY( osg::ALWAYS ) << "Received: " << std::endl << receivedTransportContainer << std::endl;
     167
     168                // Unserialize data
     169                if ( rw )
     170                {
     171                        std::stringstream test;
     172                        test << receivedTransportContainer;
     173                        osgDB::ReaderWriter::ReadResult rr = rw->readObject( test, readOptions );
     174                        if (rr.success())
     175                        {
     176                                sendContainer = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
     177                                if (sendContainer)
     178                                {
     179                                        OSG_NOTIFY( osg::ALWAYS ) << "Received::FrameID is: " << sendContainer->getFrameID() << std::endl;
     180                                        // Restore Viewmatrix
     181                                        viewer->getCamera()->setViewMatrix(sendContainer->getViewMatrix());
     182                                }
     183                                else
     184                                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to cast converted node to transportContainer" << std::endl;
     185                        }
     186                        else
     187                                OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to convert stream to node" << std::endl;
     188                }
     189                else
     190                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
     191        }       // IF bytes recv > 0 END
     192
     193
    106194
    107195        return true;
     
    111199void dataIO_clusterENet::reportAsReadyToSwap()
    112200{
     201        if(!hardSync)
     202                return;
     203
    113204        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet reportAsReadyToSwap()" << std::endl;
    114205}
     
    116207bool dataIO_clusterENet::waitForSwap()
    117208{
     209        if(!hardSync)
     210                return true;
     211
    118212        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForSwap()" << std::endl;
    119213
     
    124218bool dataIO_clusterENet::waitForAllReadyToSwap()
    125219{
     220        if(!hardSync)
     221                return true;
     222
    126223        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForAllReadyToSwap()" << std::endl;
    127224
     
    132229bool dataIO_clusterENet::sendSwapCommand()
    133230{
     231        if(!hardSync)
     232                return true;
     233
    134234        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendSwapCommand()" << std::endl;
    135235
  • osgVisual/src/cluster/dataIO_clusterENet_implementation.cpp

    r68 r69  
    2121int dataIO_clusterENet_implementation::activeENetInstances = 0;
    2222
    23 dataIO_clusterENet_implementation::dataIO_clusterENet_implementation()
     23dataIO_clusterENet_implementation::dataIO_clusterENet_implementation(std::string& receivedTransportContainer_)
     24: receivedTransportContainer(receivedTransportContainer_)
    2425{
    2526        std::cout << "Instantiated server class# "<< activeENetInstances << std::endl;
     
    142143        if( peerList.size() == 0 )
    143144        {
    144                 std::cout << "dataIO_clusterENet_implementation::sendPacket() - ERROR: No connected peer available!" << std::endl;
     145                //std::cout << "dataIO_clusterENet_implementation::sendPacket() - ERROR: No connected peer available!" << std::endl;
    145146                return;
    146147        }
     
    215216{
    216217        if(currentRole != dataIO_clusterENet_implementation::CLIENT)
     218        {
     219                std::cout << "dataIO_clusterENet_implementation::connectTo() : ERROR: ENet does not work as client - ignoring to connect!" << std::endl;
    217220                return false;
     221        }
    218222
    219223        ENetAddress address;
     
    257261void dataIO_clusterENet_implementation::onReceivePacket(ENetEvent* event_)
    258262{
    259                 std::string datastring;
    260                 datastring.assign((char*)(event_->packet->data), event_->packet->dataLength);   
    261                 std::cout << "A packet of length "<<event_->packet->dataLength<<" containing "<<datastring<<" was received from "<<datastring<<" on channel "<<(int)(event_->channelID)<<std::endl;
    262                 datastring+=std::string("answer");
    263 
    264                 ENetPacket * packet = enet_packet_create (datastring.c_str(),
    265                                                                                                   datastring.size(),
    266                                                                                                   ENET_PACKET_FLAG_RELIABLE);
    267                
    268                 // Send answer
    269                 sendPacket( packet, 0, 0, true);
     263                receivedTransportContainer.assign((char*)(event_->packet->data), event_->packet->dataLength);   
     264                //std::cout << "A packet of length "<<event_->packet->dataLength<<" containing "<<receivedTransportContainer<<" was received from "<<receivedTransportContainer<<" on channel "<<(int)(event_->channelID)<<std::endl;
    270265}
    271266
  • osgVisual/src/cluster/dataIO_clusterUDP.cpp

    r59 r69  
    128128                else
    129129                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterUDP::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
    130         }
     130        }       // IF bytes recv >0
    131131
    132132
  • osgVisual/src/core/visual_core.cpp

    r68 r69  
    362362        //test->init( rootNode, viewer );
    363363
    364         // Creating Testclasses
    365         osg::ref_ptr<osgVisual::dataIO_transportContainer> test = new osgVisual::dataIO_transportContainer();
    366         osg::ref_ptr<osgVisual::dataIO_executer> testEx = new osgVisual::dataIO_executer();
    367         osg::ref_ptr<osgVisual::dataIO_slot> testSlot = new osgVisual::dataIO_slot();
    368         test->setFrameID( 22 );
    369         test->setName("ugamoep");
    370         testEx->setexecuterID( osgVisual::dataIO_executer::IS_COLLISION );
    371         testSlot->setVariableName(std::string("HalloName"));
    372         testSlot->setdataDirection( osgVisual::dataIO_slot::TO_OBJ );
    373         testSlot->setvarType( osgVisual::dataIO_slot::DOUBLE );
    374         testSlot->setValue( 0.12345 );
    375         test->addExecuter( testEx );
    376         test->addSlot( testSlot );
    377 
    378         // Writing object to stream
    379         //std::ofstream myOstream("test.txt" );
    380         std::stringstream myOstream;
    381         std::string extension = "osgb";
    382         bool compressed = false;
    383         osgDB::ReaderWriter* rw = osgDB::Registry::instance()->getReaderWriterForExtension(extension.c_str());
    384         if ( rw )
    385         {
    386                 osgDB::ReaderWriter::WriteResult wr;
    387 
    388                 if (extension == "osgb")
    389                 {
    390                         if (compressed)
    391                                 wr = rw->writeObject( *test, myOstream, new osgDB::Options("Compressor=zlib") );
    392                         else
    393                                 wr = rw->writeObject( *test, myOstream );
    394                 }
    395 
    396                 if (extension == "osgt")
    397                 {
    398                         if (compressed)
    399                                 wr = rw->writeObject( *test, myOstream, new osgDB::Options("Ascii Compressor=zlib") );
    400                         else
    401                                 wr = rw->writeObject( *test, myOstream, new osgDB::Options("Ascii") );
    402                 }
    403 
    404                 if (extension == "osgx")
    405                 {
    406                         if (compressed)
    407                                 wr = rw->writeObject( *test, myOstream, new osgDB::Options("XML Compressor=zlib") );
    408                         else
    409                                 wr = rw->writeObject( *test, myOstream, new osgDB::Options("XML") );
    410                 }
    411 
    412 
    413                 if (!wr.success() )     OSG_NOTIFY( osg::WARN ) << "ERROR: Save failed: " << wr.message() << std::endl;
    414         }
    415         else
    416                 OSG_NOTIFY( osg::WARN ) << "error getting readerWriter for osgt" << std::endl;
    417 
    418         // Size ermitteln.
    419         std::stringbuf *pbuf;
    420         pbuf = myOstream.rdbuf();
    421         OSG_NOTIFY( osg::WARN ) << "PBUF Bytes available: " << pbuf->in_avail() << std::endl;
    422         OSG_NOTIFY( osg::ALWAYS ) << "STRING Bytes available: " << myOstream.str().length() << std::endl;
    423         OSG_NOTIFY( osg::ALWAYS ) << "STRING content: " << myOstream.str() << std::endl;
    424 
    425         //Reading Stream to node
    426         if ( rw )
    427         {
    428                 osgDB::ReaderWriter::ReadResult rr = rw->readObject( myOstream );
    429                 osg::ref_ptr<osgVisual::dataIO_transportContainer> test2 = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
    430                 if (test2)
    431                 {
    432                         OSG_NOTIFY( osg::WARN ) << "TEST::FrameID is: " << test->getFrameID() << std::endl;
    433                 }
    434                 else
    435                         OSG_NOTIFY( osg::WARN ) << "Error converting stream to Node" << std::endl;
    436         }
    437 }
     364        //// Creating Testclasses
     365        //osg::ref_ptr<osgVisual::dataIO_transportContainer> test = new osgVisual::dataIO_transportContainer();
     366        //osg::ref_ptr<osgVisual::dataIO_executer> testEx = new osgVisual::dataIO_executer();
     367        //osg::ref_ptr<osgVisual::dataIO_slot> testSlot = new osgVisual::dataIO_slot();
     368        //test->setFrameID( 22 );
     369        //test->setName("ugamoep");
     370        //testEx->setexecuterID( osgVisual::dataIO_executer::IS_COLLISION );
     371        //testSlot->setVariableName(std::string("HalloName"));
     372        //testSlot->setdataDirection( osgVisual::dataIO_slot::TO_OBJ );
     373        //testSlot->setvarType( osgVisual::dataIO_slot::DOUBLE );
     374        //testSlot->setValue( 0.12345 );
     375        //test->addExecuter( testEx );
     376        //test->addSlot( testSlot );
     377
     378        visual_dataIO::getInstance()->setSlotData("TestSlot1", osgVisual::dataIO_slot::TO_OBJ, 0.12345);
     379
     380
     381        //// Writing object to stream
     382        ////std::ofstream myOstream("test.txt" );
     383        //std::stringstream myOstream;
     384        //std::string extension = "osgb";
     385        //bool compressed = false;
     386        //osgDB::ReaderWriter* rw = osgDB::Registry::instance()->getReaderWriterForExtension(extension.c_str());
     387        //if ( rw )
     388        //{
     389        //      osgDB::ReaderWriter::WriteResult wr;
     390
     391        //      if (extension == "osgb")
     392        //      {
     393        //              if (compressed)
     394        //                      wr = rw->writeObject( *test, myOstream, new osgDB::Options("Compressor=zlib") );
     395        //              else
     396        //                      wr = rw->writeObject( *test, myOstream );
     397        //      }
     398
     399        //      if (extension == "osgt")
     400        //      {
     401        //              if (compressed)
     402        //                      wr = rw->writeObject( *test, myOstream, new osgDB::Options("Ascii Compressor=zlib") );
     403        //              else
     404        //                      wr = rw->writeObject( *test, myOstream, new osgDB::Options("Ascii") );
     405        //      }
     406
     407        //      if (extension == "osgx")
     408        //      {
     409        //              if (compressed)
     410        //                      wr = rw->writeObject( *test, myOstream, new osgDB::Options("XML Compressor=zlib") );
     411        //              else
     412        //                      wr = rw->writeObject( *test, myOstream, new osgDB::Options("XML") );
     413        //      }
     414
     415
     416        //      if (!wr.success() )     OSG_NOTIFY( osg::WARN ) << "ERROR: Save failed: " << wr.message() << std::endl;
     417        //}
     418        //else
     419        //      OSG_NOTIFY( osg::WARN ) << "error getting readerWriter for osgt" << std::endl;
     420
     421        //// Size ermitteln.
     422        //std::stringbuf *pbuf;
     423        //pbuf = myOstream.rdbuf();
     424        //OSG_NOTIFY( osg::WARN ) << "PBUF Bytes available: " << pbuf->in_avail() << std::endl;
     425        //OSG_NOTIFY( osg::ALWAYS ) << "STRING Bytes available: " << myOstream.str().length() << std::endl;
     426        //OSG_NOTIFY( osg::ALWAYS ) << "STRING content: " << myOstream.str() << std::endl;
     427
     428        ////Reading Stream to node
     429        //if ( rw )
     430        //{
     431        //      osgDB::ReaderWriter::ReadResult rr = rw->readObject( myOstream );
     432        //      osg::ref_ptr<osgVisual::dataIO_transportContainer> test2 = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
     433        //      if (test2)
     434        //      {
     435        //              OSG_NOTIFY( osg::WARN ) << "TEST::FrameID is: " << test->getFrameID() << std::endl;
     436        //      }
     437        //      else
     438        //              OSG_NOTIFY( osg::WARN ) << "Error converting stream to Node" << std::endl;
     439        //}
     440}
  • osgVisual/src/dataIO/visual_dataIO.cpp

    r67 r69  
    7878        #ifdef USE_CLUSTER_ENET
    7979                cluster = new dataIO_clusterENet();
     80                cluster->enableHardSync( false );       /** \todo : rebuild this structure in cluster.h and move it this way to a general implementation. */
    8081        #endif
    8182        if(cluster.valid())
    82                 cluster->init(arguments_, clusterMode, slotContainer, true, false);
     83                //cluster->init(arguments_, clusterMode, slotContainer, true, false);
     84                cluster->init(arguments_, viewer_, clusterMode, slotContainer, false, true);
    8385
    8486        // Create extLink.
Note: See TracChangeset for help on using the changeset viewer.