PK J5 5 $ crowdlib-0.8.50/crowdlib/__init__.py# vim: set fileencoding=utf-8 noexpandtab:
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: January 2010
'''
from __future__ import division
__version__ = "0.8.50"
__author__ = "Alexander J. Quinn"
__license__ = "MIT License"
__copyright__ = "Copyright 2011-2017, Alexander J. Quinn"
################################################################################
# COMPATIBILITY CHECK
#
# Check for a couple of common installation issues:
# - wrong Python version (need >= MINIMUM_PYTHON_VERSION, currently 2.5)
# - broken sqlite3 module (common problem with Redhat 4)
#
MINIMUM_PYTHON_VERSION = "2.5"
import sys
# Compare versions as integer tuples.  A string comparison of version numbers
# misorders two-digit components (lexically "3.9" > "3.10"), and slicing
# sys.version[:3] turns "3.10.x" into "3.1".
if sys.version_info[:2] < tuple(int(part) for part in MINIMUM_PYTHON_VERSION.split(".")):
	sys.stderr.write("\n")
	sys.stderr.write("**********************************************\n")
	sys.stderr.write("* CrowdLib currently requires Python %s+ *\n"%MINIMUM_PYTHON_VERSION)
	sys.stderr.write("**********************************************\n")
	sys.stderr.write("\n")
	raise Exception("CrowdLib currently requires Python %s+"%MINIMUM_PYTHON_VERSION)
# Importing sqlite3 fails when Python was compiled without the SQLite
# libraries.  Print an explanatory banner, then re-raise the ImportError.
try:
	import sqlite3
	del sqlite3
except ImportError:
	sys.stderr.write("\n")
	sys.stderr.write("\n")
	sys.stderr.write("**********************************************\n")
	sys.stderr.write("* Your Python install is broken. *\n")
	sys.stderr.write("* *\n")
	sys.stderr.write("* When Python was compiled on this machine, *\n")
	sys.stderr.write("* the SQLite3 libraries were not present or *\n")
	sys.stderr.write("* not found. *\n")
	sys.stderr.write("**********************************************\n")
	sys.stderr.write("\n")
	sys.stderr.write("\n")
	raise
# Keep `sys` out of the public crowdlib namespace.
del sys
# [pylint] tolerate use of `id` (name of a built-in function) below : pylint:disable=W0622
# Private imports we want to keep out of the general crowdlib namespace.
#
# This is a hack. Basically, if somebody does ...
# import crowdlib
# print(dir(crowdlib))
# ... I don't want them to see classes and things that aren't meant to be used
# directly by clients. With this, there will be a single entry called "_".
class _(object):
	'''
	Private holder for the names CrowdLib uses internally.

	These classes are imported as class attributes rather than module globals,
	so `dir(crowdlib)` shows the single entry "_" instead of all of them.
	'''
	from crowdlib.AMT import AMT
	from crowdlib.AMTInstanceManager import AMTInstanceManager
	from crowdlib.CrowdLibSettings import CrowdLibSettings
	from crowdlib.RequesterStatistics import RequesterStatistics
	from crowdlib.QualificationRequirement import QualificationRequirement
	from crowdlib.QuestionField import TextField
	from crowdlib.QuestionField import NumberField
	from crowdlib.QuestionField import RadioButtonField
	from crowdlib.QuestionField import MultiChooserField
	from crowdlib.QuestionField import DropDownField
	from crowdlib.QuestionField import ComboBoxField
	from crowdlib.QuestionField import CheckBoxField
	from crowdlib.QuestionField import FileUploadField
	from crowdlib.QuestionField import ListField
	settings = CrowdLibSettings() # CrowdLibSettings is a singleton class.
	# One manager shared by every get_amt() call below.
	amt_instance_manager = AMTInstanceManager()
	@staticmethod
	def get_amt():
		'''Return the AMT object the shared instance manager provides for the current settings.'''
		return _.amt_instance_manager.get_amt(_.settings)
################################################################################
# GLOBAL SETTINGS OBJECT
# Public alias: the very same object as _.settings, which get_amt() reads.
settings = _.settings
################################################################################
# CREATING HITS
#
def create_hit_type(title,
		description,
		reward=None,
		currency=None,
		time_limit=None,
		keywords=None,
		autopay_delay=None,
		qualification_requirements=None):
	'''
	Register a HIT type with AMT and return the resulting HITType object.

	Arguments left as None are filled in by AMT.create_hit_type(..) from the
	defaults in your CrowdLib settings.  Amazon does not accept "" as a
	description; you will get an error if you pass it.
	'''
	amt = _.get_amt()
	return amt.create_hit_type(
		title=title,
		description=description,
		reward=reward,
		currency=currency,
		time_limit=time_limit,
		keywords=keywords,
		autopay_delay=autopay_delay,
		qualification_requirements=qualification_requirements,
	)
def text_field(label, id):
	'''Return a TextField question field with the given label and field id.'''
	return _.TextField(label=label, id=id)
def checkbox_field(label, id, choices):
	'''Return a CheckBoxField offering `choices`.'''
	return _.CheckBoxField(label, id, choices)
def combobox_field(label, id, choices):
	'''Return a ComboBoxField offering `choices`.'''
	return _.ComboBoxField(label, id, choices)
def dropdown_field(label, id, choices):
	'''Return a DropDownField offering `choices`.'''
	return _.DropDownField(label, id, choices)
def file_upload_field(label, id, min_bytes=0, max_bytes=2000000000):
	'''Return a FileUploadField; min_bytes/max_bytes bound the upload size (default 0 to 2,000,000,000).'''
	return _.FileUploadField(label, id, min_bytes, max_bytes)
def list_field(label, id, choices):
	'''Return a ListField offering `choices`.'''
	return _.ListField(label, id, choices)
def multichooser_field(label, id, choices):
	'''Return a MultiChooserField offering `choices`.'''
	return _.MultiChooserField(label, id, choices)
def number_field(label, id, min_value=None, max_value=None):
	'''Return a NumberField; min_value/max_value are optional bounds (None = not given).'''
	return _.NumberField(label=label, id=id, min_value=min_value, max_value=max_value)
def radiobutton_field(label, id, choices, other=None):
	'''Return a RadioButtonField offering `choices`; `other` is passed through to RadioButtonField unchanged.'''
	return _.RadioButtonField(label, id, choices, other)
################################################################################
# RECEIVING RESULTS
#
def get_current_notification_events(cgi_fields=None):
	'''
	Return notification events parsed from CGI fields.

	Delegates to AMT.get_current_notification_events_from_cgi(..).  Presumably
	cgi_fields=None means "read the current CGI request" -- confirm there.
	'''
	return _.get_amt().get_current_notification_events_from_cgi(cgi_fields)
def start_notifications(url):
	'''Start AMT notifications to `url` (delegates to AMT.start_notifications).'''
	return _.get_amt().start_notifications(url)
def stop_notifications():
	'''Stop AMT notifications (delegates to AMT.stop_notifications).'''
	return _.get_amt().stop_notifications()
################################################################################
# GETTERS
#
def get_hit_type(id):
	'''
	Look up a HIT type by ID.

	If the ID is not known locally, the AMT object is asked to sync first
	(do_sync_if_not_found=True).
	'''
	from crowdlib.utility import to_unicode
	key = to_unicode(id)
	amt = _.get_amt()
	return amt.get_hit_type(key, do_sync_if_not_found=True)
def get_hit_types(since=None, until=None, title_re=None):
	'''
	Return a GeneratingSequence of HIT types, optionally filtered.

	since/until go through coerce_to_date_time with durations interpreted as
	pointing into the past; the default times (00:00 for since,
	23:59:59.999999 for until) presumably apply when only a date is given.
	title_re is passed through unchanged.
	'''
	from crowdlib.utility import coerce_to_date_time
	from crowdlib.utility.miscellaneous import GeneratingSequence
	start = coerce_to_date_time(since, interpret_durations_as_past=True, default_time="00:00:00.000000")
	end = coerce_to_date_time(until, interpret_durations_as_past=True, default_time="23:59:59.999999")
	amt = _.get_amt()
	return GeneratingSequence(amt.get_hit_types(since=start, until=end, title_re=title_re))
def get_hit(id):
	'''Look up a HIT by ID.'''
	from crowdlib.utility import to_unicode
	key = to_unicode(id)
	amt = _.get_amt()
	return amt.get_hit(key)
def get_hits(since=None, until=None, title_re=None):
	'''Return a GeneratingSequence of HITs; since/until/title_re are handled as in get_hit_types(..).'''
	from crowdlib.utility import coerce_to_date_time
	from crowdlib.utility.miscellaneous import GeneratingSequence
	start = coerce_to_date_time(since, interpret_durations_as_past=True, default_time="00:00:00.000000")
	end = coerce_to_date_time(until, interpret_durations_as_past=True, default_time="23:59:59.999999")
	amt = _.get_amt()
	return GeneratingSequence(amt.get_hits(since=start, until=end, title_re=title_re))
def get_qualification_types():
	'''Return a GeneratingSequence of the account's qualification types.'''
	from crowdlib.utility.miscellaneous import GeneratingSequence
	amt = _.get_amt()
	return GeneratingSequence(amt.get_qualification_types())
def get_qualification_type(id):
	'''Look up a qualification type by ID.'''
	from crowdlib.utility import to_unicode
	key = to_unicode(id)
	amt = _.get_amt()
	return amt.get_qualification_type(key)
def get_worker(id):
	'''Look up a worker by ID.'''
	from crowdlib.utility import to_unicode
	key = to_unicode(id)
	amt = _.get_amt()
	return amt.get_worker(key)
def get_workers(since=None, until=None, title_re=None):
	'''Return a GeneratingSequence of workers; since/until/title_re are handled as in get_hit_types(..).'''
	from crowdlib.utility import coerce_to_date_time
	from crowdlib.utility.miscellaneous import GeneratingSequence
	start = coerce_to_date_time(since, interpret_durations_as_past=True, default_time="00:00:00.000000")
	end = coerce_to_date_time(until, interpret_durations_as_past=True, default_time="23:59:59.999999")
	amt = _.get_amt()
	return GeneratingSequence(amt.get_workers(since=start, until=end, title_re=title_re))
def get_assignment(id):
	'''Look up an assignment by ID.'''
	from crowdlib.utility import to_unicode
	key = to_unicode(id)
	amt = _.get_amt()
	return amt.get_assignment(key)
def get_assignments(since=None, until=None, title_re=None):
	'''Return a GeneratingSequence of assignments; since/until/title_re are handled as in get_hit_types(..).'''
	from crowdlib.utility import coerce_to_date_time
	from crowdlib.utility.miscellaneous import GeneratingSequence
	start = coerce_to_date_time(since, interpret_durations_as_past=True, default_time="00:00:00.000000")
	end = coerce_to_date_time(until, interpret_durations_as_past=True, default_time="23:59:59.999999")
	amt = _.get_amt()
	return GeneratingSequence(amt.get_assignments(since=start, until=end, title_re=title_re))
def get_account_balance():
	'''Return the account balance reported by AMT.get_account_balance().'''
	amt = _.get_amt()
	return amt.get_account_balance()
################################################################################
# HOUSEKEEPING
#
def sync_with_amt():
	'''Synchronize with AMT (delegates to AMT.sync_with_amt). Returns None.'''
	_.get_amt().sync_with_amt()
def set_all_hits_unavailable():
	'''Delegate to AMT.set_all_hits_unavailable(). Returns None.'''
	_.get_amt().set_all_hits_unavailable()
# Requester statistics objects.  They receive the _.get_amt function itself,
# not its result; presumably so the AMT object is only obtained when statistics
# are actually read -- confirm in RequesterStatistics.
requester_statistics = _.RequesterStatistics(_.get_amt, time_period="LifeToDate")
requester_statistics_by_day = _.RequesterStatistics(_.get_amt, time_period="OneDay")
################################################################################
# QUALIFICATION REQUIREMENTS
#
def create_agreement_requirement(name, description, xhtml, agree_text, keywords=None, initially_active=True, for_preview=False):
	'''
	Create a click-through ("agreement") qualification requirement.

	Delegates to AMT.create_click_through_qualification_requirement(..);
	for_preview is passed as its required_to_preview argument.
	'''
	return _.get_amt().create_click_through_qualification_requirement(name=name,
			description=description, xhtml=xhtml, agree_text=agree_text, keywords=keywords,
			initially_active=initially_active, required_to_preview=for_preview)
def create_qualification_type( name, description, initially_active=None,
		keywords=None, retry_delay=None,
		test_xml=None, answer_key_xml=None,
		test_duration=None, auto_granted=None, auto_granted_value=None,
		is_requestable=None):
	'''
	Create a new QualificationType on AMT and return it.

	Defaults: initially_active=True, keywords=(), auto_granted=False.
	- retry_delay may be None (to indicate that retry is not allowed)
	- test_xml may be None (to indicate that the worker need not do a test)
	- answer_key_xml may be None (to indicate that you will process requests manually)
	- test_duration may be None, but only if test_xml is None
	- auto_granted_value may be None only if auto_granted is False

	is_requestable is forwarded only when it is not None.  The visible
	AMT.create_qualification_type(..) signature has no `is_requestable`
	parameter, so always forwarding it made every call fail with TypeError.

	NOTE(review): AMT.create_qualification_type(..) asserts that retry_delay and
	test_duration are datetime.timedelta, which contradicts the "may be None"
	notes above -- confirm which contract is intended.
	'''
	if initially_active is None:
		initially_active = True
	if keywords is None:
		keywords = ()
	if auto_granted is None:
		auto_granted = False
	amt_kwargs = dict(
		name=name,
		description=description,
		initially_active=initially_active,
		keywords=keywords,
		retry_delay=retry_delay,
		test_xml=test_xml,
		answer_key_xml=answer_key_xml,
		test_duration=test_duration,
		auto_granted=auto_granted,
		auto_granted_value=auto_granted_value,
	)
	if is_requestable is not None:
		amt_kwargs["is_requestable"] = is_requestable
	qualification_type = _.get_amt().create_qualification_type(**amt_kwargs)
	return qualification_type
################################################################################
# BUILT-IN QUALIFICATION REQUIREMENTS
#
# In every helper below, for_preview is passed straight to QualificationRequirement;
# presumably True also requires the qualification for previewing the HIT (compare
# required_to_preview in create_agreement_requirement) -- confirm in QualificationRequirement.
def masters_requirement(for_preview=False): # uses the configured service_type (sandbox/production)
	return _.QualificationRequirement.masters_qualification_requirement(_.settings.service_type, for_preview)
def photo_moderation_masters_requirement(for_preview=False): # uses the configured service_type
	return _.QualificationRequirement.photo_moderation_masters_qualification_requirement(_.settings.service_type, for_preview)
def categorization_masters_requirement(for_preview=False): # uses the configured service_type
	return _.QualificationRequirement.categorization_masters_qualification_requirement(_.settings.service_type, for_preview)
def approval_requirement(min_pct, for_preview=False): # min_pct between 0 and 100
	return _.QualificationRequirement.approval_rate_qualification_requirement(min_pct, for_preview)
def submission_requirement(min_pct, for_preview=False): # min_pct between 0 and 100
	return _.QualificationRequirement.submission_rate_qualification_requirement(min_pct, for_preview)
def return_requirement(max_pct, for_preview=False): # max_pct between 0 and 100
	return _.QualificationRequirement.return_rate_qualification_requirement(max_pct, for_preview)
def abandon_requirement(max_pct, for_preview=False): # max_pct between 0 and 100
	return _.QualificationRequirement.abandon_rate_qualification_requirement(max_pct, for_preview)
def rejection_requirement(max_pct, for_preview=False): # max_pct between 0 and 100
	return _.QualificationRequirement.rejection_rate_qualification_requirement(max_pct, for_preview)
def adult_requirement(for_preview=False):
	return _.QualificationRequirement.adult_qualification_requirement(for_preview)
def in_country_requirement(country, for_preview=False):
	# country is an ISO 3166-1 alpha-2 code (e.g. US, IN, PK, CN, MX, CA, etc.)
	return _.QualificationRequirement.locale_qualification_requirement("EqualTo", country, for_preview)
def not_in_country_requirement(country, for_preview=False):
	# country is an ISO 3166-1 alpha-2 code (e.g. US, IN, PK, CN, MX, CA, etc.)
	return _.QualificationRequirement.locale_qualification_requirement("NotEqualTo", country, for_preview)
################################################################################
# UTILITY
#
from crowdlib.utility import to_duration
from crowdlib.utility import to_date_time as to_datetime
from crowdlib.utility import to_date
from crowdlib.reports import assignment_report
################################################################################
# EXCEPTIONS
#
from crowdlib.all_exceptions import AssignmentAlreadyFinalizedException
from crowdlib.all_exceptions import HITNotFoundException
from crowdlib.all_exceptions import HITTypeNotFoundException
from crowdlib.all_exceptions import AssignmentNotFoundException
from crowdlib.all_exceptions import WorkerNotFoundException
from crowdlib.all_exceptions import CannotInterpretDateTimeError
from crowdlib.all_exceptions import CannotInterpretDurationError
from crowdlib.all_exceptions import AMTRequestFailed
# Clean up the namespace: `division` was bound by the `from __future__ import division` line.
del division
# Running this file directly does nothing useful; just say so.
if __name__=="__main__":
	print( "\nThis is just a module, not the code entry point.\n" )
PK ፆHjT * crowdlib-0.8.50/crowdlib/all_exceptions.py# vim: set fileencoding=utf-8 noexpandtab:
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: October 2010 (approx)
'''
from __future__ import division, with_statement
class CrowdLibBaseException(Exception):
	'''
	Root of the CrowdLib exception hierarchy.

	str() and repr() return the argument itself when the exception carries
	exactly one string argument; otherwise they return repr() of the
	(normalized) argument tuple.  `message` is an alias for str(self).
	'''
	def __str__( self ):
		from crowdlib.utility import is_sequence_of_strings, to_tuple_if_non_sequence_iterable
		normalized = to_tuple_if_non_sequence_iterable(self.args)
		is_single_string = is_sequence_of_strings(normalized) and len(normalized)==1
		return normalized[0] if is_single_string else repr(normalized)
	@property
	def message(self):
		return str(self)
	__repr__ = __str__
class AMTRequestFailed( CrowdLibBaseException ): # [pylint] doesn't call super(..).__init__(..) : pylint:disable=W0231
	'''
	Exception indicating some problem with the query to AMT.

	Keeps the AMT error `code`, the message `msg`, the `operation` name, and the
	optional `query_params` as attributes.  str() reads "<code> during <operation>: <msg>".
	'''
	def __init__( self, code, msg, operation, query_params=None ): # [pylint] doesn't call super(..).__init__(..) : pylint:disable=W0231
		self.code, self.msg = code, msg
		self.operation, self.query_params = operation, query_params
	def _param_str(self):
		attribute_names = sorted(self.__dict__)
		return repr(tuple(attribute_names))
	def __str__( self ):
		parts = (self.code, self.operation, self.msg)
		return "%s during %s: %s"%parts
	def __repr__( self ):
		parts = (self.code, self.msg, self.operation)
		return "AMTRequestFailed( %s, %s, %s )"%parts
class AMTNotificationNotAvailable(CrowdLibBaseException):
	'''Signals that an AMT notification is not available (raise sites are elsewhere).'''
class NotificationsAddressNotFoundException(CrowdLibBaseException):
	'''Carries the `hit_type_id` whose notifications address was not found.'''
	def __init__(self, hit_type_id): # [pylint] doesn't call super(..).__init__(..) : pylint:disable=W0231
		self.hit_type_id = hit_type_id
class AMTQualificationTypeAlreadyExists(AMTRequestFailed):
	'''AMT refused to create a QualificationType because one named `name` already exists.'''
	def __init__(self, code, msg, operation, name):
		super(AMTQualificationTypeAlreadyExists, self).__init__(code, msg, operation)
		self.name = name
	def __str__( self ):
		template = "A QualificationType with the name %r already exists. ... Try changing a character in the name. Notify Alex Quinn if this persists."
		return template%self.name
	def __repr__( self ):
		fields = ( self.code, self.msg, self.operation, self.name )
		return "AMTQualificationTypeAlreadyExists(%s, %s, %s, %s )"%fields
class AssignmentAlreadyFinalizedException(CrowdLibBaseException):
	'''
	An assignment could not be changed because it is already finalized.

	`message` reads "Already <status in lower case>".
	'''
	def __init__(self, assignment_id, assignment_status): # [pylint] doesn't call super(..).__init__(..) : pylint:disable=W0231
		self.assignment_id = assignment_id
		self.assignment_status = assignment_status
	@property
	def message(self):
		lowered_status = self.assignment_status.lower()
		return "Already %s"%(lowered_status)
	def __str__(self):
		details = (self.assignment_id, self.assignment_status)
		return "AssignmentAlreadyFinalizedException(%s, %s)"%details
	__repr__ = __str__
class NotFoundException(CrowdLibBaseException):
	'''
	Base class for errors about an object (HIT, HIT type, assignment, ...) that was not found.

	Subclasses keep the missing ID in a type-specific attribute and a
	human-readable message in `msg`, built by _compose_msg(..).
	'''
	@staticmethod
	def _compose_msg(missing_id, msg):
		'''
		Return the message for a missing object whose ID is `missing_id`.

		- empty (or None) msg: the ID itself, as text
		- msg that does not mention the ID: msg + " (" + ID + ")"
		- otherwise: msg unchanged
		The ID is formatted with "%s" first, so non-string IDs (None, ints) do
		not raise TypeError in the `in` test or in the concatenation.
		'''
		id_text = "%s"%(missing_id,)
		if not msg:
			return id_text
		if id_text in msg:
			return msg
		return msg + " (" + id_text + ")"
class AssignmentNotFoundException(NotFoundException):
	'''No assignment with ID `assignment_id` was found.'''
	def __init__(self, assignment_id, msg=""):
		NotFoundException.__init__(self, assignment_id, msg)
		self.assignment_id = assignment_id
		self.msg = self._compose_msg(assignment_id, msg)
class HITTypeNotFoundException(NotFoundException):
	'''No HIT type with ID `hit_type_id` was found.'''
	def __init__(self, hit_type_id, msg=""):
		NotFoundException.__init__(self, hit_type_id, msg)
		self.hit_type_id = hit_type_id
		self.msg = self._compose_msg(hit_type_id, msg)
class HITNotFoundException(NotFoundException):
	'''No HIT with ID `hit_id` was found.'''
	def __init__(self, hit_id, msg=""):
		NotFoundException.__init__(self, hit_id, msg)
		self.hit_id = hit_id
		self.msg = self._compose_msg(hit_id, msg)
class QualificationTypeNotFoundException(NotFoundException):
	'''No qualification type with the given ID (stored as `qualification_type_id`) was found.'''
	def __init__(self, qtype_id, msg=""):
		NotFoundException.__init__(self, qtype_id, msg)
		self.qualification_type_id = qtype_id
		self.msg = self._compose_msg(qtype_id, msg)
class WorkerNotFoundException(NotFoundException):
	'''No worker with ID `worker_id` was found.'''
	def __init__(self, worker_id, msg=""):
		NotFoundException.__init__(self, worker_id, msg)
		self.worker_id = worker_id
		self.msg = self._compose_msg(worker_id, msg)
class CannotInterpretError(ValueError, CrowdLibBaseException):
	'''A value could not be interpreted; catchable both as ValueError and as a CrowdLib error.'''
class CannotInterpretDurationError(CannotInterpretError):
	'''A duration could not be interpreted.'''
class CannotInterpretDateTimeError(CannotInterpretError):
	'''A date/time could not be interpreted.'''
class CannotInterpretDateError(CannotInterpretError):
	'''A date could not be interpreted.'''
class CannotInterpretTimeError(CannotInterpretError):
	'''A time could not be interpreted.'''
class QuestionXMLError(CrowdLibBaseException):
	'''
	AMT rejected a HIT's question XML.

	The constructor re-parses the XML locally so that `msg` can tell the user
	whether the XML is malformed or merely not accepted by AMT.  Attributes:
	xml, amt_error_code, amt_error_msg, is_well_formed, msg.
	'''
	def __init__(self, xml_str, amt_error_msg, amt_error_code): # [pylint] doesn't call super(..).__init__(..) : pylint:disable=W0231
		self.xml = xml_str
		self.amt_error_code = amt_error_code
		self.amt_error_msg = amt_error_msg
		from crowdlib.utility import xml2dom
		import xml.parsers.expat
		try:
			xml2dom(xml_str)
		except xml.parsers.expat.ExpatError:
			import sys
			e = sys.exc_info()[1]
			self.is_well_formed = False
			# str(e), not e.message: Python 3 exceptions have no `.message` attribute.
			emsg = str(e)
			# Slicing (not emsg[0]) keeps an empty message from raising IndexError.
			emsg = emsg[:1].upper() + emsg[1:]
			self.msg = "Question XML is not well-formed XML. " + emsg
		else:
			self.msg = "Question XML appears to be well-formed XML. However, it was not accepted by the Amazon Mechanical Turk server. This probably means it does not conform to the QuestionForm, ExternalQuestion, or HTMLQuestion schema. AWS said: %s, %s"%(amt_error_code, amt_error_msg)
			self.is_well_formed = True
	def __str__(self):
		return self.msg
class CrowdLibInternalError(CrowdLibBaseException):
	'''An error inside CrowdLib itself; str() returns `msg` unchanged.'''
	def __init__( self, msg ): # [pylint] doesn't call super(..).__init__(..) : pylint:disable=W0231
		self.msg = msg
	def __str__(self):
		return self.msg
class AMTDataError(CrowdLibInternalError):
	'''Internal-error subtype (by its name, for unexpected data from AMT -- confirm at raise sites).'''
class XMLProcessingException(CrowdLibInternalError):
	'''Internal-error subtype for XML processing; inherits the constructor and str() unchanged.'''
class CrowdLibSettingsError(ValueError):
	'''A CrowdLib setting is missing or invalid (a plain ValueError subclass).'''
#JUNK#class AMTConnectionNotOpenException(CrowdLibBaseException):
#JUNK# pass
#JUNK#class CannotUpdateAssignmentException(CrowdLibBaseException):
#JUNK# pass
#JUNK#class CaughtKeyboardInterrupt(KeyboardInterrupt):
#JUNK# pass
#JUNK#class AbstractClassMethodException( CrowdLibBaseException ):
#JUNK# pass
#JUNK#class CrowdLibNotEnabledException(CrowdLibBaseException):
#JUNK# pass
# [no longer used as far as I know, 12/4/2013]
#class CrowdLibOperationException(Exception):
# msg = ""
#
# def __str__( self ):
# return "%s during %s: %s"%(self.code, self.operation, self.msg)
#
# def __repr__( self ):
# type_name = type(self).__name__
# param_str = ", ".join((k+"="+repr(v)) for (k,v) in self.__dict__.items())
# return type_name + "(" + param_str + ")"
# def print_msg( self ):
# from crowdlib.utility import log
# log( '- Error: '+', '.join( (self.code, self.msg, self.operation) ) )
# def print_msg( self ):
# from crowdlib.utility import log
# log(str(self))
#class HITNotFoundException(NotFoundException):
# def __init__(self, hit_id, msg=""):
# self.hit_id = hit_id
# self.msg = msg
PK ፆH/, crowdlib-0.8.50/crowdlib/AMT.py# vim: set fileencoding=utf-8 noexpandtab:
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: January 2010
'''
# NOTE: The AMT class will call its own close() method when Python exits, using the `atexit` standard Python module.
from __future__ import division, with_statement
# [pylint] Tolerate too many public methods, branches, and instance attributes. : pylint:disable=R0904,R0912,R0902
from crowdlib.all_exceptions import AMTDataError, AMTNotificationNotAvailable, \
AMTQualificationTypeAlreadyExists, AMTRequestFailed, AssignmentNotFoundException, \
CrowdLibInternalError, CrowdLibSettingsError, HITNotFoundException, \
HITTypeNotFoundException, QualificationTypeNotFoundException, WorkerNotFoundException
from crowdlib.AMTDB import AMTDB, DATABASE_ERROR_BASE_CLASS
from crowdlib.AMTMemoryCache import AMTMemoryCache
from crowdlib.AMTServer import AMTServer
from crowdlib.Answer import AnswerBlank, AnswerFreeText, AnswerSelection, AnswerUploadedFile
from crowdlib.Assignment import Assignment
from crowdlib.Bonus import Bonus
from crowdlib.currency_codes_iso_4217 import currency_codes_iso_4217
from crowdlib.HIT import HIT
from crowdlib.HITType import HITType
from crowdlib.NotificationEvent import NotificationEvent
from crowdlib.QualificationRequirement import QualificationRequirement
from crowdlib.QualificationType import QualificationType
from crowdlib.utility import dom_cdata, dom_element, duration_to_seconds, is_iterable, \
is_sequence_of_strings, is_number, is_sequence, is_string, is_valid_xml, \
make_xml_document_root_element_fn_cdatas_fn, to_date_time, \
to_duration, to_unicode, urlopen_py23_compatible, urlretrieve_py23_compatible, \
to_tuple_if_non_sequence_iterable
from crowdlib.utility.time_utils import now_local
from crowdlib.utility.debugging import dbg_log
from crowdlib.Worker import Worker
from xml.sax.saxutils import quoteattr
import atexit, cgi, datetime, functools, os, re, sys, textwrap, xml.dom.minidom as xdm
class AMT( object ):
	def __init__(self, settings):
		'''
		Set up the AMT server connection, memory cache, and local database.

		Copies the needed values from `settings`, validates them with
		_check_settings(), then creates the AMTServer, the AMTMemoryCache and the
		AMTDB, and registers close() to run at interpreter exit.  If this object
		was already initialized (it has _already_initialized), this does nothing.
		'''
		if not hasattr(self, "_already_initialized"):
			self._hit_sync_last_time = None
			self._hit_sync_interval = datetime.timedelta(seconds=60)
			# WARNING: If _hit_sync_interval is too small, it will cause infinite loops.
			self._service_type = settings.service_type
			self._db_dir = settings.db_dir
			self._aws_account_id = settings.aws_account_id
			self._aws_account_key = settings.aws_account_key
			self._default_reward = settings.default_reward
			self._default_autopay_delay = settings.default_autopay_delay
			self._default_currency = settings.default_currency
			self._default_lifetime = settings.default_lifetime
			self._default_max_assignments = settings.default_max_assignments
			self._default_time_limit = settings.default_time_limit
			self._default_qualification_requirements = settings.default_qualification_requirements
			self._default_keywords = settings.default_keywords
			self._html_template = self._get_html_template()
			self._check_settings()
			self._server = AMTServer(self._aws_account_id, self._aws_account_key, self._service_type)
			self._memory_cache = AMTMemoryCache()
			self._db = self._set_up_db()
			# NOTE(review): the trailing comment talks about a freshly created DB file,
			# but the condition syncs only when the DB is NOT new.  One of the two is
			# wrong -- check AMTDB.is_new before relying on either.
			if not self._db.is_new:
				self.sync_with_amt() # DB file was just created, so we must sync with the server.
			atexit.register(AMT.close, self)
			self._already_initialized = True
