Skip to content

Latest commit

 

History

History
1130 lines (908 loc) · 55.5 KB

lab07-nonblocking-server.md

File metadata and controls

1130 lines (908 loc) · 55.5 KB
title lang
Лабораторная работа № 7. Неблокирующий режим работы сокетов
ru

Цель работы

  • Изучить отличительные особенности неблокирующего режима работы сокетов.
  • Научиться использовать сокеты в неблокирующем режиме на примере реализации протокола обмена файлами.

Введение

Неблокирующие сокеты

Обслуживание каждого соединения в отдельном потоке при помощи блокирующих сокетов крайне просто для программирования, однако этот подход к созданию асинхронных сетевых приложений имеет принципиальные ограничения.

  • Как известно из курса системного ПО, использования потоков влечет накладные расходы ОС на их переключение. Де-факто они становятся критическими при сотнях активных потоках на одно ядро ЦП или при тысячах потоков на всю систему. Современные нагрузки предполагают десятки тысяч активных соединений одновременно или миллионы малоактивных, поэтому для них порождать поток на каждое соединение неприемлемо.

  • Блокирующие сокеты неудобны тем, что позволяют либо принимать данные из сокета, либо отправлять их. Это приемлемо, если клиент и сервер обмениваются запросами в строгой очередности. Однако, если любая сторона может как принимать, так и отправлять данные по своему усмотрению (например, мессенджер), требуется или два соединения (расход сокетов), или два потока (накладные расходы, описанные выше).

Необходим инструмент для работы из одного потока с несколькими сокетами сразу или с несколькими операциями над одним сокетом сразу. Таковым является неблокирующий режим работы сокетов или, сокращенно, неблокирующие сокеты.

Стандартные операции над сокетом, переведенным в неблокирующий режим, никогда не приводят к блокировке. Вместо этого они завершаются со специальным кодом ошибки (WSAEWOULDBLOCK в Windows; EWOULDBLOCK или EAGAIN в *nix), который означает, что операция не может быть выполнена в данный момент (не пришли данные в случае recv(), нет места в буфере отправки в случае send() и прочее). Таким образом, программа не лишается управления при временной невозможности выполнить операцию, а лишь информируется об этом, и может повторить ее успешно позже.

Как дождаться момента, когда потенциально блокирующую функцию можно будет вызвать успешно (говорят: когда сокет будет готов к работе)? Постоянно вызывать ее, пока результат — EWSAWOULDBLOCK, неэффективно с точки зрения как производительности — данные могут прибыть для одного сокета, пока совершается опрос других, — так и энергоэффективности - процесс все время работает, в отличие от состояния блокировки. Вместо этого применяется вызов специальной функции, называемой мультиплексором. Эта функция принимает набор сокетов и блокирует выполнение, пока хотя бы один из них не будет готов к работе, а по ее завершении можно узнать, какой именно (их может быть несколько). При этом для каждого из сокетов можно указать, что для него считается готовностью к работе: возможность приема, отправки или любая из них. Сокет-слушатель также может быть неблокирующим, для него актуальна возможность приема, но для вызова не recv(), а accept().

Таким образом, работа асинхронного приложения на основе неблокирующих сокетов строится в таком цикле:

  1. Для каждого сокета выбираются события, которые должны наступить, чтобы считать сокет готовым к работе, и он добавляется в набор сокетов.

  2. Вызывается функция-мультиплексор, выполнение блокируется.

  3. Для каждого сокета из набора проверяется, какие события для него наступили (готовность к приему, готовность к отправке, ошибка).

  4. Для сокетов, готовых к работе, соответствующая функция (например, recv() при готовности к приему) вызывается, пока не завершится с ошибкой WSAEWOULDBLOCK.

Недостаток неблокирующего режима — сложность программирования. При использовании потоков данные, связанные с каждым клиентом, хранятся в локальных переменных. При работе со всеми клиентами из одного потока требуется самостоятельно вести массив данных всех клиентов. Поскольку после любого вызова потенциально блокирующей функции (например, recv()) может быть нужно вернуться к мультиплексору, требуется также для каждого клиента хранить состояние, например, сколько байт осталось принять. При использовании потоков состояние определялось местом программы.

Для клиентов неблокирующий режим обычно менее актуален, но также возможен. Функция connect() в нем возвращает EWOULDBLOCK при первом вызове, затем нужно дожидаться возможности записи в сокет, что означает успешное завершение соединения. Неблокирующий режим доступен и для дейтаграммных сокетов, и соответственно, recvfrom() и sendto().

