Source code for

# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import math
from functools import partial
import numpy as np
from scipy.stats import norm
from import ItemParamError, ScoreError, ThetaError, IterMethodError, UnknownModelError
from psy.exceptions import ConvergenceError
from psy.utils import cached_property, gen_item_bank

[docs]class BaseModel(object): # 最大牛顿迭代次数 _newton_max_iter = 1000 # 牛顿迭代步长 _newton_step_size = 0.1 # 最大梯度上升次数 _gradient_max_iter = 1000 # 梯度上升步长 _gradient_step_size = 0.01 # 参数估计精度 _tol = 1e-5 def __init__(self, slop, threshold, init_theta=None, score=None, iter_method='newton', sigma=None): """ 不管是probit还是logit,都是用一样的参数估计算法, 基于牛顿迭代的极大似然算法和贝叶斯最大后验算法 :param slop: ndarray(float), 多维向量,斜率,区分度 :param threshold: ndarray(float), 单维向量,阈值,通俗度,难度 :param init_theta: ndarray(int|float), 特质向量初值 :param score: ndarray(0|1), 得分向量 """ if not isinstance(slop, np.ndarray): raise ItemParamError('item param must be ndarray') if not isinstance(threshold, np.ndarray): raise ItemParamError('item param must be ndarray') if len(slop.shape) == 1: slop.shape = 1, slop.shape[0] if len(slop) != len(threshold): raise ItemParamError('item param must be same length') if score is not None: if not isinstance(score, np.ndarray): raise ScoreError('score must be ndarray') if len(score) != len(slop): raise ScoreError('score must be same length as item param') if init_theta is not None and not isinstance(init_theta, np.ndarray): raise ThetaError('init_theta must be ndarray') if iter_method not in ('newton', 'gradient_ascent'): raise IterMethodError('iter_method must be newton or gradient_ascent') self._slop = slop self._score = score self._threshold = threshold self._init_theta = init_theta if init_theta is not None else np.zeros(len(self._slop[0])) # 默认bayes先验正态分布标准差 if sigma is None: self._inv_psi = np.identity(len(self._slop[0])) else: self._inv_psi = np.linalg.inv(sigma) self._iter_method = iter_method @property def score(self): return self._score def _prob(self, theta): raise NotImplementedError
[docs] def prob(self, theta): # 回答为1的概率 if not isinstance(theta, np.ndarray): raise ThetaError('theta must be ndarray') return self._prob(theta)
def _z(self, theta): """ probit和logit的z值 :param theta: ndarray(int|float), 特质向量初值 :return: ndarray(float), z值向量 """ return np.sum(self._slop * theta, 1) - self._threshold
[docs] def z(self, theta): if not isinstance(theta, np.ndarray): raise ThetaError('theta must be ndarray') return self._z(theta)
def _get_hessian_n_jacobian(self, theta): """ 抽象方法,目的是返回海塞矩阵和雅克比一阶导数向量,用于牛顿迭代 :param theta: ndarray(int|float), 特质向量初值 """ raise NotImplementedError def _get_jacobian(self, theta): """ 抽象方法,目的是返回雅克比向量(矩阵),用于梯度上升 :param theta: """ raise NotImplementedError @property def newton(self): """ 基于牛顿迭代的参数估计 :return: ndarray(int|float), 特质向量初值 """ theta0 = self._init_theta * 1.0 for i in range(self._newton_max_iter): hes, jac = self._get_hessian_n_jacobian(theta0) temp = self._newton_step_size *, jac) theta = theta0 - temp if np.max(np.abs(temp)) < self._tol: # print i return np.round(theta, 3) theta0 = theta raise ConvergenceError('no convergence') @property def gradient_ascent(self): # 梯度上升参数估计 theta0 = self._init_theta * 1.0 for i in range(self._gradient_max_iter): jac = self._get_jacobian(theta0) theta = theta0 + self._gradient_step_size * jac if np.max(np.abs(self._gradient_step_size * jac)) < self._tol: return np.round(theta, 3) theta0 = theta raise ConvergenceError('no convergence') @cached_property def solve(self): return getattr(self, self._iter_method)
[docs]class BaseProbitModel(BaseModel): # probit基础模型 def _prob(self, theta): # probit概率值 return norm.cdf(self._z(theta)) def _h(self, z): # probit函数的h值,方便计算 return (1.0 / ((2 * np.pi) ** 0.5)) * np.exp(-1 * z ** 2 / 2.0) def _w(self, h, prob): # probit函数的w值,可以看成权重,方便计算和呈现 pq = (1 - prob) * prob return h ** 2 / (pq + 1e-10) def _get_h_prob_val_w(self, theta): z = self._z(theta) h = self._h(z) + 1e-10 prob_val = self._prob(theta) w = self._w(h, prob_val) return h, prob_val, w def _dloglik(self, theta, prob_val, h, w): # probit一阶导数 return, w * (self._score - prob_val) / h) def _ddloglik(self, theta, w): # probit二阶导数 return -1 * * w, self._slop)
[docs] def info(self, theta): # 信息矩阵 if not isinstance(theta, np.ndarray): raise ThetaError('theta must be ndarray') h, prob_val, w = self._get_h_prob_val_w(theta) return * w, self._slop)
[docs]class BayesProbitModel(BaseProbitModel): # 贝叶斯modal估计 def _bayes_dloglik(self, theta, prob_val, h, w): return self._dloglik(theta, prob_val, h, w) -, theta) def _bayes_ddloglik(self, theta, w): return self._ddloglik(theta, w) - self._inv_psi def _get_hessian_n_jacobian(self, theta): h, prob_val, w = self._get_h_prob_val_w(theta) hes = np.linalg.inv(self._bayes_ddloglik(theta, w)) jac = self._bayes_dloglik(theta, prob_val, h, w) return hes, jac def _get_jacobian(self, theta): h, prob_val, w = self._get_h_prob_val_w(theta) return self._bayes_dloglik(theta, prob_val, h, w)
[docs] def info(self, theta): # 信息矩阵 if not isinstance(theta, np.ndarray): raise ThetaError('theta must be ndarray') _info = super(BayesProbitModel, self).info(theta) return _info + self._inv_psi
[docs]class BaseSimTirt(object): MODEL = {'bayes_probit': BayesProbitModel} def __init__(self, subject_nums, trait_size, model='bayes_probit', sigma=None, iter_method='newton', block_size=3, lower=1, upper=4, avg=0, std=1): """ :param subject_nums: int, 模拟被试的人数 :param trait_size: int, 特质数量 :param iter_method: str :param model: str, 模型 :param block_size: int, 题块 :param lower: int|float :param upper: int|float :param avg: int|float :param std: int|float """ if not isinstance(subject_nums, int): raise ValueError('subject_nums must be int') if not isinstance(trait_size, int): raise ValueError('trait_size must be int') if model not in ('bayes_probit', ): raise ValueError('mode must be bayes_probit') if block_size not in (2, 3): raise ValueError('block_size must be 2 or 3') if not isinstance(lower, (int, float)): raise ValueError('lower must be int or float') if not isinstance(upper, (int, float)): raise ValueError('upper must be int or float') if not isinstance(avg, (int, float)): raise ValueError('avg must be int or float') if not isinstance(std, (int, float)): raise ValueError('std must be int or float') if iter_method not in ('newton', 'gradient_ascent'): raise IterMethodError('iter_method must be newton or gradient_ascent') self._subject_nums = subject_nums self._trait_size = trait_size self._block_size = block_size self._lower = lower self._upper = upper self._avg = avg self.std = std self._iter_method = iter_method self._model = self._get_model(model, sigma) def _get_model(self, model, sigma): try: return partial(self.MODEL[model], sigma=sigma) except KeyError: raise UnknownModelError('unknown model, must be "bayes_probit" or ' '"ml_probit" or "bayes_logit" or "ml_logit"') @cached_property def random_thetas(self): """ 生成特质向量 :return: ndarray """ return np.random.multivariate_normal(np.zeros(self._trait_size), np.identity(self._trait_size), self._subject_nums) def _get_init_theta(self): return np.zeros(self._trait_size) def _get_mean_error(self, theta_list): return np.mean(np.abs(theta_list - self.random_thetas))
[docs]class SimAdaptiveTirt(BaseSimTirt): def __init__(self, item_size, max_sec_item_size=10, *args, **kwargs): super(SimAdaptiveTirt, self).__init__(*args, **kwargs) # 题库题量 self._item_size = item_size # 已做答试题编号保存记录 self._has_answered_item_idx = {} # 已做答得分保存记录 self._score = {} # 参数估计保存记录 self._theta = {} # 作答试题斜率保存记录 self._slop = {} # 作答试题阈值保存记录 self._threshold = {} # 第二阶段最大答题次数 self._max_sec_item_size = max_sec_item_size @property def scores(self): return self._score @property def thetas(self): return self._theta def _add_slop(self, theta_idx, slop): if theta_idx in self._slop: self._slop[theta_idx] = np.concatenate((self._slop[theta_idx], slop)) else: self._slop[theta_idx] = slop def _get_slop(self, theta_idx): return self._slop[theta_idx] def _get_threshold(self, theta_idx): return self._threshold[theta_idx] def _add_threshold(self, theta_idx, threshold): if theta_idx in self._threshold: self._threshold[theta_idx] = np.concatenate((self._threshold[theta_idx], threshold)) else: self._threshold[theta_idx] = threshold def _add_answered_item_idx(self, theta_idx, used_item_idx_list): if theta_idx in self._has_answered_item_idx: self._has_answered_item_idx[theta_idx].extend(used_item_idx_list) else: self._has_answered_item_idx[theta_idx] = used_item_idx_list def _get_answered_item_idx_set(self, theta_idx): return set(self._has_answered_item_idx[theta_idx]) def _get_can_use_items(self, theta_idx): can_use_idx = self._get_can_use_idx(theta_idx) return self.item_bank[list(can_use_idx)] def _get_can_use_idx(self, theta_idx): can_use_idx = self._item_idx_set - self._get_answered_item_idx_set(theta_idx) return can_use_idx def _add_score(self, theta_idx, score): if theta_idx in self._score: self._score[theta_idx] = np.concatenate((self._score[theta_idx], score)) else: self._score[theta_idx] = score def _get_score(self, theta_idx): return self._score[theta_idx] def _add_theta(self, theta_idx, theta): if theta_idx in self._theta: self._theta[theta_idx].append(theta) else: self._theta[theta_idx] = [theta] def _get_theta(self, theta_idx): return self._theta[theta_idx][-1] @cached_property def item_bank(self): return gen_item_bank(self._trait_size, self._item_size, self._block_size) @cached_property def _item_idx_set(self): return set(range(self._item_size)) def _get_random_choice_items(self, theta_idx): rand_choice_size = self._get_random_choice_size() while True: items = [] dims = [] used_idx_list = [] idx_list = np.random.choice(list(self._item_idx_set), rand_choice_size, False) for i in idx_list: item = self.item_bank[i] items.append(item) dims.extend(item['dim']) used_idx_list.append(i) # if len(set(dims)) == self._trait_size: self._add_answered_item_idx(theta_idx, used_idx_list) return items def _get_random_choice_size(self): return int(math.ceil(1.0 * self._trait_size / self._block_size)) def _get_random_choice_params(self, theta_idx): first_rand_items = self._get_random_choice_items(theta_idx) slop = np.zeros((len(first_rand_items) * self._block_size, self._trait_size)) threshold = np.zeros(len(first_rand_items) * self._block_size) for i, item in enumerate(first_rand_items): slop[i:i + self._block_size] = item['params'][0] threshold[i:i + self._block_size] = item['params'][1] return slop, threshold def _first_random(self, theta, theta_idx): # 第一阶段,随机抽题 slop, threshold = self._get_random_choice_params(theta_idx) p_list = self._model(slop, threshold).prob(theta) score = np.random.binomial(1, p_list, len(p_list)) print('当前作答结果:') print(score) init_theta = self._get_init_theta() model = self._model(slop, threshold, init_theta, score, self._iter_method) theta = model.solve self._add_score(theta_idx, score) self._add_theta(theta_idx, theta) self._add_slop(theta_idx, slop) self._add_threshold(theta_idx, threshold) def _second_random(self, theta, theta_idx): item = self._get_next_item(theta_idx) score = self._get_next_score(item, theta, theta_idx) print('当前作答结果:') print(score) est_theta = self._get_estimate_theta(score, theta_idx) self._add_theta(theta_idx, est_theta) return est_theta def _get_estimate_theta(self, score, theta_idx): # 参数估计 now_slop = self._get_slop(theta_idx) now_threshold = self._get_threshold(theta_idx) init_theta = self._get_init_theta() model = self._model(now_slop, now_threshold, init_theta, score, self._iter_method) est_theta = model.solve return est_theta def _get_next_score(self, item, theta, theta_idx): # 模拟自适应抽题的下一题得分 item_slop = item['params'][0] self._add_slop(theta_idx, item_slop) item_threshold = item['params'][1] self._add_threshold(theta_idx, item_threshold) p_list = self._model(item_slop, item_threshold).prob(theta) item_score = np.random.binomial(1, p_list, len(p_list)) self._add_score(theta_idx, item_score) score = self._get_score(theta_idx) return score def _get_next_item(self, theta_idx): # 获得自适应抽题的下一道题 est_theta = self._get_theta(theta_idx) items = self._get_can_use_items(theta_idx) slop = self._get_slop(theta_idx) threshold = self._get_threshold(theta_idx) test_info = self._model(slop, threshold).info(est_theta) print('误差方差平均值:%f' % np.mean(np.diag(np.linalg.inv(test_info)) ** 0.5)) info_list = np.zeros_like(items) for i, _item in enumerate(items): _slop, _threshold = _item['params'] item_info = self._model(_slop, _threshold).info(est_theta) info_list[i] = np.linalg.det(test_info + item_info) max_info_idx = info_list.argmax() item = items[max_info_idx] idx = list(self._get_can_use_idx(theta_idx))[max_info_idx] self._add_answered_item_idx(theta_idx, [idx]) return item
[docs] def sim(self): thetas = self.random_thetas theta_list = np.zeros((self._subject_nums, self._trait_size)) for i, theta in enumerate(thetas): est_theta = np.nan self._first_random(theta, i) for j in range(self._max_sec_item_size): est_theta = self._second_random(theta, i) print(u'第{0}个被试模拟成功!'.format(i + 1)) theta_list[i] = est_theta mean_error = self._get_mean_error(theta_list) print('模拟结束,平均误差{0}'.format(mean_error)) return theta_list