# Source code for psy.cat.tirt

# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import math
from functools import partial
import numpy as np
from scipy.stats import norm
from psy.exceptions.cat import ItemParamError, ScoreError, ThetaError, IterMethodError, UnknownModelError
from psy.exceptions import ConvergenceError
from psy.utils import cached_property, gen_item_bank


class BaseModel(object):
    """Base class for TIRT latent-trait estimation.

    Probit and logit variants share the same estimation machinery:
    maximum-likelihood / Bayesian-modal estimation based on Newton
    iteration or gradient ascent.
    """

    # maximum number of Newton iterations
    _newton_max_iter = 1000
    # Newton iteration step size (damping factor)
    _newton_step_size = 0.1
    # maximum number of gradient-ascent iterations
    _gradient_max_iter = 1000
    # gradient-ascent step size
    _gradient_step_size = 0.01
    # convergence tolerance for the estimate
    _tol = 1e-5

    def __init__(self, slop, threshold, init_theta=None, score=None, iter_method='newton', sigma=None):
        """
        :param slop: ndarray(float), discrimination (slope) matrix, one row per response
        :param threshold: ndarray(float), 1-D threshold (difficulty) vector
        :param init_theta: ndarray(int|float), initial value of the trait vector
        :param score: ndarray(0|1), response vector
        :param iter_method: str, 'newton' or 'gradient_ascent'
        :param sigma: ndarray(float), prior covariance matrix for the Bayesian
            estimators; an identity precision matrix is used when None
        """
        if not isinstance(slop, np.ndarray):
            raise ItemParamError('item param must be ndarray')
        if not isinstance(threshold, np.ndarray):
            raise ItemParamError('item param must be ndarray')
        if len(slop.shape) == 1:
            # promote a 1-D slope vector to a single-row matrix; use reshape so
            # the caller's array is not mutated (the original assigned
            # slop.shape in place, silently changing the argument's shape)
            slop = slop.reshape(1, slop.shape[0])
        if len(slop) != len(threshold):
            raise ItemParamError('item param must be same length')
        if score is not None:
            if not isinstance(score, np.ndarray):
                raise ScoreError('score must be ndarray')
            if len(score) != len(slop):
                raise ScoreError('score must be same length as item param')
        if init_theta is not None and not isinstance(init_theta, np.ndarray):
            raise ThetaError('init_theta must be ndarray')
        if iter_method not in ('newton', 'gradient_ascent'):
            raise IterMethodError('iter_method must be newton or gradient_ascent')
        self._slop = slop
        self._score = score
        self._threshold = threshold
        # default start point: the zero vector in trait space
        self._init_theta = init_theta if init_theta is not None else np.zeros(len(self._slop[0]))
        # prior precision matrix (inverse covariance) for the Bayesian estimators
        if sigma is None:
            self._inv_psi = np.identity(len(self._slop[0]))
        else:
            self._inv_psi = np.linalg.inv(sigma)
        self._iter_method = iter_method

    @property
    def score(self):
        return self._score

    def _prob(self, theta):
        # abstract: probability of a positive (1) response
        raise NotImplementedError

    def prob(self, theta):
        """Probability of a positive response at ``theta``."""
        if not isinstance(theta, np.ndarray):
            raise ThetaError('theta must be ndarray')
        return self._prob(theta)

    def _z(self, theta):
        """Linear predictor shared by probit and logit.

        :param theta: ndarray(int|float), trait vector
        :return: ndarray(float), z-value vector
        """
        return np.sum(self._slop * theta, 1) - self._threshold

    def z(self, theta):
        """Public wrapper of :meth:`_z` with type validation."""
        if not isinstance(theta, np.ndarray):
            raise ThetaError('theta must be ndarray')
        return self._z(theta)

    def _get_hessian_n_jacobian(self, theta):
        """Abstract: return the inverse Hessian and the gradient, for Newton iteration.

        :param theta: ndarray(int|float), trait vector
        """
        raise NotImplementedError

    def _get_jacobian(self, theta):
        """Abstract: return the gradient (Jacobian), for gradient ascent.

        :param theta: ndarray(int|float), trait vector
        """
        raise NotImplementedError

    @property
    def newton(self):
        """Damped Newton estimation of the trait vector.

        :return: ndarray(float), estimate rounded to 3 decimals
        :raises ConvergenceError: if the tolerance is not reached within
            ``_newton_max_iter`` iterations
        """
        theta0 = self._init_theta * 1.0
        for i in range(self._newton_max_iter):
            hes, jac = self._get_hessian_n_jacobian(theta0)
            # hes is already the inverse Hessian; the step is damped by
            # _newton_step_size for stability
            delta = self._newton_step_size * np.dot(hes, jac)
            theta = theta0 - delta
            if np.max(np.abs(delta)) < self._tol:
                return np.round(theta, 3)
            theta0 = theta
        raise ConvergenceError('no convergence')

    @property
    def gradient_ascent(self):
        """Gradient-ascent estimation of the trait vector.

        :return: ndarray(float), estimate rounded to 3 decimals
        :raises ConvergenceError: if the tolerance is not reached in time
        """
        theta0 = self._init_theta * 1.0
        for i in range(self._gradient_max_iter):
            jac = self._get_jacobian(theta0)
            delta = self._gradient_step_size * jac
            theta = theta0 + delta
            if np.max(np.abs(delta)) < self._tol:
                return np.round(theta, 3)
            theta0 = theta
        raise ConvergenceError('no convergence')

    @cached_property
    def solve(self):
        # dispatch to the configured estimator ('newton' or 'gradient_ascent')
        return getattr(self, self._iter_method)