API неблокирующего режима

Переход в неблокирующий режим

Неблокирующий режим переключается для каждого сокета отдельно. Даже если сокет-слушатель работает в неблокирующем режиме, сокеты-передатчики, порождаемые accept(), начинают работать в блокирующем режиме. Для переключения используется функция ioctl() (*nix) или ioctlsocket() (Windows), однако с одинаковым параметром FIONBIO (Flag for IO control: non-blocking IO):

bool
make_nonblocking(SOCKET handle) {
    unsigned long int on = 1;
    const int result = ::ioctlsocket(handle, FIONBIO, &on);
    if (result < 0) {
        fprintf(stderr, "error: ioctlsocket()=%d\n", WSAGetLastError());
        return false;
    }
    return true;
}

Мультиплексирование

Мультиплексор в программировании сокетов — это специальная операция, которая заключается в ожидании событий, которые могут произойти на некоторых сокетах из группы. Например, мультиплексор одновременно ожидает, когда одному из сокетов-передатчиков поступит запрос клиента или сокету-слушателю придет новый запрос на подключение. Мультиплексор — единственная блокирующая операция, связанная с неблокирующими сокетами.

Два стандартных мультиплексора — select() и poll() (WSAPoll() — несмотря на переименования, данная функция повторяет поведение POSIX.) В современных приложениях рекомендуется использовать poll().

Функция poll() принимает:

  • массив дескрипторов опроса (poll file descriptor), каждый из которых описывает, какие события и для какого сокета будут ожидаться;
  • количество дескрипторов в массиве;
  • максимальное время ожидания (таймаут).

После выполнения poll() изменяет поле-флаги revents каждого дескриптора опроса, указывая, какие из запрошенных событий произошли с сокетом. Некоторые из событий, которые можно отслеживать (флаги):

  • POLLIN — поступление данных для сокета-передатчика (можно вызвать recv()) или новое подключение для сокета-слушателя (можно вызвать accept());
  • POLLOUT — возможность передачи данных после установления соединения (можно вызвать send()) или окончание установления соединения (после неблокирующего вызова connect());
  • POLLHUPrevents) — штатное завершение соединения удаленной стороной.
  • POLLERRrevents) — наличие ошибки.

Если таймаут ожидания не нужен, его можно задать бесконечным, передав в качестве аргумента отрицательное число.

WSAPoll() возвращает количество сокетов, которые стали готовыми к работе (0, если раньше истек таймаут) или отрицательное число при ошибке самого мультиплексирования (но не при ошибке одного из сокетов).

WSAPoll() доступна, начиная с Windows Vista. Чтобы использовать ее в своих программах необходимо до подключения <winsock2.h> указать, что будет использоваться этот новый API:

#define WINVER _WIN32_WINNT_WIN7
#define _WIN32_WINNT _WIN32_WINNT_WIN7

Задание

Требуется перевести асинхронный сервер из ЛР № 6 на неблокирующие сокеты.

Указание. Определенные фрагменты кода и целые функции уже были написаны в предыдущих ЛР, откуда их можно и нужно брать, однако программу удобнее писать заново — отличия от ЛР № 5 и 6 достаточно велики.

(@) Создайте новый проект lab07-nonblocking-server. Подключите к библиотеки, необходимые для работы с сокетами. Инициализируйте API сокетов.

(@) Создайте сокет-слушатель и запишите код для приема одного подключения, как в ЛР № 5 и № 6.

Управление соединениями в неблокирующем режиме

Как первый этап будем в неблокирующем режиме принимать подключения и вести перечень клиентов, реагируя на то, когда они отключаются. Обмениваться с ними данными временно не будем.

Упрощенно опишем клиента как сокет и его адрес:

struct Client {
    SOCKET channel;
    sockaddr_in peer;
};

Сервер в минимальном виде — это сокет-слушатель и клиенты:

struct Server {
    SOCKET listener;
    std::vector<Client> clients;
};

Всю работу сервера, то есть цикл с вызовом мультиплексора, вынесем в функцию

bool run_server(Server& server);

После мультиплексирования она принимает новые подключения и обслуживает существующие; для этих задач заведем отдельные функции:

bool process_listener(Server& server, WSAPOLLFD fd);
bool process_client(Server& server, size_t index, WSAPOLLFD fd);

(@) Переведите сокет-слушатель в неблокирующий режим работы.

(@) Добавьте в программу основной цикл работы сервера, в котором:

