上周产品经理提了个需求:统计每个用户的最长连续登录天数,用来做用户活跃度分析。听起来很简单对吧?结果写代码的时候发现,这事儿没那么简单。
记录一下完整的思路和实现过程,顺便踩了不少坑。
业务场景
我们的用户登录表大概长这样:
CREATE TABLE user_login (
id INT PRIMARY KEY AUTO_INCREMENT,
user_id INT NOT NULL,
login_date DATE NOT NULL,
login_time DATETIME NOT NULL,
INDEX idx_user_date (user_id, login_date)
);
数据示例:
user_id | login_date
--------|------------
1001 | 2025-11-01
1001 | 2025-11-02
1001 | 2025-11-03
1001 | 2025-11-05
1001 | 2025-11-06
需求: 计算每个用户最长连续登录了多少天。
比如上面的例子,用户 1001 在 11月1-3日连续登录3天,11月5-6日连续登录2天,所以最长连续登录天数是 3天。
思路分析
难点在哪
乍一看这需求很简单,不就是数连续的日期吗?但实际操作有几个问题:
- 同一天多次登录怎么办? 只算一次
- 如何判断日期是连续的? 不能简单用日期相减
- 如何分组统计连续段? 这是核心难点
- 数据量大时性能怎么保证? 几百万用户,上千万条记录
核心思路:分组标记法
关键是给每个连续的日期段打上标记,然后按标记分组统计。
举个例子:
原始数据:
日期 排名 日期-排名
11-01 → 1 → 10-31 (分组A)
11-02 → 2 → 10-31 (分组A)
11-03 → 3 → 10-31 (分组A)
11-05 → 4 → 11-01 (分组B)
11-06 → 5 → 11-01 (分组B)
思路解释:
- 给每条记录一个行号(ROW_NUMBER)
- 用登录日期减去行号
- 连续的日期减去行号后,结果是相同的
- 这个相同的结果就是分组标记
这是个很巧妙的技巧,第一次见的时候我也懵了半天。
方案一:纯 SQL 实现
MySQL 8.0 之后有窗口函数,可以直接用 SQL 搞定。
完整 SQL
WITH
-- 第一步:去重,一天只算一次登录
deduplicated AS (
SELECT DISTINCT
user_id,
login_date
FROM user_login
),
-- 第二步:给每个用户的登录记录编号
ranked AS (
SELECT
user_id,
login_date,
ROW_NUMBER() OVER (
PARTITION BY user_id
ORDER BY login_date
) AS rn
FROM deduplicated
),
-- 第三步:计算分组标记
grouped AS (
SELECT
user_id,
login_date,
DATE_SUB(login_date, INTERVAL rn DAY) AS group_flag
FROM ranked
),
-- 第四步:按分组统计连续天数
consecutive_days AS (
SELECT
user_id,
group_flag,
COUNT(*) AS consecutive_count
FROM grouped
GROUP BY user_id, group_flag
)
-- 第五步:取每个用户的最大值
SELECT
user_id,
MAX(consecutive_count) AS max_consecutive_days
FROM consecutive_days
GROUP BY user_id
ORDER BY max_consecutive_days DESC;
执行结果
user_id | max_consecutive_days
--------|---------------------
1001 | 3
1002 | 7
1003 | 1
性能测试
测试环境:100万用户,500万条登录记录
-- 执行时间:约 8.5 秒
-- 扫描行数:5,000,000
-- 使用索引:idx_user_date
对于百万级数据,8秒多还能接受。但如果数据量更大,就得考虑优化了。
踩过的坑
坑1:DATE_SUB 的参数类型
一开始我写的是:
DATE_SUB(login_date, rn) -- 错误!
结果报错,因为 DATE_SUB 第二个参数需要明确单位:
DATE_SUB(login_date, INTERVAL rn DAY) -- 正确
坑2:忘记去重
有些用户一天登录多次,如果不去重,会导致计算错误。
坑3:MySQL 5.7 没有窗口函数
如果用的是老版本 MySQL,得用变量模拟:
SET @prev_user := NULL;
SET @rn := 0;
SELECT
user_id,
login_date,
@rn := IF(@prev_user = user_id, @rn + 1, 1) AS rn,
@prev_user := user_id
FROM (
SELECT DISTINCT user_id, login_date
FROM user_login
ORDER BY user_id, login_date
) t;
但这种写法性能更差,还容易出错。能升级就升级吧。
方案二:Python + pandas 实现
如果数据不是特别大,或者需要做更复杂的分析,用 Python 更灵活。
代码实现
import pandas as pd
import pymysql
from datetime import timedelta
# 连接数据库
conn = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='your_db',
charset='utf8mb4'
)
# 读取数据
query = """
SELECT DISTINCT
user_id,
login_date
FROM user_login
ORDER BY user_id, login_date
"""
df = pd.read_sql(query, conn)
conn.close()
print(f"总记录数: {len(df)}")
print(df.head())
核心逻辑
def calculate_max_consecutive_days(df):
"""
计算每个用户的最长连续登录天数
"""
result = []
# 按用户分组处理
for user_id, group in df.groupby('user_id'):
# 转换日期格式
dates = pd.to_datetime(group['login_date']).sort_values().reset_index(drop=True)
# 计算日期差
diff = dates.diff()
# 找出不连续的地方(日期差 > 1天)
breaks = diff[diff > timedelta(days=1)].index.tolist()
# 添加起始和结束位置
breaks = [0] + breaks + [len(dates)]
# 计算每段的长度
max_days = 0
for i in range(len(breaks) - 1):
segment_length = breaks[i + 1] - breaks[i]
max_days = max(max_days, segment_length)
result.append({
'user_id': user_id,
'max_consecutive_days': max_days
})
return pd.DataFrame(result)
# 执行计算
result_df = calculate_max_consecutive_days(df)
print(result_df.head(10))
优化版本:向量化操作
def calculate_consecutive_days_vectorized(df):
"""
向量化版本,性能更好
"""
results = []
for user_id, group in df.groupby('user_id'):
# 排序并重置索引
group = group.sort_values('login_date').reset_index(drop=True)
# 转换为日期类型
group['login_date'] = pd.to_datetime(group['login_date'])
# 创建分组标记:日期 - 行号
group['rn'] = range(len(group))
group['group_flag'] = group['login_date'] - pd.to_timedelta(group['rn'], unit='D')
# 按分组标记统计
consecutive_counts = group.groupby('group_flag').size()
# 取最大值
max_days = consecutive_counts.max()
results.append({
'user_id': user_id,
'max_consecutive_days': max_days
})
return pd.DataFrame(results)
# 执行
result = calculate_consecutive_days_vectorized(df)
性能对比
测试数据:10万用户,50万条记录
| 方法 | 执行时间 | 内存占用 |
|---|---|---|
| 循环版本 | ~12秒 | ~200MB |
| 向量化版本 | ~3秒 | ~180MB |
| 纯 SQL | ~2秒 | - |
结论: 如果只是简单统计,SQL 最快。如果需要后续分析,Python 更方便。
Python 版踩过的坑
坑1:日期格式不统一
数据库读出来的日期可能是字符串:
# 错误示范
dates.diff() # 字符串无法直接计算差值
# 正确做法
dates = pd.to_datetime(group['login_date'])
坑2:时区问题
如果数据库存的是 DATETIME:
df['login_date'] = pd.to_datetime(df['login_date']).dt.date
坑3:空数据处理
某些用户可能没有登录记录:
if len(dates) == 0:
max_days = 0
continue
方案三:混合方案(推荐)
实际项目中,我用的是混合方案:
- 数据库层面做预处理:去重、过滤
- Python 做复杂计算:分组、统计
- 结果写回数据库:供其他系统使用
完整流程
import pandas as pd
import pymysql
from sqlalchemy import create_engine
from datetime import datetime
class ConsecutiveLoginAnalyzer:
def __init__(self, db_config):
self.engine = create_engine(
f"mysql+pymysql://{db_config['user']}:{db_config['password']}"
f"@{db_config['host']}/{db_config['database']}?charset=utf8mb4"
)
def extract_data(self, start_date=None, end_date=None):
"""从数据库提取数据"""
query = """
SELECT DISTINCT
user_id,
login_date
FROM user_login
WHERE 1=1
"""
params = []
if start_date:
query += " AND login_date >= %s"
params.append(start_date)
if end_date:
query += " AND login_date <= %s"
params.append(end_date)
query += " ORDER BY user_id, login_date"
return pd.read_sql(query, self.engine, params=params)
def calculate(self, df):
"""计算连续登录天数"""
results = []
for user_id, group in df.groupby('user_id'):
group = group.sort_values('login_date').reset_index(drop=True)
group['login_date'] = pd.to_datetime(group['login_date'])
group['rn'] = range(len(group))
group['group_flag'] = group['login_date'] - pd.to_timedelta(group['rn'], unit='D')
consecutive_counts = group.groupby('group_flag').size()
results.append({
'user_id': user_id,
'max_consecutive_days': consecutive_counts.max(),
'total_login_days': len(group),
'last_login_date': group['login_date'].max(),
'calculated_at': datetime.now()
})
return pd.DataFrame(results)
def save_results(self, df, table_name='user_consecutive_stats'):
"""保存结果到数据库"""
df.to_sql(
table_name,
self.engine,
if_exists='replace',
index=False
)
print(f"已保存 {len(df)} 条记录到表 {table_name}")
def run(self):
"""完整流程"""
print("1. 提取数据...")
df = self.extract_data()
print(f" 共 {len(df)} 条登录记录")
print("2. 计算连续登录天数...")
result = self.calculate(df)
print(f" 处理了 {len(result)} 个用户")
print("3. 保存结果...")
self.save_results(result)
print("4. 完成!")
return result
# 使用示例
if __name__ == '__main__':
config = {
'host': 'localhost',
'user': 'root',
'password': 'your_password',
'database': 'your_db'
}
analyzer = ConsecutiveLoginAnalyzer(config)
result = analyzer.run()
# 查看结果
print("\n最长连续登录 TOP 10:")
print(result.nlargest(10, 'max_consecutive_days'))
实际应用场景
场景1:用户活跃度分析
# 根据连续登录天数分层
def categorize_users(df):
conditions = [
df['max_consecutive_days'] >= 30,
df['max_consecutive_days'] >= 7,
df['max_consecutive_days'] >= 3,
df['max_consecutive_days'] >= 1,
]
choices = ['超级活跃', '活跃', '一般', '不活跃']
df['user_level'] = pd.Series(
pd.cut(df['max_consecutive_days'],
bins=[0, 1, 3, 7, 30, float('inf')],
labels=['沉睡', '不活跃', '一般', '活跃', '超级活跃'])
)
return df
result = categorize_users(result)
print(result['user_level'].value_counts())
场景2:活动效果评估
# 对比活动前后的连续登录情况
def compare_periods(analyzer, activity_date):
# 活动前一个月
before = analyzer.extract_data(
end_date=activity_date
)
before_result = analyzer.calculate(before)
# 活动后一个月
after = analyzer.extract_data(
start_date=activity_date
)
after_result = analyzer.calculate(after)
# 合并对比
comparison = before_result.merge(
after_result,
on='user_id',
suffixes=('_before', '_after')
)
comparison['improvement'] = (
comparison['max_consecutive_days_after'] -
comparison['max_consecutive_days_before']
)
return comparison
activity_date = '2025-11-01'
comp = compare_periods(analyzer, activity_date)
print(f"平均提升: {comp['improvement'].mean():.2f} 天")
场景3:预警流失用户
# 找出连续登录中断的用户
from datetime import date, timedelta
def find_at_risk_users(df):
today = pd.Timestamp(date.today())
# 最后登录超过3天的用户
at_risk = df[
(df['max_consecutive_days'] >= 7) & # 曾经很活跃
((today - df['last_login_date']).dt.days > 3) # 但最近没来
]
return at_risk[['user_id', 'max_consecutive_days', 'last_login_date']]
at_risk_users = find_at_risk_users(result)
print(f"需要关注的用户: {len(at_risk_users)}")
性能优化建议
1. 数据库层面
-- 创建合适的索引
CREATE INDEX idx_user_date ON user_login(user_id, login_date);
-- 定期统计,不要每次都全量计算
CREATE TABLE user_consecutive_stats (
user_id INT PRIMARY KEY,
max_consecutive_days INT,
last_updated DATETIME
);
-- 增量更新
INSERT INTO user_consecutive_stats
SELECT ...
FROM user_login
WHERE login_date >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
ON DUPLICATE KEY UPDATE ...
2. Python 层面
# 分批处理大数据
def batch_calculate(df, batch_size=10000):
results = []
for i in range(0, len(df), batch_size):
batch = df.iloc[i:i+batch_size]
batch_result = calculate_consecutive_days_vectorized(batch)
results.append(batch_result)
return pd.concat(results, ignore_index=True)
# 并行处理
from multiprocessing import Pool
def parallel_calculate(df, n_workers=4):
user_groups = [group for _, group in df.groupby('user_id')]
with Pool(n_workers) as pool:
results = pool.map(process_user_group, user_groups)
return pd.concat(results)
总结
连续登录天数统计看似简单,实际上有不少细节:
- 核心思路:日期减行号的分组标记法
- SQL 方案:适合数据量大、计算简单的场景
- Python 方案:适合需要复杂分析的场景
- 混合方案:实际项目的最佳选择
选择建议:
- 数据 < 100万:Python 随便玩
- 100万 ~ 1000万:SQL 为主,Python 为辅
- > 1000万:考虑分布式计算(Spark)
最重要的是,理解业务需求,选择合适的技术方案。别为了炫技用复杂方案,能用简单 SQL 搞定的就别折腾了。
代码都是实际跑过的,有问题欢迎留言讨论。
相关阅读:
- 窗口函数详解
- pandas 性能优化技巧
- MySQL 索引优化