Пишем свой BitTorrent клиент на Go

Post Thumbnail

Перевод "Building a BitTorrent client from the ground up in Go"

Что происходит с момента визита на торрент трекер и появлением mp3-файла на вашем компьютере?

В этом посте мы реализуем BitTorrent протокол на уровне, достаточном для скачивания образа Debian. Можете сразу посмотреть исходный код и пропустить все подробные объяснения. Можете начинать с исходного кода и потом переходить к подробным объяснениям.

BitTorrent - это протокол для скачивания файлов и распространения их через интернет. В отличие от традиционного клиент-серверного взаимодействия (например, просмотра фильмов на ВК Видео или загрузки интернет-страничек), участники BitTorrent сети, которые называются пирами (peers), скачивают части файлов друг у друга. Такое взаимодействие называется peer-to-peer протоколом. Мы разберемся, как он работает, и напишем свой собственный клиент, который сможет находить пиров и обмениваться с ними данными.

Последние 20 лет протокол эволюционировал. Различные организации и разработчики расширяли его и добавляли новые функции для шифрования, частных торрентов и новых способов поиска пиров. Мы реализуем оригинальный протокол 2001 года, чтобы наш учебный проект оставался маленьким и реализуемым за одни выходные.

Для экспериментов будем использовать Debian ISO как подопытного кролика. Это большой файл, но не огромный: 350 МБ.

Поиск пиров

Итак, нам нужно скачать файл с помощью BitTorrent. Но это peer-to-peer протокол, и пока мы понятия не имеем, где найти пиров для скачивания. Похоже на переезд в новый город и поиск новых друзей: вы можете познакомиться в баре поблизости или на каком-нибудь митапе. Эта идея лежит в основе централизованных трекеров, которые позволяют пирам знакомиться друг с другом. Как правило, это обычные серверы, работающие через HTTP. Например, образ Debian есть тут: bttracker.debian.org:6969.

Такие централизованные серверы подвергаются нападкам со стороны правообладателей. Возможно, вы читали про трекеры TorrentSpy, Popcorn Time и KickassTorrents, которые были закрыты за распространение нелегального контента. Сегодня уже существуют методы поиска пиров без посредников: одноранговый распределенный поиск. Мы не будем реализовывать эти алгоритмы, но если вам интересно - почитайте про DHT, PEX и магнитные ссылки.

Разбор .torrent файла

В .torrent файле содержится информация о трекере и о самом файле, который нужно скачать. Для начала скачивания этого достаточно. Debian’овский .torrent файл выглядит так:

d8:announce41:http://bttracker.debian.org:6969/announce7:comment35:"Debian CD from cdimage.debian.org"13:creation datei1573903810e9:httpseedsl145:https://cdimage.debian.org/cdimage/release/10.2.0//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-10.2.0-amd64-netinst.iso145:https://cdimage.debian.org/cdimage/archive/10.2.0//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-10.2.0-amd64-netinst.isoe4:infod6:lengthi351272960e4:name31:debian-10.2.0-amd64-netinst.iso12:piece lengthi262144e6:pieces26800:(binary blob of the hashes of each piece)ee

Данные в .torrent файле закодированы в формате Bencode, и нам нужно его декодировать.

В bencode такие же типы, как в JSON: строки, числа, списки и словари. Данные в формате bencode, в отличие от JSON, не особо человекочитаемые. Но такой формат очень удобен для бинарных данных и потокового чтения. Строки начинаются с префикса, в котором указана длина, и выглядят так: 4:spam. Числа начинаются и заканчиваются маркерами, например 7 будет выглядеть как i7e. Списки и словари очень похожи: l4:spami7ee это ['spam', 7], а d4:spami7ee означает {spam: 7}.

Если отформатировать наш .torrent файл, то все становится намного понятней:

d
  8:announce
    41:http://bttracker.debian.org:6969/announce
  7:comment
    35:"Debian CD from cdimage.debian.org"
  13:creation date
    i1573903810e
  4:info
    d
      6:length
        i351272960e
      4:name
        31:debian-10.2.0-amd64-netinst.iso
      12:piece length
        i262144e
      6:pieces
        26800: (binary blob of the hashes of each piece)
    e
