Miljoona verkkopistettä ja mene

Hei kaikki! Nimeni on Sergey Kamardin ja olen Mail.Ru: n kehittäjä.

Tämä artikkeli kertoo siitä, kuinka kehitimme suuren kuormituksen WebSocket-palvelimen Go-sovelluksella.

Jos olet perehtynyt WebSocketsiin, mutta tiedät vähän Go: sta, toivon, että tämä artikkeli on silti mielenkiintoinen suorituskyvyn optimoinnin ideoiden ja tekniikoiden suhteen.

1. Esittely

Tarinamme kontekstin määrittelemiseksi on sanottava muutama sana siitä, miksi tarvitsemme tätä palvelinta.

Mail.Ru: lla on paljon tilallisia järjestelmiä. Käyttäjän sähköpostitallennus on yksi niistä. On olemassa useita tapoja seurata tilan muutoksia järjestelmässä - ja järjestelmän tapahtumista. Enimmäkseen tämä tapahtuu joko säännöllisin väliajoin järjestelmän kyselyin tai järjestelmän ilmoituksiin sen tilan muutoksista.

Molemmilla tavoilla on hyvät ja huonot puolensa. Mutta mitä tulee postiin, sitä nopeammin käyttäjä saa uuden postin, sitä parempi.

Postin kyselyyn liittyy noin 50 000 HTTP-kyselyä sekunnissa, joista 60% palauttaa 304-tilan, mikä tarkoittaa, että postilaatikossa ei ole muutoksia.

Siksi palvelinten kuormituksen vähentämiseksi ja postin toimittamisen nopeuttamiseksi käyttäjille päätettiin kehittää pyörä uudelleen kirjoittamalla julkaisija-tilaajapalvelin (joka tunnetaan myös nimellä väylä, viestien välittäjä tai tapahtuma- kanava), joka saisi toisaalta ilmoituksia tilamuutoksista ja toisaalta tällaisten ilmoitusten tilauksista.

Aiemmin:

Nyt:

Ensimmäinen kaavio näyttää miltä se oli ennen. Selain kysyi säännöllisesti sovellusliittymää ja kysyi tallennustilan (postilaatikkopalvelu) muutoksista.

Toinen kaavio kuvaa uutta arkkitehtuuria. Selain muodostaa WebSocket-yhteyden ilmoitussovellusliittymän kanssa, joka on väyläpalvelimen asiakas. Saatuaan uuden sähköpostin Storage lähettää ilmoituksen siitä Busille (1) ja Bus sen tilaajille (2). Sovellusliittymä määrittää yhteyden vastaanotetun ilmoituksen lähettämiseen ja lähettää sen käyttäjän selaimelle (3).

Joten tänään puhumme API: sta tai WebSocket-palvelimesta. Tulevaisuudessa sanon, että palvelimella on noin 3 miljoonaa online-yhteyttä.

2. Idiomaattinen tapa

Katsotaanpa, kuinka toteutamme tietyt palvelimemme osat käyttämällä tavallisia Go-ominaisuuksia ilman optimointia.

Ennen kuin jatkamme net/http, puhutaan siitä, miten lähetämme ja vastaanotamme tietoja. WebSocket-protokollan yläpuolella oleviin tietoihin (esim. JSON-objektit) viitataan jäljempänä paketeina .

Aloitetaan sellaisen Channelrakenteen toteuttaminen , joka sisältää tällaisten pakettien lähettämisen ja vastaanottamisen logiikan WebSocket-yhteyden kautta.

2.1. Kanavan rakenne

// Packet represents application level data. type Packet struct { ... } // Channel wraps user connection. type Channel struct { conn net.Conn // WebSocket connection. send chan Packet // Outgoing packets queue. } func NewChannel(conn net.Conn) *Channel { c := &Channel{ conn: conn, send: make(chan Packet, N), } go c.reader() go c.writer() return c }

