Ant implements the WHATWG Streams Standard, so you can create, transform, and consume streams using the same API available in browsers. Streams are especially useful for large HTTP responses — instead of buffering everything in memory, you can push chunks to the client as soon as they are ready. Ant also provides the Node-compatible node:stream module for code that uses Readable, Writable, and pipeline.

ReadableStream

A ReadableStream produces data on demand. You define how chunks are generated inside the start method of the underlying source object.
const stream = new ReadableStream({
  start(controller) {
    const chunks = ['Hello', ' ', 'from', ' ', 'Ant!'];

    for (const chunk of chunks) {
      controller.enqueue(new TextEncoder().encode(chunk));
    }

    controller.close();
  }
});
controller.enqueue(chunk) — pushes a chunk into the stream. For text and binary data, enqueue a Uint8Array so downstream consumers and Response handle it correctly.
controller.close() — signals that no more chunks will be enqueued. Consumers see the stream end after reading all queued chunks.
controller.error(reason) — puts the stream into an errored state. Pending and future reads reject with the given reason.
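For instance, a stream errored in start causes the reader's read() to reject (the error message below is illustrative):

```javascript
// controller.error() moves the stream to an errored state; reads reject.
const failing = new ReadableStream({
  start(controller) {
    controller.error(new Error('source went away'));
  }
});

const failingReader = failing.getReader();
let failure;
try {
  await failingReader.read();
} catch (err) {
  failure = err;
  console.error('stream errored:', err.message);
}
```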

Async ReadableStream

Use the pull method for back-pressure-aware production: it is called only when the consumer is ready for more data, and it may return a promise to produce chunks asynchronously.
let index = 0;
const items = ['one', 'two', 'three'];

const stream = new ReadableStream({
  pull(controller) {
    if (index < items.length) {
      controller.enqueue(new TextEncoder().encode(items[index++]));
    } else {
      controller.close();
    }
  }
});
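When pull returns a promise, production pauses until it resolves. A sketch that simulates an asynchronous source (the delay helper stands in for a real async operation such as a database read):

```javascript
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

let i = 0;
const asyncItems = ['alpha', 'beta', 'gamma'];

const asyncStream = new ReadableStream({
  async pull(controller) {
    if (i >= asyncItems.length) {
      controller.close();
      return;
    }
    await delay(10); // stand-in for an async fetch or disk read
    controller.enqueue(new TextEncoder().encode(asyncItems[i++]));
  }
});

// Drain the stream to show the chunks arrive in order.
const asyncParts = [];
const asyncReader = asyncStream.getReader();
while (true) {
  const { value, done } = await asyncReader.read();
  if (done) break;
  asyncParts.push(new TextDecoder().decode(value));
}
console.log(asyncParts.join(',')); // "alpha,beta,gamma"
```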

Consuming a ReadableStream

const reader = stream.getReader();

while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  console.log(new TextDecoder().decode(value));
}

reader.releaseLock();
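The standard also defines ReadableStream as an async iterable, so the read loop can usually be written with for await...of (assuming Ant supports async iteration, as most WHATWG-compliant runtimes do):

```javascript
const words = new ReadableStream({
  start(controller) {
    for (const w of ['streams', ' ', 'made', ' ', 'simple']) {
      controller.enqueue(new TextEncoder().encode(w));
    }
    controller.close();
  }
});

// Async iteration reads chunks and releases the lock automatically.
let text = '';
for await (const chunk of words) {
  text += new TextDecoder().decode(chunk);
}
console.log(text); // "streams made simple"
```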

WritableStream

A WritableStream consumes data written to it. Implement write, close, and optionally abort in the underlying sink.
const collected = [];

const writable = new WritableStream({
  write(chunk) {
    collected.push(new TextDecoder().decode(chunk));
  },
  close() {
    console.log('stream closed, data:', collected.join(''));
  },
  abort(reason) {
    console.error('stream aborted:', reason);
  }
});

const writer = writable.getWriter();
await writer.write(new TextEncoder().encode('Hello'));
await writer.write(new TextEncoder().encode(', world!'));
await writer.close();
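Instead of driving a writer by hand, a ReadableStream can be connected directly with pipeTo, which resolves once the source is exhausted and the sink has closed:

```javascript
const received = [];

const sink = new WritableStream({
  write(chunk) {
    received.push(new TextDecoder().decode(chunk));
  }
});

const src = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('piped'));
    controller.enqueue(new TextEncoder().encode(' data'));
    controller.close();
  }
});

await src.pipeTo(sink); // closes the sink when the source ends
console.log(received.join('')); // "piped data"
```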

TransformStream

A TransformStream sits between a ReadableStream source and a WritableStream sink, transforming chunks as they pass through.
const upperCase = new TransformStream({
  transform(chunk, controller) {
    const text = new TextDecoder().decode(chunk);
    controller.enqueue(new TextEncoder().encode(text.toUpperCase()));
  }
});
Pipe a source through a transform:
const source = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('hello from ant'));
    controller.close();
  }
});

const { readable, writable } = new TransformStream({
  transform(chunk, controller) {
    const text = new TextDecoder().decode(chunk);
    controller.enqueue(new TextEncoder().encode(text.toUpperCase()));
  }
});

source.pipeTo(writable); // starts piping; resolves once the source is fully consumed

const reader = readable.getReader();
const { value } = await reader.read();
console.log(new TextDecoder().decode(value)); // "HELLO FROM ANT"
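Because pipeThrough returns the transform's readable side, transforms chain naturally. A sketch chaining the upper-casing transform with a second (hypothetical) transform that appends an exclamation mark:

```javascript
const upper = new TransformStream({
  transform(chunk, controller) {
    const text = new TextDecoder().decode(chunk);
    controller.enqueue(new TextEncoder().encode(text.toUpperCase()));
  }
});

const exclaim = new TransformStream({
  transform(chunk, controller) {
    const text = new TextDecoder().decode(chunk);
    controller.enqueue(new TextEncoder().encode(text + '!'));
  }
});

const chained = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('chained'));
    controller.close();
  }
}).pipeThrough(upper).pipeThrough(exclaim);

const { value: out } = await chained.getReader().read();
const result = new TextDecoder().decode(out);
console.log(result); // "CHAINED!"
```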

Streaming HTTP responses

Pass a ReadableStream directly as the body of a Response to stream the HTTP response to the client. Ant flushes each enqueued chunk as it arrives.
export default {
  port: 3000,
  fetch(req) {
    const stream = new ReadableStream({
      start(controller) {
        const chunks = ['Hello', ' ', 'from', ' ', 'Ant!'];

        for (const chunk of chunks) {
          controller.enqueue(new TextEncoder().encode(chunk));
        }

        controller.close();
      }
    });

    return new Response(stream, {
      headers: { 'Content-Type': 'text/plain' }
    });
  }
};
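On the receiving end, a Response body is itself a ReadableStream, so the chunks enqueued above can be consumed incrementally rather than buffered. A sketch constructing a Response from a stream and reading its body chunk by chunk:

```javascript
// Build a streamed Response and consume its body incrementally.
const responseBody = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('chunk-1 '));
    controller.enqueue(new TextEncoder().encode('chunk-2'));
    controller.close();
  }
});

const res = new Response(responseBody, {
  headers: { 'Content-Type': 'text/plain' }
});

const decoder = new TextDecoder();
let receivedBody = '';
const bodyReader = res.body.getReader();
while (true) {
  const { value, done } = await bodyReader.read();
  if (done) break;
  // { stream: true } handles multi-byte characters split across chunks.
  receivedBody += decoder.decode(value, { stream: true });
}
console.log(receivedBody); // "chunk-1 chunk-2"
```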
For server-sent events or long-lived connections, enqueue chunks over time from a timer or event source, and implement cancel to clean up when the client disconnects:
export default {
  port: 3000,
  fetch(req) {
    let interval;

    const stream = new ReadableStream({
      start(controller) {
        let count = 0;

        interval = setInterval(() => {
          controller.enqueue(
            new TextEncoder().encode(`data: ${JSON.stringify({ tick: ++count })}\n\n`)
          );

          if (count >= 5) {
            clearInterval(interval);
            controller.close();
          }
        }, 500);
      },
      cancel() {
        clearInterval(interval);
      }
    });

    return new Response(stream, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache'
      }
    });
  }
};

Text encoding streams

TextEncoderStream and TextDecoderStream are transform streams that convert between strings and Uint8Array chunks, useful when piping text through a stream pipeline.
const { readable, writable } = new TextEncoderStream();

const writer = writable.getWriter();
writer.write('Hello');
writer.write(', Ant!');
writer.close();

const reader = readable.getReader();
// yields Uint8Array chunks
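TextDecoderStream is the inverse: pipe binary chunks through it to receive strings, with multi-byte characters buffered correctly across chunk boundaries:

```javascript
const bytes = new ReadableStream({
  start(controller) {
    const encoded = new TextEncoder().encode('héllo'); // multi-byte UTF-8
    // Split mid-character to show the decoder buffers incomplete sequences.
    controller.enqueue(encoded.slice(0, 2));
    controller.enqueue(encoded.slice(2));
    controller.close();
  }
});

const textStream = bytes.pipeThrough(new TextDecoderStream());

let decoded = '';
const textReader = textStream.getReader();
while (true) {
  const { value, done } = await textReader.read();
  if (done) break;
  decoded += value; // value is a string, not a Uint8Array
}
console.log(decoded); // "héllo"
```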

Compression streams

CompressionStream and DecompressionStream compress and decompress data using gzip, deflate, or brotli. Ant implements these using zlib and the brotli library bundled in the runtime.
const input = new TextEncoder().encode('compress this payload');

const compressedStream = new ReadableStream({
  start(controller) {
    controller.enqueue(input);
    controller.close();
  }
}).pipeThrough(new CompressionStream('gzip'));

// Collect the compressed chunks
const chunks = [];
const reader = compressedStream.getReader();

while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  chunks.push(value);
}
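The collected Uint8Array chunks can be joined into one contiguous buffer, e.g. to write to disk or pass to another API in a single call (a small helper sketch; concatChunks is not part of the runtime):

```javascript
// Concatenate an array of Uint8Array chunks into a single buffer.
function concatChunks(parts) {
  const total = parts.reduce((n, p) => n + p.byteLength, 0);
  const out = new Uint8Array(total);
  let offset = 0;
  for (const p of parts) {
    out.set(p, offset);
    offset += p.byteLength;
  }
  return out;
}

const joined = concatChunks([
  new TextEncoder().encode('com'),
  new TextEncoder().encode('press')
]);
console.log(new TextDecoder().decode(joined)); // "compress"
```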
Decompress in the same way. A stream can only be read once, and compressedStream above is already locked by its reader, so build a fresh source from the collected chunks before piping:
const decompressedStream = new ReadableStream({
  start(controller) {
    for (const chunk of chunks) controller.enqueue(chunk);
    controller.close();
  }
}).pipeThrough(new DecompressionStream('gzip'));
Supported formats for CompressionStream and DecompressionStream are "gzip", "deflate", "deflate-raw", and "brotli".

Node-compatible node:stream

Use the Node.js stream module when working with packages that import from stream or node:stream.
import { Readable, Writable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

Pipeline with stream/promises

import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('./input.txt'),
  createGzip(),
  createWriteStream('./input.txt.gz')
);
Prefer stream/promises over the callback-based stream.pipeline — it integrates cleanly with async/await and avoids uncaught error edge cases.
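When mixing the two APIs, Node's Readable.toWeb and Readable.fromWeb convert between Node streams and WHATWG streams (assuming Ant's node:stream compat includes these helpers, as Node.js has since v17):

```javascript
import { Readable } from 'node:stream';

// Wrap a Node Readable as a WHATWG ReadableStream.
const nodeReadable = Readable.from(['inter', 'op']);
const webStream = Readable.toWeb(nodeReadable);

let combined = '';
for await (const chunk of webStream) {
  combined += chunk;
}
console.log(combined); // "interop"
```

Readable.fromWeb goes the other direction, producing a Node Readable from a WHATWG stream so it can be used with pipeline and other node:stream utilities.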