source: osgVisual/trunk/src/cluster/dataIO_clusterENet.cpp @ 193

Last change on this file since 193 was 186, checked in by Torben Dannhauer, 14 years ago

XML configuration now works also with dataIO extLink

File size: 8.8 KB
Line 
1/* -*-c++-*- osgVisual - Copyright (C) 2009-2010 Torben Dannhauer
2 *
3 * This library is based on OpenSceneGraph, open source and may be redistributed and/or modified under
4 * the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
5 * (at your option) any later version.  The full license is in LICENSE file
6 * included with this distribution, and on the openscenegraph.org website.
7 *
8 * osgVisual requires for some proprietary modules a license from the correspondig manufacturer.
9 * You have to aquire licenses for all used proprietary modules.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * OpenSceneGraph Public License for more details.
15*/
16
17#include "dataIO_clusterENet.h"
18
19using namespace osgVisual;
20
21dataIO_clusterENet::dataIO_clusterENet()
22{
23        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet constructed" << std::endl;
24
25        serverToConnect = "unknown";
26        hardSync = false;       // integrate into init()
27        port = 12345;   // integrate into init()
28}
29
30
31dataIO_clusterENet::~dataIO_clusterENet(void)
32{
33        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet destructed" << std::endl;
34}
35
36
37bool dataIO_clusterENet::init(xmlNode* configurationNode, osgViewer::Viewer* viewer_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool asAscii_)
38{
39        if (!configurationNode || !processXMLConfiguration(configurationNode))
40                return false;
41
42        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet init();" << std::endl;
43       
44        // Store viewer
45        viewer = viewer_;
46
47        // Configure the clustermode
48        clusterMode = clusterMode_;
49
50        // store sendContainer
51        sendContainer = sendContainer_;
52
53        // Configure Compression and instantiate read/write-options
54        std::string readOptionString = "";
55        std::string writeOptionString = "";
56        if(asAscii_)
57        {
58                readOptionString = "Ascii";
59                writeOptionString = "Ascii";
60        }
61        if (compressionEnabled)
62                writeOptionString+=" Compressor=zlib";
63        readOptions = new osgDB::Options( readOptionString.c_str() );
64        writeOptions = new osgDB::Options( writeOptionString.c_str() );
65
66        // Get ReaderWriter
67        rw = osgDB::Registry::instance()->getReaderWriterForExtension("osgb"); 
68       
69        // create ENet implementation object.
70        enet_impl = new osgVisual::dataIO_clusterENet_implementation(receivedTransportContainer);
71
72        // initialize ENet implementation
73        if(clusterMode == MASTER)
74        {
75                std::cout << "Init dataIO_cluster_ENet as Server on port " << port << std::endl;
76                enet_impl->init(dataIO_clusterENet_implementation::SERVER, port);
77
78                initialized = true;
79        }
80        if(clusterMode == SLAVE)
81        {
82                // Init ENet
83                enet_impl->init(dataIO_clusterENet_implementation::CLIENT, port);
84
85                // Connect to server with 5 retries:
86                bool connected = false;
87                for(int i=0; i<5; i++)
88                {
89                        std::cout << "Try to connect to server " << serverToConnect << std::endl;
90                        if( enet_impl->connectTo( serverToConnect.c_str(), 5000 ) )
91                        {
92                                // Connect successful.
93                                initialized = true;
94                                connected = true;
95                                break;
96                        }
97                }       // For END
98                if(!connected)
99                {
100                        initialized = false;
101                        std::cout << "Finally failed to establish connection to server " << serverToConnect << std::endl;
102                        return false;
103                }
104        }       // IF SLAVE END
105
106        return true;
107}
108
109bool dataIO_clusterENet::processXMLConfiguration(xmlNode* clusterConfig_)
110{
111        // Extract cluster role
112        xmlAttr  *attr = clusterConfig_->properties;
113        while ( attr ) 
114        { 
115                std::string attr_name=reinterpret_cast<const char*>(attr->name);
116                std::string attr_value=reinterpret_cast<const char*>(attr->children->content);
117                if( attr_name == "implementation" )
118                {
119                        if(attr_value != "enet")
120                        {
121                                OSG_NOTIFY( osg::ALWAYS ) << "WARNING: Cluster configuration does not match the currently used 'enet' implementation, falling back to clusterDummy" << std::endl;
122                                return false;
123                        }
124                }
125                if( attr_name == "hardsync" )
126                {
127                        if(attr_value == "yes")
128                                hardSync = true;
129                        else
130                                hardSync = false;
131                }
132                if( attr_name == "master_ip" )
133                {
134                        serverToConnect = attr_value;
135                }
136                if( attr_name == "port" )
137                {
138                        std::istringstream i(attr_value);
139                        if (!(i >> port))
140                        {
141                                OSG_NOTIFY( osg::ALWAYS ) << "WARNING: Cluster configuration : Invalid port number '" << attr_value << "', falling back to clusterDummy" << std::endl;
142                                return false;
143                        }
144                }
145                if( attr_name == "use_zlib_compressor" )
146                {
147                        if(attr_value == "yes")
148                                compressionEnabled = true;
149                        else
150                                compressionEnabled = false;
151                }
152                attr = attr->next; 
153        }       // WHILE attrib END
154
155        return true;
156}
157
158
159void dataIO_clusterENet::shutdown()
160{
161        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet shutdown();" << std::endl;
162}
163
164
165bool dataIO_clusterENet::sendTO_OBJvaluesToSlaves(osg::Matrixd viewMatrix_) 
166{
167        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
168       
169        if(sendContainer.valid())
170        {
171                // Pack FrameID & Viewmatrix
172                sendContainer->setFrameID(viewer->getFrameStamp()->getFrameNumber());
173                sendContainer->setViewMatrix(viewMatrix_);
174
175                // Writing node to stream
176                std::stringstream myOstream;
177                if ( rw )
178                {
179                        osgDB::ReaderWriter::WriteResult wr = rw->writeObject( *sendContainer.get(), myOstream, writeOptions );
180                        if (wr.success() )                     
181                        {
182                                // Send Data via ENet:
183                                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterUDP::sendTO_OBJvaluesToSlaves() - Bytes to send: " << myOstream.str().length() << std::endl;
184                                //OSG_NOTIFY( osg::ALWAYS ) << "Send: " << myOstream.str() << std::endl;
185                                //OSG_NOTIFY( osg::ALWAYS ) << "Sent Framenumber: " << viewer->getFrameStamp()->getFrameNumber() << std::endl;
186                                ENetPacket * packet = enet_packet_create (myOstream.str().c_str(), 
187                                                                                                  myOstream.str().size(), 
188                                                                                                  ENET_PACKET_FLAG_RELIABLE);
189               
190                                // Send data
191                                enet_impl->sendPacket( packet, 0, 0, true);
192                        }
193                        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Save failed: " << wr.message() << std::endl;
194                }
195                else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Unable to get readerWriter for osgb" << std::endl;
196        }
197        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Invalid transportContainer" << std::endl;
198
199
200        enet_impl->processEvents();     // As Master: process events AFTER doing anything to have up to have the "sent" commands in queue.
201        return true;
202}
203
204
205bool dataIO_clusterENet::readTO_OBJvaluesFromMaster()
206{
207        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
208        enet_impl->processEvents();     // As Slave: process events BEFORE doing anything to have up to date values.
209
210        int bytes_received = receivedTransportContainer.size();
211        if (bytes_received > 0 )
212        {
213                //OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Bytes received: " << bytes_received << std::endl;
214                //OSG_NOTIFY( osg::ALWAYS ) << "Received: " << std::endl << receivedTransportContainer << std::endl;
215               
216
217                // Unserialize data
218                if ( rw )
219                {
220                        std::stringstream tmp;
221                        tmp  << receivedTransportContainer;
222                        osgDB::ReaderWriter::ReadResult rr = rw->readObject( tmp, readOptions );
223                        if (rr.success())
224                        {
225                                sendContainer = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
226                                if (sendContainer)
227                                {
228                                        OSG_NOTIFY( osg::ALWAYS ) << "Received:: Settings Viewmatrix...FrameID is: " << sendContainer->getFrameID() << std::endl;
229                                        // Restore Viewmatrix
230                                        viewer->getCamera()->setViewMatrix(sendContainer->getViewMatrix());
231                                }
232                                else
233                                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to cast converted node to transportContainer" << std::endl;
234                        }
235                        else
236                                OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to convert stream to node" << std::endl;
237                }
238                else
239                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
240        }       // IF bytes recv > 0 END
241
242
243
244        return true;
245}
246
247
248void dataIO_clusterENet::reportAsReadyToSwap()
249{
250        if(!hardSync)
251                return;
252
253        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet reportAsReadyToSwap()" << std::endl;
254}
255
256bool dataIO_clusterENet::waitForSwap()
257{
258        if(!hardSync)
259                return true;
260
261        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForSwap()" << std::endl;
262
263        return true;
264}
265
266
267bool dataIO_clusterENet::waitForAllReadyToSwap()
268{
269        if(!hardSync)
270                return true;
271
272        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForAllReadyToSwap()" << std::endl;
273
274        return true;
275}
276
277
278bool dataIO_clusterENet::sendSwapCommand()
279{
280        if(!hardSync)
281                return true;
282
283        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendSwapCommand()" << std::endl;
284
285        return true;
286}
Note: See TracBrowser for help on using the repository browser.