/* NTT - network torture tool. Copyright (C) 2008 Håkan T. Johansson Håkan T. Johansson Österängsgatan 22, II 302 53 Halmstad, Sweden This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // Compile: make ntt CFLAGS=-O3 // // or for debug: make ntt CFLAGS="-Wall -g" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef __Lynx__ # include # include # include # if !defined(LYNXOS40) typedef int socklen_t; # endif // Some versions actually has stdint... # if !defined(LYNXOS31) && !defined(LYNXOS40) typedef unsigned char uint8_t; typedef unsigned short int uint16_t; typedef unsigned long int uint32_t; typedef unsigned long long int uint64_t; # endif #else # include # include # include //# include #endif #define NTT_DEFAULT_PORT 56557 #if defined __GNUC__ && __GNUC__ < 3 # define ERROR(__VA_ARGS__...) { fprintf(stderr,__VA_ARGS__); } # define WARNING(__VA_ARGS__...) { fprintf(stderr,__VA_ARGS__); } # define INFO(__VA_ARGS__...) { fprintf(stdout,__VA_ARGS__);fflush(stdout);} #else # define ERROR(...) { fprintf(stderr,__VA_ARGS__); } # define WARNING(...) { fprintf(stderr,__VA_ARGS__); } # define INFO(...) { fprintf(stdout,__VA_ARGS__);fflush(stdout); } #endif typedef struct ntt_hilo_64_t { uint32_t _hi; uint32_t _lo; } ntt_hilo_64; uint64_t ntt_from_hilo_64(ntt_hilo_64 hilo) { uint64_t value; value = (((uint64_t) ntohl(hilo._hi)) << 32) | ntohl(hilo._lo); return value; } ntt_hilo_64 ntt_to_hilo_64(uint64_t value) { ntt_hilo_64 hilo; hilo._hi = htonl(value >> 32); hilo._lo = htonl(value); return hilo; } typedef struct ntt_server_info_t { // Totals for the server ntt_hilo_64 _total_recv; ntt_hilo_64 _total_send; // Number of active connections uint32_t _conn_recv; uint32_t _conn_send; // CPU time usage ntt_hilo_64 _utime_s; ntt_hilo_64 _stime_s; } ntt_server_info; typedef struct ntt_connection_info_t { // Totals for the connection ntt_hilo_64 _total_recv; ntt_hilo_64 _total_send; uint32_t _status; uint32_t _mtu; } ntt_connection_info; #define NTT_SEND_INFO_MAGIC (0x75638391 + 4) // last number is version typedef struct ntt_send_info_t { uint32_t _magic; uint32_t _connections; // number of connections that we send info on (0 or 1 only for now) ntt_server_info _server; ntt_connection_info _conn[1]; } ntt_send_info; #define NTT_ORDER_REQUEST_RECEIVE 1 #define NTT_ORDER_REQUEST_TRANSMIT 2 #define NTT_ORDER_REQUEST_SERVER_INFO 3 #define NTT_ORDER_REQUEST_PURGE_CLIENTS 4 #define NTT_ORDER_REQUEST_KILL_SERVER 5 #define NTT_ORDER_NUM_REQUESTS 5 #define NTT_ORDER_MAGIC (0xfe743bd4 + 3) // last number is version typedef struct ntt_order_t { uint32_t _magic; uint32_t _request; // 1 = connect to a server, 0 = send data (we're the server) uint32_t _reverse; uint32_t _pattern; uint32_t _limit_bytes_per_second; uint32_t _buffer_size; uint32_t _reply_buffer_size; // != 0 if we should be a ping-pong connection uint32_t _fault_location; // The following fields only apply when _receive = 1 char _server[128]; } ntt_order; #define NTT_ERROR_INFO_MAGIC (0x63975485 + 1) // last number is version #define NTT_ERROR_DISCONNECT 1 #define NTT_ERROR_DATA_ERROR 2 typedef struct ntt_error_info_t { uint32_t _magic; uint32_t _type; struct data_error_t { uint8_t _direction; // 0 for normal, 1 for return data uint8_t _dummy; uint8_t _got; uint8_t _expected; uint32_t _offset_buffer; ntt_hilo_64 _offset_total; } _data_error; } ntt_error_info; /******************************************************************** * * Stuff needed for the server part. * */ typedef struct ntt_buffer_t { size_t _size; // total size size_t _offset; // current offset char *_buf; } ntt_buffer; /////////////////////////////////////////////////////////// #define NTT_CONN_MODE_READ_ORDERS 1 #define NTT_CONN_MODE_HANDLE_ORDERS 2 #define NTT_CONN_MODE_WAIT_CONNECTED 3 #define NTT_CONN_MODE_SEND_ORDERS 4 #define NTT_CONN_MODE_PREPARE_READ 5 #define NTT_CONN_MODE_READ 6 #define NTT_CONN_MODE_CHECK_READ 7 #define NTT_CONN_MODE_PREPARE_WRITE 8 #define NTT_CONN_MODE_WAIT_WRITE 9 #define NTT_CONN_MODE_WRITE 10 #define NTT_CONN_MODE_WRITE_ERROR 11 #define NTT_CONN_MODE_AFTER_WRITE 12 #define NTT_CONN_MODE_SHUTDOWN 13 /////////////////////////////////////////////////////////// #define NTT_CONN_PATTERN_NONE 1 #define NTT_CONN_PATTERN_ZERO 2 #define NTT_CONN_PATTERN_SCAN_BUF 3 #define NTT_CONN_PATTERN_SCAN_TOTAL 4 #define NTT_CONN_PATTERN_RANDOM 5 #define NTT_CONN_PATTERN_RANDOM_BUF 6 #define NTT_CONN_PATTERN_FAULT 7 #define NTT_CONN_NUM_PATTERNS 7 /////////////////////////////////////////////////////////// struct ntt_connection_t; struct ntt_peer_t; typedef struct ntt_connection_t { int _fd; int _mode; int _mode_after_write; int _mode_after_read; int _reverse; int _pattern; int _limit_bytes_per_second; int _direction; uint32_t _fault_location; char _next_recv_mark; char _next_send_mark; unsigned long _send_rand_next; unsigned long _recv_rand_next; double _limit_usec_per_byte; struct timeval _next_time; struct sockaddr_in _cli_addr; ntt_buffer _recv; ntt_buffer _send; ntt_buffer _verify; ntt_buffer _send_order; uint64_t _total_recv; uint64_t _total_send; struct ntt_peer_t *_peer; struct ntt_connection_t *_prev; struct ntt_connection_t *_next; } ntt_connection; /////////////////////////////////////////////////////////// #define NTT_PEER_MODE_PREPARE_WRITE 1 #define NTT_PEER_MODE_WRITE 2 #define NTT_PEER_MODE_WRITE_ERROR 3 #define NTT_PEER_MODE_SHUTDOWN 4 /////////////////////////////////////////////////////////// #define NTT_CONN_STATUS_DATA_ERROR 0x0001 /////////////////////////////////////////////////////////// typedef struct ntt_peer_t { int _fd; int _mode; uint32_t _status; ntt_buffer _send; ntt_buffer _error; struct timeval _next_time; struct ntt_connection_t *_conn; struct ntt_peer_t *_prev; struct ntt_peer_t *_next; } ntt_peer; /////////////////////////////////////////////////////////// int _shutdown = 0; ntt_connection *_connections = NULL; ntt_peer *_peers = NULL; uint64_t _server_total_recv = 0; uint64_t _server_total_send = 0; /////////////////////////////////////////////////////////// void ntt_create_buffer(ntt_buffer *buf,size_t size) { buf->_buf = malloc(size); if (!buf->_buf) { ERROR("Memory allocation error (buf).\n"); exit(1); } buf->_size = size; } void ntt_realloc_buffer(ntt_buffer *buf,size_t size) { buf->_buf = realloc(buf->_buf,size); if (!buf->_buf) { ERROR("Memory allocation error (buf, realloc).\n"); exit(1); } buf->_size = size; } void ntt_destroy_buffer(ntt_buffer *buf) { free(buf->_buf); buf->_buf = NULL; buf->_size = 0; } // prerequisite: file descriptor ok for read by select, or we'll block // will only issue one read call, return -1 => failure, disconnect int ntt_read_buffer(ntt_buffer *buf,int fd) { int max_recv = buf->_size - buf->_offset; int n = read(fd,buf->_buf + buf->_offset,max_recv); if (n == 0) { // Socket closed on other end, this we treat as an error, since // we never close :-) return -1; } if (n < 0) { if (errno == EINTR || errno == EAGAIN) return 0; // try again next time perror("read"); WARNING("Error while reading data.\n"); // treat it as some general failure, disconnect if that's an // option return -1; } buf->_offset += n; return n; } // prerequisite: file descriptor ok for write by select, or we'll block // will only issue one write call, return -1 => failure, disconnect int ntt_write_buffer(ntt_buffer *buf,int fd) { int max_send = buf->_size - buf->_offset; int n = write(fd,buf->_buf + buf->_offset,max_send); if (n == 0) { ERROR("write returned 0, cannot happen.\n"); exit(1); } if (n < 0) { if (errno == EINTR || errno == EAGAIN) return 0; // try again next time if (errno == EPIPE) { WARNING("Disconnected, pipe closed.\n"); } else { perror("write"); WARNING("Error while writing data.\n"); // treat it as some general failure, disconnect if that's an // option } return -1; } buf->_offset += n; return n; } /* Adapted from Linux Programmer's Manual...: * * "POSIX.1-2001 gives the following example of an implementation of * rand() and srand(), possibly useful when one needs the same * sequence on two different machines." * * Which is just what we want. This PRNG may not be so good, but at * least gzip is not able to compress it. bzip2 squeezes 5% away. * Should be able to defeat most communication compressions anyway... */ /* RAND_MAX assumed to be 32767 */ int myrand(unsigned long *next) { *next = *next * 1103515245 + 12345; return((unsigned)(*next/65536) % 32768); } /* --- */ void print_error(ntt_error_info *error) { switch (ntohl(error->_type)) { case NTT_ERROR_DISCONNECT: printf ("disconnect"); break; case NTT_ERROR_DATA_ERROR: printf ("(d=%s,g=x%02x,e=x%02x,o=x%llx,bo=x%lx)", error->_data_error._direction ? "back" : "forw", error->_data_error._got, error->_data_error._expected, (unsigned long long) ntt_from_hilo_64(error->_data_error._offset_total), (unsigned long) ntohl(error->_data_error._offset_buffer)); break; default: printf ("unknown"); } } int ntt_conn_setup_select(ntt_connection *conn, int nfd, fd_set *readfds,fd_set *writefds, struct timeval *now, struct timeval *next_time) { // We always expect error messages from the other end... assert(conn->_mode != 0); switch (conn->_mode) { case NTT_CONN_MODE_PREPARE_READ: assert(conn->_recv._size); // must have non-zero recv buffer // reset the receive pointer conn->_recv._offset = 0; conn->_mode = NTT_CONN_MODE_READ; // no break, fall through case NTT_CONN_MODE_READ_ORDERS: case NTT_CONN_MODE_READ: FD_SET(conn->_fd,readfds); if (conn->_fd > nfd) nfd = conn->_fd; break; case NTT_CONN_MODE_PREPARE_WRITE: assert(conn->_send._size); // must have non-zero send buffer // reset the send pointer conn->_send._offset = 0; // if we're supposed to prepare the buffer with some special // pattern, now is the time! switch (conn->_pattern) { case NTT_CONN_PATTERN_SCAN_TOTAL: { int i; char fill = (char) conn->_total_send; for (i = 0; i < conn->_send._size; i++, fill++) conn->_send._buf[i] = fill; break; } case NTT_CONN_PATTERN_RANDOM: { int i; for (i = 0; i < conn->_send._size; i++) conn->_send._buf[i] = myrand(&conn->_send_rand_next) >> 6; break; } } // Put the marker that tells it is a normal buffer (not error) conn->_send._buf[0] = conn->_next_send_mark++; if (conn->_pattern == NTT_CONN_PATTERN_FAULT) { // Simulate data error if (conn->_total_send <= conn->_fault_location && conn->_total_send + conn->_send._size > conn->_fault_location) conn->_send._buf[conn->_fault_location - conn->_total_send] += 2; // adding 2 never has the same effect as ~, so cannot be mistaken // for start of an error message } // go into next state conn->_mode = NTT_CONN_MODE_WAIT_WRITE; // no break, fall through case NTT_CONN_MODE_WAIT_WRITE: // if we are supposed to be bandwidth limited, now is the time // to check if we're allowed to send data yet if (conn->_limit_bytes_per_second && (timercmp(now,&conn->_next_time,<))) { // time is too early, set next time if we are earlier // than next timeout if (!timerisset(next_time) || (timercmp(&conn->_next_time,next_time,<))) *next_time = conn->_next_time; break; } conn->_mode = NTT_CONN_MODE_WRITE; // no break, fall through case NTT_CONN_MODE_WAIT_CONNECTED: case NTT_CONN_MODE_SEND_ORDERS: case NTT_CONN_MODE_WRITE: case NTT_CONN_MODE_WRITE_ERROR: FD_SET(conn->_fd,writefds); if (conn->_fd > nfd) nfd = conn->_fd; break; case NTT_CONN_MODE_SHUTDOWN: // Make sure it gets handled immediately, set timeout to 0 *next_time = *now; break; } if (conn->_mode == NTT_CONN_MODE_WAIT_WRITE || conn->_mode == NTT_CONN_MODE_WRITE) { // If we're just a writing kind of device, the other end may // have detected an error, and sent such a notice to us... Then // we must read that, as the socket could be blocked for our // writing by the other end that will not read anything more... FD_SET(conn->_fd,readfds); if (conn->_fd > nfd) nfd = conn->_fd; } return nfd; } void fill_static_pattern(ntt_buffer *buf,int pattern) { int i; char fill; unsigned long seed; switch (pattern) { case NTT_CONN_PATTERN_NONE: case NTT_CONN_PATTERN_ZERO: for (i = 0; i < buf->_size; i++) buf->_buf[i] = 0; break; case NTT_CONN_PATTERN_FAULT: case NTT_CONN_PATTERN_SCAN_BUF: fill = 0; for (i = 0; i < buf->_size; i++, fill++) buf->_buf[i] = fill; break; case NTT_CONN_PATTERN_RANDOM_BUF: seed = 0; for (i = 0; i < buf->_size; i++, fill++) buf->_buf[i] = myrand(&seed) >> 7; break; case NTT_CONN_PATTERN_SCAN_TOTAL: case NTT_CONN_PATTERN_RANDOM: // pattern is not static break; } } int ntt_get_host_port(const char *server,struct sockaddr_in *serv_addr) { struct hostent *h; char *hostname; int port = NTT_DEFAULT_PORT; // if there is a colon in the hostname, interpret what follows as a // port number, overriding the default port { char *colon; hostname = strdup(server); colon = strchr(hostname,':'); if (colon) { port = atoi(colon+1); *colon = 0; // cut the hostname } /* get server IP address (no check if input is IP address or DNS name */ h = gethostbyname(hostname); free(hostname); } if(h == NULL) { WARNING("Unknown host '%s'.\n",server); return 0; } #ifndef __Lynx__ INFO("Server '%s' known... (IP: %s, port: %d).\n", h->h_name, inet_ntoa(*(struct in_addr *)h->h_addr_list[0]), port); #else INFO("Server '%s' known... (port: %d).\n", h->h_name, port); #endif /* bind any port */ serv_addr->sin_family = h->h_addrtype; memcpy((char *) &serv_addr->sin_addr.s_addr, h->h_addr_list[0], h->h_length); serv_addr->sin_port = htons(port); return 1; } void set_max_window_size(int fd) { int i; return; for (i = 256; i > 1; i--) { int window_size = i * 1024; if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &window_size, sizeof(window_size)) == 0) { printf ("SO_SNDBUF set to %d\n",window_size); break; } } for (i = 256; i > 1; i--) { int window_size = i * 1024; if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &window_size, sizeof(window_size)) == 0) { printf ("SO_RCVBUF set to %d\n",window_size); break; } } } int handle_orders(ntt_connection *conn, struct timeval *now) { // make a copy of the orders such that we can kill the receive // buffer of conn ntt_order *src = (ntt_order*) conn->_recv._buf; ntt_order order; // first byteswap the orders if (ntohl(src->_magic) != NTT_ORDER_MAGIC) return 0; // bad magic order._request = ntohl(src->_request); order._reverse = ntohl(src->_reverse); order._pattern = ntohl(src->_pattern); order._limit_bytes_per_second = ntohl(src->_limit_bytes_per_second); order._buffer_size = ntohl(src->_buffer_size); order._reply_buffer_size = ntohl(src->_reply_buffer_size); order._fault_location = ntohl(src->_fault_location); memcpy(order._server,src->_server,sizeof(order._server)); // make sure hostname doesn't overflow order._server[sizeof(order._server)-1] = 0; INFO("Got orders: req=%d rvs=%d ptn=%d lim=%d siz=%d rply=%d fl=%d srv=%s\n", (int) order._request,(int) order._reverse,(int) order._pattern, (int) order._limit_bytes_per_second, (int) order._buffer_size,(int) order._reply_buffer_size, (int) order._fault_location, order._server); ntt_destroy_buffer(&conn->_recv); // now figure out what we need to do... // make some sanity checks on the orders // don't print errors, as we presume it's some nasty attempt on us if (order._reverse != 0 && order._reverse != 1) return 0; if (order._request == 0 || order._request > NTT_ORDER_NUM_REQUESTS) return 0; if (order._request == NTT_ORDER_REQUEST_RECEIVE || order._request == NTT_ORDER_REQUEST_TRANSMIT) { if (order._pattern == 0 || order._pattern > NTT_CONN_NUM_PATTERNS) return 0; if (order._limit_bytes_per_second > 10000000000) return 0; if (order._buffer_size == 0 || order._buffer_size > 2*1024*1024) return 0; if (order._reply_buffer_size > 2*1024*1024) return 0; } if (order._request == NTT_ORDER_REQUEST_KILL_SERVER) { _shutdown = 1; return 0; } if (order._request == NTT_ORDER_REQUEST_PURGE_CLIENTS) { // Put all client connections in death state. ntt_connection *conn; for (conn = _connections->_next; conn != _connections; conn = conn->_next) { conn->_mode = NTT_CONN_MODE_SHUTDOWN; } return 0; } if (order._request == NTT_ORDER_REQUEST_RECEIVE || order._request == NTT_ORDER_REQUEST_SERVER_INFO) { struct sockaddr_in serv_addr; ntt_order *send_order; int ret; // So, we're supposed to setup a connection to a server. The // file descriptor that we have is in reality a file descriptor // to the controling client, so set up that structure instead. ntt_peer *peer = malloc (sizeof(ntt_peer)); if (!peer) { ERROR("Memory allocation error (peer).\n"); exit(1); } memset(peer,0,sizeof(ntt_peer)); peer->_fd = conn->_fd; peer->_next_time = *now; peer->_mode = NTT_PEER_MODE_PREPARE_WRITE; // allocate memory enough to send data for server and one connection ntt_create_buffer(&peer->_send,sizeof(ntt_send_info)); // Insert the peer into the list peer->_next = _peers; peer->_prev = _peers->_prev; _peers->_prev->_next = peer; _peers->_prev = peer; conn->_fd = -1; // taken over by peer INFO("Changed connection into peer...\n"); if (order._request == NTT_ORDER_REQUEST_SERVER_INFO) { // Dirty trick: return 0 and let the normal code delete the // conn data structure. return 0; } peer->_conn = conn; conn->_peer = peer; // socket successfully created, set up the connection // variables ntt_create_buffer(&conn->_recv,order._buffer_size); ntt_create_buffer(&conn->_verify,order._buffer_size); if (order._reply_buffer_size) { ntt_create_buffer(&conn->_send,order._reply_buffer_size); conn->_mode_after_read = NTT_CONN_MODE_PREPARE_WRITE; conn->_mode_after_write = NTT_CONN_MODE_PREPARE_READ; } else { // no ping-pong mode, just always read if (order._reverse) conn->_mode_after_write = NTT_CONN_MODE_PREPARE_WRITE; else conn->_mode_after_read = NTT_CONN_MODE_PREPARE_READ; } conn->_pattern = order._pattern; conn->_reverse = order._reverse; conn->_direction = order._reverse; conn->_fault_location = order._fault_location; if (order._reverse) { ntt_buffer tmp = conn->_send; conn->_send = conn->_recv; conn->_recv = tmp; if (order._limit_bytes_per_second) { conn->_limit_bytes_per_second = order._limit_bytes_per_second; conn->_limit_usec_per_byte = 1000000.0 / order._limit_bytes_per_second; conn->_next_time = *now; } } else // bandwidth limitation does not apply to us ; // We need to setup a the orders to be sent to the write server ntt_create_buffer(&conn->_send_order,sizeof(ntt_order)); send_order = (ntt_order *) conn->_send_order._buf; memset(send_order,0,sizeof(ntt_order)); send_order->_magic = htonl(NTT_ORDER_MAGIC); send_order->_request = htonl(NTT_ORDER_REQUEST_TRANSMIT); send_order->_reverse = htonl(order._reverse); send_order->_pattern = htonl(order._pattern); send_order->_limit_bytes_per_second = htonl(order._limit_bytes_per_second); send_order->_buffer_size = htonl(order._buffer_size); send_order->_reply_buffer_size = htonl(order._reply_buffer_size); send_order->_fault_location = htonl(order._fault_location); // Since the connect may take a very long time, we do it with // the socket in non-blocking mode, and handle the connection as // it becomes alive. Therefore we do the connect as the last // thing, letting it complete on it's own... // (We'll not allow the server to block just because the user // specified a bad host... (denial-of-service). if (!ntt_get_host_port(order._server,&serv_addr)) return 0; // Now, fix up the connection /* socket creation */ conn->_fd = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP); if (conn->_fd < 0) { perror("socket"); WARNING("Cannot open socket.\n"); return 0; } // make the server socket non-blocking, such that we're not hit by // false selects (see Linux man page bug notes) if (fcntl(conn->_fd,F_SETFL,fcntl(conn->_fd,F_GETFL) | O_NONBLOCK) == -1) { perror("fcntl"); exit(1); } set_max_window_size(conn->_fd); do_connect: ret = connect(conn->_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)); if (ret == 0) { conn->_mode = NTT_CONN_MODE_SEND_ORDERS; } else { if (errno == EINTR) goto do_connect; if (errno != EINPROGRESS) { perror("connect"); WARNING("Failure connecting to server\n"); return 0; // deletes the connection } conn->_mode = NTT_CONN_MODE_WAIT_CONNECTED; } } if (order._request == NTT_ORDER_REQUEST_TRANSMIT) { // Things are easy, we should just set up the fields // appropriately ntt_create_buffer(&conn->_send,order._buffer_size); ntt_create_buffer(&conn->_verify,order._buffer_size); // If we're in ping-pong mode, then read and write alternates if (order._reply_buffer_size) { ntt_create_buffer(&conn->_recv,order._reply_buffer_size); conn->_mode_after_read = NTT_CONN_MODE_PREPARE_WRITE; conn->_mode_after_write = NTT_CONN_MODE_PREPARE_READ; } else { // no ping-pong mode, just always write if (order._reverse) conn->_mode_after_read = NTT_CONN_MODE_PREPARE_READ; else conn->_mode_after_write = NTT_CONN_MODE_PREPARE_WRITE; } conn->_pattern = order._pattern; conn->_reverse = order._reverse; conn->_direction = !order._reverse; conn->_fault_location = order._fault_location; if (order._reverse) { ntt_buffer tmp = conn->_send; conn->_send = conn->_recv; conn->_recv = tmp; // We begin by reading conn->_mode = NTT_CONN_MODE_PREPARE_READ; } else { if (order._limit_bytes_per_second) { conn->_limit_bytes_per_second = order._limit_bytes_per_second; conn->_limit_usec_per_byte = 1000000.0 / order._limit_bytes_per_second; conn->_next_time = *now; } // We begin by writing conn->_mode = NTT_CONN_MODE_PREPARE_WRITE; } } // Make sure whoever sends data fills in static patterns fill_static_pattern(&conn->_send,order._pattern); fill_static_pattern(&conn->_verify,order._pattern); // To be able to receive error messages easily, make sure the // receive buffer has space to take at least one byte if (!conn->_recv._buf) ntt_create_buffer(&conn->_recv,1); return 1; } int ntt_conn_after_select(ntt_connection *conn, fd_set *readfds,fd_set *writefds, struct timeval *now) { if (conn->_mode == NTT_CONN_MODE_WAIT_WRITE || conn->_mode == NTT_CONN_MODE_WRITE) { // If we're just a writing kind of device, the other end may // have detected an error, and sent such a notice to us... In // that case, begin by trying to read that... if (FD_ISSET(conn->_fd,readfds)) { int n; // Since our current mode is writing, it is fine to start to // screw the receive buffer up. We have for this very case // ensured that it at least has space for one byte. conn->_recv._offset = 0; n = ntt_read_buffer(&conn->_recv,conn->_fd); if (n < 0) return 0; // failure if (n != 0) { INFO("Got data (%d) when in write mode.\n",n); // We got something, either an failure -> fatal, or part // of an error message. As we should never get anything // when in write mode, we jump to receive state if (conn->_recv._buf[0] != ~conn->_next_recv_mark) { // Things are very strange it does not even look // like an error message... Complain, but go to // read mode anyhow... WARNING("Received (non-error message) " "when in write mode (%02x,%02x).\n", conn->_recv._buf[0],~conn->_next_recv_mark); } conn->_total_recv += (uint64_t) n; _server_total_recv += (uint64_t) n; conn->_mode = NTT_CONN_MODE_READ; goto handle_read_data; } } } assert(conn->_mode != 0); switch (conn->_mode) { case NTT_CONN_MODE_READ_ORDERS: { int n; if (!FD_ISSET(conn->_fd,readfds)) return 1; n = ntt_read_buffer(&conn->_recv,conn->_fd); if (n < 0) return 0; // failure if (conn->_recv._offset < conn->_recv._size) { // we've not reached end of buffer yet break; } conn->_mode = NTT_CONN_MODE_CHECK_READ; // no break, fall through } case NTT_CONN_MODE_HANDLE_ORDERS: // The receive buffer now contains our order structure, // so handle the orders... return handle_orders(conn,now); case NTT_CONN_MODE_READ: { int n; if (!FD_ISSET(conn->_fd,readfds)) return 1; n = ntt_read_buffer(&conn->_recv,conn->_fd); if (n < 0) return 0; // failure conn->_total_recv += (uint64_t) n; _server_total_recv += (uint64_t) n; handle_read_data: // Have we by chance gotten an error message? if (conn->_recv._buf[0] == ~conn->_next_recv_mark && conn->_recv._size < sizeof(ntt_error_info) + 1) ntt_realloc_buffer(&conn->_recv,sizeof(ntt_error_info) + 1); if (conn->_recv._offset < conn->_recv._size) { // we've not reached end of buffer yet // An error message will terminate earlier... Allow that, // i.e. if error message, then fall through to check... if (conn->_recv._buf[0] != ~conn->_next_recv_mark || conn->_recv._offset != sizeof(ntt_error_info) + 1) break; } // reached end of buffer, what should we do next? conn->_mode = NTT_CONN_MODE_CHECK_READ; // no break, fall through } case NTT_CONN_MODE_CHECK_READ: { ntt_error_info error; int i; char chk; char *ptr = conn->_recv._buf; char *ver = conn->_verify._buf; // The first byte contains a special mark that tells if it is a // real buffer, or an error message (i.e. that other end // detected transmission error) chk = conn->_next_recv_mark++; if (*ptr != chk) { if (*ptr != ~chk) { // It is simply wrong (i.e. not even an error), assume // it is a normal error in this transmission... goto data_error; } // So, data is to be an error structure, check that size is // enough and magic good, then propagate it to our peer (if // any), and do a shutdown of us ourselves... if (conn->_recv._size < sizeof(error) + 1) { chk = ~*ptr; // simulate error (bad one) goto data_error; } memcpy(&error,conn->_recv._buf+1,sizeof(error)); if (error._magic != htonl(NTT_ERROR_INFO_MAGIC)) { chk = ~*ptr; // simulate error (bad one) goto data_error; } printf ("Connection had transmission error: "); print_error(&error); printf ("\n"); // We're done, get rid of us... conn->_mode = NTT_CONN_MODE_SHUTDOWN; goto send_error_to_peer; } ptr++; ver++; switch (conn->_pattern) { // NTT_CONN_PATTERN_NONE, no test case NTT_CONN_PATTERN_ZERO: if (memcmp(ptr,ver,conn->_recv._size-1) == 0) break; chk = 0; for (i = conn->_recv._size-1; i; i--, ptr++) if (*ptr != chk) goto data_error; break; case NTT_CONN_PATTERN_FAULT: case NTT_CONN_PATTERN_SCAN_BUF: if (memcmp(ptr,ver,conn->_recv._size-1) == 0) break; chk = 0+1; for (i = conn->_recv._size-1; i; i--, ptr++, chk++) if (*ptr != chk) goto data_error; break; case NTT_CONN_PATTERN_SCAN_TOTAL: chk = (char) ((int) conn->_total_recv - conn->_recv._size) + 1; for (i = conn->_recv._size-1; i; i--, ptr++, chk++) if (*ptr != chk) goto data_error; break; case NTT_CONN_PATTERN_RANDOM: myrand(&conn->_recv_rand_next); // discard one for (i = conn->_recv._size-1; i; i--, ptr++) { chk = (myrand(&conn->_recv_rand_next) >> 6); if (*ptr != chk) goto data_error; } break; case NTT_CONN_PATTERN_RANDOM_BUF: if (memcmp(ptr,ver,conn->_recv._size-1) == 0) break; // We compare to the verification buffer for (i = conn->_recv._size-1; i; i--, ptr++, ver++) { if (*ptr != *ver) { chk = *ver; goto data_error; } } break; } if (0) // dirty... we get into here only via jump on error... { data_error: INFO("Receive data error (0x%02x != 0x%02x), " "byte 0x%08llx, 0x%08lx in buffer...\n", (unsigned char) *ptr, (unsigned char) chk, (unsigned long long) (conn->_total_recv - conn->_recv._size + ptr - conn->_recv._buf), (unsigned long) (ptr - conn->_recv._buf)); // Create an error structure, and send it (OOB) both to our // other end and peer (if a any) memset(&error,0,sizeof(error)); error._magic = htonl(NTT_ERROR_INFO_MAGIC); error._type = htonl(NTT_ERROR_DATA_ERROR); error._data_error._direction = conn->_direction; error._data_error._got = *ptr; error._data_error._expected = chk; error._data_error._offset_buffer = htonl(ptr - conn->_recv._buf); error._data_error._offset_total = ntt_to_hilo_64 (conn->_total_recv - conn->_recv._size + ptr - conn->_recv._buf); // To send the error message to the other end, we just need // to put it in the transmit buffer, and set next mode to // data transmission. // Make sure send buffer has right size ntt_realloc_buffer(&conn->_send,1+sizeof(error)); // Put the marker that tells it is an error conn->_send._buf[0] = ~conn->_next_send_mark; // Copy the data memcpy(conn->_send._buf+1,&error,sizeof(error)); // reset the send pointer conn->_send._offset = 0; // Do write conn->_mode = NTT_CONN_MODE_WRITE_ERROR; // After transmission is done, next is shutdown... conn->_mode_after_write = NTT_CONN_MODE_SHUTDOWN; // Don't fool around with any waiting conn->_limit_bytes_per_second = 0; send_error_to_peer: // we had a data error, mark that in the status and // notify our peer if (conn->_peer) { // the peer is a bit more messy, as it may already be in // the middle of writing something. If it is, then we // cannot push ourselves inbetween that, as whoever is // reading would then be desynced. Instead, prepare a // send buffer and put that on hold, peer will deal with // that as soon as possible, after which it will go into // shutdown mode... // Create buffer ntt_create_buffer(&conn->_peer->_error,sizeof(error)); // Copy data memcpy(conn->_peer->_error._buf,&error,sizeof(error)); conn->_peer->_status |= NTT_CONN_STATUS_DATA_ERROR; // destroy the conn <-> peer relationship, as we do // not want our tear-down routine to kill the peer conn->_peer->_conn = NULL; conn->_peer = NULL; } return 1; } // What should be done after writing? conn->_mode = conn->_mode_after_read; // All those modes begin in the before_select() function break; } case NTT_CONN_MODE_WAIT_CONNECTED: { int so_error; socklen_t optlen; int ret; if (!FD_ISSET(conn->_fd,writefds)) return 1; // so the connect has finished, figure out if it was successul optlen = sizeof(so_error); ret = getsockopt(conn->_fd,SOL_SOCKET,SO_ERROR,&so_error,&optlen); if (ret != 0) { perror("getsockopt"); exit(1); } if (so_error != 0) { WARNING("delayed connect: %s\n", strerror(so_error)); return 0; // connection failed } conn->_mode = NTT_CONN_MODE_SEND_ORDERS; // must wait for allowance to write, break break; } case NTT_CONN_MODE_SEND_ORDERS: { int n; if (!FD_ISSET(conn->_fd,writefds)) return 1; n = ntt_write_buffer(&conn->_send_order,conn->_fd); if (n < 0) return 0; // failure if (conn->_send_order._offset < conn->_send_order._size) { // we've not reached end of buffer yet break; } // Kill the buffer with orders to send ntt_destroy_buffer(&conn->_send_order); if (conn->_reverse) // We begin by writing conn->_mode = NTT_CONN_MODE_PREPARE_WRITE; else // We begin by reading conn->_mode = NTT_CONN_MODE_PREPARE_READ; // So we must wait for data to read... break; } case NTT_CONN_MODE_WRITE: case NTT_CONN_MODE_WRITE_ERROR: { int n; if (!FD_ISSET(conn->_fd,writefds)) return 1; n = ntt_write_buffer(&conn->_send,conn->_fd); if (n < 0) return 0; // failure conn->_total_send += (uint64_t) n; _server_total_send += (uint64_t) n; if (conn->_send._offset < conn->_send._size) { // we've not reached end of buffer yet break; } // reached end of buffer, what should we do next? conn->_mode = NTT_CONN_MODE_AFTER_WRITE; // no break, fall through } case NTT_CONN_MODE_AFTER_WRITE: // if we're supposed to do bandwidth limiting, then figure // out when we next time may write some data... if (conn->_limit_bytes_per_second) { // last time we updated the limiting functions, we had not // sent this buffer. this buffer was allowed to be sent at // earliest @conn->_next_time so, next buffer may at // earliest be sent at @conn->_next_time + conn->_send._size // / bytes/s long long int add_usec = (long long int) (conn->_send._size * conn->_limit_usec_per_byte); conn->_next_time.tv_sec += (int) (add_usec / 1000000); conn->_next_time.tv_usec += (int) (add_usec % 1000000); if (conn->_next_time.tv_usec >= 1000000) { conn->_next_time.tv_sec++; conn->_next_time.tv_usec -= 1000000; } // if time has already passed, set the allowed time to now // (such that we are not allowed to use old unused // bandwidth) if (timercmp(&conn->_next_time,now,<)) conn->_next_time = *now; } // What should be done after writing? conn->_mode = conn->_mode_after_write; // All those modes begin in the before_select() function break; case NTT_CONN_MODE_SHUTDOWN: INFO("Conn shutdown.\n"); return 0; } return 1; } int ntt_peer_setup_select(ntt_peer *peer, int nfd, fd_set *readfds,fd_set *writefds, struct timeval *now, struct timeval *next_time) { assert(peer->_mode != 0); if (peer->_mode != NTT_PEER_MODE_WRITE && peer->_error._buf) { INFO("Peer start to send error message...\n"); // We are to send the error. On purpose, we do not do shutdown // after propagating an (data) error, as that would get the // client do also go down, with all other connections. peer->_mode = NTT_PEER_MODE_WRITE_ERROR; // Start to send from right point... peer->_error._offset = 0; } switch (peer->_mode) { case NTT_PEER_MODE_PREPARE_WRITE: // only send info so often... if (timercmp(now,&peer->_next_time,<)) { // time is too early, set next time if we are earlier // than next timeout if (!timerisset(next_time) || timercmp(&peer->_next_time,next_time,<)) *next_time = peer->_next_time; return nfd; } // instead of doing correct book-keeping on the number of // sending and receiving connections, update the counts every // time { ntt_send_info *info; size_t real_size; uint32_t server_conn_recv = 0; uint32_t server_conn_send = 0; struct rusage usage; ntt_connection *conn; for (conn = _connections->_next; conn != _connections; conn = conn->_next) { if (conn->_mode_after_read) server_conn_recv++; if (conn->_mode_after_write) server_conn_send++; } // prepare the status we're gonna send to the client // send both connection and server info if it has a connection memset(peer->_send._buf,0,peer->_send._size); info = (ntt_send_info *) peer->_send._buf; info->_magic = htonl(NTT_SEND_INFO_MAGIC); info->_connections = 0; info->_server._total_recv = ntt_to_hilo_64(_server_total_recv); info->_server._total_send = ntt_to_hilo_64(_server_total_send); info->_server._conn_recv = ntohl(server_conn_recv); info->_server._conn_send = ntohl(server_conn_send); getrusage(RUSAGE_SELF,&usage); info->_server._utime_s = ntt_to_hilo_64((((uint64_t) usage.ru_utime.tv_sec) << 32) + (((uint64_t) usage.ru_utime.tv_usec) << 32) / 1000000); info->_server._stime_s = ntt_to_hilo_64((((uint64_t) usage.ru_stime.tv_sec) << 32) + (((uint64_t) usage.ru_stime.tv_usec) << 32) / 1000000); // if we have a connection, also fill in that info if (peer->_conn) { info->_connections++; } // reallocate the send buffer? real_size = sizeof(ntt_send_info)- sizeof(ntt_connection_info)+ sizeof(ntt_connection_info)*info->_connections; if (peer->_send._size != real_size) ntt_realloc_buffer(&peer->_send,real_size); if (peer->_conn) { // seems hard to include linux/in.h without crashing // into netinet/in.h hacky workaround #ifdef __linux__ #define IP_MTU 14 // from linux/in.h #endif #ifdef IP_MTU int mtu; socklen_t optlen; int ret; #endif info->_conn[0]._total_recv = ntt_to_hilo_64(peer->_conn->_total_recv); info->_conn[0]._total_send = ntt_to_hilo_64(peer->_conn->_total_send); info->_conn[0]._status = htonl(peer->_status); #ifdef IP_MTU optlen = sizeof(mtu); ret = getsockopt(peer->_conn->_fd,IPPROTO_IP,IP_MTU,&mtu,&optlen); if (ret != 0) { // silently ignore the socket errors info->_conn[0]._mtu = htonl((uint32_t) -1); } else info->_conn[0]._mtu = htonl(mtu); #else info->_conn[0]._mtu = htonl((uint32_t) -1); #endif } info->_connections = htonl(info->_connections); } peer->_send._offset = 0; peer->_mode = NTT_PEER_MODE_WRITE; // no break, fall through case NTT_PEER_MODE_WRITE: case NTT_PEER_MODE_WRITE_ERROR: FD_SET(peer->_fd,writefds); if (peer->_fd > nfd) nfd = peer->_fd; break; case NTT_PEER_MODE_SHUTDOWN: // Make sure it gets handled immediately, set timeout to 0 *next_time = *now; break; } return nfd; } int ntt_peer_after_select(ntt_peer *peer, fd_set *readfds,fd_set *writefds, struct timeval *now) { assert(peer->_mode != 0); switch (peer->_mode) { case NTT_PEER_MODE_WRITE: case NTT_PEER_MODE_WRITE_ERROR: { int n; ntt_buffer *send_buffer; if (!FD_ISSET(peer->_fd,writefds)) return 1; send_buffer = (peer->_mode == NTT_PEER_MODE_WRITE_ERROR ? &peer->_error : &peer->_send); n = ntt_write_buffer(send_buffer,peer->_fd); if (n < 0) return 0; // failure if (send_buffer->_offset < send_buffer->_size) { // we've not reached end of buffer yet break; } if (peer->_mode == NTT_PEER_MODE_WRITE_ERROR) { ntt_destroy_buffer(&peer->_error); } else { peer->_next_time = *now; peer->_next_time.tv_sec++; // add one second } // so we wait and then write again peer->_mode = NTT_PEER_MODE_PREPARE_WRITE; break; } case NTT_PEER_MODE_SHUTDOWN: INFO("Peer shutdown.\n"); return 0; } return 1; } void ntt_accept_connection(int fd) { ntt_connection *conn; int client_fd; struct sockaddr_in cliAddr; socklen_t cliLen; cliLen = sizeof(cliAddr); client_fd = accept(fd,(struct sockaddr *) &cliAddr,&cliLen); if (client_fd < 0) { if (errno == EINTR) return; // we need to do it again, lets redo the select... if (errno == EAGAIN) return; // false select... // There are many errors of accept that may happen as // consequences of the network (ECONNABORTED, EPERM, EPROTO), // so we only deal with it as a warning WARNING("Accepting client connection failed...\n"); return; } // make the socket non-blocking if (fcntl(client_fd,F_SETFL,fcntl(client_fd,F_GETFL) | O_NONBLOCK) == -1) { perror("fcntl()"); exit(1); } // ok, so we got a connection... bgein by treating it as a data // connection, that will begin by receiving it's orders. if it in // reality is a control connection, it will be changed into that // after orders have been received conn = malloc(sizeof (ntt_connection)); if (!conn) { ERROR("Memory allocation failure (conn).\n"); exit(1); } memset(conn,0,sizeof(ntt_connection)); conn->_fd = client_fd; ntt_create_buffer(&conn->_recv,sizeof(ntt_order)); // Insert the peer into the list conn->_next = _connections; conn->_prev = _connections->_prev; _connections->_prev->_next = conn; _connections->_prev = conn; conn->_recv._offset = 0; conn->_mode = NTT_CONN_MODE_READ_ORDERS; conn->_cli_addr = cliAddr; #ifndef __Lynx__ { char client_dotted[INET_ADDRSTRLEN+1]; inet_ntop(AF_INET,&cliAddr.sin_addr,client_dotted,sizeof(client_dotted)); INFO("Accepted connection [%s]...\n",client_dotted); } #else INFO("Accepted connection...\n"); #endif return; } void safe_close(int fd) { int ret; do_close: ret = close(fd); if (ret == -1) { if (errno == EINTR) goto do_close; perror("close"); return; } } void ntt_server(int port,int daemonize) { struct sockaddr_in servAddr; int srv_socket; struct timeval now, time_had_connection; ntt_connection *conn; ntt_peer *peer; /* Create a dummy connection to make linking easy. */ _connections = (ntt_connection *) malloc (sizeof (ntt_connection)); memset (_connections,0,sizeof (ntt_connection)); _connections->_fd = -1; _connections->_prev = _connections; _connections->_next = _connections; /* Create a dummy peer to make linking easy. */ _peers = (ntt_peer *) malloc (sizeof (ntt_peer)); memset (_peers,0,sizeof (ntt_peer)); _peers->_fd = -1; _peers->_prev = _peers; _peers->_next = _peers; /* Open a socket and listen for connections. */ srv_socket = socket(PF_INET, SOCK_STREAM, 0); if (srv_socket < 0) { perror("socket"); ERROR("Could not open server socket.\n"); exit(1); } servAddr.sin_family = AF_INET; servAddr.sin_addr.s_addr = htonl(INADDR_ANY); servAddr.sin_port = htons(port); if (bind (srv_socket,(struct sockaddr *) &servAddr,sizeof(servAddr)) != 0) { perror("bind"); ERROR("Failure binding server to port %d.\n",port); exit(1); } set_max_window_size(srv_socket); if (listen(srv_socket,3) != 0) { perror("listen"); ERROR("Failure to set server listening on port %d.\n",port); exit(1); } // make the server socket non-blocking, such that we're not hit by // false selects (see Linux man page bug notes) if (fcntl(srv_socket,F_SETFL,fcntl(srv_socket,F_GETFL) | O_NONBLOCK) == -1) { perror("fcntl"); exit(1); } INFO("NTT server running, port %d\n",port); // Now that we know that the port was free (and need not deliver // such error messages), we may deamonize if (daemonize) { int i; int dummy; // ignore some return/error values // Fork of a child (so parent can exit) switch(fork()) { case -1: perror ("fork"); exit(1); default: // parent exit(0); break; case 0: // child break; } // Get rid of controlling terminal setsid(); // Fork of a child again, so we never get a controlling terminal again switch(fork()) { case -1: perror ("fork"); exit(1); default: // parent exit(0); break; case 0: // child break; } // Do not stand around and occupy the directory dummy = chdir("/"); (void) dummy; // Control over file creation (we don't create any) umask(0); // Get rid of old file descriptors, after this, any error // messages will go unseen... // do not close srv_socket :) INFO("Daemonised\n"); for (i = getdtablesize(); i >= 0; --i) if (i != srv_socket) close(i); // open file descriptor for stdin/out/err. as we just closed them, // and descriptors are opened in order, this will be 0, 1, 2... i = open("/dev/null",O_RDWR); dummy = dup(i); dummy = dup(i); } gettimeofday(&now,NULL); time_had_connection = now; // Ok, we're now up, run the server loop. while (!_shutdown) { struct timeval next_time; struct timeval *timeout; int ret; int has_connection; fd_set readfds; fd_set writefds; int nfd = -1; FD_ZERO(&readfds); FD_ZERO(&writefds); // Add the server socket to the listening set FD_SET(srv_socket,&readfds); if (srv_socket > nfd) nfd = srv_socket; // Add all connections to the respective sets timerclear(&next_time); has_connection = 0; for (conn = _connections->_next; conn != _connections; conn = conn->_next) { assert(conn->_fd != -1); //printf ("conn, fd=%d, mode=%d\n",conn->_fd,conn->_mode); nfd = ntt_conn_setup_select(conn,nfd,&readfds,&writefds, &now,&next_time); // only connections that has sent some 'verified' data // count against server shutdown if (conn->_mode >= NTT_CONN_MODE_WAIT_CONNECTED) has_connection = 1; } for (peer = _peers->_next; peer != _peers; peer = peer->_next) { assert(peer->_fd != -1); //printf ("peer, fd=%d, mode=%d\n",peer->_fd,peer->_mode); nfd = ntt_peer_setup_select(peer,nfd,&readfds,&writefds, &now,&next_time); has_connection = 1; } // Do we need to set up a timeout? timeout = NULL; if (timerisset(&next_time)) { next_time.tv_sec -= now.tv_sec; next_time.tv_usec -= now.tv_usec; if (next_time.tv_usec < 0) { next_time.tv_sec--; next_time.tv_usec += 1000000; } timeout = &next_time; } if (has_connection) time_had_connection = now; else { // There are currently no connections or peers. // Measure against (unknown) bugs in networking code: // Shutdown after about 4 hours... assert(timeout == NULL); if (now.tv_sec > time_had_connection.tv_sec + 3600 * 4) { INFO("No valid connections for 4 hours, shutdown.\n"); _shutdown = 1; next_time.tv_sec = 0; } else { next_time.tv_sec = 3600 * 4 - (now.tv_sec - time_had_connection.tv_sec) + 2; } // Wait for the 4 hours, plus 2 seconds, then: boom! next_time.tv_usec = 0; timeout = &next_time; } /* int i; for (i = 0; i <= nfd; i++) { if (FD_ISSET(i,&readfds)) printf ("R%d ",i); if (FD_ISSET(i,&writefds)) printf ("W%d ",i); } if (timeout) printf ("%d:%06d",(int)timeout->tv_sec,(int)timeout->tv_usec); else printf ("-"); fflush(stdout); */ // Then wait for someone needing servicing ret = select(nfd+1,&readfds,&writefds,NULL,timeout); if (ret == -1) { if (errno == EINTR) continue; perror("select"); exit(1); } gettimeofday(&now,NULL); /* printf (" ==> \n"); for (i = 0; i <= nfd; i++) { if (FD_ISSET(i,&readfds)) printf ("R%d ",i); if (FD_ISSET(i,&writefds)) printf ("W%d ",i); } printf ("\n"); */ // Did someone want to connect? if (FD_ISSET(srv_socket,&readfds)) ntt_accept_connection(srv_socket); // Some servicing needed? for (conn = _connections->_next; conn != _connections; ) { int ok; assert(conn->_fd != -1); //printf ("conn: fd=%d, mode=%d\n",conn->_fd,conn->_mode); ok = ntt_conn_after_select(conn,&readfds,&writefds, &now); if (!ok) { void *p; // connection has closed, remove it if (conn->_fd != -1) { safe_close(conn->_fd); INFO("connection closed...\n"); } // unlink it from the list conn->_next->_prev = conn->_prev; conn->_prev->_next = conn->_next; ntt_destroy_buffer(&conn->_recv); ntt_destroy_buffer(&conn->_send); ntt_destroy_buffer(&conn->_send_order); if (conn->_peer) { INFO("kill peer...\n"); // peer will kill itself conn->_peer->_mode = NTT_PEER_MODE_SHUTDOWN; conn->_peer->_conn = NULL; } p = conn; conn = conn->_next; free(p); } else conn = conn->_next; } for (peer = _peers->_next; peer != _peers; ) { int ok; assert(peer->_fd != -1); //printf ("peer: fd=%d, mode=%d\n",peer->_fd,peer->_mode); ok = ntt_peer_after_select(peer,&readfds,&writefds, &now); if (!ok) { void *p; // peer has closed, remove it if (peer->_fd != -1) { safe_close(peer->_fd); INFO("peer closed...\n"); } // unlink it from the list peer->_next->_prev = peer->_prev; peer->_prev->_next = peer->_next; ntt_destroy_buffer(&peer->_send); if (peer->_conn) { INFO("kill connection...\n"); // connection will kill itself peer->_conn->_mode = NTT_CONN_MODE_SHUTDOWN; peer->_conn->_peer = NULL; } p = peer; peer = peer->_next; free(p); } else peer = peer->_next; } } // We've been requested to shut down... // Try to clean up the file descriptors... safe_close (srv_socket); for (conn = _connections->_next; conn != _connections; conn = conn->_next) { if (conn->_fd == -1) continue; safe_close(conn->_fd); } for (peer = _peers->_next; peer != _peers; peer = peer->_next) { if (peer->_fd == -1) continue; safe_close(peer->_fd); } INFO("NTT server terminated.\n"); exit(0); } /******************************************************************** * * Stuff needed for the control part. * */ struct ntt_order_item_t; typedef struct ntt_order_item_t { ntt_order _order; char _server[128]; unsigned int _index; unsigned int _index_other; struct ntt_order_item_t *_next; } ntt_order_item; struct ntt_ctrl_connection_t; typedef struct ntt_ctrl_connection_t { int _fd; // connection to server ntt_send_info _prev_info; struct timeval _prev_stamp; ntt_send_info _info; struct timeval _stamp; ntt_buffer _recv; ntt_error_info _error; ntt_order_item *_order; struct ntt_ctrl_connection_t *_prev; struct ntt_ctrl_connection_t *_next; struct ntt_ctrl_connection_t *_next_server; unsigned int _index; // for sorting unsigned int _serv_list_index[2]; } ntt_ctrl_connection; ntt_ctrl_connection *_ctrl_connections = NULL; ntt_ctrl_connection *_ctrl_servers = NULL; ntt_ctrl_connection **_ctrl_server_list = NULL; int _num_ctrl_servers = 0; ntt_order_item *_client_orders = NULL; ntt_order_item **_last_order = &_client_orders; void byte_count(uint64_t a) { double value = a; if (value < 99.0) printf ("%5.2f B",value); else if (value < 99000.0) printf ("%5.2f kB",value * 0.001); else if (value < 99000000.0) printf ("%5.2f MB",value * 0.000001); else if (value < 99000000000.0) printf ("%5.2f GB",value * 0.000000001); else printf ("%5.2f TB",value * 0.000000000001); } void show_rate(double rate) { if (rate < 99.0) printf ("%5.2f B/s",rate); else if (rate < 99000.0) printf ("%5.2f kB/s",rate * 0.001); else if (rate < 990000.0) printf ("%5.1f kB/s",rate * 0.001); else if (rate < 99000000.0) printf ("%5.2f MB/s",rate * 0.000001); else if (rate < 990000000.0) printf ("%5.1f MB/s",rate * 0.000001); else printf ("%5.2f GB/s",rate * 0.000000001); } void show_tiny_rate(double rate) { if (rate == 0) printf (" "); else if (rate < 99.0) printf (" %5.1f",rate); else if (rate < 99000.0) printf ("%5.1fk",rate * 0.001); else if (rate < 990000.0) printf ("%5.0fk",rate * 0.001); else if (rate < 99000000.0) printf ("%5.1fM",rate * 0.000001); else if (rate < 990000000.0) printf ("%5.0fM",rate * 0.000001); else printf ("%5.1fG",rate * 0.000000001); } double calc_rate(uint64_t a,uint64_t b,double dt) { double diff, rate; if (dt == 0.0) return 0; diff = (double) (a - b); rate = diff / dt; return rate; } void byte_rate(uint64_t a,uint64_t b,double dt) { if (dt == 0) printf (" - "); else show_rate(calc_rate(a,b,dt)); } double fraction_time(uint64_t a,uint64_t b,double dt) { double diff, frac; if (dt == 0.0) { printf (" -"); return 0; } diff = (a - b) * 100.0 / (((uint64_t) 1) << 32); frac = diff / dt; if (frac < 10) printf ("%3.1f%%",frac); else printf ("%3.0f%%",frac); return frac; } int compare_ctrl_index(const void *p1, const void *p2) { int i1 = (*((const ntt_ctrl_connection **) p1))->_index; int i2 = (*((const ntt_ctrl_connection **) p2))->_index; return i1 - i2; } void ntt_control(const char *logfilename,int show_matrix,int show_mtu) { ntt_order_item *order_item; ntt_ctrl_connection **next_server; ntt_ctrl_connection *ctrl; struct timeval now; struct timeval next_time; FILE *logfile = NULL; int i, j; double *server_rates; /* Create a dummy connection to make linking easy. */ _ctrl_connections = (ntt_ctrl_connection *) malloc (sizeof (ntt_ctrl_connection)); memset (_ctrl_connections,0,sizeof (ntt_ctrl_connection)); _ctrl_connections->_fd = -1; _ctrl_connections->_prev = _ctrl_connections; _ctrl_connections->_next = _ctrl_connections; _num_ctrl_servers = 0; // Open logfile if (logfilename) if ((logfile = fopen(logfilename,"w")) == NULL) { perror("fopen"); ERROR("Failed to open logfile: %s\n",logfilename); exit(1); } // Our job is to connect to the receiving servers, and ask them to // start their jobs. We should also connect to all (other) servers // involved, such that we can receive periodical status information // from them. When everything has been set up, go into the receive // loop to get periodic information and display it... // setup all the connections in the client setup phase we do not // care about non-blocking, just get it done as much time as it // wants order_item = _client_orders; next_server = &_ctrl_servers; for (order_item = _client_orders; order_item; order_item = order_item->_next) { struct sockaddr_in serv_addr; int fd; int ret; int offset; ntt_ctrl_connection *srv; if (!ntt_get_host_port(order_item->_server,&serv_addr)) exit(1); // Now, fix up the connection /* socket creation */ fd = socket(PF_INET,SOCK_STREAM,IPPROTO_TCP); if (fd < 0) { perror("socket"); ERROR("Cannot open socket.\n"); exit(1); } do_connect: ret = connect(fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)); if (ret < 0) { if (errno == EINTR) goto do_connect; perror("connect"); ERROR("Failure connecting to server '%s'.\n", order_item->_server); exit(1); } INFO("Connected...\n"); // Then send the orders offset = 0; while (offset < sizeof(order_item->_order)) { int n = write(fd,(char*) (&order_item->_order)+offset, sizeof(order_item->_order) - offset); if (n < 0) { if (errno == EINTR) continue; perror("write"); ERROR("Could not write order to server '%s'.\n", order_item->_server); exit(1); } offset += n; } if (order_item->_order._request == htonl(NTT_ORDER_REQUEST_KILL_SERVER) || order_item->_order._request == htonl(NTT_ORDER_REQUEST_PURGE_CLIENTS)) { // there will be no further communication safe_close(fd); continue; } // make the server socket non-blocking, such that we're not hit by // false selects (see Linux man page bug notes) if (fcntl(fd,F_SETFL,fcntl(fd,F_GETFL) | O_NONBLOCK) == -1) { perror("fcntl"); ERROR("Failed to make socket non-blocking.\n"); exit(1); } ctrl = (ntt_ctrl_connection*) malloc (sizeof(ntt_ctrl_connection)); memset(ctrl,0,sizeof(ntt_ctrl_connection)); ctrl->_fd = fd; ctrl->_order = order_item; // each message is at least 3 uint32_t long ntt_create_buffer(&ctrl->_recv,sizeof(uint32_t) * 3); ctrl->_stamp.tv_sec = 0; ctrl->_prev_stamp.tv_sec = 0; ctrl->_index = (unsigned int) -1; ctrl->_next = _ctrl_connections; ctrl->_prev = _ctrl_connections->_prev; _ctrl_connections->_prev->_next = ctrl; _ctrl_connections->_prev = ctrl; // is the server known in the server list? if not, add it for (srv = _ctrl_servers; srv; srv = srv->_next_server) if (strcmp(ctrl->_order->_server,srv->_order->_server) == 0) goto server_known; // server not known, use this connection to get info about it *next_server = ctrl; next_server = &(ctrl->_next_server); srv = ctrl; _num_ctrl_servers++; server_known: if (order_item->_index < srv->_index) srv->_index = order_item->_index; } if (_ctrl_connections->_next == _ctrl_connections) { WARNING("No connections, exit.\n"); exit(0); } // Prepare a string of all connections and also the machines // to be presented in the logfile _ctrl_server_list = (ntt_ctrl_connection **) malloc(_num_ctrl_servers * sizeof(ntt_ctrl_connection *)); for (ctrl = _ctrl_servers, i = 0; ctrl; ctrl = ctrl->_next_server, i++) _ctrl_server_list[i] = ctrl; qsort(_ctrl_server_list,_num_ctrl_servers,sizeof(ntt_ctrl_connection *), compare_ctrl_index); server_rates = (double *) malloc(_num_ctrl_servers * _num_ctrl_servers * sizeof(double)); for (ctrl = _ctrl_connections->_next; ctrl != _ctrl_connections; ctrl = ctrl->_next) for (i = 0; i < _num_ctrl_servers; i++) { ntt_ctrl_connection *srv = _ctrl_server_list[i]; if (strcmp(ctrl->_order->_server,srv->_order->_server) == 0) ctrl->_serv_list_index[0] = i; if (strcmp(ctrl->_order->_order._server,srv->_order->_server) == 0) ctrl->_serv_list_index[1] = i; } gettimeofday(&now,NULL); next_time = now; while (!_shutdown) { struct timeval timeout; int ret; fd_set readfds; fd_set writefds; int nfd = -1; FD_ZERO(&readfds); FD_ZERO(&writefds); for (ctrl = _ctrl_connections->_next; ctrl != _ctrl_connections; ctrl = ctrl->_next) { assert(ctrl->_fd != -1); //printf ("ctrl, fd=%d\n",ctrl->_fd); FD_SET(ctrl->_fd,&readfds); if (ctrl->_fd > nfd) nfd = ctrl->_fd; } // Calculate timeout if (timercmp(&next_time,&now,<)) { timeout.tv_sec = 0; timeout.tv_usec = 0; } else { timeout.tv_sec = next_time.tv_sec - now.tv_sec; timeout.tv_usec = next_time.tv_usec - now.tv_usec; if (timeout.tv_usec < 0) { timeout.tv_sec--; timeout.tv_usec += 1000000; } } /* int i; for (i = 0; i <= nfd; i++) { if (FD_ISSET(i,&readfds)) printf ("R%d ",i); if (FD_ISSET(i,&writefds)) printf ("W%d ",i); } printf ("%d:%06d",(int)timeout.tv_sec,(int)timeout.tv_usec); fflush(stdout); */ // Then wait for someone needing servicing ret = select(nfd+1,&readfds,&writefds,NULL,&timeout); if (ret == -1) { if (errno == EINTR) continue; perror("select"); exit(1); } gettimeofday(&now,NULL); /* printf (" ==> \n"); for (i = 0; i <= nfd; i++) { if (FD_ISSET(i,&readfds)) printf ("R%d ",i); if (FD_ISSET(i,&writefds)) printf ("W%d ",i); } printf ("\n"); */ // Some servicing needed? for (ctrl = _ctrl_connections->_next; ctrl != _ctrl_connections; ctrl = ctrl->_next) { assert(ctrl->_fd != -1); //printf ("ctrl: fd=%d\n",ctrl->_fd); if (FD_ISSET(ctrl->_fd,&readfds)) { ntt_send_info *info; int magic; size_t real_size; // do some reading... int n = ntt_read_buffer(&ctrl->_recv,ctrl->_fd); if (n < 0) { WARNING("Connection died... (%s %s %s)\n", ctrl->_order->_order._server, ctrl->_order->_order._reverse ? "<-" : "->", ctrl->_order->_server); goto do_shutdown; } info = (ntt_send_info *) ctrl->_recv._buf; magic = ntohl(info->_magic); real_size = 0; if (ctrl->_recv._offset >= 2 * sizeof (uint32_t)) { switch (magic) { default: WARNING("Bad magic... (%s)\n",ctrl->_order->_server); goto do_shutdown; case NTT_SEND_INFO_MAGIC: { // if we've read more than 4 bytes, then reallocate // buffer in case it's size does not match with the // number of connections that get their info from here int nconn = ntohl(info->_connections); if (nconn > 1) { WARNING("Bogus number of connections (%d) (%s)...\n", nconn,ctrl->_order->_server); goto do_shutdown; } real_size = sizeof(ntt_send_info)- sizeof(ntt_connection_info)+ sizeof(ntt_connection_info)*nconn; break; } case NTT_ERROR_INFO_MAGIC: real_size = sizeof(ntt_error_info); break; } if (ctrl->_recv._size != real_size) { assert (ctrl->_recv._offset <= real_size); ntt_realloc_buffer(&ctrl->_recv,real_size); } } if (ctrl->_recv._offset >= ctrl->_recv._size) { // We just got the full info. Make a copy, and reset // Hack, for the time being we know that connections // cannot be more than 1 switch (magic) { case NTT_SEND_INFO_MAGIC: assert(ctrl->_recv._size <= sizeof(ctrl->_info)); memcpy(&ctrl->_info,info,ctrl->_recv._size); ctrl->_stamp = now; break; case NTT_ERROR_INFO_MAGIC: { ntt_error_info error; memcpy(&error,info,sizeof(ntt_error_info)); printf ("-------------------------------------------------------------------------------\n"); printf ("Transmission error:\n"); printf ("%-15s %s %-15s: ", ctrl->_order->_order._server, ctrl->_order->_order._reverse ? "<-" : "->", ctrl->_order->_server); print_error(&error); printf ("\n"); ctrl->_error = error; break; } } // reset message size, in case we get something // smaller next time. reallocation after checking // magic is not enough, as that could loose part of // the next-next message... ntt_realloc_buffer(&ctrl->_recv,sizeof(uint32_t) * 3); ctrl->_recv._offset = 0; } } } // Have we passed the time for printing the current status? if (timercmp(&now,&next_time,>)) { //printf ("Status...\n"); static int display_counter = 0; if (logfile && (display_counter++ % 20) == 2) { for (ctrl = _ctrl_connections->_next; ctrl != _ctrl_connections; ctrl = ctrl->_next) { int nconn = ntohl(ctrl->_info._connections); if (nconn < 1) continue; fprintf (logfile,"#%-9.9s", ctrl->_order->_order._reverse ? ctrl->_order->_server : ctrl->_order->_order._server); } fprintf (logfile,"\n"); for (ctrl = _ctrl_connections->_next; ctrl != _ctrl_connections; ctrl = ctrl->_next) { int nconn = ntohl(ctrl->_info._connections); if (nconn < 1) continue; fprintf (logfile,"#%9.9s", ctrl->_order->_order._reverse ? ctrl->_order->_order._server : ctrl->_order->_server); } fprintf (logfile,"\n"); } printf ("===============================================================================\n"); memset(server_rates,0, _num_ctrl_servers * _num_ctrl_servers * sizeof(double)); for (ctrl = _ctrl_connections->_next; ctrl != _ctrl_connections; ctrl = ctrl->_next) { int nconn = ntohl(ctrl->_info._connections); double dt; double rate1, rate2; if (nconn < 1) { if (ntohl(ctrl->_error._magic) == NTT_ERROR_INFO_MAGIC) { printf ("%-16.16s %s %-16.16s: ", ctrl->_order->_order._server, ctrl->_order->_order._reverse ? "<-" : "->", ctrl->_order->_server); print_error(&ctrl->_error); printf ("\n"); } continue; } if (!show_matrix) printf ("%-16.16s %s %-16.16s ", ctrl->_order->_order._server, ctrl->_order->_order._reverse ? "<-" : "->", ctrl->_order->_server); if (ctrl->_stamp.tv_sec == 0 || ctrl->_prev_stamp.tv_sec == 0) { if (!show_matrix) printf ("...\n"); if (logfile && display_counter > 2) fprintf(logfile," "); continue; } dt = (ctrl->_stamp.tv_sec - ctrl->_prev_stamp.tv_sec) + (ctrl->_stamp.tv_usec - ctrl->_prev_stamp.tv_usec) * 0.000001; rate1 = calc_rate(ntt_from_hilo_64(ctrl->_info._conn[0]._total_recv), ntt_from_hilo_64(ctrl->_prev_info._conn[0]._total_recv),dt); *(server_rates+ctrl->_serv_list_index[0]+ _num_ctrl_servers*ctrl->_serv_list_index[1]) += rate1; if (!show_matrix) { if (show_mtu) printf ("%5dmtu",ntohl(ctrl->_info._conn[0]._mtu)); else byte_count(ntt_from_hilo_64(ctrl->_info._conn[0]._total_recv)); printf (" "); if (dt == 0) printf (" - "); else show_rate(rate1); } rate2 = 0; if (ntt_from_hilo_64(ctrl->_info._conn[0]._total_send)) { rate2 = calc_rate(ntt_from_hilo_64(ctrl->_info._conn[0]._total_send), ntt_from_hilo_64(ctrl->_prev_info._conn[0]._total_send),dt); *(server_rates+ctrl->_serv_list_index[1]+ _num_ctrl_servers*ctrl->_serv_list_index[0]) += rate2; if (!show_matrix) { printf (" | "); byte_count(ntt_from_hilo_64(ctrl->_info._conn[0]._total_send)); printf (" "); if (dt == 0) printf (" - "); else show_rate(rate2); } } if (logfile && display_counter > 2) fprintf(logfile," %#9.3g", ctrl->_order->_order._reverse ? rate2 : rate1); if (!show_matrix) printf ("\n"); } // go through the servers if (!show_matrix) { printf ("\n"); for (i = 0; i < _num_ctrl_servers; i++) { double dt; ctrl = _ctrl_server_list[i]; printf ("%-27.27s tx(%2d) ", ctrl->_order->_server, (int) ntohl(ctrl->_info._server._conn_send)); if (ctrl->_stamp.tv_sec == 0 || ctrl->_prev_stamp.tv_sec == 0) { printf ("... rx(%2d)\n", (int) ntohl(ctrl->_info._server._conn_recv)); continue; } dt = (ctrl->_stamp.tv_sec - ctrl->_prev_stamp.tv_sec) + (ctrl->_stamp.tv_usec - ctrl->_prev_stamp.tv_usec) * 0.000001; byte_rate(ntt_from_hilo_64(ctrl->_info._server._total_send), ntt_from_hilo_64(ctrl->_prev_info._server._total_send), dt); printf (" rx(%2d) ", (int) ntohl(ctrl->_info._server._conn_recv)); byte_rate(ntt_from_hilo_64(ctrl->_info._server._total_recv), ntt_from_hilo_64(ctrl->_prev_info._server._total_recv), dt); printf (" CPU:"); fraction_time(ntt_from_hilo_64(ctrl->_info._server._utime_s), ntt_from_hilo_64(ctrl->_prev_info._server._utime_s), dt); printf ("+"); fraction_time(ntt_from_hilo_64(ctrl->_info._server._stime_s), ntt_from_hilo_64(ctrl->_prev_info._server._stime_s), dt); printf ("\n"); } } else { printf (" "); for (i = 0; i < _num_ctrl_servers; i += 2) { ctrl = _ctrl_server_list[i]; printf ("%11.11s ", ctrl->_order->_server); } printf ("\nRates (/s) "); for (i = 1; i < _num_ctrl_servers; i += 2) { ctrl = _ctrl_server_list[i]; printf ("%11.11s ", ctrl->_order->_server); } printf ("\n cpu"); for (i = 0; i < _num_ctrl_servers; i++) { double dt; ctrl = _ctrl_server_list[i]; dt = (ctrl->_stamp.tv_sec - ctrl->_prev_stamp.tv_sec) + (ctrl->_stamp.tv_usec - ctrl->_prev_stamp.tv_usec) * 0.000001; printf (" "); fraction_time(ntt_from_hilo_64(ctrl->_info._server._utime_s)+ntt_from_hilo_64(ctrl->_info._server._stime_s), ntt_from_hilo_64(ctrl->_prev_info._server._utime_s)+ntt_from_hilo_64(ctrl->_prev_info._server._stime_s), dt); } printf ("\n rx\\tx"); for (i = 0; i < _num_ctrl_servers; i++) { double dt; ctrl = _ctrl_server_list[i]; dt = (ctrl->_stamp.tv_sec - ctrl->_prev_stamp.tv_sec) + (ctrl->_stamp.tv_usec - ctrl->_prev_stamp.tv_usec) * 0.000001; show_tiny_rate(calc_rate(ntt_from_hilo_64(ctrl->_info._server._total_send), ntt_from_hilo_64(ctrl->_prev_info._server._total_send), dt)); } printf ("\n"); for (i = 0; i < _num_ctrl_servers; i++) { double dt; ctrl = _ctrl_server_list[i]; dt = (ctrl->_stamp.tv_sec - ctrl->_prev_stamp.tv_sec) + (ctrl->_stamp.tv_usec - ctrl->_prev_stamp.tv_usec) * 0.000001; printf ("%-10.10s", ctrl->_order->_server); show_tiny_rate(calc_rate(ntt_from_hilo_64(ctrl->_info._server._total_recv), ntt_from_hilo_64(ctrl->_prev_info._server._total_recv), dt)); printf (" "); for (j = 0; j < _num_ctrl_servers; j++) show_tiny_rate(server_rates[i+j*_num_ctrl_servers]); printf ("\n"); } } if (logfile && display_counter > 2) { fprintf(logfile,"\n"); fflush(logfile); } // age the info for (ctrl = _ctrl_connections->_next; ctrl != _ctrl_connections; ctrl = ctrl->_next) { ctrl->_prev_info = ctrl->_info; ctrl->_prev_stamp = ctrl->_stamp; } next_time = now; next_time.tv_sec += 5; } } do_shutdown: ; // Shut down the connections for (ctrl = _ctrl_connections->_next; ctrl != _ctrl_connections; ctrl = ctrl->_next) { if (ctrl->_fd == -1) continue; safe_close(ctrl->_fd); } if (logfile) fclose(logfile); } /******************************************************************** * * Parse command line options. * */ void usage(char *cmdname) { printf ("Network torture tool\n\n"); printf ("%s options\n\n",cmdname); printf (" --server[=PORT] Start NTT server (shutdown after 4h without connection).\n"); printf (" --daemon Daemonize the server.\n"); printf ("\n"); printf (" --buffer=N Set buffer size (for following connections = ffc).\n"); printf (" --limit=N Limit transmit bandwidth usage (ffc). (0 = no limit)\n"); printf (" --respond=N Wait for N response bytes after each buffer sent (ffc).\n"); printf (" --pattern=PTN Transmission test pattern,\n"); printf (" PTN=none,zero,scanbuf,scan,rnd,rndbuf.\n"); printf ("\n"); printf (" HOST1[:P],HOST2[:P] Create data connection from HOST1 to HOST2.\n"); printf (" R:HOST1[:P],H2[:P] Create reversed data connection to HOST1 from HOST2.\n"); printf (" HOST[:PORT] Monitor NTT server on HOST.\n"); printf ("\n"); printf (" --set:N=H1,H2,... Define a set of hosts, with tag N.\n"); printf (" --connset=[R:]M,N Create connections from all M hosts to all N hosts.\n"); printf (" --matrix Show connections as a matrix of machines.\n"); printf (" --mtu Show discovered MTU for connections (where available).\n"); printf (" --logfile[=FILE] Write a log of the connection statistics.\n"); printf ("\n"); printf (" Note: network names HOST1 are looked up from / relative to HOST2.\n"); printf ("\n"); printf (" --rnd Dump random bitstream. (NOTE: it is a bad PRNG!)\n"); printf (" --purge=HOST[:PORT] Close all client connections on HOST.\n"); printf (" --kill=HOST[:PORT] Shut the server on HOST down.\n"); printf ("\n"); } void sigint_handler(int sig) { _shutdown++; if (_shutdown > 1) { INFO("Get many SIGINT requests, next will go through"); signal(SIGINT,SIG_DFL); } else { INFO("Termination request, setting shutdown...\n"); } } uint64_t atoi_suffix(const char *post) { char *size_end; uint64_t value = strtol(post,&size_end,10); if (strcmp(size_end,"k") == 0) value *= 1000; else if (strcmp(size_end,"M") == 0) value *= 1000000; else if (strcmp(size_end,"G") == 0) value *= 1000000000; else if (strcmp(size_end,"ki") == 0) value <<= 10; else if (strcmp(size_end,"Mi") == 0) value <<= 20; else if (strcmp(size_end,"Gi") == 0) value <<= 30; else if (*size_end != 0) { ERROR("Malformed value (should be number[k|M|G|ki|Mi|Gi]): %s",post); exit(1); } return value; } ntt_order_item *create_order(const char *server,uint32_t request) { ntt_order_item *order_item; order_item = (ntt_order_item*) malloc (sizeof(ntt_order_item)); memset(order_item,0,sizeof(ntt_order_item)); order_item->_order._magic = htonl(NTT_ORDER_MAGIC); order_item->_order._request = htonl(request); strncpy(order_item->_server,server,sizeof(order_item->_server)); order_item->_server[sizeof(order_item->_server)-1] = 0; *_last_order = order_item; _last_order = &((*_last_order)->_next); return order_item; } typedef struct ntt_set_host_t { const char *_host; struct ntt_set_host_t *_next; } ntt_set_host; typedef struct ntt_set_t { const char *_set; struct ntt_set_host_t *_hosts; struct ntt_set_t *_next; } ntt_set; #if 0 char *strndup(const char *src,size_t length) { // We wast memory in case the string actually is shorter... char *dest = (char *) malloc(length+1); strncpy(dest,src,length); dest[length]='\0'; // since strncpy would not handle this return dest; } #endif int main(int argc,char *argv[]) { int i; ntt_order_item *chk; ntt_set *sets = NULL; ntt_set **add_set = &sets; int server_port = 0; int daemonize = 0; uint32_t pattern = NTT_CONN_PATTERN_SCAN_TOTAL; uint32_t limit_bytes_per_second = 0; uint32_t buffer_size = 16*1024; uint32_t reply_buffer_size = 0; uint32_t fault_location = 0; int next_index = 1; const char *logfilename = NULL; int show_matrix = 0; int show_mtu = 0; if(argc == 1) { usage(argv[0]); exit(0); } for (i = 1; i < argc; i++) { char *post; #define MATCH_PREFIX(prefix,post) (strncmp(argv[i],prefix,strlen(prefix)) == 0 && *(post = argv[i] + strlen(prefix)) != '\0') #define MATCH_ARG(name) (strcmp(argv[i],name) == 0) if (MATCH_ARG("--help")) { usage(argv[0]); exit(0); } else if (MATCH_ARG("--rnd")) { unsigned long next = 0; unsigned char r[4096]; for ( ; ; ) { int i; for (i = 0; i < 4096; i++) r[i] = myrand(&next); i = write(1,r,4096); // ugly, don't care about errors... } exit(0); } else if (MATCH_ARG("--server")) { server_port = NTT_DEFAULT_PORT; } else if (MATCH_PREFIX("--server=",post)) { server_port = atoi(post); } else if (MATCH_ARG("--daemon")) { daemonize = 1; } else if (MATCH_ARG("--matrix")) { show_matrix = 1; } else if (MATCH_ARG("--mtu")) { show_mtu = 1; } else if (MATCH_ARG("--logfile")) { char tmp[64]; sprintf (tmp,"ntt_%d.log",(int) getpid()); logfilename = strdup(tmp); } else if (MATCH_PREFIX("--logfile=",post)) { logfilename = post; } else if (MATCH_PREFIX("--buffer=",post)) { buffer_size = atoi_suffix(post); } else if (MATCH_PREFIX("--respond=",post)) { reply_buffer_size = atoi_suffix(post); } else if (MATCH_PREFIX("--limit=",post)) { limit_bytes_per_second = atoi_suffix(post); } else if (MATCH_PREFIX("--pattern=",post)) { if (strcmp(post,"none") == 0) { pattern = NTT_CONN_PATTERN_NONE; } else if (strcmp(post,"zero") == 0) { pattern = NTT_CONN_PATTERN_ZERO; } else if (strcmp(post,"scanbuf") == 0) { pattern = NTT_CONN_PATTERN_SCAN_BUF; } else if (strcmp(post,"scan") == 0) { pattern = NTT_CONN_PATTERN_SCAN_TOTAL; } else if (strcmp(post,"rnd") == 0) { pattern = NTT_CONN_PATTERN_RANDOM; } else if (strcmp(post,"rndbuf") == 0) { pattern = NTT_CONN_PATTERN_RANDOM_BUF; } else if (strncmp(post,"fault=",6) == 0) { pattern = NTT_CONN_PATTERN_FAULT; fault_location = atoi(post+6); } else { ERROR("Unknown pattern: %s.\n",post); exit(1); } } else if (MATCH_PREFIX("--purge=",post)) { create_order(post,NTT_ORDER_REQUEST_PURGE_CLIENTS); } else if (MATCH_PREFIX("--kill=",post)) { create_order(post,NTT_ORDER_REQUEST_KILL_SERVER); } else if (MATCH_PREFIX("--set:",post)) { char *equals = strchr(argv[i],'='); ntt_set *set; ntt_set_host **add_host; ntt_set_host *host; char *start; char *comma; if (!equals) { ERROR("Bad set defintion: %s.\n",post); exit(1); } set = (ntt_set*) malloc(sizeof(ntt_set)); set->_set = strndup(post,equals-post); set->_hosts = NULL; set->_next = NULL; *add_set = set; add_set = &set->_next; add_host = &set->_hosts; start = equals+1; while ((comma = strchr(start,',')) != NULL) { host = (ntt_set_host*) malloc(sizeof(ntt_set)); host->_host = strndup(start,comma-(start)); host->_next = NULL; *add_host = host; add_host = &host->_next; start = comma+1; } host = (ntt_set_host*) malloc(sizeof(ntt_set)); host->_host = strdup(start); host->_next = NULL; *add_host = host; } else if (MATCH_PREFIX("--connset=",post)) { char *comma; char *start; int reverse = 0; ntt_set *set1, *set2; ntt_set_host *host1, *host2; char *name1; char *name2; int ind, ind_other; int n1, n2, o1, o2; comma = strchr(argv[i],','); // With no comma, treat it as request to connect all to all, // except self-connection if (strncmp(post,"R:",2) == 0) { reverse = 1; start = post + 2; } else start = post; if (comma) { name1 = strndup(start,comma-start); name2 = comma+1; } else name1 = name2 = start; for (set1 = sets; set1; set1 = set1->_next) if (strcmp(set1->_set,name1) == 0) break; if (!set1) { ERROR("Unknown set: %s.\n",name1); exit(1); } if (comma) free(name1); for (set2 = sets; set2; set2 = set2->_next) if (strcmp(set2->_set,name2) == 0) break; if (!set2) { ERROR("Unknown set: %s.\n",comma+1); exit(1); } for (host1 = set1->_hosts, n1 = 0; host1; host1 = host1->_next, n1++) ; for (host2 = set2->_hosts, n2 = 0; host2; host2 = host2->_next, n2++) ; if (reverse) { ind = next_index; next_index += n2; ind_other = next_index | 0x10000000; next_index += n1; } else { ind_other = next_index; next_index += n1; ind = next_index | 0x10000000; next_index += n2; } for (host1 = set1->_hosts, o1 = 0; host1; host1 = host1->_next, o1++) for (host2 = set2->_hosts, o2 = 0; host2; host2 = host2->_next, o2++) { ntt_order_item *order_item; if (!comma && host1 == host2) continue; // no self connection // create order with the name of the destination host order_item = create_order(host2->_host,NTT_ORDER_REQUEST_RECEIVE); order_item->_order._reverse = htonl(reverse); order_item->_index = ind + o2; order_item->_index_other = ind_other + o1; strncpy(order_item->_order._server,host1->_host,sizeof(order_item->_order._server)); order_item->_order._server[sizeof(order_item->_order._server)-1] = 0; // the parameters of the order order_item->_order._pattern = htonl(pattern); order_item->_order._limit_bytes_per_second = htonl(limit_bytes_per_second); order_item->_order._buffer_size = htonl(buffer_size); order_item->_order._reply_buffer_size = htonl(reply_buffer_size); order_item->_order._fault_location = htonl(fault_location); } } else { char *comma; // if it is an option with a comma, it's a request for data // transmission from FIRST to SECOND, in FIRST,SECOND // if it has no comma, then it's a request to query that // server for periodic information comma = strchr(argv[i],','); if (comma) { int src_len; ntt_order_item *order_item; const char *start; int reverse = 0; if (strncmp(argv[i],"R:",2) == 0) reverse = 1; // create order with the name of the destination host order_item = create_order(comma+1,NTT_ORDER_REQUEST_RECEIVE); if (reverse) { order_item->_order._reverse = htonl(1); start = argv[i] + 2; order_item->_index = next_index++; order_item->_index_other = next_index++ | 0x10000000; } else { start = argv[i]; order_item->_index_other = next_index++; order_item->_index = next_index++ | 0x10000000; } // copy the name of the source host src_len = comma - start; if (src_len > sizeof(order_item->_order._server)) src_len = sizeof(order_item->_order._server); strncpy(order_item->_order._server,start,src_len); order_item->_order._server[sizeof(order_item->_order._server)-1] = 0; // the parameters of the order order_item->_order._pattern = htonl(pattern); order_item->_order._limit_bytes_per_second = htonl(limit_bytes_per_second); order_item->_order._buffer_size = htonl(buffer_size); order_item->_order._reply_buffer_size = htonl(reply_buffer_size); order_item->_order._fault_location = htonl(fault_location); } else { create_order(argv[i],NTT_ORDER_REQUEST_SERVER_INFO); } } } // make sure we have direct connections also to all sending machines, // such that we'll get statistics for them too... for (chk = _client_orders; chk; chk = chk->_next) { // is it a request for setting a connection up? if (chk->_order._request == htonl(NTT_ORDER_REQUEST_RECEIVE)) { const char *send_server = chk->_order._server; // is the server known in the server list? if not, add it ntt_order_item *srv; for (srv = _client_orders; srv; srv = srv->_next) if (strcmp(send_server,srv->_server) == 0) goto send_server_known; // sending server not on the order list. create a new order // for that srv = create_order(send_server,NTT_ORDER_REQUEST_SERVER_INFO); srv->_index = (unsigned int) -1; send_server_known: if (chk->_index_other < srv->_index) srv->_index = chk->_index_other; ; } } signal(SIGINT,sigint_handler); signal(SIGPIPE,SIG_IGN); if (server_port) { srand(time(NULL)); ntt_server(server_port,daemonize); return 0; } ntt_control(logfilename,show_matrix,show_mtu); return 0; }