Usage
Example: Files by creation timestamp
In this example, we’ll set up a bream stream that streams some numbers from files, in order of file creation time, to a processing function that computes and writes some basic stats.
Setting up an example data source
For this example, let’s set up a simple data source. We’ll have a loop that populates a directory with randomly named files (e.g. gzhzm0gx50) that contain some random positive integers, e.g.:
26
48
27
49
Let’s set up a function and script entrypoint to do this:
from __future__ import annotations

from pathlib import Path
from random import random, randint, choices
from sys import argv
from time import sleep


def file_creation_loop(testdir: Path) -> None:
    """An example data source.

    Every now and then, adds a randomly-named file to `testdir` that contains
    some numbers.
    """
    testdir.mkdir(parents=True, exist_ok=True)
    max_seconds_between_file_creation = 24
    while True:
        sleep_time = random() * max_seconds_between_file_creation
        sleep(sleep_time)
        fname = "".join(choices("abcdefghijklmnopqrstuvwxyz0123456789", k=10))
        number_of_numbers = randint(1, 6)
        nums = [f"{randint(0, 50)}\n" for _ in range(number_of_numbers)]
        print(f"Writing file: {fname}...")
        with (testdir / fname).open("w") as f:
            f.writelines(nums)
        # cleanup: keep only the latest 250 files
        ps = sorted(testdir.glob("*"), key=(lambda p: p.stat().st_ctime))
        for p in ps[:-250]:
            p.unlink()


if __name__ == "__main__":
    if argv[1] == "startsource":
        inputdir = Path(argv[2])
        file_creation_loop(inputdir)
    else:
        raise ValueError(f"unknown command: {argv[1]}")
The source could now be started with:
python <pythonscript>.py startsource <where to put the files>
But we have more to set up first.
Wrapping the data source for use with bream
To use this data source with a bream stream, we first need to define a bream `Source` for it. To do so, we inherit from `Source`, set the `name` attribute, and define the `read` method that loads the files and emits the numbers:
from __future__ import annotations

from pathlib import Path
from random import random, randint, choices
from sys import argv
from time import sleep
from contextlib import suppress

from bream.core import Source, BatchRequest, Batch

...

class NumbersFromFilesByCreationTimestampSource(Source):
    """Bream wrapper for a directory of files containing numbers.

    The data will be emitted in order of file creation timestamp. The file
    creation timestamp (as an integer number of milliseconds) is used as the
    stream "offset".

    For this example, it will be pointed at a directory populated by
    `file_creation_loop`.
    """

    def __init__(self, sourcename: str, filedir: Path, num_files_per_batch: int) -> None:
        """Initialize the bream data source.

        Args:
            sourcename: the name of the source - used by bream to identify the source
            filedir: path to the dir containing files of numbers
            num_files_per_batch: the maximum number of files that should be read in each batch
        """
        # the name of the source - used by bream to identify the source:
        self.name = sourcename
        self._filedir = filedir
        self._num_files_per_batch = num_files_per_batch

    def _get_timestamp_to_file_map(self) -> dict[int, Path]:
        """Get a map of file-creation timestamp to file path for each file in the dir."""
        files = list(self._filedir.glob("*"))
        res: dict[int, Path] = {}
        for f in files:
            with suppress(FileNotFoundError):  # avoid cleanup race condition of file_creation_loop
                res[int(f.stat().st_ctime * 1000)] = f
        return res

    def read(self, br: BatchRequest) -> Batch | None:
        """Read a batch of number-lists from the directory.

        The data will be read to a map with a key-value pair for each file read
        in this batch: {<filename>: [<numbers>, ...], ...}

        Args:
            br: The batch request. This is a bream construct that is part of the
                bream source protocol and looks like:
                BatchRequest(read_from_after: int | None, read_to: int | None) where:
                    `read_from_after` is an offset such that we should start from
                        the next available one (or None if we should start from
                        the beginning)
                    `read_to` is the offset we should read to (or None if the
                        source gets to choose where to read to).

        Returns:
            batch: The batch we return. `Batch` is another bream construct that
                is part of the bream source protocol and looks like:
                Batch(data: Any, read_to: int) where:
                    `data` is the batch data
                    `read_to` is the offset we actually read to.
                If there is no data available, this should be None.
        """
        # if there are no files, we can't read any data:
        if not (timestamp_to_file_map := self._get_timestamp_to_file_map()):
            return None
        # get the timestamps in sorted order
        timestamps = sorted(timestamp_to_file_map)
        # read from the next timestamp if a 'read_from_after' is given, otherwise start at the beginning
        read_from_idx = (
            (timestamps.index(br.read_from_after) + 1) if br.read_from_after is not None else 0
        )
        # if there are no more files to read, we won't read any data
        if read_from_idx == len(timestamps):
            return None
        if br.read_to is not None:
            # if told where to read to, respect it
            read_to_idx = timestamps.index(br.read_to)
        else:
            # otherwise read as many files as configured to
            read_to_idx = min(read_from_idx + self._num_files_per_batch - 1, len(timestamps) - 1)
        # get the actual timestamps of the files we should read
        timestamps_of_files_to_read = timestamps[read_from_idx:read_to_idx + 1]
        # get the file paths we need to read
        files_to_read = [timestamp_to_file_map[ts] for ts in timestamps_of_files_to_read]
        # get the data from the files
        data: dict[str, list[int]] = {}
        for path in files_to_read:
            with path.open("r") as f:
                cleaned = [x for x in f.read().strip().split("\n") if x]
                nums = [int(x) for x in cleaned]
                data[path.name] = nums
        return Batch(data=data, read_to=timestamps_of_files_to_read[-1])

...
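Before defining the processing side, it’s worth seeing how the `read` contract plays out. Here’s a quick, hypothetical way to poke at the source by hand, assuming `BatchRequest` is constructible with the two fields described in the docstring, and that the (made-up) directory `/tmp/numbers` has already been populated by `file_creation_loop`:

src = NumbersFromFilesByCreationTimestampSource(
    "some_numbers", Path("/tmp/numbers"), num_files_per_batch=3
)
# let the source choose where to read to for the first batch:
first = src.read(BatchRequest(read_from_after=None, read_to=None))
if first is not None:
    print(first.read_to, first.data)
    # ask for the next batch strictly after the offset we just read to:
    second = src.read(BatchRequest(read_from_after=first.read_to, read_to=None))
    print(second)  # None if no new files have appeared yet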
Defining the stream processing function
Now we have a bream source set up, but before we configure a stream, we’ll want to do something with the data emitted by the source. So let’s define a “batch function” that consumes the numbers loaded from each file and writes some basic stats:
from __future__ import annotations

from pathlib import Path
from random import random, randint, choices
from sys import argv
from time import sleep
from statistics import mean, stdev
from contextlib import suppress

from bream.core import Source, BatchRequest, Batch, Batches

...

def write_stats(batches: Batches, output_file: Path) -> None:
    """Example batch function that takes data from NumbersFromFilesByCreationTimestampSource.

    This will be given to the bream `Stream` object as the batch processing function and
    computes some basic stats for each list of numbers read from the files.

    Args:
        batches: A bream construct that looks like
            Batches(batches: dict[str, Batch | None] = {<source name>: Batch(...), ...}) where
            `batches` is a map of source name to Batch object. In this example there will be
            only one key-value pair, but in general there will be one for each source of the
            stream.
        output_file: Path to the file we should write the output stats to.
    """
    output_file.parent.mkdir(parents=True, exist_ok=True)
    # get and report the batch of numbers:
    batch = list(batches.batches.values())[0]  # in this example we know there is only one source
    assert batch is not None
    print(f"Seen batch: {batch}")
    # as per the source, the data is a map of the form {<filename>: [<numbers>, ...], ...}
    nums: dict[str, list[int]] = batch.data
    # make the function flaky:
    if random() < 0.1:
        raise RuntimeError(f"raising error on batch: {batch}")
    # process the batch and write the output
    with output_file.open("a") as f:
        for fname, numlist in nums.items():
            # compute stats:
            mean_ = 0 if len(numlist) < 1 else mean(numlist)
            stdev_ = 0 if len(numlist) < 2 else stdev(numlist)
            stats = [len(numlist), mean_, stdev_]
            # write stats:
            stats_writable = (
                f"{fname}: {numlist} -> "
                f"(count={stats[0]}, mean={stats[1]}, stdev={stats[2]})\n"
            )
            f.write(stats_writable)

...
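Since the batch function is just a function, we can smoke-test it without a stream. A minimal sketch, assuming `Batch` and `Batches` can be constructed directly with the fields shown above (the filename, numbers, offset, and output path here are made up):

fake_batch = Batch(data={"gzhzm0gx50": [35, 2, 6]}, read_to=1745697995425)
write_stats(Batches(batches={"some_numbers": fake_batch}), Path("/tmp/stats.txt"))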
Setting up the stream
Now we’re ready to set up and start the stream. Let’s add a function and another entrypoint to our script:
from __future__ import annotations

from pathlib import Path
from random import random, randint, choices
from functools import partial
from sys import argv
from time import sleep
from statistics import mean, stdev
from contextlib import suppress

from bream.core import Source, BatchRequest, Batch, Batches, Stream

...

def start_stream(input_dir: Path, output_file: Path, stream_dir: Path) -> None:
    """Start the stream for the example.

    Args:
        input_dir: path to the dir of files to point NumbersFromFilesByCreationTimestampSource at
        output_file: path to the file where the batch function should write stats to
        stream_dir: path to the dir where the stream will track its progress
    """
    # define the source:
    source_name = "some_numbers"
    source = NumbersFromFilesByCreationTimestampSource(
        source_name, input_dir, num_files_per_batch=3
    )
    # define the batch function:
    batch_func = partial(write_stats, output_file=output_file)
    # define and start the stream; the batch function is flaky, so wrap it in a restart loop:
    while True:
        stream = Stream([source], stream_dir)
        # start the stream in a background thread, and don't read batches more often
        # than every 30 seconds:
        stream.start(batch_func, min_batch_seconds=30)
        stream.wait()  # block until the stream dies
        if stream.status.error:  # the stream died, which means an error happened; check status
            print(f"error raised: {repr(stream.status.error)}")

...

if __name__ == "__main__":
    if argv[1] == "startsource":
        inputdir = Path(argv[2])
        file_creation_loop(inputdir)
    elif argv[1] == "startstream":
        inputdir = Path(argv[2])
        outputfile = Path(argv[3])
        streamdir = Path(argv[4])
        start_stream(inputdir, outputfile, streamdir)
    else:
        raise ValueError(f"unknown command: {argv[1]}")
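The restart loop above exists only because we made the batch function deliberately flaky; the stream tracks its progress in `stream_dir`, so each restarted stream picks up from the last committed batch. If your batch function is reliable, a single pass is enough. A minimal sketch using only the calls shown above:

stream = Stream([source], stream_dir)
stream.start(batch_func, min_batch_seconds=30)
stream.wait()
if stream.status.error is not None:
    raise stream.status.error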
Starting everything up
Now we’re ready to start the data source and the stream. We can start the source in one process with:
python <pythonscript>.py startsource <where to put the files>
which should produce an output that looks something like:
Writing file: weyec11rft...
Writing file: 6z17vodynr...
Writing file: gzhzm0gx50...
Writing file: 0lar0xijkl...
Writing file: 0qznxuy93c...
Writing file: nt64iet8r7...
Writing file: n947n4c2lg...
Writing file: 6419nm4txf...
Writing file: dvilu31ftl...
Writing file: 833hp14faa...
Writing file: evy33h4glw...
...
We can start the stream with:
python <pythonscript>.py startstream <input dir, same as where to put files> <output file> <stream tracking dir>
which should produce an output that looks something like:
Seen batch: Batch(data={'weyec11rft': [14, 9, 28]}, read_to=1745697964833)
Seen batch: Batch(data={'6z17vodynr': [30], 'gzhzm0gx50': [35, 2, 6], '0lar0xijkl': [38, 15]}, read_to=1745697995425)
error raised: RuntimeError("raising error on batch: Batch(data={'6z17vodynr': [30], 'gzhzm0gx50': [35, 2, 6], '0lar0xijkl': [38, 15]}, read_to=1745697995425)")
Seen batch: Batch(data={'6z17vodynr': [30], 'gzhzm0gx50': [35, 2, 6], '0lar0xijkl': [38, 15]}, read_to=1745697995425)
Seen batch: Batch(data={'0qznxuy93c': [34], 'nt64iet8r7': [10, 28, 40]}, read_to=1745698027377)
error raised: RuntimeError("raising error on batch: Batch(data={'0qznxuy93c': [34], 'nt64iet8r7': [10, 28, 40]}, read_to=1745698027377)")
Seen batch: Batch(data={'0qznxuy93c': [34], 'nt64iet8r7': [10, 28, 40]}, read_to=1745698027377)
Seen batch: Batch(data={'n947n4c2lg': [9], '6419nm4txf': [6, 11, 25, 0, 32], 'dvilu31ftl': [31, 42, 39, 35, 19, 28]}, read_to=1745698052553)
Seen batch: Batch(data={'833hp14faa': [49, 24, 17, 12, 35], 'evy33h4glw': [34, 39]}, read_to=1745698087422)
...
Looking in the <output file>, we should see basic stats like:
weyec11rft: [14, 9, 28] -> (count=3, mean=17, stdev=9.848857801796104)
6z17vodynr: [30] -> (count=1, mean=30, stdev=0)
gzhzm0gx50: [35, 2, 6] -> (count=3, mean=14.333333333333334, stdev=18.009256878986797)
0lar0xijkl: [38, 15] -> (count=2, mean=26.5, stdev=16.263455967290593)
0qznxuy93c: [34] -> (count=1, mean=34, stdev=0)
nt64iet8r7: [10, 28, 40] -> (count=3, mean=26, stdev=15.0996688705415)
n947n4c2lg: [9] -> (count=1, mean=9, stdev=0)
6419nm4txf: [6, 11, 25, 0, 32] -> (count=5, mean=14.8, stdev=13.330416347586446)
dvilu31ftl: [31, 42, 39, 35, 19, 28] -> (count=6, mean=32.333333333333336, stdev=8.286535263104035)
833hp14faa: [49, 24, 17, 12, 35] -> (count=5, mean=27.4, stdev=14.842506526863986)
evy33h4glw: [34, 39] -> (count=2, mean=36.5, stdev=3.5355339059327378)
...
and looking in the <stream tracking dir> we should see the most recent offset and commit files as the stream tracks its progress along the data source.
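The contents of these files are managed by bream itself, but it’s easy to peek at what’s there while the stream runs; for example:

from pathlib import Path

for p in sorted(Path("<stream tracking dir>").rglob("*")):
    print(p)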
Full script
# filesbytimestamp.py
"""
A demonstration of how to use `bream`. This example sets up a simple data source
that writes small files (with random names) of numbers to a directory, and a bream Stream
that streams these lists of numbers into a batch function that computes and writes
basic stats.

The source can be started with

>> python filesbytimestamp.py startsource <directory to put number files>

The stream can be started in a separate process with

>> python filesbytimestamp.py startstream <input dir, same as where to put files> <output file> <stream tracking dir>  # noqa: E501
"""
from __future__ import annotations

from pathlib import Path
from random import random, randint, choices
from functools import partial
from sys import argv
from time import sleep
from statistics import mean, stdev
from contextlib import suppress

from bream.core import Source, BatchRequest, Batch, Batches, Stream


def file_creation_loop(testdir: Path) -> None:
    """An example data source.

    Every now and then, adds a randomly-named file to `testdir` that contains
    some numbers.
    """
    testdir.mkdir(parents=True, exist_ok=True)
    max_seconds_between_file_creation = 24
    while True:
        sleep_time = random() * max_seconds_between_file_creation
        sleep(sleep_time)
        fname = "".join(choices("abcdefghijklmnopqrstuvwxyz0123456789", k=10))
        number_of_numbers = randint(1, 6)
        nums = [f"{randint(0, 50)}\n" for _ in range(number_of_numbers)]
        print(f"Writing file: {fname}...")
        with (testdir / fname).open("w") as f:
            f.writelines(nums)
        # cleanup: keep only the latest 250 files
        ps = sorted(testdir.glob("*"), key=(lambda p: p.stat().st_ctime))
        for p in ps[:-250]:
            p.unlink()


class NumbersFromFilesByCreationTimestampSource(Source):
    """Bream wrapper for a directory of files containing numbers.

    The data will be emitted in order of file creation timestamp. The file
    creation timestamp (as an integer number of milliseconds) is used as the
    stream "offset".

    For this example, it will be pointed at a directory populated by
    `file_creation_loop`.
    """

    def __init__(self, sourcename: str, filedir: Path, num_files_per_batch: int) -> None:
        """Initialize the bream data source.

        Args:
            sourcename: the name of the source - used by bream to identify the source
            filedir: path to the dir containing files of numbers
            num_files_per_batch: the maximum number of files that should be read in each batch
        """
        # the name of the source - used by bream to identify the source:
        self.name = sourcename
        self._filedir = filedir
        self._num_files_per_batch = num_files_per_batch

    def _get_timestamp_to_file_map(self) -> dict[int, Path]:
        """Get a map of file-creation timestamp to file path for each file in the dir."""
        files = list(self._filedir.glob("*"))
        res: dict[int, Path] = {}
        for f in files:
            with suppress(FileNotFoundError):  # avoid cleanup race condition of file_creation_loop
                res[int(f.stat().st_ctime * 1000)] = f
        return res

    def read(self, br: BatchRequest) -> Batch | None:
        """Read a batch of number-lists from the directory.

        The data will be read to a map with a key-value pair for each file read
        in this batch: {<filename>: [<numbers>, ...], ...}

        Args:
            br: The batch request. This is a bream construct that is part of the
                bream source protocol and looks like:
                BatchRequest(read_from_after: int | None, read_to: int | None) where:
                    `read_from_after` is an offset such that we should start from
                        the next available one (or None if we should start from
                        the beginning)
                    `read_to` is the offset we should read to (or None if the
                        source gets to choose where to read to).

        Returns:
            batch: The batch we return. `Batch` is another bream construct that
                is part of the bream source protocol and looks like:
                Batch(data: Any, read_to: int) where:
                    `data` is the batch data
                    `read_to` is the offset we actually read to.
                If there is no data available, this should be None.
        """
        # if there are no files, we can't read any data:
        if not (timestamp_to_file_map := self._get_timestamp_to_file_map()):
            return None
        # get the timestamps in sorted order
        timestamps = sorted(timestamp_to_file_map)
        # read from the next timestamp if a 'read_from_after' is given, otherwise start at the beginning
        read_from_idx = (
            (timestamps.index(br.read_from_after) + 1) if br.read_from_after is not None else 0
        )
        # if there are no more files to read, we won't read any data
        if read_from_idx == len(timestamps):
            return None
        if br.read_to is not None:
            # if told where to read to, respect it
            read_to_idx = timestamps.index(br.read_to)
        else:
            # otherwise read as many files as configured to
            read_to_idx = min(read_from_idx + self._num_files_per_batch - 1, len(timestamps) - 1)
        # get the actual timestamps of the files we should read
        timestamps_of_files_to_read = timestamps[read_from_idx:read_to_idx + 1]
        # get the file paths we need to read
        files_to_read = [timestamp_to_file_map[ts] for ts in timestamps_of_files_to_read]
        # get the data from the files
        data: dict[str, list[int]] = {}
        for path in files_to_read:
            with path.open("r") as f:
                cleaned = [x for x in f.read().strip().split("\n") if x]
                nums = [int(x) for x in cleaned]
                data[path.name] = nums
        return Batch(data=data, read_to=timestamps_of_files_to_read[-1])


def write_stats(batches: Batches, output_file: Path) -> None:
    """Example batch function that takes data from NumbersFromFilesByCreationTimestampSource.

    This will be given to the bream `Stream` object as the batch processing function and
    computes some basic stats for each list of numbers read from the files.

    Args:
        batches: A bream construct that looks like
            Batches(batches: dict[str, Batch | None] = {<source name>: Batch(...), ...}) where
            `batches` is a map of source name to Batch object. In this example there will be
            only one key-value pair, but in general there will be one for each source of the
            stream.
        output_file: Path to the file we should write the output stats to.
    """
    output_file.parent.mkdir(parents=True, exist_ok=True)
    # get and report the batch of numbers:
    batch = list(batches.batches.values())[0]  # in this example we know there is only one source
    assert batch is not None
    print(f"Seen batch: {batch}")
    # as per the source, the data is a map of the form {<filename>: [<numbers>, ...], ...}
    nums: dict[str, list[int]] = batch.data
    # make the function flaky:
    if random() < 0.1:
        raise RuntimeError(f"raising error on batch: {batch}")
    # process the batch and write the output
    with output_file.open("a") as f:
        for fname, numlist in nums.items():
            # compute stats:
            mean_ = 0 if len(numlist) < 1 else mean(numlist)
            stdev_ = 0 if len(numlist) < 2 else stdev(numlist)
            stats = [len(numlist), mean_, stdev_]
            # write stats:
            stats_writable = (
                f"{fname}: {numlist} -> "
                f"(count={stats[0]}, mean={stats[1]}, stdev={stats[2]})\n"
            )
            f.write(stats_writable)


def start_stream(input_dir: Path, output_file: Path, stream_dir: Path) -> None:
    """Start the stream for the example.

    Args:
        input_dir: path to the dir of files to point NumbersFromFilesByCreationTimestampSource at
        output_file: path to the file where the batch function should write stats to
        stream_dir: path to the dir where the stream will track its progress
    """
    # define the source:
    source_name = "some_numbers"
    source = NumbersFromFilesByCreationTimestampSource(
        source_name, input_dir, num_files_per_batch=3
    )
    # define the batch function:
    batch_func = partial(write_stats, output_file=output_file)
    # define and start the stream; the batch function is flaky, so wrap it in a restart loop:
    while True:
        stream = Stream([source], stream_dir)
        # start the stream in a background thread, and don't read batches more often
        # than every 30 seconds:
        stream.start(batch_func, min_batch_seconds=30)
        stream.wait()  # block until the stream dies
        if stream.status.error:  # the stream died, which means an error happened; check status
            print(f"error raised: {repr(stream.status.error)}")


if __name__ == "__main__":
    if argv[1] == "startsource":
        inputdir = Path(argv[2])
        file_creation_loop(inputdir)
    elif argv[1] == "startstream":
        inputdir = Path(argv[2])
        outputfile = Path(argv[3])
        streamdir = Path(argv[4])
        start_stream(inputdir, outputfile, streamdir)
    else:
        raise ValueError(f"unknown command: {argv[1]}")