Rambler's Top100 Service калинин.ru / программирование / сети /  << 19.12.00 >>

"Тонкий" клиент

В предыдущей заметке Gregory Liokumovich рассказывал о применении событийной модели WinSock для программирования сетевых приложений. На самом деле, как мне кажется, организация сетевой программы не должна зависеть от используемой версии API. То есть, вычислительная часть и непосредственно вызовы сетевых функций должны быть максимально разделены.

При этом, я не считаю абстракцию следующего вида (для клиента):

class ClientSocket
{
public:
   open( /* ... */ );
   bind( /* ... */ );
   close( /* ... */ );
   read( /* ... */ );
   write( /* ... */ );
};

удовлетворительной. Связано это с тем, что по сути такой класс ничуть не скрывает деталей реализации, а, наоборот, подчеркивает их. На мой взгляд, организация взаимодействия между клиентом и сервером не должна содержать в себе "пересылки данных", а должна проектироваться исходя из сообщений, которыми те обмениваются. Следовательно, должен быть какой-то метод скрыть в себе детали пересылки таким образом, что бы программист, использующий код впоследствии даже не догадывался об использовании сокетов. На самом деле, "не догадываться" --- это уже слишком, а вот сделать так, что бы можно было хотя бы не особенно задумываться, вполне возможно.

То что будет написано ниже не стоит рассматривать как панацею от всех бед; просто пример использования.

Выделим две сущности: "соединение" и "менеджер". Соединение будет "знать" о том, как обрабатывать конкретный дескриптор, а менеджер будет заниматься управлением соединений, т.е. ожидать событий и передавать управление тому соединению, для которого было получено событие нужного типа.

Соединение

Начнем с соединения, т.е. определим класс Connection. Понятно, что внутри себя объект этого класса должен содержать системнозависимую информацию, необходимую для обеспечения передачи данных. Для того, что бы продолжить разговор не абстрактными терминами, а более-менее конкретно, скажу, что в качестве протокола передачи данных будем использовать TCP с реализацией системного интерфейса в виде BSD sockets.

    Лирическое отступление:
   

Область примения конечного автомата не ограничена одним только разбором текста: зачастую очень удобно представить в виде конечного автомата и другие объекты в программе, кроме парсеров.

Будем рассматривать объект класса Connection как... конечный автомат. Понятно, что набор его состояний определяется постановкой задачи для всей программы, но для менеджера важны только следующие состояния:

enum
{
   connecting,
   writing,
   reading,
   closing
} state;

Я думаю, понятно, что каждое из этих состояний будет характеризовать. Одно "но": в списке этих состояний отображены только состояния, существенные для менеджера соединений, остальные нас пока что не интересуют. Опишем класс Connection "почти целиком":

class Connection
{
  friend class Downloader;
public:
  Connection(const ip_address& ia, unsigned int p /* , ... */ );
  virtual ~Connection();
private:
  int         fd;

  ip_address   ip_addr;
  unsigned int port;

  time_t last_accessed;

  bool open();
  void close();
  void read();
  void write();
  void connected();

  enum
  {
     /*
      * ...
      */

  } state;
};

Несложно заметить, что в нем присутствуют методы read() и т.д., но на этот раз они несут другую смысловую нагрузку, чем в первом примере. Во-первых, это реакция на событие, а не его инициация, как раньше (кроме open()). То есть, метод read() вызывается менеджером тогда, когда есть что прочитать и соединение находилось в нужном состоянии (reading). Тип ip_address нигде не определяю, но это всего-навсего синоним используемого системного типа для представления ip-адресов выбранного протокола (IPv4, IPv6). Соответственно, многоточие в конструкторе оставлено для демонстрации того, что соединение выполняет не только функции связи, но и некоторые функции обработки данных, нужная информация для которых и передается через это многоточие. Т.е., реально вместо многоточия находится еще какой-то список параметров, который сейчас для нас неважен.