class BaseProbitModel(BaseModel):
    """Probit (normal-ogive) TIRT model.

    The response probability is the standard normal CDF of the linear
    predictor z; the helper quantities h and w feed the derivatives.
    """

    def _prob(self, theta):
        # P(response = 1 | theta) under the probit link
        return norm.cdf(self._z(theta))

    def _h(self, z):
        # standard normal density at z, used throughout the derivatives
        return (1.0 / ((2 * np.pi) ** 0.5)) * np.exp(-1 * z ** 2 / 2.0)

    def _w(self, h, prob):
        # weight term h^2 / (p * q); the epsilon guards against zero variance
        variance = prob * (1 - prob)
        return h ** 2 / (variance + 1e-10)

    def _get_h_prob_val_w(self, theta):
        # bundle the quantities shared by the first and second derivatives
        prob_val = self._prob(theta)
        z_val = self._z(theta)
        h_val = self._h(z_val) + 1e-10  # epsilon keeps later divisions finite
        w_val = self._w(h_val, prob_val)
        return h_val, prob_val, w_val

    def _dloglik(self, theta, prob_val, h, w):
        # gradient of the log-likelihood with respect to theta
        weighted_resid = w * (self._score - prob_val) / h
        return np.dot(self._slop.T, weighted_resid)

    def _ddloglik(self, theta, w):
        # Hessian of the log-likelihood: -(A' diag(w) A)
        return -np.dot(self._slop.T * w, self._slop)

    def info(self, theta):
        """Fisher information matrix at ``theta``."""
        if not isinstance(theta, np.ndarray):
            raise ThetaError('theta must be ndarray')
        h, prob_val, w = self._get_h_prob_val_w(theta)
        return np.dot(self._slop.T * w, self._slop)
class BayesProbitModel(BaseProbitModel):
    """Bayesian modal (MAP) probit model.

    Adds a multivariate-normal prior on theta whose precision matrix is
    ``_inv_psi``; the prior contributes to both the gradient and the Hessian.
    """

    def _bayes_dloglik(self, theta, prob_val, h, w):
        # posterior gradient = likelihood gradient minus the prior penalty
        prior_grad = np.dot(self._inv_psi, theta)
        return self._dloglik(theta, prob_val, h, w) - prior_grad

    def _bayes_ddloglik(self, theta, w):
        # posterior Hessian = likelihood Hessian minus the prior precision
        return self._ddloglik(theta, w) - self._inv_psi

    def _get_hessian_n_jacobian(self, theta):
        # quantities needed by Newton iteration: inverse Hessian and gradient
        h_val, prob_val, w_val = self._get_h_prob_val_w(theta)
        hessian_inv = np.linalg.inv(self._bayes_ddloglik(theta, w_val))
        jacobian = self._bayes_dloglik(theta, prob_val, h_val, w_val)
        return hessian_inv, jacobian

    def _get_jacobian(self, theta):
        # gradient only, for gradient ascent
        h_val, prob_val, w_val = self._get_h_prob_val_w(theta)
        return self._bayes_dloglik(theta, prob_val, h_val, w_val)

    def info(self, theta):
        """Posterior information matrix: likelihood information plus prior precision."""
        if not isinstance(theta, np.ndarray):
            raise ThetaError('theta must be ndarray')
        return super(BayesProbitModel, self).info(theta) + self._inv_psi