1. Сервер инициализируется — изначально открыт только сокет-слушатель:
    ```
    Server server;
    server.listener = listener;
    ```

2. Сервер работает до возникновения неисправимых ошибок:
    ```
    while (run_server(server));
    ```

3. По окончании работы сервера все сокеты закрываются:
    ```
    for (const Client& client : server.clients) {
        ::closesocket(client.channel);
    }
    ::closesocket(server.listener);
    ```

Мультиплексирование

(@) Заполните набор дескрипторов ожидания для мультиплексирования.

В нашем случае это сокеты-передатчики клиентов и сокет-слушатель. Будем всегда помещать сокет-слушатель в конец набора.

bool
run_server(Server& server) {
    std::vector<WSAPOLLFD> fds;

На данном этапе все сокеты клиентов ожидают только отключения:

    for (const auto& client : server.clients) {
        WSAPOLLFD fd;
        fd.fd = client.channel;
        fd.events = POLLIN;
        fds.push_back(fd);
    }

Сокет-слушатель ожидает новых подключений, то есть возможности чтения:

    WSAPOLLFD fd;
    fd.fd = listener;
    fd.events = POLLIN;
    fds.push_back(fd);

В заполненным набором можно вызвать мультиплексор с бесконечным таймаутом:

    ::WSAPoll(&fds[0], fds.size(), -1);

(@) Добавьте обработку ошибок вызова WSAPoll() (выход из функции с false).

(@) Вызовите обработчик событий сокета-слушателя (он сам проверит, были ли они): process_listener(server, fds.back());

(@) Вызовите обработчики событий всех клиентов.

Поскольку требуется работать и с массивом клиентов, и с массивом дескрипторов, удобно пользоваться индексом.

    for (size_t i = 0; i < server.clients.size(); i++) {
        const auto progress = process_client(server, i, fds[i]);

На данном этапе обработчик событий клиента возвращает false, если работу с клиентом необходимо закончить. Обратите внимание, что здесь клиент не удаляется из перечня, а только его сокет помечается INVALID_SOCKET. Иначе индексы в массиве клиентов и дескрипторов будут означать разные сокеты, поддержка чего усложнила бы код.

        if (!progress) {
            ::closesocket(server.clients[i].channel);
            server.clients[i].channel = INVALID_SOCKET;
        }
    }

(@) После обработки всех клиентов удаляйте тех, работа с которыми закончена:

    auto it = server.clients.begin();
    while (it != server.clients.end()) {
        if (it->channel == INVALID_SOCKET) {
            it = server.clients.erase(it);
        } else {
            ++it;
        }
    }

    return true;
}

Напомним: it — итератор, объект, представляющий место в коллекции клиентов. Оператором ++it он переходит к следующему элементу, если текущий удалять не нужно. Если же текущий элемент удаляется erase() возвращает новое значение итератора, элемент за удаленным.

Прием подключений

(@) Проверьте сокет-слушатель на наличие ошибок:

bool
process_listener(Server& server, WSAPOLLFD fd) {
    if (fd.revents & POLLERR) {
        return false;
    }

(@) Напишите функцию для получения кода ошибки отдельного сокета при помощи getsockopt() и SO_ERROR (подобный код был в ЛР № 2). Здесь и далее при обнаружении ошибки сокета (POLLERR) печатайте её код.

Поскольку сокет-слушатель ожидает всегда только событий приема данных, опустим проверку if (fd.revents & POLLIN) — это единственный возможный здесь случай.

(@) Принимайте новые подключения, пока не выберете всю их очередь.

    // fd.revents & POLLIN
    while (true) {
        sockaddr_in peer;
        int peer_size = sizeof(peer);
        auto channel = ::accept(listener, (struct sockaddr*)&peer, &peer_size);

Поскольку режим сокета-слушателя неблокирующий, ошибка accept() может означать, что всего лишь приняты все входящие подключения, если код ошибки — WSAEWOULDBLOCK.

        if (channel == INVALID_SOCKET) {
            const int code = ::WSAGetLastError();
            if (code == WSAEWOULDBLOCK) {
                return true;
            }

В противном случае ошибка действительно произошла. Завершать работу всего сервера нет нужды, достаточно выйти из функции. Если еще остались не принятые подключения, они будут обработаны после очередного вызова мультиплексора.

            return false;
        }

(@) Переведите сокет-передатчик клиента в неблокирующий режим. При ошибке закройте сокет-передатчик и продолжите цикл (continue).

Наконец, добавим нового клиента в реестр:

        Client client;
        client.channel = channel;
        client.peer = peer;
        server.clients.push_back(client);

        fprintf(stderr, "info: client connected: peer=%s, clients=%u\n",
                endpoint_to_string(peer).c_str(), server.clients.size());
    }
}

