@dataclass(frozen=True)
class ContiguousPanelMap:
    """Row layout of panel data stored as one contiguous block per individual.

    Every array is indexed by block number; ``K`` is the number of
    individuals and ``N`` the total number of rows.
    """

    # Individual labels, in order of first appearance; shape (K,).
    unique_ids: np.ndarray
    # Row index at which each individual's block begins; shape (K,).
    starts: np.ndarray
    # Number of rows belonging to each individual; shape (K,).
    counts: np.ndarray
    # Block start pointers followed by a trailing N; shape (K + 1,).
    indptr: np.ndarray
def build_contiguous_panel_map(df: pd.DataFrame, panel_column: str) -> ContiguousPanelMap:
    """Build a panel map assuming each individual's rows are contiguous.

    Parameters
    ----------
    df : pd.DataFrame
        Panel data. Only ``panel_column`` and the row order are used; the
        frame's own index is ignored in favour of 0-based row positions.
    panel_column : str
        Name of the column holding the individual identifier.

    Returns
    -------
    ContiguousPanelMap
        Labels in order of first appearance, the start row and row count of
        each individual's block, and a pointer array made of the block starts
        followed by ``len(df)``.

    Raises
    ------
    KeyError
        If ``panel_column`` is not a column of ``df``.
    ValueError
        If ``panel_column`` contains missing values, or if the rows of any
        individual are not contiguous.
    """
    if panel_column not in df.columns:
        raise KeyError(f"'{panel_column}' not in dataframe columns.")

    ids = df[panel_column].values
    N = len(df)

    # groupby drops rows whose key is missing, which would leave those rows
    # outside every block while indptr still spans all N rows. Reject them
    # up front instead of returning an inconsistent map.
    n_missing = int(pd.isna(ids).sum())
    if n_missing:
        raise ValueError(
            f"Panel column '{panel_column}' contains {n_missing} missing value(s); "
            "every row must have an individual ID."
        )

    # Work on 0-based row positions. Fixed internal column names are used so
    # that a panel column that happens to be called "_pos" cannot overwrite
    # the position column (or vice versa).
    tmp = pd.DataFrame({"_id": ids, "_pos": np.arange(N, dtype=np.int64)})

    # First/last position and row count per individual, in order of appearance.
    stats = (
        tmp.groupby("_id", sort=False)["_pos"]
        .agg(["min", "max", "count"])
        .reset_index()
    )

    # Contiguity check: for each id, max - min + 1 must equal count.
    contiguous = (stats["max"] - stats["min"] + 1) == stats["count"]
    if not bool(np.all(contiguous.values)):
        bad = stats.loc[~contiguous, "_id"].tolist()
        raise ValueError(
            "Panel rows are not contiguous for the following IDs: "
            + ", ".join(repr(b) for b in bad)
        )

    unique_ids = stats["_id"].to_numpy()
    starts = stats["min"].to_numpy(dtype=np.int64)
    counts = stats["count"].to_numpy(dtype=np.int64)

    # CSR-like pointer array: indptr[i] is the start of block i, with a
    # trailing N. Blocks are listed in order of appearance, each one is
    # contiguous and no row was dropped, so the blocks tile [0, N) and
    # indptr[i + 1] - indptr[i] == counts[i].
    indptr = np.empty(len(starts) + 1, dtype=np.int64)
    indptr[:-1] = starts
    indptr[-1] = N

    return ContiguousPanelMap(
        unique_ids=unique_ids,
        starts=starts,
        counts=counts,
        indptr=indptr,
    )