Деструктор сделан виртуальным для того, что бы сделать возможным расширять возможности Connection в случае необходимости (или использовать его в качестве одного из базовых классов) и корректно удалять его через указатель.

На всякий случай приведу внутреннее устройство функций типа read(), хотя оно и достаточно простое.

bool Connection::open()
{
  assert(fd == -1);

  struct sockaddr_in servaddr;

  fd = socket(AF_INET, SOCK_STREAM, 0);

  if(fd == -1) return false;
		
  bzero(&servaddr, sizeof(servaddr));
  servaddr.sin_family = AF_INET;
  servaddr.sin_port = htons(port);
  servaddr.sin_addr.s_addr = ip_addr;

  int flags = fcntl(fd, F_GETFL, 0);
  if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
  {
     ::close(fd);
     fd = -1;
     return false;
  }

  if(connect(fd, (sockaddr*)&servaddr, sizeof(servaddr)) != 0)
  {
     if(errno == EINPROGRESS)
     {
        state = connecting;
     }
     else
     {
        ::close(fd);
        fd = -1;
        return false;
     }
  }
  else
    state = writing;

  time(&last_accessed);

  return true;
}

Функция open() выполняет необходимые подготовительные операции, используя при этом технику неблокирующего connect(). Об этом я уже подробно рассказывал, так что сейчас обращу ваше внимание только на изменение состояний на connecting и writing. На самом деле, конечно же, лучше было бы предусмотреть отдельную функцию changeState(), которая бы по текущему контексту выбирала бы нужное состояние. В данном случае правомерна "в общем" только смена состояния в connecting, а вот writing --- зависит от конкретного приложения. Тем не менее, я это обговорил, а дальше имейте ввиду, что этот код приведен здесь только ради примера и многое в нем упрощено.

last_accessed используется для того, чтобы обеспечить проверку на затраченное время. Например, можно сказать, что если между последовательными операциями чтения прошло уже более минуты, то такое соединение надо закрыть.

В случае, если соединение с хостом прошло успешно, то реакцией на это событие будет:

void Connection::connected()
{
  assert(state == connecting);

  socklen_t n;
  int error;
		
  n = sizeof(error);
  if(getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n) < 0 || error != 0)
  {
     state = closing;
  }
  else
    state = writing;

  time(&last_accessed);
}	

Хочу заметить, что вместо того, чтобы вызвать close() напрямую самостоятельно, функция connected() переводит состояние в closing, т.е. непосредственный вызов произойдет из менеджера соединений. Прошу обратить на это внимание: во-первых, это укладывается в предложенную схему конечного автомата и, во-вторых, дает возможность менеджеру выполнить какие-то свои операции перед тем, как закрыть соединение.

Соответственно, при завершении соединения, нужно закрыть дескриптор:

void Connection::close()
{
  if(fd == -1) 
     return;

  ::close(fd);
  fd = -1;
}

Стоит обратить внимание на отсутствие изменения last_accessed и state: в данном случае оно ничего не значит (так как работа с соединением прекращена).

Перейдем к чтению и записи. Эти операции я привожу для демонстрации того, как происходит работа с сокетами. Итак, сначала чтение из сокета:

void Connection::read()
{
  int readed;
  char buf[4096];

  bool working = true;

  assert(state == reading);

  for( ; working ; )
  {
     readed = ::read(fd, buf, sizeof(buf));

     switch(readed)
     {
     case 0:
       {
          /*
           * Соединение завершено.
           */

          state = closing;
          working = false;

          break;
       }
     case -1:
       {
         /*
          * Либо ошибка соединения, либо 
          * надо подождать прихода новых данных.
          */

         if(errno == EAGAIN || errno == EINTR)
         {
            // Пусто.
         }
         else
           state = closing;

         working = false;

         break;
       }
     default:
       {
          assert(readed > 0);

          response += std::string(buf, readed);
       }
     }
  }

  time(&last_accessed);
}

