source: osgVisual/src/cluster/dataIO_clusterENet.cpp @ 75

Last change on this file since 75 was 75, checked in by Torben Dannhauer, 14 years ago
File size: 7.6 KB
Line 
1/* -*-c++-*- osgVisual - Copyright (C) 2009-2010 Torben Dannhauer
2 *
3 * This library is based on OpenSceneGraph, open source and may be redistributed and/or modified under
4 * the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
5 * (at your option) any later version.  The full license is in LICENSE file
6 * included with this distribution, and on the openscenegraph.org website.
7 *
8 * osgVisual requires for some proprietary modules a license from the correspondig manufacturer.
9 * You have to aquire licenses for all used proprietary modules.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * OpenSceneGraph Public License for more details.
15*/
16
17#include "dataIO_clusterENet.h"
18
19using namespace osgVisual;
20
21dataIO_clusterENet::dataIO_clusterENet()
22{
23        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet constructed" << std::endl;
24        serverToConnect = "unknown";
25        hardSync = false;       // integrate into init()
26        port = 12345;   // integrate into init()
27}
28
29
30dataIO_clusterENet::~dataIO_clusterENet(void)
31{
32        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet destructed" << std::endl;
33}
34
35
36void dataIO_clusterENet::init( osg::ArgumentParser& arguments_, osgViewer::Viewer* viewer_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool compressionEnabled_, bool asAscii_ )
37{
38        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet init();" << std::endl;
39       
40        // Store viewer
41        viewer = viewer_;
42
43        // Configure the clustermode
44        clusterMode = clusterMode_;
45
46        // store sendContainer
47        sendContainer = sendContainer_;
48
49        // Configure Compression and instantiate read/write-options
50        std::string readOptionString = "";
51        std::string writeOptionString = "";
52        if(asAscii_)
53        {
54                readOptionString = "Ascii";
55                writeOptionString = "Ascii";
56        }
57        if (compressionEnabled_)
58                writeOptionString+=" Compressor=zlib";
59        readOptions = new osgDB::Options( readOptionString.c_str() );
60        writeOptions = new osgDB::Options( writeOptionString.c_str() );
61
62        // Get ReaderWriter
63        rw = osgDB::Registry::instance()->getReaderWriterForExtension("osgb"); 
64       
65        // create ENet implementation object.
66        enet_impl = new osgVisual::dataIO_clusterENet_implementation(receivedTransportContainer);
67
68        // initialize ENet implementation
69        if(clusterMode == MASTER)
70        {
71                std::cout << "Init dataIO_cluster_ENet as Server on port " << port << std::endl;
72                enet_impl->init(dataIO_clusterENet_implementation::SERVER, port);
73
74                initialized = true;
75        }
76        if(clusterMode == SLAVE)
77        {
78                // Get the server IP
79                if(!arguments_.read("--server",serverToConnect, port))
80                {
81                        // try server discovery
82                        //discoverServer(serverToConnect,port);
83                        /* todo : implement a udp server discovery based on ASIO */
84                }
85
86                // Init ENet
87                enet_impl->init(dataIO_clusterENet_implementation::CLIENT, port);
88
89                // Connect to server with 5 retries:
90                bool connected = false;
91                for(int i=0; i<5; i++)
92                {
93                        std::cout << "Try to connect to server " << serverToConnect << std::endl;
94                        if( enet_impl->connectTo( serverToConnect.c_str(), 5000 ) )
95                        {
96                                // Connect successful.
97                                initialized = true;
98                                connected = true;
99                                break;
100                        }
101                }       // For END
102                if(!connected)
103                {
104                        initialized = false;
105                        std::cout << "Finally failed to establish connection to server " << serverToConnect << std::endl;
106                        exit(-1);
107                }
108        }       // IF SLAVE END
109}
110
111
112void dataIO_clusterENet::shutdown()
113{
114        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet shutdown();" << std::endl;
115}
116
117
118bool dataIO_clusterENet::sendTO_OBJvaluesToSlaves()
119{
120        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
121
122        if(sendContainer.valid())
123        {
124                // Pack FrameID & Viewmatrix
125                sendContainer->setFrameID(viewer->getFrameStamp()->getFrameNumber());
126                sendContainer->setViewMatrix(viewer->getCamera()->getViewMatrix());
127
128                // Writing node to stream
129                std::stringstream myOstream;
130                if ( rw )
131                {
132                        osgDB::ReaderWriter::WriteResult wr = rw->writeObject( *sendContainer.get(), myOstream, writeOptions );
133                        if (wr.success() )                     
134                        {
135                                // Send Data via ENet:
136                                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterUDP::sendTO_OBJvaluesToSlaves() - Bytes to send: " << myOstream.str().length() << std::endl;
137                                //OSG_NOTIFY( osg::ALWAYS ) << "Send: " << myOstream.str() << std::endl;
138                                OSG_NOTIFY( osg::ALWAYS ) << "Sent Framenumber: " << viewer->getFrameStamp()->getFrameNumber() << std::endl;
139                                ENetPacket * packet = enet_packet_create (myOstream.str().c_str(), 
140                                                                                                  myOstream.str().size(), 
141                                                                                                  ENET_PACKET_FLAG_RELIABLE);
142               
143                                // Send data
144                                enet_impl->sendPacket( packet, 0, 0, true);
145                        }
146                        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Save failed: " << wr.message() << std::endl;
147                }
148                else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Unable to get readerWriter for osgb" << std::endl;
149        }
150        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Invalid transportContainer" << std::endl;
151
152
153        enet_impl->processEvents();     // As Master: process events AFTER doing anything to have up to have the "sent" commands in queue.
154        return true;
155}
156
157
158bool dataIO_clusterENet::readTO_OBJvaluesFromMaster()
159{
160        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
161        enet_impl->processEvents();     // As Slave: process events BEFORE doing anything to have up to date values.
162
163        int bytes_received = receivedTransportContainer.size();
164        if (bytes_received > 0 )
165        {
166                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Bytes received: " << bytes_received << std::endl;
167                //OSG_NOTIFY( osg::ALWAYS ) << "Received: " << std::endl << receivedTransportContainer << std::endl;
168               
169
170                // Unserialize data
171                if ( rw )
172                {
173                        std::stringstream test;
174                        test << receivedTransportContainer;
175                        osgDB::ReaderWriter::ReadResult rr = rw->readObject( test, readOptions );
176                        if (rr.success())
177                        {
178                                sendContainer = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
179                                if (sendContainer)
180                                {
181                                        OSG_NOTIFY( osg::ALWAYS ) << "Received:: Settings Viewmatrix...FrameID is: " << sendContainer->getFrameID() << std::endl;
182                                        // Restore Viewmatrix
183                                        viewer->getCamera()->setViewMatrix(sendContainer->getViewMatrix());
184                                }
185                                else
186                                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to cast converted node to transportContainer" << std::endl;
187                        }
188                        else
189                                OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to convert stream to node" << std::endl;
190                }
191                else
192                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
193        }       // IF bytes recv > 0 END
194
195
196
197        return true;
198}
199
200
201void dataIO_clusterENet::reportAsReadyToSwap()
202{
203        if(!hardSync)
204                return;
205
206        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet reportAsReadyToSwap()" << std::endl;
207}
208
209bool dataIO_clusterENet::waitForSwap()
210{
211        if(!hardSync)
212                return true;
213
214        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForSwap()" << std::endl;
215
216        return true;
217}
218
219
220bool dataIO_clusterENet::waitForAllReadyToSwap()
221{
222        if(!hardSync)
223                return true;
224
225        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForAllReadyToSwap()" << std::endl;
226
227        return true;
228}
229
230
231bool dataIO_clusterENet::sendSwapCommand()
232{
233        if(!hardSync)
234                return true;
235
236        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendSwapCommand()" << std::endl;
237
238        return true;
239}
Note: See TracBrowser for help on using the repository browser.