Перевод "Let's build a distributed Postgres proof of concept"
Давайте посмотрим, из чего состоит CockroachDB под капотом. Загляните в её go.mod и вы увидите множество зависимостей, каждая из которых решает свою важную задачу: реализация протокола PostgreSQL wire, уровень хранения данных, реализация Raft для распределённого консенсуса. А ещё в go.mod этого нет, но всеравно используется сторонняя разработка - грамматика PostgreSQL.
Если довести до абсурда, то CockroachDB — это просто "клей" для этих библиотек. Вооружившись таким упрощённым подходом, давайте попробуем сами создать прототип распределённой PostgreSQL. Мы используем всего четыре основные внешние библиотеки: для разбора SQL, для работы с сетевым протоколом Postgres, для реализации Raft и для хранения метаданных таблиц и самих строк.
Если вы хотите по-настоящему разобраться во внутреннем устройстве CockroachDB, рекомендую следить за инженерным блогом CockroachDB
К концу этой статьи, написав около 600 строк кода, мы получим распределенную "реализацию PostgreSQL", которая будет принимать записи (CREATE TABLE, INSERT) на лидере и обрабатывать чтения (SELECT) на любом узле. При этом все узлы будут содержать одни и те же данные.
Пример взаимодействия с мастером:
$ psql -h localhost -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> create table x (age int, name text);
CREATE ok
phil=> insert into x values(14, 'garry'), (20, 'ted');
could not interpret result from server: INSERT ok
INSERT ok
phil=> select name, age from x;
name | age
---------+-----
"garry" | 14
"ted" | 20
(2 rows)
И пример взаимодействия с репликой:
$ psql -h 127.0.0.1 -p 6001
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> select age, name from x;
age | name
-----+---------
20 | "ted"
14 | "garry"
(2 rows)
Весь код из этой статьи доступен на GitHub в репозитории, который автор с любовью назвал WaterbugDB.
План наступления
Под влиянием доклада Филипа О'Тула о rqlite на Hacker Nights мы поместим на фронт сервер, реализующий протокол PostgreSQL wire. Когда он получает запросы, то на SELECT отвечает сразу и отдает все что нужно. А если это CREATE TABLE или INSERT, то строка запроса целиком отправляется в Raft кластер. Каждый инстанс в кластере Raft, реализует соответствующие функции для обработки сообщений Raft. В нашем случае сообщения будут только двух типов: создать таблицу или вставить данные.
Таким образом, каждый работающий инстанс будет запускать у себя сервер протокола PostgreSQL wire, сервер Raft и HTTP-сервер. HTTP нужен для правильной работы инстанса в кластере Raft.
У каждого работающего инстанса будет своя собственная директория для хранения данных.
Raft
Предполагаю что есть, есть разница между Raft'ом как научной работой и Raft'ом как конкретными реализациями. Когда в остальной части этого поста я буду ссылаться на Raft, я буду иметь в виду именно реализацию. У нас тут практика, а не расуждения.
В CockroachDB используется реализация Raft от etcd, я не сразу это понял, когда начинал этот проект. Я по привычке использовал реализацию Raft от Hashicorp и не думаю чтот тут будет какая-то проблема.
Raft позволяет нам надежно синхронизировать несколько узлов с помощью журнала сообщений. Каждый узел в кластере Raft должен реализовать конечный автомат (FSM) с тремя операциями: Apply (применить), Snapshot (создать снимок) и Restore (восстановить). В нашем конечном автомате будет сразу встроенный движок Postgres, который мы разработаем позже, и мы сразу сможем работать с SQL запросами.
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"path"
"strings"
"time"
"github.com/google/uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/jackc/pgproto3/v2"
pgquery "github.com/pganalyze/pg_query_go/v2"
bolt "go.etcd.io/bbolt"
)
type pgFsm struct {
pe *pgEngine
}
func (pf *pgFsm) Apply(log *raft.Log) any {
switch log.Type {
case raft.LogCommand:
query := string(log.Data)
ast, err := pgquery.Parse(query)
if err != nil {
panic(fmt.Errorf("Could not parse payload: %s", err))
}
err = pf.pe.execute(ast)
if err != nil {
panic(err)
}
default:
panic(fmt.Errorf("Unknown raft log type: %#v", log.Type))
}
return nil
}
Насколько я понимаю, операция создания снимка Snapshot позволяет Raft'у усекать журналы. Она используется в сочетании с восстановлением Restore. При запуске, если существует снимок, вызывается restore, чтобы вы могли загрузить этот снимок. Затем все последующие записи журнала, которые еще не были включены в снимок, воспроизводятся заново через операцию Apply.
Чтобы упростить нашу учебную реализацию, мы просто будем отклонять все запросы на создание снимков, так что Restore никогда не будет вызван. Все журналы будут воспроизводиться заново при каждом запуске через Apply. Это, конечно, неэффективно, но позволяет написать код проще.
Когда мы будем реализовыват логику запуска сервиса, нам нужно будет удалять базу данных, чтобы эти вызовы Apply выполнялись с чистого листа.
type snapshotNoop struct{}
func (sn snapshotNoop) Persist(sink raft.SnapshotSink) error {
return sink.Cancel()
}
func (sn snapshotNoop) Release() {}
func (pf *pgFsm) Snapshot() (raft.FSMSnapshot, error) {
return snapshotNoop{}, nil
}
func (pf *pgFsm) Restore(rc io.ReadCloser) error {
return fmt.Errorf("Nothing to restore")
}
Каждый каждый инстанс при старте будет запускать этот код и изначально становиться лидером кластере. Инстанс в момент старта ничего не знает про другие инстансы. Поэтому нам нужно добавить HTTP-сервер, который позволит инстансам связываться с друг с другом, чтобы инстансы могли договорится кто остается лидером, а кто переходит в режим ведомого. Именно через HTTP-эндпоинт в HTTP-сервере мы соберем все инстансы в один кластер с одним лидером
На самом деле, это все что нам нужно знать про Raft на таком уровне. Современные реализацию забирают на себя почти всю сложную работу и нам нужно будет сделать минимум.
Итак, давайте разработаем этот HTTP-сервер и ендпоинт для перехода в режим ведомого.
HTTP ендпоинт
Наш HTTP-сервер будет иметь только один ендпоинт, который указывает инстансу (а) связаться с другим инстансом (б), чтобы этот самый инстанс (б) присоединился к кластеру инстанса (а).
HTTP-серверу будет нужен экземпляр Raft. С его помощью инстанс (а) сможет инициировать присоединение других инстансов. А для того, чтобы Raft знал, как связаться с инстансом (б), нам нужно сообщить ему уникальный идентификатор и порт инстанса (б). Мы сами присваиваем идентивикаторы при запуске инстансов, так что тут не должно быть проблем.
type httpServer struct {
r *raft.Raft
}
func (hs httpServer) addFollowerHandler(w http.ResponseWriter, r *http.Request) {
followerId := r.URL.Query().Get("id")
followerAddr := r.URL.Query().Get("addr")
if hs.r.State() != raft.Leader {
json.NewEncoder(w).Encode(struct {
Error string `json:"error"`
}{
"Not the leader",
})
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
err := hs.r.AddVoter(raft.ServerID(followerId), raft.ServerAddress(followerAddr), 0, 0).Error()
if err != nil {
log.Printf("Failed to add follower: %s", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
}
В принципе. это и есть все что нужно сделать с Raft в нашем проекте
Движок запросов
Движок запросов представляет собой обёртку над слоем хранения данных. А для храенния данных мы будем использовать bbolt.
Изначально я реализовал это с использованием pebble от Cockroach, но у pebble есть транзитивная зависимость от библиотеки на C, имена функций которой конфликтуют с именами функций в библиотеке на C, которую оборачивает pg_query_go.
type pgEngine struct {
db *bolt.DB
bucketName []byte
}
func newPgEngine(db *bolt.DB) *pgEngine {
return &pgEngine{db, []byte("data")}
}
bbolt организует данные в бакеты. Бакеты могли бы быть естественным способом хранения строк таблиц (один бакет на таблицу), но для простоты реализации мы поместим все метаданные таблиц и данные строк в один бакет
data.
Точка входа, которую мы вызвали в реализации метода Apply для нашей FSM выше, называлась execute. Она принимает разобранный список выражений statements. Мы пройдемся по выражениям в цикле, определим тип каждого выражения и вызовем соответствующий вспомогательный метод для каждого типа.
func (pe *pgEngine) execute(tree *pgquery.ParseResult) error {
for _, stmt := range tree.GetStmts() {
n := stmt.GetStmt()
if c := n.GetCreateStmt(); c != nil {
return pe.executeCreate(c)
}
if c := n.GetInsertStmt(); c != nil {
return pe.executeInsert(c)
}
if c := n.GetSelectStmt(); c != nil {
_, err := pe.executeSelect(c)
return err
}
return fmt.Errorf("Unknown statement type: %s", stmt)
}
return nil
}
Документация pg_query_go не очень полезна. Мне пришлось создать отдельную программу для изучения AST, чтобы было проще разобраться с этим парсером.
Давайте начнем разбираться с созданием табличек
Создание таблицы
Когда мы создаем табличку, нам нуэно сохранить некоторую метаинформацию.
type tableDefinition struct {
Name string
ColumnNames []string
ColumnTypes []string
}
Сначала мы извлекаем эти метаданные из AST.
func (pe *pgEngine) executeCreate(stmt *pgquery.CreateStmt) error {
tbl := tableDefinition{}
tbl.Name = stmt.Relation.Relname
for _, c := range stmt.TableElts {
cd := c.GetColumnDef()
tbl.ColumnNames = append(tbl.ColumnNames, cd.Colname)
// Names is namespaced. So `INT` is pg_catalog.int4. `BIGINT` is pg_catalog.int8.
var columnType string
for _, n := range cd.TypeName.Names {
if columnType != "" {
columnType += "."
}
columnType += n.GetString_().Str
}
tbl.ColumnTypes = append(tbl.ColumnTypes, columnType)
}
// ...
}
Теперь нам нужно сохранить это в слое хранения данных. Самый простой и примитивный способ сделать это - сериализовать метаданные в JSON и сохранить их с ключом: tables_${tableName}.
func (pe *pgEngine) executeCreate(stmt *pgquery.CreateStmt) error {
// ...
tableBytes, err := json.Marshal(tbl)
if err != nil {
return fmt.Errorf("Could not marshal table: %s", err)
}
err = pe.db.Update(func(tx *bolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists(pe.bucketName)
if err != nil {
return err
}
return bkt.Put([]byte("tables_"+tbl.Name), tableBytes)
})
if err != nil {
return fmt.Errorf("Could not set key-value: %s", err)
}
return nil
}
Далее мы создадим вспомогательную функцию для обратной операции — извлечения метаданных таблицы из слоя хранения по имени таблицы:
func (pe *pgEngine) getTableDefinition(name string) (*tableDefinition, error) {
var tbl tableDefinition
err := pe.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(pe.bucketName)
if bkt == nil {
return fmt.Errorf("Table does not exist")
}
valBytes := bkt.Get([]byte("tables_" + name))
err := json.Unmarshal(valBytes, &tbl)
if err != nil {
return fmt.Errorf("Could not unmarshal table: %s", err)
}
return nil
})
return &tbl, err
}
На этом базовая поддержка CREATE TABLE завершена. Теперь займемся INSERT
Вставка строки
Наша поддержка INSERT будет работать только с литеральными/константными значениями VALUES.
func (pe *pgEngine) executeInsert(stmt *pgquery.InsertStmt) error {
tblName := stmt.Relation.Relname
slct := stmt.GetSelectStmt().GetSelectStmt()
for _, values := range slct.ValuesLists {
var rowData []any
for _, value := range values.GetList().Items {
if c := value.GetAConst(); c != nil {
if s := c.Val.GetString_(); s != nil {
rowData = append(rowData, s.Str)
continue
}
if i := c.Val.GetInteger(); i != nil {
rowData = append(rowData, i.Ival)
continue
}
}
return fmt.Errorf("Unknown value type: %s", value)
}
// ...
}
Было бы лучше вынести этот код для VALUES в отдельную вспомогательную функцию, чтобы его можно было использовать и для SELECT, но из лени мы просто оставим это здесь.
Теперь нам нужно записать строку в слой хранения. Мы сериализуем данные строки в JSON. Да, это неэффективно, потому что структура строки нам известна, но JSON прост в использовании. Мы сохраним строку с префиксом, включающим имя таблицы, и присвоим её ключу уникальный UUID. При итерации по строкам таблицы мы сможем выполнить сканирование по префиксу, которое вернёт только строки этой таблицы.
// ...
rowBytes, err := json.Marshal(rowData)
if err != nil {
return fmt.Errorf("Could not marshal row: %s", err)
}
id := uuid.New().String()
err = pe.db.Update(func(tx *bolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists(pe.bucketName)
if err != nil {
return err
}
return bkt.Put([]byte("rows_"+tblName+"_"+id), rowBytes)
})
if err != nil {
return fmt.Errorf("Could not store row: %s", err)
}
}
return nil
}
Теперь можем перейти к поддержке SELECT
Выборка строк
В отличие от CREATE TABLE и INSERT, SELECT должен будет возвращать строки, имена столбцов и типы столбцов. Этого требует протокол PostgreSQL
type pgResult struct {
fieldNames []string
fieldTypes []string
rows [][]any
}
Сначала мы извлекаем имя таблицы и выбранные поля, находя типы полей в метаданных таблицы.
func (pe *pgEngine) executeSelect(stmt *pgquery.SelectStmt) (*pgResult, error) {
tblName := stmt.FromClause[0].GetRangeVar().Relname
tbl, err := pe.getTableDefinition(tblName)
if err != nil {
return nil, err
}
results := &pgResult{}
for _, c := range stmt.TargetList {
fieldName := c.GetResTarget().Val.GetColumnRef().Fields[0].GetString_().Str
results.fieldNames = append(results.fieldNames, fieldName)
fieldType := ""
for i, cn := range tbl.ColumnNames {
if cn == fieldName {
fieldType = tbl.ColumnTypes[i]
}
}
if fieldType == "" {
return nil, fmt.Errorf("Unknown field: %s", fieldName)
}
results.fieldTypes = append(results.fieldTypes, fieldType)
}
// ...
}
Наконец, мы выполняем сканирование по префиксу, чтобы извлечь все строки таблицы из слоя хранения.
func (pe *pgEngine) executeSelect(stmt *pgquery.SelectStmt) (*pgResult, error) {
// ...
prefix := []byte("rows_" + tblName + "_")
pe.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket(pe.bucketName).Cursor()
for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
var row []any
err = json.Unmarshal(v, &row)
if err != nil {
return fmt.Errorf("Unable to unmarshal row: %s", err)
}
var targetRow []any
for _, target := range results.fieldNames {
for i, field := range tbl.ColumnNames {
if target == field {
targetRow = append(targetRow, row[i])
}
}
}
results.rows = append(results.rows, targetRow)
}
return nil
})
return results, nil
}
На этом SELECT полностью реализован. Последняя функция, которую мы реализуем, — это вспомогательная функция для удаления всех данных в слое хранения. Она будет вызываться при запуске до применения логов Raft, чтобы база данных всегда оказывалась в согласованном состоянии.
func (pe *pgEngine) delete() error {
return pe.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(pe.bucketName)
if bkt != nil {
return tx.DeleteBucket(pe.bucketName)
}
return nil
})
}
И мы готовы перейти к заключительному слою - протоколу PostgreSQL.
Протокол PostgreSQL
jackc/pgproto3 — это реализация протокола PostgreSQL на Go. Она позволяет нам создать сервер, который может отвечать на запросы клиентов PostgreSQL, таких как psql.
Он работает путем обертывания TCP-соединения. Поэтому мы начнем с создания функции, которая реализует цикл обработки TCP-подключений.
func runPgServer(port string, db *bolt.DB, r *raft.Raft) {
ln, err := net.Listen("tcp", "localhost:"+port)
if err != nil {
log.Fatal(err)
}
for {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
pc := pgConn{conn, db, r}
go pc.handle()
}
}
Экземпляру pgConn необходим прямой доступ к базе данных, чтобы иметь возможность отвечать на запрос SELECT. А для всех остальных запросов ему нужен экземпляр Raft.
type pgConn struct {
conn net.Conn
db *bolt.DB
r *raft.Raft
}
Функция handle, которая используется в коде выше, будет получать текущее сообщение через пакет pgproto3 и обрабатывать стартовые сообщения и обычные сообщения
func (pc pgConn) handle() {
pgc := pgproto3.NewBackend(pgproto3.NewChunkReader(pc.conn), pc.conn)
defer pc.conn.Close()
err := pc.handleStartupMessage(pgc)
if err != nil {
log.Println(err)
return
}
for {
err := pc.handleMessage(pgc)
if err != nil {
log.Println(err)
return
}
}
}
Стартовые сообщения включают проверки авторизации и SSL. Мы будем разрешать всё в первом случае и отвечать "нет" во втором.
func (pc pgConn) handleStartupMessage(pgconn *pgproto3.Backend) error {
startupMessage, err := pgconn.ReceiveStartupMessage()
if err != nil {
return fmt.Errorf("Error receiving startup message: %s", err)
}
switch startupMessage.(type) {
case *pgproto3.StartupMessage:
buf := (&pgproto3.AuthenticationOk{}).Encode(nil)
buf = (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)
_, err = pc.conn.Write(buf)
if err != nil {
return fmt.Errorf("Error sending ready for query: %s", err)
}
return nil
case *pgproto3.SSLRequest:
_, err = pc.conn.Write([]byte("N"))
if err != nil {
return fmt.Errorf("Error sending deny SSL request: %s", err)
}
return pc.handleStartupMessage(pgconn)
default:
return fmt.Errorf("Unknown startup message: %#v", startupMessage)
}
}
В основной логике функции handleMessage мы будем проверять тип сообщения.
func (pc pgConn) handleMessage(pgc *pgproto3.Backend) error {
msg, err := pgc.Receive()
if err != nil {
return fmt.Errorf("Error receiving message: %s", err)
}
switch t := msg.(type) {
case *pgproto3.Query:
// TODO
case *pgproto3.Terminate:
return nil
default:
return fmt.Errorf("Received message other than Query from client: %s", msg)
}
return nil
}
Если сообщение является запросом, мы разберем его и немедленно ответим на SELECT
switch t := msg.(type) {
case *pgproto3.Query:
stmts, err := pgquery.Parse(t.String)
if err != nil {
return fmt.Errorf("Error parsing query: %s", err)
}
if len(stmts.GetStmts()) > 1 {
return fmt.Errorf("Only make one request at a time.")
}
stmt := stmts.GetStmts()[0]
// Handle SELECTs here
s := stmt.GetStmt().GetSelectStmt()
if s != nil {
pe := newPgEngine(pc.db)
res, err := pe.executeSelect(s)
if err != nil {
return err
}
pc.writePgResult(res)
return nil
}
В противном случае мы добавим запрос в журнал Raft и вернем базовый ответ.
Пусть вас не смущает writePgResult, мы реализуем этот вспомогательный метод ниже.
// Otherwise it's DDL/DML, raftify
future := pc.r.Apply([]byte(t.String), 500*time.Millisecond)
if err := future.Error(); err != nil {
return fmt.Errorf("Could not apply: %s", err)
}
e := future.Response()
if e != nil {
return fmt.Errorf("Could not apply (internal): %s", e)
}
pc.done(nil, strings.ToUpper(strings.Split(t.String, " ")[0])+" ok")
case *pgproto3.Terminate:
return nil
default:
return fmt.Errorf("Received message other than Query from client: %s", msg)
}
return nil
}
done — это важная вспомогательная функция, которая сообщает подключению PostgreSQL, что запрос выполнен и сервер готов к приему следующего запроса. Без этого ответа psql просто зависнет.
func (pc pgConn) done(buf []byte, msg string) {
buf = (&pgproto3.CommandComplete{CommandTag: []byte(msg)}).Encode(buf)
buf = (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)
_, err := pc.conn.Write(buf)
if err != nil {
log.Printf("Failed to write query response: %s", err)
}
}
А теперь давайте реализуем вспомогательную функцию writePgResult. Эта функция должна преобразовывать нашу структуру pgResult в формат, требуемый pgproto3.
var dataTypeOIDMap = map[string]uint32{
"text": 25,
"pg_catalog.int4": 23,
}
func (pc pgConn) writePgResult(res *pgResult) {
rd := &pgproto3.RowDescription{}
for i, field := range res.fieldNames {
rd.Fields = append(rd.Fields, pgproto3.FieldDescription{
Name: []byte(field),
DataTypeOID: dataTypeOIDMap[res.fieldTypes[i]],
})
}
buf := rd.Encode(nil)
for _, row := range res.rows {
dr := &pgproto3.DataRow{}
for _, value := range row {
bs, err := json.Marshal(value)
if err != nil {
log.Printf("Failed to marshal cell: %s\n", err)
return
}
dr.Values = append(dr.Values, bs)
}
buf = dr.Encode(buf)
}
pc.done(buf, fmt.Sprintf("SELECT %d", len(res.rows)))
}
Мы закончили, нам осталось реализовать фцнкцию main()
Main
При запуске каждому процессу должен быть назначен (родительским процессом) уникальный идентификатор узла - для этого подойдет любая уникальная строка. А также порты для сервера Raft, сервера Postgres и HTTP-сервера. Мы создадим небольшую вспомогательную функцию getConfig для получения этих данных из аргументов.
type config struct {
id string
httpPort string
raftPort string
pgPort string
}
func getConfig() config {
cfg := config{}
for i, arg := range os.Args[1:] {
if arg == "--node-id" {
cfg.id = os.Args[i+2]
i++
continue
}
if arg == "--http-port" {
cfg.httpPort = os.Args[i+2]
i++
continue
}
if arg == "--raft-port" {
cfg.raftPort = os.Args[i+2]
i++
continue
}
if arg == "--pg-port" {
cfg.pgPort = os.Args[i+2]
i++
continue
}
}
if cfg.id == "" {
log.Fatal("Missing required parameter: --node-id")
}
if cfg.raftPort == "" {
log.Fatal("Missing required parameter: --raft-port")
}
if cfg.httpPort == "" {
log.Fatal("Missing required parameter: --http-port")
}
if cfg.pgPort == "" {
log.Fatal("Missing required parameter: --pg-port")
}
return cfg
}
Теперь в функции main мы получим конфигурацию и настроим базу данных для этого процесса. Все процессы будут помещать свои данные в корневую директорию data, чтобы упростить управление директориями. Но внутри этой директории каждый процесс будет иметь свои собственные уникальные директории для хранения данных, основанные на уникальном идентификаторе узла. Запутанно, но не так уж и сложно
func main() {
cfg := getConfig()
dataDir := "data"
err := os.MkdirAll(dataDir, os.ModePerm)
if err != nil {
log.Fatalf("Could not create data directory: %s", err)
}
db, err := bolt.Open(path.Join(dataDir, "/data"+cfg.id), 0600, nil)
if err != nil {
log.Fatalf("Could not open bolt db: %s", err)
}
defer db.Close()
// ...
Нам необходимо очистить базу данных при старте.
// ...
pe := newPgEngine(db)
// Start off in clean state
pe.delete()
// ...
Настроим сервер Raft.
// ...
pf := &pgFsm{pe}
r, err := setupRaft(path.Join(dataDir, "raft"+cfg.id), cfg.id, "localhost:"+cfg.raftPort, pf)
if err != nil {
log.Fatal(err)
}
// ...
Настроим HTTP-сервер.
// ...
hs := httpServer{r}
http.HandleFunc("/add-follower", hs.addFollowerHandler)
go func() {
err := http.ListenAndServe(":"+cfg.httpPort, nil)
if err != nil {
log.Fatal(err)
}
}()
// ...
И наконец, запустим сервер PostgreSQL.
// ...
runPgServer(cfg.pgPort, db, r)
}
И вот теперь точно все. Давайте запустим наше приложение.
"Что сотворил Бог"
Библейская фраза из книги Числ (23:23), также известная как первое сообщение, отправленное по телеграфу Сэмюэлем Морзе в 1844 году.
Сначала инициализируйте go-модуль, а затем соберите приложение.
$ go mod init waterbugdb
$ go mod tidy
$ go build
Теперь в терминале 1 запустите экземпляр базы данных:
$ ./waterbugdb --node-id node1 --raft-port 2222 --http-port 8222 --pg-port 6000
Затем в терминале 2 запустите другой экземпляр.
$ ./waterbugdb --node-id node2 --raft-port 2223 --http-port 8223 --pg-port 6001
И в терминале 3 укажите node1, чтобы node2 присоединился к нему.
$ curl 'localhost:8222/add-follower?addr=localhost:2223&id=node2'
А затем откройте psql, подключившись к порту 6000, на котором слушает лидер.
$ psql -h localhost -p 6000
psql -h 127.0.0.1 -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> create table x (age int, name text);
CREATE ok
phil=> insert into x values(14, 'garry'), (20, 'ted');
could not interpret result from server: INSERT ok
INSERT ok
phil=> select name, age from x;
name | age
---------+-----
"garry" | 14
"ted" | 20
(2 rows)
Теперь остановите node1 в терминале 1. Затем запустите его снова. node2 теперь станет лидером. Поэтому выйдите из psql в терминале 3 и подключитесь снова, указав node2 на порту 6001. Добавьте новые данные.
$ psql -h 127.0.0.1 -p 6001
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> insert into x values(19, 'ava'), (18, 'ming');
could not interpret result from server: INSERT ok
phil=> select age, name from x;
age | name
-----+---------
20 | "ted"
14 | "garry"
18 | "ming"
19 | "ava"
Выйдите из psql в терминале 3 и снова подключитесь к node1 на порту 6000.
$ psql -h 127.0.0.1 -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> select age, name from x;
age | name
-----+---------
20 | "ted"
14 | "garry"
18 | "ming"
19 | "ava"
(2 rows)
Круто! Все работает.
Итог
С одной стороны, этот пост получился сложнее чем ожидалось. Каждому инстансу потребовалось три работающих сервера. Двумя из этих серверов мы управляли напрямую, а сервер Raft управлялся библиотекой Raft.
С другой стороны, мы реализовали всё это очень небольшим объемом кода. Да, множество граничных случаев остались необработанными, и огромная часть SQL не поддерживается. И да, здесь полно неэффективностей, например использование JSON формата, хотя каждая таблица имеет фиксированную структуру. Но, надеюсь, теперь вы представляете, как может быть реализован подобный проект. И у нас есть основа для постепенного добавления поддержки синтаксиса и обработки граничных случаев.
Кроме того, единственная проблема консенсуса, которую мы решили, — это репликация, а не шардирование. Именно это и его более сложный родственник (кросс-шардовые транзакции) являются той самой "изюминкой", которую привносит Cockroach.
Подробнее о том, как разобраться в шардировании, репликации и распределенном консенсусе, читайте здесь.