Начнем с того, что здесь как раз, как мне кажется, вместо использования флага working для выхода из цикла, больше подошел бы goto: тогда бы этот цикл смотрелся бы логичнее. Кроме того, хочу обратить внимание на комментарий "Пусто" внутри функции: хочется сразу же заметить, что в подобных местах подобный комментарий очень желателен.

response, который используется в конце функции, я нигде не определял, но это и не важно. Это всего-лишь хранилище принятой информации.

Опять же, перевод состояния в closing неправомерен в общем случае! Потому что в общем принятую информацию надо обработать и, если надо, ответить на нее.

void Connection::write()
{
  ssize_t written;

  assert(state == writing);
		
  written = ::write(fd, request.c_str() + write_pos, request.length() - write_pos);

  if(written == 0)
  {
     if(errno == EAGAIN || errno == EINTR)
     {
        /*
         * Это нормально, можно будет повторить вызов.
         */
     }
     else
     {
        /*
         * Все плохо, прекращаем запись.
         */

         state = closing;
     }
   }
   else
   {
      write_pos += written;
      if(written >= request.length())
      {
         /*
          * Запись закончилась. 
          */

         shutdown(fd, 1);
         state = reading;
      }
    }

   time(&last_accessed);
 }

request и write_pos определяют информацию, которую надо записать в сокет. Опять же, я их нигде не определял, надеюсь что это никого не смущает. Вызов shutdown() определяет то, что запись больше производится не будет: вообще, класс Connection в таком виде обеспечивает следующую функциональность: запись запроса в сокет (например, HTTP-запрос вида "GET / HTTP/1.0") и прием ответа от сервера. Реально может понадобиться другая функциональность, это достигается за счет изменения логики смены состояний.

Менеджер соединений

Функционально менеджер соединений представляет из себя отдельный поток управления, который будет заниматься вызовом select(). Связь с остальным миром будет поддерживаться за счет двух очередей (FIFO), которые будут содержать соединения "на вход" и уже обработанные соединения. Вызывающая сторона по своему усмотрению обрабатывает эти очереди (она может добавлять во входную очередь соединения и выбирать из выходной). Мне крайне не хочется приводить здесь еще и ненужный код для создания потоков, поэтому сосредоточусь на одной функции класса ConnectionsManager, которая является по совместительству функцией потока.

class ConnectionsManager
{
  /*
   * ...
   */

private:
  void* process();

  typedef std::map<int, Connection*> connections_container;
  typedef std::queue<Connection*> output_queue_container;
  typedef std::queue<Connection*> input_queue_container;

  connections_container  connections;
  output_queue_container output_queue;
  input_queue_container  input_queue;

  bool finish;

  /*
   * ...
   */
};

Реальная функция потока (для реализации pthreads) принимает в качестве аргумента указатель, который определяет объект класса ConnectionsManager, и запускает функцию класса process(). Теперь о том, как она выглядит:

