source: osgVisual/src/cluster/dataIO_clusterENet.cpp @ 115

Last change on this file since 115 was 88, checked in by Torben Dannhauer, 14 years ago

Moved memory leak detection from source file to headerfile. Its still in the class but at least not in the source file.

The leak detection works, but the false positives are not stopped.
Use Linux/Valgrind? to make your final leak detection beyond the easy first approach in MSVC

File size: 7.6 KB
Line 
1/* -*-c++-*- osgVisual - Copyright (C) 2009-2010 Torben Dannhauer
2 *
3 * This library is based on OpenSceneGraph, open source and may be redistributed and/or modified under
4 * the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
5 * (at your option) any later version.  The full license is in LICENSE file
6 * included with this distribution, and on the openscenegraph.org website.
7 *
8 * osgVisual requires for some proprietary modules a license from the correspondig manufacturer.
9 * You have to aquire licenses for all used proprietary modules.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * OpenSceneGraph Public License for more details.
15*/
16
17#include "dataIO_clusterENet.h"
18
19using namespace osgVisual;
20
21dataIO_clusterENet::dataIO_clusterENet()
22{
23        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet constructed" << std::endl;
24
25        serverToConnect = "unknown";
26        hardSync = false;       // integrate into init()
27        port = 12345;   // integrate into init()
28}
29
30
31dataIO_clusterENet::~dataIO_clusterENet(void)
32{
33        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet destructed" << std::endl;
34}
35
36
37void dataIO_clusterENet::init( osg::ArgumentParser& arguments_, osgViewer::Viewer* viewer_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool compressionEnabled_, bool asAscii_ )
38{
39        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet init();" << std::endl;
40       
41        // Store viewer
42        viewer = viewer_;
43
44        // Configure the clustermode
45        clusterMode = clusterMode_;
46
47        // store sendContainer
48        sendContainer = sendContainer_;
49
50        // Configure Compression and instantiate read/write-options
51        std::string readOptionString = "";
52        std::string writeOptionString = "";
53        if(asAscii_)
54        {
55                readOptionString = "Ascii";
56                writeOptionString = "Ascii";
57        }
58        if (compressionEnabled_)
59                writeOptionString+=" Compressor=zlib";
60        readOptions = new osgDB::Options( readOptionString.c_str() );
61        writeOptions = new osgDB::Options( writeOptionString.c_str() );
62
63        // Get ReaderWriter
64        rw = osgDB::Registry::instance()->getReaderWriterForExtension("osgb"); 
65       
66        // create ENet implementation object.
67        enet_impl = new osgVisual::dataIO_clusterENet_implementation(receivedTransportContainer);
68
69        // initialize ENet implementation
70        if(clusterMode == MASTER)
71        {
72                std::cout << "Init dataIO_cluster_ENet as Server on port " << port << std::endl;
73                enet_impl->init(dataIO_clusterENet_implementation::SERVER, port);
74
75                initialized = true;
76        }
77        if(clusterMode == SLAVE)
78        {
79                // Get the server IP
80                if(!arguments_.read("--server",serverToConnect, port))
81                {
82                        // try server discovery
83                        //discoverServer(serverToConnect,port);
84                        /* todo : implement a udp server discovery based on ASIO */
85                }
86
87                // Init ENet
88                enet_impl->init(dataIO_clusterENet_implementation::CLIENT, port);
89
90                // Connect to server with 5 retries:
91                bool connected = false;
92                for(int i=0; i<5; i++)
93                {
94                        std::cout << "Try to connect to server " << serverToConnect << std::endl;
95                        if( enet_impl->connectTo( serverToConnect.c_str(), 5000 ) )
96                        {
97                                // Connect successful.
98                                initialized = true;
99                                connected = true;
100                                break;
101                        }
102                }       // For END
103                if(!connected)
104                {
105                        initialized = false;
106                        std::cout << "Finally failed to establish connection to server " << serverToConnect << std::endl;
107                        exit(-1);
108                }
109        }       // IF SLAVE END
110}
111
112
113void dataIO_clusterENet::shutdown()
114{
115        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet shutdown();" << std::endl;
116}
117
118
119bool dataIO_clusterENet::sendTO_OBJvaluesToSlaves(osg::Matrixd viewMatrix_) 
120{
121        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
122       
123        if(sendContainer.valid())
124        {
125                // Pack FrameID & Viewmatrix
126                sendContainer->setFrameID(viewer->getFrameStamp()->getFrameNumber());
127                sendContainer->setViewMatrix(viewMatrix_);
128
129                // Writing node to stream
130                std::stringstream myOstream;
131                if ( rw )
132                {
133                        osgDB::ReaderWriter::WriteResult wr = rw->writeObject( *sendContainer.get(), myOstream, writeOptions );
134                        if (wr.success() )                     
135                        {
136                                // Send Data via ENet:
137                                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterUDP::sendTO_OBJvaluesToSlaves() - Bytes to send: " << myOstream.str().length() << std::endl;
138                                //OSG_NOTIFY( osg::ALWAYS ) << "Send: " << myOstream.str() << std::endl;
139                                //OSG_NOTIFY( osg::ALWAYS ) << "Sent Framenumber: " << viewer->getFrameStamp()->getFrameNumber() << std::endl;
140                                ENetPacket * packet = enet_packet_create (myOstream.str().c_str(), 
141                                                                                                  myOstream.str().size(), 
142                                                                                                  ENET_PACKET_FLAG_RELIABLE);
143               
144                                // Send data
145                                enet_impl->sendPacket( packet, 0, 0, true);
146                        }
147                        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Save failed: " << wr.message() << std::endl;
148                }
149                else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Unable to get readerWriter for osgb" << std::endl;
150        }
151        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Invalid transportContainer" << std::endl;
152
153
154        enet_impl->processEvents();     // As Master: process events AFTER doing anything to have up to have the "sent" commands in queue.
155        return true;
156}
157
158
159bool dataIO_clusterENet::readTO_OBJvaluesFromMaster()
160{
161        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
162        enet_impl->processEvents();     // As Slave: process events BEFORE doing anything to have up to date values.
163
164        int bytes_received = receivedTransportContainer.size();
165        if (bytes_received > 0 )
166        {
167                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Bytes received: " << bytes_received << std::endl;
168                //OSG_NOTIFY( osg::ALWAYS ) << "Received: " << std::endl << receivedTransportContainer << std::endl;
169               
170
171                // Unserialize data
172                if ( rw )
173                {
174                        std::stringstream tmp;
175                        tmp  << receivedTransportContainer;
176                        osgDB::ReaderWriter::ReadResult rr = rw->readObject( tmp, readOptions );
177                        if (rr.success())
178                        {
179                                sendContainer = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
180                                if (sendContainer)
181                                {
182                                        OSG_NOTIFY( osg::ALWAYS ) << "Received:: Settings Viewmatrix...FrameID is: " << sendContainer->getFrameID() << std::endl;
183                                        // Restore Viewmatrix
184                                        viewer->getCamera()->setViewMatrix(sendContainer->getViewMatrix());
185                                }
186                                else
187                                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to cast converted node to transportContainer" << std::endl;
188                        }
189                        else
190                                OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to convert stream to node" << std::endl;
191                }
192                else
193                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
194        }       // IF bytes recv > 0 END
195
196
197
198        return true;
199}
200
201
202void dataIO_clusterENet::reportAsReadyToSwap()
203{
204        if(!hardSync)
205                return;
206
207        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet reportAsReadyToSwap()" << std::endl;
208}
209
210bool dataIO_clusterENet::waitForSwap()
211{
212        if(!hardSync)
213                return true;
214
215        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForSwap()" << std::endl;
216
217        return true;
218}
219
220
221bool dataIO_clusterENet::waitForAllReadyToSwap()
222{
223        if(!hardSync)
224                return true;
225
226        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForAllReadyToSwap()" << std::endl;
227
228        return true;
229}
230
231
232bool dataIO_clusterENet::sendSwapCommand()
233{
234        if(!hardSync)
235                return true;
236
237        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendSwapCommand()" << std::endl;
238
239        return true;
240}
Note: See TracBrowser for help on using the repository browser.