e

Из этого файла можно узнать URL трекера, имя и размер файла, дату создания (в Unix-формате), размер частей (piece length), на которые разбит нужный нам файл. Кроме этого, в файле есть большой кусок бинарных данных, в котором содержатся SHA-1 хэши всех частей (pieces). Размер частей для разных торрентов может быть разный, но, как правило, в пределах 256 КБ и 1 МБ. Большой файл может состоять из тысяч частей. Нам нужно скачать каждую часть с наших пиров, проверить хэши по нашему торрент-файлу, собрать эти части вместе - и готово!

Такой механизм позволяет проверить отдельно каждую часть файла и защититься от случайного и намеренного повреждения файла. Если злоумышленник не взломал SHA-1, то мы получим тот файл, который ожидаем.

Было бы прикольно написать свой bencode-парсер. Но хочется сконцентрироваться на важных вещах, поэтому будем использовать готовый парсер github.com/jackpal/bencode-go. А если вы хотите получше разобраться с bencode-форматом - посмотрите парсер от Fredrik Lundh в 50 строчек кода.

import (
    "github.com/jackpal/bencode-go"
)

type bencodeInfo struct {
    Pieces      string `bencode:"pieces"`
    PieceLength int    `bencode:"piece length"`
    Length      int    `bencode:"length"`
    Name        string `bencode:"name"`
}

type bencodeTorrent struct {
    Announce string      `bencode:"announce"`
    Info     bencodeInfo `bencode:"info"`
}

// Open parses a torrent file
func Open(r io.Reader) (*bencodeTorrent, error) {
    bto := bencodeTorrent{}
    err := bencode.Unmarshal(r, &bto)
    if err != nil {
        return nil, err
    }
    return &bto, nil
}

Я стараюсь оставлять структуры максимально плоскими и отделять структуры сериализации от структур приложения. Поэтому я сделал экспортируемой другую, более плоскую структуру TorrentFile и добавил несколько методов для преобразования между ними.

Обратите внимание, я разбил pieces (во внутренней структуре это обычная строка) на список хэшей по 20 байт. Так с ними будет проще работать. И вычислил общий SHA-1 хэш всего bencode-закодированного словаря info (в котором содержится имя, размер и хэши всех частей). Этот общий хэш будет работать как идентификатор и понадобится для взаимодействия с трекером и пирами. Об этом чуть позже.

type TorrentFile struct {
    Announce    string
    InfoHash    [20]byte
    PieceHashes [][20]byte
    PieceLength int
    Length      int
    Name        string
}

func (bto *bencodeTorrent) toTorrentFile() (*TorrentFile, error) {
    // ...
}

Получаем пиров через трекер

Теперь у нас есть информация о файле и трекере. Давайте сделаем запрос на сервер, чтобы объявить (announce) о нашем присутствии как пира и получить список других пиров. Для этого нужно сделать GET-запрос на announce URL трекера с нужными параметрами:

func (t *TorrentFile) buildTrackerURL(peerID [20]byte, port uint16) (string, error) {
    base, err := url.Parse(t.Announce)
    if err != nil {
        return "", err
    }
    params := url.Values{
        "info_hash":  []string{string(t.InfoHash[:])},
        "peer_id":    []string{string(peerID[:])},
        "port":       []string{strconv.Itoa(int(Port))},
        "uploaded":   []string{"0"},
        "downloaded": []string{"0"},
        "compact":    []string{"1"},
        "left":       []string{strconv.Itoa(t.Length)},
    }
    base.RawQuery = params.Encode()
    return base.String(), nil
}

Что тут важно:

  • info_hash - идентифицирует файл, который мы хотим скачать. Это хэш, который мы вычислили раньше по словарю info. Трекеру нужно знать этот хэш, чтобы показать нам правильных пиров.
  • peer_id - 20-байтное имя, которое идентифицирует нас на трекере и для других пиров. Используем случайно сгенерированную последовательность. Реальные BitTorrent-клиенты используют идентификаторы вида -TR2940-k8hj0wgej6ch, в котором закодированы используемая программа для скачивания и ее версия. В нашем примере TR2940 - это клиент Transmission версии 2.94.