void* ConnectionsManager::process()
{
   fd_set rfds, wfds;
   struct timeval tv;
   tv.tv_sec = 0; tv.tv_usec = 500;

   /*
    * Здесь таймаут задан жестко внутри кода. На самом деле,
    * он как-то задается через параметры программы.
    */
   const int timeout = 30; 

   for( ; ; )
     {

Функция логически делится на две части: до select() и после. В первой части обрабатывается входная очередь и устанавливаются дескрипторы в нужных множествах. Заметьте, что тут же происходит попытка открыть соединение и если она неудачна, то соединение сразу же помещается в выходную очередь. Это важно: все соединения, попавшие в менеджер, рано или поздно его покинут.

        FD_ZERO(&rfds); 
        FD_ZERO(&wfds);

        int max_fd = -1;

        Connection* to_add;
        for( ; (to_add = get_from_input_queue()) ; )
          {
             if(to_add->open())
                connections[to_add->getDescriptor()] = to_add;
              else
                put_in_output_queue(to_add);
          }

        for(connections_container::iterator i = connections.begin();
            i != connections.end(); )
        {
           switch(i->second->state)
           {
              case Connection::reading:
              {
                 if(max_fd < i->first)
                 max_fd = i->first;

                 FD_SET(i->first, &rfds);

                 i++;
								
                 break;
              }
              case Connection::writing:
              {
                 if(max_fd < i->first)
                 max_fd = i->first;

                 FD_SET(i->first, &wfds);

                 i++;

                 break;
              }
              case Connection::closing:
              {
                 i->second->close();
                 put_in_output_queue(i->second);

                 connections_container::iterator j = i;
                 i++;

                 connections.erase(j);
                 break;
              }
              case Connection::connecting:
              {
                 if(max_fd < i->first)
                 max_fd = i->first;

                 FD_SET(i->first, &wfds);
                 FD_SET(i->first, &rfds);

                 i++;

                 break;
              }
           }
        }

        if(max_fd == -1)
        {
           if(finish)
              break;

            sched_yield();
            continue;
        }

        select(max_fd + 1, &rfds, &wfds, NULL, &tv);

Часть после select() занимается тем, что передает сообщения нужным соединениям.


        for(connections_container::iterator i = connections.begin();
            i != connections.end(); i++)
        {
           switch(i->second->state)
           {
              case Connection::reading:
                {
                   if (FD_ISSET(i->first, &rfds)) 
                   i->second->read();
									
                   break;
                }
              case Connection::writing:
                {
                   if (FD_ISSET(i->first, &wfds)) 
                   i->second->write();

                   break;
                }
              case Connection::connecting:
                {
                   if(FD_ISSET(i->first, &wfds) || FD_ISSET(i->first, &rfds))
                   i->second->connected();

                   break;
                }
              case Connection::closing:
                {
                   break;
                }
           }

           if(!(FD_ISSET(i->first, &wfds) || FD_ISSET(i->first, &rfds)))
             if(time(NULL) > (i->second->last_accessed + timeout))
                i->second->state = Connection::closing;
        }
   return NULL;
}

В качестве послесловия к этой функции хочется отметить, что установку блокировок из нее (а это, все-таки, многопоточная программа) я удалил специально. А вот sched_yield() оставил потому, что уверен в том, что этот вызов много лучше, чем вызов sleep().

Подводя итоги, хочется поговорить об ограничениях. Все функции, которые выполняются над соединениями, выполняются в том же потоке, что и select(), следовательно время перехода из одного состояния в другое должно быть минимизировано (поэтому и заголовок статьи такой). С другой стороны, видно как от этого можно избавиться --- заведя еще один "шедулер", который будет обрабатывать соединения в состояниях, неинтересных для менеджера соединений. Кроме того, считается что все соединения равнозначны и поэтому порядок посылки сообщений (и возможности реакции на них) определяется... номером файлового дескриптора. В тех случаях, когда такой подход невозможен, придется организовывать приоритетную очередь и как-то назначать приоритеты соединениям.

Резюме

В общем, конечно же предложенная схема это далеко не идеал. Но целью этого опуса я ставил немного другое: хотелось показать то, что можно разделить обработку соединений в техническом и логическом смыслах. Вообще говоря, эта схема хороша тем, что ее (немного усложнив) можно выделить в отдельную библиотеку и использовать для, я думаю, большинства клиентских приложений, которые тратят большую часть времени на работу с сетью. Т.е., получается мечта программиста: code reuse. Опять же, схема не нова и много программистов независимо друг от друга "изобретают" что-то в этом духе.


Версия для печати


  Ссылки по теме:
W. Richard Stevens
   Unix Network programming, volume 1.
W. Richard Stevens
   TCP/IP Illustrated, volume 1.
W. Richard Stevens,
   TCP/IP Illustrated, volume 2.
  Рядом в разделе:
Обзор CORBA (14.01.01)
   CORBA (расшифровывается как Common Object Request Broker) это технология, которая позволяет рассматривать компоненты распределенной системы как объекты, отвечающие некоторым определенным интерфейсам....   >>>>
Событийная модель в WinSock (12.12.00)
   Автором этого текста является Gregory Liokumovich ( ). Он любезно прислал мне этот текст, связанный с программированием сетей при помощи интерфейса...   >>>>
  Рядом по дате:
Гимн (25.12.00)
   У Спектатора разгорелись жуткие дебаты по поводу новопринятого гимна и связанных с ним проблем... я очень внимательно читал эти отзывы, хотя...   >>>>
Быстрый и мертвый / The Quick and the Dead, 1995 (13.12.00)
   На этот раз я впервые рассказываю о фильме, который я посмотрел "по телевизору": он был на канале "Боевик" от НТВ+. Собственно,...   >>>>
  Содержание:
Заглавная страница
Мой блог
Мое резюме
Дайджест
Программирование
   C&C++
Сети
Unix
Алгоритмы
Оптимизация
Соревнования
Отвлеченно
XML
TeX
Просто так
Студенческое
Туризм
  Байки
Фотографии
Комментарии
   Книги
Web-ресурсы
Фильмы
Интернет
Программное обеспечение
Жизнь
Благодарности
Форум
Хронология
 
  В этом разделе:
События ядра в FreeBSD. (16.07.01)
   Обработка большого количества сетевых соединений всегда затруднительна. Мало того, не существует стандартных решений, подходящих для проблем любого вида, в которых возникает...   >>>>
Обзор CORBA (14.01.01)
   CORBA (расшифровывается как Common Object Request Broker) это технология, которая позволяет рассматривать компоненты распределенной системы как объекты, отвечающие некоторым определенным интерфейсам....   >>>>
"Тонкий" клиент (19.12.00)
   В предыдущей заметке Gregory Liokumovich рассказывал о применении событийной модели WinSock для программирования сетевых приложений. На самом деле, как мне кажется,...   >>>>
Событийная модель в WinSock (12.12.00)
   Автором этого текста является Gregory Liokumovich ( ). Он любезно прислал мне этот текст, связанный с программированием сетей при помощи интерфейса...   >>>>
Неблокирующий connect() (01.12.00)
   В продолжение темы о замене блокирующего вызова , хочется рассказать о другой функции интерфейса сокетов, . Она имеет следующий прототип: int...   >>>>
Определение ip-адреса по имени хоста, adns (05.11.00)
   Есть такой, характерный для организации "традиционного" UNIX'а, системный вызов под названием : struct hostent * gethostbyname(const char *name); Традиционен он тем,...   >>>>
lingering close (29.10.00)
   Когда программа выкачивает один файл с удаленного сервера с использованием протокола TCP, а после этого сразу же "отваливается", то проблем, скорее...   >>>>
Содержание раздела полностью...
   Примерно в тоже время
Гимн (25.12.00)
   У Спектатора разгорелись жуткие дебаты по поводу новопринятого гимна и связанных с ним проблем... я очень внимательно читал эти отзывы, хотя...   >>>>
Быстрый и мертвый / The Quick and the Dead, 1995 (13.12.00)
   На этот раз я впервые рассказываю о фильме, который я посмотрел "по телевизору": он был на канале "Боевик" от НТВ+. Собственно,...   >>>>
Хронология полностью...
   Содержание
Заглавная страница
Мой блог
Мое резюме
Дайджест
Программирование
  C&C++
Сети
Unix
Алгоритмы
Оптимизация
Соревнования
Отвлеченно
XML
TeX
Туризм
  Байки
Фотографии
Комментарии
  Книги
Web-ресурсы
Фильмы
Интернет
Программное обеспечение
Жизнь
Студенческое
Просто так
Благодарности
Форум
Хронология
© 2000-2008, Andrey L. Kalinin
mailto:andrey@kalinin.ru
Rambler's Top100