Skip to content

Data Processing

Processing module: prepares and stores all inputs needed to run the Dynamic Factor Model.

DataProcessor

Source code in dfmdash/processing.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class DataProcessor:
    """Prepares and stores the inputs for running a Dynamic Factor Model.

    Call :meth:`process` to populate ``raw`` (the selected input columns),
    ``df`` (differenced, constant-column-free, min-max normalized data),
    ``factors`` and ``non_stationary_cols``; :meth:`write` then persists them.
    """

    def __init__(self, ad: AnnData, global_multiplier: int = 1, maxiter: int = 10_000):
        """Prepares inputs for running the model.

        Args:
            ad (AnnData): Annotated data object. ``ad.to_df()`` supplies the data and
                ``ad.var`` the per-variable metadata (a required ``factor`` column and
                optional boolean ``difference`` / ``logdiff`` flag columns).
            global_multiplier (int, optional): Global multiplier. When 0, variables are
                not attached to the "Global" factor. Defaults to 1.
            maxiter (int, optional): Maximum number of iterations. Defaults to 10_000.
        """
        self.ad = ad
        self.global_multiplier = global_multiplier
        self.multiplicities = {"Global": global_multiplier}
        self.maxiter = maxiter
        self.non_stationary_cols = None
        self.raw: pd.DataFrame = None
        self.df: pd.DataFrame = None

    def __repr__(self):
        return f"DataProcessor(ad={self.ad}, global_multiplier={self.global_multiplier}, maxiter={self.maxiter})"

    def process(self, columns: Optional[list[str]] = None) -> "DataProcessor":
        """Processes the data for the Dynamic Factor Model.

        Pipeline: select columns -> difference / log-difference -> drop constant
        columns -> min-max normalize -> build the factor map -> run the ADF test.

        Args:
            columns (Optional[list[str]], optional): Subset of columns to use. Defaults to
                None, which uses all columns. Names not present in the data are dropped
                with a printed warning.

        Returns:
            DataProcessor: ``self``, with ``raw``, ``df``, ``factors`` and
            ``non_stationary_cols`` populated.

        Raises:
            KeyError: If ``columns`` is given but none of its names exist in the data.
        """
        full = self.ad.to_df()
        if columns:
            # Keep only requested names that actually exist, preserving the caller's order.
            available = set(full.columns)
            filtered_columns = [x for x in columns if x in available]
            if len(filtered_columns) != len(columns):
                print(f"Invalid columns removed!\nInput: {columns}\nFiltered: {filtered_columns}")
            if not filtered_columns:
                msg = f"None of the requested columns exist in the data: {columns}"
                raise KeyError(msg)
            self.raw = full[filtered_columns]
        else:
            self.raw = full
        self.df = self.raw.copy()
        self.process_differences().drop_constant_cols().normalize()
        self.factors = {k: v for k, v in self.get_factors().items() if k in self.df.columns}
        # write() serializes `non_stationary_cols`, so the ADF result must be stored there.
        self.non_stationary_cols = self.get_nonstationary_columns()
        # Backward-compatible alias for the attribute name used by earlier versions.
        self.stationary_columns = self.non_stationary_cols

        return self

    def write(self, outdir: Path):
        """Writes the processed input data and run info to ``outdir``.

        Creates ``raw.csv``, ``df.csv`` and ``run-info.yaml`` inside ``outdir``.
        Must be called after :meth:`process`, which sets the attributes written here.

        Args:
            outdir (Path): Output directory; created (including parents) if missing.
                A ``str`` path is also accepted.
        """
        outdir = Path(outdir)
        outdir.mkdir(parents=True, exist_ok=True)
        self.raw.to_csv(outdir / "raw.csv")
        self.df.to_csv(outdir / "df.csv")
        # NOTE(review): PyYAML's default dumper tags tuples (the factor_map values) as
        # `!!python/tuple`, which `yaml.safe_load` cannot read back — consider lists.
        with open(outdir / "run-info.yaml", "w", encoding="utf-8") as f:
            yaml.dump(
                {
                    "factor_map": self.factors,
                    "global_multiplier": self.global_multiplier,
                    "maxiter": self.maxiter,
                    "non_stationary_cols": self.non_stationary_cols,
                    "diff_cols": self.diff_cols,
                    "logdiff_cols": self.logdiff_cols,
                },
                f,
            )

    def get_factors(self) -> dict[str, tuple[str]]:
        """Gets the factor dictionary from the AnnData object for the DFM.

        Returns:
            dict[str, tuple[str]]: Maps each variable to its factor tuple:
            ``("Global", factor)`` normally, or ``(factor,)`` when
            ``global_multiplier == 0``.

        Raises:
            RuntimeError: If ``ad.var`` has no ``factor`` column.
        """
        if "factor" not in self.ad.var.columns:
            msg = "No `factor` column in AnnData input. Please add to `.var`"
            raise RuntimeError(msg)
        factors = self.ad.var.factor.to_dict()
        if self.global_multiplier == 0:
            return {k: (v,) for k, v in factors.items()}
        return {k: ("Global", v) for k, v in factors.items()}

    def process_differences(self) -> "DataProcessor":
        """Applies the configured differencing transforms to ``self.df``.

        Plain differences run before log-differences. If either runs, the first row
        (left NaN by differencing) is dropped from both ``df`` and ``raw`` so they stay
        row-aligned. Any remaining NaN in ``df`` is replaced with 0.

        Returns:
            DataProcessor: ``self``, allowing method chaining.
        """
        self.diff_cols = self.get_diff_cols()
        self.logdiff_cols = self.get_logdiff_cols()
        if self.diff_cols:
            self.diff_vars()
        if self.logdiff_cols:
            self.logdiff_vars()
        if self.diff_cols or self.logdiff_cols:
            self.df = self.df.iloc[1:]
            self.raw = self.raw.iloc[1:]  # Trim raw dataframe for parity
        self.df = self.df.fillna(0)
        return self

    def drop_constant_cols(self) -> "DataProcessor":
        """Drops constant columns (as judged by ``is_constant``) from ``self.df``.

        Returns:
            DataProcessor: ``self``, allowing method chaining.
        """
        self.df = self.df.loc[:, self.df.columns[~self.df.apply(is_constant)]]
        return self

    def get_diff_cols(self) -> list[str]:
        """Returns the columns that should be differenced (``ad.var["difference"]`` is True).

        Returns:
            list[str]: List of columns to be differenced
        """
        return self._get_cols("difference")

    def get_logdiff_cols(self) -> list[str]:
        """Returns the columns that should be log-differenced (``ad.var["logdiff"]`` is True).

        Returns:
            list[str]: List of columns to be log-differenced
        """
        return self._get_cols("logdiff")

    def _get_cols(self, colname: str) -> list[str]:
        """Returns variables flagged True in ``ad.var[colname]`` that are present in ``df``.

        Args:
            colname (str): Name of the flag column in ``ad.var``.

        Returns:
            list[str]: Flagged columns, or ``[]`` if ``colname`` is not a column of ``ad.var``.
        """
        if colname not in self.ad.var.columns:
            return []
        columns = self.ad.var.query(f"{colname} == True").index.to_list()
        return [x for x in columns if x in self.df.columns]

    def diff_vars(self) -> "DataProcessor":
        """Replaces each column in ``self.diff_cols`` with its first difference.

        Returns:
            DataProcessor: ``self``, allowing method chaining.
        """
        self.df[self.diff_cols] = self.df[self.diff_cols].diff()
        return self

    def logdiff_vars(self) -> "DataProcessor":
        """Replaces each column in ``self.logdiff_cols`` with the difference of log(1 + x).

        ``np.log1p`` is used instead of ``np.log(x + 1)`` because it stays accurate for
        small ``x``. Values <= -1 yield -inf/NaN.

        Returns:
            DataProcessor: ``self``, allowing method chaining.
        """
        self.df[self.logdiff_cols] = np.log1p(self.df[self.logdiff_cols]).diff()
        return self

    def get_nonstationary_columns(self, p_threshold: float = 0.25) -> list[str]:
        """Runs the augmented Dickey-Fuller test on each column and returns non-stationary columns.

        A column counts as non-stationary when its ADF p-value exceeds ``p_threshold``.

        Args:
            p_threshold (float, optional): p-value cut-off. Defaults to 0.25 (the value
                previously hard-coded here).

        Returns:
            list[str]: List of non-stationary columns
        """
        cols = []
        for col in self.df.columns:
            result = adfuller(self.df[col])
            p_value = result[1]
            # TODO: Ask Aaron/Josh - default of 0.25 is much looser than the usual 0.05
            if p_value > p_threshold:
                cols.append(col)
        print(f"Columns that fail the ADF test (non-stationary)\n{cols}")
        return cols

    def normalize(self) -> "DataProcessor":
        """Min-max scales every column of ``self.df`` to the range [0, 1].

        Returns:
            DataProcessor: ``self``, allowing method chaining.
        """
        # Use df's own labels (not raw's) so this step does not depend on raw being
        # set or row-aligned; MinMaxScaler returns a bare array by default.
        index, columns = self.df.index, self.df.columns
        scaled = MinMaxScaler().fit_transform(self.df)
        self.df = pd.DataFrame(scaled, index=index, columns=columns)
        return self