Разбираем ответ трекера

В ответе от сервера приходят bencode-закодированные данные.

d
  8:interval
    i900e
  5:peers
    252:(another long binary blob)
e

Поле interval указывает, как часто мы можем делать запрос на сервер для обновления списка пиров. Это значение в секундах (900 секунд = 15 минут).

Поле peers - это большой кусок бинарных данных, в котором содержатся IP-адреса каждого пира. Его нужно разбить на группы по 6 байтов. Первые 4 байта - это IP-адрес узла, последние 2 байта - порт (uint16 в big-endian кодировке). Big-endian (или сетевой порядок) означает, что можно интерпретировать целое число как группу байтов, просто составляя их по порядку слева направо. Например, байты 0x1A, 0xE1 будут кодироваться в порядке 0x1AE1 или 6881 в десятичном формате.

// Peer encodes connection information for a peer
type Peer struct {
    IP   net.IP
    Port uint16
}

// Unmarshal parses peer IP addresses and ports from a buffer
func Unmarshal(peersBin []byte) ([]Peer, error) {
    const peerSize = 6 // 4 for IP, 2 for port
    numPeers := len(peersBin) / peerSize
    if len(peersBin)%peerSize != 0 {
        err := fmt.Errorf("Received malformed peers")
        return nil, err
    }
    peers := make([]Peer, numPeers)
    for i := 0; i < numPeers; i++ {
        offset := i * peerSize
        peers[i].IP = net.IP(peersBin[offset : offset+4])
        peers[i].Port = binary.BigEndian.Uint16(peersBin[offset+4 : offset+6])
    }
    return peers, nil
}

Скачивание с пиров

Теперь у нас есть список пиров. Настало время соединиться с ними и начать скачивать части файла. Этот процесс можно разбить на несколько этапов. Для каждого пира нужно:

  1. Начать TCP-соединение с пиром. Это как начать телефонный разговор.
  2. Выполнить двусторонний BitTorrent-хендшейк. "Hello?" "Hello."
  3. Обмен сообщениями для скачивания частей файла. "Мне нужна часть №231, пожалуйста."

Начинаем TCP-соединение

conn, err := net.DialTimeout("tcp", peer.String(), 3*time.Second)
if err != nil {
    return nil, err
}

Тут используется таймаут для соединения, чтобы не зависать долго на попытках подключения к пирам.

Выполняем хендшейк (рукопожатие)

Мы подключились к пиру, но теперь нужно выполнить рукопожатие, чтобы убедиться:

  • Пир может взаимодействовать по BitTorrent-протоколу.
  • Может понимать наши сообщения и отвечать на них.
  • Знает про файл, который мы хотим скачать.

Мой старик-отец как-то сказал мне, что секрет хорошего рукопожатия в его крепости и зрительном контакте. Для хорошего BitTorrent-рукопожатия тоже нужно знать несколько секретов:

  1. Длина идентификатора протокола всегда 19 (0x13 в hex).
  2. Сам идентификатор, который называется pstr, всегда BitTorrent protocol.
  3. Восемь зарезервированных байтов, которые используются для указания расширенных возможностей. В нашем случае - все выставлены в 0.
  4. Хэш для идентификации файлов (infohash, инфохэш), который мы вычислили раньше.
  5. Идентификатор нашего пира.

Собираем все вместе. Наш хендшейк выглядит так:

\x13BitTorrent protocol\x00\x00\x00\x00\x00\x00\x00\x00\x86\xd4\xc8\x00\x24\xa4\x69\xbe\x4c\x50\xbc\x5a\x10\x2c\xf7\x17\x80\x31\x00\x74-TR2940-k8hj0wgej6ch

После отправки хендшейка в ответ ожидаем получить аналогичную строку. Инфохэш, который мы получили в ответе, должен совпадать с нашим - так мы будем знать, что говорим об одном и том же файле. Если все прошло хорошо, то переходим к следующему этапу. Если нет, то можем повторить, а если ошибки повторяются, то просто разрываем соединение.

