source: osgVisual/src/cluster/dataIO_clusterENet.cpp @ 87

Last change on this file since 87 was 87, checked in by Torben Dannhauer, 14 years ago

Introductes VS 2008 Memory Leak Debugging.
Todo: Compile on Linux and compare with Valgrind, VS 2008 seems to be awkward in leak debugging

File size: 7.7 KB
RevLine 
[65]1/* -*-c++-*- osgVisual - Copyright (C) 2009-2010 Torben Dannhauer
2 *
3 * This library is based on OpenSceneGraph, open source and may be redistributed and/or modified under
4 * the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
5 * (at your option) any later version.  The full license is in LICENSE file
6 * included with this distribution, and on the openscenegraph.org website.
7 *
8 * osgVisual requires for some proprietary modules a license from the correspondig manufacturer.
9 * You have to aquire licenses for all used proprietary modules.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * OpenSceneGraph Public License for more details.
15*/
16
17#include "dataIO_clusterENet.h"
18
19using namespace osgVisual;
20
21dataIO_clusterENet::dataIO_clusterENet()
22{
23        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet constructed" << std::endl;
[87]24        #include <leakDetection.h>
25
[67]26        serverToConnect = "unknown";
[69]27        hardSync = false;       // integrate into init()
28        port = 12345;   // integrate into init()
[65]29}
30
[67]31
[65]32dataIO_clusterENet::~dataIO_clusterENet(void)
33{
34        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet destructed" << std::endl;
35}
36
[67]37
[69]38void dataIO_clusterENet::init( osg::ArgumentParser& arguments_, osgViewer::Viewer* viewer_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool compressionEnabled_, bool asAscii_ )
[65]39{
[66]40        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet init();" << std::endl;
41       
[69]42        // Store viewer
43        viewer = viewer_;
44
[67]45        // Configure the clustermode
46        clusterMode = clusterMode_;
47
48        // store sendContainer
[65]49        sendContainer = sendContainer_;
[69]50
51        // Configure Compression and instantiate read/write-options
52        std::string readOptionString = "";
53        std::string writeOptionString = "";
54        if(asAscii_)
55        {
56                readOptionString = "Ascii";
57                writeOptionString = "Ascii";
58        }
59        if (compressionEnabled_)
60                writeOptionString+=" Compressor=zlib";
61        readOptions = new osgDB::Options( readOptionString.c_str() );
62        writeOptions = new osgDB::Options( writeOptionString.c_str() );
63
64        // Get ReaderWriter
65        rw = osgDB::Registry::instance()->getReaderWriterForExtension("osgb"); 
[66]66       
67        // create ENet implementation object.
[69]68        enet_impl = new osgVisual::dataIO_clusterENet_implementation(receivedTransportContainer);
[67]69
70        // initialize ENet implementation
71        if(clusterMode == MASTER)
72        {
[69]73                std::cout << "Init dataIO_cluster_ENet as Server on port " << port << std::endl;
[67]74                enet_impl->init(dataIO_clusterENet_implementation::SERVER, port);
75
76                initialized = true;
77        }
78        if(clusterMode == SLAVE)
79        {
80                // Get the server IP
81                if(!arguments_.read("--server",serverToConnect, port))
82                {
83                        // try server discovery
84                        //discoverServer(serverToConnect,port);
85                        /* todo : implement a udp server discovery based on ASIO */
86                }
87
88                // Init ENet
89                enet_impl->init(dataIO_clusterENet_implementation::CLIENT, port);
90
91                // Connect to server with 5 retries:
92                bool connected = false;
93                for(int i=0; i<5; i++)
94                {
95                        std::cout << "Try to connect to server " << serverToConnect << std::endl;
96                        if( enet_impl->connectTo( serverToConnect.c_str(), 5000 ) )
97                        {
98                                // Connect successful.
99                                initialized = true;
100                                connected = true;
101                                break;
102                        }
103                }       // For END
104                if(!connected)
105                {
106                        initialized = false;
[69]107                        std::cout << "Finally failed to establish connection to server " << serverToConnect << std::endl;
108                        exit(-1);
[67]109                }
110        }       // IF SLAVE END
[65]111}
112
[67]113
[65]114void dataIO_clusterENet::shutdown()
115{
116        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet shutdown();" << std::endl;
117}
118
[67]119
[82]120bool dataIO_clusterENet::sendTO_OBJvaluesToSlaves(osg::Matrixd viewMatrix_) 
[65]121{
[69]122        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
[82]123       
[69]124        if(sendContainer.valid())
125        {
126                // Pack FrameID & Viewmatrix
127                sendContainer->setFrameID(viewer->getFrameStamp()->getFrameNumber());
[82]128                sendContainer->setViewMatrix(viewMatrix_);
[69]129
130                // Writing node to stream
131                std::stringstream myOstream;
132                if ( rw )
133                {
134                        osgDB::ReaderWriter::WriteResult wr = rw->writeObject( *sendContainer.get(), myOstream, writeOptions );
135                        if (wr.success() )                     
136                        {
137                                // Send Data via ENet:
[70]138                                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterUDP::sendTO_OBJvaluesToSlaves() - Bytes to send: " << myOstream.str().length() << std::endl;
139                                //OSG_NOTIFY( osg::ALWAYS ) << "Send: " << myOstream.str() << std::endl;
[84]140                                //OSG_NOTIFY( osg::ALWAYS ) << "Sent Framenumber: " << viewer->getFrameStamp()->getFrameNumber() << std::endl;
[69]141                                ENetPacket * packet = enet_packet_create (myOstream.str().c_str(), 
142                                                                                                  myOstream.str().size(), 
143                                                                                                  ENET_PACKET_FLAG_RELIABLE);
144               
145                                // Send data
146                                enet_impl->sendPacket( packet, 0, 0, true);
147                        }
148                        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Save failed: " << wr.message() << std::endl;
149                }
150                else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Unable to get readerWriter for osgb" << std::endl;
151        }
152        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Invalid transportContainer" << std::endl;
153
154
155        enet_impl->processEvents();     // As Master: process events AFTER doing anything to have up to have the "sent" commands in queue.
[65]156        return true;
157}
158
[67]159
[65]160bool dataIO_clusterENet::readTO_OBJvaluesFromMaster()
161{
[69]162        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
163        enet_impl->processEvents();     // As Slave: process events BEFORE doing anything to have up to date values.
[65]164
[69]165        int bytes_received = receivedTransportContainer.size();
166        if (bytes_received > 0 )
167        {
[70]168                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Bytes received: " << bytes_received << std::endl;
[74]169                //OSG_NOTIFY( osg::ALWAYS ) << "Received: " << std::endl << receivedTransportContainer << std::endl;
[73]170               
[69]171
172                // Unserialize data
173                if ( rw )
174                {
[77]175                        std::stringstream tmp;
[80]176                        tmp  << receivedTransportContainer;
[77]177                        osgDB::ReaderWriter::ReadResult rr = rw->readObject( tmp, readOptions );
[69]178                        if (rr.success())
179                        {
180                                sendContainer = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
181                                if (sendContainer)
182                                {
[70]183                                        OSG_NOTIFY( osg::ALWAYS ) << "Received:: Settings Viewmatrix...FrameID is: " << sendContainer->getFrameID() << std::endl;
[73]184                                        // Restore Viewmatrix
[69]185                                        viewer->getCamera()->setViewMatrix(sendContainer->getViewMatrix());
186                                }
187                                else
188                                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to cast converted node to transportContainer" << std::endl;
189                        }
190                        else
191                                OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to convert stream to node" << std::endl;
192                }
193                else
194                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
195        }       // IF bytes recv > 0 END
196
197
198
[65]199        return true;
200}
201
[67]202
[65]203void dataIO_clusterENet::reportAsReadyToSwap()
204{
[69]205        if(!hardSync)
206                return;
207
[65]208        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet reportAsReadyToSwap()" << std::endl;
209}
210
211bool dataIO_clusterENet::waitForSwap()
212{
[69]213        if(!hardSync)
214                return true;
215
[65]216        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForSwap()" << std::endl;
217
218        return true;
219}
220
[67]221
[65]222bool dataIO_clusterENet::waitForAllReadyToSwap()
223{
[69]224        if(!hardSync)
225                return true;
226
[65]227        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForAllReadyToSwap()" << std::endl;
228
229        return true;
230}
231
[67]232
[65]233bool dataIO_clusterENet::sendSwapCommand()
234{
[69]235        if(!hardSync)
236                return true;
237
[65]238        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendSwapCommand()" << std::endl;
239
240        return true;
241}
Note: See TracBrowser for help on using the repository browser.