def _get_html_template(self):
# helper for __init__(..)
crowdlib_src_dir = os.path.dirname(os.path.abspath(__file__))
html_template_path = os.path.join(crowdlib_src_dir, "html_hit_template.html")
with open(html_template_path, "rb") as infile:
return infile.read().decode("utf8")
def _check_settings(self):
if not self._aws_account_id and not self._aws_account_key:
raise CrowdLibSettingsError("No AWS account credentials. Create a crowdlib_settings.py file with this and other information, and import it before doing any operations with CrowdLib."%self._aws_account_id)
if self._aws_account_id=="":
raise CrowdLibSettingsError("AWS Account ID in crowdlib_settings must not be %r."%self._aws_account_id)
if self._aws_account_key=="":
raise CrowdLibSettingsError("AWS Account Key ID in crowdlib_settings must not be %r."%self._aws_account_key)
# helper for __init__(..)
if self._db_dir is None:
raise CrowdLibSettingsError("db_dir must be a directory path. db_dir==%s"%repr(self._db_dir))
if self._service_type not in ("sandbox", "production"):
raise CrowdLibSettingsError("service_type must be \"sandbox\" or \"production\". Got %s"%self._service_type)
def _set_up_db(self):
# helper for __init__(..)
db_filename = ".".join(("crowdlib", self._aws_account_id, self._service_type, "sqlite3"))
db_dir = os.path.abspath(self._db_dir)
if not os.path.isdir(db_dir):
os.makedirs(db_dir)
db_path = os.path.join(db_dir, db_filename)
#return AMTDB(db_path)
return AMTDB(db_path, always_commit=True) # FIXME: Why did you think this was needed?
def open(self):
self._db.open()
def close(self):
self._db.close()
def do_request( self, operation, specific_parameters):
return self._server.do_request(operation, specific_parameters)
	def create_hit_type(self, title, description=None, reward=None, currency=None, time_limit=None,
			keywords=None, autopay_delay=None, qualification_requirements=None):
		'''
		Register a HIT type with AMT and return the corresponding HITType object.

		Parameters left as None are filled in from the settings defaults copied in
		__init__(..); description defaults to "".  time_limit and autopay_delay go
		through to_duration(..).  Arguments are checked with `assert` statements,
		which are skipped under `python -O`.

		If the memory cache already holds a HITType with the ID AMT returned, that
		object is returned.  Otherwise a new HITType is built and stored in the
		local database and the memory cache.
		'''
		# Fill in defaults for any parameters that are None
		if reward is None:
			reward = self._default_reward
		if description is None:
			description = ""
		if currency is None:
			currency = self._default_currency
		if time_limit is None:
			time_limit = self._default_time_limit
		if keywords is None:
			keywords = self._default_keywords
		if autopay_delay is None:
			autopay_delay = self._default_autopay_delay
		if qualification_requirements is None:
			qualification_requirements = self._default_qualification_requirements
		# Convert duration parameters to a uniform type.
		time_limit = to_duration(time_limit)
		autopay_delay = to_duration(autopay_delay)
		assert currency in currency_codes_iso_4217
		assert is_number(reward)
		assert is_string(title)
		assert is_string(description)
		assert is_iterable(keywords)
		assert is_iterable(qualification_requirements)
		assert isinstance(autopay_delay, datetime.timedelta)
		assert isinstance(time_limit, datetime.timedelta)
		keywords = to_tuple_if_non_sequence_iterable(keywords)
		assert is_sequence_of_strings(keywords)
		qualification_requirements = tuple(qualification_requirements)
		assert all(isinstance(qr, QualificationRequirement) for qr in qualification_requirements)
		hit_type_id = self._server.register_hit_type( title=title, description=description,
				reward=reward, currency=currency, time_limit=time_limit,
				keywords=keywords,
				autopay_delay=autopay_delay, qualification_requirements=qualification_requirements)
		# Reuse the cached object when AMT hands back an ID we already know.
		if self._memory_cache.has_hit_type(hit_type_id):
			hit_type = self._memory_cache.get_hit_type(hit_type_id)
		else:
			# Create the HITType object, which will be put into the database.
			hit_type = HITType(
				id = hit_type_id,
				title = title,
				description = description,
				reward = reward,
				currency = currency,
				time_limit = time_limit,
				autopay_delay = autopay_delay,
				keywords = keywords,
				qualification_requirements = qualification_requirements,
				amt = self
			)
			# Store in the database.
			self._db.put_hit_type(hit_type)
			# Store in the memory cache.
			self._memory_cache.put_hit_type(hit_type)
		assert isinstance(hit_type, HITType)
		return hit_type
def url_for_hit_type_id(self,hit_type_id):
return self._server.preview_hit_type_url_stem + hit_type_id
	def create_hit(self, hit_type, question_xml, max_assignments=None, lifetime=None, requester_annotation=None):
		'''
		Create one HIT of type `hit_type` on AMT and return the new HIT object.

		max_assignments and lifetime default to the settings values copied in
		__init__(..); requester_annotation defaults to "".  The new HIT is stored
		in the local database, then in the memory cache, before it is returned.
		'''
		unique_request_token = None # TODO: Implement this for real; not fully implemented as of 12/1/2013
		# Fill in defaults for any parameters that are None
		if lifetime is None:
			lifetime = self._default_lifetime
		if max_assignments is None:
			max_assignments = self._default_max_assignments
		if requester_annotation is None:
			requester_annotation = ""
		# Convert duration parameters to a uniform type.
		lifetime = to_duration(lifetime)
		hit_record = self._server.create_hit(
			hit_type_id = hit_type.id,
			question_xml = question_xml,
			lifetime_in_seconds = duration_to_seconds(lifetime),
			max_assignments = max_assignments,
			requester_annotation = requester_annotation,
			unique_request_token = unique_request_token
		)
		# Sanity checks on the server's reply: a brand-new HIT is Assignable, with
		# every assignment available and none pending or completed.
		assert hit_record.num_pending==0
		assert hit_record.num_available==max_assignments
		assert hit_record.num_completed==0
		assert hit_record.hit_status==HIT.HIT_STATUS_ASSIGNABLE
		assert hit_record.requester_annotation==requester_annotation
		hit = HIT(
			id = hit_record.hit_id,
			hit_type = hit_type,
			requester_annotation = hit_record.requester_annotation,
			max_assignments = hit_record.max_assignments,
			question_xml = question_xml,
			creation_time = hit_record.creation_time,
			hit_status = hit_record.hit_status,
			expiration_time = hit_record.expiration_time,
			num_pending = hit_record.num_pending,
			num_available = hit_record.num_available,
			num_completed = hit_record.num_completed,
			hit_review_status = HIT.HIT_REVIEW_STATUS_NOT_REVIEWED,
			approximate_expiration_time = None,
			amt = self
		)
		# Store in the database.
		self._db.put_hit(hit)
		hit_in_memory_cache = self._memory_cache.has_hit(hit_record.hit_id)
		assert not hit_in_memory_cache, "Already have HIT with ID %s. Why??? (error #3162)"%hit_record.hit_id
		# This assertion was removed 8-18-2011 because it appears that I was
		# getting HIT IDs that had been seen before.  That might lead to
		# consistency bugs.  Need to investigate.  I re-enabled it on
		# 12/1/2013.  Although I never resolved the issue, enough has changed
		# in the last 2+ years that there's at least some chance that the bug
		# no longer exists.  We'll see. (ref#3125)
		# Store in the memory cache.
		self._memory_cache.put_hit(hit)
		# This also had been disabled at some point. On 12-1-2013, I put it back. (ref#3126)
		return hit
def create_hit_from_fields(self, fields, hit_type, lifetime, max_assignments, requester_annotation):
from crowdlib.QuestionField import AbstractQuestionField, make_text_or_formatted_content_xml
# from crowdlib.utility.xml_helpers import looks_like_limited_xhtml
# Construct the XML dynamically using the xml.dom.minidom module.
namespace = 'http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionForm.xsd'
contents = []
for field in fields:
if isinstance(field, AbstractQuestionField):
contents.append(field.xml)
# elif looks_like_limited_xhtml(field):
# contents.append("FormattedContent", *cdatas(field))
elif is_string(field):
# contents.append("Text", field)
contents.append(make_text_or_formatted_content_xml(field))
else:
raise ValueError("Invalid field: %r"%field)
contents = "".join(contents)
question_xml = "%(contents)s"%{
"namespace" : quoteattr(namespace),
"contents" : "".join(contents)
}
# Create the HIT.
hit = self.create_hit(
hit_type = hit_type,
question_xml = question_xml,
max_assignments = max_assignments,
lifetime = lifetime,
requester_annotation = requester_annotation
)
return hit
def _create_hit_from_url_or_html(self, which, content, frame_height, hit_type, lifetime, max_assignments, requester_annotation):
if which == "url":
root_node_name = "ExternalQuestion"
namespace = 'http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd'
url = content
elif which == "html":
root_node_name = "HTMLQuestion"
namespace = "http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2011-11-11/HTMLQuestion.xsd"
html = content
else:
assert False, "Unexpected value of `which`: %r"%which
root,element,cdatas = make_xml_document_root_element_fn_cdatas_fn(root_node_name=root_node_name, namespace=namespace)
if which == "url":
root.appendChild(element("ExternalURL", url ))
else:
root.appendChild(element("HTMLContent", *cdatas(html) ))
root.appendChild(element("FrameHeight", frame_height))
question_xml = root.toxml()
# Create the HIT.
hit = self.create_hit(
hit_type = hit_type,
question_xml = question_xml,
max_assignments = max_assignments,
lifetime = lifetime,
requester_annotation = requester_annotation
)
return hit
def create_hit_from_html_parts(self, body, onload="", style="", *args, **kwargs):
	'''
	Fill self._html_template with `body`, `onload` and `style`, then create a
	HIT from the resulting HTML via create_hit_from_html.

	Any further positional/keyword arguments are forwarded to
	create_hit_from_html (frame_height, hit_type, lifetime, max_assignments,
	requester_annotation).
	'''
	html = self._html_template%{"body":body, "onload":onload, "style":style}
	# Pass `html` positionally: with `html=html` plus *args, any extra positional
	# argument would also bind to `html` and raise "got multiple values" TypeError.
	return self.create_hit_from_html(html, *args, **kwargs)
def create_hit_from_html(self, html, *args, **kwargs):
	'''
	Create a HIT whose question is the given HTML page (an HTMLQuestion).

	Remaining arguments are forwarded to _create_hit_from_url_or_html:
	frame_height, hit_type, lifetime, max_assignments, requester_annotation.
	'''
	# Positional `which`/`content` so that extra positional *args land on
	# frame_height, hit_type, ... instead of colliding with keyword arguments.
	return self._create_hit_from_url_or_html("html", html, *args, **kwargs)
def create_hit_from_url(self, url, *args, **kwargs):
	'''
	Create a HIT whose question is an external page at `url` (an
	ExternalQuestion).

	Remaining arguments are forwarded to _create_hit_from_url_or_html:
	frame_height, hit_type, lifetime, max_assignments, requester_annotation.
	'''
	# Positional `which`/`content` so that extra positional *args land on
	# frame_height, hit_type, ... instead of colliding with keyword arguments.
	return self._create_hit_from_url_or_html("url", url, *args, **kwargs)
def create_qualification_type( self, name, description, initially_active, keywords, retry_delay, test_xml,
		answer_key_xml, test_duration, auto_granted, auto_granted_value):
	'''
	Create a QualificationType on AMT and return a QualificationType object
	describing it.

	`keywords` may be any iterable of strings; it is normalized with
	to_tuple_if_non_sequence_iterable before use. All arguments are checked
	with assertions, so the checks are skipped under `python -O`.

	If AMT raises AMTQualificationTypeAlreadyExists, the known qualification
	types are searched for one with the same name, description, test XML and
	keywords. A single match is returned; otherwise the match count is reported
	via dbg_log and the AMT exception is re-raised.
	'''
	# -- argument checks --
	assert is_iterable(keywords)
	keywords = to_tuple_if_non_sequence_iterable(keywords)
	assert is_string(name)
	assert is_string(description)
	assert isinstance(initially_active, bool)
	assert is_sequence_of_strings(keywords)
	assert isinstance(retry_delay, datetime.timedelta), retry_delay
	assert (answer_key_xml is None) or (is_string(answer_key_xml) and is_valid_xml(answer_key_xml))
	assert isinstance(test_duration, datetime.timedelta)
	assert isinstance(auto_granted, bool)
	assert auto_granted_value is None or isinstance(auto_granted_value,int)

	qtype_fields = {
		"name"               : name,
		"description"        : description,
		"initially_active"   : initially_active,
		"keywords"           : keywords,
		"retry_delay"        : retry_delay,
		"test_xml"           : test_xml,
		"answer_key_xml"     : answer_key_xml,
		"test_duration"      : test_duration,
		"auto_granted"       : auto_granted,
		"auto_granted_value" : auto_granted_value,
	}
	try:
		new_qtype_id = self._server.create_qualification_type(**qtype_fields)
	except AMTQualificationTypeAlreadyExists:
		# AMT already has an identical type; look for our own record of it.
		candidates = self._search_qualification_types(name=name, description=description, test_xml=test_xml, keywords=keywords)
		if len(candidates) != 1:
			dbg_log("AMT says this QualificationType is duplicate. When we searched AMT and the local DB by name, description, test XML, and keywords, we found %d matches"%len(candidates)) #(ref#7055)
			raise
		return candidates[0]
	return QualificationType(id=new_qtype_id, creation_time=now_local(), **qtype_fields)
def _search_qualification_types(self, name, description, test_xml, keywords):
qtypes = self.get_qualification_types() # from both server and database
matches = []
for qt in qtypes:
if (qt.name.strip() == name) \
and (qt.description == description) \
and (qt.test_xml is None or qt.test_xml == test_xml) \
and (qt.keywords is None or sorted(qt.keywords)==sorted(keywords)):
matches.append(qt)
return tuple(matches)
def create_click_through_qualification_requirement( self, name, description, xhtml, agree_text,
		initially_active=True, keywords=None, required_to_preview=False): #remove_old=False,
	'''
	Create a "click-through" QualificationType and return a
	QualificationRequirement that requires its value to equal 1.

	Primarily used for an IRB informed-consent form. The given `xhtml` is
	displayed with a single checkbox, labelled `agree_text`, below it.

	keywords            -- iterable of keyword strings for the qualification
	                       type; None (the default) means no keywords.
	required_to_preview -- passed through to the QualificationRequirement.
	'''
	# TODO: Add back in the ability to remove an old qualification type if need be.
	# create_qualification_type asserts that `keywords` is a sequence of strings,
	# so the default of None must become an empty tuple before it is passed on.
	if keywords is None:
		keywords = ()
	qtype = self.create_qualification_type(
		name=name,
		description=description,
		initially_active=initially_active,
		keywords=keywords,
		retry_delay=datetime.timedelta(seconds=0),                # DEFAULT VALUE
		test_xml=self._create_click_through_qualification_requirement_question_xml(xhtml=xhtml, agree_text=agree_text),
		answer_key_xml=self._CLICK_THROUGH_QUALIFICATION_REQUIREMENT_ANSWER_KEY_XML,
		test_duration=datetime.timedelta(seconds=60*60*24),       # DEFAULT VALUE : allow 24 hours to view form
		auto_granted=False,                                       # DEFAULT VALUE
		auto_granted_value=1)                                     # DEFAULT VALUE
	qreq = QualificationRequirement(qtype.id, "=", 1, locale_value=None, required_to_preview=required_to_preview)
	return qreq
def _create_click_through_qualification_requirement_question_xml(self, xhtml, agree_text):
	'''
	Return the QuestionForm XML for a click-through qualification test, as
	produced by doc.toxml(encoding="UTF-8").

	The form has one required question, "do_you_agree". Its content is `xhtml`
	wrapped in CDATA inside FormattedContent, and its answer is a checkbox
	selection (at least one required) whose only option is "agree", labelled
	`agree_text`.
	'''
	# The document is built with xml.dom.minidom. `el` creates an element with
	# the given children/text, and `cd` creates a CDATA section; both are bound
	# to `doc`. `form` is the root <QuestionForm> element.
	impl = xdm.getDOMImplementation()
	namespace = 'http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionForm.xsd'
	doc = impl.createDocument(namespace, "QuestionForm", None)
	form = doc.documentElement
	form.setAttribute( "xmlns", namespace )
	el = functools.partial( dom_element, doc )
	cd = functools.partial( dom_cdata, doc )

	question_id = el( "QuestionIdentifier", "do_you_agree" )
	is_required = el( "IsRequired", "true" )
	question_content = el( "QuestionContent",
		el( "FormattedContent", cd( xhtml ) ) )
	answer_spec = el( "AnswerSpecification",
		el( "SelectionAnswer",
			el( "MinSelectionCount", "1" ),
			el( "StyleSuggestion", "checkbox" ),
			el( "Selections",
				el( "Selection",
					el( "SelectionIdentifier", "agree" ),
					el( "Text", agree_text ) ) ) ) )
	form.appendChild( el( "Question", question_id, is_required, question_content, answer_spec ) )
	return doc.toxml( encoding="UTF-8" )
_CLICK_THROUGH_QUALIFICATION_REQUIREMENT_ANSWER_KEY_XML = '''\
do_you_agree
agree
1
'''
def get_qualification_type(self, qualification_type_id):
	'''
	Return the one QualificationType from get_qualification_types() whose id
	equals `qualification_type_id`.

	Raises QualificationTypeNotFoundException if there is none, or
	AMTDataError if several share that id.
	'''
	found = [qt for qt in self.get_qualification_types() if qt.id==qualification_type_id]
	if not found:
		raise QualificationTypeNotFoundException(qualification_type_id)
	if len(found) > 1:
		raise AMTDataError("Didn't expect to find %d qualification types for the same ID."%(len(found)))
	return found[0]
def get_qualification_types(self):
	'''Return the qualification types as reported by self._server.get_qualification_types().'''
	server = self._server
	return server.get_qualification_types()
def get_hit(self, hit_id, hit_type=None, hit_record=None):
	'''
	Return the HIT object for `hit_id`.

	Lookup order:
	  1. the in-memory cache;
	  2. the local DB -- or the `hit_record` argument, when one is given;
	  3. the AMT server (self._server.get_hit).
	A HIT built in step 2 or 3 is added to the memory cache. One built from a
	server record (step 3) is also written to the DB.

	hit_type   -- HITType to attach to a newly built HIT. If None, it is
	              obtained with get_hit_type(..., do_sync_if_not_found=False).
	hit_record -- optional DB-style record to use instead of reading the DB.

	If the HIT is in none of these places, whatever self._server.get_hit
	raises propagates (NOTE(review): exact exception type not visible here).
	'''
	assert is_string(hit_id), hit_id
	hit = None
	# 1. Try the cache.
	try:
		hit = self._memory_cache.get_hit(hit_id)
	except HITNotFoundException:
		# 2. Try the DB
		try:
			# A caller-supplied record short-circuits the DB read.
			hit_record = hit_record or self._db.get_hit_record(hit_id=hit_id)
			# (11/19/2013) Modified to allow passing in hit_record (for efficiency); might not be used at all
			if hit_type is None:
				# This hit_record does NOT have enough information to create the HITType.
				hit_type = self.get_hit_type(hit_type_id=hit_record.hit_type_id, hit_id=hit_id, do_sync_if_not_found=False)
				# This must have do_sync_if_not_found=False
			# The data came from the DB (or a DB-style record), so no write-back needed.
			should_update_db = False
		except HITNotFoundException:
			# 3. Try the AMT server.
			hit_record = self._server.get_hit(hit_id=hit_id)
			if hit_type is None:
				# This hit_record DOES have enough information to create the HITType.
				hit_type = self.get_hit_type(hit_type_id=hit_record.hit_type_id, hit_record=hit_record, do_sync_if_not_found=False)
				# Not sure if this must have do_sync_if_not_found=False or not but that's how I found it.
			# Fetched from the server, so persist it locally below.
			should_update_db = True
		assert isinstance(hit_type, HITType)
		hit = self._make_hit_from_hit_record(hit_record, hit_type)
		self._memory_cache.put_hit(hit)
		if should_update_db:
			self._db.put_hit(hit)
	assert hit.id == hit_id
	assert isinstance(hit, HIT)
	return hit
def _make_hit_type_from_hit_record(self, hit_record):
	'''
	Build a HITType bound to this object (amt=self) from a record.

	The record must provide hit_type_id, title, description, reward (with
	.amount and .currency_code), assignment_duration, auto_approval_delay,
	keywords and qualification_requirements.
	'''
	convert = self._make_qualification_requirement_from_qualification_requirement_record
	requirements = tuple(convert(qrr) for qrr in hit_record.qualification_requirements)
	return HITType(
		id=hit_record.hit_type_id,
		title=hit_record.title,
		description=hit_record.description,
		reward=hit_record.reward.amount,
		currency=hit_record.reward.currency_code,
		time_limit=hit_record.assignment_duration,
		autopay_delay=hit_record.auto_approval_delay,
		keywords=hit_record.keywords,
		qualification_requirements=requirements,
		amt=self)
def _make_hit_from_hit_record(self, hit_record, hit_type):
	'''
	Build a HIT object (bound to this object and to `hit_type`) from a HIT
	record. A record without an approximate_expiration_time attribute yields
	approximate_expiration_time=None.
	'''
	approx_expiry = getattr(hit_record, "approximate_expiration_time", None)
	rec = hit_record
	return HIT(
		id=rec.hit_id,
		hit_type=hit_type,
		question_xml=rec.question,
		max_assignments=rec.max_assignments,
		requester_annotation=rec.requester_annotation,
		creation_time=rec.creation_time,
		hit_status=rec.hit_status,
		expiration_time=rec.expiration_time,
		num_pending=rec.num_pending,
		num_available=rec.num_available,
		num_completed=rec.num_completed,
		hit_review_status=rec.hit_review_status,
		approximate_expiration_time=approx_expiry,
		amt=self)
def _make_qualification_requirement_from_qualification_requirement_record(self, qualification_requirement_record):
	'''Convert a qualification-requirement record into a QualificationRequirement.'''
	rec = qualification_requirement_record
	return QualificationRequirement(
		qualification_type_id=rec.qualification_type_id,
		comparator=rec.comparator,
		integer_value=rec.integer_value,
		locale_value=rec.locale_value,
		required_to_preview=rec.required_to_preview)
def suggest_hit_sync(self, hit_type_id=None, must_sync=False):
	'''
	Run sync_with_amt() when no sync has happened yet, when more than
	self._hit_sync_interval has passed since the last one, or when
	`must_sync` is true.

	Returns True if a sync was run, False otherwise.

	`hit_type_id` is only consulted by the notification-based shortcut guarded
	by SKIP_SYNC_IF_NOTIFICATIONS_ARE_ACTIVE_AND_WORKING. That flag is
	hard-coded to False, so currently the argument has no effect.
	'''
	# hit_type_id is only used to see if the given HITType has notifications working. If
	# so, then we do nothing.
	SKIP_SYNC_IF_NOTIFICATIONS_ARE_ACTIVE_AND_WORKING = False # This isn't working, yet.
	now = now_local()
	just_connected = False
	if hit_type_id:
		hit_sync_last_time = self._hit_sync_last_time
		# NEW APPROACH (not working, yet): If notifications have been enabled and the first
		# notification has been received, then don't bother syncing.
		if SKIP_SYNC_IF_NOTIFICATIONS_ARE_ACTIVE_AND_WORKING:
			if hit_sync_last_time:
				try:
					notification_hit_type_registration = self._db.get_notification_hit_type_registration(hit_type_id=hit_type_id)
				except HITTypeNotFoundException:
					# No notification registration for this HITType: fall through to a normal sync check.
					pass
				else:
					max_notification_activation_delay = datetime.timedelta(minutes=5)
					is_connected = notification_hit_type_registration.is_connected
					registered_time = notification_hit_type_registration.registered_time
					was_test_received = notification_hit_type_registration.test_received_time is not None
					last_received_time = notification_hit_type_registration.last_received_time
					if is_connected:
						return False
					elif was_test_received and hit_sync_last_time - registered_time >= max_notification_activation_delay:
						# Treat the registration as connected once a test notification arrived and
						# the last sync is at least 5 minutes after registration.
						self._db.set_notification_hit_type_is_connected(hit_type_id)
						return False
					elif last_received_time is not None:
						assert was_test_received, (hit_type_id, notification_hit_type_registration)
						# Mark as connected only after the sync below has run.
						just_connected = True
	if (self._hit_sync_last_time is None) or (now - self._hit_sync_last_time > self._hit_sync_interval) or must_sync:
		self._hit_sync_last_time = now
		self.sync_with_amt()
		if SKIP_SYNC_IF_NOTIFICATIONS_ARE_ACTIVE_AND_WORKING:
			if just_connected:
				self._db.set_notification_hit_type_is_connected(hit_type_id)
		return True
	else:
		return False