Давайте реализуем структуру для хендшейка и несколько дополнительных методов для сериализации и чтения.

// A Handshake is a special message that a peer uses to identify itself
type Handshake struct {
    Pstr     string
    InfoHash [20]byte
    PeerID   [20]byte
}

// Serialize serializes the handshake to a buffer
func (h *Handshake) Serialize() []byte {
    pstrlen := len(h.Pstr)
    bufLen := 49 + pstrlen
    buf := make([]byte, bufLen)
    buf[0] = byte(pstrlen)
    copy(buf[1:], h.Pstr)
    // Leave 8 reserved bytes
    copy(buf[1+pstrlen+8:], h.InfoHash[:])
    copy(buf[1+pstrlen+8+20:], h.PeerID[:])
    return buf
}

// Read parses a handshake from a stream
func Read(r io.Reader) (*Handshake, error) {
    // Do Serialize(), but backwards
    // ...
}

Отправка и получение сообщений

Как только мы выполнили рукопожатие, можем посылать и получать сообщения. Ну не совсем. Пока пир не согласится принимать сообщения, нет смысла ему что-то отправлять. Сейчас мы считаемся "задушенными" (choked) для других пиров. Они должны отправить нам сообщение unchoke, и только после этого мы сможем отправлять им сообщения и запрашивать у них данные. По умолчанию считаем, что все другие пиры нас "душат".

Когда нам присылают сообщение unchoke, можем начинать отправлять запросы за частями файла и ждать в ответ сообщения с этими частями.

Разбор сообщений

В сообщении содержится длина, идентификатор и полезная нагрузка. Это выглядит так:

Сообщение начинается с указания длины. Это 32-битное целое число в виде 4 байтов в big-endian кодировке. Следующий байт - ID (идентификатор), который означает, какой тип сообщения мы получили. Например, 2 означает тип сообщения "interested". Последняя часть сообщения содержит полезную нагрузку.

type messageID uint8

const (
    MsgChoke         messageID = 0
    MsgUnchoke       messageID = 1
    MsgInterested    messageID = 2
    MsgNotInterested messageID = 3
    MsgHave          messageID = 4
    MsgBitfield      messageID = 5
    MsgRequest       messageID = 6
    MsgPiece         messageID = 7
    MsgCancel        messageID = 8
)

// Message stores ID and payload of a message
type Message struct {
    ID      messageID
    Payload []byte
}

// Serialize serializes a message into a buffer of the form
// <length prefix><message ID><payload>
// Interprets `nil` as a keep-alive message
func (m *Message) Serialize() []byte {
    if m == nil {
        return make([]byte, 4)
    }
    length := uint32(len(m.Payload) + 1) // +1 for id
    buf := make([]byte, 4+length)
    binary.BigEndian.PutUint32(buf[0:4], length)
    buf[4] = byte(m.ID)
    copy(buf[5:], m.Payload)
    return buf
}

Вычитываем сообщение из потока и разбираем его, следуя формату. Сначала читаем 4 первых байта и интерпретируем их как uint32. Это длина нашего сообщения, которую используем, чтобы прочитать все сообщение. Получаем ID (идентификатор) - первый байт и payload (полезную нагрузку) - остаток сообщения.

// Read parses a message from a stream. Returns `nil` on keep-alive message
func Read(r io.Reader) (*Message, error) {
    lengthBuf := make([]byte, 4)
    _, err := io.ReadFull(r, lengthBuf)
    if err != nil {
        return nil, err
    }
    length := binary.BigEndian.Uint32(lengthBuf)

    // keep-alive message
    if length == 0 {
        return nil, nil
    }

    messageBuf := make([]byte, length)
    _, err = io.ReadFull(r, messageBuf)
    if err != nil {
        return nil, err
    }

    m := Message{
        ID:      messageID(messageBuf[0]),
        Payload: messageBuf[1:],
    }

    return &m, nil
}

Bitfields