Сообщения выводятся обычной fprintf() без блокировок, поскольку вся работа ведется в одном потоке.

Обработка отключения клиента

(@) Проверьте сокет-передатчик клиента на наличие ошибок:

bool
process_client(Server& server, size_t index, WSAPOLLFD fd) {
    Client& client = server.clients[index];

    if (fd.revents & POLLERR) {
        return false;
    }

(@) При отключении клиента печатайте сообщение об этом и сигнализируйте об окончании работы с клиентом возвратом false:

    if (fd.revents & POLLHUP) {
        fprintf(stderr, "info: client disconnected: peer=%s\n",
                endpoint_to_string(client.peer).c_str());
        return false;
    }
    return true;
}

(@) Запустите сервер. Подключитесь к нему одновременно несколькими клиентами из ЛР № 4 или ncat (можно вперемешку), затем отключите часть клиентов и подключите снова. Убедитесь, что сервер корректно реагирует на подключения и отключения. Зафиксируйте вывод сервера в отчете.

**Примечание.**  Не отправляйте серверу никаких команд —
на данном этапе он не в состоянии их обработать.

Модель клиента

В случае блокирующих сокетов, работавших в отдельных потоках, состояние клиента определялось местом программы, которое в данный момент выполнялось потоком клиента. В случае неблокирующих сокетов, состоянием необходимо управлять вручную. Основные состояния следующие:

  • Прием запроса: ожидается поступление данных из сети, пока не будет считан запрос целиком, после чего можно начать его обслуживание. В этом режиме клиент начинает работу.

    enum ClientState {
        CLIENT_RECEIVE_REQUEST,
    
  • Отправка файла или отправка данных: ожидается возможность отправки данных (освобождение буфера отправки), пока не удастся отправить весь ответ, после чего клиент возвращается в режим прием запроса.

        CLIENT_SEND_FILE,
        CLIENT_SEND_DATA,
    
  • Завершенная работа: ничего не делается, это специальное состояние, которое наступает при отключении удаленного пользователя или при ошибках.

        CLIENT_TERMINATED
    };
    

С каждым состоянием связан собственный набор данных. Так, во время приема заголовка необходимо хранить его целиком, потому что заголовок может прийти по частям, следовательно, нужна возможность приостанавливать прием после каждой части, чтобы продолжить при получении следующей.

#pragma pack(push, 1)
struct Header {
    uint32_t length;
    Type type;
};
#pragma pack(pop)

Заголовок пакета в используемом протоколе передачи файлов (длина и тип) является фиксированной по длине частью запроса:

struct RequestState {
    Header fixed;

В некоторых сообщениях возможны данные переменной длины, например, это имя файла при запросе на его загрузку:

    std::vector<uint8_t> variable;

Для приема по частям необходимы счетчики, сколько байтов осталось принять из каждой части (теоретически достаточно одного счетчика, но два удобнее):

    uint32_t fixed_bytes_left;
    uint32_t variable_bytes_left;
};

В состоянии выдачи файла достаточно хранить дескриптор открытого файла. Размер файла отправляется как часть заголовка, а данные считываются до конца файла, поэтому размер и остаток хранить дополнительно не нужно:

struct FileState {
    FILE* handle;
};

Передача блока данных, например, списка файлов по команде /list, принципиально не отличается от приема запроса переменной длины:

struct DataState {
    std::vector<uint8_t> bytes;
    uint32_t bytes_transmitted;
};

Все эти данные необходимо хранить для каждого клиента, но использовать одновременно только одну группу в зависимости от состояния:

struct Client {
    SOCKET channel;
    sockaddr_in peer;