def sync_with_amt(self):
	'''
	Refresh local HIT data from AMT.

	For every HIT returned by self._server.search_hits():
	  - ensure its HITType is known (get_hit_type with the hit_id, no sync);
	  - update the existing HIT object via hit._update_from_amt(...), or, if
	    get_hit raises HITNotFoundException, build a new HIT from the server
	    record and put it in the memory cache;
	  - write the HIT to the DB if it is new or _update_from_amt returned a
	    truthy value.
	Afterwards, HIT ids from self._db.get_known_hit_ids_except_hit_status(
	HIT.HIT_STATUS_DISPOSED) that the server did not return are set to
	HIT.HIT_STATUS_DISPOSED in the DB.
	'''
	hit_ids_seen = []
	# Index the DB's HIT records by id so get_hit can skip a per-HIT DB read.
	hit_records_from_db = dict((hr.hit_id, hr) for hr in self._db.get_hit_records())
	for hit_record in self._server.search_hits():
		hit_record_from_db = hit_records_from_db.get(hit_record.hit_id, None)
		hit_ids_seen.append( hit_record.hit_id) # >=20131120
		# The hit_record returned by SearchHITs does not contain the qualification_requirements
		# so instead of passing the hit_record to get_hit_type, we will pass only the hit_id.
		# That will typically cause get_hit_type to fetch the details using GetHIT, which should
		# return the needed info.
		hit_type = self.get_hit_type(hit_type_id=hit_record.hit_type_id, hit_id=hit_record.hit_id, do_sync_if_not_found=False)
		try:
			hit = self.get_hit(hit_id=hit_record.hit_id, hit_type=hit_type, hit_record=hit_record_from_db)
			# [pylint] Tolerate access of protected member _update_from_amt. : pylint:disable=W0212
			should_update = hit._update_from_amt(
				expiration_time=hit_record.expiration_time,
				max_assignments=hit_record.max_assignments,
				num_pending=hit_record.num_pending,
				num_completed=hit_record.num_completed,
				num_available=hit_record.num_available,
				hit_status=hit_record.hit_status,
				question_xml=hit_record.question,
				approximate_expiration_time=None, # be explicit, we are setting this to NULL because we now have the real value
			)
		except HITNotFoundException:
			hit = self._make_hit_from_hit_record(hit_record=hit_record, hit_type=hit_type)
			self._memory_cache.put_hit(hit)
			should_update = True
		if should_update: # added this check on 11-13-2013
			self._db.put_hit(hit)
	# Any locally known, not-yet-disposed HIT that AMT no longer lists is treated as disposed.
	known_hit_ids_not_disposed = self._db.get_known_hit_ids_except_hit_status(HIT.HIT_STATUS_DISPOSED)
	hit_ids_recently_disposed = set(known_hit_ids_not_disposed) - set(hit_ids_seen)
	self._db.set_hit_statuses(hit_ids_recently_disposed, HIT.HIT_STATUS_DISPOSED)
sync_with_amt_to_update_assignment_counts = sync_with_amt # just an alias to make other code clearer
def get_requester_statistic(self, statistic, time_period):
	'''
	Fetch a requester statistic from AMT.

	For time_period "OneDay", return the whole series from the server,
	requesting 730 data points. 730 is the largest count that worked,
	found by experiment on 2-14-2011.
	For any other time_period, return element [0][1] of the server's result
	(presumably the value of the first data point).
	'''
	if time_period != "OneDay":
		stats = self._server.get_requester_statistic(statistic=statistic, time_period=time_period)
		return stats[0][1]
	# Get as many as possible: 730 was the max accepted in testing on 2-14-2011.
	max_count = 730
	return self._server.get_requester_statistic(statistic=statistic, time_period=time_period, count=max_count)
def get_worker(self, worker_id):
	'''
	Return the Worker object for `worker_id`. If the memory cache lacks one,
	create a new Worker and add it to the cache so later calls reuse it.
	'''
	assert is_string(worker_id), worker_id
	try:
		worker = self._memory_cache.get_worker(worker_id)
	except WorkerNotFoundException:
		worker = Worker(worker_id, self)
		self._memory_cache.put_worker(worker)
	return worker
def get_hits(self, since=None, until=None, title_re=None):
	'''
	Generate every HIT of every HITType from get_hit_types() that satisfies
	the optional criteria (see _hit_matches_criteria).
	'''
	for hit_type in self.get_hit_types():
		matching = (h for h in hit_type.hits if self._hit_matches_criteria(h, since, until, title_re))
		for h in matching:
			yield h
def set_all_hits_unavailable(self):
	'''
	Best effort: set is_available = False on every HIT from get_hits().
	AMTRequestFailed on an individual HIT is ignored so that the remaining
	HITs are still processed.
	'''
	for h in self.get_hits():
		try:
			h.is_available = False
		except AMTRequestFailed:
			continue
def _hit_matches_criteria(self, hit, since, until, title_re):
# Any of these criteria may be None, indicating that it should not be applied.
if (since is None) and (until is None) and (title_re is None):
return True
elif (since is not None) and not (hit.expiration_time > since):
return False
elif (until is not None) and not (hit.creation_time < until):
return False
elif (title_re is not None) and not (re.match(title_re, hit.hit_type.title)):
return False
else:
return True
def _hit_type_matches_criteria(self, hit_type, since, until, title_re):
# Return True iff *any* hit in the hit_type matches the given criteria.
# Any of the criteria may be None. Return True if all are None.
if title_re is not None and re.match(title_re, hit_type.title) is None:
return False
elif (since is None) and (until is None):
return True
else:
assert title_re is None or re.match(title_re, hit_type.title)
assert since is not None or until is not None
return any(self._hit_matches_criteria(hit, since, until, title_re=None) for hit in hit_type.hits)
def _assignment_matches_criteria(self, assignment, since, until, title_re):
if (since is None) and (until is None) and (title_re is None):
return True
elif (since is not None) and not (assignment.submit_time > since):
return False
elif (until is not None) and not (assignment.accept_time < until):
return False
elif (title_re is not None) and not (re.match(title_re, assignment.hit.hit_type.title)):
return False
else:
return True
def get_hits_for_hit_type(self, hit_type, since=None, until=None, title_re=None):
	'''
	Generate the HITs of `hit_type` recorded in the local DB that satisfy the
	optional criteria (see _hit_matches_criteria).

	If a HIT is already in the memory cache, the cached object is yielded, so
	there are never two in-memory copies of one HIT. Otherwise a HIT is built
	from the DB record and added to the cache.
	'''
	records = self._db.get_hit_records(hit_type_id=hit_type.id)
	for record in records:
		try:
			hit = self._memory_cache.get_hit(record.hit_id)
		except HITNotFoundException:
			# Not cached yet: build it from the DB record and cache it.
			hit = self._make_hit_from_hit_record(hit_record=record, hit_type=hit_type)
			self._memory_cache.put_hit(hit)
		if not self._hit_matches_criteria(hit=hit, since=since, until=until, title_re=title_re):
			continue
		yield hit
def get_hit_types(self, since=None, until=None, title_re=None):
	'''
	Generate every HITType recorded in the local DB that satisfies the
	optional criteria (see _hit_type_matches_criteria).

	The object already in the memory cache is preferred. Types not yet cached
	are built from their DB record and added to the cache.
	'''
	cache = self._memory_cache
	for record in self._db.get_hit_type_records():
		type_id = record.hit_type_id
		if cache.has_hit_type(type_id):
			ht = cache.get_hit_type(type_id)
		else:
			ht = self._make_hit_type_from_hit_record(record)
			cache.put_hit_type(ht)
		if self._hit_type_matches_criteria(ht, since, until, title_re):
			yield ht
def get_hit_type(self, hit_type_id, hit_id=None, hit_record=None, do_sync_if_not_found=False):
	'''
	Return the HITType object for `hit_type_id`.

	Lookup order:
	  1. the in-memory cache;
	  2. `hit_record`, if given (it must have the same hit_type_id);
	  3. the local DB (the result is added to the memory cache);
	  4. if `hit_id` is given, fetch that HIT from the AMT server and build the
	     HITType from it (the result is added to the memory cache and the DB);
	  5. if `do_sync_if_not_found`, call suggest_hit_sync() and retry once,
	     without syncing.
	Otherwise the HITTypeNotFoundException from the DB lookup is re-raised.

	Passing `hit_id` together with do_sync_if_not_found=True is rejected by an
	assertion.
	'''
	hit_type = None
	assert not ((hit_id is not None) and (do_sync_if_not_found==True)), \
		"We will never do a sync if hit_id was provided, since the hit_id allows us to " \
		"request the information directly without going through unrelated HITs."
	assert (hit_record is None) or (hit_record.hit_type_id==hit_type_id)
	try:
		# 1. Try the memory cache.
		hit_type = self._memory_cache.get_hit_type(hit_type_id=hit_type_id)
	except HITTypeNotFoundException:
		#2. Try the hit_record, if passed in.
		if hit_record is not None:
			# NOTE(review): unlike steps 3 and 4, the HITType built here is not put into
			# the memory cache or the DB -- confirm that is intended.
			hit_type = self._make_hit_type_from_hit_record(hit_record=hit_record)
		else:
			try:
				# 3. Try the DB.
				hit_type_record = self._db.get_hit_type_record(hit_type_id)
				hit_type = self._make_hit_type_from_hit_record(hit_type_record)
				self._memory_cache.put_hit_type(hit_type)
				# On 7-4-2014, I got CrowdLibInternalError from the above call to put_hit_type(hit_type).
			except HITTypeNotFoundException:
				if hit_id is not None:
					# 4. Try fetching a hit_record, if we have the hit_id.
					hit_record = self._server.get_hit(hit_id=hit_id)
					hit_type = self._make_hit_type_from_hit_record(hit_record=hit_record)
					self._memory_cache.put_hit_type(hit_type)
					self._db.put_hit_type(hit_type)
				elif do_sync_if_not_found:
					# 5. Try doing a full sync.
					# NOTE(review): suggest_hit_sync() without must_sync=True only syncs when the
					# sync interval has elapsed. If no sync happens, the retry below
					# falls through to step 6 and raises HITTypeNotFoundException again.
					self.suggest_hit_sync()
					hit_type = self.get_hit_type(hit_type_id=hit_type_id,
							hit_record=hit_record,
							do_sync_if_not_found=False) # RECURSIVE
				else:
					# 6. Give up. :(
					raise
	assert isinstance(hit_type, HITType)
	assert hit_type.id==hit_type_id
	return hit_type
def get_assignment(self, assignment_id, hit_id=None):
	'''
	Return the Assignment object for `assignment_id`.

	hit_id -- id of the HIT the assignment belongs to. If omitted, it is looked
	          up in the local DB. That lookup raises AssignmentNotFoundException
	          if this CrowdLib instance has never seen the assignment (calling
	          sync_with_amt() first may help).

	The assignment is built from its local DB record when one exists;
	otherwise it is searched for among get_assignments_for_hit(hit).

	Raises AssignmentNotFoundException if it cannot be found. Raises
	AMTDataError if more than one assignment with that id is returned. May
	raise HITNotFoundException if the HIT itself cannot be found (get_hit).
	'''
	if hit_id is None:
		hit_id = self._db.get_hit_id_for_assignment_id(assignment_id)
	hit = self.get_hit(hit_id=hit_id)
	try:
		assignment_record = self._db.get_assignment_record(assignment_id=assignment_id)
		worker = self.get_worker(assignment_record.worker_id)
		assignment = self._make_assignment(assignment_record=assignment_record, hit=hit, worker=worker)
	except AssignmentNotFoundException:
		# Not in the DB: fall back to the HIT's full list of assignments.
		matches = tuple( asg for asg in self.get_assignments_for_hit(hit=hit) if asg.id==assignment_id)
		if len(matches)==0:
			raise
		if len(matches) > 1:
			# Previously `assert False`, which vanishes under `python -O` and then
			# failed with UnboundLocalError; report it like get_qualification_type does.
			raise AMTDataError("Didn't expect to find %d assignments for the same ID: %r"%(len(matches), matches))
		assignment = matches[0]
	return assignment
def _make_assignment(self, assignment_record, hit, worker=None):
	'''
	Build an Assignment (bound to this object) from an assignment record.

	worker -- the Worker who did it; if None, it is looked up with
	          get_worker(assignment_record.worker_id).
	Each entry of assignment_record.answer_records is converted with
	_make_answer, preserving order.
	'''
	rec = assignment_record
	if worker is None:
		worker = self.get_worker(worker_id=rec.worker_id)
	answer_records = rec.answer_records
	asg_id = rec.assignment_id
	answers = tuple(self._make_answer(answer_record=ar, assignment_id=asg_id) for ar in answer_records)
	return Assignment(
		id=rec.assignment_id,
		worker=worker,
		hit=hit,
		assignment_status=rec.assignment_status,
		autopay_time=rec.auto_approval_time,
		accept_time=rec.accept_time,
		submit_time=rec.submit_time,
		approval_time=rec.approval_time,
		rejection_time=rec.rejection_time,
		requester_feedback=rec.requester_feedback,
		answers=answers,
		amt=self)
def _make_answer(self, answer_record, assignment_id):
	'''
	Convert an answer record into an Answer object. The type is chosen by the
	first populated field, checked in this order:
	  selection_identifier -> AnswerSelection
	  uploaded_file_key    -> AnswerUploadedFile
	  free_text            -> AnswerFreeText
	A record with none of them becomes AnswerBlank.
	'''
	rec = answer_record
	if rec.selection_identifier is not None:
		return AnswerSelection(question_id=rec.question_identifier,
				selection_id=rec.selection_identifier,
				other_selection_text=rec.other_selection,
				assignment_id=assignment_id,
				amt=self)
	if rec.uploaded_file_key is not None:
		return AnswerUploadedFile(question_id=rec.question_identifier,
				uploaded_file_key=rec.uploaded_file_key,
				uploaded_file_size=rec.uploaded_file_size,
				assignment_id=assignment_id,
				amt=self)
	if rec.free_text is not None:
		return AnswerFreeText(question_id=rec.question_identifier,
				free_text=rec.free_text,
				assignment_id=assignment_id,
				amt=self)
	return AnswerBlank(assignment_id=assignment_id)
def get_reviewable_hits(self, hit_type):
	'''
	Generate the HITs of `hit_type` whose ids AMT reports as reviewable.
	A HIT whose locally known status is not HIT.HIT_STATUS_REVIEWABLE is
	refreshed with hit.sync_with_amt() before it is yielded.
	'''
	assert isinstance(hit_type, HITType)
	for reviewable_id in self._get_reviewable_hit_ids(hit_type=hit_type):
		assert reviewable_id is not None
		hit = self.get_hit(reviewable_id, hit_type)
		status_is_stale = hit.hit_status != HIT.HIT_STATUS_REVIEWABLE
		if status_is_stale:
			hit.sync_with_amt()
		yield hit
def _get_reviewable_hit_ids(self, hit_type=None): # GENERATOR
if hit_type is None:
hit_type_id = None
else:
hit_type_id = hit_type.id
return self._server.get_reviewable_hit_ids(hit_type_id=hit_type_id) # generator
def extend_hit(self, hit, max_assignments_increment=0, expiration_increment=0):
	'''
	Extend `hit` on AMT with extra assignments and/or extra time. Then update
	the HIT object and, if it reports changes, the local DB.

	Both increments are converted with int(round(x, 0)). They must be
	non-negative, and at least one must be positive (checked by assert).
	'''
	extra_assignments = int(round(max_assignments_increment,0))
	extra_time = int(round(expiration_increment,0))
	assert min(extra_assignments, extra_time) >= 0
	assert extra_assignments > 0 or extra_time > 0
	self._server.extend_hit_id(hit_id=hit.id,
			max_assignments_increment=extra_assignments,
			expiration_increment=extra_time)
	changes = hit._update_from_amt_due_to_extend_hit(max_assignments_increment=extra_assignments, expiration_increment=extra_time) # [pylint] accessing friend method : pylint:disable=W0212
	if changes:
		self._db.update_hit(hit_id=hit.id, **changes)
def unblock_worker_id( self, worker_id, reason ):
	'''
	Unblock a worker on AMT, then record the unblock in the local DB.

	reason -- optional for unblocking, and not shown to the worker.
	See the AMT UnblockWorker operation:
	http://docs.amazonwebservices.com/AWSMturkAPI/2008-08-02/ApiReference_UnblockWorkerOperation.html
	'''
	server, db = self._server, self._db
	server.unblock_worker_id(worker_id, reason)
	db.update_worker_unblocked(worker_id)
def block_worker_id( self, worker_id, reason ):
	'''
	Block a worker on AMT (BlockWorker request), then record the block and
	its reason in the local DB.

	reason -- required by AMT for blocking, so it must not be None or "".
	See the AMT BlockWorker operation:
	http://docs.amazonwebservices.com/AWSMturkAPI/2008-08-02/ApiReference_BlockWorkerOperation.html
	'''
	request_params = {"WorkerId" : worker_id, "Reason" : reason}
	self._server.do_request("BlockWorker", request_params)
	self._db.update_worker_blocked(worker_id, reason)
def is_worker_id_blocked(self, worker_id):
	'''Return the local DB's answer to whether `worker_id` is blocked.'''
	db = self._db
	return db.is_worker_id_blocked(worker_id)
def get_worker_block_reason(self, worker_id):
	'''Return the block reason stored in the local DB for `worker_id`.'''
	db = self._db
	return db.get_worker_block_reason(worker_id)
def notify_worker( self, worker_id, subject, message_text ):
	'''Send one message to a single worker (a one-element notify_workers call).'''
	recipients = (worker_id,)
	self.notify_workers(recipients, subject, message_text)
def notify_workers( self, workers_or_worker_ids, subject, message_text ):
	'''
	Send an email message to workers through AMT and record each send in the
	local DB.

	workers_or_worker_ids -- iterable of Worker objects and/or worker id strings.
	subject               -- message subject, sent as-is.
	message_text          -- message body; each line is re-wrapped to 65
	                         columns with textwrap.fill before sending.

	The ids are sent in batches of 100, with one self._server.notify_workers
	call per batch, because AMT's NotifyWorkers operation accepts at most 100
	worker ids per request.

	Raises CrowdLibInternalError if the local DB fails to record a message.
	By then the message has already been sent.
	'''
	batch_size = 100
	worker_ids = tuple((w.id if isinstance(w,Worker) else w) for w in workers_or_worker_ids)
	send_time = now_local()
	email_wrap = 65
	message_text = "\n".join(textwrap.fill(s,email_wrap) for s in message_text.splitlines())
	for i in range(0,len(worker_ids),batch_size):
		worker_id_batch = worker_ids[i:i+batch_size]
		# Send only the current batch. Previously the full id tuple was sent on every
		# iteration, which exceeded the per-request limit for >100 recipients and
		# would have messaged each worker once per batch.
		self._server.notify_workers(worker_ids=worker_id_batch, subject=subject, message_text=message_text)
		for worker_id in worker_id_batch:
			try:
				self._db.record_worker_message(worker_id=worker_id,
						send_time=send_time,
						subject=subject,
						message_text=message_text)
			except DATABASE_ERROR_BASE_CLASS:
				raise CrowdLibInternalError("Could not record sending of message to worker(s) in local DB. (error#2191)\n"
						+ repr(sys.exc_info()[1]))
def get_workers(self, since=None, until=None, title_re=None):
	'''
	Generate each distinct Worker (by id; first occurrence wins) who has an
	assignment satisfying the optional criteria (see
	_assignment_matches_criteria). All assignments of all HITs of all
	HITTypes from get_hit_types() are considered.
	'''
	seen_ids = set()
	for hit_type in self.get_hit_types():
		for hit in hit_type.hits:
			for asg in hit.assignments:
				if not self._assignment_matches_criteria(assignment=asg, since=since, until=until, title_re=title_re):
					continue
				worker = asg.worker
				wid = worker.id
				if wid in seen_ids:
					continue
				seen_ids.add(wid)
				yield worker
def get_assignments(self, since=None, until=None, title_re=None):
	'''
	Generate the assignments that satisfy the optional criteria, taken from
	the HITs that get_hits() returns for the same criteria.
	'''
	criteria = dict(since=since, until=until, title_re=title_re)
	for hit in self.get_hits(**criteria):
		wanted = (a for a in self.get_assignments_for_hit(hit) if self._assignment_matches_criteria(assignment=a, **criteria))
		for a in wanted:
			yield a
def get_assignments_for_hit(self, hit):
	'''
	Return a tuple of all Assignment objects for `hit`, sorted by submit_time.

	Sources are consulted in order -- the memory cache, then the local DB,
	then the AMT server -- and the lookup stops once the collected
	assignments look complete (see seems_up_to_date below). Assignments built
	from DB/server records are added to the memory cache. Those that came
	from the server, or whose status changed, are written to the DB.

	Raises AMTDataError if, after asking the server, the count still does not
	match what the HIT's counters predict.
	'''
	# This doesn't refetch assignments just because they're not finalized, yet.
	# At this stage, all we should care about is if we have all of them,
	# not the status. That logic is handled by the Assignment class when
	# you call the relevant properties.
	# NOTE: This function may be called as a result of aggregate HIT properties,
	# such as hourly_rate and average_time_taken.
	BE_LAZY = True # Don't re-download assignments for any HIT. The only reason to
	# do that would be to see the status (i.e., approved, rejected, submitted)
	# and that can/should be done by Assignment, lazily.
	assignments_dict = {}  # assignment id -> Assignment, filled from cache/DB/server
	max_assignments = hit.max_assignments
	if not BE_LAZY:
		expected_num_finalized = hit.num_completed
	def seems_up_to_date():
		if BE_LAZY:
			# Lazy check: expect one assignment for every slot that is neither
			# available nor pending (presumably the submitted/finished ones).
			result = (len(assignments_dict) == (max_assignments - hit.num_available - hit.num_pending))
		else:
			# Helper: Return true iff we already have all assignments for this HIT.
			# This can be determined by comparing the assignments seen so far with
			# the metrics in the HIT, such as num_pending, num_completed, and
			# num_available.
			# Number of assignments that are final (already approved or rejected)
			num_finalized = sum(1 for asg in assignments_dict.values() if asg.is_final)
			# Have we received and finalized (approved or rejected) all of the HITs we asked for?
			have_all = (num_finalized == max_assignments)
			# ??? What's this next statement for? ???
			all_appear_current = (num_finalized == expected_num_finalized)
			result = (have_all and all_appear_current)
		return result
	def handle_assignment_record(assignment_record, info_from_db):
		# Merge one record into assignments_dict: refresh the status of a known
		# assignment, or build, collect and cache a new one. Then write it to the
		# DB if it came from the server or its status changed.
		# If info came from DB, then probably don't need to write it back to the DB, unless
		# something changed.
		# ??? -- I don't believe anything would ever change. You just read it out of the DB.
		# The Assignment constructor doesn't make any changes. That code is probably unnecessary
		# if the code came from the DB. The only aspects of an Assignment that could change
		# are approval_time, rejection_time, assignment_status, and requester_feedback.
		# TODO: Probably remove the unnecessary code. Too paranoid right now.
		write_to_db = not info_from_db
		assignment_id = assignment_record.assignment_id
		if assignment_id in assignments_dict:
			assignment = assignments_dict[assignment_id]
			# [pylint] Tolerate access to protected member _update_status_from_amt : pylint:disable=W0212
			changed = assignment._update_status_from_amt(
				assignment_status=assignment_record.assignment_status,
				approval_time=assignment_record.approval_time,
				rejection_time=assignment_record.rejection_time)
			assert isinstance(changed, bool)
			if changed:
				# Something changed. Write changes back to DB.
				write_to_db = True
		else:
			assignment = self._make_assignment(assignment_record=assignment_record, hit=hit)
			assignments_dict[assignment.id] = assignment
			self._memory_cache.put_assignment(assignment)
		if write_to_db:
			self._db.put_assignment(assignment)
		return assignment
	# 1. Try the memory cache.
	for asg in self._memory_cache.get_assignments_for_hit_id(hit_id=hit.id):
		assignments_dict[asg.id] = asg
	# 2. Add in whatever we have in the DB, if needed.
	if not seems_up_to_date():
		for assignment_record in self._db.get_assignment_records(hit_id=hit.id):
			asg = handle_assignment_record(assignment_record=assignment_record, info_from_db=True)
	# 3. Try server.
	if not seems_up_to_date():
		for assignment_record in self._server.get_assignments_for_hit(hit=hit):
			asg = handle_assignment_record(assignment_record=assignment_record, info_from_db=False)
	if not seems_up_to_date():
		if BE_LAZY:
			# for asg in sorted(assignments_dict.values(), key=lambda asg:asg.id):
			# log( "assignment_id=%s worker_id=%s submitted=%s"%(asg.id,asg.worker.id,asg.submit_time) )
			raise AMTDataError(
				"Number of assignments returned from AMT doesn't match the predicted count in the HIT structure. " \
				"len(assignments_dict)==%d, max_assignments==%d, hit.num_available==%s, hit.num_pending==%d"% \
				(len(assignments_dict), max_assignments, hit.num_available, hit.num_pending) )
		else:
			raise AMTDataError(
				"Number of assignments returned from AMT doesn't match the predicted count in the HIT structure." )
	assignments = assignments_dict.values()
	# NOTE(review): sorting assumes every assignment has a comparable submit_time.
	assignments = sorted(assignments, key=lambda asg:asg.submit_time)
	assignments = tuple(assignments)
	return assignments
def force_expire_hit( self, hit ):
	'''
	Force `hit` to expire on AMT. Then record the expiry on the HIT object,
	using the current local time as the approximate expiration time, and in
	the local DB if the HIT reports changes.
	'''
	self._server.force_expire_hit(hit_id=hit.id)
	changes = hit._set_force_expired_from_amt(approximate_expiration_time=now_local()) # [pylint] accessing friend method : pylint:disable=W0212
	if not changes:
		return
	self._db.update_hit(hit_id=hit.id, **changes)
def reject_assignment(self, assignment, reason, rejection_time):
	'''
	Reject `assignment` on AMT (reason may be None), then record the
	rejection time for its id in the local DB.
	'''
	server, db = self._server, self._db
	server.reject_assignment(assignment, reason)
	db.update_assignment_rejected(assignment.id, rejection_time)
def approve_assignment(self, assignment, requester_feedback, approval_time):
	'''
	Approve `assignment` on AMT (requester_feedback may be None), then record
	the approval time for its id in the local DB.
	'''
	server, db = self._server, self._db
	server.approve_assignment(assignment, requester_feedback)
	db.update_assignment_approved(assignment.id, approval_time)