Самый интересный тип сообщения - bitfield. Это структура, которую пиры используют для эффективного кодирования фрагментов, которые они могут нам отправить. Bitfield работает как массив битов. Биты, выставленные в 1, указывают, какие части файлов есть у пира. Это похоже на карту локальности кофейни. Начинаем с пустой карты (все биты 0), заканчиваем, когда вся карта проштампована (все биты в 1).

Работа с битами экономичнее, чем работа с байтами. Такие структуры намного компактней. Мы можем закодировать информацию о 8 частях в одном байте - это размер типа bool. Но с такими структурами не так удобно работать. Самый маленький размер для адресации - байт. Поэтому для работы с битами нужно выполнять дополнительные манипуляции.

// A Bitfield represents the pieces that a peer has
type Bitfield []byte

// HasPiece tells if a bitfield has a particular index set
func (bf Bitfield) HasPiece(index int) bool {
    byteIndex := index / 8
    offset := index % 8
    return bf[byteIndex]>>(7-offset)&1 != 0
}

// SetPiece sets a bit in the bitfield
func (bf Bitfield) SetPiece(index int) {
    byteIndex := index / 8
    offset := index % 8
    bf[byteIndex] |= 1 << (7 - offset)
}

Собираем все вместе

Теперь у нас есть все, чтобы начать скачивать файл: у нас есть список пиров с трекера, мы можем общаться с ними по TCP, можем провести рукопожатие, отправлять и получать сообщения. Но нужно учесть, что придется работать с несколькими пирами конкурентно и хранить состояния отдельно для каждого пира, пока мы с ними взаимодействуем. Это непростые задачи.

Управление конкурентностью: каналы и очереди

В Go принято разделять память через общение.

Настроим два канала для синхронизации наших воркеров: один для распараллеливания работы между пирами, второй для сбора скачанных частей. Когда загруженные фрагменты попадают в канал с результатами, мы копируем их в буфер для сборки полного файла.

// Init queues for workers to retrieve work and send results
workQueue := make(chan *pieceWork, len(t.PieceHashes))
results := make(chan *pieceResult)
for index, hash := range t.PieceHashes {
    length := t.calculatePieceSize(index)
    workQueue <- &pieceWork{index, hash, length}
}

// Start workers
for _, peer := range t.Peers {
    go t.startDownloadWorker(peer, workQueue, results)
}

// Collect results into a buffer until full
buf := make([]byte, t.Length)
donePieces := 0
for donePieces < len(t.PieceHashes) {
    res := <-results
    begin, end := t.calculateBoundsForPiece(res.index)
    copy(buf[begin:end], res.buf)
    donePieces++
}
close(workQueue)

Запускаем воркеры в горутинах для каждого пира. В воркерах выполняется соединение, рукопожатие, а потом воркер получает задачи из workQueue, в которых указаны фрагменты для скачивания, пытается загрузить нужные фрагменты и скидывает их в канал results.

func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
    c, err := client.New(peer, t.PeerID, t.InfoHash)
    if err != nil {
        log.Printf("Could not handshake with %s. Disconnecting\n", peer.IP)
        return
    }
    defer c.Conn.Close()
    log.Printf("Completed handshake with %s\n", peer.IP)

    c.SendUnchoke()
    c.SendInterested()

    for pw := range workQueue {
        if !c.Bitfield.HasPiece(pw.index) {
            workQueue <- pw // Put piece back on the queue
            continue
        }

        // Download the piece
        buf, err := attemptDownloadPiece(c, pw)
        if err != nil {
            log.Println("Exiting", err)
            workQueue <- pw // Put piece back on the queue
            return
        }

        err = checkIntegrity(pw, buf)
        if err != nil {
            log.Printf("Piece #%d failed integrity check\n", pw.index)
            workQueue <- pw // Put piece back on the queue
            continue
        }

        c.SendHave(pw.index)
        results <- &pieceResult{pw.index, buf}
    }
}

Управление состояниями

Мы будем хранить состояние каждого пира и изменять его в зависимости от полученных сообщений. Для этого сделаем отдельную структуру, в которой будут храниться данные о том, сколько мы загрузили с этого пира, сколько мы запрашивали и "задушены" мы или нет. Для большей гибкости эту логику можно реализовать в виде конечного автомата. Но пока нам достаточно обычного switch.

