source: osgVisual/src/cluster/dataIO_clusterENet.cpp @ 85

Last change on this file since 85 was 84, checked in by Torben Dannhauer, 14 years ago
File size: 7.6 KB
RevLine 
[65]1/* -*-c++-*- osgVisual - Copyright (C) 2009-2010 Torben Dannhauer
2 *
3 * This library is based on OpenSceneGraph, open source and may be redistributed and/or modified under
4 * the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
5 * (at your option) any later version.  The full license is in LICENSE file
6 * included with this distribution, and on the openscenegraph.org website.
7 *
8 * osgVisual requires for some proprietary modules a license from the correspondig manufacturer.
9 * You have to aquire licenses for all used proprietary modules.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * OpenSceneGraph Public License for more details.
15*/
16
17#include "dataIO_clusterENet.h"
18
19using namespace osgVisual;
20
21dataIO_clusterENet::dataIO_clusterENet()
22{
23        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet constructed" << std::endl;
[67]24        serverToConnect = "unknown";
[69]25        hardSync = false;       // integrate into init()
26        port = 12345;   // integrate into init()
[65]27}
28
[67]29
[65]30dataIO_clusterENet::~dataIO_clusterENet(void)
31{
32        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet destructed" << std::endl;
33}
34
[67]35
[69]36void dataIO_clusterENet::init( osg::ArgumentParser& arguments_, osgViewer::Viewer* viewer_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool compressionEnabled_, bool asAscii_ )
[65]37{
[66]38        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet init();" << std::endl;
39       
[69]40        // Store viewer
41        viewer = viewer_;
42
[67]43        // Configure the clustermode
44        clusterMode = clusterMode_;
45
46        // store sendContainer
[65]47        sendContainer = sendContainer_;
[69]48
49        // Configure Compression and instantiate read/write-options
50        std::string readOptionString = "";
51        std::string writeOptionString = "";
52        if(asAscii_)
53        {
54                readOptionString = "Ascii";
55                writeOptionString = "Ascii";
56        }
57        if (compressionEnabled_)
58                writeOptionString+=" Compressor=zlib";
59        readOptions = new osgDB::Options( readOptionString.c_str() );
60        writeOptions = new osgDB::Options( writeOptionString.c_str() );
61
62        // Get ReaderWriter
63        rw = osgDB::Registry::instance()->getReaderWriterForExtension("osgb"); 
[66]64       
65        // create ENet implementation object.
[69]66        enet_impl = new osgVisual::dataIO_clusterENet_implementation(receivedTransportContainer);
[67]67
68        // initialize ENet implementation
69        if(clusterMode == MASTER)
70        {
[69]71                std::cout << "Init dataIO_cluster_ENet as Server on port " << port << std::endl;
[67]72                enet_impl->init(dataIO_clusterENet_implementation::SERVER, port);
73
74                initialized = true;
75        }
76        if(clusterMode == SLAVE)
77        {
78                // Get the server IP
79                if(!arguments_.read("--server",serverToConnect, port))
80                {
81                        // try server discovery
82                        //discoverServer(serverToConnect,port);
83                        /* todo : implement a udp server discovery based on ASIO */
84                }
85
86                // Init ENet
87                enet_impl->init(dataIO_clusterENet_implementation::CLIENT, port);
88
89                // Connect to server with 5 retries:
90                bool connected = false;
91                for(int i=0; i<5; i++)
92                {
93                        std::cout << "Try to connect to server " << serverToConnect << std::endl;
94                        if( enet_impl->connectTo( serverToConnect.c_str(), 5000 ) )
95                        {
96                                // Connect successful.
97                                initialized = true;
98                                connected = true;
99                                break;
100                        }
101                }       // For END
102                if(!connected)
103                {
104                        initialized = false;
[69]105                        std::cout << "Finally failed to establish connection to server " << serverToConnect << std::endl;
106                        exit(-1);
[67]107                }
108        }       // IF SLAVE END
[65]109}
110
[67]111
[65]112void dataIO_clusterENet::shutdown()
113{
114        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet shutdown();" << std::endl;
115}
116
[67]117
[82]118bool dataIO_clusterENet::sendTO_OBJvaluesToSlaves(osg::Matrixd viewMatrix_) 
[65]119{
[69]120        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
[82]121       
[69]122        if(sendContainer.valid())
123        {
124                // Pack FrameID & Viewmatrix
125                sendContainer->setFrameID(viewer->getFrameStamp()->getFrameNumber());
[82]126                sendContainer->setViewMatrix(viewMatrix_);
[69]127
128                // Writing node to stream
129                std::stringstream myOstream;
130                if ( rw )
131                {
132                        osgDB::ReaderWriter::WriteResult wr = rw->writeObject( *sendContainer.get(), myOstream, writeOptions );
133                        if (wr.success() )                     
134                        {
135                                // Send Data via ENet:
[70]136                                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterUDP::sendTO_OBJvaluesToSlaves() - Bytes to send: " << myOstream.str().length() << std::endl;
137                                //OSG_NOTIFY( osg::ALWAYS ) << "Send: " << myOstream.str() << std::endl;
[84]138                                //OSG_NOTIFY( osg::ALWAYS ) << "Sent Framenumber: " << viewer->getFrameStamp()->getFrameNumber() << std::endl;
[69]139                                ENetPacket * packet = enet_packet_create (myOstream.str().c_str(), 
140                                                                                                  myOstream.str().size(), 
141                                                                                                  ENET_PACKET_FLAG_RELIABLE);
142               
143                                // Send data
144                                enet_impl->sendPacket( packet, 0, 0, true);
145                        }
146                        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Save failed: " << wr.message() << std::endl;
147                }
148                else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Unable to get readerWriter for osgb" << std::endl;
149        }
150        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Invalid transportContainer" << std::endl;
151
152
153        enet_impl->processEvents();     // As Master: process events AFTER doing anything to have up to have the "sent" commands in queue.
[65]154        return true;
155}
156
[67]157
[65]158bool dataIO_clusterENet::readTO_OBJvaluesFromMaster()
159{
[69]160        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
161        enet_impl->processEvents();     // As Slave: process events BEFORE doing anything to have up to date values.
[65]162
[69]163        int bytes_received = receivedTransportContainer.size();
164        if (bytes_received > 0 )
165        {
[70]166                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Bytes received: " << bytes_received << std::endl;
[74]167                //OSG_NOTIFY( osg::ALWAYS ) << "Received: " << std::endl << receivedTransportContainer << std::endl;
[73]168               
[69]169
170                // Unserialize data
171                if ( rw )
172                {
[77]173                        std::stringstream tmp;
[80]174                        tmp  << receivedTransportContainer;
[77]175                        osgDB::ReaderWriter::ReadResult rr = rw->readObject( tmp, readOptions );
[69]176                        if (rr.success())
177                        {
178                                sendContainer = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
179                                if (sendContainer)
180                                {
[70]181                                        OSG_NOTIFY( osg::ALWAYS ) << "Received:: Settings Viewmatrix...FrameID is: " << sendContainer->getFrameID() << std::endl;
[73]182                                        // Restore Viewmatrix
[69]183                                        viewer->getCamera()->setViewMatrix(sendContainer->getViewMatrix());
184                                }
185                                else
186                                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to cast converted node to transportContainer" << std::endl;
187                        }
188                        else
189                                OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to convert stream to node" << std::endl;
190                }
191                else
192                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
193        }       // IF bytes recv > 0 END
194
195
196
[65]197        return true;
198}
199
[67]200
[65]201void dataIO_clusterENet::reportAsReadyToSwap()
202{
[69]203        if(!hardSync)
204                return;
205
[65]206        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet reportAsReadyToSwap()" << std::endl;
207}
208
209bool dataIO_clusterENet::waitForSwap()
210{
[69]211        if(!hardSync)
212                return true;
213
[65]214        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForSwap()" << std::endl;
215
216        return true;
217}
218
[67]219
[65]220bool dataIO_clusterENet::waitForAllReadyToSwap()
221{
[69]222        if(!hardSync)
223                return true;
224
[65]225        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForAllReadyToSwap()" << std::endl;
226
227        return true;
228}
229
[67]230
[65]231bool dataIO_clusterENet::sendSwapCommand()
232{
[69]233        if(!hardSync)
234                return true;
235
[65]236        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendSwapCommand()" << std::endl;
237
238        return true;
239}
Note: See TracBrowser for help on using the repository browser.