    ClientState state;
    RequestState header;
    FileState file;
    DataState data;

Начальным состоянием клиента является прием запроса, причем количество оставшихся байт фиксированной части равно размеру заголовка.

(@) Напишите функцию void reset_client(Client& client), которая сбрасывает состояние клиента в начальное.

(@) Сбрасывайте состояние новых клиентов перед их добавлением в реестр (в функции process_listener()).

Обработка запросов в неблокирующем режиме

Любая операция обработки запроса в неблокирующем режиме имеет своим результатом некий прогресс:

enum Progress {
    DONE,       // операция выполнена целиком
    RUNNING,    // операция приостановлена (например, нужно больше данных)
    FINISHED,   // обмен данными завершен (клиент штатно отключился)
    FAILED      // произошла ошибка
};

Заранее будем считать, что функции receive_some() и send_some() уже переписаны для неблокирующего режима (мы сделаем это позже) и возвращают теперь не только состояние, но и количество переданных байтов:

struct Result {
    Progress status;
    size_t bytes;
};

Result receive_some(SOCKET channel, void* data, size_t size);
Result send_some(SOCKET channel, const void* data, size_t size);

Обработку запросов предлагается строить следующим образом. (Расширено 15.05.)

  • В run_server() вместо ожидания события POLLIN всегда, клиент должен ожидать POLLIN или POLLOUT в зависимости от своего текущего состояния:

        for (const auto& client : server.clients) {
            WSAPOLLFD fd;
            switch (client.state) {
            case CLIENT_RECEIVE_REQUEST:
                fd.events = POLLIN;
                fd.fd = client.channel;
                break;
            case CLIENT_SEND_FILE:
            case CLIENT_SEND_DATA:
                fd.events = POLLOUT;
                fd.fd = client.channel;
                break;
            default:
                fprintf(stderr, "debug: unexpected client state %d\n", client.state);
                fd.fd = -1;
            }
            fds.push_back(fd);
        }
    
  • Вместо возврата true, если не произошло ни ошибки, ни отключения, process_client() вызывает функцию, которая обрабатывает пришедшие запросы и возвращает false, если клиент завершил работу (по любой причине), иначе — true.

    bool
    serve_requests(Client& client) {
        Progress result = FAILED;
        do {
            result = serve_request(client);
        } while (result == DONE);
        return result == RUNNING;
    }
    

Функция serve_requests() пытается обслуживать запросы, пока каждый из них удается обслужить целиком (условие цикла). Если цикл прервался, единственный случай, когда следует продолжить работу с клиентом — когда запрос обработан еще не полностью (RUNNING).

Функция serve_request() должна быть готова принять клиента в любом состоянии и выполнить обработку запроса, начиная с соответствующего этапа (примем запроса, отправка заголовка ответа, отправка тела ответа). В каждом состоянии клиента прогресс отслеживается вспомогательными функциями, serve_request() не занимается этим:

Progress receive_request(Client& client);
void begin_file_transmission(Client& client);
void begin_list_transmission(Client& client);
void begin_error_transmission(Client& client);
Progress send_header(Client& client);
Progress send_file(Client& client);
Progress send_data(Client& client);
  • Если клиент принимает запрос, вызывается receive_request(). Если ей не удалось выполнить прием сразу или закончить начатый ранее, функция завершается с тем же результатом, что и receive_request().
Progress
serve_request(Client& client) {
    if (client.state == CLIENT_RECEIVE_REQUEST) {
        auto result = receive_request(client);
        if (result != DONE) {
            return result;
        }
  • После того, как запрос удалось принять полностью, готовится ответ на него. Подготовка включает формирование нового состояния клиента (заполнение поля file или data структуры Client) и перевод клиента в новое состояние (изменение поля state).
        const Type type = client.header.fixed.type;
        switch (type) {
        case TYPE_GET:
            begin_file_transmission(client);
            break;
        case TYPE_LIST:
            begin_list_transmission(client);
            break;
        default:
            begin_error_transmission(client,
                "unknown message type " + std::to_string((int)type));
        }
    }
  • Если клиент не принимал запрос в момент вызова serve_request() или если удалось (до)приниять запрос выше, делается попытка (до)отправить заголовок ответа. Функция send_header() отвечает за то, чтобы ничего не отправлять, если это уже сделано.
    const auto result = send_header(client);
  • Если (до)отправить заголовок ответа не удалось, serve_requests() завершается, как и в случае с приемом заголовка.
    if (result != DONE) {
        return result;
    }
  • Наконец, обрабатываются все прочие состояния клиента, кроме прием запроса:
    switch (client.state) {
    case CLIENT_SEND_FILE:
        return send_file(client);
    case CLIENT_SEND_DATA:
        return send_data(client);
    case CLIENT_TERMINATED:
        return FINISHED;
    }
}

Прием запроса

Прием запроса состоит из приема части запроса фиксированной длины и части переменной длины, если последнее необходимо:

Progress
receive_request(Client& client) {
    if (client.header.fixed_bytes_left > 0) {

Для удобства запишем адрес принимаемой фиксированной части в переменную, аналогично байтовому буферу для функции recv():

        const auto data =
                reinterpret_cast<uint8_t*>(&client.header.fixed);

В массиве data первые offset байт уже заполнены (приняты ранее), а client.header.fixed_bytes_left осталось принять, что функция и пытается сделать аналогично receive_some() в ЛР №№ 4—6: 

        const auto offset =
                sizeof(client.header.fixed) - client.header.fixed_bytes_left;
        const auto result = receive_some(
                client.channel,
                &data[offset],
                client.header.fixed_bytes_left);

Если весь затребованный остаток запроса принять не удалось, функция завершается так же, как и операция приема (не важно, была ли ошибка). Количество байтов, оставшихся для приема, уменьшается на количество принятых.

        client.header.fixed_bytes_left -= result.bytes;
        if (result.status != DONE) {
            return result.status;
        }

Когда вся фиксированная часть запроса принята, вычисляется размер части переменной длины и подготавливаются соответствующие поля состояния клиента: под часть переменной длины выделяется память, а принять остается ее всю:

        const auto bytes_left =
                ::ntohl(client.header.fixed.length) - sizeof(Type);
        client.header.variable.resize(bytes_left);
        client.header.variable_bytes_left = bytes_left;
    }

(@) После вычисления bytes_left добавьте проверку, что она не больше 250. При нарушении печатайте сообщение и выходите с прогрессом FAILED.

(До)прием части запроса переменной длины аналогичен (до)приему части фиксированной длины. По его окончании ничего вычислять не требуется, прием заголовка окончен.

    if (client.header.variable_bytes_left > 0) {
        const auto offset =
                client.header.variable.size() - client.header.variable_bytes_left;
        const auto result = receive_some(
                client.channel,
                &client.header.variable[offset],
                client.header.variable_bytes_left);

        client.header.variable_bytes_left -= result.bytes;
        return result.status;
    }
    return DONE;
}

Подготовка ответа на запрос загрузки файла

Перед функцией begin_file_transmission() две задачи:

  • открыть файл на чтение, сохранить его дескриптор в состоянии клиента;
  • сформировать заголовок ответа на запрос загрузки файла.

Поскольку std::ifstream не является простым типом, чтобы не усложнять структуру клиента, воспользуется классическим API для работы с файлами в C: типом FILE*, функцией fopen() и т. п. (документация). Функционально этот код так же открывает файл на чтение и определяет его размер, перемещая позицию чтения, как и код ЛР № 5.

void
begin_file_transmission(Client& client) {
    const std::string path{
        reinterpret_cast<const char*>(&client.header.variable[0]),
        client.header.variable.size()};

    FILE* file = ::fopen(path.c_str(), "rb");
    if (file == nullptr) {
        return begin_error_transmission(client, "file is inaccessible");
    }

    ::fseek(file, 0, SEEK_END);
    const auto size = ::ftell(file);
    ::fseek(file, 0, SEEK_SET);

    client.file.handle = file;

Заголовок ответа формируется в том же поле header структуры Client, что хранила заголовок запроса, который более не нужен. В состояниях, отличных от CLIENT_RECEIVE_REQUEST, поле header.fixed_bytes_left означает количество байт header.fixed, которое осталось отправить, а не принять.

    client.header.fixed.type = TYPE_GET;
    client.header.fixed.length = ::htonl(size + sizeof(Type));
    client.header.fixed_bytes_left = sizeof(Header);

    client.state = CLIENT_SEND_FILE;
}

Отправка заголовка ответа

Во-первых, если заголовок уже передан целиком, функции нечего делать:

Progress
send_header(Client& client) {
    if (client.header.fixed_bytes_left == 0) {
        return DONE;
    }

В противном случае выполняется попытка (до)отправить данные аналогично тому, как проводился их прием, но функцией send_some():

    const auto data =
            reinterpret_cast<const uint8_t*>(&client.header.fixed);
    const auto offset =
            sizeof(client.header.fixed) - client.header.fixed_bytes_left;
    const auto result = send_some(
            client.channel,
            &data[offset],
            client.header.fixed_bytes_left);
    client.header.fixed_bytes_left -= result.bytes;
    return result.status;
}

Отправка содержимого файла

Как и в блокирующем режиме, файл читается блоками. Если вместо блока запрошенного размера из файла удалось считать 0 байтов, это может означать или конец файла (проверяется функцией feof()) или ошибку. По достижении конца файла его следует закрыть, а состояние клиента сбросить — передача файла успешно окончена.

Progress
send_file(Client& client) {
    std::vector<uint8_t> buffer(4096);
    while (true) {
        const auto bytes_to_send =
                ::fread(&buffer[0], 1, buffer.size(), client.file.handle);
        if (bytes_to_send == 0) {
            if (::feof(client.file.handle)) {
                ::fclose(client.file.handle);
                reset_client(client);
                return DONE;
            }
            return FAILED;
        }

Отправка блока файла в неблокирующем режиме сложнее, чем в блокирующем, потому что send_some() не может гарантировать отправку всего блока. Необходимо проверять случай, когда отправлена лишь часть, и перемещать позицию чтения файла назад на число байтов, которые не удалось отправить, чтобы при следующем вызове send_file() считать и попытаться отправить их заново. Иначе некоторые байты файла, которые не удалось отправить сразу, будут потеряны — прочитаны из файла, но в сокет не записаны.

        const auto result =
                send_some(client.channel, &buffer[0], bytes_to_send);
        if (result.status == RUNNING) {
            if (result.bytes < bytes_to_send) {
                const auto offset = (long)bytes_to_send - (long)result.bytes;
                ::fseek(client.file.handle, offset, SEEK_CUR);
                return RUNNING;
            }
        }
    }
}

Формирование ответа с ошибкой

Для формирования ответа с сообщением об ошибка достаточно:

  • заполнить поле data структуры Client — скопировать сообщение в буфер и указать, что еще не передано ни одного байта из него;
  • заполнить заголовок, как при отправке файла;
  • изменить состояние клиента на CLIENT_SEND_DATA.
void
begin_error_transmission(Client& client, const std::string& message) {
    client.data.bytes.resize(message.size());
    ::memcpy(&client.data.bytes[0], &message[0], client.data.bytes.size());
    client.data.bytes_transmitted = 0;

    client.header.fixed.type = TYPE_ERROR;
    client.header.fixed.length = ::htonl(client.data.bytes.size() + sizeof(Type));
    client.header.fixed_bytes_left = sizeof(Header);

    client.state = CLIENT_SEND_DATA;
}

Отправка данных

Отправка данных заключается лишь в вызове send_some(), которую еще предстоит модернизировать.

Progress
send_data(Client& client) {
    const auto result = send_some(
            client.channel,
            &client.data.bytes[client.data.bytes_transmitted],
            client.data.bytes.size() - client.data.bytes_transmitted);

В случае, когда удалось (до)отправить данные до конца, необходимо сбросить состояние клиента:

    if (result.status == DONE) {
        reset_client(client);
        return DONE;
    }

Иначе требуется учесть количество данных, которое удалось отправить:

    client.data.bytes_transmitted += result.bytes;
    return result.status;
}

Формирование ответа со списком файлов

Поскольку в ЛР № 5 было решено формировать тело ответа целиком до отправки, можно воспользоваться тем же кодом:

void
begin_list_transmission(Client& client) {
    const auto files = list_files();
    if (files.empty()) {
        return begin_error_transmission(client, "unable to enumerate files");
    }

    std::vector<uint8_t> body;
    for (const auto& file : files) {
        const auto old_body_size = body.size();
        body.resize(body.size() + sizeof(uint8_t) + file.name.length());

        uint8_t* place = &body[old_body_size];

        *place = file.name.length();
        place++;

        std::memcpy(place, &file.name[0], file.name.length());
    }

Вместо немедленной отправки необходимо сформировать заголовок и заполнить состояние клиента:

    client.header.fixed.type = TYPE_LIST;
    client.header.fixed.length = ::htonl(body.size() + sizeof(Type));
    client.header.fixed_bytes_left = sizeof(Header);

    client.data.bytes = std::move(body);
    client.data.bytes_transmitted = 0;

    client.state = CLIENT_SEND_DATA;
}

Обратите внимание на перемещение тела из body в client.data.bytes:

    client.data.bytes = std::move(body);

Это более экономично, чем простое присваивание, при котором body копировалось бы в client.data.bytes; при перемещении же client.data.bytes приобретает данные body, которая их теряет. Тонкость состоит в том, что после вызова std::move() массив body становится пустым, у учетом этого, body.size() вызывается выше.

Прием и передача блоков данных в неблокирующем режиме

Перепишем функции receive_some() и send_some() для работы в неблокирующем режиме. По его сути они не могут гарантировать прием или отправку заданного количества байтов, поэтому вызывающему коду необходимо получать число байтов, которые были реально приняты или отправлены. Также ранее receive_some() не отличала отключение клиента от других ошибок, что было бы полезно.

Логика приема данных порциями, пока это удается, сохраняется в receive_some():

Result
receive_some(SOCKET channel, void* data, size_t size) {
    auto bytes = reinterpret_cast<char*>(data);
    size_t bytes_received = 0;
    while (bytes_received < size) {
        auto result = ::recv(
                channel, &bytes[bytes_received], size - bytes_received, 0);

Отличия возникают при обработке ошибок. Код WSAEWOULDBLOCK необходимо обрабатывать особым образом: возвращать успешный результат, присовокупив к нему количество принятых данных:

        if (result < 0) {
            const auto code = WSAGetLastError();
            if (code == WSAEWOULDBLOCK) {
                return {RUNNING, bytes_received};
            }

Все остальные коды действительно являются признаками ошибок:

            std::printf("error: recv()=%d\n", code);
            return {FAILED, bytes_received};
        }

В случае штатного закрытия соединения клиентом recv() возвращает нуль:

        else if (result == 0) {
            return {FINISHED, bytes_received};
        }

        bytes_received += result;
    }

Если выход из цикла произошел по условию в его заголовке, весь необходимый объем данных удалось успешно принять:

    return {DONE, bytes_received};
}

(@) Адаптируйте аналогичным образом функцию send_some(). Отличие в том, что для send() результат 0 не является особенным (она возвращает 0 только при попытке отправить 0 байтов, чего не делается).

(@) Добейтесь компиляции и корректной работы асинхронного сервера.

Замечание об альтернативных API и библиотеках

Сложность получившейся программы по сравнению в вариантом на основе блокирующих сокетов во много связана с тем, что смешана логика обработки запросов (принять запрос — отправить заголовок ответа — отправить ответ) и логика поддержка асинхронного режима. Их возможно разделить: работа с сокетами является универсальной частью, которая может быть сделана библиотечным кодом, а логика обработки запросов специфична для программы. Библиотека может предоставлять примитивы наподобие: «поставить блок данных в очередь на отправки», «принять N байтов» или «оповестить программу (вызвать заданную функцию), когда передача завершится». Более того, библиотека может скрывать сам главный цикл с вызовом мультиплексора. Тогда программа представляет собой набор функций, которые вызываются при различных событиях с сокетом (callback-функции); сами они, в свою очередь, вызывают функции библиотеки, которые не блокируются, а делают работу «в фоне» (на самом деле, в основном цикле между вызовами callbacks).

В настоящее время (2018) наиболее популярны библиотеки libuv (С) и Boost.Asio (C++). Первая доступна не только из C.

Помимо устаревшей select() стандартной poll() (WSAPoll()) существуют и специфичные для платформ API для асинхронного ввода-вывода. В случае Windows это IO completion ports (IOCP), построенный не на основе главного цикла, а на callback-функциях. В случае Linux это epoll() — API, похожий на poll(), но более производительный и гибкий.

На практике асинхронные приложения пишут либо с использованием библиотек, либо, если не хватает производительности, при помощи специфичных API. Тем не менее, грамотный специалист должен понимать механизмы, которые использует библиотека, достоинства и недостатки режимов работы с сокетами.

Контрольные вопросы и задания

К защите необходимо знать особенности работы с сокетами из нескольких потоков, отличия и особенности неблокирующего режима работы сокетов.

Тем, кто делал к ЛР № 5 контрольные задания 1, 3, 4, 5, 7, 8, 9 — перенести решение в неблокирующий сервер.

#. Добавьте мультиплексору таймаут — 1 секунду. Проверяйте, на была ли введена команда q — завершить цикл обслуживания запросов, или s — распечатать статистику, сколько запросов сделал каждый клиент. (Проверку можно делать [std::cin.peek()][cppref/cin/peek].)

[cppref/cin/peek]: http://en.cppreference.com/w/cpp/io/basic_istream/peek

#. Для каждого клиента запоминайте, когда он последний раз делал запрос (подойдет [time()][cppref/time]). Добавьте мультиплексору таймаут — 5 секунд. Отключайте клиентов, которые не делали запросов более 20 секунд.

[cppref/time]: http://en.cppreference.com/w/cpp/chrono/c/time

#. Добавьте подсчет и вывод средней скорости передачи для каждого клиента.