Haluaisin kiinnittää huomionne kahden luku- ja kirjoitusohjelman julkaisemiseen. Jokainen gorutiini vaatii oman muistipinon, jonka alkuperäinen koko voi olla 2–8 kt käyttöjärjestelmästä ja Go-versiosta riippuen.

Edellä mainitun 3 miljoonan online-yhteyden määrän osalta tarvitsemme 24 Gt muistia (4 kt: n pino) kaikille yhteyksille. Ja se ilman Channelrakennetta, lähteviä paketteja ch.sendja muita sisäisiä kenttiä varten varattua muistia .

2.2. I / O-gorutiinit

Katsotaanpa "lukijan" toteutusta:

func (c *Channel) reader() { // We make a buffered read to reduce read syscalls. buf := bufio.NewReader(c.conn) for { pkt, _ := readPacket(buf) c.handle(pkt) } }

Tässä käytämme bufio.Readervähentämään read()syscallien määrää ja lukemaan niin monta kuin bufpuskurikoko sallii . Infiniittisen silmukan sisällä odotamme tulevan uutta tietoa. Muista sanat: odota uusia tietoja tulevan. Palaamme heidän luokseen myöhemmin.

Jätämme sivuun saapuvien pakettien jäsentämisen ja käsittelyn, koska se ei ole tärkeää puhuttavien optimointien kannalta. Se bufon kuitenkin nyt huomiomme arvoinen: oletusarvoisesti se on 4 kt, mikä tarkoittaa vielä 12 Gt muistia yhteyksillemme. "Kirjoittajan" kohdalla on samanlainen tilanne:

func (c *Channel) writer() { // We make buffered write to reduce write syscalls. buf := bufio.NewWriter(c.conn) for pkt := range c.send { _ := writePacket(buf, pkt) buf.Flush() } }

Me iteroimme lähtevän pakettikanavan yli c.sendja kirjoitamme ne puskuriin. Kuten huomaavainen lukijamme jo arvaa, tämä on vielä 4 kt ja 12 Gt muistia 3 miljoonalle yhteydellemme.

2.3. HTTP

Meillä on jo yksinkertainen Channeltoteutus, nyt meidän on hankittava WebSocket-yhteys toimimaan. Koska olemme edelleen idiomaattisen tien otsikossa, tehkäämme se samalla tavalla.

Huomaa: Jos et tiedä miten WebSocket toimii, on mainittava, että asiakas siirtyy WebSocket-protokollaan erityisen HTTP-mekanismin avulla, jota kutsutaan päivittämiseksi. Päivityspyynnön onnistuneen käsittelyn jälkeen palvelin ja asiakas käyttävät TCP-yhteyttä vaihtamaan binaarisia WebSocket-kehyksiä. Tässä on kuvaus kehyksen rakenteesta yhteyden sisällä.
import ( "net/http" "some/websocket" ) http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) { conn, _ := websocket.Upgrade(r, w) ch := NewChannel(conn) //... })

Huomaa, että http.ResponseWritertekee muistinkäytön bufio.Readerja bufio.Writer(molemmat 4 kilotavun puskuri) ja *http.Requestalustus ja edelleen vastauksen kirjallisesti.

Käytetystä WebSocket-kirjastosta riippumatta palvelin vastaanottaa onnistuneen päivityspyyntöön vastaamisen jälkeen I / O-puskurit yhdessä TCP-yhteyden kanssa responseWriter.Hijack()puhelun jälkeen.

Vihje: joissakin tapauksissa niitä go:linknamevoidaan käyttää puskurien palauttamiseen sync.Poolsisäpuolelle net/httppuhelun kautta net/http.putBufio{Reader,Writer}.

Tarvitsemme siis vielä 24 Gt muistia 3 miljoonalle yhteydelle.

Joten yhteensä 72 Gt muistia sovellukselle, joka ei vielä tee mitään!

3. Optimoinnit

Tarkastellaan, mistä puhuimme johdanto-osassa, ja muistetaan, miten käyttäjäyhteys käyttäytyy. Vaihdettuaan WebSocketiin asiakas lähettää paketin asiaankuuluvien tapahtumien kanssa tai toisin sanoen tilaa tapahtumia. Sitten (ottamatta huomioon teknisiä viestejä ping/pong) asiakas ei voi lähettää mitään muuta koko yhteyden käyttöiän ajan.

Yhteyden käyttöikä voi kestää useista sekunneista useisiin päiviin.

Joten eniten aikaa Channel.reader()ja Channel.writer()odotamme datan käsittelyä vastaanottamista tai lähettämistä varten. Yhdessä heidän kanssaan odottamassa ovat I / O-puskurit 4 kt.

Nyt on selvää, että tietyt asiat voitaisiin tehdä paremmin, eikö niin?

3.1. Netpoll

Muistatko Channel.reader()toteutuksen, joka odotti uusien tietojenconn.Read() saapuvan lukitsemalla puhelun sisällä bufio.Reader.Read()? Jos yhteydessä oli tietoja, Go-ajonaika "herätti" gorutiinimme ja antoi sen lukea seuraavan paketin. Sen jälkeen gorutiini lukkiutui taas odottaessaan uusia tietoja. Katsotaanpa, kuinka Go runtime ymmärtää, että gorutiini on "herättävä".

Jos katsomme conn.Read () -toteutusta, näemme net.netFD.Read () -kutsun sen sisällä:

// net/fd_unix.go func (fd *netFD) Read(p []byte) (n int, err error) { //... for { n, err = syscall.Read(fd.sysfd, p) if err != nil { n = 0 if err == syscall.EAGAIN { if err = fd.pd.waitRead(); err == nil { continue } } } //... break } //... }
Go käyttää pistorasioita estämättömässä tilassa. EAGAIN sanoo, että pistorasiassa ei ole tietoja eikä käyttöjärjestelmää palauteta hallinta meille, jotta lukeminen ei onnistuisi tyhjästä pistorasiasta.

Näemme read()syscall-yhteyden tiedostotunnisteesta. Jos luku palauttaa EAGAIN-virheen, ajonaika tekee pollDesc.waitRead () -kutsun:

// net/fd_poll_runtime.go func (pd *pollDesc) waitRead() error { return pd.wait('r') } func (pd *pollDesc) wait(mode int) error { res := runtime_pollWait(pd.runtimeCtx, mode) //... }

Jos kaivamme syvemmälle, näemme, että netpoll toteutetaan käyttämällä epollia Linuxissa ja kqueueä BSD: ssä. Miksi et käyttäisi samaa lähestymistapaa yhteyksiimme? Voisimme allokoida lukupuskurin ja aloittaa lukurutiinin vain, kun se on todella välttämätöntä: kun pistorasiassa on todella luettavaa tietoa.

Sivustolla github.com/golang/go on kysymys netpoll-toimintojen viennistä.

3.2. Päästä eroon gorutiinista

Suppose we have netpoll implementation for Go. Now we can avoid starting the Channel.reader() goroutine with the inside buffer, and subscribe for the event of readable data in the connection:

ch := NewChannel(conn) // Make conn to be observed by netpoll instance. poller.Start(conn, netpoll.EventRead, func() { // We spawn goroutine here to prevent poller wait loop // to become locked during receiving packet from ch. go Receive(ch) }) // Receive reads a packet from conn and handles it somehow. func (ch *Channel) Receive() { buf := bufio.NewReader(ch.conn) pkt := readPacket(buf) c.handle(pkt) }

It is easier with the Channel.writer() because we can run the goroutine and allocate the buffer only when we are going to send the packet:

func (ch *Channel) Send(p Packet) { if c.noWriterYet() { go ch.writer() } ch.send <- p }
Note that we do not handle cases when operating system returns EAGAIN on write() system calls. We lean on Go runtime for such cases, cause it is actually rare for such kind of servers. Nevertheless, it could be handled in the same way if needed.

After reading the outgoing packets from ch.send (one or several), the writer will finish its operation and free the goroutine stack and the send buffer.

Perfect! We have saved 48 GB by getting rid of the stack and I/O buffers inside of two continuously running goroutines.

3.3. Control of resources

A great number of connections involves not only high memory consumption. When developing the server, we experienced repeated race conditions and deadlocks often followed by the so-called self-DDoS — a situation when the application clients rampantly tried to connect to the server thus breaking it even more.

For example, if for some reason we suddenly could not handle ping/pong messages, but the handler of idle connections continued to close such connections (supposing that the connections were broken and therefore provided no data), the client appeared to lose connection every N seconds and tried to connect again instead of waiting for events.

It would be great if the locked or overloaded server just stopped accepting new connections, and the balancer before it (for example, nginx) passed request to the next server instance.

Moreover, regardless of the server load, if all clients suddenly want to send us a packet for any reason (presumably by cause of bug), the previously saved 48 GB will be of use again, as we will actually get back to the initial state of the goroutine and the buffer per each connection.

Goroutine pool

We can restrict the number of packets handled simultaneously using a goroutine pool. This is what a naive implementation of such pool looks like:

package gopool func New(size int) *Pool { return &Pool{ work: make(chan func()), sem: make(chan struct{}, size), } } func (p *Pool) Schedule(task func()) error { select { case p.work <- task: case p.sem <- struct{}{}: go p.worker(task) } } func (p *Pool) worker(task func()) { defer func() { <-p.sem } for { task() task = <-p.work } }

Now our code with netpoll looks as follows:

pool := gopool.New(128) poller.Start(conn, netpoll.EventRead, func() { // We will block poller wait loop when // all pool workers are busy. pool.Schedule(func() { Receive(ch) }) })

So now we read the packet not only upon readable data appearance in the socket, but also upon the first opportunity to take up the free goroutine in the pool.

Similarly, we’ll change Send():

pool := gopool.New(128) func (ch *Channel) Send(p Packet) { if c.noWriterYet() { pool.Schedule(ch.writer) } ch.send <- p }

Instead of go ch.writer(), we want to write in one of the reused goroutines. Thus, for a pool of N goroutines, we can guarantee that with N requests handled simultaneously and the arrived N + 1 we will not allocate a N + 1 buffer for reading. The goroutine pool also allows us to limit Accept() and Upgrade() of new connections and to avoid most situations with DDoS.

3.4. Zero-copy upgrade

Let’s deviate a little from the WebSocket protocol. As was already mentioned, the client switches to the WebSocket protocol using a HTTP Upgrade request. This is what it looks like:

GET /ws HTTP/1.1 Host: mail.ru Connection: Upgrade Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA== Sec-Websocket-Version: 13 Upgrade: websocket HTTP/1.1 101 Switching Protocols Connection: Upgrade Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4= Upgrade: websocket

That is, in our case we need the HTTP request and its headers only for switch to the WebSocket protocol. This knowledge and what is stored inside the http.Request suggests that for the sake of optimization, we could probably refuse unnecessary allocations and copyings when processing HTTP requests and abandon the standard net/http server.

For example, the http.Request contains a field with the same-name Header type that is unconditionally filled with all request headers by copying data from the connection to the values strings. Imagine how much extra data could be kept inside this field, for example for a large-size Cookie header.

But what to take in return?

WebSocket implementation

Unfortunately, all libraries existing at the time of our server optimization allowed us to do upgrade only for the standard net/http server. Moreover, neither of the (two) libraries made it possible to use all the above read and write optimizations. For these optimizations to work, we must have a rather low-level API for working with WebSocket. To reuse the buffers, we need the procotol functions to look like this:

func ReadFrame(io.Reader) (Frame, error) func WriteFrame(io.Writer, Frame) error

If we had a library with such API, we could read packets from the connection as follows (the packet writing would look the same):

// getReadBuf, putReadBuf are intended to // reuse *bufio.Reader (with sync.Pool for example). func getReadBuf(io.Reader) *bufio.Reader func putReadBuf(*bufio.Reader) // readPacket must be called when data could be read from conn. func readPacket(conn io.Reader) error { buf := getReadBuf() defer putReadBuf(buf) buf.Reset(conn) frame, _ := ReadFrame(buf) parsePacket(frame.Payload) //... }

In short, it was time to make our own library.

github.com/gobwas/ws

Ideologically, the ws library was written so as not to impose its protocol operation logic on users. All reading and writing methods accept standard io.Reader and io.Writer interfaces, which makes it possible to use or not to use buffering or any other I/O wrappers.

Besides upgrade requests from standard net/http, ws supports zero-copy upgrade, the handling of upgrade requests and switching to WebSocket without memory allocations or copyings. ws.Upgrade() accepts io.ReadWriter (net.Conn implements this interface). In other words, we could use the standard net.Listen() and transfer the received connection from ln.Accept() immediately to ws.Upgrade(). The library makes it possible to copy any request data for future use in the application (for example, Cookie to verify the session).

Below there are benchmarks of Upgrade request processing: standard net/http server versus net.Listen() with zero-copy upgrade:

BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op

Switching to ws and zero-copy upgrade saved us another 24 GB — the space allocated for I/O buffers upon request processing by the net/http handler.

3.5. Summary

Let’s structure the optimizations I told you about.

  • A read goroutine with a buffer inside is expensive. Solution: netpoll (epoll, kqueue); reuse the buffers.
  • A write goroutine with a buffer inside is expensive. Solution: start the goroutine when necessary; reuse the buffers.
  • With a storm of connections, netpoll won’t work. Solution: reuse the goroutines with the limit on their number.
  • net/http is not the fastest way to handle Upgrade to WebSocket. Solution: use the zero-copy upgrade on bare TCP connection.

Juuri palvelinkoodi voisi näyttää:

import ( "net" "github.com/gobwas/ws" ) ln, _ := net.Listen("tcp", ":8080") for { // Try to accept incoming connection inside free pool worker. // If there no free workers for 1ms, do not accept anything and try later. // This will help us to prevent many self-ddos or out of resource limit cases. err := pool.ScheduleTimeout(time.Millisecond, func() { conn := ln.Accept() _ = ws.Upgrade(conn) // Wrap WebSocket connection with our Channel struct. // This will help us to handle/send our app's packets. ch := NewChannel(conn) // Wait for incoming bytes from connection. poller.Start(conn, netpoll.EventRead, func() { // Do not cross the resource limits. pool.Schedule(func() { // Read and handle incoming packet(s). ch.Recevie() }) }) }) if err != nil { time.Sleep(time.Millisecond) } }

4. Yhteenveto

Ennenaikainen optimointi on kaiken pahan (tai ainakin suurimman osan) perusta ohjelmoinnissa. Donald Knuth

Tietenkin yllä olevat optimoinnit ovat merkityksellisiä, mutta eivät kaikissa tapauksissa. Esimerkiksi jos vapaiden resurssien (muisti, suorittimen) ja online-yhteyksien määrän suhde on melko korkea, optimoinnilla ei todennäköisesti ole mitään järkeä. Voit kuitenkin hyötyä paljon siitä, että tiedät missä ja mitä parantaa.

Kiitos huomiostasi!

5. Viitteet

  • //github.com/mailru/easygo
  • //github.com/gobwas/ws
  • //github.com/gobwas/ws-examplees
  • //github.com/gobwas/httphead
  • Tämän artikkelin venäjänkielinen versio