class BaseSimTirt(object):
    """Base class for simulated TIRT (Thurstonian IRT) testing."""

    # registry of estimation models selectable by name
    MODEL = {'bayes_probit': BayesProbitModel}

    def __init__(self, subject_nums, trait_size, model='bayes_probit', sigma=None,
                 iter_method='newton', block_size=3, lower=1, upper=4, avg=0, std=1):
        """
        :param subject_nums: int, number of simulated subjects
        :param trait_size: int, number of latent traits
        :param model: str, estimation model name (currently only 'bayes_probit')
        :param sigma: prior covariance matrix, passed through to the model
        :param iter_method: str, 'newton' or 'gradient_ascent'
        :param block_size: int, items per forced-choice block (2 or 3)
        :param lower: int|float, lower bound for item-parameter generation
        :param upper: int|float, upper bound for item-parameter generation
        :param avg: int|float, mean for item-parameter generation
        :param std: int|float, standard deviation for item-parameter generation
        """
        if not isinstance(subject_nums, int):
            raise ValueError('subject_nums must be int')
        if not isinstance(trait_size, int):
            raise ValueError('trait_size must be int')
        if model not in ('bayes_probit', ):
            # message fixed: previously said 'mode must be bayes_probit'
            raise ValueError('model must be bayes_probit')
        if block_size not in (2, 3):
            raise ValueError('block_size must be 2 or 3')
        if not isinstance(lower, (int, float)):
            raise ValueError('lower must be int or float')
        if not isinstance(upper, (int, float)):
            raise ValueError('upper must be int or float')
        if not isinstance(avg, (int, float)):
            raise ValueError('avg must be int or float')
        if not isinstance(std, (int, float)):
            raise ValueError('std must be int or float')
        if iter_method not in ('newton', 'gradient_ascent'):
            raise IterMethodError('iter_method must be newton or gradient_ascent')
        self._subject_nums = subject_nums
        self._trait_size = trait_size
        self._block_size = block_size
        self._lower = lower
        self._upper = upper
        self._avg = avg
        # underscore-prefixed for consistency with the sibling attributes
        # (this was the only public assignment: ``self.std``; the name is
        # unused elsewhere in this module)
        self._std = std
        self._iter_method = iter_method
        self._model = self._get_model(model, sigma)

    def _get_model(self, model, sigma):
        """Return the model class with ``sigma`` pre-bound via ``partial``."""
        try:
            return partial(self.MODEL[model], sigma=sigma)
        except KeyError:
            raise UnknownModelError('unknown model, must be "bayes_probit" or '
                                    '"ml_probit" or "bayes_logit" or "ml_logit"')

    @cached_property
    def random_thetas(self):
        """Draw the true trait vectors, one row per subject.

        NOTE(review): draws from a standard normal (zero mean, identity
        covariance); the ``avg``/``std`` constructor arguments are not applied
        here — confirm whether that is intended.

        :return: ndarray of shape (subject_nums, trait_size)
        """
        return np.random.multivariate_normal(
            np.zeros(self._trait_size), np.identity(self._trait_size), self._subject_nums)

    def _get_init_theta(self):
        # starting point for every estimation: the zero vector
        return np.zeros(self._trait_size)

    def _get_mean_error(self, theta_list):
        # mean absolute error between the estimates and the true trait values
        return np.mean(np.abs(theta_list - self.random_thetas))
