Source code for psy.irt.irm

# coding=utf-8
from __future__ import print_function
import warnings
from itertools import combinations
import numpy as np
from psy.irt.base import LogitMixin, ProbitMixin, ZMixin
from psy.utils import inverse_logistic, get_nodes_weights
from psy.fa import GPForth, Factor
from psy.settings import X_WEIGHTS, X_NODES
from scipy.stats import norm


class _BaseEmIrt(object):

    def __init__(self, scores=None, max_iter=1000, tol=1e-5):
        self.scores = scores
        self._max_iter = max_iter
        self._tol = tol
        self.item_size = scores.shape[1]

    def _e_step(self, p_val, weights):
        # 计算theta的分布人数
        scores = self.scores
        lik_wt = self._lik(p_val) * weights
        # 归一化
        lik_wt_sum = np.sum(lik_wt, axis=0)
        _temp = lik_wt / lik_wt_sum
        # theta的人数分布
        full_dis = np.sum(_temp, axis=1)
        # theta下回答正确的人数分布
        right_dis = np.dot(_temp, scores)
        full_dis.shape = full_dis.shape[0], 1
        # 对数似然值
        loglik_val = np.sum(np.log(lik_wt_sum))
        return full_dis, right_dis, loglik_val

    def _lik(self, p_val):
        # 似然函数
        scores = self.scores
        loglik_val = np.dot(np.log(p_val + 1e-200), scores.transpose()) + \
                     np.dot(np.log(1 - p_val + 1e-200), (1 - scores).transpose())
        return np.exp(loglik_val)


class _BaseEmUIrt(_BaseEmIrt):

    def __init__(self, init_slop=None, init_threshold=None, model='2PL', *args, **kwargs):
        super(_BaseEmUIrt, self).__init__(*args, **kwargs)
        _model = model.upper()
        if _model == '2PL':
            if init_slop is not None:
                self._init_slop = init_slop
            else:
                self._init_slop = np.ones(self.scores.shape[1])
        if _model == '1PL':
            self._init_slop = None
        if init_threshold is not None:
            self._init_threshold = init_threshold
        else:
            self._init_threshold = np.zeros(self.scores.shape[1])

    def _m_step(self, full_dis, right_dis, slop, threshold, theta, p_val, z):
        # 一阶导数, 二阶导数
        dp, ddp = self._get_dp_n_ddp(right_dis, full_dis, p_val, z)
        # jac矩阵和hess矩阵
        jac1 = np.sum(dp, axis=0)
        jac2 = np.sum(dp * theta, axis=0)
        hess11 = -1 * np.sum(ddp, axis=0)
        hess12 = hess21 = -1 * np.sum(ddp * theta, axis=0)
        hess22 = -1 * np.sum(ddp * theta ** 2, axis=0)
        delta_list = np.zeros((self.item_size, 2))
        hess_list = []
        # 把求稀疏矩阵的逆转化成求每个题目的小矩阵的逆
        for i in range(self.item_size):
            jac = np.array([jac1[i], jac2[i]])
            hess = np.array(
                [[hess11[i], hess12[i]],
                 [hess21[i], hess22[i]]]
            )
            delta = np.linalg.solve(hess, jac)
            slop[i], threshold[i] = slop[i] - delta[1], threshold[i] - delta[0]
            delta_list[i] = delta
            hess_list.append(hess)
        return slop, threshold, delta_list, hess_list

    def fit(self):
        # EM算法
        max_iter = self._max_iter
        tol = self._tol
        slop = self._init_slop
        threshold = self._init_threshold
        for i in range(max_iter):
            z = self.z(slop, threshold, X_NODES)
            p_val = self.p(z)
            full_dis, right_dis, loglik = self._e_step(p_val, X_WEIGHTS)
            slop, threshold, delta_list, hess_list = self._m_step(full_dis, right_dis, slop, threshold, X_NODES, p_val, z)
            if np.max(np.abs(delta_list)) < tol:
                return slop, threshold
        warnings.warn("no convergence")
        return slop, threshold


