Usage
========

Example: Files by creation timestamp
------------------------------------

In this example, we'll set up a ``bream`` stream that streams some numbers from files, in
order of file creation time, to a processing function that computes and writes some basic
stats.

Setting up an example data source
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For this example, let's set up a simple data source. We'll have a loop that populates a
directory with randomly named files (e.g. ``gzhzm0gx50``) that contain some random
non-negative integers, e.g.::

    26
    48
    27
    49

Let's set up a function and script entrypoint to do this::

    from __future__ import annotations

    from pathlib import Path
    from random import choices, randint, random
    from sys import argv
    from time import sleep


    def file_creation_loop(testdir: Path) -> None:
        """An example data source.

        Every now and then, adds a randomly-named file to `testdir` that contains some
        numbers.
        """
        testdir.mkdir(parents=True, exist_ok=True)
        max_seconds_between_file_creation = 24
        while True:
            sleep_time = random() * max_seconds_between_file_creation
            sleep(sleep_time)
            fname = "".join(choices("abcdefghijklmnopqrstuvwxyz0123456789", k=10))
            number_of_numbers = randint(1, 6)
            nums = [f"{randint(0, 50)}\n" for _ in range(number_of_numbers)]
            print(f"Writing file: {fname}...")
            with (testdir / fname).open("w") as f:
                f.writelines(nums)
            # cleanup: keep only the latest 250 files
            ps = sorted(testdir.glob("*"), key=lambda p: p.stat().st_ctime)
            for p in ps[:-250]:
                p.unlink()


    if __name__ == "__main__":
        if argv[1] == "startsource":
            inputdir = Path(argv[2])
            file_creation_loop(inputdir)
        else:
            raise ValueError

The source could now be started with::

    python filesbytimestamp.py startsource <input_dir>
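If you want to sanity-check what the source loop has produced before involving ``bream``,
a few lines of standalone Python will list the files in creation order, mirroring the
cleanup logic above (``testdir`` here is a hypothetical stand-in for the ``<input_dir>``
you passed to ``startsource``)::

    from pathlib import Path

    testdir = Path("testdir")  # hypothetical: the <input_dir> given to `startsource`
    for p in sorted(testdir.glob("*"), key=lambda p: p.stat().st_ctime):
        # millisecond creation timestamp (the "offset" we'll use later) and filename
        print(int(p.stat().st_ctime * 1000), p.name)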
But we have more to set up first.

Wrapping the data source for use with ``bream``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To use this data source with a bream stream, we first need to define a bream
:py:class:`Source <bream.core.Source>` for it. To do so, we inherit from
:py:class:`Source <bream.core.Source>` and define the name and a ``read`` method that
loads the files and emits the numbers::

    from __future__ import annotations

    from contextlib import suppress
    from pathlib import Path
    from random import choices, randint, random
    from sys import argv
    from time import sleep

    from bream.core import Batch, BatchRequest, Source

    ...

    class NumbersFromFilesByCreationTimestampSource(Source):
        """Bream wrapper for a directory of files containing numbers.

        The data will be emitted in order of file creation timestamp. The file creation
        time, as a millisecond POSIX timestamp, is used as the stream "offset". For this
        example, it will be pointed at a directory populated by `file_creation_loop`.
        """

        def __init__(self, sourcename: str, filedir: Path, num_files_per_batch: int) -> None:
            """Initialize the bream data source.

            Args:
                sourcename: the name of the source - used by bream to identify the source
                filedir: path to the dir containing files of numbers
                num_files_per_batch: the maximum number of files that should be read in
                    each batch
            """
            # the name of the source - used by bream to identify the source:
            self.name = sourcename
            self._filedir = filedir
            self._num_files_per_batch = num_files_per_batch

        def _get_timestamp_to_file_map(self) -> dict[int, Path]:
            """Get a map of file-creation timestamp to file path for each file in the dir."""
            files = list(self._filedir.glob("*"))
            res: dict[int, Path] = {}
            for f in files:
                with suppress(FileNotFoundError):  # avoid a cleanup race with file_creation_loop
                    res[int(f.stat().st_ctime * 1000)] = f
            return res

        def read(self, br: BatchRequest) -> Batch | None:
            """Read a batch of number-lists from the directory.

            The data will be read to a map with a key-value pair for each file read in
            this batch: {<filename>: [<num>, ...], ...}

            Args:
                br: The batch request. This is a bream construct that is part of the
                    bream source protocol and looks like:

                        BatchRequest(read_from_after: int | None, read_to: int | None)

                    where `read_from_after` is an offset such that we should start from
                    the next available one (or None if we should start from the
                    beginning), and `read_to` is the offset we should read to (or None
                    if the source gets to choose where to read to).

            Returns:
                batch: The batch we return. `Batch` is another bream construct that is
                    part of the bream source protocol and looks like:

                        Batch(data: Any, read_to: int)

                    where `data` is the batch data and `read_to` is the offset we
                    actually read to. If there is no data available, this should be
                    None.
            """
            # if there are no files, we can't read any data:
            if not (timestamp_to_file_map := self._get_timestamp_to_file_map()):
                return None

            # get the timestamps in sorted order
            timestamps = sorted(timestamp_to_file_map)

            # read from the next timestamp if a 'read_from_after' is given, otherwise
            # start at the beginning
            read_from_idx = (
                (timestamps.index(br.read_from_after) + 1) if br.read_from_after is not None else 0
            )
            # if there are no more files to read, we won't read any data
            if read_from_idx == len(timestamps):
                return None

            if br.read_to is not None:  # if told where to read to, respect it
                read_to_idx = timestamps.index(br.read_to)
            else:  # otherwise read as many files as configured to
                read_to_idx = min(read_from_idx + self._num_files_per_batch - 1, len(timestamps) - 1)

            # get the actual timestamps of the files we should read
            timestamps_of_files_to_read = timestamps[read_from_idx:read_to_idx + 1]
            # get the file paths we need to read
            files_to_read = [timestamp_to_file_map[ts] for ts in timestamps_of_files_to_read]

            # get the data from the files
            data: dict[str, list[int]] = {}
            for path in files_to_read:
                with path.open("r") as f:
                    cleaned = [x for x in f.read().strip().split("\n") if x]
                nums = [int(x) for x in cleaned]
                data[path.name] = nums

            return Batch(data=data, read_to=timestamps_of_files_to_read[-1])

    ...
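Before wiring this source into a stream, we can exercise the source protocol by hand.
The following is a minimal sketch, assuming a ``BatchRequest`` can be constructed
directly with the two fields shown in the docstring above, and that ``testdir`` has
already been populated by ``file_creation_loop``. It reads two consecutive batches,
feeding the ``read_to`` of the first back in as the ``read_from_after`` of the second::

    from pathlib import Path

    from bream.core import BatchRequest

    # assumes NumbersFromFilesByCreationTimestampSource (defined above) is in scope
    source = NumbersFromFilesByCreationTimestampSource(
        "some_numbers", Path("testdir"), num_files_per_batch=3
    )

    # first request: no prior offset, and let the source choose where to read to
    first = source.read(BatchRequest(read_from_after=None, read_to=None))
    if first is not None:
        print(first.data, first.read_to)
        # next request: resume just after the offset the first batch read to
        second = source.read(BatchRequest(read_from_after=first.read_to, read_to=None))
        print(second)

This is roughly the bookkeeping a stream does for us once it is running: remember the
last ``read_to`` and hand it back as the next ``read_from_after``.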
Defining the stream processing function
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Now we have a bream source set up, but before we configure a stream, we'll want to do
something with the data emitted by the source.

So let's define a "batch function" that consumes the numbers loaded from each file and
writes some basic stats::

    from __future__ import annotations

    from contextlib import suppress
    from pathlib import Path
    from random import choices, randint, random
    from statistics import mean, stdev
    from sys import argv
    from time import sleep

    from bream.core import Batch, BatchRequest, Batches, Source

    ...

    def write_stats(batches: Batches, output_file: Path) -> None:
        """Example batch function that takes data from NumbersFromFilesByCreationTimestampSource.

        This will be given to the bream `Stream` object as the batch processing function
        and computes some basic stats for each list of numbers read from the files.

        Args:
            batches: A bream construct that looks like:

                    Batches(batches: dict[str, Batch | None] = {<source_name>: Batch(...), ...})

                where `batches` is a map of source name to Batch object. In this example
                there will be only one key-value pair, but in general there will be one
                for each source of the stream.
            output_file: Path to the file we should write the output stats to.
        """
        output_file.parent.mkdir(parents=True, exist_ok=True)

        # get and report the batch of numbers:
        batch = list(batches.batches.values())[0]  # in this example we know there is only one source
        assert batch is not None
        print(f"Seen batch: {batch}")
        # as per the source, the data is a map of the form {<filename>: [<num>, ...], ...}
        nums: dict[str, list[int]] = batch.data

        # make the function flaky:
        if random() < 0.1:
            raise RuntimeError(f"raising error on batch: {batch}")

        # process the batch and write the output
        with output_file.open("a") as f:
            for fname, numlist in nums.items():
                # compute stats:
                mean_ = 0 if len(numlist) < 1 else mean(numlist)
                stdev_ = 0 if len(numlist) < 2 else stdev(numlist)
                stats = [len(numlist), mean_, stdev_]
                # write stats:
                stats_writable = (
                    f"{fname}: {numlist} -> "
                    f"(count={stats[0]}, mean={stats[1]}, stdev={stats[2]})\n"
                )
                f.write(stats_writable)

    ...
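The batch function can also be tried in isolation. Here's a quick sketch, assuming
``Batches`` and ``Batch`` can be instantiated directly with the fields shown in the
docstring above; the file name and numbers are made up for illustration::

    from pathlib import Path

    from bream.core import Batch, Batches

    # assumes write_stats (defined above) is in scope
    fake_batches = Batches(
        batches={"some_numbers": Batch(data={"abcde12345": [7, 21, 3]}, read_to=0)}
    )
    # note: write_stats is deliberately flaky, so roughly 1 in 10 calls will raise
    write_stats(fake_batches, output_file=Path("stats.txt"))
    # stats.txt should now end with a line like:
    # abcde12345: [7, 21, 3] -> (count=3, mean=10.333333333333334, stdev=9.4516...)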
Setting up the stream
^^^^^^^^^^^^^^^^^^^^^

Now we're ready to set up and start the stream. Let's add a function and another
entrypoint to our script::

    from __future__ import annotations

    from contextlib import suppress
    from functools import partial
    from pathlib import Path
    from random import choices, randint, random
    from statistics import mean, stdev
    from sys import argv
    from time import sleep

    from bream.core import Batch, BatchRequest, Batches, Source, Stream

    ...

    def start_stream(input_dir: Path, output_file: Path, stream_dir: Path) -> None:
        """Start the stream for the example.

        Args:
            input_dir: path to the dir of files to point
                NumbersFromFilesByCreationTimestampSource at
            output_file: path to the file where the batch function should write stats to
            stream_dir: path to the dir where the stream will track its progress
        """
        # define the source:
        source_name = "some_numbers"
        source = NumbersFromFilesByCreationTimestampSource(
            source_name, input_dir, num_files_per_batch=3
        )
        # define the batch function:
        batch_func = partial(write_stats, output_file=output_file)

        # define and start the stream; the batch function is flaky, so wrap it in a
        # restart-loop:
        while True:
            stream = Stream([source], stream_dir)
            # start the stream in a background thread, and don't read batches more often
            # than every 30 seconds
            stream.start(batch_func, min_batch_seconds=30)
            stream.wait()  # block until the stream dies
            if stream.status.error:  # the stream died, so check the status for an error
                print(f"error raised: {repr(stream.status.error)}")

    ...

    if __name__ == "__main__":
        if argv[1] == "startsource":
            inputdir = Path(argv[2])
            file_creation_loop(inputdir)
        elif argv[1] == "startstream":
            inputdir = Path(argv[2])
            outputfile = Path(argv[3])
            streamdir = Path(argv[4])
            start_stream(inputdir, outputfile, streamdir)
        else:
            raise ValueError

Starting everything up
^^^^^^^^^^^^^^^^^^^^^^

Now we're ready to start the data source and the stream. We can start the source in one
process with::

    python filesbytimestamp.py startsource <input_dir>

which should produce output that looks something like::

    Writing file: weyec11rft...
    Writing file: 6z17vodynr...
    Writing file: gzhzm0gx50...
    Writing file: 0lar0xijkl...
    Writing file: 0qznxuy93c...
    Writing file: nt64iet8r7...
    Writing file: n947n4c2lg...
    Writing file: 6419nm4txf...
    Writing file: dvilu31ftl...
    Writing file: 833hp14faa...
    Writing file: evy33h4glw...
    ...

We can start the stream in a separate process with::

    python filesbytimestamp.py startstream <input_dir> <output_file> <stream_dir>

which should produce output that looks something like::

    Seen batch: Batch(data={'weyec11rft': [14, 9, 28]}, read_to=1745697964833)
    Seen batch: Batch(data={'6z17vodynr': [30], 'gzhzm0gx50': [35, 2, 6], '0lar0xijkl': [38, 15]}, read_to=1745697995425)
    error raised: RuntimeError("raising error on batch: Batch(data={'6z17vodynr': [30], 'gzhzm0gx50': [35, 2, 6], '0lar0xijkl': [38, 15]}, read_to=1745697995425)")
    Seen batch: Batch(data={'6z17vodynr': [30], 'gzhzm0gx50': [35, 2, 6], '0lar0xijkl': [38, 15]}, read_to=1745697995425)
    Seen batch: Batch(data={'0qznxuy93c': [34], 'nt64iet8r7': [10, 28, 40]}, read_to=1745698027377)
    error raised: RuntimeError("raising error on batch: Batch(data={'0qznxuy93c': [34], 'nt64iet8r7': [10, 28, 40]}, read_to=1745698027377)")
    Seen batch: Batch(data={'0qznxuy93c': [34], 'nt64iet8r7': [10, 28, 40]}, read_to=1745698027377)
    Seen batch: Batch(data={'n947n4c2lg': [9], '6419nm4txf': [6, 11, 25, 0, 32], 'dvilu31ftl': [31, 42, 39, 35, 19, 28]}, read_to=1745698052553)
    Seen batch: Batch(data={'833hp14faa': [49, 24, 17, 12, 35], 'evy33h4glw': [34, 39]}, read_to=1745698087422)
    ...

Looking in the ``<output_file>`` we should see basic stats like::

    weyec11rft: [14, 9, 28] -> (count=3, mean=17, stdev=9.848857801796104)
    6z17vodynr: [30] -> (count=1, mean=30, stdev=0)
    gzhzm0gx50: [35, 2, 6] -> (count=3, mean=14.333333333333334, stdev=18.009256878986797)
    0lar0xijkl: [38, 15] -> (count=2, mean=26.5, stdev=16.263455967290593)
    0qznxuy93c: [34] -> (count=1, mean=34, stdev=0)
    nt64iet8r7: [10, 28, 40] -> (count=3, mean=26, stdev=15.0996688705415)
    n947n4c2lg: [9] -> (count=1, mean=9, stdev=0)
    6419nm4txf: [6, 11, 25, 0, 32] -> (count=5, mean=14.8, stdev=13.330416347586446)
    dvilu31ftl: [31, 42, 39, 35, 19, 28] -> (count=6, mean=32.333333333333336, stdev=8.286535263104035)
    833hp14faa: [49, 24, 17, 12, 35] -> (count=5, mean=27.4, stdev=14.842506526863986)
    evy33h4glw: [34, 39] -> (count=2, mean=36.5, stdev=3.5355339059327378)
    ...

and looking in the ``<stream_dir>`` we should see the most recent ``offset`` and
``commit`` files as the stream tracks its progress along the data source.
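Note that because the stream records its progress in the ``<stream_dir>``, the stream
process can be stopped and later started again with the same arguments::

    python filesbytimestamp.py startstream <input_dir> <output_file> <stream_dir>

and it should pick up from the last committed offset rather than reprocessing the whole
directory; the resumption comes from reusing the same ``<stream_dir>``.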
Full script
^^^^^^^^^^^

::

    # filesbytimestamp.py
    """A demonstration of how to use `bream`.

    This example sets up a simple data source that writes small files (with random
    names) of numbers to a directory, and a bream Stream that streams these lists of
    numbers into a batch function that computes and writes basic stats.

    The source can be started with

    >> python filesbytimestamp.py startsource <input_dir>

    The stream can be started in a separate process with

    >> python filesbytimestamp.py startstream <input_dir> <output_file> <stream_dir>
    """

    from __future__ import annotations

    from contextlib import suppress
    from functools import partial
    from pathlib import Path
    from random import choices, randint, random
    from statistics import mean, stdev
    from sys import argv
    from time import sleep

    from bream.core import Batch, BatchRequest, Batches, Source, Stream


    def file_creation_loop(testdir: Path) -> None:
        """An example data source.

        Every now and then, adds a randomly-named file to `testdir` that contains some
        numbers.
        """
        testdir.mkdir(parents=True, exist_ok=True)
        max_seconds_between_file_creation = 24
        while True:
            sleep_time = random() * max_seconds_between_file_creation
            sleep(sleep_time)
            fname = "".join(choices("abcdefghijklmnopqrstuvwxyz0123456789", k=10))
            number_of_numbers = randint(1, 6)
            nums = [f"{randint(0, 50)}\n" for _ in range(number_of_numbers)]
            print(f"Writing file: {fname}...")
            with (testdir / fname).open("w") as f:
                f.writelines(nums)
            # cleanup: keep only the latest 250 files
            ps = sorted(testdir.glob("*"), key=lambda p: p.stat().st_ctime)
            for p in ps[:-250]:
                p.unlink()


    class NumbersFromFilesByCreationTimestampSource(Source):
        """Bream wrapper for a directory of files containing numbers.

        The data will be emitted in order of file creation timestamp. The file creation
        time, as a millisecond POSIX timestamp, is used as the stream "offset". For this
        example, it will be pointed at a directory populated by `file_creation_loop`.
        """

        def __init__(self, sourcename: str, filedir: Path, num_files_per_batch: int) -> None:
            """Initialize the bream data source.

            Args:
                sourcename: the name of the source - used by bream to identify the source
                filedir: path to the dir containing files of numbers
                num_files_per_batch: the maximum number of files that should be read in
                    each batch
            """
            # the name of the source - used by bream to identify the source:
            self.name = sourcename
            self._filedir = filedir
            self._num_files_per_batch = num_files_per_batch

        def _get_timestamp_to_file_map(self) -> dict[int, Path]:
            """Get a map of file-creation timestamp to file path for each file in the dir."""
            files = list(self._filedir.glob("*"))
            res: dict[int, Path] = {}
            for f in files:
                with suppress(FileNotFoundError):  # avoid a cleanup race with file_creation_loop
                    res[int(f.stat().st_ctime * 1000)] = f
            return res

        def read(self, br: BatchRequest) -> Batch | None:
            """Read a batch of number-lists from the directory.

            The data will be read to a map with a key-value pair for each file read in
            this batch: {<filename>: [<num>, ...], ...}

            Args:
                br: The batch request. This is a bream construct that is part of the
                    bream source protocol and looks like:

                        BatchRequest(read_from_after: int | None, read_to: int | None)

                    where `read_from_after` is an offset such that we should start from
                    the next available one (or None if we should start from the
                    beginning), and `read_to` is the offset we should read to (or None
                    if the source gets to choose where to read to).

            Returns:
                batch: The batch we return. `Batch` is another bream construct that is
                    part of the bream source protocol and looks like:

                        Batch(data: Any, read_to: int)

                    where `data` is the batch data and `read_to` is the offset we
                    actually read to. If there is no data available, this should be
                    None.
            """
            # if there are no files, we can't read any data:
            if not (timestamp_to_file_map := self._get_timestamp_to_file_map()):
                return None

            # get the timestamps in sorted order
            timestamps = sorted(timestamp_to_file_map)

            # read from the next timestamp if a 'read_from_after' is given, otherwise
            # start at the beginning
            read_from_idx = (
                (timestamps.index(br.read_from_after) + 1) if br.read_from_after is not None else 0
            )
            # if there are no more files to read, we won't read any data
            if read_from_idx == len(timestamps):
                return None

            if br.read_to is not None:  # if told where to read to, respect it
                read_to_idx = timestamps.index(br.read_to)
            else:  # otherwise read as many files as configured to
                read_to_idx = min(read_from_idx + self._num_files_per_batch - 1, len(timestamps) - 1)

            # get the actual timestamps of the files we should read
            timestamps_of_files_to_read = timestamps[read_from_idx:read_to_idx + 1]
            # get the file paths we need to read
            files_to_read = [timestamp_to_file_map[ts] for ts in timestamps_of_files_to_read]

            # get the data from the files
            data: dict[str, list[int]] = {}
            for path in files_to_read:
                with path.open("r") as f:
                    cleaned = [x for x in f.read().strip().split("\n") if x]
                nums = [int(x) for x in cleaned]
                data[path.name] = nums

            return Batch(data=data, read_to=timestamps_of_files_to_read[-1])


    def write_stats(batches: Batches, output_file: Path) -> None:
        """Example batch function that takes data from NumbersFromFilesByCreationTimestampSource.

        This will be given to the bream `Stream` object as the batch processing function
        and computes some basic stats for each list of numbers read from the files.

        Args:
            batches: A bream construct that looks like:

                    Batches(batches: dict[str, Batch | None] = {<source_name>: Batch(...), ...})

                where `batches` is a map of source name to Batch object. In this example
                there will be only one key-value pair, but in general there will be one
                for each source of the stream.
            output_file: Path to the file we should write the output stats to.
        """
        output_file.parent.mkdir(parents=True, exist_ok=True)

        # get and report the batch of numbers:
        batch = list(batches.batches.values())[0]  # in this example we know there is only one source
        assert batch is not None
        print(f"Seen batch: {batch}")
        # as per the source, the data is a map of the form {<filename>: [<num>, ...], ...}
        nums: dict[str, list[int]] = batch.data

        # make the function flaky:
        if random() < 0.1:
            raise RuntimeError(f"raising error on batch: {batch}")

        # process the batch and write the output
        with output_file.open("a") as f:
            for fname, numlist in nums.items():
                # compute stats:
                mean_ = 0 if len(numlist) < 1 else mean(numlist)
                stdev_ = 0 if len(numlist) < 2 else stdev(numlist)
                stats = [len(numlist), mean_, stdev_]
                # write stats:
                stats_writable = (
                    f"{fname}: {numlist} -> "
                    f"(count={stats[0]}, mean={stats[1]}, stdev={stats[2]})\n"
                )
                f.write(stats_writable)


    def start_stream(input_dir: Path, output_file: Path, stream_dir: Path) -> None:
        """Start the stream for the example.

        Args:
            input_dir: path to the dir of files to point
                NumbersFromFilesByCreationTimestampSource at
            output_file: path to the file where the batch function should write stats to
            stream_dir: path to the dir where the stream will track its progress
        """
        # define the source:
        source_name = "some_numbers"
        source = NumbersFromFilesByCreationTimestampSource(
            source_name, input_dir, num_files_per_batch=3
        )
        # define the batch function:
        batch_func = partial(write_stats, output_file=output_file)

        # define and start the stream; the batch function is flaky, so wrap it in a
        # restart-loop:
        while True:
            stream = Stream([source], stream_dir)
            # start the stream in a background thread, and don't read batches more often
            # than every 30 seconds
            stream.start(batch_func, min_batch_seconds=30)
            stream.wait()  # block until the stream dies
            if stream.status.error:  # the stream died, so check the status for an error
                print(f"error raised: {repr(stream.status.error)}")


    if __name__ == "__main__":
        if argv[1] == "startsource":
            inputdir = Path(argv[2])
            file_creation_loop(inputdir)
        elif argv[1] == "startstream":
            inputdir = Path(argv[2])
            outputfile = Path(argv[3])
            streamdir = Path(argv[4])
            start_stream(inputdir, outputfile, streamdir)
        else:
            raise ValueError