Article
anomaly-detection, time-series, telemetry, python, statistical-methods, sre, monitoring, pandas
Detect Anomalies in Time-Series Telemetry Data
Implement statistical anomaly detection for telemetry data such as CPU usage or latency. This guide uses Python with the Z-score and Interquartile Range (IQR) methods to identify significant deviations from a baseline, a core skill for system monitoring and reliability work.
beginner | 15 min | 4 steps
The play
- Generate Sample Telemetry Data: First, make sure pandas, numpy, and matplotlib are installed (`pip install pandas numpy matplotlib`). We will create a synthetic time series representing CPU usage and inject a single clear spike to serve as the test anomaly. This gives us a controlled dataset for checking the detection logic.
- Apply the Z-Score Method: The Z-score indicates how many standard deviations a data point lies from the mean. It is a fast, simple detection method. We calculate the Z-score for each point and flag any point whose absolute Z-score exceeds a chosen threshold (e.g., 3) as an anomaly.
- Use the Robust IQR Method: The Interquartile Range (IQR) method is more robust to extreme outliers than the Z-score, because the quartiles barely move when a few extreme values are present, whereas the mean and standard deviation do. With IQR = Q3 - Q1, it flags points below Q1 - 1.5 * IQR or above Q3 + 1.5 * IQR, where 1.5 is the conventional multiplier. This is a common and effective technique for data that is not normally distributed.
- Visualize Data and Anomalies: Visualizing the results is crucial for verifying the detection logic and understanding its behavior. We use matplotlib to plot the original time series and the calculated IQR bounds, and to highlight the points identified as anomalies.
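The starter code below covers only the IQR method, so here is a minimal sketch of the Z-score step described above. The function name `detect_anomalies_zscore`, its parameters, and the use of the population standard deviation (`ddof=0`) are assumptions made for illustration, not part of the original starter code. The data generation mirrors the starter code.

```python
import numpy as np
import pandas as pd


def detect_anomalies_zscore(df, column_name="value", threshold=3.0):
    """Hypothetical helper: flag rows whose absolute Z-score exceeds the threshold."""
    series = df[column_name]
    std = series.std(ddof=0)  # population standard deviation (an assumption)
    if std == 0:
        # A constant series has no deviations, so nothing can be flagged.
        return df.iloc[0:0], pd.Series(0.0, index=df.index)
    z_scores = (series - series.mean()) / std
    anomalies = df[z_scores.abs() > threshold]
    return anomalies, z_scores


# Same synthetic CPU data as the starter code: 300 one-minute samples with one spike.
np.random.seed(42)
values = np.random.normal(loc=50, scale=10, size=300)
values[220] = 150
df = pd.DataFrame(
    {"cpu_usage": values},
    index=pd.date_range("2023-01-01", periods=300, freq="min"),
)

zscore_anomalies, z_scores = detect_anomalies_zscore(df, "cpu_usage")
print(zscore_anomalies)
```

Because the mean and standard deviation are computed over the whole series, the injected spike itself shifts both slightly. For a single spike this is usually harmless, but it is worth keeping in mind when choosing the threshold.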
Starter code
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
def detect_anomalies_iqr(df, column_name='value', multiplier=1.5):
    """Detects anomalies in a DataFrame column using the IQR method."""
    Q1 = df[column_name].quantile(0.25)
    Q3 = df[column_name].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - multiplier * IQR
    upper_bound = Q3 + multiplier * IQR
    anomalies = df[(df[column_name] < lower_bound) | (df[column_name] > upper_bound)]
    return anomalies, lower_bound, upper_bound
# 1. Generate Sample Telemetry Data
np.random.seed(42)
normal_data = np.random.normal(loc=50, scale=10, size=300)
# Inject an anomaly
normal_data[220] = 150
df = pd.DataFrame(
    normal_data,
    columns=['cpu_usage'],
    index=pd.date_range('2023-01-01', periods=300, freq='min')
)
# 2. Perform Anomaly Detection
anomalies, lower_bound, upper_bound = detect_anomalies_iqr(df, 'cpu_usage')
print("--- Anomaly Detection Report ---")
print(f"IQR Lower Bound: {lower_bound:.2f}")
print(f"IQR Upper Bound: {upper_bound:.2f}")
print("\nDetected Anomalies:")
if anomalies.empty:
    print("No anomalies detected.")
else:
    print(anomalies)
# 3. Visualize the results
plt.style.use('seaborn-v0_8-whitegrid')
plt.figure(figsize=(16, 7))
# Plot the main time series
plt.plot(df.index, df['cpu_usage'], label='CPU Usage', color='blue', zorder=1)
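# Plot the IQR bounds (both upper and lower)
plt.axhline(y=upper_bound, color='darkorange', linestyle='--', label=f'IQR Upper Bound ({upper_bound:.2f})')
plt.axhline(y=lower_bound, color='darkorange', linestyle=':', label=f'IQR Lower Bound ({lower_bound:.2f})')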
# Highlight the anomalies
plt.scatter(anomalies.index, anomalies['cpu_usage'], color='red', s=100, label='Anomaly', zorder=5, edgecolors='black')
plt.title('Time-Series Anomaly Detection with IQR', fontsize=16)
plt.xlabel('Timestamp', fontsize=12)
plt.ylabel('CPU Usage (%)', fontsize=12)
plt.legend(fontsize=10)
plt.tight_layout()
plt.show()
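To illustrate the claim that the IQR method is more robust than the Z-score, the following sketch runs both on a small hand-made series. The readings are hypothetical: a steady baseline near 50 followed by a sustained burst at 150. With several extreme points present, the burst inflates the mean and standard deviation enough to hide itself from the Z-score, while the quartiles stay anchored to the baseline.

```python
import pandas as pd

# Hypothetical readings: 16 baseline samples near 50, then a 4-sample burst at 150.
readings = pd.Series([48.0, 49.0, 51.0, 52.0] * 4 + [150.0] * 4, name="cpu_usage")

# Z-score: the burst pulls the mean up to 70 and the standard deviation to about 40.
z_scores = (readings - readings.mean()) / readings.std(ddof=0)
zscore_flags = z_scores.abs() > 3

# IQR: the quartiles (49 and 52) are still determined by the baseline values.
q1, q3 = readings.quantile(0.25), readings.quantile(0.75)
iqr = q3 - q1
iqr_flags = (readings < q1 - 1.5 * iqr) | (readings > q3 + 1.5 * iqr)

print("Z-score flags:", int(zscore_flags.sum()))
print("IQR flags:", int(iqr_flags.sum()))
```

On this series the burst points have a Z-score of roughly 2, so the Z-score method flags nothing, while the IQR bounds of 44.5 and 56.5 flag all four burst samples. Real telemetry will rarely be this clean, so treat this as a demonstration of the effect rather than a benchmark.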