source: osgVisual/trunk/src/cluster/enet/host.c @ 202

Last change on this file since 202 was 65, checked in by Torben Dannhauer, 14 years ago

new cluster implementation added: ENet
ENet is a reliable UDP implementation that is simple to use and offers high performance when transmitting small or medium-sized packets.

File size: 17.0 KB
Line 
1/**
2 @file host.c
3 @brief ENet host management functions
4*/
5#define ENET_BUILDING_LIB 1
6#include <string.h>
7#include <time.h>
8#include "enet/enet.h"
9
10/** @defgroup host ENet host functions
11    @{
12*/
13
14/** Creates a host for communicating to peers. 
15
16    @param address   the address at which other peers may connect to this host.  If NULL, then no peers may connect to the host.
17    @param peerCount the maximum number of peers that should be allocated for the host.
18    @param channelLimit the maximum number of channels allowed; if 0, then this is equivalent to ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT
19    @param incomingBandwidth downstream bandwidth of the host in bytes/second; if 0, ENet will assume unlimited bandwidth.
20    @param outgoingBandwidth upstream bandwidth of the host in bytes/second; if 0, ENet will assume unlimited bandwidth.
21
22    @returns the host on success and NULL on failure
23
24    @remarks ENet will strategically drop packets on specific sides of a connection between hosts
25    to ensure the host's bandwidth is not overwhelmed.  The bandwidth parameters also determine
26    the window size of a connection which limits the amount of reliable packets that may be in transit
27    at any given time.
28*/
29ENetHost *
30enet_host_create (const ENetAddress * address, size_t peerCount, size_t channelLimit, enet_uint32 incomingBandwidth, enet_uint32 outgoingBandwidth)
31{
32    ENetHost * host;
33    ENetPeer * currentPeer;
34
35    if (peerCount > ENET_PROTOCOL_MAXIMUM_PEER_ID)
36      return NULL;
37
38    host = (ENetHost *) enet_malloc (sizeof (ENetHost));
39    if (host == NULL)
40      return NULL;
41
42    host -> peers = (ENetPeer *) enet_malloc (peerCount * sizeof (ENetPeer));
43    if (host -> peers == NULL)
44    {
45       enet_free (host);
46
47       return NULL;
48    }
49    memset (host -> peers, 0, peerCount * sizeof (ENetPeer));
50
51    host -> socket = enet_socket_create (ENET_SOCKET_TYPE_DATAGRAM);
52    if (host -> socket == ENET_SOCKET_NULL || (address != NULL && enet_socket_bind (host -> socket, address) < 0))
53    {
54       if (host -> socket != ENET_SOCKET_NULL)
55         enet_socket_destroy (host -> socket);
56
57       enet_free (host -> peers);
58       enet_free (host);
59
60       return NULL;
61    }
62
63    enet_socket_set_option (host -> socket, ENET_SOCKOPT_NONBLOCK, 1);
64    enet_socket_set_option (host -> socket, ENET_SOCKOPT_BROADCAST, 1);
65    enet_socket_set_option (host -> socket, ENET_SOCKOPT_RCVBUF, ENET_HOST_RECEIVE_BUFFER_SIZE);
66    enet_socket_set_option (host -> socket, ENET_SOCKOPT_SNDBUF, ENET_HOST_SEND_BUFFER_SIZE);
67
68    if (address != NULL)
69      host -> address = * address;
70
71    if (! channelLimit || channelLimit > ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT)
72      channelLimit = ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT;
73    else
74    if (channelLimit < ENET_PROTOCOL_MINIMUM_CHANNEL_COUNT)
75      channelLimit = ENET_PROTOCOL_MINIMUM_CHANNEL_COUNT;
76
77    host -> randomSeed = (enet_uint32) time(NULL) + (enet_uint32) (size_t) host;
78    host -> randomSeed = (host -> randomSeed << 16) | (host -> randomSeed >> 16);
79    host -> channelLimit = channelLimit;
80    host -> incomingBandwidth = incomingBandwidth;
81    host -> outgoingBandwidth = outgoingBandwidth;
82    host -> bandwidthThrottleEpoch = 0;
83    host -> recalculateBandwidthLimits = 0;
84    host -> mtu = ENET_HOST_DEFAULT_MTU;
85    host -> peerCount = peerCount;
86    host -> commandCount = 0;
87    host -> bufferCount = 0;
88    host -> checksum = NULL;
89    host -> receivedAddress.host = ENET_HOST_ANY;
90    host -> receivedAddress.port = 0;
91    host -> receivedData = NULL;
92    host -> receivedDataLength = 0;
93     
94    host -> totalSentData = 0;
95    host -> totalSentPackets = 0;
96    host -> totalReceivedData = 0;
97    host -> totalReceivedPackets = 0;
98
99    host -> compressor.context = NULL;
100    host -> compressor.compress = NULL;
101    host -> compressor.decompress = NULL;
102    host -> compressor.destroy = NULL;
103
104    enet_list_clear (& host -> dispatchQueue);
105
106    for (currentPeer = host -> peers;
107         currentPeer < & host -> peers [host -> peerCount];
108         ++ currentPeer)
109    {
110       currentPeer -> host = host;
111       currentPeer -> incomingPeerID = currentPeer - host -> peers;
112       currentPeer -> outgoingSessionID = currentPeer -> incomingSessionID = 0xFF;
113       currentPeer -> data = NULL;
114
115       enet_list_clear (& currentPeer -> acknowledgements);
116       enet_list_clear (& currentPeer -> sentReliableCommands);
117       enet_list_clear (& currentPeer -> sentUnreliableCommands);
118       enet_list_clear (& currentPeer -> outgoingReliableCommands);
119       enet_list_clear (& currentPeer -> outgoingUnreliableCommands);
120       enet_list_clear (& currentPeer -> dispatchedCommands);
121
122       enet_peer_reset (currentPeer);
123    }
124
125    return host;
126}
127
128/** Destroys the host and all resources associated with it.
129    @param host pointer to the host to destroy
130*/
131void
132enet_host_destroy (ENetHost * host)
133{
134    ENetPeer * currentPeer;
135
136    enet_socket_destroy (host -> socket);
137
138    for (currentPeer = host -> peers;
139         currentPeer < & host -> peers [host -> peerCount];
140         ++ currentPeer)
141    {
142       enet_peer_reset (currentPeer);
143    }
144
145    if (host -> compressor.context != NULL && host -> compressor.destroy)
146      (* host -> compressor.destroy) (host -> compressor.context);
147
148    enet_free (host -> peers);
149    enet_free (host);
150}
151
152/** Initiates a connection to a foreign host.
153    @param host host seeking the connection
154    @param address destination for the connection
155    @param channelCount number of channels to allocate
156    @param data user data supplied to the receiving host
157    @returns a peer representing the foreign host on success, NULL on failure
158    @remarks The peer returned will have not completed the connection until enet_host_service()
159    notifies of an ENET_EVENT_TYPE_CONNECT event for the peer.
160*/
161ENetPeer *
162enet_host_connect (ENetHost * host, const ENetAddress * address, size_t channelCount, enet_uint32 data)
163{
164    ENetPeer * currentPeer;
165    ENetChannel * channel;
166    ENetProtocol command;
167
168    if (channelCount < ENET_PROTOCOL_MINIMUM_CHANNEL_COUNT)
169      channelCount = ENET_PROTOCOL_MINIMUM_CHANNEL_COUNT;
170    else
171    if (channelCount > ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT)
172      channelCount = ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT;
173
174    for (currentPeer = host -> peers;
175         currentPeer < & host -> peers [host -> peerCount];
176         ++ currentPeer)
177    {
178       if (currentPeer -> state == ENET_PEER_STATE_DISCONNECTED)
179         break;
180    }
181
182    if (currentPeer >= & host -> peers [host -> peerCount])
183      return NULL;
184
185    currentPeer -> channels = (ENetChannel *) enet_malloc (channelCount * sizeof (ENetChannel));
186    if (currentPeer -> channels == NULL)
187      return NULL;
188    currentPeer -> channelCount = channelCount;
189    currentPeer -> state = ENET_PEER_STATE_CONNECTING;
190    currentPeer -> address = * address;
191    currentPeer -> connectID = ++ host -> randomSeed;
192
193    if (host -> outgoingBandwidth == 0)
194      currentPeer -> windowSize = ENET_PROTOCOL_MAXIMUM_WINDOW_SIZE;
195    else
196      currentPeer -> windowSize = (host -> outgoingBandwidth /
197                                    ENET_PEER_WINDOW_SIZE_SCALE) * 
198                                      ENET_PROTOCOL_MINIMUM_WINDOW_SIZE;
199
200    if (currentPeer -> windowSize < ENET_PROTOCOL_MINIMUM_WINDOW_SIZE)
201      currentPeer -> windowSize = ENET_PROTOCOL_MINIMUM_WINDOW_SIZE;
202    else
203    if (currentPeer -> windowSize > ENET_PROTOCOL_MAXIMUM_WINDOW_SIZE)
204      currentPeer -> windowSize = ENET_PROTOCOL_MAXIMUM_WINDOW_SIZE;
205         
206    for (channel = currentPeer -> channels;
207         channel < & currentPeer -> channels [channelCount];
208         ++ channel)
209    {
210        channel -> outgoingReliableSequenceNumber = 0;
211        channel -> outgoingUnreliableSequenceNumber = 0;
212        channel -> incomingReliableSequenceNumber = 0;
213
214        enet_list_clear (& channel -> incomingReliableCommands);
215        enet_list_clear (& channel -> incomingUnreliableCommands);
216
217        channel -> usedReliableWindows = 0;
218        memset (channel -> reliableWindows, 0, sizeof (channel -> reliableWindows));
219    }
220       
221    command.header.command = ENET_PROTOCOL_COMMAND_CONNECT | ENET_PROTOCOL_COMMAND_FLAG_ACKNOWLEDGE;
222    command.header.channelID = 0xFF;
223    command.connect.outgoingPeerID = ENET_HOST_TO_NET_16 (currentPeer -> incomingPeerID);
224    command.connect.incomingSessionID = currentPeer -> incomingSessionID;
225    command.connect.outgoingSessionID = currentPeer -> outgoingSessionID;
226    command.connect.mtu = ENET_HOST_TO_NET_32 (currentPeer -> mtu);
227    command.connect.windowSize = ENET_HOST_TO_NET_32 (currentPeer -> windowSize);
228    command.connect.channelCount = ENET_HOST_TO_NET_32 (channelCount);
229    command.connect.incomingBandwidth = ENET_HOST_TO_NET_32 (host -> incomingBandwidth);
230    command.connect.outgoingBandwidth = ENET_HOST_TO_NET_32 (host -> outgoingBandwidth);
231    command.connect.packetThrottleInterval = ENET_HOST_TO_NET_32 (currentPeer -> packetThrottleInterval);
232    command.connect.packetThrottleAcceleration = ENET_HOST_TO_NET_32 (currentPeer -> packetThrottleAcceleration);
233    command.connect.packetThrottleDeceleration = ENET_HOST_TO_NET_32 (currentPeer -> packetThrottleDeceleration);
234    command.connect.connectID = currentPeer -> connectID;
235    command.connect.data = ENET_HOST_TO_NET_32 (data);
236 
237    enet_peer_queue_outgoing_command (currentPeer, & command, NULL, 0, 0);
238
239    return currentPeer;
240}
241
242/** Queues a packet to be sent to all peers associated with the host.
243    @param host host on which to broadcast the packet
244    @param channelID channel on which to broadcast
245    @param packet packet to broadcast
246*/
247void
248enet_host_broadcast (ENetHost * host, enet_uint8 channelID, ENetPacket * packet)
249{
250    ENetPeer * currentPeer;
251
252    for (currentPeer = host -> peers;
253         currentPeer < & host -> peers [host -> peerCount];
254         ++ currentPeer)
255    {
256       if (currentPeer -> state != ENET_PEER_STATE_CONNECTED)
257         continue;
258
259       enet_peer_send (currentPeer, channelID, packet);
260    }
261
262    if (packet -> referenceCount == 0)
263      enet_packet_destroy (packet);
264}
265
266/** Sets the packet compressor the host should use to compress and decompress packets.
267    @param host host to enable or disable compression for
268    @param compressor callbacks for for the packet compressor; if NULL, then compression is disabled
269*/
270void
271enet_host_compress (ENetHost * host, const ENetCompressor * compressor)
272{
273    if (host -> compressor.context != NULL && host -> compressor.destroy)
274      (* host -> compressor.destroy) (host -> compressor.context);
275
276    if (compressor)
277      host -> compressor = * compressor;
278    else
279      host -> compressor.context = NULL;
280}
281
282/** Limits the maximum allowed channels of future incoming connections.
283    @param host host to limit
284    @param channelLimit the maximum number of channels allowed; if 0, then this is equivalent to ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT
285*/
286void
287enet_host_channel_limit (ENetHost * host, size_t channelLimit)
288{
289    if (! channelLimit || channelLimit > ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT)
290      channelLimit = ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT;
291    else
292    if (channelLimit < ENET_PROTOCOL_MINIMUM_CHANNEL_COUNT)
293      channelLimit = ENET_PROTOCOL_MINIMUM_CHANNEL_COUNT;
294
295    host -> channelLimit = channelLimit;
296}
297
298
299/** Adjusts the bandwidth limits of a host.
300    @param host host to adjust
301    @param incomingBandwidth new incoming bandwidth
302    @param outgoingBandwidth new outgoing bandwidth
303    @remarks the incoming and outgoing bandwidth parameters are identical in function to those
304    specified in enet_host_create().
305*/
306void
307enet_host_bandwidth_limit (ENetHost * host, enet_uint32 incomingBandwidth, enet_uint32 outgoingBandwidth)
308{
309    host -> incomingBandwidth = incomingBandwidth;
310    host -> outgoingBandwidth = outgoingBandwidth;
311    host -> recalculateBandwidthLimits = 1;
312}
313
314void
315enet_host_bandwidth_throttle (ENetHost * host)
316{
317    enet_uint32 timeCurrent = enet_time_get (),
318           elapsedTime = timeCurrent - host -> bandwidthThrottleEpoch,
319           peersTotal = 0,
320           dataTotal = 0,
321           peersRemaining,
322           bandwidth,
323           throttle = 0,
324           bandwidthLimit = 0;
325    int needsAdjustment;
326    ENetPeer * peer;
327    ENetProtocol command;
328
329    if (elapsedTime < ENET_HOST_BANDWIDTH_THROTTLE_INTERVAL)
330      return;
331
332    for (peer = host -> peers;
333         peer < & host -> peers [host -> peerCount];
334         ++ peer)
335    {
336        if (peer -> state != ENET_PEER_STATE_CONNECTED && peer -> state != ENET_PEER_STATE_DISCONNECT_LATER)
337          continue;
338
339        ++ peersTotal;
340        dataTotal += peer -> outgoingDataTotal;
341    }
342
343    if (peersTotal == 0)
344      return;
345
346    peersRemaining = peersTotal;
347    needsAdjustment = 1;
348
349    if (host -> outgoingBandwidth == 0)
350      bandwidth = ~0;
351    else
352      bandwidth = (host -> outgoingBandwidth * elapsedTime) / 1000;
353
354    while (peersRemaining > 0 && needsAdjustment != 0)
355    {
356        needsAdjustment = 0;
357       
358        if (dataTotal < bandwidth)
359          throttle = ENET_PEER_PACKET_THROTTLE_SCALE;
360        else
361          throttle = (bandwidth * ENET_PEER_PACKET_THROTTLE_SCALE) / dataTotal;
362
363        for (peer = host -> peers;
364             peer < & host -> peers [host -> peerCount];
365             ++ peer)
366        {
367            enet_uint32 peerBandwidth;
368           
369            if ((peer -> state != ENET_PEER_STATE_CONNECTED && peer -> state != ENET_PEER_STATE_DISCONNECT_LATER) ||
370                peer -> incomingBandwidth == 0 ||
371                peer -> outgoingBandwidthThrottleEpoch == timeCurrent)
372              continue;
373
374            peerBandwidth = (peer -> incomingBandwidth * elapsedTime) / 1000;
375            if ((throttle * peer -> outgoingDataTotal) / ENET_PEER_PACKET_THROTTLE_SCALE <= peerBandwidth)
376              continue;
377
378            peer -> packetThrottleLimit = (peerBandwidth * 
379                                            ENET_PEER_PACKET_THROTTLE_SCALE) / peer -> outgoingDataTotal;
380           
381            if (peer -> packetThrottleLimit == 0)
382              peer -> packetThrottleLimit = 1;
383           
384            if (peer -> packetThrottle > peer -> packetThrottleLimit)
385              peer -> packetThrottle = peer -> packetThrottleLimit;
386
387            peer -> outgoingBandwidthThrottleEpoch = timeCurrent;
388
389           
390            needsAdjustment = 1;
391            -- peersRemaining;
392            bandwidth -= peerBandwidth;
393            dataTotal -= peerBandwidth;
394        }
395    }
396
397    if (peersRemaining > 0)
398    for (peer = host -> peers;
399         peer < & host -> peers [host -> peerCount];
400         ++ peer)
401    {
402        if ((peer -> state != ENET_PEER_STATE_CONNECTED && peer -> state != ENET_PEER_STATE_DISCONNECT_LATER) ||
403            peer -> outgoingBandwidthThrottleEpoch == timeCurrent)
404          continue;
405
406        peer -> packetThrottleLimit = throttle;
407
408        if (peer -> packetThrottle > peer -> packetThrottleLimit)
409          peer -> packetThrottle = peer -> packetThrottleLimit;
410    }
411   
412    if (host -> recalculateBandwidthLimits)
413    {
414       host -> recalculateBandwidthLimits = 0;
415
416       peersRemaining = peersTotal;
417       bandwidth = host -> incomingBandwidth;
418       needsAdjustment = 1;
419
420       if (bandwidth == 0)
421         bandwidthLimit = 0;
422       else
423       while (peersRemaining > 0 && needsAdjustment != 0)
424       {
425           needsAdjustment = 0;
426           bandwidthLimit = bandwidth / peersRemaining;
427
428           for (peer = host -> peers;
429                peer < & host -> peers [host -> peerCount];
430                ++ peer)
431           {
432               if ((peer -> state != ENET_PEER_STATE_CONNECTED && peer -> state != ENET_PEER_STATE_DISCONNECT_LATER) ||
433                   peer -> incomingBandwidthThrottleEpoch == timeCurrent)
434                 continue;
435
436               if (peer -> outgoingBandwidth > 0 &&
437                   peer -> outgoingBandwidth >= bandwidthLimit)
438                 continue;
439
440               peer -> incomingBandwidthThrottleEpoch = timeCurrent;
441 
442               needsAdjustment = 1;
443               -- peersRemaining;
444               bandwidth -= peer -> outgoingBandwidth;
445           }
446       }
447
448       for (peer = host -> peers;
449            peer < & host -> peers [host -> peerCount];
450            ++ peer)
451       {
452           if (peer -> state != ENET_PEER_STATE_CONNECTED && peer -> state != ENET_PEER_STATE_DISCONNECT_LATER)
453             continue;
454
455           command.header.command = ENET_PROTOCOL_COMMAND_BANDWIDTH_LIMIT | ENET_PROTOCOL_COMMAND_FLAG_ACKNOWLEDGE;
456           command.header.channelID = 0xFF;
457           command.bandwidthLimit.outgoingBandwidth = ENET_HOST_TO_NET_32 (host -> outgoingBandwidth);
458
459           if (peer -> incomingBandwidthThrottleEpoch == timeCurrent)
460             command.bandwidthLimit.incomingBandwidth = ENET_HOST_TO_NET_32 (peer -> outgoingBandwidth);
461           else
462             command.bandwidthLimit.incomingBandwidth = ENET_HOST_TO_NET_32 (bandwidthLimit);
463
464           enet_peer_queue_outgoing_command (peer, & command, NULL, 0, 0);
465       } 
466    }
467
468    host -> bandwidthThrottleEpoch = timeCurrent;
469
470    for (peer = host -> peers;
471         peer < & host -> peers [host -> peerCount];
472         ++ peer)
473    {
474        peer -> incomingDataTotal = 0;
475        peer -> outgoingDataTotal = 0;
476    }
477}
478   
479/** @} */
Note: See TracBrowser for help on using the repository browser.