Еще один пример пульсации
Использованная в предыдущем примере модель не совсем пригодна в ситуации, когда одна сторона посылает другой поток данных, не разбитый на сообщения. Проблема в том, что посланный пульс оказывается частью потока, поэтому его идется явно выискивать и, возможно, даже экранировать (совет 6). Чтобы избежать сложностей, следует воспользоваться другим подходом.
Идея в том, чтобы использовать для контрольных пульсов отдельное соединение. На первый взгляд, кажется странной возможность контролировать одно соединение с помощью другого. Но помните, что делается попытка обнаружить крах оста на другом конце или разрыв в сети. Если это случится, то пострадают оба соединения. Задачу можно решить несколькими способами. Традиционный способ - создать отдельный поток выполнения (thread) для управления пульсацией. Можно также применить универсальный механизм отсчета времени, который разработан в совете 20. Однако, чтобы не вдаваться в различия между API потоков на платформе Win32 и библиотекой PThreads в UNIX, модифицируем написанный для предыдущего примера код с использованием системного вызова select.
Новые версии клиента и сервера очень похожи на исходные. Основное различие состоит в логике работы select, который теперь должен следить за двумя сокетами, а также в дополнительном коде для инициализации еще одного соединения. После соединения клиента с сервером, клиент посылает ему номер порта, по которому отслеживается пульсация сервера. Это напоминает то, что делает FТР-сервер, устанавливая соединение для обмена данными с клиентом.
Примечание: Может возникнуть проблема, если для преобразования частных сетевых адресов в открытые используется механизм NAT (совет 3). В отличие от ситуации с FTP программное обеспечение NAT не имеет информации, что нужно подменить указанный номер порта преобразованным. В таком случае самый простой путь - выделить приложению второй хорошо известный порт.
Начнем с логики инициализации и установления соединения на стороне клиента (листинг 2.26).
Листинг 2.26. Код инициализации и установления соединения на стороне клиента
1 #include "etcp.h"
2 #include "heartbeat.h"
3 int main( int argc, char **argv )
4 {
5 fd_set allfd;
6 fd_set readfd;
7 char msg[ 1024 ];
8 struct tirneval tv;
9 struct sockaddr_in hblisten;
10 SOCKET sdata;
11 SOCKET shb;
12 SOCKET slisten;
13 int rc;
14 int hblistenlen = sizeof( hblisten );
15 int heartbeats = 0;
16 int maxfdl;
17 char hbmsg[ 1 ];
18 INIT();
19 slisten = tcp_server( NULL, "0" ) ;
20 rc = getsockname( slisten, ( struct sockaddr * )&hblisten,
21 &hblistenlen );
23 error( 1, errno, "ошибка вызова getsockname" );
24 sdata = tcp_client( argv[ 1 ], argv[ 2 ] );
25 rc = send( sdata, ( char * ) &hblisten. sin__port,
26 sizeof( hblisten.sin_port ), 0 ) ;
27 if ( rc < 0 )
28 error( 1, errno, "ошибка при посылке номера порта");
29 shb = accept( slisten, NULL, NULL );
30 if ( !isvalidsock( shb ) )
31 error( 1, errno, "ошибка вызова accept" );
32 FD_ZERO( &allfd ) ;
33 FD_SET( sdata, &allfd );
34 FD_SET( shb, &allfd ) ;
35 maxfdl = ( sdata > shb ? sdata: shb ) + 1;
36 tv.tv_sec = Tl;
37 tv.tv_usec = 0;
Инициализация и соединение
19-23 Вызываем функцию tcp_server с номером порта 0, таким образом заставляя ядро выделить эфемерный порт (совет 18). Затем вызываем getsockname, чтобы узнать номер этого порта. Это делается потому, что с данным сервером ассоциирован только один хорошо известный порт.
24-28 Соединяемся с сервером и посылаем ему номер порта, с которым он должен установить соединение для посылки сообщений-пульсов.
29-31 Вызов accept блокирует программу до тех пор, пока сервер не установит соединение для пульсации. В промышленной программе, наверное, стоило бы для этого вызова взвести таймер, чтобы программа не «зависла», если сервер не установит соединения. Можно также проверить, что соединение для пульсации определил именно тот сервер, который запрашивался в строке 24.
32- 37 Инициализируем маски для select и взводим таймер.
Оставшийся код клиента показан в листинге 2.27. Здесь вы видите обработку содержательных сообщений и контрольных пульсов.
Листинг 2.27. Обработка сообщений клиентом
38 for ( ;; )
39 {
40 readfd = allfd;
41 rc = select( maxfdl, &readfd, NULL, NULL, &tv );
42 if ( rc < 0 )
43 error( 1, errno, "ошибка вызова select" );
44 if ( rc == 0 ) /* Произошел тайм-аут. */
45 {
46 if ( ++heartbeats > 3 )
47 error( 1, 0, "соединения нет\n" );
4g error( 0, 0, "посылаю пульс #%d\n", heartbeats );
49 rc = send( shb, "", 1, 0 ) ;
50 if ( rc < 0 )
51 error( 1, errno, "ошибка вызова send" );
52 tv.tv_sec = T2;
53 continue;
54 }
55 if ( FD_ISSET( shb, &readfd ) )
56 {
57 rc = recv( shb, hbmsg, 1, 0 );
58 if ( rc == 0 )
59 error( 1, 0, "сервер закончил работу (shb)\n" );
60 if ( rc < 0 )
61 error( 1, errno, "ошибка вызова recv для сокета shb");
62 }
63 if ( FD_ISSET( sdata, &readfd ) )
64 {
65 rc = recv( sdata, msg, sizeof( msg ), 0 );
66 if ( rc == 0 )
67 error( 1, 0, "сервер закончил работу (sdata)\n" );
68 if ( rc < 0 )
69 error( 1, errno, "ошибка вызова recv" );
70 /* Обработка данных. */
71 }
72 heartbeats = 0;
73 tv.tv_sec = T1;
74 }
75 }
Обработка данных и пульсов
40-43 Вызываем функцию select и проверяем код возврата.
44-54 Таймаут обрабатывается так же, как в листинге 2.24, только пульсы посылаются через сокет shb.
55-62 Если через сокет shb пришли данные, читаем их, но ничего не делаем.
63-71 Если данные пришли через сокет sdata, читаем столько, сколько сможем, и обрабатываем. Обратите внимание, что теперь производится работа не с сообщениями фиксированной длины. Поэтому читается не больше, чем помещается в буфер. Если данных меньше длины буфера, вызов recv вернет все, что есть, но не заблокирует программу. Если данных больше, то из сокета еще можно читать. Поэтому следующий вызов select немедленно вернет управление, и можно будет обработать очередную порцию данных.
72- 73 Поскольку только что пришло сообщение от сервера, сбрасываем переменную heartbeats в 0 и снова взводим таймер.
И в заключение рассмотрим код сервера для этого примера (листинг 2.28) Как и код клиента, он почти совпадает с исходным сервером (листинг 2.25) за тем и исключением, что устанавливает два соединения и работает с двумя сокетами.
Листинг 2.28. Код инициализации и установления соединения на стороне сервер^!
hb_server2.c
1 #include "etcp.h"
2 #include "heartbeat.h"
3 int main( int argc, char **argv )
4 {
5 fd_set allfd;
6 fd_set readfd;
7 char msg[ 1024 ];
8 struct sockaddr_in peer;
9 struct timeval tv;
10 SOCKET s;
11 SOCKET sdata;
12 SOCKET shb;
13 int rc
14 int maxfdl;
15 int missed_heartbeats = 0;
16 int peerlen = sizeof( peer);
17 char hbmsg[ 1 ];
18 INIT ();
19 s = tcp_server( NULL, argv[ 1 ] );
20 sdata = accept( s, ( struct sockaddr * )&peer,
21 &peerlen );
22 if ( !isvalidsock( sdata ) )
23 error( 1, errno, "accept failed" );
24 rc = readn( sdata, ( char * )&peer.sin_port,
25 sizeof( peer.sin_port ) );
26 if ( rc < 0 )
27 error( 1, errno, "ошибка при чтении номера порта" );
28 shb = socket( PF_INET, SOCK_STREAM, 0 );
29 if ( !isvalidsock( shb ) )
30 error ( 1, errno, "ошибка при создании сокета shb" );
31 rc = connect ( shb, ( struct sockaddr * )&peer, peerlen );
32 if (rc )
33 error( 1, errno, "ошибка вызова connect для сокета shb");
34 tv.tv_sec = T1 + T2;
35 tv.tv_usec = 0;
36 FD_ZERO( &allfd ) ;
37 FD_SET( sdata, &allfd );
38 FD_SET( shb, &allfd ) ;
39 maxfdl = ( sdata > shb ? sdata : shb ) + 1;
Инициализация и соединение
19-23 Слушаем и принимаем соединения от клиента. Кроме того, сохраняем адрес клиента в переменной peer, чтобы знать, с кем устанавливать соединение для пульсации.
24-27 Читаем номер порта, который клиент прослушивает в ожидании соединения для пульсации. Считываем его непосредственно в структуру peer. О преобразовании порядка байтов с помощью htons или ntohs беспокоиться не надо, так как порт уже пришел в сетевом порядке. В таком виде его и надо сохранить в peer.
28- 33 Получив сокет shb, устанавливаем соединение для пульсации.
34-39 Взводим таймер и инициализируем маски для select.
Оставшаяся часть сервера представлена в листинге 2.29.
Листинг 2.29. Обработка сообщений сервером
40 for ( ;; )
41 {
42 readfd = allfd;
43 rc = select( maxfdl, &readfd, NULL, NULL, &tv );
44 if ( rc < 0 )
45 error( 1, errno, "ошибка вызова select" );
46 if ( rc == 0 ) /* Произошел тайм-аут. */
47 {
48 if ( ++missed_heartbeats > 3 )
49 error( 1, 0, "соединения нет\n" );
50 error( 0, 0, "пропущен пульс #%d\n",
51 missed_heartbeats );
52 tv.tv_sec = T2;
53 continue;
54 }
55 if ( FD_ISSET( shb, &readfd ) )
56 {
57 rc = recv( shb, hbmsg, 1, 0 );
58 if ( rc == 0 )
59 error( 1, 0, "клиент завершил работу\n" );
60 if ( rc < 0 )
61 error( 1, errno, "ошибка вызова recv для сокета shb" );
62 rc = send( shb, hbmsg, 1, 0 );
63 if ( rc < 0 )
64 error( 1, errno, "ошибка вызова send для сокета shb" );
65 }
66 if ( FD_ISSET( sdata, &readfd ) )
67 {
68 rc = recv (sdata, msg, sizeof( msg ), 0);
69 if ( rc == 0 )
70 error (1, 0, “клиент завершил работу\n”);
71 if ( rc < 0 )
72 error (1, errno, “ошибка вызова recv”);
73 /*Обработка данных*/
74 }
75 missed_heartbeats = 0;
76 tv.tv_sec = T1 + T2;
77 }
78 EXIT( 0 );
79 }
42-45 Как и в ситуации с клиентом, вызываем select и проверяем возвращаемое значение.
46-53 Обработка тайм-аута такая же, как и в первом примере сервера в листинге 2.25.
55-65 Если в сокете shb есть данные для чтения, то читаем однобайтовый пульс и возвращаем его клиенту.
66-74 Если что-то поступило по соединению для передачи данных, читаем и обрабатываем данные, проверяя ошибки и признак конца файла.
75-76 Поскольку только что получены данные от клиента, соединение все еще живо, поэтому сбрасываем в нуль счетчик пропущенных пульсов и переустанавливаем таймер.
Если запустить клиента и сервер и имитировать сбой в сети, отсоединив один из хостов, то получим те же результаты, что при запуске hb_server и hb_client.