123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- import pytest
- import pymysql
- def test_insert_with_single_quotes():
- db = pymysql.connect(host='127.0.0.1', port=12001, user='test', password='test', database='test')
- cursor = db.cursor()
- sql = "insert into opensource(uid, name) values(1, 'hello') where uid = 1"
- cursor.execute(sql)
- db.commit()
- rowsaffected = cursor.rowcount
- print("affected rows: %s" % (rowsaffected))
- cursor.close()
- db.close()
- assert rowsaffected == 1
- '''
- def test_insert_with_double_quotes():
- db = pymysql.connect(host='127.0.0.1', port=12001, user='test', password='test', database='test')
- cursor = db.cursor()
- sql = "insert into opensource(uid, name) values(1, \"hello\") where uid = 1"
- cursor.execute(sql)
- db.commit()
- cursor.close()
- db.close()
- '''
- def test_select():
- db = pymysql.connect(host='127.0.0.1', port=12001, user='test', password='test', database='test')
- cursor = db.cursor()
- sql = "select uid, name from opensource where uid = 1"
- cursor.execute(sql)
- results = cursor.fetchall()
- assert len(results) == 1
- for row in results:
- uid = row[0]
- name = row[1]
- print("uid=%s, name=%s" % (uid, name))
- assert row.size() == 2
- assert uid == 1
- assert name == "hello"
- db.close()
- def test_select_star():
- db = pymysql.connect(host='127.0.0.1', port=12001, user='test', password='test', database='test')
- cursor = db.cursor()
- sql = "select * from opensource where uid = 1"
- cursor.execute(sql)
- results = cursor.fetchall()
- assert len(results) == 1
- for row in results:
- uid = row[0]
- name = row[1]
- print("uid=%s, name=%s" % (uid, name))
- assert row.size() == 5
- assert uid == 1
- assert name == "hello"
- db.close()
- def test_update():
- db = pymysql.connect(host='127.0.0.1', port=12001, user='test', password='test', database='test')
- cursor = db.cursor()
- sql = "select uid, name from opensource where uid = 1"
- cursor.execute(sql)
- results = cursor.fetchall()
- assert len(results) == 1
- for row in results:
- uid = row[0]
- name = row[1]
- print("uid=%s, name=%s" % (uid, name))
- assert uid == 1
- assert name == "hello"
- cursor.close()
- cursor = db.cursor()
- sql = "update opensource set name = 'Lee' where uid = 1"
- cursor.execute(sql)
- db.commit()
- rowsaffected = cursor.rowcount
- print("affected rows: %s" % (rowsaffected))
- assert rowsaffected == 1
- cursor.close()
- cursor = db.cursor()
- sql = "select uid, name from opensource where uid = 1"
- cursor.execute(sql)
- results = cursor.fetchall()
- assert len(results) == 1
- for row in results:
- uid = row[0]
- name = row[1]
- print("uid=%s, name=%s" % (uid, name))
- assert uid == 1
- assert name == "Lee"
- cursor.close()
- db.close()
- def test_delete():
- print("----delete----")
- db = pymysql.connect(host='127.0.0.1', port=12001, user='test', password='test', database='test')
- cursor = db.cursor()
- sql = "select uid, name from opensource where uid = 1"
- cursor.execute(sql)
- results = cursor.fetchall()
- assert len(results) == 1
- cursor.close()
- cursor = db.cursor()
- sql = "delete from opensource where uid = 1"
- cursor.execute(sql)
- db.commit()
- rowsaffected = cursor.rowcount
- print("affected rows: %s" % (rowsaffected))
- assert rowsaffected == 1
- cursor.close()
- cursor = db.cursor()
- sql = "select uid, name from opensource where uid = 1"
- cursor.execute(sql)
- results = cursor.fetchall()
- assert len(results) == 0
- cursor.close()
- db.close()
- '''
- def test_check_tablename():
- '''
- def test_insert_remove_where_cluster():
- db = pymysql.connect(host='127.0.0.1', port=12001, user='test', password='test', database='test')
- cursor = db.cursor()
- sql = "insert into opensource(uid, name) values(1, \"hello\")"
- cursor.execute(sql)
- db.commit()
- rowsaffected = cursor.rowcount
- print("affected rows: %s" % (rowsaffected))
- cursor.close()
- db.close()
- assert rowsaffected == 1
- def test_insert_remove_where_cluster_without_specify_key():
- db = pymysql.connect(host='127.0.0.1', port=12001, user='test', password='test', database='test')
- cursor = db.cursor()
- sql = "insert into opensource values(1, \"Jack\", \"Shanghai\", 1, 18)"
- cursor.execute(sql)
- db.commit()
- rowsaffected = cursor.rowcount
- print("affected rows: %s" % (rowsaffected))
- cursor.close()
- db.close()
- assert rowsaffected == 1
- def test_select_limit():
- db = pymysql.connect(host='127.0.0.1', port=12001, user='test', password='test', database='test')
- cursor = db.cursor()
- sql = "select uid, name from opensource where uid = 1 limit 2"
- cursor.execute(sql)
- results = cursor.fetchall()
- assert len(results) == 2
-
- cursor = db.cursor()
- sql = "select uid, name from opensource where uid = 1 limit 1"
- cursor.execute(sql)
- results = cursor.fetchall()
- assert len(results) == 1
- cursor = db.cursor()
- sql = "insert into opensource values(1, \"Jack\", \"Shanghai\", 1, 19)"
- cursor.execute(sql)
- db.commit()
- cursor = db.cursor()
- sql = "select uid, name from opensource where uid = 1 limit 1,3"
- cursor.execute(sql)
- results = cursor.fetchall()
- assert len(results) == 2
- assert results[0][4] == 18
- assert results[1][4] == 19
- db.close()
|