• Using Python to generate large volumes of data, write them to Elasticsearch, and query them (Part 2)


    Simulate student personal-information records and write them to Elasticsearch. Each record contains a name, sex, age, character description, subject, grade, and creation time.

    Scheme 1:

    No index mapping is created ahead of time; instead, every inserted document carries the index metadata, and Elasticsearch builds the mapping dynamically from the incoming documents.

    Sample code: [multi-threaded writes] [10,000 × 1,000 = 10,000,000 documents in one run] [measured at 3,266 seconds on my machine]

    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    from datetime import datetime
    from queue import Queue
    import random
    import time
    import threading

    es = Elasticsearch(hosts='http://127.0.0.1:9200')
    # print(es)

    names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
    sexs = ['男', '女']
    age = [25, 28, 29, 32, 31, 26, 27, 30]
    character = ['自信但不自负,不以自我为中心',
                 '努力、积极、乐观、拼搏是我的人生信条',
                 '抗压能力强,能够快速适应周围环境',
                 '敢做敢拼,脚踏实地;做事认真负责,责任心强',
                 '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
                 '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
                 '忠实诚信,讲原则,说到做到,决不推卸责任',
                 '有自制力,做事情始终坚持有始有终,从不半途而废',
                 '肯学习,有问题不逃避,愿意虚心向他人学习',
                 '愿意以谦虚态度赞扬接纳优越者,权威者',
                 '会用100%的热情和精力投入到工作中;平易近人',
                 '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
                 '有较强的团队精神,工作积极进取,态度认真']
    subjects = ['语文', '数学', '英语', '生物', '地理']
    grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
    create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def save_to_es(num):
        """
        Bulk-write one batch of 10,000 documents to Elasticsearch.
        :param num: batch number; documents get ids 10000*num .. 10000*num + 9999
        :return:
        """
        start = time.time()
        action = [
            {
                "_index": "personal_info_10000000",
                "_type": "_doc",  # ES 7.x only supports the default "_doc" type
                "_id": i,
                "_source": {
                    "id": i,
                    "name": random.choice(names),
                    "sex": random.choice(sexs),
                    "age": random.choice(age),
                    "character": random.choice(character),
                    "subject": random.choice(subjects),
                    "grade": random.choice(grades),
                    "create_time": create_time
                }
            } for i in range(10000 * num, 10000 * num + 10000)
        ]
        helpers.bulk(es, action)
        end = time.time()
        print(f"Batch {num} took {end - start}s!")

    def run():
        # Worker: keep pulling batch numbers until the queue is drained.
        # (Safe here because the queue is fully filled before the threads start.)
        global queue
        while queue.qsize() > 0:
            num = queue.get()
            print(num)
            save_to_es(num)

    if __name__ == '__main__':
        start = time.time()
        queue = Queue()
        # Put the batch numbers into the queue
        for num in range(1000):
            queue.put(num)
        # Run the workers in multiple threads
        consumer_lst = []
        for _ in range(10):
            thread = threading.Thread(target=run)
            thread.start()
            consumer_lst.append(thread)
        for consumer in consumer_lst:
            consumer.join()
        end = time.time()
        print('Done! Total time:', end - start)

    Run result: (screenshot not reproduced here)
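    To verify the write from Python, a quick count plus a sample query can be run against the new index. This is a minimal sketch, assuming the same local elasticsearch-py 7.x client as in the script above:

    from elasticsearch import Elasticsearch

    es = Elasticsearch(hosts='http://127.0.0.1:9200')

    # Total number of documents in the index (10,000,000 if the run completed)
    print(es.count(index="personal_info_10000000")["count"])

    # Sample query: the five highest-scoring '数学' records
    resp = es.search(
        index="personal_info_10000000",
        body={
            "query": {"match": {"subject": "数学"}},
            "sort": [{"grade": {"order": "desc"}}],
            "size": 5,
        },
    )
    for hit in resp["hits"]["hits"]:
        print(hit["_source"])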

     The automatically created index mapping:

    GET personal_info_10000000/_mapping

    {
      "personal_info_10000000" : {
        "mappings" : {
          "properties" : {
            "age" : {
              "type" : "long"
            },
            "character" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "create_time" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "grade" : {
              "type" : "long"
            },
            "id" : {
              "type" : "long"
            },
            "name" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "sex" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "subject" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        }
      }
    }

    Scheme 2:

    1. Insert 5,000,000 documents one at a time

    First create the index personal_info_5000000 and settle its mapping, then insert the data. (Note that scheme 1's dynamic mapping typed create_time as text; defining the mapping up front avoids that and lets character use the ik_smart analyzer.)

    Create the index and set its mapping:

    PUT personal_info_5000000
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "id": {
            "type": "long"
          },
          "name": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 32
              }
            }
          },
          "sex": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 8
              }
            }
          },
          "age": {
            "type": "long"
          },
          "character": {
            "type": "text",
            "analyzer": "ik_smart",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "subject": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "grade": {
            "type": "long"
          },
          "create_time": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
          }
        }
      }
    }
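    The PUT request above is issued from Kibana Dev Tools; the same index can also be created from Python before the write starts. A minimal sketch, assuming elasticsearch-py 7.x (index_body is exactly the settings/mappings body shown above, abbreviated here):

    from elasticsearch import Elasticsearch

    es = Elasticsearch(hosts='http://127.0.0.1:9200')

    index_body = {
        "settings": {"number_of_shards": 3, "number_of_replicas": 1},
        "mappings": {
            "properties": {
                "id": {"type": "long"},
                # ... the remaining fields exactly as in the PUT request above ...
                "create_time": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
                },
            }
        },
    }

    # Create the index only if it does not already exist
    if not es.indices.exists(index="personal_info_5000000"):
        es.indices.create(index="personal_info_5000000", body=index_body)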

    Inspect the newly created index:

    GET personal_info_5000000

    {
      "personal_info_5000000" : {
        "aliases" : { },
        "mappings" : {
          "properties" : {
            "age" : {
              "type" : "long"
            },
            "character" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              },
              "analyzer" : "ik_smart"
            },
            "create_time" : {
              "type" : "date",
              "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "grade" : {
              "type" : "long"
            },
            "id" : {
              "type" : "long"
            },
            "name" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 32
                }
              }
            },
            "sex" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 8
                }
              }
            },
            "subject" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        },
        "settings" : {
          "index" : {
            "routing" : {
              "allocation" : {
                "include" : {
                  "_tier_preference" : "data_content"
                }
              }
            },
            "number_of_shards" : "3",
            "provided_name" : "personal_info_5000000",
            "creation_date" : "1663471072176",
            "number_of_replicas" : "1",
            "uuid" : "5DfmfUhUTJeGk1k4XnN-lQ",
            "version" : {
              "created" : "7170699"
            }
          }
        }
      }
    }

    Start inserting data:

    Sample code: [one index request per document] [10,000 × 500 = 5,000,000 documents in one run] [measured at 7,916 seconds on my machine] (the script still drains the queue with 10 worker threads; the difference from scheme 1 is that each document is sent as a separate request instead of being bulked, which is why this run is so much slower)

    from elasticsearch import Elasticsearch
    from datetime import datetime
    from queue import Queue
    import random
    import time
    import threading

    es = Elasticsearch(hosts='http://127.0.0.1:9200')
    # print(es)

    names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
    sexs = ['男', '女']
    age = [25, 28, 29, 32, 31, 26, 27, 30]
    character = ['自信但不自负,不以自我为中心',
                 '努力、积极、乐观、拼搏是我的人生信条',
                 '抗压能力强,能够快速适应周围环境',
                 '敢做敢拼,脚踏实地;做事认真负责,责任心强',
                 '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
                 '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
                 '忠实诚信,讲原则,说到做到,决不推卸责任',
                 '有自制力,做事情始终坚持有始有终,从不半途而废',
                 '肯学习,有问题不逃避,愿意虚心向他人学习',
                 '愿意以谦虚态度赞扬接纳优越者,权威者',
                 '会用100%的热情和精力投入到工作中;平易近人',
                 '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
                 '有较强的团队精神,工作积极进取,态度认真']
    subjects = ['语文', '数学', '英语', '生物', '地理']
    grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
    create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Decorator that reports how long each call takes
    def timer(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            end = time.time()
            print('id {} took about {:.2f} s'.format(*args, end - start))
            return res
        return wrapper

    @timer
    def save_to_es(num):
        """
        Write a single document to Elasticsearch.
        :param num: document id
        :return:
        """
        body = {
            "id": num,
            "name": random.choice(names),
            "sex": random.choice(sexs),
            "age": random.choice(age),
            "character": random.choice(character),
            "subject": random.choice(subjects),
            "grade": random.choice(grades),
            "create_time": create_time
        }
        # If the index does not exist yet, it is created automatically here
        # (doc_type is deprecated in ES 7.x and could be omitted)
        es.index(index="personal_info_5000000", id=num, doc_type="_doc", document=body)

    def run():
        # Worker: keep pulling document ids until the queue is drained
        global queue
        while queue.qsize() > 0:
            num = queue.get()
            print(num)
            save_to_es(num)

    if __name__ == '__main__':
        start = time.time()
        queue = Queue()
        # Put the document ids into the queue
        for num in range(5000000):
            queue.put(num)
        # Run the workers in multiple threads
        consumer_lst = []
        for _ in range(10):
            thread = threading.Thread(target=run)
            thread.start()
            consumer_lst.append(thread)
        for consumer in consumer_lst:
            consumer.join()
        end = time.time()
        print('Done! Total time:', end - start)

    Run result: (screenshot not reproduced here)

    2. Bulk-insert 5,000,000 documents

    First create the index personal_info_5000000_v2 and settle its mapping, then insert the data.

    Create the index and set its mapping:

    PUT personal_info_5000000_v2
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "id": {
            "type": "long"
          },
          "name": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 32
              }
            }
          },
          "sex": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 8
              }
            }
          },
          "age": {
            "type": "long"
          },
          "character": {
            "type": "text",
            "analyzer": "ik_smart",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "subject": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "grade": {
            "type": "long"
          },
          "create_time": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
          }
        }
      }
    }

    Inspect the newly created index:

    GET personal_info_5000000_v2

    {
      "personal_info_5000000_v2" : {
        "aliases" : { },
        "mappings" : {
          "properties" : {
            "age" : {
              "type" : "long"
            },
            "character" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              },
              "analyzer" : "ik_smart"
            },
            "create_time" : {
              "type" : "date",
              "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "grade" : {
              "type" : "long"
            },
            "id" : {
              "type" : "long"
            },
            "name" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 32
                }
              }
            },
            "sex" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 8
                }
              }
            },
            "subject" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        },
        "settings" : {
          "index" : {
            "routing" : {
              "allocation" : {
                "include" : {
                  "_tier_preference" : "data_content"
                }
              }
            },
            "number_of_shards" : "3",
            "provided_name" : "personal_info_5000000_v2",
            "creation_date" : "1663485323617",
            "number_of_replicas" : "1",
            "uuid" : "XBPaDn_gREmAoJmdRyBMAA",
            "version" : {
              "created" : "7170699"
            }
          }
        }
      }
    }

    Bulk-insert the data:

    Import helpers from the elasticsearch module and use helpers.bulk to write large numbers of documents in batches. Each document is first defined as an action dict whose fields mean the following (see the full listing below):

    • _index is the name of the target index (created beforehand here, so the explicit mapping is used).
    • _type is the document type; in ES 7.x this is always "_doc".
    • _source is a dict holding each document's fields and values; it can contain any number of fields.

    Sample code: [program aborted partway; 4,714,000 documents written]

    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    from datetime import datetime
    from queue import Queue
    import random
    import time
    import threading

    es = Elasticsearch(hosts='http://127.0.0.1:9200')
    # print(es)

    names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
    sexs = ['男', '女']
    age = [25, 28, 29, 32, 31, 26, 27, 30]
    character = ['自信但不自负,不以自我为中心',
                 '努力、积极、乐观、拼搏是我的人生信条',
                 '抗压能力强,能够快速适应周围环境',
                 '敢做敢拼,脚踏实地;做事认真负责,责任心强',
                 '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
                 '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
                 '忠实诚信,讲原则,说到做到,决不推卸责任',
                 '有自制力,做事情始终坚持有始有终,从不半途而废',
                 '肯学习,有问题不逃避,愿意虚心向他人学习',
                 '愿意以谦虚态度赞扬接纳优越者,权威者',
                 '会用100%的热情和精力投入到工作中;平易近人',
                 '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
                 '有较强的团队精神,工作积极进取,态度认真']
    subjects = ['语文', '数学', '英语', '生物', '地理']
    grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
    create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Decorator that reports how long each call takes
    def timer(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            end = time.time()
            print('Batch {} took about {:.2f} s'.format(*args, end - start))
            return res
        return wrapper

    @timer
    def save_to_es(num):
        """
        Bulk-write one batch of 10,000 documents to Elasticsearch.
        :param num: batch number
        :return:
        """
        action = [
            {
                "_index": "personal_info_5000000_v2",
                "_type": "_doc",
                "_id": i,
                "_source": {
                    "id": i,
                    "name": random.choice(names),
                    "sex": random.choice(sexs),
                    "age": random.choice(age),
                    "character": random.choice(character),
                    "subject": random.choice(subjects),
                    "grade": random.choice(grades),
                    "create_time": create_time
                }
            } for i in range(10000 * num, 10000 * num + 10000)
        ]
        helpers.bulk(es, action)

    def run():
        # Worker: keep pulling batch numbers until the queue is drained
        global queue
        while queue.qsize() > 0:
            num = queue.get()
            print(num)
            save_to_es(num)

    if __name__ == '__main__':
        start = time.time()
        queue = Queue()
        # Put the batch numbers into the queue
        for num in range(500):
            queue.put(num)
        # Run the workers in multiple threads
        consumer_lst = []
        for _ in range(10):
            thread = threading.Thread(target=run)
            thread.start()
            consumer_lst.append(thread)
        for consumer in consumer_lst:
            consumer.join()
        end = time.time()
        print('Done! Total time:', end - start)

    Run result: (screenshot not reproduced here)
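    Both bulk runs here aborted partway through. helpers.bulk raises on the first failed action by default, which kills the whole worker thread. A more tolerant variant collects per-document errors and retries on transport problems; this is a minimal sketch of that idea, assuming the action list from the script above (a list rather than a generator, so it can be retried; chunk_size, raise_on_error and raise_on_exception are standard helpers.bulk parameters, while the retry loop and the save_to_es_tolerant name are this sketch's own additions):

    from elasticsearch import helpers

    def save_to_es_tolerant(es, actions, retries=3):
        """Bulk-write actions, collecting per-document errors instead of raising."""
        for attempt in range(retries):
            try:
                ok, errors = helpers.bulk(
                    es, actions,
                    chunk_size=2000,           # smaller requests put less pressure on the cluster
                    raise_on_error=False,      # report failed actions instead of raising
                    raise_on_exception=False,
                )
                if errors:
                    print(f"{len(errors)} documents failed, e.g. {errors[0]}")
                return ok
            except Exception as exc:  # e.g. a connection timeout
                print(f"attempt {attempt + 1} failed: {exc}")
        return 0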

    3. Bulk-insert 5,000,000 documents using a generator

    First create the index personal_info_5000000_v3 with the same mapping, then insert the data.

    This run optimizes the bulk insert above by swapping the list comprehension for a Python generator expression: each batch's 10,000 action dicts are produced lazily instead of being built in memory up front (helpers.bulk accepts any iterable).

    The index and mapping are set up as above, so straight to the code:

    Sample code: [program aborted partway; 3,688,000 documents written]

    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    from datetime import datetime
    from queue import Queue
    import random
    import time
    import threading

    es = Elasticsearch(hosts='http://127.0.0.1:9200')
    # print(es)

    names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
    sexs = ['男', '女']
    age = [25, 28, 29, 32, 31, 26, 27, 30]
    character = ['自信但不自负,不以自我为中心',
                 '努力、积极、乐观、拼搏是我的人生信条',
                 '抗压能力强,能够快速适应周围环境',
                 '敢做敢拼,脚踏实地;做事认真负责,责任心强',
                 '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
                 '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
                 '忠实诚信,讲原则,说到做到,决不推卸责任',
                 '有自制力,做事情始终坚持有始有终,从不半途而废',
                 '肯学习,有问题不逃避,愿意虚心向他人学习',
                 '愿意以谦虚态度赞扬接纳优越者,权威者',
                 '会用100%的热情和精力投入到工作中;平易近人',
                 '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
                 '有较强的团队精神,工作积极进取,态度认真']
    subjects = ['语文', '数学', '英语', '生物', '地理']
    grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
    create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Decorator that reports how long each call takes
    def timer(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            end = time.time()
            print('Batch {} took about {:.2f} s'.format(*args, end - start))
            return res
        return wrapper

    @timer
    def save_to_es(num):
        """
        Bulk-write one batch of 10,000 documents using a generator expression.
        :param num: batch number
        :return:
        """
        # Parentheses instead of brackets: actions are produced lazily,
        # so the batch is never materialized as a list in memory
        action = (
            {
                "_index": "personal_info_5000000_v3",
                "_type": "_doc",
                "_id": i,
                "_source": {
                    "id": i,
                    "name": random.choice(names),
                    "sex": random.choice(sexs),
                    "age": random.choice(age),
                    "character": random.choice(character),
                    "subject": random.choice(subjects),
                    "grade": random.choice(grades),
                    "create_time": create_time
                }
            } for i in range(10000 * num, 10000 * num + 10000)
        )
        helpers.bulk(es, action)

    def run():
        # Worker: keep pulling batch numbers until the queue is drained
        global queue
        while queue.qsize() > 0:
            num = queue.get()
            print(num)
            save_to_es(num)

    if __name__ == '__main__':
        start = time.time()
        queue = Queue()
        # Put the batch numbers into the queue
        for num in range(500):
            queue.put(num)
        # Run the workers in multiple threads
        consumer_lst = []
        for _ in range(10):
            thread = threading.Thread(target=run)
            thread.start()
            consumer_lst.append(thread)
        for consumer in consumer_lst:
            consumer.join()
        end = time.time()
        print('Done! Total time:', end - start)

    Run result: (screenshot not reproduced here)
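    The title also promises query operations. With the explicit mapping in place, create_time is a real date field and character is analyzed with ik_smart, so date-range filters and Chinese full-text search behave as expected. A minimal sketch, assuming the data written by the scripts above (index name and field values as in the code):

    from elasticsearch import Elasticsearch

    es = Elasticsearch(hosts='http://127.0.0.1:9200')

    resp = es.search(
        index="personal_info_5000000_v2",
        body={
            "query": {
                "bool": {
                    "must": [
                        # full-text match, analyzed by ik_smart
                        {"match": {"character": "团队"}},
                        # exact match on the keyword sub-field
                        {"term": {"sex.keyword": "男"}},
                        # works because create_time is mapped as a date
                        {"range": {"create_time": {"gte": "2022-09-01 00:00:00"}}},
                    ]
                }
            },
            "size": 3,
        },
    )
    print(resp["hits"]["total"])
    for hit in resp["hits"]["hits"]:
        print(hit["_source"]["name"], hit["_source"]["character"])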

  • Original article: https://blog.csdn.net/weixin_44799217/article/details/126911481