class SimAdaptiveTirt(BaseSimTirt):
    """Simulate computerized adaptive testing (CAT) with the TIRT model.

    Stage 1 administers a random set of blocks; stage 2 repeatedly picks the
    unanswered block maximizing the determinant of the test information matrix.
    """

    def __init__(self, item_size, max_sec_item_size=10, *args, **kwargs):
        """
        :param item_size: int, number of blocks in the item bank
        :param max_sec_item_size: int, number of second-stage (adaptive) administrations
        """
        super(SimAdaptiveTirt, self).__init__(*args, **kwargs)
        # size of the item bank
        self._item_size = item_size
        # per-subject record of answered item indices
        self._has_answered_item_idx = {}
        # per-subject record of scores
        self._score = {}
        # per-subject record of theta estimates (one entry per estimation)
        self._theta = {}
        # per-subject record of administered items' slopes
        self._slop = {}
        # per-subject record of administered items' thresholds
        self._threshold = {}
        # maximum number of second-stage items
        self._max_sec_item_size = max_sec_item_size

    @property
    def scores(self):
        return self._score

    @property
    def thetas(self):
        return self._theta

    def _add_slop(self, theta_idx, slop):
        # append the administered item's slope rows to the subject's record
        if theta_idx in self._slop:
            self._slop[theta_idx] = np.concatenate((self._slop[theta_idx], slop))
        else:
            self._slop[theta_idx] = slop

    def _get_slop(self, theta_idx):
        return self._slop[theta_idx]

    def _get_threshold(self, theta_idx):
        return self._threshold[theta_idx]

    def _add_threshold(self, theta_idx, threshold):
        # append the administered item's thresholds to the subject's record
        if theta_idx in self._threshold:
            self._threshold[theta_idx] = np.concatenate((self._threshold[theta_idx], threshold))
        else:
            self._threshold[theta_idx] = threshold

    def _add_answered_item_idx(self, theta_idx, used_item_idx_list):
        if theta_idx in self._has_answered_item_idx:
            self._has_answered_item_idx[theta_idx].extend(used_item_idx_list)
        else:
            self._has_answered_item_idx[theta_idx] = used_item_idx_list

    def _get_answered_item_idx_set(self, theta_idx):
        return set(self._has_answered_item_idx[theta_idx])

    def _get_can_use_items(self, theta_idx):
        # items not yet administered to this subject, in deterministic order
        can_use_idx = self._get_can_use_idx(theta_idx)
        return self.item_bank[sorted(can_use_idx)]

    def _get_can_use_idx(self, theta_idx):
        return self._item_idx_set - self._get_answered_item_idx_set(theta_idx)

    def _add_score(self, theta_idx, score):
        if theta_idx in self._score:
            self._score[theta_idx] = np.concatenate((self._score[theta_idx], score))
        else:
            self._score[theta_idx] = score

    def _get_score(self, theta_idx):
        return self._score[theta_idx]

    def _add_theta(self, theta_idx, theta):
        if theta_idx in self._theta:
            self._theta[theta_idx].append(theta)
        else:
            self._theta[theta_idx] = [theta]

    def _get_theta(self, theta_idx):
        # most recent estimate for this subject
        return self._theta[theta_idx][-1]

    @cached_property
    def item_bank(self):
        return gen_item_bank(self._trait_size, self._item_size, self._block_size)

    @cached_property
    def _item_idx_set(self):
        return set(range(self._item_size))

    def _get_random_choice_items(self, theta_idx):
        # stage 1: randomly draw enough blocks to (in principle) touch every trait
        rand_choice_size = self._get_random_choice_size()
        while True:
            items = []
            dims = []
            used_idx_list = []
            idx_list = np.random.choice(list(self._item_idx_set), rand_choice_size, False)
            for i in idx_list:
                item = self.item_bank[i]
                items.append(item)
                dims.extend(item['dim'])
                used_idx_list.append(i)
            # NOTE(review): the dimension-coverage check below is disabled, so
            # the loop always returns on the first draw and `dims` is unused.
            # if len(set(dims)) == self._trait_size:
            self._add_answered_item_idx(theta_idx, used_idx_list)
            return items

    def _get_random_choice_size(self):
        # smallest number of blocks whose total item count covers all traits
        return int(math.ceil(1.0 * self._trait_size / self._block_size))

    def _get_random_choice_params(self, theta_idx):
        # stack the randomly chosen blocks' parameters into one slope matrix
        # and one threshold vector
        first_rand_items = self._get_random_choice_items(theta_idx)
        slop = np.zeros((len(first_rand_items) * self._block_size, self._trait_size))
        threshold = np.zeros(len(first_rand_items) * self._block_size)
        for i, item in enumerate(first_rand_items):
            # each block contributes _block_size rows; the original code sliced
            # slop[i:i + block_size], which overlaps rows for i >= 1 and leaves
            # the tail rows all-zero — index by i * block_size instead
            start = i * self._block_size
            slop[start:start + self._block_size] = item['params'][0]
            threshold[start:start + self._block_size] = item['params'][1]
        return slop, threshold

    def _first_random(self, theta, theta_idx):
        # stage 1: random administration and first estimation
        slop, threshold = self._get_random_choice_params(theta_idx)
        p_list = self._model(slop, threshold).prob(theta)
        # simulate the subject's 0/1 responses from the model probabilities
        score = np.random.binomial(1, p_list, len(p_list))
        print('当前作答结果:')
        print(score)
        init_theta = self._get_init_theta()
        model = self._model(slop, threshold, init_theta, score, self._iter_method)
        theta = model.solve
        self._add_score(theta_idx, score)
        self._add_theta(theta_idx, theta)
        self._add_slop(theta_idx, slop)
        self._add_threshold(theta_idx, threshold)

    def _second_random(self, theta, theta_idx):
        # stage 2: one adaptive administration followed by re-estimation
        item = self._get_next_item(theta_idx)
        score = self._get_next_score(item, theta, theta_idx)
        print('当前作答结果:')
        print(score)
        est_theta = self._get_estimate_theta(score, theta_idx)
        self._add_theta(theta_idx, est_theta)
        return est_theta

    def _get_estimate_theta(self, score, theta_idx):
        # re-estimate theta from all responses administered so far
        now_slop = self._get_slop(theta_idx)
        now_threshold = self._get_threshold(theta_idx)
        init_theta = self._get_init_theta()
        model = self._model(now_slop, now_threshold, init_theta, score, self._iter_method)
        est_theta = model.solve
        return est_theta

    def _get_next_score(self, item, theta, theta_idx):
        # simulate the responses for the adaptively selected item and return
        # the subject's full (cumulative) score vector
        item_slop = item['params'][0]
        self._add_slop(theta_idx, item_slop)
        item_threshold = item['params'][1]
        self._add_threshold(theta_idx, item_threshold)
        p_list = self._model(item_slop, item_threshold).prob(theta)
        item_score = np.random.binomial(1, p_list, len(p_list))
        self._add_score(theta_idx, item_score)
        score = self._get_score(theta_idx)
        return score

    def _get_next_item(self, theta_idx):
        # adaptively pick the unanswered item that maximizes the determinant of
        # the updated test information matrix (D-optimality)
        est_theta = self._get_theta(theta_idx)
        # fix the candidate order once so item <-> bank-index stay in sync
        # (the original converted the same set to a list twice and relied on
        # identical iteration order)
        can_use_idx_list = sorted(self._get_can_use_idx(theta_idx))
        items = self.item_bank[can_use_idx_list]
        slop = self._get_slop(theta_idx)
        threshold = self._get_threshold(theta_idx)
        test_info = self._model(slop, threshold).info(est_theta)
        print('误差方差平均值:%f' % np.mean(np.diag(np.linalg.inv(test_info)) ** 0.5))
        # numeric array (np.zeros_like(items) produced an object-dtype array)
        info_list = np.zeros(len(items))
        for i, _item in enumerate(items):
            _slop, _threshold = _item['params']
            item_info = self._model(_slop, _threshold).info(est_theta)
            info_list[i] = np.linalg.det(test_info + item_info)
        max_info_idx = info_list.argmax()
        item = items[max_info_idx]
        self._add_answered_item_idx(theta_idx, [can_use_idx_list[max_info_idx]])
        return item

    def sim(self):
        """Run the full simulation for every subject.

        :return: ndarray of shape (subject_nums, trait_size) with the final
            theta estimates; also prints the mean absolute error
        """
        thetas = self.random_thetas
        theta_list = np.zeros((self._subject_nums, self._trait_size))
        for i, theta in enumerate(thetas):
            est_theta = np.nan
            self._first_random(theta, i)
            for j in range(self._max_sec_item_size):
                est_theta = self._second_random(theta, i)
            print(u'第{0}个被试模拟成功!'.format(i + 1))
            theta_list[i] = est_theta
        mean_error = self._get_mean_error(theta_list)
        print('模拟结束,平均误差{0}'.format(mean_error))
        return theta_list