Handling Missing Values in Data Streams#
Real-world datasets are rarely perfect and often contain missing values, which we must address before we can build accurate machine learning models. When data is missing, our understanding of the system is incomplete; common causes include sensor failures, network issues, and optional data fields. In real-time applications like self-driving cars, heating systems, and smart devices, misinterpreting missing data can have severe consequences. The process of dealing with missing values is called imputation, and we will demonstrate how you can build a custom window to handle it in Bytewax.
| Skill Level | Time to Complete | Level |
|---|---|---|
| Basic, no prior knowledge requirement | Approx. 15 Min | Beginner |
Your Takeaway#
Learn how to create a custom sliding window with the stateful map operator to impute (fill in) missing values with an estimate computed using NumPy.
Resources#
bytewax/bytewax/docs/tutorials/handling-missing-data/missing_data_dataflow.py
Important Concepts#
Bytewax is based around the concept of a dataflow. A dataflow is made up of a sequence of operators that interact with data that is “flowing” through it. The dataflow is a directed graph where the nodes are operators and the edges are the data that flows between them. The dataflow is a powerful abstraction that allows you to build complex data processing pipelines with ease.
Stateless vs. Stateful - In Bytewax, operators can be either stateless or stateful. A stateless operator is one that processes each value it sees in isolation. A stateful operator, on the other hand, maintains some state between items and allows you to modify the state. This state can be used to store information about the data that has been seen so far, or to store the results of some computation.
Workers - A worker is a single thread of execution that runs a dataflow. Workers are responsible for executing the operators in the dataflow and passing data between them. Workers can run on a single machine, or they can be distributed across multiple machines. See Execution for more information.
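To make the stateless versus stateful distinction concrete, here is a minimal plain-Python sketch that does not use Bytewax at all. The function names `double` and `running_total` are hypothetical and chosen only for illustration; the stateful function follows the "take the previous state, return the new state plus an output" shape that this tutorial relies on later.

```python
from typing import Optional, Tuple


def double(value: int) -> int:
    """Stateless: the result depends only on the current value."""
    return value * 2


def running_total(state: Optional[int], value: int) -> Tuple[int, int]:
    """Stateful: receives the previous state, returns (new_state, output)."""
    total = (state or 0) + value
    return total, total


# Stateless processing: each item is handled in isolation.
doubled = [double(v) for v in [1, 2, 3]]

# Stateful processing: the state is threaded from one item to the next.
state = None
totals = []
for v in [1, 2, 3]:
    state, out = running_total(state, v)
    totals.append(out)

print(doubled)  # [2, 4, 6]
print(totals)   # [1, 3, 6]
```

The stateless result for an item never changes with its position in the stream, while the stateful result depends on everything seen before it.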
Goal#
Build a dataflow that imputes missing values in a stream of random integers and NaN values.
Our dataflow, called map_eg, moves data through three key steps:
input: includes a random integer between 0 and 10 or a numpy nan value for every 5th value
stateful map to impute the values: we will create a custom window to impute the missing values
output: output the data and the imputed value to standard output
Let’s get started!
Imports and Setup#
Before we begin, let’s install Bytewax and set up the environment for building the dataflow.
Complete installation - we recommend using a virtual environment to manage your Python dependencies. You can install Bytewax using pip:
$ python -m venv venv
$ source ./venv/bin/activate
(venv) $ pip install bytewax==0.21.0 numpy
Now, let’s import the required modules:
import random
from typing import Optional, Tuple

import bytewax.operators as op
import numpy as np
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicSource, StatelessSourcePartition
Input Code#
For this example we will mock up some data: each item is either a random integer between 0 and 10 or, for every 5th item, a NumPy NaN value.
To simulate this, we will create a class called RandomNumpyData. It inherits from StatelessSourcePartition, allowing us to create our input as a stateless Bytewax input partition.
Next, we will create the RandomNumpyInput class, which acts as a wrapper for RandomNumpyData. This wrapper lets each worker build its own data source when work is distributed across multiple workers: when the source needs to be instantiated (e.g., at the start of a processing step), each worker creates and returns an instance of RandomNumpyData.
class RandomNumpyData(StatelessSourcePartition):
    """Generate a random sequence of numbers with missing values.

    Data Source that generates a sequence
    of 100 numbers, where every 5th number is
    missing (represented by np.nan),
    and the rest are random integers between 0 and 10.
    """

    def __init__(self):
        """Initialize the data source."""
        self._it = enumerate(range(100))

    def next_batch(self):
        """Generate the next batch of data.

        Returns:
            list: A list of tuples containing the data.
                If the index of the item is divisible by 5,
                the data is np.nan, otherwise it is a random
                integer between 0 and 10.
        """
        # Raises StopIteration after 100 items, which ends the input.
        i, item = next(self._it)
        if i % 5 == 0:
            return [("data", np.nan)]
        else:
            return [("data", random.randint(0, 10))]


class RandomNumpyInput(DynamicSource):
    """Generate random data based on worker distribution.

    Class encapsulating dynamic data generation
    based on worker distribution in distributed processing
    """

    def build(self, step_id, _worker_index, _worker_count):
        """Build the data source."""
        return RandomNumpyData()
Each call to build simply creates and returns a new instance of RandomNumpyData.
We can then initialize our dataflow with RandomNumpyInput as the input source:
flow = Dataflow("map_eg")
input_stream = op.input("input", flow, RandomNumpyInput())
This completes the input part of our dataflow. We will now turn our attention to how we can set up custom windowing using the stateful map operator.
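For a quick check of the input pattern outside of Bytewax, the same logic can be sketched as a plain Python generator. This is only an illustration: the function name `mock_readings` is hypothetical, and it uses `math.nan` from the standard library in place of `np.nan` so the snippet has no dependencies.

```python
import math
import random
from typing import Iterator, Tuple


def mock_readings(count: int = 100) -> Iterator[Tuple[str, float]]:
    """Yield ("data", value) pairs; every 5th item, starting at index 0, is NaN."""
    for i in range(count):
        if i % 5 == 0:
            yield ("data", math.nan)
        else:
            # randint is inclusive on both ends, so values range from 0 to 10.
            yield ("data", random.randint(0, 10))


sample = list(mock_readings(10))
missing_positions = [i for i, (_, v) in enumerate(sample) if math.isnan(v)]
print(missing_positions)  # [0, 5]
```

Note that the very first item is missing, which matters later: the imputer has no history to draw on when it sees that value.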
Custom Window Using Stateful Map#
Before we dive into the code, it is important to understand the stateful map operator. Stateful map is a one-to-one transformation of values in (key, value) pairs, but allows you to reference a persistent state for each key when doing the transformation. For more information see the API docs for stateful_map.
In our case the key will be the same for the entire stream because we only have one stream of data in this example. The mapper function passed to stateful map creates a WindowedArray object the first time it sees the key and then uses it to impute the mean. This class maintains a sliding window of the most recent values in a sequence, allowing for the computation of window-based statistics.
Let’s unpack the code. When WindowedArray is initialized, it creates an empty NumPy array with a dtype of float. A float array can hold the incoming integers (stored as floats), and NaN is itself a floating-point value, so np.nanmean can operate on the window directly. For each new data point, the mapper checks whether the value is NaN. If it is not, push adds the value to the front of the window (last_n) and drops the oldest value once the window holds more than n values. If it is NaN, impute_value returns the mean of the values currently in the window. Here n is the number of values we “remember”, in other words, how many recent values we want to use in the calculation; the right size will vary with the application.
class WindowedArray:
    """Windowed Numpy Array.

    Create a numpy array to run windowed statistics on.
    """

    def __init__(self, window_size: int) -> None:
        """Initialize the windowed array.

        Args:
            window_size (int): The size of the window.
        """
        self.last_n = np.empty(0, dtype=float)
        self.n = window_size

    def push(self, value: float) -> None:
        """Push a value into the windowed array.

        Args:
            value (float): The value to push into the array.
        """
        if np.isscalar(value) and np.isreal(value):
            # Newest value goes to the front of the array.
            self.last_n = np.insert(self.last_n, 0, value)
            try:
                # Drop the oldest value once the window exceeds n items.
                self.last_n = np.delete(self.last_n, self.n)
            except IndexError:
                # Fewer than n + 1 values so far; nothing to drop.
                pass

    def impute_value(self) -> float:
        """Impute the next value in the windowed array.

        Returns:
            float: The mean of the values currently in the window,
                or NaN if the window is empty.
        """
        return np.nanmean(self.last_n)
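The window mechanics can be tried in isolation with a few lines of NumPy. The sketch below is a simplified variation, not the tutorial's class: it uses an explicit size check instead of catching IndexError, and the window size of 3 and the sample values are arbitrary choices for illustration.

```python
import numpy as np

window_size = 3  # arbitrary size for this illustration
window = np.empty(0, dtype=float)

for value in [4, 8, 6, 2]:
    # Newest value goes to the front of the array.
    window = np.insert(window, 0, value)
    # Once the array is longer than the window, drop the oldest value at the end.
    if window.size > window_size:
        window = np.delete(window, window_size)

print(window.tolist())  # [2.0, 6.0, 8.0]

# np.nanmean ignores NaN entries when averaging.
mean_ignoring_nan = float(np.nanmean(np.array([1.0, np.nan, 3.0])))
print(mean_ignoring_nan)  # 2.0
```

After the fourth insert the oldest value (4) has been dropped, so only the three most recent values contribute to any mean computed from the window.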
We also create a StatefulImputer wrapper class that holds an instance of WindowedArray and exposes an impute_value method. This is useful in streaming data processing, where state needs to be maintained across batches of data, and it gives us a convenient way to manage that state. By keeping the state handling separate, WindowedArray can remain focused on its primary functionality without concerning itself with the operator’s requirements.
class StatefulImputer:
    """Impute values while maintaining state.

    This class is a stateful object that encapsulates a
    WindowedArray and provides a method that uses this
    array to impute values.
    The impute_value method of this object is passed to
    op.stateful_map, so the state is maintained across
    calls to this method.
    """

    def __init__(self, window_size):
        """Initialize the stateful imputer.

        Args:
            window_size (int): The size of the window.
        """
        self.windowed_array = WindowedArray(window_size)

    def impute_value(self, key, value):
        """Impute the value in the windowed array.

        Non-missing values are pushed into the window and returned
        unchanged; NaN is replaced with the window mean.
        """
        if np.isnan(value):
            return self.windowed_array.impute_value()
        self.windowed_array.push(value)
        return value
We can then add the stateful_map step to our dataflow. The mapper function below receives the current state (None the first time a key is seen) and the incoming value, and returns the updated state together with a tuple of the original and imputed values.
def mapper(
    window: Optional[WindowedArray], orig_value: float
) -> Tuple[Optional[WindowedArray], Tuple[float, float]]:
    """Impute missing values in a stream of numbers."""
    if window is None:
        window = WindowedArray(10)
    if not np.isnan(orig_value):
        window.push(orig_value)
        new_value = orig_value
    else:
        new_value = window.impute_value()  # Calculate derived value.

    return (window, (orig_value, new_value))


imputed_stream = op.stateful_map("impute", input_stream, mapper)
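As a mental model of what the operator does with the mapper, the sketch below keeps one state per key in a dictionary and calls the mapper once per item. This is a toy simulation under stated assumptions, not how Bytewax implements stateful_map; `simulate_stateful_map` and `count_seen` are hypothetical names, and the rule that returning None as the state discards it reflects our reading of the operator's documented behavior.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple


def simulate_stateful_map(
    items: Iterable[Tuple[str, Any]],
    mapper: Callable[[Optional[Any], Any], Tuple[Optional[Any], Any]],
) -> List[Tuple[str, Any]]:
    """Toy model: keep one state per key and call mapper(state, value) per item."""
    states: Dict[str, Any] = {}
    out = []
    for key, value in items:
        # The mapper sees None the first time a key appears.
        new_state, result = mapper(states.get(key), value)
        if new_state is None:
            states.pop(key, None)  # assumed: a None state is discarded
        else:
            states[key] = new_state
        out.append((key, result))
    return out


def count_seen(state: Optional[int], value: Any) -> Tuple[int, int]:
    """Hypothetical mapper: count how many items each key has seen."""
    count = (state or 0) + 1
    return count, count


results = simulate_stateful_map([("a", 10), ("b", 20), ("a", 30)], count_seen)
print(results)  # [('a', 1), ('b', 1), ('a', 2)]
```

Because every item in this tutorial carries the key "data", all values share a single window; with several keys, each key would get its own independent window.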
Output Results#
Next up we will use the output operator to write our data to a sink, in this case StdOutSink. This does nothing sophisticated; it just prints the original and imputed values to standard output.
op.output("output", imputed_stream, StdOutSink())
Running our dataflow#
That’s it! Assuming the code is saved in a file named dataflow.py, run it with the following invocation:
> python -m bytewax.run dataflow:flow
This yields output like the following (the integers are random, so your values will differ):
('data', (nan, nan))
('data', (10, 10))
('data', (1, 1))
('data', (4, 4))
('data', (10, 10))
('data', (nan, 6.25))
('data', (5, 5))
('data', (10, 10))
('data', (8, 8))
('data', (5, 5))
('data', (nan, 6.625))
On the left-hand side of each tuple we see the original value, and on the right-hand side the imputed value. The imputed value is the mean of the non-missing values currently in the window, which holds at most the last 10 of them: the first imputation above averages the 4 values seen so far (25 / 4 = 6.25), and the second averages all 8 (53 / 8 = 6.625). Note also that if the first value in the stream is missing, the window is still empty, so the imputed value is also NaN.
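The imputed values in the sample output can be reproduced with a small standalone sketch. It swaps the NumPy-based WindowedArray for a `collections.deque` with a maximum length, purely to keep the snippet dependency-free; `impute_stream` is a hypothetical helper, and the input list is simply the sequence of original values from the output shown above.

```python
import math
from collections import deque
from typing import Deque, List, Tuple


def impute_stream(
    values: List[float], window_size: int = 10
) -> List[Tuple[float, float]]:
    """Return (original, imputed) pairs, replacing NaN with the window mean."""
    window: Deque[float] = deque(maxlen=window_size)
    out = []
    for v in values:
        if math.isnan(v):
            # An empty window has no mean, so the value stays NaN.
            imputed = sum(window) / len(window) if window else math.nan
            out.append((v, imputed))
        else:
            window.append(v)  # deque drops the oldest item when full
            out.append((v, v))
    return out


nan = math.nan
stream = [nan, 10, 1, 4, 10, nan, 5, 10, 8, 5, nan]
result = impute_stream(stream)
imputed = [new for orig, new in result if math.isnan(orig)]
print(imputed)  # [nan, 6.25, 6.625]
```

The three imputed values match the right-hand sides of the NaN rows in the output above, including the leading NaN that cannot be filled.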
Summary#
In this example, we learned how to impute missing values in a data stream using Bytewax and NumPy, combining custom classes with the dataflow and the stateful map operator.