__init__(ad, global_multiplier=1, maxiter=10000)

Prepares inputs for running the model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ad` | `AnnData` | Annotated data object | *required* |
| `global_multiplier` | `int` | Global multiplier. Defaults to 1. | `1` |
| `maxiter` | `int` | Maximum number of iterations. Defaults to 10_000. | `10000` |
Source code in dfmdash/processing.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, ad: AnnData, global_multiplier: int = 1, maxiter: int = 10_000):
    """Store the model inputs; no data is transformed here.

    Args:
        ad (AnnData): Annotated data object holding the series and their metadata.
        global_multiplier (int, optional): Multiplicity recorded for the "Global"
            factor. Defaults to 1.
        maxiter (int, optional): Maximum number of iterations. Defaults to 10_000.
    """
    # Run configuration.
    self.ad = ad
    self.maxiter = maxiter
    self.global_multiplier = global_multiplier
    self.multiplicities = {"Global": self.global_multiplier}

    # Data holders, filled in later during processing.
    self.raw: pd.DataFrame = None
    self.df: pd.DataFrame = None
    self.non_stationary_cols = None

diff_vars()

Performs differencing on the specified columns

Returns:

Name Type Description
DataProcessor DataProcessor

Processed data

Source code in dfmdash/processing.py
146
147
148
149
150
151
152
153
def diff_vars(self) -> "DataProcessor":
    """Replace every column named in ``self.diff_cols`` with its first difference.

    The first row of each differenced column becomes NaN; other columns are untouched.

    Returns:
        DataProcessor: ``self``, allowing method chaining.
    """
    targets = self.diff_cols
    differenced = self.df[targets].diff()
    self.df[targets] = differenced
    return self

drop_constant_cols()

Drops constant columns from the DataFrame.

Returns:

Name Type Description
DataProcessor DataProcessor

Processed data

Source code in dfmdash/processing.py
107
108
109
110
111
112
113
114
def drop_constant_cols(self) -> "DataProcessor":
    """Remove every column of ``self.df`` that ``is_constant`` reports as constant.

    Returns:
        DataProcessor: ``self``, allowing method chaining.
    """
    constant_mask = self.df.apply(is_constant)
    keep = self.df.columns[~constant_mask]
    self.df = self.df.loc[:, keep]
    return self

get_diff_cols()

Returns the columns that should be differenced.

Returns:

Type Description
list[str]

list[str]: List of columns to be differenced

Source code in dfmdash/processing.py
116
117
118
119
120
121
122
def get_diff_cols(self) -> list[str]:
    """List the columns flagged for first differencing.

    Delegates to ``_get_cols`` with the ``"difference"`` flag name.

    Returns:
        list[str]: Names of the columns to difference.
    """
    flag_name = "difference"
    return self._get_cols(flag_name)

get_factors()

Gets the factor dictionary from the AnnData object for the DFM

Returns:

Type Description
dict[str, tuple[str]]

dict[str, tuple[str]]: Dictionary of factors

Source code in dfmdash/processing.py
75
76
77
78
79
80
81
82
83
84
85
86
87
def get_factors(self) -> dict[str, tuple[str]]:
    """Build the variable -> factor-tuple mapping used by the DFM.

    Each variable maps to ``("Global", factor)``, or to just ``(factor,)`` when
    ``global_multiplier`` is 0.

    Returns:
        dict[str, tuple[str]]: Factor tuple per variable.

    Raises:
        RuntimeError: If ``ad.var`` has no ``factor`` column.
    """
    var = self.ad.var
    if "factor" in var.columns:
        mapping = var.factor.to_dict()
        with_global = self.global_multiplier != 0
        return {
            name: ("Global", factor) if with_global else (factor,)
            for name, factor in mapping.items()
        }
    msg = "No `factor` column in AnnData input. Please add to `.var`"
    raise RuntimeError(msg)

get_logdiff_cols()

Returns the columns that should be log-differenced.

Returns:

Type Description
list[str]

list[str]: List of columns to be log-differenced

Source code in dfmdash/processing.py
124
125
126
127
128
129
130
def get_logdiff_cols(self) -> list[str]:
    """List the columns flagged for log-differencing.

    Delegates to ``_get_cols`` with the ``"logdiff"`` flag name.

    Returns:
        list[str]: Names of the columns to log-difference.
    """
    flag_name = "logdiff"
    return self._get_cols(flag_name)

get_nonstationary_columns()

Runs the augmented Dickey–Fuller (ADF) test on each column and returns the columns that appear non-stationary.

Returns:

Type Description
list[str]

list[str]: List of non-stationary columns

Source code in dfmdash/processing.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def get_nonstationary_columns(self, p_threshold: float = 0.25) -> list[str]:
    """Runs the augmented Dickey-Fuller test on each column of ``self.df``.

    A column is reported as non-stationary when its ADF p-value exceeds
    ``p_threshold``.

    Args:
        p_threshold (float, optional): p-value cut-off. Defaults to 0.25, the value
            previously hard-coded here.

    Returns:
        list[str]: List of non-stationary columns
    """
    cols = []
    for col in self.df.columns:
        # Element 1 of the adfuller result tuple is the p-value.
        p_value = adfuller(self.df[col])[1]
        # TODO: Ask Aaron/Josh - default of 0.25 is much looser than the usual 0.05
        if p_value > p_threshold:
            cols.append(col)
    print(f"Columns that fail the ADF test (non-stationary)\n{cols}")
    return cols

logdiff_vars()

Performs log-differencing on the specified columns

Returns:

Name Type Description
DataProcessor DataProcessor

Processed data

Source code in dfmdash/processing.py
155
156
157
158
159
160
161
162
def logdiff_vars(self) -> "DataProcessor":
    """Replaces each column in ``self.logdiff_cols`` with the difference of log(1 + x).

    ``np.log1p`` is used instead of ``np.log(x + 1)`` because forming ``x + 1`` first
    loses precision for small ``x``. Values <= -1 yield -inf/NaN. The first row of
    each transformed column becomes NaN.

    Returns:
        DataProcessor: ``self``, allowing method chaining.
    """
    self.df[self.logdiff_cols] = np.log1p(self.df[self.logdiff_cols]).diff()
    return self

normalize()

Scales each column to the range [0, 1] (min–max normalization).

Returns:

Name Type Description
DataProcessor DataProcessor

Processed data

Source code in dfmdash/processing.py
179
180
181
182
183
184
185
186
187
def normalize(self) -> "DataProcessor":
    """Min-max scales every column of ``self.df`` to the range [0, 1].

    The row index and column labels of ``self.df`` are preserved.

    Returns:
        DataProcessor: ``self``, allowing method chaining.
    """
    # Capture df's own labels rather than borrowing self.raw.index, which fails when
    # raw is unset or has a different number of rows. MinMaxScaler returns a bare
    # array by default, so the labels must be re-attached.
    index, columns = self.df.index, self.df.columns
    scaled = MinMaxScaler().fit_transform(self.df)
    self.df = pd.DataFrame(scaled, index=index, columns=columns)
    return self

process(columns=None)

Processes the data for the Dynamic Factor Model

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `columns` | `Optional[list[str]]` | Subset of columns to use. Defaults to None, which uses all columns. | `None` |

Returns:

Name Type Description
DataProcessor DataProcessor

Stores processed data

Source code in dfmdash/processing.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def process(self, columns: Optional[list[str]] = None) -> "DataProcessor":
    """Processes the data for the Dynamic Factor Model.

    Pipeline: select columns -> difference / log-difference -> drop constant
    columns -> min-max normalize -> build the factor map -> run the ADF test.

    Args:
        columns (Optional[list[str]], optional): Subset of columns to use. Defaults to
            None, which uses all columns. Names not present in the data are dropped
            with a printed warning.

    Returns:
        DataProcessor: ``self``, with ``raw``, ``df``, ``factors`` and
        ``non_stationary_cols`` populated.

    Raises:
        KeyError: If ``columns`` is given but none of its names exist in the data.
    """
    full = self.ad.to_df()
    if columns:
        # Keep only requested names that actually exist, preserving the caller's order.
        available = set(full.columns)
        filtered_columns = [x for x in columns if x in available]
        if len(filtered_columns) != len(columns):
            print(f"Invalid columns removed!\nInput: {columns}\nFiltered: {filtered_columns}")
        if not filtered_columns:
            msg = f"None of the requested columns exist in the data: {columns}"
            raise KeyError(msg)
        self.raw = full[filtered_columns]
    else:
        self.raw = full
    self.df = self.raw.copy()
    self.process_differences().drop_constant_cols().normalize()
    self.factors = {k: v for k, v in self.get_factors().items() if k in self.df.columns}
    # write() serializes `non_stationary_cols`, so the ADF result must be stored there.
    self.non_stationary_cols = self.get_nonstationary_columns()
    # Backward-compatible alias for the attribute name used by earlier versions.
    self.stationary_columns = self.non_stationary_cols

    return self

process_differences()

Processes the differences in the data

Returns:

Name Type Description
DataProcessor DataProcessor

Processed data

Source code in dfmdash/processing.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def process_differences(self) -> "DataProcessor":
    """Apply the configured differencing transforms to ``self.df``.

    Plain differences run before log-differences. When either runs, the first row
    (which differencing leaves as NaN) is dropped from both ``df`` and ``raw`` so the
    two frames keep the same rows. Any NaN still left in ``df`` is replaced with 0.

    Returns:
        DataProcessor: ``self``, allowing method chaining.
    """
    self.diff_cols, self.logdiff_cols = self.get_diff_cols(), self.get_logdiff_cols()
    transformed = False
    if self.diff_cols:
        self.diff_vars()
        transformed = True
    if self.logdiff_cols:
        self.logdiff_vars()
        transformed = True
    if transformed:
        # Keep raw row-aligned with df after discarding the first row.
        self.df, self.raw = self.df.iloc[1:], self.raw.iloc[1:]
    self.df = self.df.fillna(0)
    return self

write(outdir)

Writes the processed input data and run info to outdir

Parameters:

Name Type Description Default
outdir Path

Output directory

required
Source code in dfmdash/processing.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def write(self, outdir: Path):
    """Writes the processed input data and run info to ``outdir``.

    Creates ``raw.csv``, ``df.csv`` and ``run-info.yaml`` inside ``outdir``. Must be
    called after processing, which sets the attributes written here.

    Args:
        outdir (Path): Output directory; created (including parents) if missing.
            A ``str`` path is also accepted.
    """
    outdir = Path(outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    self.raw.to_csv(outdir / "raw.csv")
    self.df.to_csv(outdir / "df.csv")
    # NOTE(review): PyYAML's default dumper tags tuples (the factor_map values) as
    # `!!python/tuple`, which `yaml.safe_load` cannot read back — consider lists.
    with open(outdir / "run-info.yaml", "w", encoding="utf-8") as f:
        yaml.dump(
            {
                "factor_map": self.factors,
                "global_multiplier": self.global_multiplier,
                "maxiter": self.maxiter,
                "non_stationary_cols": self.non_stationary_cols,
                "diff_cols": self.diff_cols,
                "logdiff_cols": self.logdiff_cols,
            },
            f,
        )

is_constant(column)

Returns True if a DataFrame column is constant

Source code in dfmdash/processing.py
190
191
192
def is_constant(column) -> bool:
    """Returns True if a DataFrame column is constant.

    A column is constant when every value equals its first value. An empty column
    is treated as (trivially) constant instead of raising ``IndexError``. NaN never
    compares equal, so a column containing NaN is not considered constant.

    Args:
        column (pd.Series): Column to inspect.

    Returns:
        bool: True if all values equal the first value.
    """
    if len(column) == 0:
        return True
    # Vectorized comparison; `.all()` avoids iterating the boolean Series in Python.
    return bool((column == column.iloc[0]).all())