Basics

Here are some common things to do without digging too deep into the mechanics.

Saving Keyframes

If you just want to look at keyframes, you can set CodecContext.skip_frame to speed up the process:

import av
import av.datasets


content = av.datasets.curated("pexels/time-lapse-video-of-night-sky-857195.mp4")
with av.open(content) as container:
    # Signal that we only want to look at keyframes.
    stream = container.streams.video[0]
    stream.codec_context.skip_frame = "NONKEY"

    for frame in container.decode(stream):

        print(frame)

        # We name files by `frame.pts`, as `frame.index` won't make much sense
        # once `skip_frame` drops the non-keyframes.
        frame.to_image().save(
            "night-sky.{:04d}.jpg".format(frame.pts),
            quality=80,
        )
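
The `pts` values used in the file names above are counted in units of the stream's time base, so they are not directly readable as times. If names based on wall-clock position are preferred, a minimal sketch follows; `keyframe_filename` is a hypothetical helper (not part of PyAV), and it assumes you pass it `frame.pts` together with `stream.time_base`:

```python
from fractions import Fraction


def keyframe_filename(pts, time_base, prefix="night-sky"):
    """Build a file name from a timestamp, expressed in milliseconds."""
    if pts is None:
        # Some frames may carry no timestamp; fall back to a fixed label.
        return "{}.unknown.jpg".format(prefix)
    # pts * time_base is the position in seconds, kept exact as a Fraction.
    milliseconds = round(pts * Fraction(time_base) * 1000)
    return "{}.{:08d}ms.jpg".format(prefix, milliseconds)


# Half a second into a stream whose time base is 1/15360.
print(keyframe_filename(7680, Fraction(1, 15360)))
```

Inside the decode loop this would be called as `keyframe_filename(frame.pts, stream.time_base)`.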

Remuxing

Remuxing is copying audio/video data from one container to another without transcoding it. Because the data is never decoded and re-encoded, it does not suffer any generational loss and keeps the full quality it had in the source container.

import av
import av.datasets


input_ = av.open(av.datasets.curated("pexels/time-lapse-video-of-night-sky-857195.mp4"))
output = av.open("remuxed.mkv", "w")

# Make an output stream using the input as a template. This copies the stream
# setup from one to the other.
in_stream = input_.streams.video[0]
out_stream = output.add_stream(template=in_stream)

for packet in input_.demux(in_stream):

    print(packet)

    # We need to skip the "flushing" packets that `demux` generates.
    if packet.dts is None:
        continue

    # We need to assign the packet to the new stream.
    packet.stream = out_stream

    output.mux(packet)

input_.close()
output.close()

Parsing

Sometimes we have a raw stream of data, and we need to split it into packets before working with it. We can use CodecContext.parse() to do this.

import os
import subprocess

import av
import av.datasets


# We want an H.264 stream in the Annex B byte-stream format.
# We haven't exposed bitstream filters yet, so we're gonna use the `ffmpeg` CLI.
h264_path = "night-sky.h264"
if not os.path.exists(h264_path):
    subprocess.check_call(
        [
            "ffmpeg",
            "-i",
            av.datasets.curated("pexels/time-lapse-video-of-night-sky-857195.mp4"),
            "-vcodec",
            "copy",
            "-an",
            "-bsf:v",
            "h264_mp4toannexb",
            h264_path,
        ]
    )


fh = open(h264_path, "rb")

codec = av.CodecContext.create("h264", "r")

while True:

    chunk = fh.read(1 << 16)

    packets = codec.parse(chunk)
    print("Parsed {} packets from {} bytes:".format(len(packets), len(chunk)))

    for packet in packets:

        print("   ", packet)

        frames = codec.decode(packet)
        for frame in frames:
            print("       ", frame)

    # We wait until the end to bail so that the last empty `chunk` flushes
    # the parser.
    if not chunk:
        break

fh.close()
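
To give a rough idea of what the parser is working with: an Annex B byte stream separates NAL units with the start code 0x000001, often written with an extra leading zero byte. The sketch below is not how `CodecContext.parse()` is implemented, and it is much simpler than a real parser, which also groups NAL units into packets and copes with data arriving in arbitrary chunks. `split_nal_units` is a hypothetical helper that assumes the whole stream is already in memory:

```python
import re

# Annex B start code. A four-byte start code is this pattern preceded by a
# zero byte.
START_CODE = re.compile(b"\x00\x00\x01")


def split_nal_units(data):
    """Return the NAL unit payloads found between start codes in `data`."""
    # Offsets of the first payload byte after each start code.
    starts = [match.end() for match in START_CODE.finditer(data)]
    units = []
    for i, begin in enumerate(starts):
        # A unit runs up to the next start code, or to the end of the data.
        end = starts[i + 1] - 3 if i + 1 < len(starts) else len(data)
        # Drop trailing zero bytes: they are padding or the leading zero of
        # a four-byte start code (assumes a NAL unit never ends in 0x00).
        units.append(data[begin:end].rstrip(b"\x00"))
    return units


sample = (
    b"\x00\x00\x00\x01\x67\xaa"
    b"\x00\x00\x01\x68\xbb"
    b"\x00\x00\x00\x01\x65\xcc\xdd"
)
for unit in split_nal_units(sample):
    # The low five bits of the first byte hold the H.264 NAL unit type.
    print("type", unit[0] & 0x1F, "length", len(unit))
```

For the hand-made sample this reports types 7, 8 and 5, which in H.264 are a sequence parameter set, a picture parameter set and an IDR slice.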

Threading

By default, codec contexts will decode with SLICE threading. This allows multiple threads to cooperate to decode any given frame.

This is faster than no threading, but is not as fast as we can go.

Also enabling FRAME (or AUTO) threading allows multiple threads to decode independent frames. This is not enabled by default because it does change the API a bit: you will get a much larger “delay” between starting the decode of a packet and getting its results. Take a look at the output of this sample to see what we mean:

import time

import av
import av.datasets


print("Decoding with default (slice) threading...")

container = av.open(
    av.datasets.curated("pexels/time-lapse-video-of-night-sky-857195.mp4")
)

start_time = time.time()
for packet in container.demux():
    print(packet)
    for frame in packet.decode():
        print(frame)

default_time = time.time() - start_time
container.close()


print("Decoding with auto threading...")

container = av.open(
    av.datasets.curated("pexels/time-lapse-video-of-night-sky-857195.mp4")
)

# !!! This is the only difference.
container.streams.video[0].thread_type = "AUTO"

start_time = time.time()
for packet in container.demux():
    print(packet)
    for frame in packet.decode():
        print(frame)

auto_time = time.time() - start_time
container.close()


print("Decoded with default threading in {:.2f}s.".format(default_time))
print("Decoded with auto threading in {:.2f}s.".format(auto_time))

On the author’s machine, the second pass decodes ~5 times faster.
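
One way to put a number on the “delay” described above is to count how many packets go into the decoder before the first frame comes out. A minimal sketch, where `packets_before_first_frame` is a hypothetical helper (not part of PyAV) that only assumes each packet has a `decode()` method returning a list of frames, as in the sample above:

```python
def packets_before_first_frame(packets):
    """Count the packets submitted up to and including the first one
    whose `decode()` call returns at least one frame."""
    for count, packet in enumerate(packets, start=1):
        if packet.decode():
            return count
    # The decoder never produced a frame.
    return None
```

Calling it with `container.demux(container.streams.video[0])`, once with the default settings and once after setting `thread_type = "AUTO"`, should show a larger count in the second case. The exact numbers depend on the codec and on how many threads are used.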