def reject_assignment_id(self, assignment_id, reason, rejection_time):
	'''
	Reject the assignment with id `assignment_id` on AMT (reason may be None),
	then record the rejection time in the local DB.
	'''
	server, db = self._server, self._db
	server.reject_assignment(assignment_id, reason)
	db.update_assignment_rejected(assignment_id, rejection_time)
def approve_assignment_id(self, assignment_id, requester_feedback, approval_time):
	'''
	Approve the assignment with id `assignment_id` on AMT (requester_feedback
	may be None), then record the approval time in the local DB.
	'''
	server, db = self._server, self._db
	server.approve_assignment(assignment_id, requester_feedback)
	db.update_assignment_approved(assignment_id, approval_time)
def grant_bonus(self, assignment_id, worker_id, amount, currency, reason):
	'''
	Pay a bonus to a worker for an assignment via AMT, then record it in the
	local DB, using the current local time as the payment time.

	currency -- currency code; None means self._default_currency.
	reason   -- may be None.
	'''
	currency = self._default_currency if currency is None else currency
	self._server.grant_bonus(assignment_id, worker_id, amount, currency, reason)
	self._db.record_worker_bonus(worker_id, assignment_id, amount, currency, now_local(), reason)
def get_bonuses(self, assignment_id=None, worker_id=None):
	'''
	Generate Bonus objects for the bonus records that
	self._db.get_bonuses(assignment_id, worker_id) returns. Both arguments
	are passed straight through (presumably None means "no filter").
	'''
	records = self._db.get_bonuses(assignment_id, worker_id)
	for rec in records:
		yield Bonus(
			assignment_id=rec.assignment_id,
			worker_id=rec.worker_id,
			amount=rec.amount,
			currency=rec.currency,
			payment_time=rec.payment_time,
			reason=rec.reason,
			amt=self
		)
def get_account_balance( self ):
	'''
	Return the amount of the requester's AMT account balance.

	Raises ValueError if AMT reports the balance in a currency other than
	self._default_currency.
	'''
	balance = self._server.get_account_balance()
	if balance.currency_code == self._default_currency:
		return balance.amount
	raise ValueError("Unexpected currency %s doesn't match %s"%(balance.currency_code, self._default_currency))
def start_notifications(self, url): # DO NOT USE - new feature in progress, not fully implemented
	'''
	(Unfinished feature - do not use.) For each HITType that has a HIT which
	is not disposed and still has pending or available assignments, print its
	id and register `url` as its REST notification address for all event types.
	'''
	def is_active(hit):
		return hit.hit_status != HIT.HIT_STATUS_DISPOSED and hit.num_pending + hit.num_available > 0
	# TODO: Do as a batch operation, for efficiency.
	active_hit_types = set(hit.hit_type for hit in self.get_hits() if is_active(hit))
	# Each HITType object must be unique by id.
	assert len(active_hit_types) == len(set(ht.id for ht in active_hit_types))
	for hit_type in active_hit_types:
		print( "Active HIT Type: " + hit_type.id )
		self.set_hit_type_notification(hit_type_id=hit_type.id, address=url, transport="REST", event_type=AMTServer.NOTIFICATION_EVENT_TYPES_ALL)
	# Not implemented yet:
	# self._db.set_notification_started(url)
	# self.send_test_notification(url)
def stop_notifications(self): # DO NOT USE - new feature in progress, not fully implemented
	'''
	(Unfinished feature - do not use.) For every HITType id the DB lists as
	registered, call set_hit_type_notification with address=None and
	transport=None. Then mark notifications as stopped in the DB.
	'''
	registered_ids = self._db.get_notification_hit_type_ids_registered()
	for type_id in registered_ids:
		self.set_hit_type_notification(hit_type_id=type_id, address=None, transport=None, event_type=AMTServer.NOTIFICATION_EVENT_TYPES_ALL)
	self._db.set_notification_stopped()
def _get_active_hit_type_ids(self):
	'''
	Return the set of IDs of HIT types that have at least one HIT which is not
	disposed and still has pending or available assignments.
	'''
	active_hit_type_ids = set()
	for hit in self.get_hits():
		if hit.hit_status != HIT.HIT_STATUS_DISPOSED and hit.num_pending + hit.num_available > 0:
			active_hit_type_ids.add(hit.hit_type.id)
	# The set was previously built and then discarded (implicit None return).
	return active_hit_type_ids
def get_current_notification_events_from_cgi(self, cgi_fields=None):
	'''
	Parse an AMT REST notification out of CGI request fields.

	@param cgi_fields: object providing keys() and getfirst(key, default);
		when None, cgi.FieldStorage() for the current request is used.
	@return: list of NotificationEvent objects, one per event that names a HIT.
		Events without a HIT (e.g. pings/tests) are only recorded in the local
		database and are not returned.
	@raise AMTNotificationNotAvailable: if the request is not a complete
		"Notify" call, or an event is missing its type or time.
	'''
	if cgi_fields is None:
		cgi_fields = cgi.FieldStorage()
	# Flatten into a plain dict holding the first value of each field.
	cgi_fields = dict((k, cgi_fields.getfirst(k, None)) for k in cgi_fields.keys())
	# service = "AWSMechanicalTurkRequesterNotification"
	# operation = "Notify"
	method = cgi_fields.get("method", None)
	version = cgi_fields.get("Version", None)
	timestamp = cgi_fields.get("Timestamp", None)
	signature = cgi_fields.get("Signature", None)
	# NOTE(review): the signature is only checked for presence, not verified.
	if (method != "Notify") or (version is None) or (timestamp is None) or (signature is None):
		raise AMTNotificationNotAvailable("Notification fields are not complete. (7010)")
	notification_events = []
	# Events arrive as Event.1.*, Event.2.*, ...; stop at the first index with no fields.
	for i in range(1,1000): # normally, this will quit after 1 or a few iterations.
		stem = "Event.%d."%i
		event_type = cgi_fields.get(stem+"EventType", None)
		event_time = cgi_fields.get(stem+"EventTime", None)
		hit_type_id = cgi_fields.get(stem+"HITTypeId", None)
		hit_id = cgi_fields.get(stem+"HITId", None)
		assignment_id = cgi_fields.get(stem+"AssignmentId", None)
		if (event_type is None) and (event_time is None) and (hit_type_id is None) and (hit_id is None):
			break
		elif (event_type is not None) and (event_time is not None):
			assert (hit_type_id is None) == (hit_id is None)
			event_time = to_date_time(event_time)
			event_type = to_unicode(event_type)
			if hit_type_id is not None:
				hit_type_id = to_unicode(hit_type_id)
				hit_id = to_unicode(hit_id)
				hit = self.get_hit(hit_id)
				hit_type = hit.hit_type
				if assignment_id is not None:
					assignment_id = to_unicode(assignment_id)
					assignment = self.get_assignment(assignment_id, hit_id=hit_id)
				else:
					assignment = None
				notification_event = NotificationEvent(event_type, event_time, hit_type, hit, assignment)
				notification_events.append(notification_event)
				self._db.set_notification_hit_type_received(hit_type_id)
			else:
				# If we get event_type and event_time but not hit_type_id and hit_id, it's probably a Ping or
				# some other test event. Record its arrival; no event object is returned for it.
				self._db.set_notification_test_received()
			# NOTE(review): syncs after every complete event, including test events -- confirm intended.
			self.sync_with_amt()
		else:
			# Include event_time (previously event_type was listed twice) so the report shows which field is missing.
			raise AMTNotificationNotAvailable("Notification fields are not complete. (7011)", (event_type, event_time, hit_type_id, hit_id, i))
	return notification_events
def send_test_notification(self, address, event_type=None): # event_type defaults to AMTServer.NOTIFICATION_EVENT_TYPE_SUBMITTED
	'''
	Ask the server to send a test notification of event_type to address, using
	the transport inferred from the address.
	'''
	chosen_type = AMTServer.NOTIFICATION_EVENT_TYPE_SUBMITTED if event_type is None else event_type
	assert chosen_type in AMTServer.NOTIFICATION_EVENT_TYPES_ALL
	self._server.send_test_event_notification(
		address=address,
		transport=self._guess_notification_address_transport(address),
		event_type=chosen_type,
	)
def set_hit_type_notification(self, hit_type_id, address, transport, event_type):
	'''
	Turn notifications for one HIT type on (address given) or off (address None).

	@param address: URL or email address to notify, or None to deactivate.
	@param transport: must be None exactly when address is None. When an
		address is given, the transport sent to the server is re-derived from
		the address by _guess_notification_address_transport().
	@param event_type: one event type, a sequence of event types, or None for
		AMTServer.NOTIFICATION_EVENT_TYPE_SUBMITTED.
	@raise ValueError: if event_type is neither a known event type nor a
		sequence of known event types.
	'''
	assert (address is None)==(transport is None)
	if event_type is None:
		event_type = AMTServer.NOTIFICATION_EVENT_TYPE_SUBMITTED
	if event_type in AMTServer.NOTIFICATION_EVENT_TYPES_ALL:
		event_types = (event_type,)
	elif is_sequence(event_type):
		event_types = tuple(event_type)
	else:
		raise ValueError("Not a notification event type or sequence of them: %r"%(event_type,))
	unknown = [et for et in event_types if et not in AMTServer.NOTIFICATION_EVENT_TYPES_ALL]
	if unknown:
		raise ValueError("Unknown notification event type(s): %r"%(unknown,))
	if address is None:
		self._server.set_hit_type_notification_inactive(hit_type_id)
		# self._db.put_hit_type_notification_url(hit_type_id=hit_type_id, url=None)
		self._db.set_notification_hit_type_unregistered(hit_type_id)
	else:
		transport = self._guess_notification_address_transport(address)
		# Pass the normalized tuple (event_types), not the raw event_type argument.
		self._server.set_hit_type_notification(hit_type_id=hit_type_id, address=address, transport=transport, event_types=event_types)
		# self._db.put_hit_type_notification_url(hit_type_id=hit_type_id, url=address)
		self._db.set_notification_hit_type_registered(hit_type_id)
def _guess_notification_address_transport(self, address):
	'''
	Infer the notification transport from the form of address.
	Returns "REST" for http(s) URLs and "Email" for something shaped like an
	email address. (SOAP is not currently supported.)

	@raise ValueError: if address looks like neither.
	'''
	if address.startswith(("http://", "https://")):
		return "REST"
	looks_like_email = address.count("@") == 1 and "." in address
	if looks_like_email:
		return "Email"
	raise ValueError("%s does not appear to be either a valid URL or Email address."%address)
def save_file_answer_content(self, assignment_id, question_id, path):
	'''Download the file uploaded as the answer to question_id and save it at path.'''
	download_url = self._server.get_file_upload_url(assignment_id=assignment_id, question_id=question_id)
	urlretrieve_py23_compatible(url=download_url, filename=path)
def get_file_answer_content(self, assignment_id, question_id):
	'''
	Download and return the content of the file uploaded as the answer to question_id.

	@return: whatever read() returns for the opened download URL.
	'''
	url = self._server.get_file_upload_url(assignment_id=assignment_id, question_id=question_id)
	infile = urlopen_py23_compatible(url=url)
	try:
		return infile.read()
	finally:
		# Close the response even if read() raises.
		infile.close()
#vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# EXPERIMENTAL CODE (do not delete yet)
#
# def get_hit_type_notification_url(self, hit_type_id):
# url = self._db.get_hit_type_notification_url(hit_type_id)
# return url
# def set_hit_type_notification_url(self, hit_type_id, url):
# # Per documentation:
# # "After you make the call to SetHITTypeNotification, it can take up to five minutes for
# # changes to a HIT type's notification specification to take effect."
# # http://docs.amazonwebservices.com/AWSMturkAPI/2008-08-02/ApiReference_SetHITTypeNotificationOperation.html
# url = self._db.put_hit_type_notification_url(hit_type_id, url)
PK ፆHh-w ! crowdlib-0.8.50/crowdlib/AMTDB.py# vim: set fileencoding=utf-8 noexpandtab:
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: January 2010
'''
# FIXME: The use of always_commit probably breaks any use of transactions. Also, we
# have no way to ensure all transactions are properly committed. (11-17-2013)
# TODO: Store custom QualificationType info locally. (12/3/2013)
from __future__ import division, with_statement
import os, os.path, datetime, sqlite3, datetime, threading, collections
from crowdlib.utility import format_datetime_to_iso_utc, is_string, is_collection_of_strings, is_unicode, literal_eval, log, namedtuple, now_iso_utc, parse_iso_utc_to_datetime_local, total_seconds
try:
import pysqlite2._sqlite as sqlite3
except ImportError:
import sqlite3 # [pylint] reimport sqlite : pylint:disable=W0404
# Base class of database errors from whichever sqlite3 implementation was imported above.
DATABASE_ERROR_BASE_CLASS = sqlite3.Error
# When True, AMTDB._dump_query_on_error prints the failing SQL and its parameters to stdout.
_DBG_PRINT_QUERY_ON_ERROR = False
# Sentinel default meaning "argument not supplied", distinct from None (used by AMTDB.update_hits).
_UNDEFINED = object()
class AMTDB( object ):
'''
Encapsulates the database (currently implemented with SQLite3) and manages storing results and information
about HITs.
'''
_SCHEMA_PATH = os.path.join(os.path.dirname(__file__), "sql/schema.sql")
_thread_local_data = threading.local()
_connections_weak_refs = []
def open(self): # called by AMT class
	'''
	Enter batch mode. Nothing is actually opened: this only stops the automatic
	commit after every non-SELECT query, trusting the caller to call close().
	'''
	self.always_commit = False
def close(self): # called by AMT class
	'''
	Leave batch mode. Nothing is actually closed and no commit is issued here;
	this only re-enables the automatic commit after every non-SELECT query.
	'''
	# Committing all pooled connections here is currently disabled:
	# if self._get_all_connections_in_pool():
	# 	self.commit()
	self.always_commit = True
def _log(self, s):
	'''When verbose, log s (repr'd unless it is a string), prefixed with the current thread ident.'''
	if not self._verbose:
		return
	text = s if is_string(s) else repr(s)
	log("%s : %s"%(threading.current_thread().ident, text))
def _make_where_clause_for_alphanumeric_ids(self, ids, column_name):
	'''
	Build an SQL condition matching column_name against ids, e.g.
	"hit_id='A1'" or "hit_id in ('A1','B2')".

	Because the IDs are embedded directly in the SQL text, each one must be a
	non-empty, purely alphanumeric string of at most 64 characters.

	@raise ValueError: if ids is empty or any ID fails that check.
	'''
	assert is_collection_of_strings(ids), ids
	MAX_ID_LENGTH = 64
	if len(ids) == 0:
		raise ValueError("ids is empty")
	# isalnum() is False for "", so this also rejects empty IDs.
	bad_ids = [i for i in ids if not (i.isalnum() and len(i) <= MAX_ID_LENGTH)]
	if bad_ids:
		raise ValueError("Bad ID(s) found: %r"%bad_ids)
	# Single quotes make SQL string literals. In SQLite, double quotes denote
	# identifiers and only fall back to a string when no such column exists.
	ids_quoted = ["'" + i + "'" for i in ids]
	if len(ids_quoted) == 1:
		return '%s=%s'%(column_name, ids_quoted[0])
	return column_name + ' in (' + ','.join(ids_quoted) + ')'
def _get_connection(self):
	'''
	Return the SQLite connection for the current thread and this database
	file, creating and caching it on first use.

	Connections are cached in class-level thread-local storage keyed by
	filename. Each thread gets its own connection, and AMTDB instances that
	point at different files no longer share one.
	'''
	current_thread_ident = threading.current_thread().ident
	thread_local_data = self.__class__._thread_local_data # [pylint] access private member : pylint:disable=W0212
	connections = getattr(thread_local_data, "connections_by_filename", None)
	if connections is None:
		connections = {}
		thread_local_data.connections_by_filename = connections
	conn = connections.get(self._filename)
	if conn is not None:
		self._log("AMTDB._get_connection() : [R] Reused connection for thread %r."%current_thread_ident)
		return conn
	conn = sqlite3.connect(self._filename, detect_types=sqlite3.PARSE_DECLTYPES) # >= 11/17/2013
	conn.row_factory = sqlite3.Row
	connections[self._filename] = conn
	self._log("AMTDB._get_connection() : [C] Created connection for thread %r."%current_thread_ident)
	return conn
def __init__(self, filename, verbose=False, always_commit=False):
	'''
	Prepare access to the SQLite database file at filename. If the file is
	missing or empty, create the tables by running the schema script at
	_SCHEMA_PATH.

	@param verbose: if True, _log() messages are emitted.
	@param always_commit: if True, commit after every query other than SELECT.
	'''
	self._filename = filename
	self._verbose = verbose
	self.always_commit = always_commit
	self._dirty_tables = set()
	#self._conn = None
	self._conn_pool_by_thread_ident = {}
	missing_or_empty = (not os.path.isfile(filename)) or (os.path.getsize(filename) == 0)
	self.is_new = missing_or_empty
	self._log( "Opened DB at %s"%filename )
	if not self.is_new:
		return
	conn = self._get_connection()
	self._log( "Creating tables...." )
	with open(self._SCHEMA_PATH, "rb") as schema_file:
		schema_sql = schema_file.read().decode("utf8")
	conn.executescript(schema_sql)
	#conn.commit() # disabled on 11/19/2013 FIXME
	#TODO: Make sure it syncs after the DB is created. Maybe require
	# an initial setup command or just do transparently in AMT.py.
def _insert(self, table_name, vals, on_conflict=None):
	'''
	INSERT one row into table_name from the column->value mapping vals and
	return the cursor's lastrowid.

	@param on_conflict: None or one of rollback/abort/fail/ignore/replace,
		emitted as "insert or <on_conflict>". Table and column names are
		interpolated into the SQL text, so they must come from trusted code.
	'''
	# Credit: Taken from Alex's stevenjessebernstein.com project.
	assert on_conflict is None or on_conflict.lower() in ("rollback", "abort", "fail", "ignore", "replace")
	self._log(">>> INSERT into %s"%table_name)
	pairs = list(vals.items())
	columns = [name for (name, _value) in pairs]
	values = [value for (_name, value) in pairs]
	if on_conflict is None:
		head = "insert into %s"%table_name
	else:
		head = "insert or %s into %s"%(on_conflict, table_name)
	sql = "%s ( %s ) values ( %s );"%(head, ", ".join(columns), ", ".join(["?"] * len(values)))
	cur = self._query_get_cursor(sql, *values) # [pylint] tolerate *params magic : pylint:disable=W0142
	row_id = cur.lastrowid
	self._log("<<< /INSERT into %s"%table_name)
	return row_id
################################################################################
# Mechanics
#
def commit(self):
	'''
	Commit pending changes on the current thread's connection.
	@return: nothing
	'''
	self._get_connection().commit()
	self._log( "Did DB commit" )
def query(self, sql, *params):
	'''
	Run one raw SQL statement and return cursor.fetchall() of the result.
	Used internally, but also handy for specialized reports.
	'''
	cursor = self._query_get_cursor(sql, *params)
	return cursor.fetchall()
def _dump_query_on_error(self, sql, params):
	'''
	Debug helper. When _DBG_PRINT_QUERY_ON_ERROR is set, print the failing SQL
	(each line stripped and re-indented, blank lines dropped) and its numbered
	parameters to stdout.
	'''
	indent_spaces = 4
	pad = " " * indent_spaces
	stripped = [line.strip() for line in sql.splitlines()]
	sql = "\n".join([pad + line for line in stripped if line])
	if not _DBG_PRINT_QUERY_ON_ERROR:
		return
	print("")
	print("="*80)
	print("SQL:")
	print(sql)
	if len(params) > 0:
		print("-"*40)
		print("Parameters:")
		for idx, param in enumerate(params):
			label = "%d."%(idx+1)
			print( label + " "*(indent_spaces - len(label)) + repr(param) )
	print("="*80)
	print("")
def _query_get_cursor(self, sql, *params):
	'''
	Execute one raw SQL statement on this thread's connection and return the
	sqlite3 cursor. Primarily used internally, but can also run reports.

	@param sql: raw SQL text with "?" placeholders for params.
	@return: cursor object from the sqlite3 module.
	'''
	# sys is used only for sys.exc_info() below (the Python-2.5-compatible way to get the exception).
	import sys
	conn = self._get_connection()
	try:
		cursor = conn.execute(sql, params)
	except (sqlite3.InterfaceError, sqlite3.OperationalError):
		self._dump_query_on_error(sql, params)
		raise
	except ValueError:
		e = sys.exc_info()[1]
		print(sql)
		print(repr(params))
		print( repr(e.args) )
		raise
	# Commit after every non-SELECT query iff always_commit==True
	if self.always_commit and not sql.lower().lstrip().startswith("select"):
		self.commit()
	return cursor
def _count(self, table_name, where, *params):
	'''
	Return the number of rows of table_name that satisfy the condition where.

	For safety, table_name must be a plain identifier. where may contain only
	identifiers, spaces, "=" and "?" placeholders; values are passed in params.

	@raise ValueError: if table_name or where contains anything else.
	'''
	import re
	if re.match(r"^[a-zA-Z_]+$", table_name) is None: # for safety
		raise ValueError("Table name must be a regular identifier")
	# "?" must be allowed, otherwise no parameterized condition could pass.
	if re.match(r"^[a-zA-Z_ =?]+$", where) is None: # for safety
		raise ValueError("Where clause must be nothing but regular identifiers, equal signs, question marks, and spaces - very simple")
	sql = "select count(*) from " + table_name + " where " + where + ";"
	rows = self.query(sql, *params)
	# count(*) always yields exactly one row; its single column is the count.
	return rows[0][0]
def _query_single_value(self, sql, *params):
	'''
	Run a query expected to produce at most one row. Return that row's first
	column, or None when there are no rows.
	'''
	rows = self.query(sql,*params)
	if len(rows) == 1:
		return rows[0][0]
	if rows:
		assert False, "Shouldn't have >1 row for query: %s %s"%(sql,repr(params))
	return None
################################################################################
# HITs / HITTypes
#
def _make_hit_record_from_hit_row(self, row):
	'''Build and return a HITRecordDB from one row of the hit table.'''
	from crowdlib.HITRecord import HITRecordDB
	def column(name):
		return row[str(name)]
	return HITRecordDB(
		creation_time=_parse_datetime(column("creation_time")),
		expiration_time=_parse_datetime(column("expiration_time")),
		hit_id=column("hit_id"),
		hit_review_status=column("hit_review_status"),
		hit_status=column("hit_status"),
		hit_type_id=column("hit_type_id"),
		max_assignments=column("max_assignments"),
		num_pending=column("num_pending"),
		num_available=column("num_available"),
		num_completed=column("num_completed"),
		question=column("question_xml"),
		requester_annotation=column("requester_annotation"),
		approximate_expiration_time=_parse_datetime(column("approximate_expiration_time")),
	)
def get_hit_records(self, hit_type_id=None, hit_id=None):
	'''
	Return a tuple of HITRecordDB objects for rows of the hit table, optionally
	restricted to one HIT type ID or one HIT ID (not both).
	'''
	assert (hit_type_id is None) or (hit_id is None)
	if hit_type_id is not None:
		sql, params = 'select * from hit where hit_type_id=?', (hit_type_id,)
	elif hit_id is not None:
		sql, params = 'select * from hit where hit_id=?', (hit_id,)
	else:
		sql, params = 'select * from hit', ()
	rows = self.query(sql, *params) # [pylint] tolerate *params magic : pylint:disable=W0142
	# Materialized rather than yielded lazily (changed 11/19/2013).
	return tuple([self._make_hit_record_from_hit_row(row=row) for row in rows])
def get_hit_ids(self):
	'''Return the set of every HIT ID stored in the hit table.'''
	hit_ids = set()
	for row in self.query('select hit_id from hit'):
		hit_ids.add(row[0])
	return hit_ids
def get_hit_record(self, hit_id):
	'''
	Return the single HITRecordDB stored for hit_id.
	@raise HITNotFoundException: unless exactly one matching record exists.
	'''
	from crowdlib.all_exceptions import HITNotFoundException
	matches = tuple(self.get_hit_records(hit_id=hit_id))
	if len(matches) != 1:
		raise HITNotFoundException(hit_id)
	return matches[0]
def put_hit(self,hit,replace_if_exists=True):
	'''
	Store hit in the hit table.

	@param replace_if_exists: if True, insert with "replace" so a conflicting
		row is overwritten. If False, insert with "abort"; when that conflicts,
		the existing row is updated through update_hit() instead.
	'''
	if replace_if_exists:
		on_conflict = "replace"
	else:
		on_conflict = "abort"
	try:
		self._insert("hit", {
			"hit_id" : hit.id,
			"hit_type_id" : hit.hit_type.id,
			"creation_time" : _format_datetime(hit.creation_time),
			"hit_status" : hit.hit_status,
			"expiration_time" : _format_datetime(hit.expiration_time),
			"max_assignments" : hit.max_assignments,
			"requester_annotation" : hit.requester_annotation,
			"hit_review_status" : hit.hit_review_status,
			"num_pending" : hit.num_pending,
			"num_available" : hit.num_available,
			"num_completed" : hit.num_completed,
			"question_xml" : hit.question_xml,
			"approximate_expiration_time" : _format_datetime(hit.approximate_expiration_time)
		}, on_conflict=on_conflict)
		self._log( "put_hit: "+hit.id )
	except (ValueError, sqlite3.IntegrityError):
		# A conflict under "abort" is reported by sqlite3 as IntegrityError (not
		# ValueError), so it must be caught here to reach the update fallback.
		self.update_hit(
			hit_id=hit.id,
			hit_status=hit.hit_status,
			creation_time=hit.creation_time,
			expiration_time=hit.expiration_time,
			num_pending=hit.num_pending,
			num_available=hit.num_available,
			num_completed=hit.num_completed,
			max_assignments=hit.max_assignments,
			approximate_expiration_time=hit.approximate_expiration_time
		)
		self._log( "put_hit (as update): "+hit.id )
def put_hit_type(self, hit_type):
	'''
	Store hit_type in the hit_type table (insert ignored on conflict), then
	store each of its qualification requirements, replacing conflicting rows.
	'''
	self._log( "put_hit_type: %s (\"%s\")"%(hit_type.id, hit_type.title) )
	hit_type_row = {
		"hit_type_id" : hit_type.id,
		"title" : hit_type.title,
		"description" : hit_type.description,
		"keywords" : _format_tuple(hit_type.keywords),
		"reward_amount" : hit_type.reward,
		"reward_currency" : hit_type.currency,
		"assignment_duration" : _format_duration(hit_type.time_limit),
		"auto_approval_delay" : _format_duration(hit_type.autopay_delay),
	}
	self._insert("hit_type", hit_type_row, on_conflict="ignore")
	for requirement in hit_type.qualification_requirements:
		requirement_row = {
			"hit_type_id" : hit_type.id,
			"qualification_type_id" : requirement.qualification_type_id,
			"comparator" : requirement.comparator,
			"integer_value" : requirement.integer_value,
			"locale_value" : requirement.locale_value,
			"required_to_preview" : _format_bool(requirement.required_to_preview),
		}
		self._insert("hit_type_qualification_requirement", requirement_row, on_conflict="replace")
def get_qualification_requirement_records_for_hit_type_id(self, hit_type_id):
	'''
	Return a list of QualificationRequirementRecord objects stored for hit_type_id.
	'''
	from crowdlib.QualificationRequirementRecord import QualificationRequirementRecord
	assert is_string(hit_type_id) and len(hit_type_id)>2
	sql = "select * from hit_type_qualification_requirement where hit_type_id=?;"
	rows = self.query(sql, hit_type_id)
	qualification_requirement_records = []
	for row in rows:
		# Plain assignments: a trailing comma here would wrap each value in a 1-tuple.
		integer_value = row[str("integer_value")]
		locale_value = row[str("locale_value")]
		# Expect at most one of the two comparison values to be set.
		assert (integer_value is None) or (locale_value is None)
		qreq = QualificationRequirementRecord(
			qualification_type_id = row[str("qualification_type_id")],
			comparator = row[str("comparator")],
			integer_value = integer_value,
			locale_value = locale_value,
			required_to_preview = _parse_bool(row[str("required_to_preview")])
		)
		qualification_requirement_records.append(qreq)
	return qualification_requirement_records
def _make_hit_type_record_from_hit_type_row(self, row):
	'''
	Build a HITTypeRecordDB from one hit_type row, including its Reward and the
	qualification requirements stored for that HIT type.
	'''
	from crowdlib.HITRecord import HITTypeRecordDB
	from crowdlib.Reward import Reward
	def column(name):
		return row[str(name)]
	reward = Reward(amount=column("reward_amount"), currency_code=column("reward_currency"), formatted_price=None)
	hit_type_id = column("hit_type_id")
	requirements = self.get_qualification_requirement_records_for_hit_type_id(hit_type_id=hit_type_id)
	return HITTypeRecordDB(
		assignment_duration=_parse_duration(column("assignment_duration")),
		auto_approval_delay=_parse_duration(column("auto_approval_delay")),
		description=column("description"),
		hit_type_id=hit_type_id,
		keywords=_parse_tuple(column("keywords")),
		qualification_requirements=requirements,
		reward=reward,
		title=column("title"),
	)
def _get_hit_type_records(self, hit_type_id=None): # GENERATOR
	'''Yield a HIT type record per hit_type row, or only the row(s) for hit_type_id.'''
	if hit_type_id is None:
		rows = self.query("select * from hit_type")
	else:
		rows = self.query("select * from hit_type where hit_type_id=?", hit_type_id)
	for row in rows:
		yield self._make_hit_type_record_from_hit_type_row(row=row)
def get_hit_type_records(self): # GENERATOR
	'''Return a generator over the records of every HIT type in the database.'''
	all_records = self._get_hit_type_records()
	return all_records
def set_notification_stopped(self):
	'''Forget the notification status by deleting every notification_status row.'''
	delete_sql = "delete from notification_status"
	self.query(delete_sql)
def set_notification_started(self, url):
	'''
	Reset the notification status and record url as the notification address.
	'''
	self.set_notification_stopped()
	# set_notification_stopped() deletes every notification_status row, so an
	# UPDATE here would match nothing. Insert a fresh row instead.
	self._insert("notification_status", {"url": url})
def set_notification_test_received(self):
	'''Set test_received_time on every notification_status row to now_iso_utc().'''
	received_at = now_iso_utc()
	self.query("update notification_status set test_received_time=?", received_at)
def set_notification_hit_type_received(self, hit_type_id):
	'''Set last_received_time to now_iso_utc() on the notification_hit_type row for hit_type_id.'''
	sql = "update notification_hit_type set last_received_time=? where hit_type_id=?"
	self.query(sql, now_iso_utc(), hit_type_id)
def set_notification_hit_type_registered(self, hit_type_id):
	'''
	Record (replacing any existing row) that hit_type_id was registered for
	notifications now, not yet connected and with nothing received.
	'''
	registration_row = dict(
		hit_type_id=hit_type_id,
		registered_time=now_iso_utc(),
		is_connected=_format_bool(False),
		last_received_time=None,
	)
	self._insert("notification_hit_type", registration_row, on_conflict="replace")
def get_known_hit_ids_except_hit_status(self, hit_status):
	'''Return a tuple (no particular order) of stored HIT IDs whose status is not hit_status.'''
	rows = self.query("select hit_id from hit where hit_status != ?", hit_status) # no particular order
	hit_ids = tuple([r[0] for r in rows])
	# Each HIT ID should appear only once.
	assert len(set(hit_ids)) == len(hit_ids)
	return hit_ids
def set_hit_statuses(self, hit_ids, hit_status):
	'''Set hit_status on every HIT in hit_ids; does nothing when hit_ids is empty.'''
	if len(hit_ids) == 0:
		return
	self.update_hits(hit_ids, hit_status=hit_status)
def set_notification_hit_type_unregistered(self, hit_type_id):
	'''Delete the notification_hit_type row for hit_type_id.'''
	delete_sql = "delete from notification_hit_type where hit_type_id=?"
	self.query(delete_sql, hit_type_id)
def set_notification_hit_type_is_connected(self, hit_type_id):
	'''Mark the notification_hit_type row for hit_type_id as connected.'''
	sql = "update notification_hit_type set is_connected=? where hit_type_id=?"
	connected = _format_bool(True)
	self.query(sql, connected, hit_type_id)
def get_notification_hit_type_registration(self, hit_type_id):
	'''
	Return a NotificationHitTypeRegistration describing the notification
	registration of hit_type_id, including the global test_received_time.

	@raise HITTypeNotFoundException: if hit_type_id is not registered.
	'''
	from crowdlib.all_exceptions import HITTypeNotFoundException
	rows = self.query("select registered_time, is_connected, last_received_time from notification_hit_type where hit_type_id=?",
			hit_type_id)
	row_count = len(rows)
	if row_count == 0:
		raise HITTypeNotFoundException(hit_type_id, "does not appear to be registered for notifications")
	elif row_count != 1:
		assert False, "found %d rows but expected either 0 or 1"%len(rows)
	else:
		first = rows[0]
		test_received_time = _parse_datetime(self._query_single_value("select test_received_time from notification_status"))
		return NotificationHitTypeRegistration(
			test_received_time=test_received_time,
			registered_time=_parse_datetime(first[0]),
			is_connected=_parse_bool(first[1]),
			last_received_time=_parse_datetime(first[2]),
		)
def get_notification_hit_type_ids_registered(self):
	'''Return a tuple of every hit_type_id in the notification_hit_type table.'''
	registered_rows = self.query("select hit_type_id from notification_hit_type")
	return tuple([r[0] for r in registered_rows])
def get_hit_type_record(self, hit_type_id):
	'''
	Return the single stored record for hit_type_id.
	@raise HITTypeNotFoundException: if no such HIT type is stored.
	'''
	from crowdlib.all_exceptions import HITTypeNotFoundException
	records = tuple(self._get_hit_type_records(hit_type_id))
	count = len(records)
	if count == 1:
		return records[0]
	if count == 0:
		raise HITTypeNotFoundException(hit_type_id)
	assert False, "Should not have multiple rows in DB for HIT type ID %s"%hit_type_id
def record_worker_message(self, worker_id, send_time, subject, message_text):
	'''Store a message sent to a worker as a row of the sent_mail table.'''
	message_row = dict(
		worker_id=worker_id,
		send_time=_format_datetime(send_time),
		subject=subject,
		message_text=message_text,
	)
	self._insert("sent_mail", message_row)
def record_worker_bonus(self, worker_id, assignment_id, amount, currency, payment_time, reason):
	'''Store a paid bonus as a row of the bonus table (reason may be None).'''
	bonus_row = dict(
		worker_id=worker_id,
		assignment_id=assignment_id,
		amount=amount,
		currency=currency,
		payment_time=_format_datetime(payment_time),
		reason=reason,
	)
	self._insert("bonus", bonus_row)
################################################################################
# Assignments
#
def put_assignment(self, assignment):
	'''
	Store assignment in the assignment table (insert ignored on conflict), then
	store each of its answers through put_answer().
	'''
	assignment_row = {
		"assignment_id" : assignment.id,
		"worker_id" : assignment.worker.id,
		"hit_id" : assignment.hit.id,
		"assignment_status" : assignment.assignment_status,
		"auto_approval_time" : _format_datetime(assignment.autopay_time),
		"accept_time" : _format_datetime(assignment.accept_time),
		"submit_time" : _format_datetime(assignment.submit_time),
		"approval_time" : _format_datetime(assignment.approval_time),
		"rejection_time" : _format_datetime(assignment.rejection_time),
		"requester_feedback" : assignment.requester_feedback
	}
	self._insert("assignment", assignment_row, on_conflict="ignore")
	for each_answer in assignment.answers:
		self.put_answer( each_answer, assignment.id )
def count_assignments_for_hit_id(self, hit_id):
	'''Return the number of assignments stored for hit_id.'''
	sql = "select count(*) from assignment where hit_id=?;"
	rows = self.query(sql, hit_id)
	# query() already returns fetched rows (a list); count(*) yields one row, one column.
	return rows[0][0]
def update_assignment_approved(self, assignment_id, approval_time):
	'''Mark assignment_id as Approved at approval_time.'''
	approved_at = _format_datetime(approval_time)
	self.query("update assignment set approval_time=?, assignment_status='Approved' where assignment_id=?;", approved_at, assignment_id)
def update_assignment_rejected(self, assignment_id, rejection_time):
	'''Mark assignment_id as Rejected at rejection_time.'''
	rejected_at = _format_datetime(rejection_time)
	self.query("update assignment set rejection_time=?, assignment_status='Rejected' where assignment_id=?;", rejected_at, assignment_id)
def update_assignment(self, assignment_id, assignment_status=None, submit_time=None, approval_time=None, rejection_time=None, requester_feedback=None):
	'''
	Update the columns of one assignment row whose (formatted) new value is
	not None. At least one such column is required.
	'''
	candidates = [
		("assignment_status", assignment_status),
		("submit_time", _format_datetime(submit_time)),
		("approval_time", _format_datetime(approval_time)),
		("rejection_time", _format_datetime(rejection_time)),
		("requester_feedback", requester_feedback),
	]
	columns = []
	values = []
	for column_name, new_value in candidates:
		if new_value is not None:
			columns.append(column_name)
			values.append(new_value)
	assert len(columns) > 0
	sql = "update assignment set " + ", ".join([c + "=?" for c in columns]) + " where assignment_id=?;"
	values.append(assignment_id)
	self.query(sql, *values) # [pylint] tolerate *params magic : pylint:disable=W0142
def update_hits(self, hit_ids, hit_status=_UNDEFINED, creation_time=_UNDEFINED, expiration_time=_UNDEFINED, num_pending=_UNDEFINED,
		num_available=_UNDEFINED, num_completed=_UNDEFINED, max_assignments=_UNDEFINED, approximate_expiration_time=_UNDEFINED):
	'''
	Update columns of the hit rows whose IDs are in hit_ids.

	For every column except approximate_expiration_time, passing None or
	omitting the argument leaves the column unchanged. For
	approximate_expiration_time, omitting it leaves the column unchanged,
	while passing None sets it to NULL. At least one column must be updated.
	hit_ids is validated by _make_where_clause_for_alphanumeric_ids().
	'''
	# (column, new value, whether the value is a datetime needing formatting)
	candidates = (
		("hit_status", hit_status, False),
		("creation_time", creation_time, True),
		("expiration_time", expiration_time, True),
		("num_pending", num_pending, False),
		("num_available", num_available, False),
		("num_completed", num_completed, False),
		("max_assignments", max_assignments, False),
	)
	set_parts = []
	query_params = []
	for column, value, is_datetime in candidates:
		if value is None or value is _UNDEFINED:
			continue
		set_parts.append(column + "=?")
		# Format each datetime exactly once (expiration_time used to be formatted twice).
		query_params.append(_format_datetime(value) if is_datetime else value)
	if approximate_expiration_time is not _UNDEFINED:
		if approximate_expiration_time is None:
			set_parts.append("approximate_expiration_time=NULL")
		else:
			set_parts.append("approximate_expiration_time=?")
			query_params.append(_format_datetime(approximate_expiration_time))
	assert len(set_parts) >= 1
	# set_parts has one more entry than query_params when approximate_expiration_time=NULL (no placeholder).
	assert len(set_parts) in (len(query_params), len(query_params)+1)
	sql = "update hit set " + ", ".join(set_parts)
	sql += " where " + self._make_where_clause_for_alphanumeric_ids(ids=hit_ids, column_name="hit_id")
	self.query(sql, *query_params) # [pylint] Tolerate *params magic : pylint:disable=W0142
def update_hit(self, hit_id, *args, **kwargs):
	'''Single-HIT convenience wrapper: forwards to update_hits() with hit_ids=(hit_id,).'''
	single_id = (hit_id,)
	self.update_hits(hit_ids=single_id, *args, **kwargs)
def set_hit_approximate_expiration_time(self, hit_id, approximate_expiration_time):
	'''Store approximate_expiration_time (None stores NULL) for hit_id.'''
	self.update_hit(
		hit_id=hit_id,
		approximate_expiration_time=approximate_expiration_time,
	)
def update_worker_blocked(self, worker_id, reason):
	'''Record worker_id as blocked, replacing any earlier row. reason may be None.'''
	block_row = dict(worker_id=worker_id, reason=reason)
	self._insert("worker_block", block_row, on_conflict="replace")
def update_worker_unblocked(self, worker_id):
	'''Delete the worker_block row for worker_id.'''
	self.query("delete from worker_block where worker_id=?;", worker_id)
def is_worker_id_blocked(self, worker_id):
	'''Return True if the worker_block table has a row for worker_id.'''
	matching = self._count("worker_block", "worker_id=?", worker_id)
	assert matching in (0,1), "Shouldn't have >1 row for a given worker!"
	return matching == 1
def get_worker_block_reason(self, worker_id):
	'''
	Return the reason stored when worker_id was blocked. This is None if the
	block was recorded without a reason (update_worker_blocked allows that).

	@raise WorkerNotFoundException: if worker_id has no worker_block row.
	'''
	rows = self.query("select reason from worker_block where worker_id=?;", worker_id)
	# Check the row count rather than the reason's value, so a blocked worker
	# with a NULL reason is not reported as missing.
	if len(rows) == 0:
		from crowdlib.all_exceptions import WorkerNotFoundException
		raise WorkerNotFoundException(worker_id)
	assert len(rows) == 1, "Shouldn't have >1 row for a given worker!"
	return rows[0][0]
def get_hit_id_for_assignment_id(self,assignment_id):
	'''
	Return the HIT ID stored for assignment_id.
	@raise AssignmentNotFoundException: if no HIT ID is found for it.
	'''
	from crowdlib.all_exceptions import AssignmentNotFoundException
	sql = "select hit_id from assignment where assignment_id=?"
	found_hit_id = self._query_single_value(sql, assignment_id)
	if found_hit_id is not None:
		return found_hit_id
	raise AssignmentNotFoundException(assignment_id)
def get_hit_type_id_for_hit_id(self, hit_id):
	'''
	Return the HIT type ID stored for hit_id.
	@raise HITNotFoundException: if no HIT type ID is found for it.
	'''
	from crowdlib.all_exceptions import HITNotFoundException
	sql = "select hit_type_id from hit where hit_id=?;"
	found_type_id = self._query_single_value(sql, hit_id)
	if found_type_id is not None:
		return found_type_id
	raise HITNotFoundException(hit_id)
def get_assignment_record(self, assignment_id):
	'''
	Return the single AssignmentRecord for assignment_id.
	@raise AssignmentNotFoundException: unless exactly one record is found.
	'''
	from crowdlib.all_exceptions import AssignmentNotFoundException
	found = tuple(self.get_assignment_records(assignment_id=assignment_id))
	if len(found) != 1:
		raise AssignmentNotFoundException(assignment_id=assignment_id)
	return found[0]
def get_assignment_records(self, hit_id=None, assignment_id=None): # GENERATOR
	'''
	Yield an AssignmentRecord for each assignment found by joining the
	assignment and answer tables. Each record carries the AnswerRecords built
	from its answer rows.

	@param hit_id: if given, only assignments of this HIT (asserted to be unicode).
	@param assignment_id: if given, only this assignment (asserted to be unicode).
		At most one of hit_id and assignment_id may be given.
	'''
	# If assignment_id is specified, we will filter and retrieve only that assignment.
	# NOTE(review): because of the inner join below, an assignment with no rows in
	# the answer table is never yielded -- confirm that is intended.
	from crowdlib.AssignmentRecord import AssignmentRecord
	from crowdlib.AnswerRecord import AnswerRecord
	assert (hit_id is None) or (assignment_id is None), "Do not pass both assignment_id and hit_id - one or the other, please."
	assert (hit_id is None) or is_unicode(hit_id), hit_id
	assert (assignment_id is None) or is_unicode(assignment_id)
	sql = """\
select
assignment.assignment_id as assignment_id,
assignment.worker_id as worker_id,
assignment.hit_id as hit_id,
assignment.assignment_status as assignment_status,
assignment.auto_approval_time as auto_approval_time,
assignment.accept_time as accept_time,
assignment.submit_time as submit_time,
assignment.approval_time as approval_time,
assignment.rejection_time as rejection_time,
assignment.requester_feedback as requester_feedback,
answer.task_id as task_id,
answer.answer_type as answer_type,
answer.free_text as free_text,
answer.selection_id as selection_id,
answer.other_selection_text as other_selection_text,
answer.uploaded_file_key as uploaded_file_key,
answer.uploaded_file_size as uploaded_file_size
from
assignment
inner join answer using (assignment_id)"""
	if hit_id is not None:
		sql += "\n\t\t\twhere\n\t\t\t\tassignment.hit_id=?"
		params = (hit_id,)
	elif assignment_id is not None:
		sql += "\n\t\t\twhere\n\t\t\t\tassignment.assignment_id=?"
		params = (assignment_id,)
	else:
		params = ()
	# Ordering by assignment_id keeps all answer rows of one assignment adjacent,
	# which the grouping loop below relies on.
	sql += "\n\t\t\torder by\n\t\t\t\tassignment.assignment_id asc"
	last_assignment_id = None
	rows = self.query(sql, *params) # [pylint] tolerate *params magic : pylint:disable=W0142
	assignment_record = None
	for row in rows:
		# Rebinds the assignment_id parameter; it is not needed once params is built.
		assignment_id = row[str("assignment_id")]
		if assignment_id != last_assignment_id:
			# First row of a new assignment: emit the finished previous record, then start a new one.
			if last_assignment_id is not None:
				yield assignment_record # Last assignment will be yielded from below.
			answer_records = []
			if hit_id is not None:
				assert row[str("hit_id")]==hit_id, "%s != %s"%(repr(row[str("hit_id")]), repr(hit_id))
			assignment_record = AssignmentRecord(
				assignment_id = row[str("assignment_id")],
				worker_id = row[str("worker_id")],
				hit_id = row[str("hit_id")],
				assignment_status = row[str("assignment_status")],
				auto_approval_time = _parse_datetime(row[str("auto_approval_time")]),
				accept_time = _parse_datetime(row[str("accept_time")]),
				submit_time = _parse_datetime(row[str("submit_time")]),
				approval_time = _parse_datetime(row[str("approval_time")]),
				rejection_time = _parse_datetime(row[str("rejection_time")]),
				requester_feedback = row[str("requester_feedback")],
				answer_records = answer_records,
			)
			last_assignment_id = assignment_id
		# NOTE(review): presumably AssignmentRecord keeps a reference to answer_records,
		# so answers appended below end up in assignment_record -- confirm.
		answer_record = AnswerRecord(
			# assignment_id=assignment_id,
			question_identifier=row[str("task_id")],
			free_text=row[str("free_text")],
			selection_identifier=row[str("selection_id")],
			other_selection=row[str("other_selection_text")],
			uploaded_file_key=row[str("uploaded_file_key")],
			uploaded_file_size_in_bytes=row[str("uploaded_file_size")])
		answer_records.append(answer_record)
	if last_assignment_id is not None:
		assert isinstance(assignment_record, AssignmentRecord)
		assert all(isinstance(asr,AnswerRecord) for asr in assignment_record.answer_records)
		yield assignment_record # Previous assignments were yielded from above.
def put_answer(self, answer, assignment_id):
    '''
    Insert (or replace) one row of the "answer" table for this answer.

    Columns that only apply to a particular Answer subclass (free text,
    selection, uploaded file) are filled from the answer when it is an
    instance of that subclass and stored as None otherwise.

    @param answer: an Answer object (free-text, selection or uploaded-file).
    @param assignment_id: id of the assignment the answer belongs to.
    '''
    # DO NOT CALL answer.assignment HERE. IT WILL CREATE INFINITE RECURSION.
    from crowdlib.Answer import AnswerFreeText, AnswerSelection, AnswerUploadedFile
    log_label = answer.question_id + "-" + assignment_id
    is_free_text = isinstance(answer, AnswerFreeText)
    is_selection = isinstance(answer, AnswerSelection)
    is_upload = isinstance(answer, AnswerUploadedFile)
    typed_columns = {
        "free_text" : (answer.free_text if is_free_text else None),
        "selection_id" : (answer.selection_id if is_selection else None),
        "other_selection_text" : (answer.other_selection_text if is_selection else None),
        "uploaded_file_key" : (answer.uploaded_file_key if is_upload else None),
        "uploaded_file_size" : (answer.uploaded_file_size if is_upload else None),
    }
    row = {
        "answer_id" : answer.id,
        "assignment_id" : assignment_id,
        "task_id" : answer.question_id,
        "answer_type" : answer.answer_type,
    }
    row.update(typed_columns)
    self._insert("answer", row, on_conflict="replace")
    self._log("put_answer: %s / %s" % (log_label, assignment_id))
def _make_where_clause(self,conditions):
    '''
    Build an SQL WHERE clause that ANDs the given conditions together.

    Each condition is wrapped in parentheses so a condition containing OR
    cannot change the meaning of the conjunction.  The clause starts with a
    space so it can be appended directly to an SQL string whether or not that
    string ends in whitespace (previously "from bonus" + clause only worked
    because the literal happened to end in a newline).

    @param conditions: iterable of SQL condition strings (may be empty).
    @return: "" when there are no conditions, else " WHERE (c1) AND (c2) ...".
    '''
    conditions = list(conditions)  # also accept generators / other iterables
    if not conditions:
        return ""
    return " WHERE " + " AND ".join("(%s)" % cond for cond in conditions)
def get_bonuses(self, assignment_id, worker_id): # GENERATOR
    '''
    Yield a BonusRecord for every row of the bonus table matching the filters.

    @param assignment_id: only bonuses for this assignment, or None for any.
    @param worker_id: only bonuses for this worker, or None for any.
    '''
    from crowdlib.BonusRecord import BonusRecord
    sql = ("select assignment_id, worker_id, amount, currency, payment_time, reason\n"
           "from bonus\n")
    filters = (("assignment_id", assignment_id), ("worker_id", worker_id))
    active = [(column, value) for (column, value) in filters if value is not None]
    sql += self._make_where_clause(["%s=?" % column for (column, _) in active])
    params = [value for (_, value) in active]
    for row in self.query(sql, *params): # [pylint] tolerate *params magic : pylint:disable=W0142
        yield BonusRecord(
            assignment_id=row[str("assignment_id")],
            worker_id=row[str("worker_id")],
            amount=row[str("amount")],
            currency=row[str("currency")],
            payment_time=_parse_datetime(row[str("payment_time")]),
            reason=row[str("reason")],
        )
# Immutable four-field record; judging by its name it describes the
# notification-registration state of a HIT type (usage not visible here).
NotificationHitTypeRegistration = namedtuple(
    "NotificationHitTypeRegistration",
    "test_received_time registered_time is_connected last_received_time",
)
def _format_datetime(dt):
    '''Serialize a datetime to ISO/UTC text for storage; None passes through unchanged.'''
    return None if dt is None else format_datetime_to_iso_utc(dt)
def _parse_datetime(s):
    '''Parse stored ISO/UTC text into a local datetime; any falsy value (None, "") gives None.'''
    if not s:
        return None
    return parse_iso_utc_to_datetime_local(s)
def _format_bool(b):
    '''
    Encode a tri-state boolean as an SQLite integer.

    None -> None, True (or anything == True) -> 1, False (or == False) -> 0.

    @raise ValueError: for any other value.  The value is wrapped in a 1-tuple
        for %-formatting so tuple inputs give this ValueError rather than a
        TypeError from the format operator.
    '''
    if b is None:
        return None
    elif b == True:
        return 1
    elif b == False:
        return 0
    raise ValueError("Invalid value for bool: %r" % (b,))
def _parse_bool(s):
    '''
    Decode an SQLite integer back into a tri-state boolean.

    None -> None, 1 -> True, 0 -> False.

    @raise ValueError: for any other value.  The value is wrapped in a 1-tuple
        for %-formatting so tuple inputs give this ValueError rather than a
        TypeError from the format operator.
    '''
    if s is None:
        return None
    elif s == 1:
        return True
    elif s == 0:
        return False
    raise ValueError("Invalid value for bool: %r" % (s,))
def _format_tuple(t):
    '''Store a tuple as its repr() text (read back by _parse_tuple via literal_eval); None passes through.'''
    if t is not None:
        return repr(t)
    return t
def _parse_tuple(s):
    '''Rebuild a value from its stored repr text with ast.literal_eval; None passes through.'''
    return s if s is None else literal_eval(s)
def _format_duration(d):
    '''
    Store a duration as a whole number of seconds, rounded to nearest.

    d is passed to total_seconds(), so presumably a datetime.timedelta;
    None passes through.
    '''
    if d is not None:
        return int(round(total_seconds(d), 0))
    return None
def _parse_duration(d):
    '''Turn a stored number of seconds back into a datetime.timedelta; None passes through.'''
    if d is not None:
        return datetime.timedelta(seconds=d)
    return None
#vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# EXPERIMENTAL CODE (do not delete, yet)
#
# def is_notification_started_and_tested(self):
# rows = self.query("select test_received_time from notification_status")
# assert len(rows) in (0, 1)
# is_started = False
# is_tested = False
# if len(rows) == 1:
# is_started = True
# is_tested = (rows[0][0] is not None)
# elif len(rows) == 0:
# is_started = False
# is_tested = False
# else:
# assert False
# return (is_started, is_tested)
# def get_hit_type_notification_url(self, hit_type_id):
# from crowdlib.all_exceptions import NotificationsAddressNotFoundException
# sql = "select url from hit_type_notification_address where hit_type_id=?"
# url = self._query_single_value(sql, hit_type_id)
# if url is None:
# raise NotificationsAddressNotFoundException(hit_type_id)
# else:
# return url
# def put_hit_type_notification_url(self, hit_type_id, url):
# if url is None:
# self.query("delete from hit_type_notification_address where hit_type_id=?", hit_type_id)
# else:
# self._insert("hit_type_notification_address", {
# "hit_type_id" : hit_type_id,
# "url" : url},
# on_conflict="abort")
# return url
#vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# SHRAPNEL (delete any time if you don't think it will be needed)
#
# def update_hit(self, hit_id, hit_status=None, creation_time=None, expiration_time=None, num_pending=None,
# num_available=None, num_completed=None, max_assignments=None, approximate_expiration_time=None):
# sql = ("update hit set ")
# params = []
# potential_params = (
# ("hit_status",hit_status),
# ("creation_time",_format_datetime(creation_time)),
# ("expiration_time",_format_datetime(expiration_time)),
# ("num_pending",num_pending),
# ("num_available",num_available),
# ("num_completed",num_completed),
# ("max_assignments",max_assignments),
# ("approximate_expiration_time",_format_datetime(approximate_expiration_time)),
# )
# usable_params = tuple((pnm,pval) for (pnm,pval) in potential_params if pval is not None)
# assert len(usable_params) > 0
# sql += ", ".join("%s=?"%pnm for (pnm,pval) in usable_params)
# sql += "where hit_id=?;"
# params = tuple(pval for (pnm,pval) in usable_params) + (hit_id,)
# self.query(sql, *params) # [pylint] tolerate *params magic : pylint:disable=W0142
## from update_hits() ... old implementation
# potential_params = (
# ("hit_status",hit_status),
# ("creation_time",_format_datetime(creation_time) if creation_time is not _UNDEFINED else None),
# ("expiration_time",_format_datetime(expiration_time)),
# ("num_pending",num_pending),
# ("num_available",num_available),
# ("num_completed",num_completed),
# ("max_assignments",max_assignments),
# ("approximate_expiration_time",_format_datetime(approximate_expiration_time)),
# )
# usable_params = tuple((pnm,pval) for (pnm,pval) in potential_params if pval is not _UNDEFINED)
# assert not any(v is None for (k,v) in usable_params if k != "approximate_expiration_time")
#
# assert len(usable_params) > 0
# set_parts = []
# query_params = []
# for pnm,pval in potential_params:
# if pval is not _UNDEFINED:
# if pval is None:
# assert pnm == "approximate_expiration_time"
# set_parts.append("%s=NULL")
# else:
# set_parts.append( "%s=?" )
# query_params.append(pval)
# def _commit_all_connections(self):
# conns = self._get_all_connections_in_pool()
# for conn in conns:
# conn.commit()
# self._log( "Did DB commit on all %d connection(s) in connection pool"%len(conns) )
PK ՍHgYJ J . crowdlib-0.8.50/crowdlib/AMTInstanceManager.py# vim: set fileencoding=utf-8 noexpandtab:
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: December 2013
'''
from __future__ import division, with_statement
import threading
from crowdlib.utility import to_unicode
from crowdlib.AMT import AMT
# Module-level switch: when True, get_amt() also keys its cache on the calling
# thread's ident, so each thread gets its own AMT instance.
_ONE_INSTANCE_PER_THREAD = False
# Hands out AMT objects, reusing one instance per distinct set of settings
# (and, optionally, per thread).
class AMTInstanceManager(object):
def __init__(self):
# Cache of AMT instances, keyed by the to_unicode() rendering built in get_amt().
self._instances_dict = {}
def get_amt(self, settings):
# Return same instance for any unique set of settings plus, optionally, the current thread ID.
# The key uses only the public (non-underscore) attributes of `settings`,
# sorted so attribute order does not matter, then passed through
# to_unicode() -- presumably to turn the unhashable list into a hashable
# dict key (TODO confirm).
# NOTE(review): nothing here guards _instances_dict with a lock; two threads
# that miss on the same key concurrently could each construct an AMT.
instance_key = [(k,v) for (k,v) in vars(settings).items() if not k.startswith("_")]
instance_key.sort()
if _ONE_INSTANCE_PER_THREAD:
thread_ident = threading.current_thread().ident
instance_key.insert(0, thread_ident)
instance_key = to_unicode(instance_key)
try:
amt = self._instances_dict[instance_key]
except KeyError:
# First request for these settings: build and remember a new instance.
amt = AMT(settings)
self._instances_dict[instance_key] = amt
# NOTE(review): the next line is fused with zip-archive residue from the
# following file in this dump; it is left byte-identical.
return amtPK ՍH * crowdlib-0.8.50/crowdlib/AMTMemoryCache.py# vim: set fileencoding=utf-8 noexpandtab:
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: January 2011
'''
from __future__ import division, with_statement
_USE_WEAKREF_FOR_ASSIGNMENTS = True # reenabled on 12/3/2013
class AMTMemoryCache(object):
    '''
    In-memory registry of the Worker, HITType, HIT and Assignment objects
    already created in this process, each keyed by its .id.

    Workers, HIT types and HITs are held with ordinary (strong) references.
    Assignments are held in a weakref.WeakValueDictionary when
    _USE_WEAKREF_FOR_ASSIGNMENTS is True, so an Assignment drops out of the
    cache once nothing else references it.

    put_* raises CrowdLibInternalError if the id is already cached; get_*
    raises the matching *NotFoundException if it is not.
    '''
    def __init__(self):
        import weakref
        self._hit_dict = {}
        self._hit_type_dict = {}
        self._worker_dict = {}
        if _USE_WEAKREF_FOR_ASSIGNMENTS:
            self._assignment_dict = weakref.WeakValueDictionary()
        else:
            self._assignment_dict = {} # disabled weakref 8-20-2011 for debugging; reenabled on 12/3/2013

    #############################################
    # WORKER

    def has_worker(self, worker_id):
        return (worker_id in self._worker_dict)

    def put_worker(self, worker):
        from crowdlib.all_exceptions import CrowdLibInternalError
        if self.has_worker(worker.id):
            raise CrowdLibInternalError("Worker %s already exists in memory cache."%worker.id)
        self._worker_dict[worker.id] = worker

    def get_worker(self, worker_id):
        from crowdlib.all_exceptions import WorkerNotFoundException
        try:
            return self._worker_dict[worker_id]
        except KeyError:
            raise WorkerNotFoundException(worker_id, "Worker %s not found in memory cache"%worker_id)

    def get_worker_ids(self):
        return sorted(self._worker_dict.keys())

    #############################################
    # HIT_TYPE

    def has_hit_type(self, hit_type_id):
        return (hit_type_id in self._hit_type_dict)

    def put_hit_type(self, hit_type):
        from crowdlib.all_exceptions import CrowdLibInternalError
        if self.has_hit_type(hit_type.id):
            raise CrowdLibInternalError("HITType %s already exists in memory cache."%(hit_type.id))
        self._hit_type_dict[hit_type.id] = hit_type

    def get_hit_type(self, hit_type_id):
        from crowdlib.all_exceptions import HITTypeNotFoundException
        try:
            return self._hit_type_dict[hit_type_id]
        except KeyError:
            raise HITTypeNotFoundException(hit_type_id, "HITType %s not found in memory cache."%hit_type_id)

    def get_hit_type_ids(self):
        return sorted(self._hit_type_dict.keys())

    #############################################
    # HIT

    def has_hit(self, hit_id):
        return (hit_id in self._hit_dict)

    def put_hit(self, hit):
        from crowdlib.all_exceptions import CrowdLibInternalError
        if self.has_hit(hit.id):
            raise CrowdLibInternalError("HIT %s already exists in memory cache."%(hit.id))
        self._hit_dict[hit.id] = hit

    def get_hit(self, hit_id):
        from crowdlib.all_exceptions import HITNotFoundException
        try:
            return self._hit_dict[hit_id]
        except KeyError:
            raise HITNotFoundException(hit_id, "HIT %s not found in memory cache"%hit_id)

    def get_hit_ids(self):
        return sorted(self._hit_dict.keys())

    #############################################
    # ASSIGNMENT
    # has_assignment is omitted intentionally to avert the possibility of a bug where you do...
    #     if self._memory_cache.has_assignment(asg_id):
    #         asg = self._memory_cache.get_assignment(asg_id)
    # ... and find that it has been garbage collected between the two statements.  This
    # is only an issue because this uses a weakref structure.

    def put_assignment(self, assignment):
        from crowdlib.all_exceptions import CrowdLibInternalError
        if assignment.id in self._assignment_dict:
            raise CrowdLibInternalError("Assignment %s already exists in memory cache."%(assignment.id))
        self._assignment_dict[assignment.id] = assignment

    def get_assignment(self, assignment_id):
        from crowdlib.all_exceptions import AssignmentNotFoundException
        try:
            return self._assignment_dict[assignment_id]
        except KeyError:
            raise AssignmentNotFoundException(assignment_id, "Assignment %s not found in memory cache"%assignment_id)

    def get_assignments_for_hit_id(self, hit_id):
        '''
        Return a tuple of the cached Assignments whose .hit.id equals hit_id.

        Uses WeakValueDictionary.valuerefs(), which exists on both Python 2
        and 3 (itervaluerefs() is Python-2-only and raised AttributeError on
        Python 3).  The references are dereferenced into a list right away,
        so the surviving Assignments stay alive while being filtered; dead
        references give None and are skipped.
        '''
        if _USE_WEAKREF_FOR_ASSIGNMENTS:
            candidates = [ref() for ref in self._assignment_dict.valuerefs()]
        else:
            candidates = list(self._assignment_dict.values())
        return tuple(asg for asg in candidates
                     if (asg is not None) and (asg.hit.id == hit_id))
# Runs only when this file is executed directly; it just prints a notice to stderr.
if __name__=="__main__":
import sys
# NOTE(review): this line is fused with zip-archive residue from the next
# file in this dump; it is left byte-identical.
sys.stderr.write( "\nThis is just a module, not the code entry point.\n" )PK J
g S S % crowdlib-0.8.50/crowdlib/AMTServer.py# vim: set fileencoding=utf-8 noexpandtab:
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: November 2010
'''
import datetime, sys
from crowdlib.all_exceptions import AMTRequestFailed, AssignmentAlreadyFinalizedException, XMLProcessingException
from crowdlib.AMTServerConnection import AMTServerConnection
from crowdlib.AnswerRecord import AnswerRecord
from crowdlib.AssignmentRecord import AssignmentRecord
from crowdlib.HITRecord import HITRecord
from crowdlib.QualificationRequirement import QualificationRequirement
from crowdlib.QualificationRequirementRecord import QualificationRequirementRecord
from crowdlib.QualificationType import QualificationType
from crowdlib.Reward import Reward
from crowdlib.utility import bool_in_element, datetime_in_element, duration_in_element, is_number, is_sequence_of, is_sequence_of_strings, is_string, number_in_element, parse_iso_utc_to_datetime_local, text_in_element, text_node_content, to_boolean, to_duration, to_tuple_if_non_sequence_iterable, to_unicode, total_seconds, xml2dom, xml_in_element
class AMTServer(object):
# Which Mechanical Turk endpoint a server instance talks to.
SERVICE_TYPE_SANDBOX = "sandbox"
SERVICE_TYPE_PRODUCTION = "production"
VALID_SERVICE_TYPES = (SERVICE_TYPE_SANDBOX, SERVICE_TYPE_PRODUCTION)
# Schema version string (not referenced in the visible part of this class).
_WSDL_SCHEMA_VERSION_NOTIFICATIONS = "2006-05-05"
# Preview-URL prefix per service type; presumably a HIT type (group) id is
# appended to it by callers of preview_hit_type_url_stem.
_PREVIEW_HIT_TYPE_URL_STEMS = dict((
    (SERVICE_TYPE_PRODUCTION, "http://mturk.com/mturk/preview?groupId="),
    (SERVICE_TYPE_SANDBOX, "http://workersandbox.mturk.com/mturk/preview?groupId="),
))
def __init__( self, aws_account_id, aws_account_key, service_type):
    '''
    @param aws_account_id: AWS access key id, handed to AMTServerConnection.
    @param aws_account_key: AWS secret key, handed to AMTServerConnection.
    @param service_type: one of VALID_SERVICE_TYPES ("sandbox" or "production").
    '''
    assert service_type in self.VALID_SERVICE_TYPES
    connection = AMTServerConnection(aws_account_id, aws_account_key, service_type)
    self._server = connection
    self._service_type = service_type
@property
def preview_hit_type_url_stem(self):
    '''Preview-URL prefix for the service type this server was created with.'''
    stems = self._PREVIEW_HIT_TYPE_URL_STEMS
    return stems[self._service_type]
def do_request( self, operation, specific_parameters):
    '''Send one raw AMT operation through the underlying connection and return its result.'''
    server = self._server
    return server.do_request(operation, specific_parameters)
def grant_bonus(self, assignment_id, worker_id, amount, currency, reason):
    '''
    Pay a bonus for an assignment (AMT GrantBonus operation).

    @param reason: optional text; when None no Reason parameter is sent.
    '''
    params = dict(AssignmentId=assignment_id, WorkerId=worker_id)
    params["BonusAmount.1.Amount"] = amount
    params["BonusAmount.1.CurrencyCode"] = currency
    if reason is not None:
        params["Reason"] = reason
    self._server.do_request( "GrantBonus", params)
def create_hit(self, hit_type_id, question_xml, lifetime_in_seconds, max_assignments, requester_annotation, unique_request_token):
    '''
    Create a HIT of an existing HIT type (AMT CreateHIT) and return the
    result of _extract_hit_node for the single <HIT> in the response.

    @param unique_request_token: optional; None or "" means no token.
        Otherwise it must be a string of at most 64 characters.
    @raise ValueError: if the token is not a string or is too long.
    '''
    params = dict(
        HITTypeId=hit_type_id,
        Question=question_xml,
        LifetimeInSeconds=lifetime_in_seconds,
        MaxAssignments=max_assignments,
        RequesterAnnotation=requester_annotation,
    )
    response_groups = ("HITDetail", "HITQuestion", "Minimal", "HITAssignmentSummary")
    for index, group in enumerate(response_groups):
        params["ResponseGroup.%d" % index] = group
    has_token = unique_request_token is not None and unique_request_token != ""
    if has_token:
        if not is_string(unique_request_token):
            raise ValueError("unique_request_token must be a string. Got a %r."%type(unique_request_token))
        if len(unique_request_token) > 64:
            raise ValueError("unique_request_token should be <=64 characters. Got %d characters."%len(unique_request_token))
        params["UniqueRequestToken"] = unique_request_token
    dom = self._server.do_request( "CreateHIT", params)
    hit_nodes = dom.getElementsByTagName("HIT")
    assert len(hit_nodes)==1
    return self._extract_hit_node(hit_node=hit_nodes[0])
def register_hit_type(self, title, description, reward, currency, time_limit, keywords, autopay_delay,
        qualification_requirements):
    '''
    Register a HIT type with AMT (RegisterHITType) and return the HIT type id
    text from the response.

    time_limit and autopay_delay must be datetime.timedelta objects; they are
    sent as seconds.  Each QualificationRequirement becomes a numbered group
    of QualificationRequirement.N.* parameters (N starting at 1).
    '''
    keywords = to_tuple_if_non_sequence_iterable(keywords)
    qualification_requirements = to_tuple_if_non_sequence_iterable(qualification_requirements)
    assert is_string(title)
    assert is_string(description)
    assert is_number(reward)
    assert is_string(currency)
    assert isinstance(time_limit, datetime.timedelta)
    assert is_sequence_of_strings(keywords)
    assert isinstance(autopay_delay, datetime.timedelta)
    assert is_sequence_of(qualification_requirements, QualificationRequirement)
    params = {
        "Title": title,
        "Description" : description,
        "Reward.1.Amount" : reward,
        "Reward.1.CurrencyCode" : currency,
        "AssignmentDurationInSeconds" : total_seconds(time_limit),
        "Keywords" : ",".join(keywords),
        "AutoApprovalDelayInSeconds" : total_seconds(autopay_delay),
    }
    for position, qreq in enumerate(qualification_requirements, 1):
        prefix = "QualificationRequirement.%d." % position
        params[prefix + "QualificationTypeId"] = qreq.qualification_type_id
        params[prefix + "Comparator"] = qreq.comparator
        if qreq.integer_value is not None:
            params[prefix + "IntegerValue"] = str(qreq.integer_value)
        if qreq.locale_value is not None:
            params[prefix + "LocaleValue.Country"] = qreq.locale_value
    dom = self._server.do_request( "RegisterHITType", params)
    hit_type_id_node = dom.getElementsByTagName("HITTypeId")[0]
    return hit_type_id_node.childNodes[0].data
def search_hits(self):
    '''
    Return a list with one _extract_hit_node result per HIT reported by
    AMT SearchHITs, requesting 100 per page until TotalNumResults is covered.

    @raise XMLProcessingException: if the server reports a page number other
        than the one requested.
    '''
    page_size = 100
    request_params = {
        "PageSize":page_size,
        "SortProperty":"Enumeration",
        "PageNumber":0,
        "ResponseGroup.0":"HITDetail",
        "ResponseGroup.1":"HITQuestion",
        "ResponseGroup.2":"Minimal",
        "ResponseGroup.3":"HITAssignmentSummary"
    }
    results = []
    page_num = 0
    more_pages = True
    while more_pages:
        page_num += 1
        request_params["PageNumber"] = page_num
        dom = self._server.do_request("SearchHITs", request_params)
        total_num_results = int(text_in_element(dom,"TotalNumResults"))
        if int(text_in_element(dom,"PageNumber")) != page_num:
            # This should never happen unless server breaks our expectations.
            raise XMLProcessingException("Reported page number doesn't match expected")
        results.extend(self._extract_hit_node(hit_node=hit_node)
                       for hit_node in dom.getElementsByTagName("HIT"))
        more_pages = total_num_results > page_num*page_size
    return results
def get_hit(self, hit_id):
    '''Fetch one HIT (AMT GetHIT) and return _extract_hit_node of its single <HIT> element.'''
    params = {"HITId": hit_id}
    groups = ("HITDetail", "HITQuestion", "Minimal", "HITAssignmentSummary")
    for index, group in enumerate(groups):
        params["ResponseGroup.%d" % index] = group
    dom = self._server.do_request( "GetHIT", params)
    hit_nodes = dom.getElementsByTagName("HIT")
    assert len(hit_nodes)==1
    return self._extract_hit_node(hit_node=hit_nodes[0])
def get_reviewable_hit_ids(self, hit_type_id=None): # GENERATOR
    '''
    Yield (as str) the ids of reviewable HITs from AMT GetReviewableHITs,
    optionally restricted to one HIT type, requesting 100 per page until
    TotalNumResults is covered.
    '''
    page_size = 100
    request_params = {"PageSize":page_size, "SortProperty":"Enumeration", "PageNumber":0}
    if hit_type_id is not None:
        request_params["HITTypeId"] = hit_type_id

    def first_text(dom, tag):
        # Text of the first <tag> element anywhere in the response.
        return dom.getElementsByTagName(tag)[0].childNodes[0].data

    page_num = 0
    more_pages = True
    while more_pages:
        page_num += 1
        assert page_num < 100000 # safe upper bound
        request_params["PageNumber"] = page_num
        dom = self.do_request("GetReviewableHITs", request_params)
        total_num_results = int(first_text(dom, "TotalNumResults"))
        reported_page = int(first_text(dom, "PageNumber"))
        assert reported_page==page_num
        for hit_id_node in dom.getElementsByTagName("HITId"):
            yield str(hit_id_node.childNodes[0].data)
        more_pages = total_num_results > page_num*page_size
def unblock_worker_id( self, worker_id, reason ):
    '''Remove a block on a worker (AMT UnblockWorker); `reason` is sent only when not None.'''
    params = {"WorkerId" : worker_id}
    if reason is not None:
        params["Reason"] = reason
    self._server.do_request("UnblockWorker", params)
def extend_hit_id(self, hit_id, max_assignments_increment=0, expiration_increment=0):
    '''
    Add assignments and/or lifetime to a HIT (AMT ExtendHIT).

    Both increments must be non-negative ints and at least one must be
    positive; only the positive ones are sent.  expiration_increment is in
    seconds.
    '''
    assert isinstance(max_assignments_increment, int) and max_assignments_increment >= 0
    assert isinstance(expiration_increment, int) and expiration_increment >= 0
    assert max_assignments_increment>0 or expiration_increment>0
    params = {"HITId": hit_id}
    optional = (("MaxAssignmentsIncrement", max_assignments_increment),
                ("ExpirationIncrementInSeconds", expiration_increment))
    for key, increment in optional:
        if increment > 0:
            params[key] = int(round(increment, 0))
    self._server.do_request("ExtendHIT", params )
def _extract_reward_node(self, reward_node):
    '''
    Build a Reward from a <Reward> DOM element.

    Every child must be Amount, CurrencyCode or FormattedPrice (asserted) and
    all three are required; their text is passed through unconverted.
    '''
    fields = {}
    for child in reward_node.childNodes:
        assert child.nodeName in ("Amount","CurrencyCode","FormattedPrice")
        fields[child.nodeName] = child.childNodes[0].data
    return Reward(
        amount=fields["Amount"],
        currency_code=fields["CurrencyCode"],
        formatted_price=fields["FormattedPrice"])
def _extract_qualification_requirement_node(self, qreq_node):
    '''
    Build a QualificationRequirementRecord from a <QualificationRequirement>
    DOM element.

    QualificationTypeId, Comparator and RequiredToPreview are required.
    IntegerValue and LocaleValue default to None; LocaleValue's text comes
    from its first child, which must be <Country>.
    '''
    assert qreq_node.nodeName=="QualificationRequirement"
    fields = {}
    for child in qreq_node.childNodes:
        text_source = child
        if child.nodeName=="LocaleValue":
            text_source = child.childNodes[0]
            assert text_source.nodeName=="Country"
        fields[child.nodeName] = text_node_content(text_source)
    return QualificationRequirementRecord(
        qualification_type_id=fields["QualificationTypeId"],
        comparator=fields["Comparator"],
        integer_value=fields.get("IntegerValue",None),
        locale_value=fields.get("LocaleValue",None),
        required_to_preview=to_boolean(fields["RequiredToPreview"]))
def _extract_hit_node(self, hit_node):
    '''
    Convert one <HIT> DOM element into a HITRecord.

    Children are mapped to HITRecord keyword arguments through a table of
    (key, converter) pairs.  Reward and QualificationRequirement children get
    dedicated parsers; Request, HITGroupId and HITLayoutId are ignored.  Any
    other unknown child raises KeyError.

    NOTE(review): a QualificationRequirement child with <=1 child nodes is not
    parsed and falls through to the table lookup, raising KeyError. Confirm
    that this is intended.
    '''
    # SearchHITs does not return qualification requirements and possibly other attributes.
    # GetHIT might not return NumberOfAssignmentsCompleted and some others. Not sure yet.
    fields = {
        "qualification_requirements": [],
        "hit_review_status": None,
        "number_of_similar_hits": None,
        "requester_annotation": "",
    }
    field_specs = {
        "HITId": ("hit_id", None),
        "HITTypeId": ("hit_type_id", None),
        "CreationTime": ("creation_time", parse_iso_utc_to_datetime_local),
        "Title": ("title", None),
        "Description": ("description", None),
        "Question": ("question", None),
        "Keywords": ("keywords", lambda content:tuple(kw.strip() for kw in content.split(","))),
        "HITStatus": ("hit_status", None),
        "MaxAssignments": ("max_assignments", int),
        "AutoApprovalDelayInSeconds": ("auto_approval_delay", to_duration),
        "Expiration": ("expiration_time", parse_iso_utc_to_datetime_local),
        "AssignmentDurationInSeconds": ("assignment_duration", to_duration),
        "NumberOfSimilarHITs": ("number_of_similar_hits", int),
        "HITReviewStatus": ("hit_review_status", None),
        "RequesterAnnotation": ("requester_annotation", None),
        "NumberOfAssignmentsPending": ("num_pending", int),
        "NumberOfAssignmentsAvailable": ("num_available", int),
        "NumberOfAssignmentsCompleted": ("num_completed", int),
    }
    ignored_tags = ("Request", "HITGroupId", "HITLayoutId")  # not supported, ignore for now
    for child in hit_node.childNodes:
        tag = child.nodeName
        if tag=="Reward":
            fields["reward"] = self._extract_reward_node(child)
        elif tag=="QualificationRequirement" and len(child.childNodes)>1:
            fields["qualification_requirements"].append(
                self._extract_qualification_requirement_node(child))
        elif tag in ignored_tags:
            continue
        else:
            key, convert = field_specs[tag] # KeyError here would indicate unexpected info in HIT structure
            value = text_node_content(child)
            if convert is not None:
                value = convert(value)
            fields[key] = value
    # str() keys so **kwargs works on Python 2 even if keys were unicode.
    return HITRecord(**dict((str(k), v) for (k, v) in fields.items()))
def force_expire_hit( self, hit_id ):
    '''Send the AMT ForceExpireHIT operation for the given HIT id.'''
    request = {"HITId": hit_id}
    self._server.do_request( "ForceExpireHIT", request )
def reject_assignment(self, assignment, reason):
    '''
    Reject an assignment (AMT RejectAssignment).

    @param reason: optional feedback for the worker; None sends none.
    @raise AssignmentAlreadyFinalizedException: if AMT answers with
        AWS.MechanicalTurk.InvalidAssignmentState for this operation.
        Other AMTRequestFailed errors propagate unchanged.
    '''
    params = {"AssignmentId":assignment.id}
    if reason is not None:
        params["RequesterFeedback"] = reason
    try:
        self._server.do_request( "RejectAssignment", params )
    except AMTRequestFailed:
        e = sys.exc_info()[1]  # Python 2.5-compatible way to get the exception
        already_final = (e.code=="AWS.MechanicalTurk.InvalidAssignmentState"
                         and e.operation=="RejectAssignment")
        if not already_final:
            raise
        raise AssignmentAlreadyFinalizedException(assignment.id, assignment.assignment_status)
def get_account_balance( self ):
    '''Return the available balance (AMT GetAccountBalance) as a Reward with a float amount.'''
    dom = self._server.do_request( "GetAccountBalance", {} )
    return Reward(
        amount=float(text_in_element(dom,"Amount")),
        currency_code=text_in_element(dom, "CurrencyCode"),
        formatted_price=text_in_element(dom, "FormattedPrice"))
def approve_assignment(self, assignment, requester_feedback):
    '''
    Approve an assignment (AMT ApproveAssignment).

    @param requester_feedback: optional feedback for the worker; None sends none.
    @raise AssignmentAlreadyFinalizedException: if AMT answers with
        AWS.MechanicalTurk.InvalidAssignmentState for this operation.
        Other AMTRequestFailed errors propagate unchanged.
    '''
    params = {"AssignmentId":assignment.id}
    if requester_feedback is not None:
        params["RequesterFeedback"] = requester_feedback
    try:
        self._server.do_request( "ApproveAssignment", params )
    except AMTRequestFailed:
        e = sys.exc_info()[1]  # Python 2.5-compatible way to get the exception
        already_final = (e.code=="AWS.MechanicalTurk.InvalidAssignmentState"
                         and e.operation=="ApproveAssignment")
        if not already_final:
            raise
        raise AssignmentAlreadyFinalizedException(assignment.id, assignment.assignment_status)
def create_qualification_type( self, name, description, initially_active, keywords, retry_delay, test_xml,
        answer_key_xml, test_duration, auto_granted, auto_granted_value):
    '''
    Create a qualification type (AMT CreateQualificationType) and return
    its QualificationTypeId text.

    retry_delay and test_duration must be datetime.timedelta objects (sent as
    seconds).  Parameters whose value is None (e.g. test_xml or
    answer_key_xml) are omitted, and AutoGrantedValue is only included when
    auto_granted is true.
    '''
    assert isinstance(retry_delay, datetime.timedelta), retry_delay
    assert isinstance(test_duration, datetime.timedelta)
    candidates = [
        ( "Name", name ),
        ( "Description", description ),
        ( "Keywords", ",".join(keywords) ),
        ( "RetryDelayInSeconds", total_seconds(retry_delay) ),
        ( "QualificationTypeStatus", ("Active" if initially_active else "Inactive") ),
        ( "Test", test_xml ),
        ( "AnswerKey", answer_key_xml ),
        ( "TestDurationInSeconds", total_seconds(test_duration) ),
        ( "AutoGranted", ("true" if auto_granted else "false") ),
    ]
    if auto_granted:
        candidates.append(( "AutoGrantedValue", auto_granted_value ))
    params = {}
    for key, value in candidates:
        if value is not None:
            params[key] = value
    dom = self._server.do_request( "CreateQualificationType", params )
    return text_in_element( dom, "QualificationTypeId" )
def get_qualification_types(self):
    '''
    Return a list of QualificationType objects for the requester's own
    qualification types (AMT SearchQualificationTypes, one page of up to 100).

    Optional elements that AMT omits for a type are returned as None.  This
    includes Test and TestDurationInSeconds, which are only present for
    qualification types that have a test.  Previously
    getElementsByTagName("Test")[0] raised IndexError for such types.  A
    missing Keywords element yields ().
    '''
    def optional(extract, element, tag, **kwargs):
        # Run one of the *_in_element helpers, mapping "element missing"
        # (XMLProcessingException, as the other lookups here assume) to None.
        try:
            return extract(element, tag, **kwargs)
        except XMLProcessingException:
            return None

    dom = self._server.do_request( "SearchQualificationTypes", {
        "MustBeRequestable" : "false",
        "MustBeOwnedByCaller" : "true",
        "PageSize" : "100",
    })
    num_results = int( text_in_element( dom, "NumResults" ) )
    assert num_results<100, "This code needs revision to handle >100 qualification types."
    results = []
    for node in dom.getElementsByTagName( "QualificationType" ):
        qtype_id = text_in_element(node, "QualificationTypeId")
        creation_time = datetime_in_element(node, "CreationTime")
        name = text_in_element(node, "Name")
        description = text_in_element(node, "Description")
        keywords_text = optional(text_in_element, node, "Keywords")
        if keywords_text is None:
            keywords = ()
        else:
            keywords = tuple(s.strip() for s in keywords_text.split(","))
        is_active = optional(bool_in_element, node, "QualificationTypeStatus",
                value_if_true="Active", value_if_false="Inactive")
        retry_delay = optional(text_in_element, node, "RetryDelayInSeconds")
        # Test / TestDurationInSeconds exist only for types with a qualification test.
        test_nodes = node.getElementsByTagName("Test")
        test_xml = text_node_content(test_nodes[0]) if test_nodes else None
        test_duration = optional(duration_in_element, node, "TestDurationInSeconds")
        answer_key_xml = optional(xml_in_element, node, "AnswerSpecification")
        auto_granted = optional(bool_in_element, node, "AutoGranted")
        auto_granted_value = optional(number_in_element, node, "AutoGrantedValue")
        is_requestable = optional(bool_in_element, node, "IsRequestable")
        assert is_requestable == True or is_requestable is None # Notify Alex Quinn if this ever happens. I don't think it will.
        # IsRequestable (is_requestable) should be True for any qualification requirement I created.
        # "Specifies whether the Qualification type is one that a user can request through the
        # Amazon Mechanical Turk web site, such as by taking a Qualification test. This value is
        # false for Qualifications assigned automatically by the system."
        # http://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_QualificationTypeDataStructureArticle.html
        if is_requestable is None:
            is_requestable = True # not sure why this would ever happen
        results.append(QualificationType(id=qtype_id,
                creation_time=creation_time,
                name=name,
                description=description,
                keywords=keywords,
                is_active=is_active,
                retry_delay=retry_delay,
                test_xml=test_xml,
                test_duration=test_duration,
                answer_key_xml=answer_key_xml,
                auto_granted=auto_granted,
                auto_granted_value=auto_granted_value,
                #is_requestable=is_requestable
                ))
    return results
def notify_workers(self, worker_ids, subject, message_text):
    '''
    Send a message to 1-100 workers (AMT NotifyWorkers).

    Workers are sent as WorkerId.1, WorkerId.2, ...
    '''
    assert 1 <= len(worker_ids) <= 100
    params = {"Subject" : subject, "MessageText" : message_text}
    params.update(("WorkerId.%d" % position, worker_id)
                  for position, worker_id in enumerate(worker_ids, 1))
    self._server.do_request("NotifyWorkers", params)
def get_assignments_for_hit(self, hit): # GENERATOR
    '''
    Yield an AssignmentRecord for every assignment of `hit` (AMT
    GetAssignmentsForHIT), requesting 100 per page until TotalNumResults is
    covered.

    @raise XMLProcessingException: if the server reports a page number other
        than the one requested.
    '''
    # TODO: Refactor to reduce duplication with search_hits(..) ... 20170425
    page_size = 100
    page_number = 0
    while True:
        page_number += 1
        request_params = {"HITId":hit.id, "PageSize":page_size, "PageNumber":page_number}
        dom = self._server.do_request( "GetAssignmentsForHIT", request_params )
        # Check that reported page number matches our internal count
        total_num_results = int(text_in_element(dom,"TotalNumResults"))
        reported_page_num = int(text_in_element(dom,"PageNumber"))
        if page_number != reported_page_num:
            # This should never happen unless server breaks our expectations.
            raise XMLProcessingException("Reported page number doesn't match expected")
        for assignment_node in dom.getElementsByTagName( "Assignment" ):
            assignment_record = self._extract_assignment_data( assignment_node=assignment_node )
            # The message needs a %s placeholder; without it a failing check
            # raised TypeError ("not all arguments converted") instead.
            assert hit.id==assignment_record.hit_id, "Expected them to be the same: %s"%(repr((hit.id,assignment_record.hit_id)))
            yield assignment_record
        if total_num_results <= page_number*page_size:
            break
# def get_assignments_for_hit(self, hit): # GENERATOR
# dom = self._server.do_request( "GetAssignmentsForHIT", {"HITId":hit.id, "PageSize":100, "PageNumber":1} )
# for assignment_node in dom.getElementsByTagName( "Assignment" ):
# assignment_record = self._extract_assignment_data( assignment_node=assignment_node )
# assert hit.id==assignment_record.hit_id, "Expected them to be the same: "%(repr((hit.id,assignment_record.hit_id)))
# yield assignment_record
#
def _extract_assignment_data( self, assignment_node):
    '''
    Convert one <Assignment> DOM element into an AssignmentRecord.

    The optional timestamps (accept, auto-approval, submit, approval,
    rejection) and RequesterFeedback default to None when absent.
    accept_time was previously left unbound, so a missing AcceptTime raised
    UnboundLocalError.  AssignmentId, WorkerId, HITId and AssignmentStatus
    are still expected to be present.

    The text of the (single) <Answer> child is itself XML.  It is parsed, and
    each <Answer> element inside it becomes one answer record.  With no
    <Answer> child, answer_records is ().
    '''
    accept_time = autopay_time = rejection_time = submit_time = approval_time = None
    answer_xml = None
    answer_records = ()
    requester_feedback = None
    for node in assignment_node.childNodes:
        name = node.nodeName
        if name=="AssignmentId":
            assignment_id = text_node_content( node )
        elif name=="WorkerId":
            worker_id = text_node_content( node )
            assert is_string(worker_id)
        elif name=="HITId":
            hit_id = text_node_content( node )
        elif name=="AssignmentStatus":
            assignment_status = text_node_content( node )
        elif name=="AutoApprovalTime":
            autopay_time = parse_iso_utc_to_datetime_local( text_node_content( node ) )
        elif name=="SubmitTime":
            submit_time = parse_iso_utc_to_datetime_local( text_node_content( node ) )
        elif name=="ApprovalTime":
            approval_time = parse_iso_utc_to_datetime_local( text_node_content( node ) )
        elif name=="AcceptTime":
            accept_time = parse_iso_utc_to_datetime_local( text_node_content( node ) )
        elif name=="RejectionTime":
            rejection_time = parse_iso_utc_to_datetime_local( text_node_content( node ) )
        elif name=="RequesterFeedback":
            requester_feedback = text_node_content( node )
        elif name=="Answer":
            assert answer_xml is None, "Only expected one Answer node per Assignment node"
            answer_xml = text_node_content( node )
            answer_dom = xml2dom(answer_xml)
            # Distinct loop name so it is not confused with the outer `node`.
            answer_records = tuple(self._extract_answer_from_dom_node(answer_node)
                                   for answer_node in answer_dom.getElementsByTagName("Answer"))
    assignment_record = AssignmentRecord(
        accept_time=accept_time,
        answer_records=answer_records,
        approval_time=approval_time,
        assignment_id=assignment_id,
        assignment_status=assignment_status,
        auto_approval_time=autopay_time,
        hit_id=hit_id,
        rejection_time=rejection_time,
        requester_feedback=requester_feedback,
        submit_time=submit_time,
        worker_id=worker_id)
    return assignment_record
def get_requester_statistic(self, statistic, time_period, count=None):
    '''
    Fetch one requester statistic using AMT's GetRequesterStatistic operation.

    statistic (str)   : name of the statistic (see the list asserted below)
    time_period (str) : "OneDay", "SevenDays", "ThirtyDays" or "LifeToDate"
    count (int)       : number of daily data points to fetch. Only allowed when
                        time_period is "OneDay". When omitted, no Count parameter is
                        sent and AMT applies its own default.

    Returns (list) : (date, value) tuples, one per DataPoint in the response. The value
                     is an int for "Number*" statistics and a float for all others.
    '''
    # Details at:
    # http://docs.amazonwebservices.com/AWSMturkAPI/2008-08-02/ApiReference_GetRequesterStatisticOperation.html
    # The max count is 730. Discovered by experiment on 2-14-2011.
    assert time_period in ("OneDay", "SevenDays", "ThirtyDays", "LifeToDate"), time_period
    assert statistic in ("NumberAssignmentsAvailable", "NumberAssignmentsAccepted",
            "NumberAssignmentsPending", "NumberAssignmentsApproved", "NumberAssignmentsRejected",
            "NumberAssignmentsReturned", "NumberAssignmentsAbandoned", "PercentAssignmentsApproved",
            "PercentAssignmentsRejected", "TotalRewardPayout", "AverageRewardAmount",
            "TotalRewardFeePayout", "TotalFeePayout", "TotalRewardAndFeePayout", "TotalBonusPayout",
            "TotalBonusFeePayout", "NumberHITsCreated", "NumberHITsCompleted", "NumberHITsAssignable",
            "NumberHITsReviewable", "EstimatedRewardLiability", "EstimatedFeeLiability",
            "EstimatedTotalLiability"), statistic
    # NumberHITsAssignable may not be collected on a daily basis. See note in docs:
    # http://docs.amazonwebservices.com/AWSMechTurk/2008-08-02/AWSMturkAPI/ApiReference_GetRequesterStatisticOperation.html
    assert not (statistic=="NumberHITsAssignable" and time_period!="LifeToDate")
    params = {
        "Statistic" : statistic,
        "TimePeriod" : time_period
    }
    if count is not None:
        # Count is optional and only meaningful with OneDay. (Demanding a count for every
        # period except LifeToDate while rejecting it for everything except OneDay made
        # SevenDays and ThirtyDays impossible to request.)
        assert time_period=="OneDay", "Count can only be specified if the time period is OneDay" # per docs
        assert isinstance(count, int), count
        params["Count"] = count
    dom = self._server.do_request("GetRequesterStatistic", params)
    nodes = dom.getElementsByTagName("DataPoint")
    assert len(nodes)==1 or (time_period=="OneDay" and len(nodes)>=1)
    results = []
    for node in nodes:
        date = parse_iso_utc_to_datetime_local(text_in_element(node, "Date"))
        if statistic.startswith("Number"):
            val = int(text_in_element(node, "LongValue"))
        else:
            val = float(text_in_element(node, "DoubleValue"))
        results.append((date,val))
    return results
def _extract_answer_from_dom_node( self, answer_node):
    '''
    Convert one <Answer> element of an assignment's answer XML into an AnswerRecord.

    answer_node : DOM element whose children are QuestionIdentifier plus any of
                  FreeText, SelectionIdentifier, OtherSelection, UploadedFileKey and
                  UploadedFileSizeInBytes. Whitespace text nodes are ignored; any other
                  child trips an assertion.

    Returns (AnswerRecord) : optional fields that were absent are None.
    '''
    # Child node name -> (AnswerRecord keyword, converter applied to the node's text).
    optional_fields = {
        "FreeText"                : ("free_text", to_unicode),
        "SelectionIdentifier"     : ("selection_identifier", to_unicode),
        "OtherSelection"          : ("other_selection", to_unicode),
        "UploadedFileKey"         : ("uploaded_file_key", to_unicode),
        "UploadedFileSizeInBytes" : ("uploaded_file_size_in_bytes", int),
    }
    record_kwargs = dict.fromkeys([kwarg for (kwarg, _) in optional_fields.values()])
    for child in answer_node.childNodes:
        child_name = child.nodeName
        if child_name == "QuestionIdentifier":
            question_identifier = to_unicode( text_node_content( child ) )
        elif child_name in optional_fields:
            kwarg, convert = optional_fields[child_name]
            record_kwargs[kwarg] = convert( text_node_content( child ) )
        elif child_name != "#text":
            assert False, "Unexpected node type found: %s"%child_name
    return AnswerRecord(question_identifier=question_identifier, **record_kwargs)
# Notification event types. Each value is sent verbatim as a Notification.1.EventType
# (or numbered Notification.1.EventType.N) request parameter.
NOTIFICATION_EVENT_TYPE_ASSIGNMENT_ACCEPTED = "AssignmentAccepted"
NOTIFICATION_EVENT_TYPE_ASSIGNMENT_ABANDONED = "AssignmentAbandoned"
NOTIFICATION_EVENT_TYPE_RETURNED = "AssignmentReturned"
NOTIFICATION_EVENT_TYPE_SUBMITTED = "AssignmentSubmitted"
NOTIFICATION_EVENT_TYPE_REVIEWABLE = "HITReviewable"
NOTIFICATION_EVENT_TYPE_EXPIRED = "HITExpired"
# All of the event types above, in declaration order. send_test_event_notification()
# rejects anything not listed here.
NOTIFICATION_EVENT_TYPES_ALL = (
    NOTIFICATION_EVENT_TYPE_ASSIGNMENT_ACCEPTED,
    NOTIFICATION_EVENT_TYPE_ASSIGNMENT_ABANDONED,
    NOTIFICATION_EVENT_TYPE_RETURNED,
    NOTIFICATION_EVENT_TYPE_SUBMITTED,
    NOTIFICATION_EVENT_TYPE_REVIEWABLE,
    NOTIFICATION_EVENT_TYPE_EXPIRED,
)
def set_hit_type_notification(self, hit_type_id, address, transport, event_types):
    '''
    Register an active notification for a HIT type via SetHITTypeNotification.

    hit_type_id (str) : the HIT type the notification is attached to
    address (str)     : sent as Notification.1.Destination
    transport (str)   : sent as Notification.1.Transport
    event_types       : a single event type string, or any iterable of them
                        (e.g. NOTIFICATION_EVENT_TYPES_ALL)
    '''
    try:
        string_types = (str, unicode) # Python 2
    except NameError:
        string_types = (str,) # Python 3 has no separate unicode type
    if isinstance(event_types, string_types):
        # A bare string would otherwise be sent one character per "event type".
        event_types = (event_types,)
    else:
        # Materialize so sets and generators work too (no indexing / len() on those).
        event_types = tuple(event_types)
    params = { "HITTypeId" : hit_type_id,
               "Notification.1.Destination" : address,
               "Notification.1.Transport" : transport,
               "Notification.1.Version" : self._WSDL_SCHEMA_VERSION_NOTIFICATIONS,
               "Notification.1.Active" : "true" }
    if len(event_types) == 1:
        params["Notification.1.EventType"] = event_types[0]
    else:
        # Multiple event types are sent as numbered parameters, starting at 1.
        for (idx, event_type) in enumerate(event_types):
            params["Notification.1.EventType.%d"%(idx + 1)] = event_type
    self._server.do_request("SetHITTypeNotification", params)
def send_test_event_notification(self, address, transport, event_type):
    '''
    Ask AMT to send a test notification through SendTestEventNotification.

    address (str)    : sent as Notification.1.Destination
    transport (str)  : sent as Notification.1.Transport
    event_type (str) : one of NOTIFICATION_EVENT_TYPES_ALL; used both as the
                       notification's EventType and as TestEventType

    Raises ValueError if event_type is not in NOTIFICATION_EVENT_TYPES_ALL.
    '''
    known_event_types = self.NOTIFICATION_EVENT_TYPES_ALL
    if event_type not in known_event_types:
        raise ValueError("event_type %s is supposed to be one of %s"%(event_type, repr(known_event_types)))
    request_params = {}
    for (key, value) in (("Notification.1.Destination", address),
                         ("Notification.1.Transport", transport),
                         ("Notification.1.EventType", event_type),
                         ("Notification.1.Version", self._WSDL_SCHEMA_VERSION_NOTIFICATIONS),
                         ("Notification.1.Active", "true"),
                         ("TestEventType", event_type)):
        request_params[key] = value
    self._server.do_request("SendTestEventNotification", request_params)
def set_hit_type_notification_inactive(self, hit_type_id):
    '''
    Switch off the notification of a HIT type by calling SetHITTypeNotification
    with Active=false.
    '''
    self._server.do_request("SetHITTypeNotification",
                            {"HITTypeId" : hit_type_id, "Active" : "false"})
def get_file_upload_url(self, assignment_id, question_id):
    '''
    Look up the FileUploadURL for one question of an assignment using AMT's
    GetFileUploadURL operation, and return it as text.
    '''
    response_dom = self._server.do_request("GetFileUploadURL",
                                           {"AssignmentId" : assignment_id,
                                            "QuestionIdentifier" : question_id})
    return text_in_element(response_dom, "FileUploadURL")
#vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# EXPERIMENTAL CODE (do not delete, yet)
#
# TODO: Make search_hits multi-threaded. Below is a start on doing that.
#
# There are some options. I'm not sure which is best, easiest, or even possible/functional:
# * worker threads
# * asynchttp module
# * pycurl
# * eventlets / greenlets
# * concurrent futures
#
# The advantage of threads is that it would pose no serious Python 2/3 or platform issues. However,
# someone said in this StackOverflow discussion that threads may not solve the problem.
# http://stackoverflow.com/questions/4962808/asynchronous-http-calls-in-python#comment21544074_4963126
#
# def search_hits(self):
# # Strategy: Use a queue and worker threads.
#
# # MULTI-THREADED
# from crowdlib.all_exceptions import XMLProcessingException
# from crowdlib.utility import text_in_element, dmp_xml
# from math import ceil
# page_num = 0
# page_size = 100
# request_params = {"PageSize":page_size, "SortProperty":"Enumeration", "PageNumber":page_num, "ResponseGroup.0":"HITDetail", "ResponseGroup.1":"HITQuestion","ResponseGroup.2":"Minimal","ResponseGroup.3":"HITAssignmentSummary"}
# hits_seen = 0
# hit_type_dict = {}
#
# # Page number is 1-based.
# # http://docs.amazonwebservices.com/AWSMechTurk/2008-08-02/AWSMturkAPI/ApiReference_SearchHITsOperation.html
#
# first_page_num = 1
# request_params["PageNumber"] = first_page_num
# operation = "SearchHITs"
# dom = self._server.do_request(operation, request_params)
# doms = [dom]
## num_results = int(text_in_element(dom, "NumResults"))
# total_num_results = int(text_in_element(dom,"TotalNumResults"))
## reported_page_num = int(text_in_element(dom,"PageNumber"))
## if page_num != reported_page_num:
## raise XMLProcessingException("Reported page number doesn't match expected")
# num_pages = int(ceil(total_num_results / page_size))
# operation_specific_parameters_pairs = []
# for page_num in range(first_page_num + 1, first_page_num + num_pages):
# request_params = request_params.copy()
# request_params["PageNumber"] = page_num
# operation_specific_parameters_pairs.append((operation, request_params))
#
# doms += self._server.do_requests_simultaneously(operation_specific_parameters_pairs=operation_specific_parameters_pairs)
# for dom in doms:
# hit_nodes = dom.getElementsByTagName("HIT")
# for hit_node in hit_nodes:
# result = self._extract_hit_node(hit_node=hit_node)
# yield result
#vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# SHRAPNEL (delete any time if you don't think it will be needed)
#
# kwargs["hit_type_id"]=hit_info["HITTypeId"]
# kwargs["creation_time"] = to_date_time( hit_info["CreationTime"] )
# kwargs["title"]=hit_info["Title"]
# kwargs["keywords"]=tuple(hit_info.get("Keywords","").split(","))
# kwargs["description"]=hit_info["Description"]
# kwargs["hit_status"]=hit_info["HITStatus"]
# kwargs["currency"]=hit_info["Reward.CurrencyCode"]
# kwargs["time_limit"]=to_duration(hit_info["AssignmentDurationInSeconds"]
# kwargs["autopay_delay"]=to_duration(hit_info["AutoApprovalDelayInSeconds"]
#
# if expect_partial:
# hit_node_partial_info = SearchHITsResult(
# hit_id=hit_id,
# hit_type_id=hit_type_id,
# title=title,
# description=description,
# creation_time=creation_time,
# expiration_time=expiration_time,
#
# hit_id=hit_info["HITId"],
# hit_type_id=hit_info["HITTypeId"],
# creation_time = to_date_time( hit_info["CreationTime"] )
# title=to_unicode(hit_info["Title"]),
# description=to_unicode(hit_info["Description"]),
# hit_status=hit_info["HITStatus"],
#
# reward=float(hit_info["Reward.Amount"]),
# currency=to_unicode( hit_info["Reward.CurrencyCode"] ),
# time_limit=to_duration(hit_info["AssignmentDurationInSeconds"]),
# autopay_delay=to_duration(hit_info["AutoApprovalDelayInSeconds"]),
# keywords=keywords,
# qualification_requirements=qualification_requirements,
# details=details,
# amt=self)
# hit_id)
# hit_type_id = to_unicode(hit_info["HITTypeId"])
# keywords=tuple(to_unicode(kw) for kw in hit_info.get("Keywords","").split(","))
# details = {} # Details aren't stored on server, so if we don't have them, we're out of luck.
# hit_type = HITType( id=hit_type_id,
# title=to_unicode(hit_info["Title"]),
# description=to_unicode(hit_info["Description"]),
# reward=float(hit_info["Reward.Amount"]),
# currency=to_unicode( hit_info["Reward.CurrencyCode"] ),
# time_limit=to_duration(hit_info["AssignmentDurationInSeconds"]),
# autopay_delay=to_duration(hit_info["AutoApprovalDelayInSeconds"]),
# keywords=keywords,
# qualification_requirements=qualification_requirements,
# details=details,
# amt=self)
# self._db.put_hit_type(hit_type)
# assert hit_type_id not in self._hit_type_dict
# self._hit_type_dict[hit_type_id] = hit_type
# return hit_type
# def _extract_hit_type_from_hit_node(self, hit_node):
# '''
# @warning: SearchHITs does not return the qualification requirements. Make sure
# to pass results from GetHIT, not SearchHITs, into this.
# '''
# from crowdlib.utility import to_unicode, to_date_time
# from crowdlib.QualificationRequirement import QualificationRequirement
# parts_of_interest = ("HITTypeId", "Title", "Description", "Keywords",
# "AssignmentDurationInSeconds", "Reward",
# "AutoApprovalDelayInSeconds", "QualificationRequirement")
#
# hit_info = {}
# qualification_requirements = []
# for child in hit_node.childNodes:
# child_name = child.nodeName
#
# if child_name=="Reward":
# # Use dot notation for the reward since it has subnodes.
# for reward_child in child.childNodes:
# reward_child_name = reward_child.nodeName
# if reward_child_name in ( "Amount","CurrencyCode","FormattedPrice" ):
# hit_info["Reward" + "." + reward_child_name] = reward_child.childNodes[0].data
# elif child_name=="QualificationRequirement" and len(child.childNodes)>1:
# qid = text_in_element(child, "QualificationTypeId")
# qcomp = text_in_element(child, "Comparator")
# try:
# qintval = int(text_in_element(child, "IntegerValue"))
# qlocaleval = None
# except XMLProcessingException:
# qlocaleval = int(text_in_element(child, "LocaleValue"))
# qintval = None
#
# qualification_requirement = QualificationRequirement(
# qualification_type_id=qid,
# comparator=qcomp,
# integer_value=qintval,
# locale_value=qlocaleval,
# required_to_preview=False)
# qualification_requirements.append(qualification_requirement)
# #hit_info["QualificationTypeId"] = text_in_element( child, "QualificationTypeId" )
# elif child_name in parts_of_interest:
# try:
# contents = child.childNodes[0].data
# except IndexError: # Why???? (11-15-2010)
# contents = ""
# hit_info[child_name] = contents
#
# hit_type_id = to_unicode(hit_info["HITTypeId"])
# keywords=tuple(to_unicode(kw) for kw in hit_info.get("Keywords","").split(","))
# details = {} # Details aren't stored on server, so if we don't have them, we're out of luck.
# hit_type = HITType( id=hit_type_id,
# title=to_unicode(hit_info["Title"]),
# description=to_unicode(hit_info["Description"]),
# reward=float(hit_info["Reward.Amount"]),
# currency=to_unicode( hit_info["Reward.CurrencyCode"] ),
# time_limit=to_duration(hit_info["AssignmentDurationInSeconds"]),
# autopay_delay=to_duration(hit_info["AutoApprovalDelayInSeconds"]),
# keywords=keywords,
# qualification_requirements=qualification_requirements,
# details=details,
# amt=self)
# self._db.put_hit_type(hit_type)
# assert hit_type_id not in self._hit_type_dict
# self._hit_type_dict[hit_type_id] = hit_type
# return hit_type
# def _extract_from_hit_node( self, hit_node, hit_type, just_update_shell ):
#
# from crowdlib.utility import to_unicode, to_date_time
# parts_of_interest = (
# "HITId", "HITTypeId", "CreationTime", "HITStatus", "MaxAssignments", "Expiration",
# "RequesterAnnotation", "Question", "NumberOfAssignmentsPending",
# "NumberOfAssignmentsAvailable", "NumberOfAssignmentsCompleted")
#
# hit_info = {}
# hit_info["HITReviewStatus"] = None
# for child in hit_node.childNodes:
# child_name = child.nodeName
# if child_name in parts_of_interest:
# try:
# contents = to_unicode(child.childNodes[0].data)
# except IndexError: # Why???? (11-15-2010)
# contents = ""
# hit_info[child_name] = contents
#
# assert to_unicode(hit_info["HITTypeId"]) == hit_type.id
#
# hit_id = to_unicode(hit_info["HITId"])
# details = {} # Details aren't stored on server, so if we don't have them, we're out of luck.
#
# # If we're just making a shell for an update, then pass None for amt
# if just_update_shell:
# amt = None
# else:
# amt = self
#
# hit_status = hit_info["HITStatus"]
# creation_time = to_date_time( hit_info["CreationTime"] )
# expiration_time = to_date_time( hit_info["Expiration"] )
# num_pending = int( hit_info.get("NumberOfAssignmentsPending", 0) )
# num_available = int( hit_info.get("NumberOfAssignmentsAvailable", 0) )
# num_completed = int( hit_info.get("NumberOfAssignmentsCompleted", 0) )
# hit_review_status = hit_info.get("HITReviewStatus", None)
# max_assignments=int( hit_info["MaxAssignments"] )
#
#
# hit = HIT( id=hit_id,
# hit_type=hit_type,
# question_xml=hit_info.get("Question",None),
# max_assignments=max_assignments,
# requester_annotation=hit_info.get("RequesterAnnotation",""),
# creation_time = creation_time,
# hit_status = hit_status,
# expiration_time = expiration_time,
# num_pending = num_pending,
# num_available = num_available,
# num_completed = num_completed,
# hit_review_status = hit_review_status,
# details=details,
# amt=amt)
#
# if just_update_shell:
# self._db.update_hit(hit_id=hit_id, hit_status=hit_status, creation_time=creation_time,
# expiration_time=expiration_time, num_pending=num_pending, num_available=num_available,
# num_completed=num_completed, max_assignments=max_assignments)
# else:
# self._db.put_hit(hit)
# assert hit_id not in self._hit_dict, hit_id
# self._hit_dict[hit_id] = hit
#
# return hit
# def get_bonus_payments(hit_id=None, assignment_id=None):
# if not ((hit_id is None) ^ (assignment_id is None)):
# raise ValueError("You must pass in one of hit_id or assignment_id, and "
# "not both. Received hit_id=%s and assignment_id=%s"%(
# repr(hit_id), repr(assignment_id)))
# if hit_id is not None:
# params = {"HITId":hit_id}
# else:
# params = {"AssignmentId":assignment_id}
# params[""]
# dom = self._server.do_request("GetBonusPayments", params)
# def create_qualification_type(self, name, description, keywords, retry_delay, qualification_type_status,
# test_xml, answer_key_xml, test_duration, auto_granted, auto_granted_value):
# param_pairs = ( ( "Name", name ),
# ( "Description", description ),
# ( "Keywords", ",".join(keywords) ),
# ( "RetryDelayInSeconds", total_seconds(retry_delay)),
# ( "QualificationTypeStatus", ("Active" if is_active else "Inactive") ),
# ( "Test", test_xml ),
# ( "AnswerKey", answer_key_xml ),
# ( "TestDurationInSeconds", total_seconds(test_duration)),
# ( "AutoGranted", ("true" if auto_granted else "false") ) )
# if auto_granted:
# param_pairs += (( "AutoGrantedValue", auto_granted_value ),)
# params = dict((k,v) for (k,v) in param_pairs if v is not None)
# try:
# dom = self._server.do_request( "CreateQualificationType", params )
# def search_hits(self):
# from crowdlib.all_exceptions import XMLProcessingException
# page_num = 0
# page_size = 100
# request_params = {"PageSize":page_size, "SortProperty":"Enumeration", "PageNumber":page_num}
# hits_seen = 0
# hit_type_dict = {}
# while True:
# page_num += 1
# request_params["PageNumber"] = page_num
# dom = self._server.do_request("SearchHITs", request_params)
# num_results = int(text_in_element(dom, "NumResults"))
# total_num_results = int(text_in_element(dom,"TotalNumResults"))
# reported_page_num = int(text_in_element(dom,"PageNumber"))
# if page_num != reported_page_num:
# raise XMLProcessingException("Reported page number doesn't match expected")
#
# hit_nodes = dom.getElementsByTagName("HIT")
# for hit_node in hit_nodes:
# hit_info = {}
# for child in hit_node.childNodes:
# child_name = child.nodeName
# if child_name=="Reward":
# # Use dot notation for the reward since it has subnodes.
# for reward_child in child.childNodes:
# reward_child_name = reward_child.nodeName
# if reward_child_name in ( "Amount","CurrencyCode","FormattedPrice" ):
# hit_info["Reward" + "." + reward_child_name] = reward_child.childNodes[0].data
# elif child_name=="QualificationRequirement" and len(child.childNodes)>1:
# qid = text_in_element(child, "QualificationTypeId")
# qcomp = text_in_element(child, "Comparator")
# try:
# qintval = int(text_in_element(child, "IntegerValue"))
# qlocaleval = None
# except XMLProcessingException:
# qlocaleval = int(text_in_element(child, "LocaleValue"))
# qintval = None
#
# qualification_requirement = QualificationRequirement(
# qualification_type_id=qid,
# comparator=qcomp,
# integer_value=qintval,
# locale_value=qlocaleval,
# required_to_preview=False)
# qualification_requirements.append(qualification_requirement)
# #hit_info["QualificationTypeId"] = text_in_element( child, "QualificationTypeId" )
# elif child_name in parts_of_interest:
# try:
# contents = child.childNodes[0].data
# except IndexError: # Why???? (11-15-2010)
# contents = ""
# hit_info[child_name] = contents
# for hit_node in hit_nodes:
# hit_type_id = text_in_element(hit_node, "HITTypeId")
# if hit_type_id in hit_type_dict:
# hit_type = hit_type_dict[hit_type_id]
#
# hit = self._extract_from_hit_node(hit_node, hit_type, just_update_shell=False)
# # This will add it to the DB.
#
# else:
# hit_id = text_in_element(hit_node, "HITId")
#
# assert False, "This is broken. Won't return an actual HIT." #FIXME
# hit = self.get_hit(hit_id)
# # This will add it to the DB.
#
# hit_type_dict[hit_type_id] = hit.hit_type
#
# if total_num_results <= page_num*page_size:
# break
# TAKEN FROM END OF _extract_assignment_data(..)
# answers = []
# assignment = Assignment( id=assignment_id,
# worker=worker,
# hit=hit,
# assignment_status=assignment_status,
# autopay_time=autopay_time,
# submit_time=submit_time,
# approval_time=approval_time,
# accept_time=accept_time,
# rejection_time=rejection_time,
# requester_feedback=requester_feedback,
# answers=answers,
# amt=amt)
#
# # ***** CIRCULAR REFERENCE *****
# # WARNING: We are creating a circular reference here. The Assignment
# # keeps a list of Answers, each of which maintains a reference back to
# # the assignment. We first feed the Assignment object with an empty list
# # and then populate the list from the outside.
#
# answer_dom = xml2dom(answer_xml)
#
# for answer_node in answer_dom.getElementsByTagName("Answer"):
# answer = self._extract_answer_from_dom_node( answer_node, assignment )
# answers.append( answer ) # answers was given to assignment above.
#
# if just_update_shell:
# self._db.update_assignment(assignment_id=assignment_id, assignment_status=assignment_status,
# submit_time=submit_time, approval_time=approval_time,
# rejection_time=rejection_time, requester_feedback=requester_feedback)
# else:
# self._db.put_assignment(assignment)
#
# return assignment
PK ꚙJnS
Y- - / crowdlib-0.8.50/crowdlib/AMTServerConnection.py# vim: set fileencoding=utf-8 noexpandtab:
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: November 2010
'''
from __future__ import division, with_statement
import codecs, hmac, os, pprint, sys, time, traceback
from crowdlib.all_exceptions import AMTQualificationTypeAlreadyExists, AMTRequestFailed
from crowdlib.utility import base64_encodestring_py23_compatible, clear_line, get_call_stack_strs, log, urlencode_py23_compatible, urlopen_py23_compatible, xml2dom
from crowdlib.utility.debugging import is_debugging
from crowdlib.utility.time_utils import now_local
from hashlib import sha1 as sha
# When true (or when is_debugging() is true), do_request() logs each operation name
# and how long it took.
VERBOSE = False
# Path of a Python-syntax log file to which do_request() appends a record of every
# request (operation, parameters, aws_account_id, call stack, and any traceback).
# A false value disables the log.
# NOTE(review): this is enabled by default, so every request is written to disk,
# including the AWS account id -- confirm that is intended for released builds.
DEBUG_LOG_REQUESTS_TO_FILE = "~/.crowdlib_data/server_requests_log.py"
class AMTServerConnection(object):
    '''
    Low-level connection to the Amazon Mechanical Turk (AMT) REST API.

    Signs each request with the account's secret key, submits it to the sandbox or
    production endpoint, parses the XML response into a DOM, and retries on AMT
    throttling (ServiceUnavailable) and IOError.
    '''
    # Service types: sandbox and production
    SERVICE_TYPE_SANDBOX = "sandbox"
    SERVICE_TYPE_PRODUCTION = "production"
    VALID_SERVICE_TYPES = (SERVICE_TYPE_SANDBOX, SERVICE_TYPE_PRODUCTION)

    #_SERVICE_VERSION = '2008-08-02'
    _SERVICE_VERSION = '2012-03-25'
    _SERVICE_NAME = 'AWSMechanicalTurkRequester'

    _PREVIEW_HIT_TYPE_URL_STEMS = {
        "production":"http://mturk.com/mturk/preview?groupId=",
        "sandbox":"http://workersandbox.mturk.com/mturk/preview?groupId="
    }

    _SERVICE_URLS = {
        "sandbox" : "https://mechanicalturk.sandbox.amazonaws.com/onca/xml?",
        "production" : 'https://mechanicalturk.amazonaws.com/onca/xml?'
    }

    def __init__( self, aws_account_id,
                        aws_account_key,
                        service_type):
        '''
        aws_account_id (str)  : AWS access key id, sent as AWSAccessKeyId
        aws_account_key (str) : AWS secret key, used only to sign requests
        service_type (str)    : "sandbox" or "production"
        '''
        assert service_type in self.VALID_SERVICE_TYPES

        # Main SETTINGS
        self._aws_account_id = aws_account_id
        self._aws_account_key = aws_account_key
        self._service_type = service_type
        self._max_requests_per_second = {"sandbox":5, "production":100}[service_type]
        self._url = self._SERVICE_URLS[service_type] # URL for submitting requests to AMT
        self._last_request_time = None # for dealing with AMT throttling

    @property
    def preview_hit_type_url_stem(self):
        '''Worker-site preview URL prefix for this service type (ends with "groupId=").'''
        return self._PREVIEW_HIT_TYPE_URL_STEMS[self._service_type]

    def _generate_timestamp(self,gmtime):
        '''Format a time.struct_time as e.g. '2010-06-13T04:04:49Z'.'''
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", gmtime)

    def _generate_signature(self,service, operation, timestamp, secret_access_key):
        '''Return the base64-encoded HMAC-SHA1 of service+operation+timestamp, keyed by the secret key.'''
        # Encoding to "ascii" is to make this work on either Python2 or Python3. On v2.6 it just returns
        # the same string. On v3.x, it returns a bytes object.
        my_sha_hmac = hmac.new(secret_access_key.encode("ascii"), (service + operation + timestamp).encode("ascii"), sha)
        my_b64_hmac_digest = base64_encodestring_py23_compatible(my_sha_hmac.digest()).strip()
        my_b64_hmac_digest = my_b64_hmac_digest.decode("ascii")
        return my_b64_hmac_digest

    @staticmethod
    def _next_retry_delay(delay, backoff_factor, max_delay):
        '''Return the wait before the next retry: delay*backoff_factor, capped at max_delay.'''
        # Multiplicative growth. (Raising a 1.0-second delay to a power never grows it.)
        return min(delay * backoff_factor, max_delay)

    # pylint: disable=R0912,R0915
    # Tolerate too many branches and too many statements.
    def do_request( self, operation, specific_parameters):
        '''
        Run an AWS request.

        This method takes care of signing the request, submitting it to AWS via a REST call,
        managing load on AWS, and coping with small server and network errors that
        inevitably come up.  It can be used when you need to do something differently
        than CrowdLib provides.

        operation (str) : The AWS operation code (i.e., "CreateHIT")
        specific_parameters (dict) : All REST parameters except Service, Version, AWSAccessKeyId, Timestamp, Signature, and Operation.

        Returns (DOM object) : AMT's response as a DOM object created by the xml.dom.minidom module

        Raises AMTQualificationTypeAlreadyExists / AMTRequestFailed for errors reported by AMT,
        or the last exception seen once the retries for throttling / IOError are used up.

        Note:  This does not currently deal with disposing of the DOM objects.
        '''
        _verbose = VERBOSE or is_debugging()
        if _verbose:
            dbg_before_time = time.time()
            msg = " > "+operation
            log(msg, should_terminate=False)

        self._last_request_time = time.time()

        # Parameters for quick retry (in case of throttling or minor network issues)
        query_retry_delay = 1.0 # seconds; multiplied by the backoff factor after each failure
        query_retry_delay_backoff_factor = 1.5
        query_retry_delay_max = 60*5 # Don't stall longer than 5 minutes between tries
        query_retry_count = int(60*60*24 / query_retry_delay_max) # Give up after roughly 24 hours.

        formatted_traceback_if_exception = None
        result_dom = None

        # Start trying.  Normally, it will succeed on the first try... we hope.  :)
        for try_counter in range( query_retry_count ):
            code = None # AMT error code reported by this attempt, if any
            try:
                timestamp = self._generate_timestamp(time.gmtime())
                signature = self._generate_signature(self._SERVICE_NAME, operation, timestamp, self._aws_account_key)

                parameters = {
                    'Service': self._SERVICE_NAME,
                    'Version': self._SERVICE_VERSION,
                    'AWSAccessKeyId': self._aws_account_id,
                    'Timestamp': timestamp,
                    'Signature': signature,
                    'Operation': operation,
                }

                parameters.update( specific_parameters )

                # Make the request
                encoded_parameters = urlencode_py23_compatible(parameters)
                result_xml = urlopen_py23_compatible(self._url, encoded_parameters).read()
                result_dom = xml2dom(result_xml)

                # AMT reports request failures as <Errors>/<Error> elements in the response.
                for errors_node in result_dom.getElementsByTagName('Errors'):
                    for error_node in errors_node.getElementsByTagName('Error'):
                        code = error_node.getElementsByTagName('Code')[0].childNodes[0].data
                        msg = error_node.getElementsByTagName('Message')[0].childNodes[0].data
                        if code.endswith("AWS.MechanicalTurk.QualificationTypeAlreadyExists"):
                            raise AMTQualificationTypeAlreadyExists(code, msg, operation, parameters.get("Name",""))
                        else:
                            raise AMTRequestFailed( code=code, msg=msg, operation=operation, query_params=specific_parameters )

                # No <Error> found: success.  (Breaking unconditionally also stops an empty
                # <Errors> element from re-sending the request over and over.)
                break

            except Exception: # [pylint] blanket exception handler, will re-raise if not handled : pylint:disable=W0703
                e = sys.exc_info()[1]
                if DEBUG_LOG_REQUESTS_TO_FILE:
                    formatted_traceback_if_exception = traceback.format_exc().splitlines()

                if isinstance(e, AMTRequestFailed):
                    # Only AMT throttling is retried; other AMT errors are not.
                    is_retryable = code in ("ServiceUnavailable", "AWS.ServiceUnavailable")
                    if is_retryable:
                        self._max_requests_per_second = max(0.01, self._max_requests_per_second-0.1)
                else:
                    # IOError covers (presumably transient) network failures.
                    is_retryable = isinstance(e, IOError)

                # Re-raise anything we won't retry, and the failure of the final attempt.
                # (Falling out of the loop would return an undefined or error-response DOM.)
                if not is_retryable or try_counter+1 >= query_retry_count:
                    raise

                time.sleep( query_retry_delay )
                query_retry_delay = self._next_retry_delay(query_retry_delay,
                                                           query_retry_delay_backoff_factor,
                                                           query_retry_delay_max)
                if DEBUG_LOG_REQUESTS_TO_FILE:
                    formatted_traceback_if_exception = None #If we get this far, then it was handled and need not be logged after all.

            finally:
                if _verbose or DEBUG_LOG_REQUESTS_TO_FILE:
                    # NOTE(review): "S:/d/" looks like a developer-machine path prefix used to
                    # filter the call stack; confirm it is still wanted.
                    call_stack_strs = get_call_stack_strs(include_only_paths_starting_with="S:/d/")
                    if DEBUG_LOG_REQUESTS_TO_FILE:
                        self._write_to_request_log_file(operation, specific_parameters, call_stack_strs, formatted_traceback_if_exception)
                    if _verbose:
                        clear_line()
                        elapsed_time = time.time() - dbg_before_time
                        msg = " > %s in %.5f seconds"%(operation, elapsed_time)
                        #msg = msg + "\n".join("\n   - %s"%s for s in reversed(call_stack_strs))
                        log(msg)

        return result_dom

    def _write_to_request_log_file(self, operation, specific_parameters, call_stack_strs, formatted_traceback_if_exception):
        '''
        Append one request record to the DEBUG_LOG_REQUESTS_TO_FILE log.

        The log is Python source: a new file starts with "server_requests = []", and each
        record is a commented timestamp header followed by server_requests.append({...}).
        '''
        now = now_local()
        now_str = now.strftime("%Y%m%d-%H%M%S")
        path = os.path.expanduser(DEBUG_LOG_REQUESTS_TO_FILE)
        if not os.path.exists(path):
            s = "server_requests = []\n"
            s += "\n"
            # This runs inside do_request()'s finally clause, so a missing directory
            # would otherwise make every request fail with an IOError.
            log_dir = os.path.dirname(path)
            if log_dir and not os.path.isdir(log_dir):
                os.makedirs(log_dir)
        else:
            s = ""
        s += "# " + "_"*80 + "\n"
        s += "# " + now_str + "\n"
        log_data = {
            "timestamp" : now_str,
            "service_type" : self._service_type,
            "aws_account_id" : self._aws_account_id,
            "last_request_time" : self._last_request_time,
            "operation" : operation,
            "specific_parameters" : specific_parameters,
            "call_stack_strs" : call_stack_strs,
        }
        if formatted_traceback_if_exception:
            log_data["traceback"] = formatted_traceback_if_exception
        s += "server_requests.append(" + pprint.pformat(log_data) + ")" + "\n"
        s += "\n"
        with codecs.open(path, "a", "utf8") as log_file:
            log_file.write(s)
#vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# EXPERIMENTAL CODE (do not delete, yet)
#
# def do_requests_simultaneously( self, operation_specific_parameters_pairs ):
#
# # Only do threading in production mode. Sandbox doesn't like heavy load.
#
# doms = []
# if self._service_type==self.SERVICE_TYPE_PRODUCTION and False:
# try:
# # Python 2
# from Queue import Queue, Empty
# except ImportError:
# # Python 3
# from queue import Queue, Empty
# import threading
# num_worker_threads = 4
#
# # Create a thread-safe queue and populate with the requests.
# inputs = Queue()
# results = Queue()
#
# def worker():
# operation,specific_parameters = inputs.get()
# dom = self.do_request(operation=operation, specific_parameters=specific_parameters)
# results.put(dom)
# q.task_done()
#
# for operation,specific_parameters in operation_specific_parameters_pairs:
# inputs.put((operation,specific_parameters))
#
# for i in range(num_worker_threads):
# t = threading.Thread(target=worker)
# t.daemon = True
# t.start()
#
# inputs.join()
# try:
# while True:
# dom = results.get()
# doms.append(dom)
# except Empty:
# pass
# else:
# for operation,specific_parameters in operation_specific_parameters_pairs:
# dom = self.do_request(operation=operation, specific_parameters=specific_parameters)
# doms.append(dom)
# return doms
#vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# SHRAPNEL (delete any time if you don't think it will be needed)
#
# ## taken from do_request() .. near top
# # Rate limiting, but only in sandbox
# if self._service_type == self.SERVICE_TYPE_SANDBOX:
# if self._last_request_time is not None:
# seconds_since_last_request = time.time()-self._last_request_time
# required_interval = 1.0 / self._max_requests_per_second
# if seconds_since_last_request < required_interval:
# time.sleep( max( 0.01, required_interval - seconds_since_last_request ) )
#def get_call_stack_strs(clip_most_recent=0, include_only_paths_starting_with=None):
# import inspect, os, sys
# parts = []
# last_part = ""
# ellipsis = "..."
# case_sensitive_paths = (sys.platform != "win32")
# if not case_sensitive_paths:
# include_only_paths_starting_with = include_only_paths_starting_with.lower()
# include_only_paths_starting_with = include_only_paths_starting_with.replace("\\", "/")
#
# for stack_frame in inspect.stack()[clip_most_recent:]:
# frame,code_path,line_num,fn_name,code,code_idx = stack_frame # [pylint] allow unused variables : pylint:disable=W0612
#
# code_abs_path = os.path.abspath(code_path)
# if not case_sensitive_paths:
# code_abs_path = code_abs_path.lower()
# code_abs_path = code_abs_path.replace("\\", "/")
#
# if code_abs_path.startswith(include_only_paths_starting_with):
# code_filename = os.path.basename(code_path)
# part = "%s:%d(%s)"%(code_filename,line_num,fn_name)
# else:
# part = ellipsis
#
# if part != ellipsis or part != last_part:
# parts.append(part)
#
# last_part = part
#
# return tuple(parts)
#
PK ፆHLPk( ( " crowdlib-0.8.50/crowdlib/Answer.py# vim: set fileencoding=utf-8 noexpandtab:
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: January 2010
'''
from __future__ import division, with_statement
#TODO: Go back and figure out why AnswerBlank is needed. This may be garbage.
#TODO: Cannonical representation for AnswerUploadedFile
ANSWER_TYPES = ANSWER_TYPE_BLANK,ANSWER_TYPE_FREETEXT,ANSWER_TYPE_SELECTION,ANSWER_TYPE_UPLOADEDFILE \
= ("Blank", "FreeText", "Selection", "UploadedFile")
class Answer( object ):
    '''
    Abstract base class for AnswerBlank, AnswerFreeText, AnswerSelection and
    AnswerUploadedFile.

    Subclasses are expected to override __str__ (the base version raises
    NotImplementedError) and __repr__, because equality and hashing are both
    derived from the concrete type plus repr().
    '''
    def __init__( self, question_id, answer_type, assignment_id, amt):
        '''
        @param question_id: identifier of the question being answered
        @param answer_type: one of ANSWER_TYPES (checked with an assert)
        @param assignment_id: identifier of the assignment holding this answer
        @param amt: service object used by the assignment property
                    (get_assignment) and, for uploaded files, to fetch content
        '''
        assert answer_type in ANSWER_TYPES
        self._question_id = question_id
        self._answer_type = answer_type
        self._assignment_id = assignment_id
        self._amt = amt # needed for fetching the assignment or downloading file contents in the case of UploadedFile

    # [pylint] accessing seemingly private members : pylint:disable=W0212
    question_id = property(lambda self: self._question_id)
    answer_type = property(lambda self: self._answer_type)
    # Looked up through amt on every access; the result is not cached here.
    assignment = property(lambda self: self._amt.get_assignment(self._assignment_id))
    # [pylint] accessing seemingly private members : pylint:enable=W0212

    def __eq__( self, other ):
        # Equal when both are the same concrete class and their repr() matches.
        return type(other) is type(self) and repr(self) == repr(other)

    def __ne__( self, other ):
        return not self.__eq__(other)

    def __hash__( self ):
        # Must agree with __eq__: equal answers need equal hashes. Defining
        # __eq__ alone leaves instances unhashable on Python 3 (__hash__ is
        # implicitly set to None) and id-hashed on Python 2, which breaks
        # sets and dict keys.
        return hash((type(self), repr(self)))

    def __str__( self ):
        raise NotImplementedError( "cannonical_representation of %s"%repr(self) )

    @property
    def text(self):
        '''
        This answer's string form (see __str__) as text: a unicode object on
        Python 2, a str on Python 3.
        '''
        try:
            text_type = unicode # [pylint] Python 2 only : pylint:disable=E0602
        except NameError:
            text_type = str # Python 3: str is already text
        return text_type(self)

    @property
    def id(self):
        '''
        "<assignment_id>-<question_id>"; assumes both ids are strings, since
        they are joined by string concatenation.
        '''
        return self._assignment_id + "-" + self._question_id
class AnswerBlank( Answer ):
    '''
    Placeholder used for Assignment structures in which no legitimate answer
    is present. Its string form is always the empty string.
    '''
    def __init__( self, question_id, assignment_id, amt ):
        Answer.__init__(self, question_id=question_id, answer_type=ANSWER_TYPE_BLANK,
                        assignment_id=assignment_id, amt=amt)

    def __repr__(self):
        # Prefixed with the ANSWER_TYPE_BLANK tag rather than the class name.
        fields = (self.question_id,)
        return "%s%r" % (ANSWER_TYPE_BLANK, fields)

    def __str__( self ):
        return ""
class AnswerFreeText( Answer ):
    '''
    Answer holding free text supplied for a question.

    str() yields the text itself; repr() embeds the question id and the text.
    '''
    def __init__( self, question_id, free_text, assignment_id, amt):
        Answer.__init__(self, question_id=question_id, answer_type=ANSWER_TYPE_FREETEXT,
                        assignment_id=assignment_id, amt=amt)
        self._free_text = free_text

    @property
    def free_text(self):
        '''The text exactly as passed to the constructor.'''
        return self._free_text

    def __repr__(self):
        payload = (self.question_id, self.free_text)
        return "%s%r" % ("AnswerFreeText", payload)

    def __str__( self ):
        return self.free_text
class AnswerSelection( Answer ):
    '''
    Answer to a selection question.

    Stores a selection identifier plus an optional other_selection_text
    (presumably the text typed for an "other" option -- confirm against the
    assignment parser). Either value may be None.
    '''
    def __init__( self, question_id, selection_id, other_selection_text, assignment_id, amt):
        Answer.__init__(self, question_id, ANSWER_TYPE_SELECTION, assignment_id, amt)
        self._selection_id = selection_id
        self._other_selection_text = other_selection_text

    # [pylint] accessing seemingly private members : pylint:disable=W0212
    selection_id = property(lambda self: self._selection_id)
    other_selection_text = property(lambda self: self._other_selection_text)
    # [pylint] accessing seemingly private members : pylint:enable=W0212

    def __repr__(self):
        return "AnswerSelection"+repr( ( self.question_id, self.selection_id, self.other_selection_text ) )

    def __str__( self ):
        '''
        The selection identifier; if there is none, the other_selection_text;
        if neither is set, "".
        '''
        # __str__ must return a string. Returning a None selection_id made
        # str()/.text raise TypeError for answers that only carry
        # other_selection_text.
        if self.selection_id is not None:
            return self.selection_id
        if self.other_selection_text is not None:
            return self.other_selection_text
        return ""
class AnswerUploadedFile( Answer ):
    '''
    Answer referring to an uploaded file.

    Only the file key and size are stored on the object; the file content is
    retrieved on demand through the amt object given to the constructor.
    '''
    def __init__( self, question_id, uploaded_file_key, uploaded_file_size, assignment_id, amt):
        Answer.__init__(self, question_id=question_id, answer_type=ANSWER_TYPE_UPLOADEDFILE,
                        assignment_id=assignment_id, amt=amt)
        self._uploaded_file_key = uploaded_file_key
        self._uploaded_file_size = uploaded_file_size

    @property
    def key(self):
        '''The uploaded-file key passed to the constructor.'''
        return self._uploaded_file_key

    @property
    def size(self):
        '''The uploaded-file size passed to the constructor (presumably bytes -- confirm with caller).'''
        return self._uploaded_file_size

    def save(self, path):
        '''Write this file's content to *path* via amt.save_file_answer_content().'''
        service = self._amt
        service.save_file_answer_content(assignment_id=self._assignment_id,
                                         question_id=self._question_id, path=path)

    @property
    def content(self):
        '''The file's content, fetched via amt.get_file_answer_content() on every access.'''
        lookup = dict(assignment_id=self._assignment_id, question_id=self._question_id)
        return self._amt.get_file_answer_content(**lookup)

    def __repr__(self):
        state = (self._question_id, self._uploaded_file_key, self._uploaded_file_size)
        return "%s%r" % ("AnswerUploadedFile", state)

    def __str__(self):
        return repr(self)
# Created at the University of Maryland, Human-Computer Interaction Lab
# (c) Copyright 2011 Alexander J. Quinn
# Licensed under the MIT License (see doc/LICENSE.txt)
'''
@author: Alexander J. Quinn
@contact: aq@purdue.edu
@since: November 2010
'''
from __future__ import division, with_statement
# INTERNAL USE ONLY - NOT RETURNED TO THE USER OF THE CROWDLIB MODULE
from crowdlib.utility import namedtuple
# Flat record of one parsed answer; field order defines the positional layout.
AnswerRecord = namedtuple(
    "AnswerRecord",
    (
        # "assignment_id",
        "question_identifier",
        "free_text",
        "selection_identifier",
        "other_selection",
        "uploaded_file_key",
        "uploaded_file_size_in_bytes",
    )
)
PK 2zH! &