Changeset 69 for osgVisual/src/cluster


Ignore:
Timestamp:
Jul 20, 2010, 10:44:13 AM (14 years ago)
Author:
Torben Dannhauer
Message:

Network sync:
now works:

  • serialization of transport container
  • transport via ENet UDP
  • de-serialization of the transport container.
  • transport of viewmatrix and framenumber to the slave.

ToDo?: apply viewmatrix on slave still do not work.

Location:
osgVisual/src/cluster
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • osgVisual/src/cluster/dataIO_clusterENet.cpp

    r67 r69  
    2323        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet constructed" << std::endl;
    2424        serverToConnect = "unknown";
     25        hardSync = false;       // integrate into init()
     26        port = 12345;   // integrate into init()
    2527}
    2628
     
    3234
    3335
    34 void dataIO_clusterENet::init( osg::ArgumentParser& arguments_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool compressionEnabled_, bool asAscii_ )
     36void dataIO_clusterENet::init( osg::ArgumentParser& arguments_, osgViewer::Viewer* viewer_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool compressionEnabled_, bool asAscii_ )
    3537{
    3638        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet init();" << std::endl;
    3739       
     40        // Store viewer
     41        viewer = viewer_;
     42
    3843        // Configure the clustermode
    3944        clusterMode = clusterMode_;
     
    4146        // store sendContainer
    4247        sendContainer = sendContainer_;
     48
     49        // Configure Compression and instantiate read/write-options
     50        std::string readOptionString = "";
     51        std::string writeOptionString = "";
     52        if(asAscii_)
     53        {
     54                readOptionString = "Ascii";
     55                writeOptionString = "Ascii";
     56        }
     57        if (compressionEnabled_)
     58                writeOptionString+=" Compressor=zlib";
     59        readOptions = new osgDB::Options( readOptionString.c_str() );
     60        writeOptions = new osgDB::Options( writeOptionString.c_str() );
     61
     62        // Get ReaderWriter
     63        rw = osgDB::Registry::instance()->getReaderWriterForExtension("osgb"); 
    4364       
    4465        // create ENet implementation object.
    45         enet_impl = new osgVisual::dataIO_clusterENet_implementation();
     66        enet_impl = new osgVisual::dataIO_clusterENet_implementation(receivedTransportContainer);
    4667
    4768        // initialize ENet implementation
    4869        if(clusterMode == MASTER)
    4970        {
     71                std::cout << "Init dataIO_cluster_ENet as Server on port " << port << std::endl;
    5072                enet_impl->init(dataIO_clusterENet_implementation::SERVER, port);
    5173
     
    81103                {
    82104                        initialized = false;
    83                         std::cout << "Failed to establish connection to server " << serverToConnect << std::endl;
     105                        std::cout << "Finally failed to establish connection to server " << serverToConnect << std::endl;
     106                        exit(-1);
    84107                }
    85108        }       // IF SLAVE END
     
    95118bool dataIO_clusterENet::sendTO_OBJvaluesToSlaves()
    96119{
    97         OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
    98 
     120        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
     121
     122        if(sendContainer.valid())
     123        {
     124                // Pack FrameID & Viewmatrix
     125                sendContainer->setFrameID(viewer->getFrameStamp()->getFrameNumber());
     126                sendContainer->setViewMatrix(viewer->getCamera()->getViewMatrix());
     127
     128                // Writing node to stream
     129                std::stringstream myOstream;
     130                if ( rw )
     131                {
     132                        osgDB::ReaderWriter::WriteResult wr = rw->writeObject( *sendContainer.get(), myOstream, writeOptions );
     133                        if (wr.success() )                     
     134                        {
     135                                // Send Data via ENet:
     136                                OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterUDP::sendTO_OBJvaluesToSlaves() - Bytes to send: " << myOstream.str().length() << std::endl;
     137                                //sOSG_NOTIFY( osg::ALWAYS ) << "Send: " << myOstream.str() << std::endl;
     138                                ENetPacket * packet = enet_packet_create (myOstream.str().c_str(),
     139                                                                                                  myOstream.str().size(),
     140                                                                                                  ENET_PACKET_FLAG_RELIABLE);
     141               
     142                                // Send data
     143                                enet_impl->sendPacket( packet, 0, 0, true);
     144                        }
     145                        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Save failed: " << wr.message() << std::endl;
     146                }
     147                else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Unable to get readerWriter for osgb" << std::endl;
     148        }
     149        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Invalid transportContainer" << std::endl;
     150
     151
     152        enet_impl->processEvents();     // As Master: process events AFTER doing anything to have up to have the "sent" commands in queue.
    99153        return true;
    100154}
     
    103157bool dataIO_clusterENet::readTO_OBJvaluesFromMaster()
    104158{
    105         OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
     159        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
     160        enet_impl->processEvents();     // As Slave: process events BEFORE doing anything to have up to date values.
     161
     162        int bytes_received = receivedTransportContainer.size();
     163        if (bytes_received > 0 )
     164        {
     165                OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Bytes received: " << bytes_received << std::endl;
     166                //OSG_NOTIFY( osg::ALWAYS ) << "Received: " << std::endl << receivedTransportContainer << std::endl;
     167
     168                // Unserialize data
     169                if ( rw )
     170                {
     171                        std::stringstream test;
     172                        test << receivedTransportContainer;
     173                        osgDB::ReaderWriter::ReadResult rr = rw->readObject( test, readOptions );
     174                        if (rr.success())
     175                        {
     176                                sendContainer = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
     177                                if (sendContainer)
     178                                {
     179                                        OSG_NOTIFY( osg::ALWAYS ) << "Received::FrameID is: " << sendContainer->getFrameID() << std::endl;
     180                                        // Restore Viewmatrix
     181                                        viewer->getCamera()->setViewMatrix(sendContainer->getViewMatrix());
     182                                }
     183                                else
     184                                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to cast converted node to transportContainer" << std::endl;
     185                        }
     186                        else
     187                                OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to convert stream to node" << std::endl;
     188                }
     189                else
     190                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
     191        }       // IF bytes recv > 0 END
     192
     193
    106194
    107195        return true;
     
    111199void dataIO_clusterENet::reportAsReadyToSwap()
    112200{
     201        if(!hardSync)
     202                return;
     203
    113204        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet reportAsReadyToSwap()" << std::endl;
    114205}
     
    116207bool dataIO_clusterENet::waitForSwap()
    117208{
     209        if(!hardSync)
     210                return true;
     211
    118212        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForSwap()" << std::endl;
    119213
     
    124218bool dataIO_clusterENet::waitForAllReadyToSwap()
    125219{
     220        if(!hardSync)
     221                return true;
     222
    126223        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForAllReadyToSwap()" << std::endl;
    127224
     
    132229bool dataIO_clusterENet::sendSwapCommand()
    133230{
     231        if(!hardSync)
     232                return true;
     233
    134234        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendSwapCommand()" << std::endl;
    135235
  • osgVisual/src/cluster/dataIO_clusterENet_implementation.cpp

    r68 r69  
    2121int dataIO_clusterENet_implementation::activeENetInstances = 0;
    2222
    23 dataIO_clusterENet_implementation::dataIO_clusterENet_implementation()
     23dataIO_clusterENet_implementation::dataIO_clusterENet_implementation(std::string& receivedTransportContainer_)
     24: receivedTransportContainer(receivedTransportContainer_)
    2425{
    2526        std::cout << "Instantiated server class# "<< activeENetInstances << std::endl;
     
    142143        if( peerList.size() == 0 )
    143144        {
    144                 std::cout << "dataIO_clusterENet_implementation::sendPacket() - ERROR: No connected peer available!" << std::endl;
     145                //std::cout << "dataIO_clusterENet_implementation::sendPacket() - ERROR: No connected peer available!" << std::endl;
    145146                return;
    146147        }
     
    215216{
    216217        if(currentRole != dataIO_clusterENet_implementation::CLIENT)
     218        {
     219                std::cout << "dataIO_clusterENet_implementation::connectTo() : ERROR: ENet does not work as client - ignoring to connect!" << std::endl;
    217220                return false;
     221        }
    218222
    219223        ENetAddress address;
     
    257261void dataIO_clusterENet_implementation::onReceivePacket(ENetEvent* event_)
    258262{
    259                 std::string datastring;
    260                 datastring.assign((char*)(event_->packet->data), event_->packet->dataLength);   
    261                 std::cout << "A packet of length "<<event_->packet->dataLength<<" containing "<<datastring<<" was received from "<<datastring<<" on channel "<<(int)(event_->channelID)<<std::endl;
    262                 datastring+=std::string("answer");
    263 
    264                 ENetPacket * packet = enet_packet_create (datastring.c_str(),
    265                                                                                                   datastring.size(),
    266                                                                                                   ENET_PACKET_FLAG_RELIABLE);
    267                
    268                 // Send answer
    269                 sendPacket( packet, 0, 0, true);
     263                receivedTransportContainer.assign((char*)(event_->packet->data), event_->packet->dataLength);   
     264                //std::cout << "A packet of length "<<event_->packet->dataLength<<" containing "<<receivedTransportContainer<<" was received from "<<receivedTransportContainer<<" on channel "<<(int)(event_->channelID)<<std::endl;
    270265}
    271266
  • osgVisual/src/cluster/dataIO_clusterUDP.cpp

    r59 r69  
    128128                else
    129129                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterUDP::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
    130         }
     130        }       // IF bytes recv >0
    131131
    132132
Note: See TracChangeset for help on using the changeset viewer.