source: osgVisual/src/cluster/dataIO_clusterENet.cpp @ 69

Last change on this file since 69 was 69, checked in by Torben Dannhauer, 14 years ago

Network sync:
now works:

  • serialization of transport container
  • transport via ENet UDP
  • de-serialization of the transport container.
  • transport of viewmatrix and framenumber to the slave.

ToDo?: apply viewmatrix on slave still do not work.

File size: 7.5 KB
Line 
1/* -*-c++-*- osgVisual - Copyright (C) 2009-2010 Torben Dannhauer
2 *
3 * This library is based on OpenSceneGraph, open source and may be redistributed and/or modified under
4 * the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
5 * (at your option) any later version.  The full license is in LICENSE file
6 * included with this distribution, and on the openscenegraph.org website.
7 *
8 * osgVisual requires for some proprietary modules a license from the correspondig manufacturer.
9 * You have to aquire licenses for all used proprietary modules.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * OpenSceneGraph Public License for more details.
15*/
16
17#include "dataIO_clusterENet.h"
18
19using namespace osgVisual;
20
21dataIO_clusterENet::dataIO_clusterENet()
22{
23        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet constructed" << std::endl;
24        serverToConnect = "unknown";
25        hardSync = false;       // integrate into init()
26        port = 12345;   // integrate into init()
27}
28
29
30dataIO_clusterENet::~dataIO_clusterENet(void)
31{
32        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet destructed" << std::endl;
33}
34
35
36void dataIO_clusterENet::init( osg::ArgumentParser& arguments_, osgViewer::Viewer* viewer_, clustermode clusterMode_, osgVisual::dataIO_transportContainer* sendContainer_, bool compressionEnabled_, bool asAscii_ )
37{
38        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet init();" << std::endl;
39       
40        // Store viewer
41        viewer = viewer_;
42
43        // Configure the clustermode
44        clusterMode = clusterMode_;
45
46        // store sendContainer
47        sendContainer = sendContainer_;
48
49        // Configure Compression and instantiate read/write-options
50        std::string readOptionString = "";
51        std::string writeOptionString = "";
52        if(asAscii_)
53        {
54                readOptionString = "Ascii";
55                writeOptionString = "Ascii";
56        }
57        if (compressionEnabled_)
58                writeOptionString+=" Compressor=zlib";
59        readOptions = new osgDB::Options( readOptionString.c_str() );
60        writeOptions = new osgDB::Options( writeOptionString.c_str() );
61
62        // Get ReaderWriter
63        rw = osgDB::Registry::instance()->getReaderWriterForExtension("osgb"); 
64       
65        // create ENet implementation object.
66        enet_impl = new osgVisual::dataIO_clusterENet_implementation(receivedTransportContainer);
67
68        // initialize ENet implementation
69        if(clusterMode == MASTER)
70        {
71                std::cout << "Init dataIO_cluster_ENet as Server on port " << port << std::endl;
72                enet_impl->init(dataIO_clusterENet_implementation::SERVER, port);
73
74                initialized = true;
75        }
76        if(clusterMode == SLAVE)
77        {
78                // Get the server IP
79                if(!arguments_.read("--server",serverToConnect, port))
80                {
81                        // try server discovery
82                        //discoverServer(serverToConnect,port);
83                        /* todo : implement a udp server discovery based on ASIO */
84                }
85
86                // Init ENet
87                enet_impl->init(dataIO_clusterENet_implementation::CLIENT, port);
88
89                // Connect to server with 5 retries:
90                bool connected = false;
91                for(int i=0; i<5; i++)
92                {
93                        std::cout << "Try to connect to server " << serverToConnect << std::endl;
94                        if( enet_impl->connectTo( serverToConnect.c_str(), 5000 ) )
95                        {
96                                // Connect successful.
97                                initialized = true;
98                                connected = true;
99                                break;
100                        }
101                }       // For END
102                if(!connected)
103                {
104                        initialized = false;
105                        std::cout << "Finally failed to establish connection to server " << serverToConnect << std::endl;
106                        exit(-1);
107                }
108        }       // IF SLAVE END
109}
110
111
112void dataIO_clusterENet::shutdown()
113{
114        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet shutdown();" << std::endl;
115}
116
117
118bool dataIO_clusterENet::sendTO_OBJvaluesToSlaves()
119{
120        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendTO_OBJvaluesToSlaves()" << std::endl;
121
122        if(sendContainer.valid())
123        {
124                // Pack FrameID & Viewmatrix
125                sendContainer->setFrameID(viewer->getFrameStamp()->getFrameNumber());
126                sendContainer->setViewMatrix(viewer->getCamera()->getViewMatrix());
127
128                // Writing node to stream
129                std::stringstream myOstream;
130                if ( rw )
131                {
132                        osgDB::ReaderWriter::WriteResult wr = rw->writeObject( *sendContainer.get(), myOstream, writeOptions );
133                        if (wr.success() )                     
134                        {
135                                // Send Data via ENet:
136                                OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterUDP::sendTO_OBJvaluesToSlaves() - Bytes to send: " << myOstream.str().length() << std::endl;
137                                //sOSG_NOTIFY( osg::ALWAYS ) << "Send: " << myOstream.str() << std::endl;
138                                ENetPacket * packet = enet_packet_create (myOstream.str().c_str(), 
139                                                                                                  myOstream.str().size(), 
140                                                                                                  ENET_PACKET_FLAG_RELIABLE);
141               
142                                // Send data
143                                enet_impl->sendPacket( packet, 0, 0, true);
144                        }
145                        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Save failed: " << wr.message() << std::endl;
146                }
147                else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Unable to get readerWriter for osgb" << std::endl;
148        }
149        else OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::sendTO_OBJvaluesToSlaves() :: Invalid transportContainer" << std::endl;
150
151
152        enet_impl->processEvents();     // As Master: process events AFTER doing anything to have up to have the "sent" commands in queue.
153        return true;
154}
155
156
157bool dataIO_clusterENet::readTO_OBJvaluesFromMaster()
158{
159        //OSG_NOTIFY( osg::ALWAYS ) << "clusterENet readTO_OBJvaluesFromMaster()" << std::endl;
160        enet_impl->processEvents();     // As Slave: process events BEFORE doing anything to have up to date values.
161
162        int bytes_received = receivedTransportContainer.size();
163        if (bytes_received > 0 )
164        {
165                OSG_NOTIFY( osg::ALWAYS ) << "dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Bytes received: " << bytes_received << std::endl;
166                //OSG_NOTIFY( osg::ALWAYS ) << "Received: " << std::endl << receivedTransportContainer << std::endl;
167
168                // Unserialize data
169                if ( rw )
170                {
171                        std::stringstream test;
172                        test << receivedTransportContainer;
173                        osgDB::ReaderWriter::ReadResult rr = rw->readObject( test, readOptions );
174                        if (rr.success())
175                        {
176                                sendContainer = dynamic_cast<osgVisual::dataIO_transportContainer*>(rr.takeObject());
177                                if (sendContainer)
178                                {
179                                        OSG_NOTIFY( osg::ALWAYS ) << "Received::FrameID is: " << sendContainer->getFrameID() << std::endl;
180                                        // Restore Viewmatrix
181                                        viewer->getCamera()->setViewMatrix(sendContainer->getViewMatrix());
182                                }
183                                else
184                                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to cast converted node to transportContainer" << std::endl;
185                        }
186                        else
187                                OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to convert stream to node" << std::endl;
188                }
189                else
190                        OSG_NOTIFY( osg::WARN ) << "ERROR: dataIO_clusterENet::readTO_OBJvaluesFromMaster() - Unable to get readerWriter for osgb" << std::endl;
191        }       // IF bytes recv > 0 END
192
193
194
195        return true;
196}
197
198
199void dataIO_clusterENet::reportAsReadyToSwap()
200{
201        if(!hardSync)
202                return;
203
204        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet reportAsReadyToSwap()" << std::endl;
205}
206
207bool dataIO_clusterENet::waitForSwap()
208{
209        if(!hardSync)
210                return true;
211
212        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForSwap()" << std::endl;
213
214        return true;
215}
216
217
218bool dataIO_clusterENet::waitForAllReadyToSwap()
219{
220        if(!hardSync)
221                return true;
222
223        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet waitForAllReadyToSwap()" << std::endl;
224
225        return true;
226}
227
228
229bool dataIO_clusterENet::sendSwapCommand()
230{
231        if(!hardSync)
232                return true;
233
234        OSG_NOTIFY( osg::ALWAYS ) << "clusterENet sendSwapCommand()" << std::endl;
235
236        return true;
237}
Note: See TracBrowser for help on using the repository browser.