type pieceProgress struct {
    index      int
    client     *client.Client
    buf        []byte
    downloaded int
    requested  int
    backlog    int
}

func (state *pieceProgress) readMessage() error {
    msg, err := state.client.Read() // this call blocks
    switch msg.ID {
    case message.MsgUnchoke:
        state.client.Choked = false
    case message.MsgChoke:
        state.client.Choked = true
    case message.MsgHave:
        index, err := message.ParseHave(msg)
        state.client.Bitfield.SetPiece(index)
    case message.MsgPiece:
        n, err := message.ParsePiece(state.index, state.buf, msg)
        state.downloaded += n
        state.backlog--
    }
    return nil
}

Время отправлять запросы!

Файлы, фрагменты и хэши фрагментов - это еще не вся история. Можно пойти дальше и разбить фрагменты на блоки. Блоки - это части фрагментов, и мы можем идентифицировать их по индексу фрагмента, в который он входит, смещению внутри фрагмента и длине блока. Когда мы делаем запросы к пирам, фактически мы запрашиваем блоки. Обычно блок имеет длину сообщения в 16 КБ. Это значит, для фрагмента в 256 КБ может понадобиться 16 запросов.

Пир должен разрывать соединение, если получает запрос на блок размером больше 16 КБ. Но, судя по моему опыту, большинство клиентов прекрасно обрабатывают запросы на блоки до 128 КБ. Тем не менее, я получил не очень большой прирост скорости при использовании большого размера блока, поэтому лучше придерживаться спецификации.

Пайплайн

Сетевые запросы довольно дорого стоят. И запросы блоков один за другим не увеличивают производительность нашей программы. Поэтому важно распределять запросы так, чтобы в полете постоянно было некоторое количество незавершенных запросов. Это может на порядок повысить пропускную способность нашего соединения.

Классические BitTorrent-клиенты держат очередь из 5 пайплайновых запросов. Мы тоже так поступим. Поэкспериментировав с этим значением, я обнаружил, что можно в два раза увеличить скорость загрузки. Современные клиенты поддерживают адаптивный размер очереди для лучшей утилизации сети. Сделаем это настраиваемым параметром и оставим это место для будущей оптимизации.

// MaxBlockSize is the largest number of bytes a request can ask for
const MaxBlockSize = 16384

// MaxBacklog is the number of unfulfilled requests a client can have in its pipeline
const MaxBacklog = 5

func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
    state := pieceProgress{
        index:  pw.index,
        client: c,
        buf:    make([]byte, pw.length),
    }

    // Setting a deadline helps get unresponsive peers unstuck.
    // 30 seconds is more than enough time to download a 262 KB piece
    c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
    defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline

    for state.downloaded < pw.length {
        // If unchoked, send requests until we have enough unfulfilled requests
        if !state.client.Choked {
            for state.backlog < MaxBacklog && state.requested < pw.length {
                blockSize := MaxBlockSize
                // Last block might be shorter than the typical block
                if pw.length-state.requested < blockSize {
                    blockSize = pw.length - state.requested
                }

                err := c.SendRequest(pw.index, state.requested, blockSize)
                if err != nil {
                    return nil, err
                }
                state.backlog++
                state.requested += blockSize
            }
        }

        err := state.readMessage()
        if err != nil {
            return nil, err
        }
    }

    return state.buf, nil
}

main.go

Тут уже все просто. Мы почти закончили.

package main

import (
    "log"
    "os"

    "github.com/veggiedefender/torrent-client/torrentfile"
)

func main() {
    inPath := os.Args[1]
    outPath := os.Args[2]

    tf, err := torrentfile.Open(inPath)
    if err != nil {
        log.Fatal(err)
    }

    err = tf.DownloadToFile(outPath)
    if err != nil {
        log.Fatal(err)
    }
}

Куда дальше

Для краткости я включил только несколько важных фрагментов кода. Я опустил весь код для синтаксического анализа, тестов и другие скучные части. Полный код можно посмотреть на GitHub.