class _ProbitIrt(_BaseEmUIrt, ProbitMixin, ZMixin):

    def _get_dp_n_ddp(self, right_dis, full_dis, p_val, z):
        h = self._h(z)
        w = self._w(h, p_val)
        dp = w * (right_dis - full_dis * p_val) / h
        ddp = full_dis * w
        return dp, ddp

    def _h(self, z):
        # probit函数的h值,方便计算
        return (1.0 / ((2 * np.pi) ** 0.5)) * np.exp(-1 * z ** 2 / 2.0)

    def _w(self, h, prob):
        # probit函数的w值,可以看成权重,方便计算和呈现
        pq = (1 - prob) * prob
        return h ** 2 / (pq + 1e-10)


class _LogitIrt(_BaseEmUIrt, LogitMixin, ZMixin):
    # EM算法求解
    # E步求期望
    # M步求极大,这里M步只迭代一次

    def _get_dp_n_ddp(self, right_dis, full_dis, p_val, z):
        return right_dis - full_dis * p_val, full_dis * p_val * (1 - p_val)


[docs]class Irt(object): LINK_DT = {'logit': _LogitIrt, 'probit': _ProbitIrt} PARAMS_TYPE_TP = ('1PL', '2PL', '3PL') def __init__(self, scores, link='logit', params_type='2PL', init_slop=None, init_threshold=None, max_iter=1000, tol=1e-5, *args, **kwargs): if link not in ('probit', 'logit'): raise ValueError('link must be probit or logit') _params_type = params_type.upper() if _params_type not in self.PARAMS_TYPE_TP: raise ValueError('params type must be 1PL, 2PL or 3PL') _link = self.LINK_DT[link] self._model = _link(scores=scores, init_slop=init_slop, init_threshold=init_threshold, max_iter=max_iter, tol=tol, *args, **kwargs)
[docs] def fit(self): return self._model.fit()
[docs]class Mirt(_BaseEmIrt, LogitMixin): # 多维项目反应理论(全息项目因子分析)参数估计 def __init__(self, dim_size, init_slop=None, init_threshold=None, max_iter=1000, tol=1e-4, *args, **kwargs): super(Mirt, self).__init__(*args, **kwargs) self._dim_size = dim_size if init_slop is not None and init_threshold is not None: self._init_slop = init_slop.copy() self._init_threshold = init_threshold.copy() else: self._init_slop, self._init_threshold = self._get_init_slop_threshold(dim_size) self._fix_slop(dim_size) self._max_iter = max_iter self._tol = tol self._theta_size = self.scores.shape[0] self._theta_comb = self._get_theta_comb() self._nodes, self._weights = get_nodes_weights(dim_size) def _fix_slop(self, dim_size): # 由于多维参数的解空间无数,需要固定参数 temp_idx = dim_size - 1 while temp_idx: self._init_slop[temp_idx][-temp_idx:] = 0 temp_idx -= 1
[docs] @staticmethod def z(slop, threshold, theta): _z = np.dot(theta, slop) + threshold _z[_z > 35] = 35 _z[_z < -35] = -35 return _z
def _get_theta_comb(self): # 多维特质的两两组合分布,这个主要目的是求二阶导用 return list(combinations(range(self._dim_size), 2)) def _get_theta_mat(self, theta): # theta的矩阵,包括0次方,1次方,二次方和交互,目的是矩阵乘法求海塞矩阵,避免for循环 col1 = np.ones((theta.shape[0], 1)) col2 = theta col3 = theta ** 2 mat = np.zeros((theta.shape[0], len(self._theta_comb))) for i, v in enumerate(self._theta_comb): mat[:, i] = theta[:, v[0]] * theta[:, v[1]] return np.concatenate((col1, col2, col3, mat), axis=1) def _m_step(self, full_dis, right_dis, slop, threshold, theta, p_val): dp = right_dis - full_dis * p_val ddp = full_dis * p_val * (1 - p_val) jac1 = np.sum(dp, axis=0) jac1.shape = 1, jac1.shape[0] jac2 = np.dot(theta.transpose(), dp) jac_all = np.vstack((jac1, jac2)) base_hess = self._get_theta_mat(theta) # 海塞矩阵的数值矩阵,不是海塞矩阵 fake_hess = -1 * np.dot(ddp.transpose(), base_hess) slop_delta_list = np.zeros_like(slop) threshold_delta_list = np.zeros_like(threshold) i = slop.shape[1] - 1 # 固定参数的初始位置 fix_param_size = self._dim_size - 1 # 把求稀疏矩阵的逆转化成求每个题目的小矩阵的逆 while i >= 0: # jac矩阵 jac = self._get_jac(jac_all, i, fix_param_size) # 海塞矩阵 hess = self._get_hess(fake_hess, i, fix_param_size) delta = np.linalg.solve(hess, jac) slop_est_param_idx = self._dim_size - fix_param_size slop[:slop_est_param_idx, i] = slop[:slop_est_param_idx, i] - delta[1:] threshold[:, i] = threshold[:, i] - delta[0] slop_delta_list[:slop_est_param_idx, i] = delta[1:] threshold_delta_list[:, i] = delta[0] i -= 1 if fix_param_size > 0: fix_param_size -= 1 return slop, threshold, slop_delta_list, threshold_delta_list def _get_jac(self, jac_all, i, fix_param_size): return jac_all[:, i][:self._dim_size - fix_param_size + 1] def _get_hess(self, hess_vals, i, fix_param_size): # 求海赛矩阵 param_size = self._dim_size + 1 - fix_param_size hess = np.zeros((param_size, param_size)) hess[0] = hess_vals[i][:param_size] hess[1:, 0] = hess_vals[i][1:param_size] for j in range(1, param_size): hess[j, j] = hess_vals[i][self._dim_size + j] for k, comb in enumerate(self._theta_comb): try: val = hess_vals[i][self._dim_size + 1 + self._dim_size + k] hess[comb[0] + 1, comb[1] + 1] = val hess[comb[1] + 1, comb[0] + 1] = val except IndexError: break return hess
[docs] def em(self): max_iter = self._max_iter tol = self._tol slop = self._init_slop threshold = self._init_threshold for i in range(max_iter): z = self.z(slop, threshold, self._nodes) p_val = self.p(z) full_dis, right_dis, loglik = self._e_step(p_val, self._weights) slop, threshold, slop_delta_list, threshold_delta_list = self._m_step(full_dis, right_dis, slop, threshold, self._nodes, p_val) if np.max(np.abs(slop_delta_list)) < tol and np.max(np.abs(threshold_delta_list)) < tol: print(i) return slop, threshold, self._get_factor_loadings(slop) warnings.warn("no convergence, the smallest delta is %s" % max(np.max(np.abs(slop_delta_list)), np.max(np.abs(threshold_delta_list)))) return slop, threshold, self._get_factor_loadings(slop)
@staticmethod def _get_factor_loadings(slop): # 将求得解转化为因子载荷 d = (1 + np.sum((slop / 1.702) ** 2, axis=0)) ** 0.5 d.shape = 1, d.shape[0] init_loadings = slop / (d * 1.702) loadings = GPForth(init_loadings.transpose()).solve() return loadings def _get_init_slop_threshold(self, dim_size): # 求初始值 # 斜率是因子分析后的因子载荷转化 # 阈值是logistic函数的反函数转化 loadings = Factor(self.scores, dim_size).mirt_loading loadings_tr = loadings.transpose() d = (1 - np.sum(loadings_tr ** 2, axis=0)) ** 0.5 init_slop = loadings_tr / d * 1.702 init_threshold = inverse_logistic(np.mean(self.scores, axis=0)) init_threshold.shape = 1, init_threshold.shape[0] init_threshold = init_threshold / d return init_slop, init_threshold